pinecone.data.features.bulk_import
from enum import Enum
from typing import Optional, Literal, Iterator, List, Type, cast

from pinecone.config.config import ConfigBuilder
from pinecone.core_ea.openapi.db_data import ApiClient
from pinecone.core_ea.openapi.db_data.api.bulk_operations_api import BulkOperationsApi
from pinecone.core_ea.openapi.shared import API_VERSION

from pinecone.utils import parse_non_empty_args, install_json_repr_override, setup_openapi_client

from pinecone.core_ea.openapi.db_data.models import (
    StartImportRequest,
    StartImportResponse,
    ImportListResponse,
    ImportModel,
    ImportErrorMode as ImportErrorModeClass,
)

# Give the generated response models a readable JSON __repr__.
for _model_cls in (StartImportResponse, ImportListResponse, ImportModel):
    install_json_repr_override(_model_cls)

# Public enum of allowed error modes, built from the generated model's
# allowed_values table so it always matches the API spec.
ImportErrorMode: Type[Enum] = cast(
    Type[Enum], Enum("ImportErrorMode", ImportErrorModeClass.allowed_values[("on_error",)])
)


class ImportFeatureMixin:
    """Mixin adding bulk-import operations (start/list/describe/cancel) to an index client."""

    def __init__(self, **kwargs):
        config = ConfigBuilder.build(**kwargs)
        openapi_config = ConfigBuilder.build_openapi_config(config, kwargs.get("openapi_config", None))

        # Allow tests to inject a pre-built API object; otherwise build one.
        injected_api = kwargs.get("__import_operations_api", None)
        if injected_api:
            self.__import_operations_api = injected_api
        else:
            self.__import_operations_api = setup_openapi_client(
                api_client_klass=ApiClient,
                api_klass=BulkOperationsApi,
                config=config,
                openapi_config=openapi_config,
                pool_threads=kwargs.get("pool_threads", 1),
                api_version=API_VERSION,
            )

    def start_import(
        self,
        uri: str,
        integration_id: Optional[str] = None,
        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
    ) -> StartImportResponse:
        """Import data from a storage provider into an index.

        The uri must start with the scheme of a supported storage provider. For
        buckets that are not publicly readable, you will also need to separately
        configure a storage integration and pass the integration id.

        Examples:
            >>> from pinecone import Pinecone
            >>> index = Pinecone().Index('my-index')
            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
            { id: "1" }

        Args:
            uri (str): The URI of the data to import. The URI must start with the
                scheme of a supported storage provider.
            integration_id (Optional[str], optional): If your bucket requires
                authentication to access, you need to pass the id of your storage
                integration using this property. Defaults to None.
            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import
                operation will continue even if some records fail to import. Pass
                "ABORT" to stop the import operation if any records fail to import.

        Returns:
            StartImportResponse: Contains the id of the import operation.

        Raises:
            ValueError: If error_mode is a string that does not name a valid mode.
        """
        # Normalize error_mode to the lowercase wire value expected by the API.
        if isinstance(error_mode, ImportErrorMode):
            error_mode = error_mode.value
        elif isinstance(error_mode, str):
            try:
                error_mode = ImportErrorMode(error_mode.lower()).value
            except ValueError:
                raise ValueError(f"Invalid error_mode value: {error_mode}")

        request_args = parse_non_empty_args(
            [
                ("uri", uri),
                ("integration_id", integration_id),
                ("error_mode", ImportErrorModeClass(on_error=error_mode)),
            ]
        )

        return self.__import_operations_api.start_import(StartImportRequest(**request_args))

    def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]:
        """Yield every import operation, handling pagination tokens automatically.

        Accepts all of the same arguments as ``list_imports_paginated``.

        ```python
        for op in index.list_imports():
            print(op)
        ```

        You can convert the generator into a list by wrapping the generator in a
        call to the built-in `list` function:

        ```python
        operations = list(index.list_imports())
        ```

        You should be cautious with this approach because it will fetch all
        operations at once, which could be a large number of network calls and a
        lot of memory to hold the results.

        Args:
            limit (Optional[int]): The maximum number of operations to fetch in each
                network call. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): When there are multiple pages of results,
                a pagination token is returned in the response. The token can be used
                to fetch the next page of results. [optional]
        """
        while True:
            page = self.list_imports_paginated(**kwargs)
            yield from page.data
            if not page.pagination:
                break
            # Carry the server-issued token forward for the next request.
            kwargs["pagination_token"] = page.pagination.next

    def list_imports_paginated(
        self,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        **kwargs,
    ) -> ImportListResponse:
        """Return one page of import operations, plus a token for the next page.

        Consider using the `list_imports` method to avoid having to handle
        pagination tokens manually.

        Examples:
            >>> results = index.list_imports_paginated(limit=5)
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> results.data[0]
            {
                "id": "6",
                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
                "status": "Completed",
                "percent_complete": 100.0,
                "records_imported": 10,
                "created_at": "2024-09-06T14:52:02.567776+00:00",
                "finished_at": "2024-09-06T14:52:28.130717+00:00"
            }
            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)

        Args:
            limit (Optional[int]): The maximum number of ids to return. If unspecified,
                the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of
                results. This token is returned in the response if additional results
                are available. [optional]

        Returns:
            ImportListResponse object which contains the list of operations as
            ImportModel objects, pagination information, and usage showing the number
            of read_units consumed.
        """
        request_args = parse_non_empty_args(
            [
                ("limit", limit),
                ("pagination_token", pagination_token),
            ]
        )
        return self.__import_operations_api.list_imports(**request_args)

    def describe_import(self, id: str) -> ImportModel:
        """Get detailed information about a specific import operation.

        Args:
            id (str): The id of the import operation. This value is returned when
                starting an import, and can be looked up using list_imports.

        Returns:
            ImportModel: An object containing operation id, status, and other details.
        """
        # Be forgiving if a caller passes the id as an int.
        op_id = str(id) if isinstance(id, int) else id
        return self.__import_operations_api.describe_import(id=op_id)

    def cancel_import(self, id: str):
        """Cancel an import operation.

        Args:
            id (str): The id of the import operation to cancel.
        """
        # Be forgiving if a caller passes the id as an int.
        op_id = str(id) if isinstance(id, int) else id
        return self.__import_operations_api.cancel_import(id=op_id)
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum):
... RED = 1
... BLUE = 2
... GREEN = 3
Access them by:
- attribute access::
>>> Color.RED
<Color.RED: 1>
- value lookup:
>>> Color(1)
<Color.RED: 1>
- name lookup:
>>> Color['RED']
<Color.RED: 1>
Enumerations can be iterated over, and know how many members they have:
>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes -- see the documentation for details.
Inherited Members
- enum.Enum
- name
- value
28class ImportFeatureMixin: 29 def __init__(self, **kwargs): 30 config = ConfigBuilder.build( 31 **kwargs, 32 ) 33 openapi_config = ConfigBuilder.build_openapi_config(config, kwargs.get("openapi_config", None)) 34 35 if kwargs.get("__import_operations_api", None): 36 self.__import_operations_api = kwargs.get("__import_operations_api") 37 else: 38 self.__import_operations_api = setup_openapi_client( 39 api_client_klass=ApiClient, 40 api_klass=BulkOperationsApi, 41 config=config, 42 openapi_config=openapi_config, 43 pool_threads=kwargs.get("pool_threads", 1), 44 api_version=API_VERSION, 45 ) 46 47 def start_import( 48 self, 49 uri: str, 50 integration_id: Optional[str] = None, 51 error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE", 52 ) -> StartImportResponse: 53 """Import data from a storage provider into an index. The uri must start with the scheme of a supported 54 storage provider. For buckets that are not publicly readable, you will also need to separately configure 55 a storage integration and pass the integration id. 56 57 Examples: 58 >>> from pinecone import Pinecone 59 >>> index = Pinecone().Index('my-index') 60 >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet") 61 { id: "1" } 62 63 Args: 64 uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider. 65 integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None. 66 error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some 67 records fail to import. Pass "ABORT" to stop the import operation if any records fail to import. 68 69 Returns: 70 StartImportResponse: Contains the id of the import operation. 
71 """ 72 if isinstance(error_mode, ImportErrorMode): 73 error_mode = error_mode.value 74 elif isinstance(error_mode, str): 75 try: 76 error_mode = ImportErrorMode(error_mode.lower()).value 77 except ValueError: 78 raise ValueError(f"Invalid error_mode value: {error_mode}") 79 80 args_dict = parse_non_empty_args( 81 [ 82 ("uri", uri), 83 ("integration_id", integration_id), 84 ("error_mode", ImportErrorModeClass(on_error=error_mode)), 85 ] 86 ) 87 88 return self.__import_operations_api.start_import(StartImportRequest(**args_dict)) 89 90 def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]: 91 """ 92 Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can 93 easily iterate over all results. The `list_imports` method accepts all of the same arguments as list_imports_paginated 94 95 ```python 96 for op in index.list_imports(): 97 print(op) 98 ``` 99 100 You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function: 101 102 ```python 103 operations = list(index.list_imports()) 104 ``` 105 106 You should be cautious with this approach because it will fetch all operations at once, which could be a large number 107 of network calls and a lot of memory to hold the results. 108 109 Args: 110 limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional] 111 pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used 112 to fetch the next page of results. 
[optional] 113 """ 114 done = False 115 while not done: 116 results = self.list_imports_paginated(**kwargs) 117 if len(results.data) > 0: 118 for op in results.data: 119 yield op 120 121 if results.pagination: 122 kwargs.update({"pagination_token": results.pagination.next}) 123 else: 124 done = True 125 126 def list_imports_paginated( 127 self, 128 limit: Optional[int] = None, 129 pagination_token: Optional[str] = None, 130 **kwargs, 131 ) -> ImportListResponse: 132 """ 133 The list_imports_paginated operation returns information about import operations. 134 It returns operations in a paginated form, with a pagination token to fetch the next page of results. 135 136 Consider using the `list_imports` method to avoid having to handle pagination tokens manually. 137 138 Examples: 139 >>> results = index.list_imports_paginated(limit=5) 140 >>> results.pagination.next 141 eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9 142 >>> results.data[0] 143 { 144 "id": "6", 145 "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/", 146 "status": "Completed", 147 "percent_complete": 100.0, 148 "records_imported": 10, 149 "created_at": "2024-09-06T14:52:02.567776+00:00", 150 "finished_at": "2024-09-06T14:52:28.130717+00:00" 151 } 152 >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next) 153 154 Args: 155 limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional] 156 pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned 157 in the response if additional results are available. [optional] 158 159 Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information, 160 and usage showing the number of read_units consumed. 
161 """ 162 args_dict = parse_non_empty_args( 163 [ 164 ("limit", limit), 165 ("pagination_token", pagination_token), 166 ] 167 ) 168 return self.__import_operations_api.list_imports(**args_dict) 169 170 def describe_import(self, id: str) -> ImportModel: 171 """ 172 describe_import is used to get detailed information about a specific import operation. 173 174 Args: 175 id (str): The id of the import operation. This value is returned when 176 starting an import, and can be looked up using list_imports. 177 178 Returns: 179 ImportModel: An object containing operation id, status, and other details. 180 """ 181 if isinstance(id, int): 182 id = str(id) 183 return self.__import_operations_api.describe_import(id=id) 184 185 def cancel_import(self, id: str): 186 """Cancel an import operation. 187 188 Args: 189 id (str): The id of the import operation to cancel. 190 """ 191 if isinstance(id, int): 192 id = str(id) 193 return self.__import_operations_api.cancel_import(id=id)
29 def __init__(self, **kwargs): 30 config = ConfigBuilder.build( 31 **kwargs, 32 ) 33 openapi_config = ConfigBuilder.build_openapi_config(config, kwargs.get("openapi_config", None)) 34 35 if kwargs.get("__import_operations_api", None): 36 self.__import_operations_api = kwargs.get("__import_operations_api") 37 else: 38 self.__import_operations_api = setup_openapi_client( 39 api_client_klass=ApiClient, 40 api_klass=BulkOperationsApi, 41 config=config, 42 openapi_config=openapi_config, 43 pool_threads=kwargs.get("pool_threads", 1), 44 api_version=API_VERSION, 45 )
47 def start_import( 48 self, 49 uri: str, 50 integration_id: Optional[str] = None, 51 error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE", 52 ) -> StartImportResponse: 53 """Import data from a storage provider into an index. The uri must start with the scheme of a supported 54 storage provider. For buckets that are not publicly readable, you will also need to separately configure 55 a storage integration and pass the integration id. 56 57 Examples: 58 >>> from pinecone import Pinecone 59 >>> index = Pinecone().Index('my-index') 60 >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet") 61 { id: "1" } 62 63 Args: 64 uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider. 65 integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None. 66 error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some 67 records fail to import. Pass "ABORT" to stop the import operation if any records fail to import. 68 69 Returns: 70 StartImportResponse: Contains the id of the import operation. 71 """ 72 if isinstance(error_mode, ImportErrorMode): 73 error_mode = error_mode.value 74 elif isinstance(error_mode, str): 75 try: 76 error_mode = ImportErrorMode(error_mode.lower()).value 77 except ValueError: 78 raise ValueError(f"Invalid error_mode value: {error_mode}") 79 80 args_dict = parse_non_empty_args( 81 [ 82 ("uri", uri), 83 ("integration_id", integration_id), 84 ("error_mode", ImportErrorModeClass(on_error=error_mode)), 85 ] 86 ) 87 88 return self.__import_operations_api.start_import(StartImportRequest(**args_dict))
Import data from a storage provider into an index. The uri must start with the scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to separately configure a storage integration and pass the integration id.
Examples:
>>> from pinecone import Pinecone >>> index = Pinecone().Index('my-index') >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet") { id: "1" }
Arguments:
- uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
- integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
- error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
Returns:
StartImportResponse: Contains the id of the import operation.
90 def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]: 91 """ 92 Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can 93 easily iterate over all results. The `list_imports` method accepts all of the same arguments as list_imports_paginated 94 95 ```python 96 for op in index.list_imports(): 97 print(op) 98 ``` 99 100 You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function: 101 102 ```python 103 operations = list(index.list_imports()) 104 ``` 105 106 You should be cautious with this approach because it will fetch all operations at once, which could be a large number 107 of network calls and a lot of memory to hold the results. 108 109 Args: 110 limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional] 111 pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used 112 to fetch the next page of results. [optional] 113 """ 114 done = False 115 while not done: 116 results = self.list_imports_paginated(**kwargs) 117 if len(results.data) > 0: 118 for op in results.data: 119 yield op 120 121 if results.pagination: 122 kwargs.update({"pagination_token": results.pagination.next}) 123 else: 124 done = True
Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can
easily iterate over all results. The `list_imports` method accepts all of the same arguments as `list_imports_paginated`.
for op in index.list_imports():
print(op)
You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function:
operations = list(index.list_imports())
You should be cautious with this approach because it will fetch all operations at once, which could be a large number of network calls and a lot of memory to hold the results.
Arguments:
- limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used to fetch the next page of results. [optional]
126 def list_imports_paginated( 127 self, 128 limit: Optional[int] = None, 129 pagination_token: Optional[str] = None, 130 **kwargs, 131 ) -> ImportListResponse: 132 """ 133 The list_imports_paginated operation returns information about import operations. 134 It returns operations in a paginated form, with a pagination token to fetch the next page of results. 135 136 Consider using the `list_imports` method to avoid having to handle pagination tokens manually. 137 138 Examples: 139 >>> results = index.list_imports_paginated(limit=5) 140 >>> results.pagination.next 141 eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9 142 >>> results.data[0] 143 { 144 "id": "6", 145 "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/", 146 "status": "Completed", 147 "percent_complete": 100.0, 148 "records_imported": 10, 149 "created_at": "2024-09-06T14:52:02.567776+00:00", 150 "finished_at": "2024-09-06T14:52:28.130717+00:00" 151 } 152 >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next) 153 154 Args: 155 limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional] 156 pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned 157 in the response if additional results are available. [optional] 158 159 Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information, 160 and usage showing the number of read_units consumed. 161 """ 162 args_dict = parse_non_empty_args( 163 [ 164 ("limit", limit), 165 ("pagination_token", pagination_token), 166 ] 167 ) 168 return self.__import_operations_api.list_imports(**args_dict)
The list_imports_paginated operation returns information about import operations. It returns operations in a paginated form, with a pagination token to fetch the next page of results.
Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
Examples:
>>> results = index.list_imports_paginated(limit=5) >>> results.pagination.next eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9 >>> results.data[0] { "id": "6", "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/", "status": "Completed", "percent_complete": 100.0, "records_imported": 10, "created_at": "2024-09-06T14:52:02.567776+00:00", "finished_at": "2024-09-06T14:52:28.130717+00:00" } >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
Arguments:
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information, and usage showing the number of read_units consumed.
170 def describe_import(self, id: str) -> ImportModel: 171 """ 172 describe_import is used to get detailed information about a specific import operation. 173 174 Args: 175 id (str): The id of the import operation. This value is returned when 176 starting an import, and can be looked up using list_imports. 177 178 Returns: 179 ImportModel: An object containing operation id, status, and other details. 180 """ 181 if isinstance(id, int): 182 id = str(id) 183 return self.__import_operations_api.describe_import(id=id)
describe_import is used to get detailed information about a specific import operation.
Arguments:
- id (str): The id of the import operation. This value is returned when starting an import, and can be looked up using `list_imports`.
Returns:
ImportModel: An object containing operation id, status, and other details.
185 def cancel_import(self, id: str): 186 """Cancel an import operation. 187 188 Args: 189 id (str): The id of the import operation to cancel. 190 """ 191 if isinstance(id, int): 192 id = str(id) 193 return self.__import_operations_api.cancel_import(id=id)
Cancel an import operation.
Arguments:
- id (str): The id of the import operation to cancel.