Source code for pinecone.grpc.index_grpc

import logging
from typing import Optional, Dict, Union, List, Tuple, Any, Iterable, cast, Literal

from google.protobuf import json_format

from pinecone.utils.tqdm import tqdm
from concurrent.futures import as_completed, Future


from .utils import (
    dict_to_proto_struct,
    parse_fetch_response,
    parse_query_response,
    parse_stats_response,
)
from .vector_factory_grpc import VectorFactoryGRPC
from .sparse_values_factory import SparseValuesFactory

from pinecone.core.openapi.db_data.models import (
    FetchResponse,
    QueryResponse,
    IndexDescription as DescribeIndexStatsResponse,
)
from pinecone.db_control.models.list_response import ListResponse as SimpleListResponse, Pagination
from pinecone.core.grpc.protos.db_data_2025_01_pb2 import (
    Vector as GRPCVector,
    QueryVector as GRPCQueryVector,
    UpsertRequest,
    UpsertResponse,
    DeleteRequest,
    QueryRequest,
    FetchRequest,
    UpdateRequest,
    ListRequest,
    DescribeIndexStatsRequest,
    DeleteResponse,
    UpdateResponse,
    SparseValues as GRPCSparseValues,
)
from pinecone import Vector, SparseValues
from pinecone.db_data.query_results_aggregator import QueryNamespacesResults, QueryResultsAggregator
from pinecone.core.grpc.protos.db_data_2025_01_pb2_grpc import VectorServiceStub
from .base import GRPCIndexBase
from .future import PineconeGrpcFuture
from ..db_data.types import (
    SparseVectorTypedDict,
    VectorTypedDict,
    VectorTuple,
    FilterTypedDict,
    VectorMetadataTypedDict,
)


# Names exported by ``from pinecone.grpc.index_grpc import *``: the index client
# plus the protobuf message classes aliased in the imports above.
__all__ = ["GRPCIndex", "GRPCVector", "GRPCQueryVector", "GRPCSparseValues"]

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
""" :meta private: """


class GRPCIndex(GRPCIndexBase):
    """
    A client for interacting with a Pinecone index via GRPC API.

    Each data-plane method builds the matching protobuf request message and
    dispatches it through ``self.runner`` against a method of ``self.stub``.
    """

    @property
    def stub_class(self):
        """:meta private:"""
        # The generated gRPC stub type used for this index client.
        return VectorServiceStub
