pinecone.data.features.bulk_import

  1from enum import Enum
  2from typing import Optional, Literal, Iterator, List, Type, cast
  3
  4from pinecone.config.config import ConfigBuilder
  5from pinecone.core_ea.openapi.db_data import ApiClient
  6from pinecone.core_ea.openapi.db_data.api.bulk_operations_api import BulkOperationsApi
  7from pinecone.core_ea.openapi.shared import API_VERSION
  8
  9from pinecone.utils import parse_non_empty_args, install_json_repr_override, setup_openapi_client
 10
 11from pinecone.core_ea.openapi.db_data.models import (
 12    StartImportRequest,
 13    StartImportResponse,
 14    ImportListResponse,
 15    ImportModel,
 16    ImportErrorMode as ImportErrorModeClass,
 17)
 18
# Give the generated response models a readable JSON __repr__ so they
# print nicely in REPLs and logs.
for m in [StartImportResponse, ImportListResponse, ImportModel]:
    install_json_repr_override(m)

# Build a real Enum from the generated model's allowed values for the
# "on_error" field (members ABORT/CONTINUE with lowercase string values),
# so callers can pass enum members instead of raw strings.
ImportErrorMode: Type[Enum] = cast(
    Type[Enum], Enum("ImportErrorMode", ImportErrorModeClass.allowed_values[("on_error",)])
)
 25
 26
 27class ImportFeatureMixin:
 28    def __init__(self, **kwargs):
 29        config = ConfigBuilder.build(
 30            **kwargs,
 31        )
 32        openapi_config = ConfigBuilder.build_openapi_config(config, kwargs.get("openapi_config", None))
 33
 34        if kwargs.get("__import_operations_api", None):
 35            self.__import_operations_api = kwargs.get("__import_operations_api")
 36        else:
 37            self.__import_operations_api = setup_openapi_client(
 38                api_client_klass=ApiClient,
 39                api_klass=BulkOperationsApi,
 40                config=config,
 41                openapi_config=openapi_config,
 42                pool_threads=kwargs.get("pool_threads", 1),
 43                api_version=API_VERSION,
 44            )
 45
 46    def start_import(
 47        self,
 48        uri: str,
 49        integration_id: Optional[str] = None,
 50        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
 51    ) -> StartImportResponse:
 52        """Import data from a storage provider into an index. The uri must start with the scheme of a supported
 53        storage provider. For buckets that are not publicly readable, you will also need to separately configure
 54        a storage integration and pass the integration id.
 55
 56        Examples:
 57            >>> from pinecone import Pinecone
 58            >>> index = Pinecone().Index('my-index')
 59            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
 60            { id: "1" }
 61
 62        Args:
 63            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
 64            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
 65            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
 66                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
 67
 68        Returns:
 69            StartImportResponse: Contains the id of the import operation.
 70        """
 71        if isinstance(error_mode, ImportErrorMode):
 72            error_mode = error_mode.value
 73        elif isinstance(error_mode, str):
 74            try:
 75                error_mode = ImportErrorMode(error_mode.lower()).value
 76            except ValueError:
 77                raise ValueError(f"Invalid error_mode value: {error_mode}")
 78
 79        args_dict = parse_non_empty_args(
 80            [
 81                ("uri", uri),
 82                ("integration_id", integration_id),
 83                ("error_mode", ImportErrorModeClass(on_error=error_mode)),
 84            ]
 85        )
 86
 87        return self.__import_operations_api.start_import(StartImportRequest(**args_dict))
 88
 89    def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]:
 90        """
 91        Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can
 92        easily iterate over all results. The `list_imports` method accepts all of the same arguments as list_imports_paginated
 93
 94        ```python
 95        for op in index.list_imports():
 96            print(op)
 97        ```
 98
 99        You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function:
100
101        ```python
102        operations = list(index.list_imports())
103        ```
104
105        You should be cautious with this approach because it will fetch all operations at once, which could be a large number
106        of network calls and a lot of memory to hold the results.
107
108        Args:
109            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
110            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
111                to fetch the next page of results. [optional]
112        """
113        done = False
114        while not done:
115            results = self.list_imports_paginated(**kwargs)
116            if len(results.data) > 0:
117                for op in results.data:
118                    yield op
119
120            if results.pagination:
121                kwargs.update({"pagination_token": results.pagination.next})
122            else:
123                done = True
124
125    def list_imports_paginated(
126        self,
127        limit: Optional[int] = None,
128        pagination_token: Optional[str] = None,
129        **kwargs,
130    ) -> ImportListResponse:
131        """
132        The list_imports_paginated operation returns information about import operations.
133        It returns operations in a paginated form, with a pagination token to fetch the next page of results.
134
135        Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
136
137        Examples:
138            >>> results = index.list_imports_paginated(limit=5)
139            >>> results.pagination.next
140            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
141            >>> results.data[0]
142            {
143                "id": "6",
144                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
145                "status": "Completed",
146                "percent_complete": 100.0,
147                "records_imported": 10,
148                "created_at": "2024-09-06T14:52:02.567776+00:00",
149                "finished_at": "2024-09-06T14:52:28.130717+00:00"
150            }
151            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
152
153        Args:
154            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
155            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
156                in the response if additional results are available. [optional]
157
158        Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information,
159            and usage showing the number of read_units consumed.
160        """
161        args_dict = parse_non_empty_args(
162            [
163                ("limit", limit),
164                ("pagination_token", pagination_token),
165            ]
166        )
167        return self.__import_operations_api.list_imports(**args_dict)
168
169    def describe_import(self, id: str) -> ImportModel:
170        """
171        describe_import is used to get detailed information about a specific import operation.
172
173        Args:
174            id (str): The id of the import operation. This value is returned when
175            starting an import, and can be looked up using list_imports.
176
177        Returns:
178            ImportModel: An object containing operation id, status, and other details.
179        """
180        if isinstance(id, int):
181            id = str(id)
182        return self.__import_operations_api.describe_import(id=id)
183
184    def cancel_import(self, id: str):
185        """Cancel an import operation.
186
187        Args:
188            id (str): The id of the import operation to cancel.
189        """
190        if isinstance(id, int):
191            id = str(id)
192        return self.__import_operations_api.cancel_import(id=id)
class ImportErrorMode(enum.Enum):

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access::
>>> Color.RED
<Color.RED: 1>
  • value lookup:
