pinecone.data.features.bulk_import.bulk_import
"""Bulk import operations for Pinecone indexes.

Provides :class:`ImportFeatureMixin`, which adds bulk-import methods
(start, list, describe, cancel) to index client classes.
"""

from typing import Iterator, List, Literal, Optional, Union

from pinecone.core.openapi.db_data.api.bulk_operations_api import BulkOperationsApi
from pinecone.core.openapi.db_data.models import (
    ImportModel,
    ListImportsResponse,
    StartImportResponse,
)
from pinecone.utils import install_json_repr_override

from .bulk_import_request_factory import BulkImportRequestFactory, ImportErrorMode

# Give the generated OpenAPI response models a readable JSON-style repr.
for m in [StartImportResponse, ListImportsResponse, ImportModel]:
    install_json_repr_override(m)


class ImportFeatureMixin:
    """Mixin exposing bulk import operations on an index client."""

    def __init__(self, api_client, **kwargs) -> None:
        # **kwargs are accepted (and ignored) so this mixin composes cleanly
        # with other __init__ signatures in the client class hierarchy.
        self.__import_operations_api = BulkOperationsApi(api_client)

    def start_import(
        self,
        uri: str,
        integration_id: Optional[str] = None,
        error_mode: Optional[
            Union[ImportErrorMode, Literal["CONTINUE", "ABORT"], str]
        ] = "CONTINUE",
    ) -> StartImportResponse:
        """Import data from a storage provider into an index.

        The uri must start with the scheme of a supported storage provider.
        For buckets that are not publicly readable, you will also need to
        separately configure a storage integration and pass the integration id.

        Args:
            uri (str): The URI of the data to import. The URI must start with
                the scheme of a supported storage provider.
            integration_id (Optional[str], optional): If your bucket requires
                authentication to access, you need to pass the id of your
                storage integration using this property. Defaults to None.
            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the
                import operation will continue even if some records fail to
                import. Pass "ABORT" to stop the import operation if any
                records fail to import.

        Returns:
            `StartImportResponse`: Contains the id of the import operation.

        Examples:
            >>> from pinecone import Pinecone
            >>> index = Pinecone().Index('my-index')
            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
            { id: "1" }
        """
        req = BulkImportRequestFactory.start_import_request(
            uri=uri, integration_id=integration_id, error_mode=error_mode
        )
        return self.__import_operations_api.start_bulk_import(req)

    # NOTE: annotated Iterator[ImportModel] (not Iterator[List[ImportModel]]):
    # the generator yields one ImportModel at a time.
    def list_imports(self, **kwargs) -> Iterator[ImportModel]:
        """Yield every import operation, handling pagination automatically.

        Args:
            limit (Optional[int]): The maximum number of operations to fetch
                in each network call. If unspecified, the server will use a
                default value. [optional]
            pagination_token (Optional[str]): When there are multiple pages of
                results, a pagination token is returned in the response. The
                token can be used to fetch the next page of results. [optional]

        Returns:
            Returns a generator that yields each import operation. It
            automatically handles pagination tokens on your behalf so you can
            easily iterate over all results. The `list_imports` method accepts
            all of the same arguments as `list_imports_paginated`.

            ```python
            for op in index.list_imports():
                print(op)
            ```

            You can convert the generator into a list by wrapping the
            generator in a call to the built-in `list` function:

            ```python
            operations = list(index.list_imports())
            ```

            You should be cautious with this approach because it will fetch
            all operations at once, which could be a large number of network
            calls and a lot of memory to hold the results.
        """
        done = False
        while not done:
            results = self.list_imports_paginated(**kwargs)
            if len(results.data) > 0:
                for op in results.data:
                    yield op

            if results.pagination:
                # Feed the server-issued token back in for the next page.
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True

    def list_imports_paginated(
        self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
    ) -> ListImportsResponse:
        """Return one page of import operations plus a pagination token.

        Args:
            limit (Optional[int]): The maximum number of ids to return. If
                unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next
                page of results. This token is returned in the response if
                additional results are available. [optional]

        Returns:
            ListImportsResponse object which contains the list of operations
            as ImportModel objects, pagination information, and usage showing
            the number of read_units consumed.

        Consider using the `list_imports` method to avoid having to handle
        pagination tokens manually.

        Examples:
            >>> results = index.list_imports_paginated(limit=5)
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> results.data[0]
            {
                "id": "6",
                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
                "status": "Completed",
                "percent_complete": 100.0,
                "records_imported": 10,
                "created_at": "2024-09-06T14:52:02.567776+00:00",
                "finished_at": "2024-09-06T14:52:28.130717+00:00"
            }
            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
        """
        args_dict = BulkImportRequestFactory.list_imports_paginated_args(
            limit=limit, pagination_token=pagination_token, **kwargs
        )
        return self.__import_operations_api.list_bulk_imports(**args_dict)

    def describe_import(self, id: str) -> ImportModel:
        """Get detailed information about a specific import operation.

        Args:
            id (str): The id of the import operation. This value is returned
                when starting an import, and can be looked up using
                list_imports.

        Returns:
            `ImportModel`: An object containing operation id, status, and
            other details.
        """
        args = BulkImportRequestFactory.describe_import_args(id=id)
        return self.__import_operations_api.describe_bulk_import(**args)

    def cancel_import(self, id: str):
        """Cancel an import operation.

        Args:
            id (str): The id of the import operation to cancel.
        """
        args = BulkImportRequestFactory.cancel_import_args(id=id)
        return self.__import_operations_api.cancel_bulk_import(**args)
