pinecone.data.features.bulk_import

  1from enum import Enum
  2from typing import Optional, Literal, Iterator, List, Type, cast
  3
  4from pinecone.config.config import ConfigBuilder
  5from pinecone.core_ea.openapi.db_data import ApiClient
  6from pinecone.core_ea.openapi.db_data.api.bulk_operations_api import BulkOperationsApi
  7from pinecone.core_ea.openapi.shared import API_VERSION
  8
  9from pinecone.utils import parse_non_empty_args, install_json_repr_override, setup_openapi_client
 10
 11from pinecone.core_ea.openapi.db_data.models import (
 12    StartImportRequest,
 13    StartImportResponse,
 14    ImportListResponse,
 15    ImportModel,
 16    ImportErrorMode as ImportErrorModeClass,
 17)
 18
 19for m in [StartImportResponse, ImportListResponse, ImportModel]:
 20    install_json_repr_override(m)
 21
# Public enum exposing the allowed error modes for start_import, built
# dynamically from the generated OpenAPI model so the two cannot drift apart.
# NOTE(review): assumes ImportErrorModeClass.allowed_values[("on_error",)] is a
# mapping of member names to values (e.g. CONTINUE -> "continue") — confirm
# against the generated ImportErrorMode model; the cast is needed because the
# functional Enum() constructor is typed too loosely for mypy.
ImportErrorMode: Type[Enum] = cast(
    Type[Enum], Enum("ImportErrorMode", ImportErrorModeClass.allowed_values[("on_error",)])
)
 25
 26
 27class ImportFeatureMixin:
 28    def __init__(self, **kwargs):
 29        config = ConfigBuilder.build(**kwargs)
 30        openapi_config = ConfigBuilder.build_openapi_config(
 31            config, kwargs.get("openapi_config", None)
 32        )
 33
 34        if kwargs.get("__import_operations_api", None):
 35            self.__import_operations_api = kwargs.get("__import_operations_api")
 36        else:
 37            self.__import_operations_api = setup_openapi_client(
 38                api_client_klass=ApiClient,
 39                api_klass=BulkOperationsApi,
 40                config=config,
 41                openapi_config=openapi_config,
 42                pool_threads=kwargs.get("pool_threads", 1),
 43                api_version=API_VERSION,
 44            )
 45
 46    def start_import(
 47        self,
 48        uri: str,
 49        integration_id: Optional[str] = None,
 50        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
 51    ) -> StartImportResponse:
 52        """Import data from a storage provider into an index. The uri must start with the scheme of a supported
 53        storage provider. For buckets that are not publicly readable, you will also need to separately configure
 54        a storage integration and pass the integration id.
 55
 56        Examples:
 57            >>> from pinecone import Pinecone
 58            >>> index = Pinecone().Index('my-index')
 59            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
 60            { id: "1" }
 61
 62        Args:
 63            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
 64            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
 65            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
 66                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
 67
 68        Returns:
 69            StartImportResponse: Contains the id of the import operation.
 70        """
 71        if isinstance(error_mode, ImportErrorMode):
 72            error_mode = error_mode.value
 73        elif isinstance(error_mode, str):
 74            try:
 75                error_mode = ImportErrorMode(error_mode.lower()).value
 76            except ValueError:
 77                raise ValueError(f"Invalid error_mode value: {error_mode}")
 78
 79        args_dict = parse_non_empty_args(
 80            [
 81                ("uri", uri),
 82                ("integration_id", integration_id),
 83                ("error_mode", ImportErrorModeClass(on_error=error_mode)),
 84            ]
 85        )
 86
 87        return self.__import_operations_api.start_import(StartImportRequest(**args_dict))
 88
 89    def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]:
 90        """
 91        Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can
 92        easily iterate over all results. The `list_imports` method accepts all of the same arguments as list_imports_paginated
 93
 94        ```python
 95        for op in index.list_imports():
 96            print(op)
 97        ```
 98
 99        You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function:
100
101        ```python
102        operations = list(index.list_imports())
103        ```
104
105        You should be cautious with this approach because it will fetch all operations at once, which could be a large number
106        of network calls and a lot of memory to hold the results.
107
108        Args:
109            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
110            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
111                to fetch the next page of results. [optional]
112        """
113        done = False
114        while not done:
115            results = self.list_imports_paginated(**kwargs)
116            if len(results.data) > 0:
117                for op in results.data:
118                    yield op
119
120            if results.pagination:
121                kwargs.update({"pagination_token": results.pagination.next})
122            else:
123                done = True
124
125    def list_imports_paginated(
126        self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
127    ) -> ImportListResponse:
128        """
129        The list_imports_paginated operation returns information about import operations.
130        It returns operations in a paginated form, with a pagination token to fetch the next page of results.
131
132        Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
133
134        Examples:
135            >>> results = index.list_imports_paginated(limit=5)
136            >>> results.pagination.next
137            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
138            >>> results.data[0]
139            {
140                "id": "6",
141                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
142                "status": "Completed",
143                "percent_complete": 100.0,
144                "records_imported": 10,
145                "created_at": "2024-09-06T14:52:02.567776+00:00",
146                "finished_at": "2024-09-06T14:52:28.130717+00:00"
147            }
148            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
149
150        Args:
151            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
152            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
153                in the response if additional results are available. [optional]
154
155        Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information,
156            and usage showing the number of read_units consumed.
157        """
158        args_dict = parse_non_empty_args([("limit", limit), ("pagination_token", pagination_token)])
159        return self.__import_operations_api.list_imports(**args_dict)
160
161    def describe_import(self, id: str) -> ImportModel:
162        """
163        describe_import is used to get detailed information about a specific import operation.
164
165        Args:
166            id (str): The id of the import operation. This value is returned when
167            starting an import, and can be looked up using list_imports.
168
169        Returns:
170            ImportModel: An object containing operation id, status, and other details.
171        """
172        if isinstance(id, int):
173            id = str(id)
174        return self.__import_operations_api.describe_import(id=id)
175
176    def cancel_import(self, id: str):
177        """Cancel an import operation.
178
179        Args:
180            id (str): The id of the import operation to cancel.
181        """
182        if isinstance(id, int):
183            id = str(id)
184        return self.__import_operations_api.cancel_import(id=id)
class ImportErrorMode(enum.Enum):

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes -- see the documentation for details.