>>> Color(1)
<Color.RED: 1>
  • name lookup:
>>> Color['RED']
<Color.RED: 1>

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes -- see the documentation for details.

ABORT = <ImportErrorMode.ABORT: 'abort'>
CONTINUE = <ImportErrorMode.CONTINUE: 'continue'>
Inherited Members
enum.Enum
name
value
class ImportFeatureMixin:
 28class ImportFeatureMixin:
 29    def __init__(self, **kwargs):
 30        config = ConfigBuilder.build(
 31            **kwargs,
 32        )
 33        openapi_config = ConfigBuilder.build_openapi_config(config, kwargs.get("openapi_config", None))
 34
 35        if kwargs.get("__import_operations_api", None):
 36            self.__import_operations_api = kwargs.get("__import_operations_api")
 37        else:
 38            self.__import_operations_api = setup_openapi_client(
 39                api_client_klass=ApiClient,
 40                api_klass=BulkOperationsApi,
 41                config=config,
 42                openapi_config=openapi_config,
 43                pool_threads=kwargs.get("pool_threads", 1),
 44                api_version=API_VERSION,
 45            )
 46
 47    def start_import(
 48        self,
 49        uri: str,
 50        integration_id: Optional[str] = None,
 51        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
 52    ) -> StartImportResponse:
 53        """Import data from a storage provider into an index. The uri must start with the scheme of a supported
 54        storage provider. For buckets that are not publicly readable, you will also need to separately configure
 55        a storage integration and pass the integration id.
 56
 57        Examples:
 58            >>> from pinecone import Pinecone
 59            >>> index = Pinecone().Index('my-index')
 60            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
 61            { id: "1" }
 62
 63        Args:
 64            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
 65            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
 66            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
 67                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
 68
 69        Returns:
 70            StartImportResponse: Contains the id of the import operation.
 71        """
 72        if isinstance(error_mode, ImportErrorMode):
 73            error_mode = error_mode.value
 74        elif isinstance(error_mode, str):
 75            try:
 76                error_mode = ImportErrorMode(error_mode.lower()).value
 77            except ValueError:
 78                raise ValueError(f"Invalid error_mode value: {error_mode}")
 79
 80        args_dict = parse_non_empty_args(
 81            [
 82                ("uri", uri),
 83                ("integration_id", integration_id),
 84                ("error_mode", ImportErrorModeClass(on_error=error_mode)),
 85            ]
 86        )
 87
 88        return self.__import_operations_api.start_import(StartImportRequest(**args_dict))
 89
 90    def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]:
 91        """
 92        Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can
 93        easily iterate over all results. The `list_imports` method accepts all of the same arguments as list_imports_paginated
 94
 95        ```python
 96        for op in index.list_imports():
 97            print(op)
 98        ```
 99
100        You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function:
101
102        ```python
103        operations = list(index.list_imports())
104        ```
105
106        You should be cautious with this approach because it will fetch all operations at once, which could be a large number
107        of network calls and a lot of memory to hold the results.
108
109        Args:
110            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
111            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
112                to fetch the next page of results. [optional]
113        """
114        done = False
115        while not done:
116            results = self.list_imports_paginated(**kwargs)
117            if len(results.data) > 0:
118                for op in results.data:
119                    yield op
120
121            if results.pagination:
122                kwargs.update({"pagination_token": results.pagination.next})
123            else:
124                done = True
125
126    def list_imports_paginated(
127        self,
128        limit: Optional[int] = None,
129        pagination_token: Optional[str] = None,
130        **kwargs,
131    ) -> ImportListResponse:
132        """
133        The list_imports_paginated operation returns information about import operations.
134        It returns operations in a paginated form, with a pagination token to fetch the next page of results.
135
136        Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
137
138        Examples:
139            >>> results = index.list_imports_paginated(limit=5)
140            >>> results.pagination.next
141            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
142            >>> results.data[0]
143            {
144                "id": "6",
145                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
146                "status": "Completed",
147                "percent_complete": 100.0,
148                "records_imported": 10,
149                "created_at": "2024-09-06T14:52:02.567776+00:00",
150                "finished_at": "2024-09-06T14:52:28.130717+00:00"
151            }
152            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
153
154        Args:
155            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
156            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
157                in the response if additional results are available. [optional]
158
159        Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information,
160            and usage showing the number of read_units consumed.
161        """
162        args_dict = parse_non_empty_args(
163            [
164                ("limit", limit),
165                ("pagination_token", pagination_token),
166            ]
167        )
168        return self.__import_operations_api.list_imports(**args_dict)
169
170    def describe_import(self, id: str) -> ImportModel:
171        """
172        describe_import is used to get detailed information about a specific import operation.
173
174        Args:
175            id (str): The id of the import operation. This value is returned when
176            starting an import, and can be looked up using list_imports.
177
178        Returns:
179            ImportModel: An object containing operation id, status, and other details.
180        """
181        if isinstance(id, int):
182            id = str(id)
183        return self.__import_operations_api.describe_import(id=id)
184
185    def cancel_import(self, id: str):
186        """Cancel an import operation.
187
188        Args:
189            id (str): The id of the import operation to cancel.
190        """
191        if isinstance(id, int):
192            id = str(id)
193        return self.__import_operations_api.cancel_import(id=id)
ImportFeatureMixin(**kwargs)
29    def __init__(self, **kwargs):
30        config = ConfigBuilder.build(
31            **kwargs,
32        )
33        openapi_config = ConfigBuilder.build_openapi_config(config, kwargs.get("openapi_config", None))
34
35        if kwargs.get("__import_operations_api", None):
36            self.__import_operations_api = kwargs.get("__import_operations_api")
37        else:
38            self.__import_operations_api = setup_openapi_client(
39                api_client_klass=ApiClient,
40                api_klass=BulkOperationsApi,
41                config=config,
42                openapi_config=openapi_config,
43                pool_threads=kwargs.get("pool_threads", 1),
44                api_version=API_VERSION,
45            )
def start_import( self, uri: str, integration_id: Optional[str] = None, error_mode: Optional[Literal['CONTINUE', 'ABORT']] = 'CONTINUE') -> pinecone.core_ea.openapi.db_data.model.start_import_response.StartImportResponse:
47    def start_import(
48        self,
49        uri: str,
50        integration_id: Optional[str] = None,
51        error_mode: Optional[Literal["CONTINUE", "ABORT"]] = "CONTINUE",
52    ) -> StartImportResponse:
53        """Import data from a storage provider into an index. The uri must start with the scheme of a supported
54        storage provider. For buckets that are not publicly readable, you will also need to separately configure
55        a storage integration and pass the integration id.
56
57        Examples:
58            >>> from pinecone import Pinecone
59            >>> index = Pinecone().Index('my-index')
60            >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
61            { id: "1" }
62
63        Args:
64            uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
65            integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
66            error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
67                records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
68
69        Returns:
70            StartImportResponse: Contains the id of the import operation.
71        """
72        if isinstance(error_mode, ImportErrorMode):
73            error_mode = error_mode.value
74        elif isinstance(error_mode, str):
75            try:
76                error_mode = ImportErrorMode(error_mode.lower()).value
77            except ValueError:
78                raise ValueError(f"Invalid error_mode value: {error_mode}")
79
80        args_dict = parse_non_empty_args(
81            [
82                ("uri", uri),
83                ("integration_id", integration_id),
84                ("error_mode", ImportErrorModeClass(on_error=error_mode)),
85            ]
86        )
87
88        return self.__import_operations_api.start_import(StartImportRequest(**args_dict))

