pinecone.data.features.bulk_import.bulk_import_asyncio
from typing import Optional, Literal, AsyncIterator, List

from pinecone.core.openapi.db_data.api.bulk_operations_api import AsyncioBulkOperationsApi

from pinecone.utils import install_json_repr_override

from pinecone.core.openapi.db_data.models import (
    StartImportResponse,
    ListImportsResponse,
    ImportModel,
)

from .bulk_import_request_factory import BulkImportRequestFactory

# Give the generated OpenAPI response models a readable JSON-style __repr__
# so they print usefully in REPLs and logs.
for m in [StartImportResponse, ListImportsResponse, ImportModel]:
    install_json_repr_override(m)


class ImportFeatureMixinAsyncio:
    """Mixin providing asyncio bulk-import operations for an index client.

    Adds ``start_import``, ``list_imports``, ``list_imports_paginated``,
    ``describe_import``, and ``cancel_import`` to the class it is mixed into.
    """

    def __init__(self, api_client, **kwargs) -> None:
        # Double-underscore name mangling keeps this attribute from colliding
        # with attributes contributed by sibling mixins on the same class.
        self.__import_operations_api = AsyncioBulkOperationsApi(api_client)

    async def start_import(
        self,
        uri: str,
        integration_id: Optional[str] = None,
        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
    ) -> StartImportResponse:
        """Import data from a storage provider into an index.

        The uri must start with the scheme of a supported storage provider.
        For buckets that are not publicly readable, you will also need to
        separately configure a storage integration and pass the integration id.

        Args:
            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.

        Returns:
            `StartImportResponse`: Contains the id of the import operation.

        Examples:
            >>> from pinecone import Pinecone
            >>> index = Pinecone().IndexAsyncio(host="example-index.svc.aped-4627-b74a.pinecone.io")
            >>> await index.start_import(uri="s3://bucket-name/path/to/data.parquet")
            { id: "1" }

        """
        req = BulkImportRequestFactory.start_import_request(
            uri=uri, integration_id=integration_id, error_mode=error_mode
        )
        return await self.__import_operations_api.start_bulk_import(req)

    async def list_imports(self, **kwargs) -> AsyncIterator[List[ImportModel]]:
        """Yield every import operation, transparently handling pagination.

        Accepts all of the same arguments as `list_imports_paginated`.

        Args:
            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
                to fetch the next page of results. [optional]

        ```python
        async for op in index.list_imports():
            print(op)
        ```
        """
        done = False
        while not done:
            results = await self.list_imports_paginated(**kwargs)
            if len(results.data) > 0:
                for op in results.data:
                    yield op

            if results.pagination:
                # Feed the server-issued token back in to fetch the next page.
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True

    async def list_imports_paginated(
        self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
    ) -> ListImportsResponse:
        """Return one page of import operations.

        Results come in a paginated form, with a pagination token to fetch the
        next page. Consider using the `list_imports` method to avoid having to
        handle pagination tokens manually.

        Args:
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]

        Returns:
            `ListImportsResponse` object which contains the list of operations as ImportModel objects, pagination information,
            and usage showing the number of read_units consumed.

        Examples:
            >>> results = await index.list_imports_paginated(limit=5)
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> results.data[0]
            {
                "id": "6",
                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
                "status": "Completed",
                "percent_complete": 100.0,
                "records_imported": 10,
                "created_at": "2024-09-06T14:52:02.567776+00:00",
                "finished_at": "2024-09-06T14:52:28.130717+00:00"
            }
            >>> next_results = await index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)

        """
        args_dict = BulkImportRequestFactory.list_imports_paginated_args(
            limit=limit, pagination_token=pagination_token, **kwargs
        )
        return await self.__import_operations_api.list_bulk_imports(**args_dict)

    async def describe_import(self, id: str) -> ImportModel:
        """Get detailed information about a specific import operation.

        Args:
            id (str): The id of the import operation. This value is returned when
                starting an import, and can be looked up using list_imports.

        Returns:
            ImportModel: An object containing operation id, status, and other details.
        """
        args = BulkImportRequestFactory.describe_import_args(id=id)
        return await self.__import_operations_api.describe_bulk_import(**args)

    async def cancel_import(self, id: str):
        """Cancel an import operation.

        Args:
            id (str): The id of the import operation to cancel.
        """
        args = BulkImportRequestFactory.cancel_import_args(id=id)
        return await self.__import_operations_api.cancel_bulk_import(**args)