ABORT = <ImportErrorMode.ABORT: 'abort'>
CONTINUE = <ImportErrorMode.CONTINUE: 'continue'>
Inherited Members
enum.Enum
name
value
class ImportFeatureMixin:
 28class ImportFeatureMixin:
 29    def __init__(self, **kwargs):
 30        config = ConfigBuilder.build(**kwargs)
 31        openapi_config = ConfigBuilder.build_openapi_config(
 32            config, kwargs.get("openapi_config", None)
 33        )
 34
 35        if kwargs.get("__import_operations_api", None):
 36            self.__import_operations_api = kwargs.get("__import_operations_api")
 37        else:
 38            self.__import_operations_api = setup_openapi_client(
 39                api_client_klass=ApiClient,
 40                api_klass=BulkOperationsApi,
 41                config=config,
 42                openapi_config=openapi_config,
 43                pool_threads=kwargs.get("pool_threads", 1),
 44                api_version=API_VERSION,
 45            )
 46
 47    def start_import(
 48        self,
 49        uri: str,
 50        integration_id: Optional[str] = None,
 51        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
 52    ) -> StartImportResponse:
 53        """Import data from a storage provider into an index. The uri must start with the scheme of a supported
 54        storage provider. For buckets that are not publicly readable, you will also need to separately configure
 55        a storage integration and pass the integration id.
 56
 57        Examples:
 58            >>> from pinecone import Pinecone
 59            >>> index = Pinecone().Index('my-index')
 60            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
 61            { id: "1" }
 62
 63        Args:
 64            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
 65            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
 66            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
 67                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
 68
 69        Returns:
 70            StartImportResponse: Contains the id of the import operation.
 71        """
 72        if isinstance(error_mode, ImportErrorMode):
 73            error_mode = error_mode.value
 74        elif isinstance(error_mode, str):
 75            try:
 76                error_mode = ImportErrorMode(error_mode.lower()).value
 77            except ValueError:
 78                raise ValueError(f"Invalid error_mode value: {error_mode}")
 79
 80        args_dict = parse_non_empty_args(
 81            [
 82                ("uri", uri),
 83                ("integration_id", integration_id),
 84                ("error_mode", ImportErrorModeClass(on_error=error_mode)),
 85            ]
 86        )
 87
 88        return self.__import_operations_api.start_import(StartImportRequest(**args_dict))
 89
 90    def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]:
 91        """
 92        Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can
 93        easily iterate over all results. The `list_imports` method accepts all of the same arguments as list_imports_paginated
 94
 95        ```python
 96        for op in index.list_imports():
 97            print(op)
 98        ```
 99
100        You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function:
101
102        ```python
103        operations = list(index.list_imports())
104        ```
105
106        You should be cautious with this approach because it will fetch all operations at once, which could be a large number
107        of network calls and a lot of memory to hold the results.
108
109        Args:
110            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
111            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
112                to fetch the next page of results. [optional]
113        """
114        done = False
115        while not done:
116            results = self.list_imports_paginated(**kwargs)
117            if len(results.data) > 0:
118                for op in results.data:
119                    yield op
120
121            if results.pagination:
122                kwargs.update({"pagination_token": results.pagination.next})
123            else:
124                done = True
125
126    def list_imports_paginated(
127        self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
128    ) -> ImportListResponse:
129        """
130        The list_imports_paginated operation returns information about import operations.
131        It returns operations in a paginated form, with a pagination token to fetch the next page of results.
132
133        Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
134
135        Examples:
136            >>> results = index.list_imports_paginated(limit=5)
137            >>> results.pagination.next
138            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
139            >>> results.data[0]
140            {
141                "id": "6",
142                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
143                "status": "Completed",
144                "percent_complete": 100.0,
145                "records_imported": 10,
146                "created_at": "2024-09-06T14:52:02.567776+00:00",
147                "finished_at": "2024-09-06T14:52:28.130717+00:00"
148            }
149            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
150
151        Args:
152            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
153            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
154                in the response if additional results are available. [optional]
155
156        Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information,
157            and usage showing the number of read_units consumed.
158        """
159        args_dict = parse_non_empty_args([("limit", limit), ("pagination_token", pagination_token)])
160        return self.__import_operations_api.list_imports(**args_dict)
161
162    def describe_import(self, id: str) -> ImportModel:
163        """
164        describe_import is used to get detailed information about a specific import operation.
165
166        Args:
167            id (str): The id of the import operation. This value is returned when
168            starting an import, and can be looked up using list_imports.
169
170        Returns:
171            ImportModel: An object containing operation id, status, and other details.
172        """
173        if isinstance(id, int):
174            id = str(id)
175        return self.__import_operations_api.describe_import(id=id)
176
177    def cancel_import(self, id: str):
178        """Cancel an import operation.
179
180        Args:
181            id (str): The id of the import operation to cancel.
182        """
183        if isinstance(id, int):
184            id = str(id)
185        return self.__import_operations_api.cancel_import(id=id)
ImportFeatureMixin(**kwargs)
29    def __init__(self, **kwargs):
30        config = ConfigBuilder.build(**kwargs)
31        openapi_config = ConfigBuilder.build_openapi_config(
32            config, kwargs.get("openapi_config", None)
33        )
34
35        if kwargs.get("__import_operations_api", None):
36            self.__import_operations_api = kwargs.get("__import_operations_api")
37        else:
38            self.__import_operations_api = setup_openapi_client(
39                api_client_klass=ApiClient,
40                api_klass=BulkOperationsApi,
41                config=config,
42                openapi_config=openapi_config,
43                pool_threads=kwargs.get("pool_threads", 1),
44                api_version=API_VERSION,
45            )
def start_import( self, uri: str, integration_id: Optional[str] = None, error_mode: Optional[Literal['CONTINUE', 'ABORT']] = 'CONTINUE') -> pinecone.core_ea.openapi.db_data.model.start_import_response.StartImportResponse:
47    def start_import(
48        self,
49        uri: str,
50        integration_id: Optional[str] = None,
51        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
52    ) -> StartImportResponse:
53        """Import data from a storage provider into an index. The uri must start with the scheme of a supported
54        storage provider. For buckets that are not publicly readable, you will also need to separately configure
55        a storage integration and pass the integration id.
56
57        Examples:
58            >>> from pinecone import Pinecone
59            >>> index = Pinecone().Index('my-index')
60            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
61            { id: "1" }
62
63        Args:
64            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
65            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
66            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
67                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
68
69        Returns:
70            StartImportResponse: Contains the id of the import operation.
71        """
72        if isinstance(error_mode, ImportErrorMode):
73            error_mode = error_mode.value
74        elif isinstance(error_mode, str):
75            try:
76                error_mode = ImportErrorMode(error_mode.lower()).value
77            except ValueError:
78                raise ValueError(f"Invalid error_mode value: {error_mode}")
79
80        args_dict = parse_non_empty_args(
81            [
82                ("uri", uri),
83                ("integration_id", integration_id),
84                ("error_mode", ImportErrorModeClass(on_error=error_mode)),
85            ]
86        )
87
88        return self.__import_operations_api.start_import(StartImportRequest(**args_dict))