class ImportFeatureMixin:
    """Mixin exposing bulk import operations on an index client."""

    def __init__(self, api_client, **kwargs) -> None:
        # **kwargs are accepted (and ignored) so this mixin composes cleanly
        # with other __init__ signatures in the client class hierarchy.
        self.__import_operations_api = BulkOperationsApi(api_client)

    def start_import(
        self,
        uri: str,
        integration_id: Optional[str] = None,
        error_mode: Optional[
            Union[ImportErrorMode, Literal["CONTINUE", "ABORT"], str]
        ] = "CONTINUE",
    ) -> StartImportResponse:
        """Import data from a storage provider into an index.

        The uri must start with the scheme of a supported storage provider.
        For buckets that are not publicly readable, you will also need to
        separately configure a storage integration and pass the integration id.

        Args:
            uri (str): The URI of the data to import. The URI must start with
                the scheme of a supported storage provider.
            integration_id (Optional[str], optional): If your bucket requires
                authentication to access, you need to pass the id of your
                storage integration using this property. Defaults to None.
            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the
                import operation will continue even if some records fail to
                import. Pass "ABORT" to stop the import operation if any
                records fail to import.

        Returns:
            `StartImportResponse`: Contains the id of the import operation.

        Examples:
            >>> from pinecone import Pinecone
            >>> index = Pinecone().Index('my-index')
            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
            { id: "1" }
        """
        req = BulkImportRequestFactory.start_import_request(
            uri=uri, integration_id=integration_id, error_mode=error_mode
        )
        return self.__import_operations_api.start_bulk_import(req)

    # NOTE: annotated Iterator[ImportModel] (not Iterator[List[ImportModel]]):
    # the generator yields one ImportModel at a time.
    def list_imports(self, **kwargs) -> Iterator[ImportModel]:
        """Yield every import operation, handling pagination automatically.

        Args:
            limit (Optional[int]): The maximum number of operations to fetch
                in each network call. If unspecified, the server will use a
                default value. [optional]
            pagination_token (Optional[str]): When there are multiple pages of
                results, a pagination token is returned in the response. The
                token can be used to fetch the next page of results. [optional]

        Returns:
            Returns a generator that yields each import operation. It
            automatically handles pagination tokens on your behalf so you can
            easily iterate over all results. The `list_imports` method accepts
            all of the same arguments as `list_imports_paginated`.

            You can convert the generator into a list with the built-in `list`
            function, but be cautious: that fetches all operations at once,
            which could be a large number of network calls and a lot of memory.
        """
        done = False
        while not done:
            results = self.list_imports_paginated(**kwargs)
            if len(results.data) > 0:
                for op in results.data:
                    yield op

            if results.pagination:
                # Feed the server-issued token back in for the next page.
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True

    def list_imports_paginated(
        self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
    ) -> ListImportsResponse:
        """Return one page of import operations plus a pagination token.

        Args:
            limit (Optional[int]): The maximum number of ids to return. If
                unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next
                page of results. This token is returned in the response if
                additional results are available. [optional]

        Returns:
            ListImportsResponse object which contains the list of operations
            as ImportModel objects, pagination information, and usage showing
            the number of read_units consumed.

        Consider using the `list_imports` method to avoid having to handle
        pagination tokens manually.
        """
        args_dict = BulkImportRequestFactory.list_imports_paginated_args(
            limit=limit, pagination_token=pagination_token, **kwargs
        )
        return self.__import_operations_api.list_bulk_imports(**args_dict)

    def describe_import(self, id: str) -> ImportModel:
        """Get detailed information about a specific import operation.

        Args:
            id (str): The id of the import operation. This value is returned
                when starting an import, and can be looked up using
                list_imports.

        Returns:
            `ImportModel`: An object containing operation id, status, and
            other details.
        """
        args = BulkImportRequestFactory.describe_import_args(id=id)
        return self.__import_operations_api.describe_bulk_import(**args)

    def cancel_import(self, id: str):
        """Cancel an import operation.

        Args:
            id (str): The id of the import operation to cancel.
        """
        args = BulkImportRequestFactory.cancel_import_args(id=id)
        return self.__import_operations_api.cancel_bulk_import(**args)