class ImportFeatureMixinAsyncio:
    """Mixin providing asyncio bulk-import operations for an index client.

    Adds ``start_import``, ``list_imports``, ``list_imports_paginated``,
    ``describe_import``, and ``cancel_import`` to the class it is mixed into.
    """

    def __init__(self, api_client, **kwargs) -> None:
        # Double-underscore name mangling keeps this attribute from colliding
        # with attributes contributed by sibling mixins on the same class.
        self.__import_operations_api = AsyncioBulkOperationsApi(api_client)

    async def start_import(
        self,
        uri: str,
        integration_id: Optional[str] = None,
        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
    ) -> StartImportResponse:
        """Import data from a storage provider into an index.

        The uri must start with the scheme of a supported storage provider.
        For buckets that are not publicly readable, you will also need to
        separately configure a storage integration and pass the integration id.

        Args:
            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.

        Returns:
            `StartImportResponse`: Contains the id of the import operation.

        Examples:
            >>> from pinecone import Pinecone
            >>> index = Pinecone().IndexAsyncio(host="example-index.svc.aped-4627-b74a.pinecone.io")
            >>> await index.start_import(uri="s3://bucket-name/path/to/data.parquet")
            { id: "1" }

        """
        req = BulkImportRequestFactory.start_import_request(
            uri=uri, integration_id=integration_id, error_mode=error_mode
        )
        return await self.__import_operations_api.start_bulk_import(req)

    async def list_imports(self, **kwargs) -> AsyncIterator[List[ImportModel]]:
        """Yield every import operation, transparently handling pagination.

        Accepts all of the same arguments as `list_imports_paginated`.

        Args:
            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
                to fetch the next page of results. [optional]

        ```python
        async for op in index.list_imports():
            print(op)
        ```
        """
        done = False
        while not done:
            results = await self.list_imports_paginated(**kwargs)
            if len(results.data) > 0:
                for op in results.data:
                    yield op

            if results.pagination:
                # Feed the server-issued token back in to fetch the next page.
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True

    async def list_imports_paginated(
        self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
    ) -> ListImportsResponse:
        """Return one page of import operations.

        Results come in a paginated form, with a pagination token to fetch the
        next page. Consider using the `list_imports` method to avoid having to
        handle pagination tokens manually.

        Args:
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]

        Returns:
            `ListImportsResponse` object which contains the list of operations as ImportModel objects, pagination information,
            and usage showing the number of read_units consumed.

        Examples:
            >>> results = await index.list_imports_paginated(limit=5)
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> results.data[0]
            {
                "id": "6",
                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
                "status": "Completed",
                "percent_complete": 100.0,
                "records_imported": 10,
                "created_at": "2024-09-06T14:52:02.567776+00:00",
                "finished_at": "2024-09-06T14:52:28.130717+00:00"
            }
            >>> next_results = await index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)

        """
        args_dict = BulkImportRequestFactory.list_imports_paginated_args(
            limit=limit, pagination_token=pagination_token, **kwargs
        )
        return await self.__import_operations_api.list_bulk_imports(**args_dict)

    async def describe_import(self, id: str) -> ImportModel:
        """Get detailed information about a specific import operation.

        Args:
            id (str): The id of the import operation. This value is returned when
                starting an import, and can be looked up using list_imports.

        Returns:
            ImportModel: An object containing operation id, status, and other details.
        """
        args = BulkImportRequestFactory.describe_import_args(id=id)
        return await self.__import_operations_api.describe_bulk_import(**args)

    async def cancel_import(self, id: str):
        """Cancel an import operation.

        Args:
            id (str): The id of the import operation to cancel.
        """
        args = BulkImportRequestFactory.cancel_import_args(id=id)
        return await self.__import_operations_api.cancel_bulk_import(**args)
async def start_import(
    self,
    uri: str,
    integration_id: Optional[str] = None,
    error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
) -> StartImportResponse:
    """Import data from a storage provider into an index.

    The uri must start with the scheme of a supported storage provider.
    For buckets that are not publicly readable, you will also need to
    separately configure a storage integration and pass the integration id.

    Args:
        uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
        integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
        error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
            records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.

    Returns:
        `StartImportResponse`: Contains the id of the import operation.

    Examples:
        >>> from pinecone import Pinecone
        >>> index = Pinecone().IndexAsyncio(host="example-index.svc.aped-4627-b74a.pinecone.io")
        >>> await index.start_import(uri="s3://bucket-name/path/to/data.parquet")
        { id: "1" }

    """
    req = BulkImportRequestFactory.start_import_request(
        uri=uri, integration_id=integration_id, error_mode=error_mode
    )
    return await self.__import_operations_api.start_bulk_import(req)