def upsert(
    self,
    vectors: Union[List[Vector], List[GRPCVector], List[VectorTuple], List[VectorTypedDict]],
    async_req: bool = False,
    namespace: Optional[str] = None,
    batch_size: Optional[int] = None,
    show_progress: bool = True,
    **kwargs,
) -> Union[UpsertResponse, PineconeGrpcFuture]:
    """
    The upsert operation writes vectors into a namespace.
    If a new value is upserted for an existing vector id, it will overwrite the previous value.

    Examples:

    .. code-block:: python

        >>> index.upsert([('id1', [1.0, 2.0, 3.0], {'key': 'value'}),
        ...               ('id2', [1.0, 2.0, 3.0])],
        ...              namespace='ns1', async_req=True)
        >>> index.upsert([{'id': 'id1', 'values': [1.0, 2.0, 3.0], 'metadata': {'key': 'value'}},
        ...               {'id': 'id2',
        ...                'values': [1.0, 2.0, 3.0],
        ...                'sparse_values': {'indices': [1, 8], 'values': [0.2, 0.4]}}])
        >>> index.upsert([GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
        ...               GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
        ...               GRPCVector(id='id3',
        ...                          values=[1.0, 2.0, 3.0],
        ...                          sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))])

    Args:
        vectors (Union[List[Vector], List[Tuple]]): A list of vectors to upsert.

            A vector can be represented by 1) a tuple, 2) a GRPCVector object or 3) a dictionary.

            1) if a tuple is used, it must be of the form (id, values, metadata) or (id, values),
               where id is a string, values is a list of floats, and metadata is a dict.
               Examples: ('id1', [1.0, 2.0, 3.0], {'key': 'value'}), ('id2', [1.0, 2.0, 3.0])

            2) if a GRPCVector object is used, it must be of the form
               GRPCVector(id, values, metadata), where metadata is an optional argument of type
               Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]
               Examples: GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
               GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
               GRPCVector(id='id3',
               values=[1.0, 2.0, 3.0],
               sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))

            3) if a dictionary is used, it must be in the form
               {'id': str, 'values': List[float],
               'sparse_values': {'indices': List[int], 'values': List[float]},
               'metadata': dict}

            Note: the dimension of each vector must match the dimension of the index.
        async_req (bool): If True, the upsert operation will be performed asynchronously.
            Cannot be used with batch_size.
            Defaults to False. See: https://docs.pinecone.io/docs/performance-tuning [optional]
        namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
        batch_size (int): The number of vectors to upsert in each batch.
            Cannot be used with async_req=True.
            If not specified, all vectors will be upserted in a single batch. [optional]
        show_progress (bool): Whether to show a progress bar using tqdm.
            Applied only if batch_size is provided. Default is True.
        **kwargs: ``timeout`` is extracted and passed to the request runner. The remaining
            entries are forwarded to the runner on the synchronous paths and to the
            ``UpsertRequest`` constructor on the asynchronous path.

    Returns:
        UpsertResponse, contains the number of vectors upserted, or a PineconeGrpcFuture
        if async_req is True.

    Raises:
        ValueError: If async_req is combined with batch_size, or if batch_size is not a
            positive integer.
    """
    if async_req and batch_size is not None:
        # The adjacent string literals are concatenated, so the trailing space after
        # "provided." is required to keep the sentences apart.
        raise ValueError(
            "async_req is not supported when batch_size is provided. "
            "To upsert in parallel, please follow: "
            "https://docs.pinecone.io/docs/performance-tuning"
        )

    timeout = kwargs.pop("timeout", None)

    # Normalize every accepted input representation through the vector factory.
    vectors = [VectorFactoryGRPC.build(v) for v in vectors]

    if async_req:
        args_dict = self._parse_non_empty_args([("namespace", namespace)])
        # NOTE(review): here the leftover kwargs go into the request message, whereas
        # _upsert_batch hands them to the runner -- confirm which is intended.
        request = UpsertRequest(vectors=vectors, **args_dict, **kwargs)
        future = self.runner.run(self.stub.Upsert.future, request, timeout=timeout)
        return PineconeGrpcFuture(future)

    if batch_size is None:
        return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)

    if not isinstance(batch_size, int) or batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    pbar = tqdm(total=len(vectors), disable=not show_progress, desc="Upserted vectors")
    total_upserted = 0
    for i in range(0, len(vectors), batch_size):
        batch_result = self._upsert_batch(
            vectors[i : i + batch_size], namespace, timeout=timeout, **kwargs
        )
        pbar.update(batch_result.upserted_count)
        # we can't use here pbar.n for the case show_progress=False
        total_upserted += batch_result.upserted_count

    return UpsertResponse(upserted_count=total_upserted)
def _upsert_batch(
    self, vectors: List[GRPCVector], namespace: Optional[str], timeout: Optional[int], **kwargs
) -> UpsertResponse:
    """
    Send one synchronous Upsert request containing ``vectors``.

    Args:
        vectors (List[GRPCVector]): The vectors to place in the request.
        namespace (Optional[str]): Target namespace; left out of the request when None.
        timeout (Optional[int]): Passed to the request runner as ``timeout``.
        **kwargs: Forwarded to the request runner unchanged.

    Returns:
        Whatever the runner returns for the Upsert call (annotated as UpsertResponse).
    """
    args_dict = self._parse_non_empty_args([("namespace", namespace)])
    request = UpsertRequest(vectors=vectors, **args_dict)
    return self.runner.run(self.stub.Upsert, request, timeout=timeout, **kwargs)


