pinecone.data.index

from pinecone.utils.tqdm import tqdm

import logging
import json
from typing import Union, List, Optional, Dict, Any, Literal

from pinecone.config import ConfigBuilder

from pinecone.openapi_support import ApiClient
from pinecone.core.openapi.db_data.api.vector_operations_api import VectorOperationsApi
from pinecone.core.openapi.db_data import API_VERSION
from pinecone.core.openapi.db_data.models import (
    QueryResponse,
    IndexDescription as DescribeIndexStatsResponse,
    UpsertResponse,
    ListResponse,
    SearchRecordsResponse,
)
from .dataclasses import Vector, SparseValues, FetchResponse, SearchQuery, SearchRerank
from .interfaces import IndexInterface
from .request_factory import IndexRequestFactory
from .features.bulk_import import ImportFeatureMixin
from .types import (
    SparseVectorTypedDict,
    VectorTypedDict,
    VectorMetadataTypedDict,
    VectorTuple,
    VectorTupleWithMetadata,
    FilterTypedDict,
    SearchRerankTypedDict,
    SearchQueryTypedDict,
)
from ..utils import (
    setup_openapi_client,
    parse_non_empty_args,
    validate_and_convert_errors,
    filter_dict,
    PluginAware,
)
from .query_results_aggregator import QueryResultsAggregator, QueryNamespacesResults
from pinecone.openapi_support import OPENAPI_ENDPOINT_PARAMS

from multiprocessing.pool import ApplyResult
from multiprocessing import cpu_count
from concurrent.futures import as_completed


logger = logging.getLogger(__name__)
""" @private """


def parse_query_response(response: QueryResponse):
    """@private"""
    response._data_store.pop("results", None)
    return response


class Index(IndexInterface, ImportFeatureMixin, PluginAware):
    """
    A client for interacting with a Pinecone index via REST API.
    For improved performance, use the Pinecone GRPC index client.
    """

    def __init__(
        self,
        api_key: str,
        host: str,
        pool_threads: Optional[int] = None,
        additional_headers: Optional[Dict[str, str]] = {},
        openapi_config=None,
        **kwargs,
    ):
        self.config = ConfigBuilder.build(
            api_key=api_key, host=host, additional_headers=additional_headers, **kwargs
        )
        """ @private """
        self.openapi_config = ConfigBuilder.build_openapi_config(self.config, openapi_config)
        """ @private """

        if pool_threads is None:
            self.pool_threads = 5 * cpu_count()
            """ @private """
        else:
            self.pool_threads = pool_threads
            """ @private """

        if kwargs.get("connection_pool_maxsize", None):
            self.openapi_config.connection_pool_maxsize = kwargs.get("connection_pool_maxsize")

        self._vector_api = setup_openapi_client(
            api_client_klass=ApiClient,
            api_klass=VectorOperationsApi,
            config=self.config,
            openapi_config=self.openapi_config,
            pool_threads=pool_threads,
            api_version=API_VERSION,
        )

        self._api_client = self._vector_api.api_client

        # Pass the same api_client to the ImportFeatureMixin
        super().__init__(api_client=self._api_client)

        self.load_plugins(
            config=self.config, openapi_config=self.openapi_config, pool_threads=self.pool_threads
        )

    def _openapi_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        return filter_dict(kwargs, OPENAPI_ENDPOINT_PARAMS)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._vector_api.api_client.close()

    def close(self):
        self._vector_api.api_client.close()

    @validate_and_convert_errors
    def upsert(
        self,
        vectors: Union[
            List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]
        ],
        namespace: Optional[str] = None,
        batch_size: Optional[int] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> UpsertResponse:
        _check_type = kwargs.pop("_check_type", True)

        if kwargs.get("async_req", False) and batch_size is not None:
            raise ValueError(
                "async_req is not supported when batch_size is provided. "
                "To upsert in parallel, please follow: "
                "https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel"
            )

        if batch_size is None:
            return self._upsert_batch(vectors, namespace, _check_type, **kwargs)

        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        pbar = tqdm(total=len(vectors), disable=not show_progress, desc="Upserted vectors")
        total_upserted = 0
        for i in range(0, len(vectors), batch_size):
            batch_result = self._upsert_batch(
                vectors[i : i + batch_size], namespace, _check_type, **kwargs
            )
            pbar.update(batch_result.upserted_count)
            # we can't rely on pbar.n here since it isn't updated when show_progress=False
            total_upserted += batch_result.upserted_count

        return UpsertResponse(upserted_count=total_upserted)

    def _upsert_batch(
        self,
        vectors: Union[
            List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]
        ],
        namespace: Optional[str],
        _check_type: bool,
        **kwargs,
    ) -> UpsertResponse:
        return self._vector_api.upsert_vectors(
            IndexRequestFactory.upsert_request(vectors, namespace, _check_type, **kwargs),
            **self._openapi_kwargs(kwargs),
        )

    @staticmethod
    def _iter_dataframe(df, batch_size):
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i : i + batch_size].to_dict(orient="records")
            yield batch

    @validate_and_convert_errors
    def upsert_from_dataframe(
        self, df, namespace: Optional[str] = None, batch_size: int = 500, show_progress: bool = True
    ) -> UpsertResponse:
        try:
            import pandas as pd
        except ImportError:
            raise RuntimeError(
                "The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`"
            )

        if not isinstance(df, pd.DataFrame):
            raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

        pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
        results = []
        for chunk in self._iter_dataframe(df, batch_size=batch_size):
            res = self.upsert(vectors=chunk, namespace=namespace)
            pbar.update(len(chunk))
            results.append(res)

        upserted_count = 0
        for res in results:
            upserted_count += res.upserted_count

        return UpsertResponse(upserted_count=upserted_count)

    def upsert_records(self, namespace: str, records: List[Dict]):
        args = IndexRequestFactory.upsert_records_args(namespace=namespace, records=records)
        self._vector_api.upsert_records_namespace(**args)

    @validate_and_convert_errors
    def search(
        self,
        namespace: str,
        query: Union[SearchQueryTypedDict, SearchQuery],
        rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None,
        fields: Optional[List[str]] = ["*"],  # Default to returning all fields
    ) -> SearchRecordsResponse:
        if namespace is None:
            raise Exception("Namespace is required when searching records")

        request = IndexRequestFactory.search_request(query=query, rerank=rerank, fields=fields)

        return self._vector_api.search_records_namespace(namespace, request)

    @validate_and_convert_errors
    def search_records(
        self,
        namespace: str,
        query: Union[SearchQueryTypedDict, SearchQuery],
        rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None,
        fields: Optional[List[str]] = ["*"],  # Default to returning all fields
    ) -> SearchRecordsResponse:
        return self.search(namespace, query=query, rerank=rerank, fields=fields)

    @validate_and_convert_errors
    def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        return self._vector_api.delete_vectors(
            IndexRequestFactory.delete_request(
                ids=ids, delete_all=delete_all, namespace=namespace, filter=filter, **kwargs
            ),
            **self._openapi_kwargs(kwargs),
        )

    @validate_and_convert_errors
    def fetch(self, ids: List[str], namespace: Optional[str] = None, **kwargs) -> FetchResponse:
        args_dict = parse_non_empty_args([("namespace", namespace)])
        result = self._vector_api.fetch_vectors(ids=ids, **args_dict, **kwargs)
        return FetchResponse(
            namespace=result.namespace,
            vectors={k: Vector.from_dict(v) for k, v in result.vectors.items()},
            usage=result.usage,
        )

    @validate_and_convert_errors
    def query(
        self,
        *args,
        top_k: int,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Union[QueryResponse, ApplyResult]:
        response = self._query(
            *args,
            top_k=top_k,
            vector=vector,
            id=id,
            namespace=namespace,
            filter=filter,
            include_values=include_values,
            include_metadata=include_metadata,
            sparse_vector=sparse_vector,
            **kwargs,
        )

        if kwargs.get("async_req", False) or kwargs.get("async_threadpool_executor", False):
            return response
        else:
            return parse_query_response(response)

    def _query(
        self,
        *args,
        top_k: int,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryResponse:
        if len(args) > 0:
            raise ValueError(
                "The argument order for `query()` has changed; please use keyword arguments instead of positional arguments. Example: index.query(vector=[0.1, 0.2, 0.3], top_k=10, namespace='my_namespace')"
            )

        if top_k < 1:
            raise ValueError("top_k must be a positive integer")

        request = IndexRequestFactory.query_request(
            top_k=top_k,
            vector=vector,
            id=id,
            namespace=namespace,
            filter=filter,
            include_values=include_values,
            include_metadata=include_metadata,
            sparse_vector=sparse_vector,
            **kwargs,
        )
        return self._vector_api.query_vectors(request, **self._openapi_kwargs(kwargs))

    @validate_and_convert_errors
    def query_namespaces(
        self,
        vector: Optional[List[float]],
        namespaces: List[str],
        metric: Literal["cosine", "euclidean", "dotproduct"],
        top_k: Optional[int] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[
            Union[SparseValues, Dict[str, Union[List[float], List[int]]]]
        ] = None,
        **kwargs,
    ) -> QueryNamespacesResults:
        if namespaces is None or len(namespaces) == 0:
            raise ValueError("At least one namespace must be specified")
        if sparse_vector is None and vector is not None and len(vector) == 0:
            # If querying with a vector, it must not be empty
            raise ValueError("Query vector must not be empty")

        overall_topk = top_k if top_k is not None else 10
        aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)

        target_namespaces = set(namespaces)  # dedup namespaces
        async_futures = [
            self.query(
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_threadpool_executor=True,
                _preload_content=False,
                **kwargs,
            )
            for ns in target_namespaces
        ]

        for result in as_completed(async_futures):
            raw_result = result.result()
            response = json.loads(raw_result.data.decode("utf-8"))
            aggregator.add_results(response)

        final_results = aggregator.get_results()
        return final_results

    @validate_and_convert_errors
    def update(
        self,
        id: str,
        values: Optional[List[float]] = None,
        set_metadata: Optional[VectorMetadataTypedDict] = None,
        namespace: Optional[str] = None,
        sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        return self._vector_api.update_vector(
            IndexRequestFactory.update_request(
                id=id,
                values=values,
                set_metadata=set_metadata,
                namespace=namespace,
                sparse_values=sparse_values,
                **kwargs,
            ),
            **self._openapi_kwargs(kwargs),
        )

    @validate_and_convert_errors
    def describe_index_stats(
        self, filter: Optional[FilterTypedDict] = None, **kwargs
    ) -> DescribeIndexStatsResponse:
        return self._vector_api.describe_index_stats(
            IndexRequestFactory.describe_index_stats_request(filter, **kwargs),
            **self._openapi_kwargs(kwargs),
        )

    @validate_and_convert_errors
    def list_paginated(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        namespace: Optional[str] = None,
        **kwargs,
    ) -> ListResponse:
        args_dict = IndexRequestFactory.list_paginated_args(
            prefix=prefix,
            limit=limit,
            pagination_token=pagination_token,
            namespace=namespace,
            **kwargs,
        )
        return self._vector_api.list_vectors(**args_dict, **kwargs)

    @validate_and_convert_errors
    def list(self, **kwargs):
        done = False
        while not done:
            results = self.list_paginated(**kwargs)
            if len(results.vectors) > 0:
                yield [v.id for v in results.vectors]

            if results.pagination:
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True
class Index(pinecone.data.interfaces.IndexInterface, pinecone.data.features.bulk_import.bulk_import.ImportFeatureMixin, pinecone.utils.plugin_aware.PluginAware):

A client for interacting with a Pinecone index via REST API. For improved performance, use the Pinecone GRPC index client.
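Index instances are usually obtained through the Pinecone class rather than by calling this constructor directly. A minimal sketch, assuming an existing index and a PINECONE_API_KEY environment variable:

from pinecone import Pinecone

pc = Pinecone()  # picks up the API key from the environment
idx = pc.Index("index-name")
# or, connect directly by host:
idx = pc.Index(host="your-index-host")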

Index( api_key: str, host: str, pool_threads: Optional[int] = None, additional_headers: Optional[Dict[str, str]] = {}, openapi_config=None, **kwargs)
def close(self):
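close() releases the resources held by the underlying api client. Since the class also implements __enter__ and __exit__ (see the source above), the same cleanup can be handled by a context manager; a minimal sketch, assuming your index host:

from pinecone import Pinecone

pc = Pinecone()

# Used as a context manager, the index closes its api client when the
# block exits, which is equivalent to calling close() manually.
with pc.Index(host="your-index-host") as idx:
    print(idx.describe_index_stats())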
@validate_and_convert_errors
def upsert( self, vectors: Union[List[pinecone.data.dataclasses.vector.Vector], List[Tuple[str, List[float]]], List[Tuple[str, List[float], Dict[str, Union[str, int, float, List[str], List[int], List[float]]]]], List[pinecone.data.types.vector_typed_dict.VectorTypedDict]], namespace: Optional[str] = None, batch_size: Optional[int] = None, show_progress: bool = True, **kwargs) -> pinecone.core.openapi.db_data.model.upsert_response.UpsertResponse:
Arguments:
  • vectors (Union[List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]]): A list of vectors to upsert.
  • namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
  • batch_size (int): The number of vectors to upsert in each batch. If not specified, all vectors will be upserted in a single batch. [optional]
  • show_progress (bool): Whether to show a progress bar using tqdm. Applied only if batch_size is provided. Default is True.
Returns:

UpsertResponse, which includes the number of vectors upserted.

The upsert operation writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.

To upsert in parallel follow: https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel

Upserting dense vectors

Note: the dimension of each dense vector must match the dimension of the index.

A vector can be represented in a variety of ways.

from pinecone import Pinecone, Vector

pc = Pinecone()
idx = pc.Index("index-name")

# A Vector object
idx.upsert(
    namespace = 'my-namespace',
    vectors = [
        Vector(id='id1', values=[0.1, 0.2, 0.3, 0.4], metadata={'metadata_key': 'metadata_value'}),
    ]
)

# A vector tuple
idx.upsert(
    namespace = 'my-namespace',
    vectors = [
        ('id1', [0.1, 0.2, 0.3, 0.4]),
    ]
)

# A vector tuple with metadata
idx.upsert(
    namespace = 'my-namespace',
    vectors = [
        ('id1', [0.1, 0.2, 0.3, 0.4], {'metadata_key': 'metadata_value'}),
    ]
)

# A vector dictionary
idx.upsert(
    namespace = 'my-namespace',
    vectors = [
        {"id": 1, "values": [0.1, 0.2, 0.3, 0.4], "metadata": {"metadata_key": "metadata_value"}},
    ]

Upserting sparse vectors

from pinecone import Pinecone, Vector, SparseValues

pc = Pinecone()
idx = pc.Index("index-name")

# A Vector object
idx.upsert(
    namespace = 'my-namespace',
    vectors = [
        Vector(id='id1', sparse_values=SparseValues(indices=[1, 2], values=[0.2, 0.4])),
    ]
)

# A dictionary
idx.upsert(
    namespace = 'my-namespace',
    vectors = [
        {"id": 1, "sparse_values": {"indices": [1, 2], "values": [0.2, 0.4]}},
    ]
)

Batch upsert

If you have a large number of vectors, you can upsert them in batches.

from pinecone import Pinecone, Vector

pc = Pinecone()
idx = pc.Index("index-name")

idx.upsert(
    namespace = 'my-namespace',
    vectors = [
        {'id': 'id1', 'values': [0.1, 0.2, 0.3, 0.4]},
        {'id': 'id2', 'values': [0.2, 0.3, 0.4, 0.5]},
        {'id': 'id3', 'values': [0.3, 0.4, 0.5, 0.6]},
        {'id': 'id4', 'values': [0.4, 0.5, 0.6, 0.7]},
        {'id': 'id5', 'values': [0.5, 0.6, 0.7, 0.8]},
        # More vectors here
    ],
    batch_size = 50
)

Visual progress bar with tqdm

To see a progress bar when upserting in batches, you will need to separately install the tqdm package. If tqdm is present, the client will detect and use it to display progress when show_progress=True.
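For example, assuming tqdm is installed and a 4-dimensional index, the batched upsert below renders a progress bar; passing show_progress=False suppresses it:

# Hypothetical data: 1000 small vectors upserted in batches of 100.
idx.upsert(
    namespace='my-namespace',
    vectors=[{'id': f'id{i}', 'values': [0.1, 0.2, 0.3, 0.4]} for i in range(1000)],
    batch_size=100,
    show_progress=True,  # the default; renders a tqdm bar when available
)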

@validate_and_convert_errors
def upsert_from_dataframe( self, df, namespace: Optional[str] = None, batch_size: int = 500, show_progress: bool = True) -> pinecone.core.openapi.db_data.model.upsert_response.UpsertResponse:

Upserts a dataframe into the index.

Arguments:
  • df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
  • namespace: The namespace to upsert into.
  • batch_size: The number of rows to upsert in a single batch.
  • show_progress: Whether to show a progress bar.
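A minimal sketch, assuming a 4-dimensional dense index and omitting the optional sparse_values column:

import pandas as pd
from pinecone import Pinecone

pc = Pinecone()
idx = pc.Index("index-name")

# Each row becomes one vector; the column names match the upsert payload keys.
df = pd.DataFrame({
    "id": ["id1", "id2"],
    "values": [[0.1, 0.2, 0.3, 0.4], [0.2, 0.3, 0.4, 0.5]],
    "metadata": [{"genre": "drama"}, {"genre": "comedy"}],
})

idx.upsert_from_dataframe(df, namespace="my-namespace", batch_size=100)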
def upsert_records(self, namespace: str, records: List[Dict]):
Parameters
  • namespace: The namespace of the index to upsert records to.
  • records: The records to upsert into the index.

Upsert records to a namespace. A record is a dictionary that contains either an id or _id field along with other fields that will be stored as metadata. The id or _id field is used as the unique identifier for the record. At least one field in the record should correspond to a field mapping in the index's embed configuration.

When records are upserted, Pinecone converts mapped fields into embeddings and upserts them into the specified namespace of the index.

from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed
)

pc = Pinecone(api_key="<<PINECONE_API_KEY>>")

# Create an index for your embedding model
index_model = pc.create_index_for_model(
    name="my-model-index",
    cloud=CloudProvider.AWS,
    region=AwsRegion.US_WEST_2,
    embed=IndexEmbed(
        model=EmbedModel.Multilingual_E5_Large,
        field_map={"text": "my_text_field"}
    )
)

# Instantiate the index client
idx = pc.Index(host=index_model.host)

# upsert records
idx.upsert_records(
    namespace="my-namespace",
    records=[
        {
            "_id": "test1",
            "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
        },
        {
            "_id": "test2",
            "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
        },
        {
            "_id": "test3",
            "my_text_field": "Many people enjoy eating apples as a healthy snack.",
        },
        {
            "_id": "test4",
            "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
        },
        {
            "_id": "test5",
            "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
        },
        {
            "_id": "test6",
            "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
        },
    ],
)

from pinecone import SearchQuery, SearchRerank, RerankModel

# search for similar records
response = idx.search_records(
    namespace="my-namespace",
    query=SearchQuery(
        inputs={
            "text": "Apple corporation",
        },
        top_k=3,
    ),
    rerank=SearchRerank(
        model=RerankModel.Bge_Reranker_V2_M3,
        rank_fields=["my_text_field"],
        top_n=3,
    ),
)
@validate_and_convert_errors
def search( self, namespace: str, query: Union[pinecone.data.types.search_query_typed_dict.SearchQueryTypedDict, pinecone.data.dataclasses.search_query.SearchQuery], rerank: Union[pinecone.data.types.search_rerank_typed_dict.SearchRerankTypedDict, pinecone.data.dataclasses.search_rerank.SearchRerank, NoneType] = None, fields: Optional[List[str]] = ['*']) -> pinecone.core.openapi.db_data.model.search_records_response.SearchRecordsResponse:
Parameters
  • namespace: The namespace in the index to search.
  • query: The SearchQuery to use for the search.
  • rerank: The SearchRerank to use with the search request.
Returns

The records that match the search.

Search for records.

This operation converts a query to a vector embedding and then searches a namespace. You can optionally provide a reranking operation as part of the search.

from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed
)

pc = Pinecone(api_key="<<PINECONE_API_KEY>>")

# Create an index for your embedding model
index_model = pc.create_index_for_model(
    name="my-model-index",
    cloud=CloudProvider.AWS,
    region=AwsRegion.US_WEST_2,
    embed=IndexEmbed(
        model=EmbedModel.Multilingual_E5_Large,
        field_map={"text": "my_text_field"}
    )
)

# Instantiate the index client
idx = pc.Index(host=index_model.host)

# upsert records
idx.upsert_records(
    namespace="my-namespace",
    records=[
        {
            "_id": "test1",
            "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
        },
        {
            "_id": "test2",
            "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
        },
        {
            "_id": "test3",
            "my_text_field": "Many people enjoy eating apples as a healthy snack.",
        },
        {
            "_id": "test4",
            "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
        },
        {
            "_id": "test5",
            "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
        },
        {
            "_id": "test6",
            "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
        },
    ],
)

from pinecone import SearchQuery, SearchRerank, RerankModel

# search for similar records
response = idx.search_records(
    namespace="my-namespace",
    query=SearchQuery(
        inputs={
            "text": "Apple corporation",
        },
        top_k=3,
    ),
    rerank=SearchRerank(
        model=RerankModel.Bge_Reranker_V2_M3,
        rank_fields=["my_text_field"],
        top_n=3,
    ),
)
@validate_and_convert_errors
def search_records( self, namespace: str, query: Union[pinecone.data.types.search_query_typed_dict.SearchQueryTypedDict, pinecone.data.dataclasses.search_query.SearchQuery], rerank: Union[pinecone.data.types.search_rerank_typed_dict.SearchRerankTypedDict, pinecone.data.dataclasses.search_rerank.SearchRerank, NoneType] = None, fields: Optional[List[str]] = ['*']) -> pinecone.core.openapi.db_data.model.search_records_response.SearchRecordsResponse:

Alias of the search() method.

@validate_and_convert_errors
def delete( self, ids: Optional[List[str]] = None, delete_all: Optional[bool] = None, namespace: Optional[str] = None, filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, **kwargs) -> Dict[str, Any]:
Arguments:
  • ids (List[str]): Vector ids to delete [optional]
  • delete_all (bool): This indicates that all vectors in the index namespace should be deleted. [optional] Default is False.
  • namespace (str): The namespace to delete vectors from [optional] If not specified, the default namespace is used.
  • filter (Dict[str, Union[str, float, int, bool, List, dict]]): If specified, the metadata filter here will be used to select the vectors to delete. This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True. See https://www.pinecone.io/docs/metadata-filtering/. [optional]

The Delete operation deletes vectors from the index, from a single namespace.

No error is raised if the vector id does not exist.

Note: For any delete call, if namespace is not specified, the default namespace "" is used. Since the delete operation does not error when ids are not present, this means you may not receive an error if you delete from the wrong namespace.

Delete can occur in the following mutual exclusive ways:

  1. Delete by ids from a single namespace
  2. Delete all vectors from a single namespace by setting delete_all to True
  3. Delete all vectors from a single namespace by specifying a metadata filter (note that for this option delete_all must be set to False)

API reference: https://docs.pinecone.io/reference/delete_post

Examples:
>>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.delete(delete_all=True, namespace='my_namespace')
>>> index.delete(filter={'key': 'value'}, namespace='my_namespace')

Returns: An empty dictionary if the delete operation was successful.

@validate_and_convert_errors
def fetch( self, ids: List[str], namespace: Optional[str] = None, **kwargs) -> pinecone.data.dataclasses.fetch_response.FetchResponse:

The fetch operation looks up and returns vectors, by ID, from a single namespace. The returned vectors include the vector data and/or metadata.

API reference: https://docs.pinecone.io/reference/fetch

Examples:
>>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.fetch(ids=['id1', 'id2'])
Arguments:
  • ids (List[str]): The vector IDs to fetch.
  • namespace (str): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

Returns: FetchResponse object which contains the fetched vectors (as a dictionary keyed by vector id), the namespace name, and usage information.
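A sketch of reading from the response, assuming the ids exist in the namespace:

response = index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
for vector_id, vector in response.vectors.items():
    # Each entry is a Vector dataclass carrying values and optional metadata.
    print(vector_id, vector.values, vector.metadata)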

@validate_and_convert_errors
def query( self, *args, top_k: int, vector: Optional[List[float]] = None, id: Optional[str] = None, namespace: Optional[str] = None, filter: Union[Dict[str, Union[str, int, float, bool]], Dict[Literal['$eq'], Union[str, int, float, bool]], Dict[Literal['$ne'], Union[str, int, float, bool]], Dict[Literal['$gt'], Union[int, float]], Dict[Literal['$gte'], Union[int, float]], Dict[Literal['$lt'], Union[int, float]], Dict[Literal['$lte'], Union[int, float]], Dict[Literal['$in'], List[Union[str, int, float, bool]]], Dict[Literal['$nin'], List[Union[str, int, float, bool]]], Dict[Literal['$and'], List[Union[Dict[str, Union[str, int, float, bool]], Dict[Literal['$eq'], Union[str, int, float, bool]], Dict[Literal['$ne'], Union[str, int, float, bool]], Dict[Literal['$gt'], Union[int, float]], Dict[Literal['$gte'], Union[int, float]], Dict[Literal['$lt'], Union[int, float]], Dict[Literal['$lte'], Union[int, float]], Dict[Literal['$in'], List[Union[str, int, float, bool]]], Dict[Literal['$nin'], List[Union[str, int, float, bool]]]]]], NoneType] = None, include_values: Optional[bool] = None, include_metadata: Optional[bool] = None, sparse_vector: Union[pinecone.data.dataclasses.sparse_values.SparseValues, pinecone.data.types.sparse_vector_typed_dict.SparseVectorTypedDict, NoneType] = None, **kwargs) -> Union[pinecone.core.openapi.db_data.model.query_response.QueryResponse, multiprocessing.pool.ApplyResult]:

The Query operation searches a namespace, using a query vector. It retrieves the ids of the most similar items in a namespace, along with their similarity scores.

API reference: https://docs.pinecone.io/reference/query

Examples:
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
>>> index.query(id='id1', top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
>>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
>>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
>>>             top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], sparse_vector=SparseValues([1, 2], [0.2, 0.4]),
>>>             top_k=10, namespace='my_namespace')
Arguments:
  • vector (List[float]): The query vector. This should be the same length as the dimension of the index being queried. Each query() request can contain only one of the parameters id or vector. [optional]
  • id (str): The unique ID of the vector to be used as a query vector. Each query() request can contain only one of the parameters vector or id. [optional]
  • top_k (int): The number of results to return for each query. Must be a positive integer.
  • namespace (str): The namespace to query vectors from. If not specified, the default namespace is used. [optional]
  • filter (Dict[str, Union[str, float, int, bool, List, dict]]): The filter to apply. You can use vector metadata to limit your search. See https://www.pinecone.io/docs/metadata-filtering/. [optional]
  • include_values (bool): Indicates whether vector values are included in the response. If omitted, the server will use the default value of False. [optional]
  • include_metadata (bool): Indicates whether metadata is included in the response as well as the ids. If omitted, the server will use the default value of False. [optional]
  • sparse_vector: (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): sparse values of the query vector. Expected to be either a SparseValues object or a dict of the form: {'indices': List[int], 'values': List[float]}, where the lists each have the same length.

Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects, and the namespace name.
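A sketch of consuming the response, assuming a 3-dimensional index:

response = index.query(
    vector=[0.1, 0.2, 0.3],
    top_k=5,
    namespace='my_namespace',
    include_metadata=True,
)
for match in response.matches:
    # Each match is a ScoredVector; metadata is present because
    # include_metadata=True was passed.
    print(match.id, match.score, match.metadata)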

@validate_and_convert_errors
def query_namespaces( self, vector: Optional[List[float]], namespaces: List[str], metric: Literal['cosine', 'euclidean', 'dotproduct'], top_k: Optional[int] = None, filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, include_values: Optional[bool] = None, include_metadata: Optional[bool] = None, sparse_vector: Union[pinecone.data.dataclasses.sparse_values.SparseValues, Dict[str, Union[List[float], List[int]]], NoneType] = None, **kwargs) -> pinecone.data.query_results_aggregator.QueryNamespacesResults:
327    @validate_and_convert_errors
328    def query_namespaces(
329        self,
330        vector: Optional[List[float]],
331        namespaces: List[str],
332        metric: Literal["cosine", "euclidean", "dotproduct"],
333        top_k: Optional[int] = None,
334        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
335        include_values: Optional[bool] = None,
336        include_metadata: Optional[bool] = None,
337        sparse_vector: Optional[
338            Union[SparseValues, Dict[str, Union[List[float], List[int]]]]
339        ] = None,
340        **kwargs,
341    ) -> QueryNamespacesResults:
342        if namespaces is None or len(namespaces) == 0:
343            raise ValueError("At least one namespace must be specified")
344        if sparse_vector is None and vector is not None and len(vector) == 0:
345            # If querying with a vector, it must not be empty
346            raise ValueError("Query vector must not be empty")
347
348        overall_topk = top_k if top_k is not None else 10
349        aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)
350
351        target_namespaces = set(namespaces)  # dedup namespaces
352        async_futures = [
353            self.query(
354                vector=vector,
355                namespace=ns,
356                top_k=overall_topk,
357                filter=filter,
358                include_values=include_values,
359                include_metadata=include_metadata,
360                sparse_vector=sparse_vector,
361                async_threadpool_executor=True,
362                _preload_content=False,
363                **kwargs,
364            )
365            for ns in target_namespaces
366        ]
367
368        for result in as_completed(async_futures):
369            raw_result = result.result()
370            response = json.loads(raw_result.data.decode("utf-8"))
371            aggregator.add_results(response)
372
373        final_results = aggregator.get_results()
374        return final_results

The query_namespaces() method queries multiple namespaces in parallel and combines the results into a single result set.

Since several asynchronous calls are made on your behalf when calling this method, you will need to tune the pool_threads and connection_pool_maxsize parameters of the Index constructor to suit your workload.

Examples:

from pinecone import Pinecone

pc = Pinecone(api_key="your-api-key")
index = pc.Index(
    host="index-name",
    pool_threads=32,
    connection_pool_maxsize=32
)

query_vec = [0.1, 0.2, 0.3] # An embedding that matches the index dimension
combined_results = index.query_namespaces(
    vector=query_vec,
    namespaces=['ns1', 'ns2', 'ns3', 'ns4'],
    metric="cosine",
    top_k=10,
    filter={'genre': {"$eq": "drama"}},
    include_values=True,
    include_metadata=True
)
for vec in combined_results.matches:
    print(vec.id, vec.score)
print(combined_results.usage)
Arguments:
  • vector (List[float]): The query vector, must be the same length as the dimension of the index being queried.
  • namespaces (List[str]): The list of namespaces to query.
  • top_k (Optional[int], optional): The number of results you would like to request from each namespace. Defaults to 10.
  • metric (str): Must be one of 'cosine', 'euclidean', 'dotproduct'. This is needed in order to merge results across namespaces, since the interpretation of score depends on the index metric type.
  • filter (Optional[Dict[str, Union[str, float, int, bool, List, dict]]], optional): Pass an optional filter to filter results based on metadata. Defaults to None.
  • include_values (Optional[bool], optional): Boolean field indicating whether vector values should be included with results. Defaults to None.
  • include_metadata (Optional[bool], optional): Boolean field indicating whether vector metadata should be included with results. Defaults to None.
  • sparse_vector (Optional[ Union[SparseValues, Dict[str, Union[List[float], List[int]]]] ], optional): If you are working with a dotproduct index, you can pass a sparse vector as part of your hybrid search. Defaults to None.
Returns: QueryNamespacesResults object containing the combined results from all namespaces, as well as the combined usage cost in read units.
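For hybrid search, a sparse component can be passed alongside the dense query vector. A minimal sketch, assuming an index created with the dotproduct metric (the values shown are illustrative):

combined_results = index.query_namespaces(
    vector=[0.1, 0.2, 0.3],
    sparse_vector={'indices': [10, 45], 'values': [0.5, 0.5]},
    namespaces=['ns1', 'ns2'],
    metric="dotproduct",
    top_k=10
)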

@validate_and_convert_errors
def update( self, id: str, values: Optional[List[float]] = None, set_metadata: Optional[Dict[str, Union[str, int, float, List[str], List[int], List[float]]]] = None, namespace: Optional[str] = None, sparse_values: Union[pinecone.data.dataclasses.sparse_values.SparseValues, pinecone.data.types.sparse_vector_typed_dict.SparseVectorTypedDict, NoneType] = None, **kwargs) -> Dict[str, Any]:
376    @validate_and_convert_errors
377    def update(
378        self,
379        id: str,
380        values: Optional[List[float]] = None,
381        set_metadata: Optional[VectorMetadataTypedDict] = None,
382        namespace: Optional[str] = None,
383        sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
384        **kwargs,
385    ) -> Dict[str, Any]:
386        return self._vector_api.update_vector(
387            IndexRequestFactory.update_request(
388                id=id,
389                values=values,
390                set_metadata=set_metadata,
391                namespace=namespace,
392                sparse_values=sparse_values,
393                **kwargs,
394            ),
395            **self._openapi_kwargs(kwargs),
396        )

The Update operation updates a vector in a namespace. If values are included, they will overwrite the previous values. If set_metadata is included, the fields specified in it will be added to the metadata or will overwrite their previous values.

API reference: https://docs.pinecone.io/reference/update

Examples:
>>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
>>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace')
>>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
>>>              namespace='my_namespace')
>>> index.update(id='id1', values=[1, 2, 3], sparse_values=SparseValues(indices=[1, 2], values=[0.2, 0.4]),
>>>              namespace='my_namespace')
Arguments:
  • id (str): Vector's unique id.
  • values (List[float]): vector values to set. [optional]
  • set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]): metadata to set for the vector. [optional]
  • namespace (str): The namespace where the vector will be updated. [optional]
  • sparse_values (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): sparse values to update for the vector. Expected to be either a SparseValues object or a dict of the form: {'indices': List[int], 'values': List[float]}, where the lists each have the same length. [optional]

Returns: An empty dictionary if the update was successful.
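Note that set_metadata operates at the field level rather than replacing the stored metadata wholesale. A hypothetical illustration of that behavior:

# Suppose the stored metadata for 'id1' is {'genre': 'drama', 'year': 2020}.
index.update(id='id1', set_metadata={'year': 2021}, namespace='my_namespace')
# 'year' is overwritten and 'genre' is left untouched, so the stored
# metadata becomes {'genre': 'drama', 'year': 2021}.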

@validate_and_convert_errors
def describe_index_stats( self, filter: Optional[FilterTypedDict] = None, **kwargs) -> pinecone.core.openapi.db_data.model.index_description.IndexDescription:
398    @validate_and_convert_errors
399    def describe_index_stats(
400        self, filter: Optional[FilterTypedDict] = None, **kwargs
401    ) -> DescribeIndexStatsResponse:
402        return self._vector_api.describe_index_stats(
403            IndexRequestFactory.describe_index_stats_request(filter, **kwargs),
404            **self._openapi_kwargs(kwargs),
405        )

The DescribeIndexStats operation returns statistics about the index's contents, such as the vector count per namespace and the number of dimensions.

API reference: https://docs.pinecone.io/reference/describe_index_stats_post

Examples:
>>> index.describe_index_stats()
>>> index.describe_index_stats(filter={'key': 'value'})
Arguments:
  • filter (Dict[str, Union[str, float, int, bool, List, dict]]): If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See https://www.pinecone.io/docs/metadata-filtering/. [optional]

Returns: DescribeIndexStatsResponse object which contains stats about the index.
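A short sketch of reading a few commonly used fields from the response; the field names shown reflect the response model and should be treated as assumptions if your client version differs:

stats = index.describe_index_stats()
print(stats.dimension)           # dimension of the index
print(stats.total_vector_count)  # total vectors across all namespaces
for name, summary in stats.namespaces.items():
    print(name, summary.vector_count)  # per-namespace vector counts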

@validate_and_convert_errors
def list_paginated( self, prefix: Optional[str] = None, limit: Optional[int] = None, pagination_token: Optional[str] = None, namespace: Optional[str] = None, **kwargs) -> pinecone.core.openapi.db_data.model.list_response.ListResponse:
407    @validate_and_convert_errors
408    def list_paginated(
409        self,
410        prefix: Optional[str] = None,
411        limit: Optional[int] = None,
412        pagination_token: Optional[str] = None,
413        namespace: Optional[str] = None,
414        **kwargs,
415    ) -> ListResponse:
416        args_dict = IndexRequestFactory.list_paginated_args(
417            prefix=prefix,
418            limit=limit,
419            pagination_token=pagination_token,
420            namespace=namespace,
421            **kwargs,
422        )
423        return self._vector_api.list_vectors(**args_dict, **kwargs)

The list_paginated operation finds vectors based on an id prefix within a single namespace. It returns matching ids in a paginated form, with a pagination token to fetch the next page of results. This id list can then be passed to fetch or delete operations, depending on your use case.

Consider using the list method to avoid having to handle pagination tokens manually.

Examples:
>>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
>>> [v.id for v in results.vectors]
['99', '990', '991', '992', '993']
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)
Arguments:
  • prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used, with the effect of listing all ids in the namespace. [optional]
  • limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
  • pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
  • namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

Returns: ListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
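If you do manage tokens yourself, the loop looks like the following sketch, which mirrors what list() does internally (see its source below):

token = None
while True:
    results = index.list_paginated(prefix='99', limit=100,
                                   namespace='my_namespace',
                                   pagination_token=token)
    ids = [v.id for v in results.vectors]
    # ... process this page of ids ...
    if not results.pagination:
        break  # no further pages
    token = results.pagination.next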

@validate_and_convert_errors
def list(self, **kwargs):
425    @validate_and_convert_errors
426    def list(self, **kwargs):
427        done = False
428        while not done:
429            results = self.list_paginated(**kwargs)
430            if len(results.vectors) > 0:
431                yield [v.id for v in results.vectors]
432
433            if results.pagination:
434                kwargs.update({"pagination_token": results.pagination.next})
435            else:
436                done = True

The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your behalf.

Examples:
>>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
>>>     print(ids)
['99', '990', '991', '992', '993']
['994', '995', '996', '997', '998']
['999']
Arguments:
  • prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used, with the effect of listing all ids in the namespace. [optional]
  • limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
  • pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
  • namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
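Because each yielded page is a plain list of ids, the generator composes naturally with other operations. A minimal sketch that deletes every vector whose id matches a prefix, one page at a time:

# Delete all vectors whose ids share a prefix, page by page.
for ids in index.list(prefix='doc1#', namespace='my_namespace'):
    index.delete(ids=ids, namespace='my_namespace')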