def start_import(
    self,
    uri: str,
    integration_id: Optional[str] = None,
    error_mode: Optional[
        Union[ImportErrorMode, Literal["CONTINUE", "ABORT"], str]
    ] = "CONTINUE",
) -> StartImportResponse:
    """Kick off a bulk import of data from a storage provider into an index.

    The uri must start with the scheme of a supported storage provider. For
    buckets that are not publicly readable, you will also need to separately
    configure a storage integration and pass the integration id.

    Args:
        uri (str): The URI of the data to import. The URI must start with the
            scheme of a supported storage provider.
        integration_id (Optional[str], optional): If your bucket requires
            authentication to access, you need to pass the id of your storage
            integration using this property. Defaults to None.
        error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import
            operation will continue even if some records fail to import. Pass
            "ABORT" to stop the import operation if any records fail to import.

    Returns:
        `StartImportResponse`: Contains the id of the import operation.

    Examples:
        >>> from pinecone import Pinecone
        >>> index = Pinecone().Index('my-index')
        >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
        { id: "1" }
    """
    # Delegate request construction to the factory, then hand the request
    # to the generated bulk-operations API client.
    import_request = BulkImportRequestFactory.start_import_request(
        uri=uri,
        integration_id=integration_id,
        error_mode=error_mode,
    )
    return self.__import_operations_api.start_bulk_import(import_request)
Arguments:
- uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
- integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
- error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
Returns:
    `StartImportResponse`: Contains the id of the import operation.
Import data from a storage provider into an index. The uri must start with the scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to separately configure a storage integration and pass the integration id.
Examples:
>>> from pinecone import Pinecone
>>> index = Pinecone().Index('my-index')
>>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
{ id: "1" }
# FIX: return annotation was Iterator[List[ImportModel]], but the generator
# yields one ImportModel per iteration, so Iterator[ImportModel] is correct.
def list_imports(self, **kwargs) -> Iterator[ImportModel]:
    """Yield every import operation, handling pagination automatically.

    Args:
        limit (Optional[int]): The maximum number of operations to fetch in
            each network call. If unspecified, the server will use a default
            value. [optional]
        pagination_token (Optional[str]): When there are multiple pages of
            results, a pagination token is returned in the response. The token
            can be used to fetch the next page of results. [optional]

    Returns:
        Returns a generator that yields each import operation. It
        automatically handles pagination tokens on your behalf so you can
        easily iterate over all results. The `list_imports` method accepts all
        of the same arguments as `list_imports_paginated`.

        ```python
        for op in index.list_imports():
            print(op)
        ```

        You can convert the generator into a list by wrapping the generator in
        a call to the built-in `list` function:

        ```python
        operations = list(index.list_imports())
        ```

        You should be cautious with this approach because it will fetch all
        operations at once, which could be a large number of network calls and
        a lot of memory to hold the results.
    """
    done = False
    while not done:
        results = self.list_imports_paginated(**kwargs)
        if len(results.data) > 0:
            for op in results.data:
                yield op

        if results.pagination:
            # Feed the server-issued token back in for the next page.
            kwargs.update({"pagination_token": results.pagination.next})
        else:
            done = True