def upsert_from_dataframe(
    self,
    df,
    namespace: str = "",
    batch_size: int = 500,
    use_async_requests: bool = True,
    show_progress: bool = True,
) -> UpsertResponse:
    """Upserts a dataframe into the index.

    Args:
        df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
        namespace: The namespace to upsert into.
        batch_size: The number of rows to upsert in a single batch. Must be positive.
        use_async_requests: Whether to upsert multiple requests at the same time using
            asynchronous request mechanism. Set to ``False`` to send the batches one at a
            time, each call waiting for its response.
        show_progress: Whether to show a progress bar.

    Returns:
        UpsertResponse whose ``upserted_count`` is the sum of the integer
        ``upserted_count`` values reported by the individual batch responses.

    Raises:
        RuntimeError: If pandas is not installed.
        ValueError: If ``df`` is not a pandas DataFrame or ``batch_size`` is not positive.
    """
    try:
        import pandas as pd
    except ImportError as err:
        # Chain the original ImportError so the underlying cause stays visible.
        raise RuntimeError(
            "The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`"
        ) from err

    if not isinstance(df, pd.DataFrame):
        raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

    # A negative step would make the batching range empty and silently upsert nothing;
    # reject it with the same message upsert() uses for an invalid batch_size.
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
    results = []
    for chunk in self._iter_dataframe(df, batch_size=batch_size):
        res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
        pbar.update(len(chunk))
        results.append(res)

    if use_async_requests:
        # In async mode every entry is a future; wait for each one in submission order.
        cast_results = cast(List[PineconeGrpcFuture], results)
        results = [
            async_result.result()
            for async_result in tqdm(
                iterable=cast_results,
                disable=not show_progress,
                desc="collecting async responses",
            )
        ]

    # Only count responses that actually expose an integer upserted_count.
    upserted_count = 0
    for res in results:
        count = getattr(res, "upserted_count", None)
        if isinstance(count, int):
            upserted_count += count

    return UpsertResponse(upserted_count=upserted_count)


@staticmethod
def _iter_dataframe(df, batch_size):
    """Yield consecutive ``batch_size``-row slices of ``df`` as lists of record dicts."""
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i : i + batch_size].to_dict(orient="records")
        yield batch
def delete(
    self,
    ids: Optional[List[str]] = None,
    delete_all: Optional[bool] = None,
    namespace: Optional[str] = None,
    filter: Optional[FilterTypedDict] = None,
    async_req: bool = False,
    **kwargs,
) -> Union[DeleteResponse, PineconeGrpcFuture]:
    """
    The Delete operation deletes vectors from the index, from a single namespace.
    No error raised if the vector id does not exist.

    .. admonition:: Note

        For any delete call, if namespace is not specified, the default namespace is used.

    Delete can occur in the following mutual exclusive ways:

    1. Delete by ids from a single namespace
    2. Delete all vectors from a single namespace by setting delete_all to True
    3. Delete all vectors from a single namespace by specifying a metadata filter
       (note that for this option delete all must be set to False)

    Examples:

    .. code-block:: python

        >>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
        >>> index.delete(delete_all=True, namespace='my_namespace')
        >>> index.delete(filter={'key': 'value'}, namespace='my_namespace', async_req=True)

    Args:
        ids (List[str]): Vector ids to delete [optional]
        delete_all (bool): This indicates that all vectors in the index namespace
            should be deleted. [optional] Default is False.
        namespace (str): The namespace to delete vectors from. [optional]
            If not specified, the default namespace is used.
        filter (FilterTypedDict): If specified, the metadata filter here will be used to
            select the vectors to delete. This is mutually exclusive with specifying ids
            to delete in the ids param or using delete_all=True.
            See `metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>`_ [optional]
        async_req (bool): If True, the delete operation will be performed asynchronously.
            Defaults to False. [optional]
        **kwargs: ``timeout`` is passed to the request runner; any remaining entries are
            forwarded to the ``DeleteRequest`` constructor.

    Returns:
        DeleteResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
    """
    timeout = kwargs.pop("timeout", None)

    # The filter travels as a protobuf Struct; arguments left as None are omitted entirely.
    filter_struct = None if filter is None else dict_to_proto_struct(filter)
    provided = self._parse_non_empty_args(
        [
            ("ids", ids),
            ("delete_all", delete_all),
            ("namespace", namespace),
            ("filter", filter_struct),
        ]
    )
    request = DeleteRequest(**provided, **kwargs)

    if not async_req:
        return self.runner.run(self.stub.Delete, request, timeout=timeout)

    pending = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
    return PineconeGrpcFuture(pending)