Import data from a storage provider into an index. The uri must start with the scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to separately configure a storage integration and pass the integration id.

Examples:
>>> from pinecone import Pinecone
>>> index = Pinecone().Index('my-index')
>>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
{ id: "1" }
Arguments:
  • uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
  • integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
  • error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
Returns:

StartImportResponse: Contains the id of the import operation.

def list_imports( self, **kwargs) -> Iterator[List[pinecone.core_ea.openapi.db_data.model.import_model.ImportModel]]:
 90    def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]:
 91        """
 92        Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can
 93        easily iterate over all results. The `list_imports` method accepts all of the same arguments as list_imports_paginated
 94
 95        ```python
 96        for op in index.list_imports():
 97            print(op)
 98        ```
 99
100        You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function:
101
102        ```python
103        operations = list(index.list_imports())
104        ```
105
106        You should be cautious with this approach because it will fetch all operations at once, which could be a large number
107        of network calls and a lot of memory to hold the results.
108
109        Args:
110            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
111            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
112                to fetch the next page of results. [optional]
113        """
114        done = False
115        while not done:
116            results = self.list_imports_paginated(**kwargs)
117            if len(results.data) > 0:
118                for op in results.data:
119                    yield op
120
121            if results.pagination:
122                kwargs.update({"pagination_token": results.pagination.next})
123            else:
124                done = True

Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can easily iterate over all results. The list_imports method accepts all of the same arguments as list_imports_paginated