Import data from a storage provider into an index. The uri must start with the scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to separately configure a storage integration and pass the integration id.

Examples:
>>> from pinecone import Pinecone
>>> index = Pinecone().Index('my-index')
>>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
{ id: "1" }
Arguments:
  • uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
  • integration_id (Optional[str], optional): If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
  • error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
Returns:

StartImportResponse: Contains the id of the import operation.

def list_imports( self, **kwargs) -> Iterator[List[pinecone.core_ea.openapi.db_data.model.import_model.ImportModel]]:
 90    def list_imports(self, **kwargs) -> Iterator[List[ImportModel]]:
 91        """
 92        Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can
 93        easily iterate over all results. The `list_imports` method accepts all of the same arguments as list_imports_paginated
 94
 95        ```python
 96        for op in index.list_imports():
 97            print(op)
 98        ```
 99
100        You can convert the generator into a list by wrapping the generator in a call to the built-in `list` function:
101
102        ```python
103        operations = list(index.list_imports())
104        ```
105
106        You should be cautious with this approach because it will fetch all operations at once, which could be a large number
107        of network calls and a lot of memory to hold the results.
108
109        Args:
110            limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
111            pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
112                to fetch the next page of results. [optional]
113        """
114        done = False
115        while not done:
116            results = self.list_imports_paginated(**kwargs)
117            if len(results.data) > 0:
118                for op in results.data:
119                    yield op
120
121            if results.pagination:
122                kwargs.update({"pagination_token": results.pagination.next})
123            else:
124                done = True