def fetch(
    self,
    ids: Optional[List[str]],
    namespace: Optional[str] = None,
    async_req: Optional[bool] = False,
    **kwargs,
) -> Union[FetchResponse, PineconeGrpcFuture]:
    """
    The fetch operation looks up and returns vectors, by ID, from a single namespace.
    The returned vectors include the vector data and/or metadata.

    Examples:

    .. code-block:: python

        >>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
        >>> index.fetch(ids=['id1', 'id2'])

    Args:
        ids (List[str]): The vector IDs to fetch.
        namespace (str): The namespace to fetch vectors from.
            If not specified, the default namespace is used. [optional]
        async_req (bool): If True, return a PineconeGrpcFuture instead of waiting for
            the response. Defaults to False. [optional]
        **kwargs: ``timeout`` is passed to the request runner; any remaining entries are
            forwarded to the ``FetchRequest`` constructor.

    Returns:
        FetchResponse object which contains the list of Vector objects, and namespace name,
        or a PineconeGrpcFuture whose result is passed through ``parse_fetch_response``.
    """
    timeout = kwargs.pop("timeout", None)

    namespace_args = self._parse_non_empty_args([("namespace", namespace)])
    request = FetchRequest(ids=ids, **namespace_args, **kwargs)

    if not async_req:
        raw_response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
        return parse_fetch_response(raw_response)

    # Async path: the same parser is attached to the future as its result transformer.
    pending = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
    return PineconeGrpcFuture(pending, result_transformer=parse_fetch_response)