for op in index.list_imports():
    print(op)

You can convert the generator into a list by wrapping the generator in a call to the built-in list function:

operations = list(index.list_imports())

You should be cautious with this approach because it will fetch all operations at once, which could be a large number of network calls and a lot of memory to hold the results.

Arguments:
  • limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
  • pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used to fetch the next page of results. [optional]
def list_imports_paginated( self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs) -> pinecone.core_ea.openapi.db_data.model.import_list_response.ImportListResponse:
126    def list_imports_paginated(
127        self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
128    ) -> ImportListResponse:
129        """
130        The list_imports_paginated operation returns information about import operations.
131        It returns operations in a paginated form, with a pagination token to fetch the next page of results.
132
133        Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
134
135        Examples:
136            >>> results = index.list_imports_paginated(limit=5)
137            >>> results.pagination.next
138            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
139            >>> results.data[0]
140            {
141                "id": "6",
142                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
143                "status": "Completed",
144                "percent_complete": 100.0,
145                "records_imported": 10,
146                "created_at": "2024-09-06T14:52:02.567776+00:00",
147                "finished_at": "2024-09-06T14:52:28.130717+00:00"
148            }
149            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
150
151        Args:
152            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
153            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
154                in the response if additional results are available. [optional]
155
156        Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information,
157            and usage showing the number of read_units consumed.
158        """
159        args_dict = parse_non_empty_args([("limit", limit), ("pagination_token", pagination_token)])
160        return self.__import_operations_api.list_imports(**args_dict)

The list_imports_paginated operation returns information about import operations. It returns operations in a paginated form, with a pagination token to fetch the next page of results.

Consider using the list_imports method to avoid having to handle pagination tokens manually.

Examples:
>>> results = index.list_imports_paginated(limit=5)
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> results.data[0]
{
    "id": "6",
    "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
    "status": "Completed",
    "percent_complete": 100.0,
    "records_imported": 10,
    "created_at": "2024-09-06T14:52:02.567776+00:00",
    "finished_at": "2024-09-06T14:52:28.130717+00:00"
}
>>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
Arguments:
  • limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
  • pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]

Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information, and usage showing the number of read_units consumed.

def describe_import( self, id: str) -> pinecone.core_ea.openapi.db_data.model.import_model.ImportModel:
162    def describe_import(self, id: str) -> ImportModel:
163        """
164        describe_import is used to get detailed information about a specific import operation.
165
166        Args:
167            id (str): The id of the import operation. This value is returned when
168            starting an import, and can be looked up using list_imports.
169
170        Returns:
171            ImportModel: An object containing operation id, status, and other details.
172        """
173        if isinstance(id, int):
174            id = str(id)
175        return self.__import_operations_api.describe_import(id=id)

describe_import is used to get detailed information about a specific import operation.

Arguments:
  • id (str): The id of the import operation. This value is returned when starting an import, and can be looked up using list_imports.
Returns:

ImportModel: An object containing operation id, status, and other details.

def cancel_import(self, id: str):
177    def cancel_import(self, id: str):
178        """Cancel an import operation.
179
180        Args:
181            id (str): The id of the import operation to cancel.
182        """
183        if isinstance(id, int):
184            id = str(id)
185        return self.__import_operations_api.cancel_import(id=id)

Cancel an import operation.

Arguments:
  • id (str): The id of the import operation to cancel.