pinecone.data.features.bulk_import
This module implements the bulk import feature: starting an import from a storage provider, listing and describing import operations, and cancelling them.

Full module source:

````python
from enum import Enum
from typing import Optional, Literal, Iterator, Type, cast

from pinecone.config.config import ConfigBuilder
from pinecone.core_ea.openapi.db_data import ApiClient
from pinecone.core_ea.openapi.db_data.api.bulk_operations_api import BulkOperationsApi
from pinecone.core_ea.openapi.shared import API_VERSION

from pinecone.utils import parse_non_empty_args, install_json_repr_override, setup_openapi_client

from pinecone.core_ea.openapi.db_data.models import (
    StartImportRequest,
    StartImportResponse,
    ImportListResponse,
    ImportModel,
    ImportErrorMode as ImportErrorModeClass,
)

# Give the generated response models a JSON-style repr for readability.
for m in [StartImportResponse, ImportListResponse, ImportModel]:
    install_json_repr_override(m)

# Build a user-facing enum from the allowed values of the generated
# ImportErrorMode model ("CONTINUE" / "ABORT").
ImportErrorMode: Type[Enum] = cast(
    Type[Enum], Enum("ImportErrorMode", ImportErrorModeClass.allowed_values[("on_error",)])
)


class ImportFeatureMixin:
    def __init__(self, **kwargs):
        config = ConfigBuilder.build(**kwargs)
        openapi_config = ConfigBuilder.build_openapi_config(
            config, kwargs.get("openapi_config", None)
        )

        if kwargs.get("__import_operations_api", None):
            self.__import_operations_api = kwargs.get("__import_operations_api")
        else:
            self.__import_operations_api = setup_openapi_client(
                api_client_klass=ApiClient,
                api_klass=BulkOperationsApi,
                config=config,
                openapi_config=openapi_config,
                pool_threads=kwargs.get("pool_threads", 1),
                api_version=API_VERSION,
            )

    def start_import(
        self,
        uri: str,
        integration_id: Optional[str] = None,
        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
    ) -> StartImportResponse:
        """Import data from a storage provider into an index. The uri must start with the scheme of a supported
        storage provider. For buckets that are not publicly readable, you will also need to separately configure
        a storage integration and pass the integration id.

        Examples:
            >>> from pinecone import Pinecone
            >>> index = Pinecone().Index('my-index')
            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
            { id: "1" }

        Args:
            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass
                the id of your storage integration using this property. Defaults to None.
            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.

        Returns:
            StartImportResponse: Contains the id of the import operation.
        """
        # Normalize both the enum member and the string literal to the
        # lowercase value expected by the generated model.
        if isinstance(error_mode, ImportErrorMode):
            error_mode = error_mode.value
        elif isinstance(error_mode, str):
            try:
                error_mode = ImportErrorMode(error_mode.lower()).value
            except ValueError:
                raise ValueError(f"Invalid error_mode value: {error_mode}")

        args_dict = parse_non_empty_args(
            [
                ("uri", uri),
                ("integration_id", integration_id),
                ("error_mode", ImportErrorModeClass(on_error=error_mode)),
            ]
        )

        return self.__import_operations_api.start_import(StartImportRequest(**args_dict))

    def list_imports(self, **kwargs) -> Iterator[ImportModel]:
        """
        Returns a generator that yields each import operation. It automatically handles pagination tokens
        on your behalf so you can easily iterate over all results. The `list_imports` method accepts all of
        the same arguments as `list_imports_paginated`.

        ```python
        for op in index.list_imports():
            print(op)
        ```

        You can convert the generator into a list by wrapping it in a call to the built-in `list` function:

        ```python
        operations = list(index.list_imports())
        ```

        You should be cautious with this approach because it will fetch all operations at once, which could
        require a large number of network calls and a lot of memory to hold the results.

        Args:
            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified,
                the server will use a default value. [optional]
            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is
                returned in the response. The token can be used to fetch the next page of results. [optional]
        """
        done = False
        while not done:
            results = self.list_imports_paginated(**kwargs)
            if len(results.data) > 0:
                for op in results.data:
                    yield op

            if results.pagination:
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True

    def list_imports_paginated(
        self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
    ) -> ImportListResponse:
        """
        The list_imports_paginated operation returns information about import operations.
        It returns operations in a paginated form, with a pagination token to fetch the next page of results.

        Consider using the `list_imports` method to avoid having to handle pagination tokens manually.

        Examples:
            >>> results = index.list_imports_paginated(limit=5)
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> results.data[0]
            {
                "id": "6",
                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
                "status": "Completed",
                "percent_complete": 100.0,
                "records_imported": 10,
                "created_at": "2024-09-06T14:52:02.567776+00:00",
                "finished_at": "2024-09-06T14:52:28.130717+00:00"
            }
            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)

        Args:
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]

        Returns: ImportListResponse object which contains the list of operations as ImportModel objects,
            pagination information, and usage showing the number of read_units consumed.
        """
        args_dict = parse_non_empty_args([("limit", limit), ("pagination_token", pagination_token)])
        return self.__import_operations_api.list_imports(**args_dict)

    def describe_import(self, id: str) -> ImportModel:
        """
        describe_import is used to get detailed information about a specific import operation.

        Args:
            id (str): The id of the import operation. This value is returned when
                starting an import, and can be looked up using list_imports.

        Returns:
            ImportModel: An object containing operation id, status, and other details.
        """
        # The server expects a string id; coerce ints for convenience.
        if isinstance(id, int):
            id = str(id)
        return self.__import_operations_api.describe_import(id=id)

    def cancel_import(self, id: str):
        """Cancel an import operation.

        Args:
            id (str): The id of the import operation to cancel.
        """
        if isinstance(id, int):
            id = str(id)
        return self.__import_operations_api.cancel_import(id=id)
````
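Before the per-member reference below, here is a minimal end-to-end sketch assembled from the docstring examples in the source; the index name and bucket URI are placeholders.

```python
from pinecone import Pinecone

# The mixin's methods are exposed on an Index object, as in the docstring
# examples above. "my-index" and the s3 URI are placeholders.
index = Pinecone().Index("my-index")

# Kick off an import and remember its id for later inspection.
response = index.start_import(uri="s3://bucket-name/path/to/data.parquet")
print(response.id)

# Iterate over all import operations, across pages, as they are fetched.
for op in index.list_imports():
    print(op)
```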
ImportErrorMode

An enum of the accepted `error_mode` values for `start_import`. It is built at runtime from the allowed values of the generated OpenAPI `ImportErrorMode` model; per the annotation on `start_import`, the members are `CONTINUE` and `ABORT`.
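A short sketch of the two interchangeable spellings of `error_mode`. The member names follow the `Literal["CONTINUE", "ABORT"]` annotation on `start_import`; that the string form is lowercased and looked up by value is taken from the `error_mode.lower()` handling in the source.

```python
from pinecone import Pinecone
from pinecone.data.features.bulk_import import ImportErrorMode

index = Pinecone().Index("my-index")  # placeholder index name

# Equivalent ways to stop the import on the first record that fails;
# start_import accepts either the plain string or the enum member.
# (Each call starts its own import operation.)
index.start_import(uri="s3://bucket-name/path/to/data.parquet", error_mode="ABORT")
index.start_import(uri="s3://bucket-name/path/to/data.parquet", error_mode=ImportErrorMode.ABORT)
```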
ImportFeatureMixin

The ImportFeatureMixin class provides the bulk import operations documented below (`start_import`, `list_imports`, `list_imports_paginated`, `describe_import`, `cancel_import`); they are called on an `Index` object, as in the docstring examples.

ImportFeatureMixin.start_import

start_import(uri: str, integration_id: Optional[str] = None, error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE") -> StartImportResponse
Import data from a storage provider into an index. The uri must start with the scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to separately configure a storage integration and pass the integration id.
Examples:

    >>> from pinecone import Pinecone
    >>> index = Pinecone().Index('my-index')
    >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
    { id: "1" }
Arguments:
- uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
- integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
- error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
Returns:
StartImportResponse: Contains the id of the import operation.
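A sketch of starting an import from a bucket that is not publicly readable; `"my-integration-id"` stands in for the id of a storage integration you have configured separately.

```python
from pinecone import Pinecone

index = Pinecone().Index("my-index")  # placeholder index name

response = index.start_import(
    uri="s3://bucket-name/path/to/data.parquet",
    integration_id="my-integration-id",  # placeholder integration id
    error_mode="ABORT",  # stop if any record fails to import
)
print(response.id)  # keep the id for describe_import / cancel_import
```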
ImportFeatureMixin.list_imports

list_imports(**kwargs) -> Iterator[ImportModel]
Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can easily iterate over all results. The `list_imports` method accepts all of the same arguments as `list_imports_paginated`.

```python
for op in index.list_imports():
    print(op)
```

You can convert the generator into a list by wrapping it in a call to the built-in `list` function:

```python
operations = list(index.list_imports())
```

You should be cautious with this approach because it will fetch all operations at once, which could require a large number of network calls and a lot of memory to hold the results.

Arguments:
- limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used to fetch the next page of results. [optional]
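Building on the generator above, a sketch that reports only operations still in flight; it assumes each yielded `ImportModel` exposes the `status` and `percent_complete` fields shown in the `list_imports_paginated` example output, and that `"Completed"` is the terminal success status.

```python
from pinecone import Pinecone

index = Pinecone().Index("my-index")  # placeholder index name

# Print a progress line for every import that has not yet completed.
for op in index.list_imports():
    if op.status != "Completed":
        print(f"{op.id}: {op.status} ({op.percent_complete:.0f}%)")
```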
ImportFeatureMixin.list_imports_paginated

list_imports_paginated(limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs) -> ImportListResponse
The list_imports_paginated operation returns information about import operations. It returns operations in a paginated form, with a pagination token to fetch the next page of results.
Consider using the `list_imports` method to avoid having to handle pagination tokens manually.

Examples:

    >>> results = index.list_imports_paginated(limit=5)
    >>> results.pagination.next
    eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
    >>> results.data[0]
    {
        "id": "6",
        "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
        "status": "Completed",
        "percent_complete": 100.0,
        "records_imported": 10,
        "created_at": "2024-09-06T14:52:02.567776+00:00",
        "finished_at": "2024-09-06T14:52:28.130717+00:00"
    }
    >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
Arguments:
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
Returns:

ImportListResponse object which contains the list of operations as ImportModel objects, pagination information, and usage showing the number of read_units consumed.
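For comparison, a manual pagination loop equivalent to what `list_imports` does internally; per the generator's source above, a falsy `results.pagination` marks the last page.

```python
from pinecone import Pinecone

index = Pinecone().Index("my-index")  # placeholder index name

token = None
while True:
    # Fetch one page of up to 5 operations, resuming from the previous
    # page's token when there is one.
    results = index.list_imports_paginated(limit=5, pagination_token=token)
    for op in results.data:
        print(op.id, op.status)
    if not results.pagination:
        break  # no pagination block means this was the last page
    token = results.pagination.next
```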
ImportFeatureMixin.describe_import

describe_import(id: str) -> ImportModel
describe_import is used to get detailed information about a specific import operation.
Arguments:
- id (str): The id of the import operation. This value is returned when starting an import, and can be looked up using `list_imports`.
Returns:
ImportModel: An object containing operation id, status, and other details.
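A polling sketch built on `describe_import`. `"Completed"` is the success status shown in the example output above; the polling interval is arbitrary, and a production loop should also handle whatever failure statuses the service reports, which are not documented here.

```python
import time

from pinecone import Pinecone

index = Pinecone().Index("my-index")  # placeholder index name

op = index.describe_import(id="1")  # id returned earlier by start_import
while op.status != "Completed":
    time.sleep(10)  # arbitrary polling interval
    op = index.describe_import(id=op.id)
print(f"import {op.id} finished with {op.records_imported} records")
```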
ImportFeatureMixin.cancel_import

cancel_import(id: str)
Cancel an import operation.
Arguments:
- id (str): The id of the import operation to cancel.
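Finally, a sketch that cancels every operation that has not reached the `"Completed"` status; whether cancelling an already finished import errors or is a no-op is not specified here, so the filter is deliberately conservative.

```python
from pinecone import Pinecone

index = Pinecone().Index("my-index")  # placeholder index name

for op in index.list_imports():
    if op.status != "Completed":
        index.cancel_import(id=op.id)
        print(f"requested cancellation of import {op.id}")
```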