def query(
    self,
    vector: Optional[List[float]] = None,
    id: Optional[str] = None,
    namespace: Optional[str] = None,
    top_k: Optional[int] = None,
    filter: Optional[FilterTypedDict] = None,
    include_values: Optional[bool] = None,
    include_metadata: Optional[bool] = None,
    sparse_vector: Optional[
        Union[SparseValues, GRPCSparseValues, SparseVectorTypedDict]
    ] = None,
    async_req: Optional[bool] = False,
    **kwargs,
) -> Union[QueryResponse, PineconeGrpcFuture]:
    """
    The Query operation searches a namespace, using a query vector.
    It retrieves the ids of the most similar items in a namespace, along with their similarity scores.

    Examples:

    .. code-block:: python

        >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
        >>> index.query(id='id1', top_k=10, namespace='my_namespace')
        >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
        >>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
        >>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
        ...             top_k=10, namespace='my_namespace')
        >>> index.query(vector=[1, 2, 3], sparse_vector=GRPCSparseValues([1, 2], [0.2, 0.4]),
        ...             top_k=10, namespace='my_namespace')

    Args:
        vector (List[float]): The query vector. This should be the same length as the dimension
            of the index being queried. Each ``query()`` request can contain only one of the
            parameters ``id`` or ``vector``. [optional]
        id (str): The unique ID of the vector to be used as a query vector.
            Each ``query()`` request can contain only one of the parameters
            ``vector`` or ``id``. [optional]
        top_k (int): The number of results to return for each query. Must be an integer greater than 1.
        namespace (str): The namespace to fetch vectors from.
            If not specified, the default namespace is used. [optional]
        filter (Dict[str, Union[str, float, int, bool, List, dict]]): The filter to apply.
            You can use vector metadata to limit your search.
            See `metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>`_ [optional]
        include_values (bool): Indicates whether vector values are included in the response.
            If omitted the server will use the default value of False [optional]
        include_metadata (bool): Indicates whether metadata is included in the response as well as the ids.
            If omitted the server will use the default value of False [optional]
        sparse_vector: (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): sparse values
            of the query vector. Expected to be either a SparseValues object or a dict of the form:
            {'indices': List[int], 'values': List[float]}, where the lists each have the same length.
        async_req (bool): If True, return a PineconeGrpcFuture instead of waiting for
            the response. Defaults to False. [optional]
        **kwargs: Only ``timeout`` is read; it is passed to the request runner.

    Returns:
        QueryResponse object which contains the list of the closest vectors as ScoredVector
        objects, and namespace name, or a PineconeGrpcFuture if async_req is True.

    Raises:
        ValueError: If both ``id`` and ``vector`` are given.
    """
    if vector is not None and id is not None:
        raise ValueError("Cannot specify both `id` and `vector`")

    filter_struct = None if filter is None else dict_to_proto_struct(filter)
    built_sparse = SparseValuesFactory.build(sparse_vector)

    # Arguments that are still None are left out of the request message.
    candidate_fields = [
        ("vector", vector),
        ("id", id),
        ("namespace", namespace),
        ("top_k", top_k),
        ("filter", filter_struct),
        ("include_values", include_values),
        ("include_metadata", include_metadata),
        ("sparse_vector", built_sparse),
    ]
    request = QueryRequest(**self._parse_non_empty_args(candidate_fields))

    timeout = kwargs.pop("timeout", None)

    if async_req:
        # NOTE(review): unlike the synchronous branch, no result transformer is attached
        # here, so the future presumably resolves to the unparsed gRPC response -- confirm.
        pending = self.runner.run(self.stub.Query.future, request, timeout=timeout)
        return PineconeGrpcFuture(pending)

    response = self.runner.run(self.stub.Query, request, timeout=timeout)
    as_dict = json_format.MessageToDict(response)
    return parse_query_response(as_dict, _check_type=False)
def query_namespaces(
    self,
    vector: List[float],
    namespaces: List[str],
    metric: Literal["cosine", "euclidean", "dotproduct"],
    top_k: Optional[int] = None,
    filter: Optional[FilterTypedDict] = None,
    include_values: Optional[bool] = None,
    include_metadata: Optional[bool] = None,
    sparse_vector: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
    **kwargs,
) -> QueryNamespacesResults:
    """
    Run the same query against several namespaces and combine the results.

    One synchronous ``query`` call per distinct namespace is submitted to
    ``self.threadpool_executor``; each response is fed to a ``QueryResultsAggregator``
    as soon as it completes.

    Args:
        vector (List[float]): The query vector. Must not be empty.
        namespaces (List[str]): The namespaces to query. Duplicates are queried once.
        metric: The metric handed to the results aggregator.
        top_k (int): Number of results requested from each namespace and given to the
            aggregator. Defaults to 10 when not specified. [optional]
        filter: Passed through to each ``query`` call. [optional]
        include_values (bool): Passed through to each ``query`` call. [optional]
        include_metadata (bool): Passed through to each ``query`` call. [optional]
        sparse_vector: Passed through to each ``query`` call. [optional]
        **kwargs: Passed through to each ``query`` call.

    Returns:
        The aggregator's combined results (annotated as QueryNamespacesResults).

    Raises:
        ValueError: If no namespace is given or the query vector is empty.
    """
    if namespaces is None or len(namespaces) == 0:
        raise ValueError("At least one namespace must be specified")
    if len(vector) == 0:
        raise ValueError("Query vector must not be empty")

    overall_topk = 10 if top_k is None else top_k
    aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)

    # A set removes repeated namespaces so each one is queried only once.
    pending = []
    for ns in set(namespaces):
        pending.append(
            self.threadpool_executor.submit(
                self.query,
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_req=False,
                **kwargs,
            )
        )

    # Consume responses in completion order rather than submission order.
    for finished in as_completed(cast(Iterable[Future], pending)):
        aggregator.add_results(finished.result())

    return aggregator.get_results()