Arguments:
- uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
- integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
- error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
Returns:
`StartImportResponse`: Contains the id of the import operation.
Import data from a storage provider into an index. The uri must start with the scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to separately configure a storage integration and pass the integration id.
Examples:
>>> from pinecone import Pinecone
>>> index = Pinecone().IndexAsyncio(host="example-index.svc.aped-4627-b74a.pinecone.io")
>>> await index.start_import(uri="s3://bucket-name/path/to/data.parquet")
{ id: "1" }
56 async def list_imports(self, **kwargs) -> AsyncIterator[List[ImportModel]]: 57 """ 58 Args: 59 limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional] 60 pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used 61 to fetch the next page of results. [optional] 62 63 Returns an async generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can 64 easily iterate over all results. The `list_imports` method accepts all of the same arguments as `list_imports_paginated` 65 66 ```python 67 async for op in index.list_imports(): 68 print(op) 69 ``` 70 """ 71 done = False 72 while not done: 73 results = await self.list_imports_paginated(**kwargs) 74 if len(results.data) > 0: 75 for op in results.data: 76 yield op 77 78 if results.pagination: 79 kwargs.update({"pagination_token": results.pagination.next}) 80 else: 81 done = True
Arguments:
- limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used to fetch the next page of results. [optional]
Returns an async generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can
easily iterate over all results. The `list_imports` method accepts all of the same arguments as `list_imports_paginated`.
async for op in index.list_imports():
print(op)
83 async def list_imports_paginated( 84 self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs 85 ) -> ListImportsResponse: 86 """ 87 Args: 88 limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional] 89 pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned 90 in the response if additional results are available. [optional] 91 92 Returns: 93 `ListImportsResponse` object which contains the list of operations as ImportModel objects, pagination information, 94 and usage showing the number of read_units consumed. 95 96 The `list_imports_paginated` operation returns information about import operations. 97 It returns operations in a paginated form, with a pagination token to fetch the next page of results. 98 99 Consider using the `list_imports` method to avoid having to handle pagination tokens manually. 100 101 Examples: 102 >>> results = await index.list_imports_paginated(limit=5) 103 >>> results.pagination.next 104 eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9 105 >>> results.data[0] 106 { 107 "id": "6", 108 "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/", 109 "status": "Completed", 110 "percent_complete": 100.0, 111 "records_imported": 10, 112 "created_at": "2024-09-06T14:52:02.567776+00:00", 113 "finished_at": "2024-09-06T14:52:28.130717+00:00" 114 } 115 >>> next_results = await index.list_imports_paginated(limit=5, pagination_token=results.pagination.next) 116 117 """ 118 args_dict = BulkImportRequestFactory.list_imports_paginated_args( 119 limit=limit, pagination_token=pagination_token, **kwargs 120 ) 121 return await self.__import_operations_api.list_bulk_imports(**args_dict)
Arguments:
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
Returns:
`ListImportsResponse` object which contains the list of operations as ImportModel objects, pagination information, and usage showing the number of read_units consumed.
The `list_imports_paginated` operation returns information about import operations.
It returns operations in a paginated form, with a pagination token to fetch the next page of results.
Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
Examples:
>>> results = await index.list_imports_paginated(limit=5)
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> results.data[0]
{
    "id": "6",
    "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
    "status": "Completed",
    "percent_complete": 100.0,
    "records_imported": 10,
    "created_at": "2024-09-06T14:52:02.567776+00:00",
    "finished_at": "2024-09-06T14:52:28.130717+00:00"
}
>>> next_results = await index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
123 async def describe_import(self, id: str) -> ImportModel: 124 """ 125 Args: 126 id (str): The id of the import operation. This value is returned when 127 starting an import, and can be looked up using list_imports. 128 129 Returns: 130 ImportModel: An object containing operation id, status, and other details. 131 132 `describe_import` is used to get detailed information about a specific import operation. 133 """ 134 args = BulkImportRequestFactory.describe_import_args(id=id) 135 return await self.__import_operations_api.describe_bulk_import(**args)
Arguments:
- id (str): The id of the import operation. This value is returned when starting an import, and can be looked up using `list_imports`.
Returns:
ImportModel: An object containing operation id, status, and other details.
`describe_import` is used to get detailed information about a specific import operation.
137 async def cancel_import(self, id: str): 138 """Cancel an import operation. 139 140 Args: 141 id (str): The id of the import operation to cancel. 142 """ 143 args = BulkImportRequestFactory.cancel_import_args(id=id) 144 return await self.__import_operations_api.cancel_bulk_import(**args)
Cancel an import operation.
Arguments:
- id (str): The id of the import operation to cancel.