Arguments:
- limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used to fetch the next page of results. [optional]
Returns:
Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can easily iterate over all results. The `list_imports` method accepts all of the same arguments as `list_imports_paginated`.
for op in index.list_imports():
print(op)
You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function:
operations = list(index.list_imports())
You should be cautious with this approach because it will fetch all operations at once, which could be a large number of network calls and a lot of memory to hold the results.
def list_imports_paginated(
    self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
) -> ListImportsResponse:
    """Return one page of import operations plus pagination information.

    The list_imports_paginated operation returns information about import
    operations. It returns operations in a paginated form, with a pagination
    token to fetch the next page of results.

    Consider using the `list_imports` method to avoid having to handle
    pagination tokens manually.

    Args:
        limit (Optional[int]): The maximum number of ids to return. If
            unspecified, the server will use a default value. [optional]
        pagination_token (Optional[str]): A token needed to fetch the next
            page of results. This token is returned in the response if
            additional results are available. [optional]

    Returns:
        ListImportsResponse object which contains the list of operations as
        ImportModel objects, pagination information, and usage showing the
        number of read_units consumed.

    Examples:
        >>> results = index.list_imports_paginated(limit=5)
        >>> results.pagination.next
        eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
        >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
    """
    # The factory normalizes/validates the keyword arguments before they are
    # forwarded to the generated API client.
    request_kwargs = BulkImportRequestFactory.list_imports_paginated_args(
        limit=limit,
        pagination_token=pagination_token,
        **kwargs,
    )
    return self.__import_operations_api.list_bulk_imports(**request_kwargs)
Arguments:
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
Returns: ListImportsResponse object which contains the list of operations as ImportModel objects, pagination information, and usage showing the number of read_units consumed.
The list_imports_paginated operation returns information about import operations. It returns operations in a paginated form, with a pagination token to fetch the next page of results.
Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
Examples:
>>> results = index.list_imports_paginated(limit=5)
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> results.data[0]
{
    "id": "6",
    "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
    "status": "Completed",
    "percent_complete": 100.0,
    "records_imported": 10,
    "created_at": "2024-09-06T14:52:02.567776+00:00",
    "finished_at": "2024-09-06T14:52:28.130717+00:00"
}
>>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
def describe_import(self, id: str) -> ImportModel:
    """Get detailed information about a specific import operation.

    Args:
        id (str): The id of the import operation. This value is returned when
            starting an import, and can be looked up using list_imports.

    Returns:
        `ImportModel`: An object containing operation id, status, and other
        details.
    """
    describe_kwargs = BulkImportRequestFactory.describe_import_args(id=id)
    return self.__import_operations_api.describe_bulk_import(**describe_kwargs)
Arguments:
- id (str): The id of the import operation. This value is returned when starting an import, and can be looked up using list_imports.
Returns:
    `ImportModel`: An object containing operation id, status, and other details.
describe_import is used to get detailed information about a specific import operation.
def cancel_import(self, id: str):
    """Cancel an import operation.

    Args:
        id (str): The id of the import operation to cancel.
    """
    cancel_kwargs = BulkImportRequestFactory.cancel_import_args(id=id)
    return self.__import_operations_api.cancel_bulk_import(**cancel_kwargs)
Cancel an import operation.
Arguments:
- id (str): The id of the import operation to cancel.