def update(
    self,
    id: str,
    async_req: bool = False,
    values: Optional[List[float]] = None,
    set_metadata: Optional[VectorMetadataTypedDict] = None,
    namespace: Optional[str] = None,
    sparse_values: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
    **kwargs,
) -> Union[UpdateResponse, PineconeGrpcFuture]:
    """
    The Update operation updates vector in a namespace.
    If a value is included, it will overwrite the previous value.
    If a set_metadata is included,
    the values of the fields specified in it will be added or overwrite the previous value.

    Examples:

    .. code-block:: python

        >>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
        >>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace', async_req=True)
        >>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
        ...              namespace='my_namespace')
        >>> index.update(id='id1', values=[1, 2, 3],
        ...              sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
        ...              namespace='my_namespace')

    Args:
        id (str): Vector's unique id.
        async_req (bool): If True, the update operation will be performed asynchronously.
            Defaults to False. [optional]
        values (List[float]): vector values to set. [optional]
        set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]]):
            metadata to set for vector. [optional]
        namespace (str): Namespace name where to update the vector. [optional]
        sparse_values: (Dict[str, Union[List[float], List[int]]]): sparse values to update for the vector.
            Expected to be either a GRPCSparseValues object or a dict of the form:
            {'indices': List[int], 'values': List[float]} where the lists each have the same length.
        **kwargs: Only ``timeout`` is read; it is passed to the request runner.

    Returns:
        UpdateResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
    """
    metadata_struct = None if set_metadata is None else dict_to_proto_struct(set_metadata)
    timeout = kwargs.pop("timeout", None)
    built_sparse = SparseValuesFactory.build(sparse_values)

    # Only the fields the caller actually supplied are placed on the request.
    optional_fields = self._parse_non_empty_args(
        [
            ("values", values),
            ("set_metadata", metadata_struct),
            ("namespace", namespace),
            ("sparse_values", built_sparse),
        ]
    )
    request = UpdateRequest(id=id, **optional_fields)

    if not async_req:
        return self.runner.run(self.stub.Update, request, timeout=timeout)

    pending = self.runner.run(self.stub.Update.future, request, timeout=timeout)
    return PineconeGrpcFuture(pending)
def list_paginated(
    self,
    prefix: Optional[str] = None,
    limit: Optional[int] = None,
    pagination_token: Optional[str] = None,
    namespace: Optional[str] = None,
    **kwargs,
) -> SimpleListResponse:
    """
    The list_paginated operation finds vectors based on an id prefix within a single namespace.
    It returns matching ids in a paginated form, with a pagination token to fetch the next page of results.
    This id list can then be passed to fetch or delete operations, depending on your use case.

    Consider using the ``list`` method to avoid having to handle pagination tokens manually.

    Examples:

    .. code-block:: python

        >>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
        >>> [v.id for v in results.vectors]
        ['99', '990', '991', '992', '993']
        >>> results.pagination.next
        eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
        >>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace',
        ...                                     pagination_token=results.pagination.next)

    Args:
        prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
            be used with the effect of listing all ids in a namespace [optional]
        limit (Optional[int]): The maximum number of ids to return.
            If unspecified, the server will use a default value. [optional]
        pagination_token (Optional[str]): A token needed to fetch the next page of results.
            This token is returned in the response if additional results are available. [optional]
        namespace (Optional[str]): The namespace to fetch vectors from.
            If not specified, the default namespace is used. [optional]
        **kwargs: ``timeout`` is passed to the request runner; any remaining entries are
            forwarded to the ``ListRequest`` constructor.

    Returns:
        SimpleListResponse object which contains the list of ids, the namespace name, and
        pagination information (``None`` when the response carries no next-page token).
    """
    # Extract the timeout BEFORE expanding kwargs into the request: it is a runner
    # option, not a ListRequest field, so it must not reach the message constructor.
    timeout = kwargs.pop("timeout", None)

    args_dict = self._parse_non_empty_args(
        [
            ("prefix", prefix),
            ("limit", limit),
            ("namespace", namespace),
            ("pagination_token", pagination_token),
        ]
    )
    request = ListRequest(**args_dict, **kwargs)
    response = self.runner.run(self.stub.List, request, timeout=timeout)

    # An empty next-token is treated as "no further pages".
    if response.pagination and response.pagination.next != "":
        pagination = Pagination(next=response.pagination.next)
    else:
        pagination = None

    return SimpleListResponse(
        namespace=response.namespace, vectors=response.vectors, pagination=pagination
    )