Returns a generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can easily iterate over all results. The list_imports method accepts all of the same arguments as list_imports_paginated

for op in index.list_imports():
    print(op)

You can convert the generator into a list by wrapping the generator in a call to the built-in list function:

operations = list(index.list_imports())

You should be cautious with this approach because it will fetch all operations at once, which could be a large number of network calls and a lot of memory to hold the results.

Arguments:
  • limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
  • pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used to fetch the next page of results. [optional]
def list_imports_paginated( self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs) -> pinecone.core_ea.openapi.db_data.model.import_list_response.ImportListResponse:
126    def list_imports_paginated(
127        self,
128        limit: Optional[int] = None,
129        pagination_token: Optional[str] = None,
130        **kwargs,
131    ) -> ImportListResponse:
132        """
133        The list_imports_paginated operation returns information about import operations.
134        It returns operations in a paginated form, with a pagination token to fetch the next page of results.
135
136        Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
137
138        Examples:
139            >>> results = index.list_imports_paginated(limit=5)
140            >>> results.pagination.next
141            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
142            >>> results.data[0]
143            {
144                "id": "6",
145                "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
146                "status": "Completed",
147                "percent_complete": 100.0,
148                "records_imported": 10,
149                "created_at": "2024-09-06T14:52:02.567776+00:00",
150                "finished_at": "2024-09-06T14:52:28.130717+00:00"
151            }
152            >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
153
154        Args:
155            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
156            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
157                in the response if additional results are available. [optional]
158
159        Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information,
160            and usage showing the number of read_units consumed.
161        """
162        args_dict = parse_non_empty_args(
163            [
164                ("limit", limit),
165                ("pagination_token", pagination_token),
166            ]
167        )
168        return self.__import_operations_api.list_imports(**args_dict)

The list_imports_paginated operation returns information about import operations. It returns operations in a paginated form, with a pagination token to fetch the next page of results.

Consider using the list_imports method to avoid having to handle pagination tokens manually.

Examples:
>>> results = index.list_imports_paginated(limit=5)
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> results.data[0]
{
    "id": "6",
    "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
    "status": "Completed",
    "percent_complete": 100.0,
    "records_imported": 10,
    "created_at": "2024-09-06T14:52:02.567776+00:00",
    "finished_at": "2024-09-06T14:52:28.130717+00:00"
}
>>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
Arguments:
  • limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
  • pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]

Returns: ImportListResponse object which contains the list of operations as ImportModel objects, pagination information, and usage showing the number of read_units consumed.

def describe_import( self, id: str) -> pinecone.core_ea.openapi.db_data.model.import_model.ImportModel:
170    def describe_import(self, id: str) -> ImportModel:
171        """
172        describe_import is used to get detailed information about a specific import operation.
173
174        Args:
175            id (str): The id of the import operation. This value is returned when
176            starting an import, and can be looked up using list_imports.
177
178        Returns:
179            ImportModel: An object containing operation id, status, and other details.
180        """
181        if isinstance(id, int):
182            id = str(id)
183        return self.__import_operations_api.describe_import(id=id)

describe_import is used to get detailed information about a specific import operation.

Arguments:
  • id (str): The id of the import operation. This value is returned when starting an import, and can be looked up using list_imports.
Returns:

ImportModel: An object containing operation id, status, and other details.

def cancel_import(self, id: str):
185    def cancel_import(self, id: str):
186        """Cancel an import operation.
187
188        Args:
189            id (str): The id of the import operation to cancel.
190        """
191        if isinstance(id, int):
192            id = str(id)
193        return self.__import_operations_api.cancel_import(id=id)

Cancel an import operation.

Arguments:
  • id (str): The id of the import operation to cancel.