def list(self, **kwargs):
    """
    The list operation accepts all of the same arguments as list_paginated, and returns a
    generator that yields a list of the matching vector ids in each page of results.
    It automatically handles pagination tokens on your behalf.

    Pages that contain no vectors are skipped rather than yielded as empty lists.
    Exceptions raised by ``list_paginated`` propagate to the caller unchanged.

    Examples:

    .. code-block:: python

        >>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
        ...     print(ids)
        ['99', '990', '991', '992', '993']
        ['994', '995', '996', '997', '998']
        ['999']

    Args:
        prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
            be used with the effect of listing all ids in a namespace [optional]
        limit (Optional[int]): The maximum number of ids to return.
            If unspecified, the server will use a default value. [optional]
        pagination_token (Optional[str]): A token needed to fetch the next page of results.
            This token is returned in the response if additional results are available. [optional]
        namespace (Optional[str]): The namespace to fetch vectors from.
            If not specified, the default namespace is used. [optional]

    Yields:
        A list of vector ids for each non-empty page of results.
    """
    while True:
        results = self.list_paginated(**kwargs)

        if len(results.vectors) > 0:
            yield [v.id for v in results.vectors]

        # Stop once the page carries no (or an empty) next-page token.
        if not (results.pagination and results.pagination.next):
            break

        # kwargs is this call's own dict, so rewriting the token does not leak to the caller.
        kwargs["pagination_token"] = results.pagination.next
def describe_index_stats(
    self, filter: Optional[FilterTypedDict] = None, **kwargs
) -> DescribeIndexStatsResponse:
    """
    The DescribeIndexStats operation returns statistics about the index's contents.
    For example: The vector count per namespace and the number of dimensions.

    Examples:

    .. code-block:: python

        >>> index.describe_index_stats()
        >>> index.describe_index_stats(filter={'key': 'value'})

    Args:
        filter (Dict[str, Union[str, float, int, bool, List, dict]]):
            If this parameter is present, the operation only returns statistics for vectors
            that satisfy the filter.
            See `metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>`_ [optional]
        **kwargs: Only ``timeout`` is read; it is passed to the request runner.

    Returns:
        DescribeIndexStatsResponse object which contains stats about the index.
    """
    filter_struct = None if filter is None else dict_to_proto_struct(filter)
    request_fields = self._parse_non_empty_args([("filter", filter_struct)])

    timeout = kwargs.pop("timeout", None)
    request = DescribeIndexStatsRequest(**request_fields)

    # The protobuf response is converted to a plain dict before being parsed.
    response = self.runner.run(self.stub.DescribeIndexStats, request, timeout=timeout)
    return parse_stats_response(json_format.MessageToDict(response))
@staticmethod def _parse_non_empty_args(args: List[Tuple[str, Any]]) -> Dict[str, Any]: return {arg_name: val for arg_name, val in args if val is not None}