from __future__ import annotations
import logging
from typing import List, Any, Iterable, cast, Literal, Iterator, TYPE_CHECKING
from pinecone.utils.tqdm import tqdm
from pinecone.utils import require_kwargs
from concurrent.futures import as_completed, Future
from .utils import (
dict_to_proto_struct,
parse_fetch_response,
parse_fetch_by_metadata_response,
parse_query_response,
query_response_to_dict,
parse_stats_response,
parse_upsert_response,
parse_update_response,
parse_delete_response,
parse_namespace_description,
parse_list_namespaces_response,
)
from .vector_factory_grpc import VectorFactoryGRPC
from .sparse_values_factory import SparseValuesFactory
from pinecone.core.openapi.db_data.models import (
IndexDescription as DescribeIndexStatsResponse,
NamespaceDescription,
ListNamespacesResponse,
)
from pinecone.db_data.dataclasses import (
FetchByMetadataResponse,
UpdateResponse,
UpsertResponse,
FetchResponse,
QueryResponse,
)
from pinecone.db_control.models.list_response import ListResponse as SimpleListResponse, Pagination
from pinecone.core.grpc.protos.db_data_2025_10_pb2 import (
Vector as GRPCVector,
QueryVector as GRPCQueryVector,
QueryResponse as ProtoQueryResponse,
UpsertRequest,
DeleteRequest,
QueryRequest,
FetchRequest,
FetchByMetadataRequest,
UpdateRequest,
ListRequest,
DescribeIndexStatsRequest,
SparseValues as GRPCSparseValues,
DescribeNamespaceRequest,
DeleteNamespaceRequest,
ListNamespacesRequest,
CreateNamespaceRequest,
MetadataSchema,
MetadataFieldProperties,
)
from pinecone.core.grpc.protos.db_data_2025_10_pb2_grpc import VectorServiceStub
from pinecone import Vector, SparseValues
from pinecone.db_data.query_results_aggregator import QueryNamespacesResults, QueryResultsAggregator
from .base import GRPCIndexBase
from .future import PineconeGrpcFuture
if TYPE_CHECKING:
from typing import Type
from ..db_data.types import (
SparseVectorTypedDict,
VectorTypedDict,
VectorTuple,
FilterTypedDict,
VectorMetadataTypedDict,
)
__all__ = [
"GRPCIndex",
"GRPCVector",
"GRPCQueryVector",
"GRPCSparseValues",
"NamespaceDescription",
"ListNamespacesResponse",
]
_logger = logging.getLogger(__name__)
""" :meta private: """
class GRPCIndex(GRPCIndexBase):
"""A client for interacting with a Pinecone index via GRPC API."""
@property
def stub_class(self) -> "Type[VectorServiceStub]":
""":meta private:"""
return VectorServiceStub
def upsert(
self,
vectors: list[Vector] | list[GRPCVector] | list[VectorTuple] | list[VectorTypedDict],
async_req: bool = False,
namespace: str | None = None,
batch_size: int | None = None,
show_progress: bool = True,
**kwargs,
) -> UpsertResponse | PineconeGrpcFuture:
"""
The upsert operation writes vectors into a namespace.
If a new value is upserted for an existing vector id, it will overwrite the previous value.
Examples:
.. code-block:: python
>>> index.upsert([('id1', [1.0, 2.0, 3.0], {'key': 'value'}),
('id2', [1.0, 2.0, 3.0])
],
namespace='ns1', async_req=True)
>>> index.upsert([{'id': 'id1', 'values': [1.0, 2.0, 3.0], 'metadata': {'key': 'value'}},
{'id': 'id2',
'values': [1.0, 2.0, 3.0],
'sparse_values': {'indices': [1, 8], 'values': [0.2, 0.4]}}
])
>>> index.upsert([GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
GRPCVector(id='id3',
values=[1.0, 2.0, 3.0],
sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))])
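To split a large upsert into smaller requests client-side, pass ``batch_size``
(a sketch, assuming an index of dimension 3; see the batch_size argument below):
.. code-block:: python
>>> index.upsert(vectors=[('id' + str(i), [1.0, 2.0, 3.0]) for i in range(500)],
namespace='ns1',
batch_size=100)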
Args:
vectors (Union[list[Vector], list[GRPCVector], list[VectorTuple], list[VectorTypedDict]]): A list of vectors to upsert.
A vector can be represented by (1) a GRPCVector object, (2) a tuple, or (3) a dictionary.
1) If a tuple is used, it must be of the form (id, values, metadata) or (id, values),
where id is a string, values is a list of floats, and metadata is a dict.
Examples: ('id1', [1.0, 2.0, 3.0], {'key': 'value'}), ('id2', [1.0, 2.0, 3.0])
2) If a GRPCVector object is used, it must be of the form
GRPCVector(id, values, metadata), where metadata is an optional argument of type
dict[str, Union[str, float, int, bool, list[int], list[float], list[str]]]
Examples: GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
GRPCVector(id='id3',
values=[1.0, 2.0, 3.0],
sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))
3) if a dictionary is used, it must be in the form
{'id': str, 'values': list[float], 'sparse_values': {'indices': list[int], 'values': list[float]},
'metadata': dict}
Note: the dimension of each vector must match the dimension of the index.
async_req (bool): If True, the upsert operation will be performed asynchronously.
Cannot be used with batch_size.
Defaults to False. See: https://docs.pinecone.io/docs/performance-tuning [optional]
namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
batch_size (int): The number of vectors to upsert in each batch.
Cannot be used with async_req=True.
If not specified, all vectors will be upserted in a single batch. [optional]
show_progress (bool): Whether to show a progress bar using tqdm.
Applied only if batch_size is provided. Default is True.
Returns: UpsertResponse, containing the number of vectors upserted.
"""
if async_req and batch_size is not None:
raise ValueError(
"async_req is not supported when batch_size is provided."
"To upsert in parallel, please follow: "
"https://docs.pinecone.io/docs/performance-tuning"
)
timeout = kwargs.pop("timeout", None)
vectors = list(map(VectorFactoryGRPC.build, vectors))
if async_req:
args_dict = self._parse_non_empty_args([("namespace", namespace)])
request = UpsertRequest(vectors=vectors, **args_dict, **kwargs)
future_result = self.runner.run(self.stub.Upsert.future, request, timeout=timeout)
# For .future calls, runner returns (future, None, None) since .future doesn't support with_call
# The future itself will provide metadata when it completes
future = future_result[0] if isinstance(future_result, tuple) else future_result
return PineconeGrpcFuture(
future, timeout=timeout, result_transformer=parse_upsert_response
)
if batch_size is None:
return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)
if not isinstance(batch_size, int) or batch_size <= 0:
raise ValueError("batch_size must be a positive integer")
pbar = tqdm(total=len(vectors), disable=not show_progress, desc="Upserted vectors")
total_upserted = 0
last_batch_result = None
for i in range(0, len(vectors), batch_size):
batch_result = self._upsert_batch(
vectors[i : i + batch_size], namespace, timeout=timeout, **kwargs
)
pbar.update(batch_result.upserted_count)
# we can't use pbar.n here because it isn't updated when show_progress=False
total_upserted += batch_result.upserted_count
last_batch_result = batch_result
# Create aggregated response with metadata from final batch
response_info = None
if last_batch_result and hasattr(last_batch_result, "_response_info"):
response_info = last_batch_result._response_info
else:
from pinecone.utils.response_info import extract_response_info
response_info = extract_response_info({})
return UpsertResponse(upserted_count=total_upserted, _response_info=response_info)
def _upsert_batch(
self, vectors: list[GRPCVector], namespace: str | None, timeout: int | None, **kwargs
) -> UpsertResponse:
args_dict = self._parse_non_empty_args([("namespace", namespace)])
request = UpsertRequest(vectors=vectors, **args_dict)
response, initial_metadata = self.runner.run(
self.stub.Upsert, request, timeout=timeout, **kwargs
)
return parse_upsert_response(response, initial_metadata=initial_metadata)
def upsert_from_dataframe(
self,
df: Any,
namespace: str | None = None,
batch_size: int = 500,
use_async_requests: bool = True,
show_progress: bool = True,
) -> UpsertResponse:
"""Upserts a dataframe into the index.
Args:
df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
namespace: The namespace to upsert into.
batch_size: The number of rows to upsert in a single batch.
use_async_requests: Whether to issue multiple upsert requests concurrently using the
asynchronous request mechanism. Set to ``False`` to send each batch synchronously.
show_progress: Whether to show a progress bar.
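Examples:
.. code-block:: python
>>> # A minimal sketch, assuming pandas is installed and an index of dimension 3;
>>> # the columns follow the list described above.
>>> import pandas as pd
>>> df = pd.DataFrame({
...     'id': ['id1', 'id2'],
...     'values': [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]],
...     'sparse_values': [None, None],
...     'metadata': [{'key': 'value'}, {'key': 'value'}],
... })
>>> index.upsert_from_dataframe(df, namespace='my_namespace', batch_size=100)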
"""
try:
import pandas as pd
except ImportError:
raise RuntimeError(
"The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`"
)
if not isinstance(df, pd.DataFrame):
raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")
pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
results = []
for chunk in self._iter_dataframe(df, batch_size=batch_size):
# Type cast: dataframe dicts match VectorTypedDict structure
res = self.upsert(
vectors=cast(list[VectorTypedDict], chunk),
namespace=namespace,
async_req=use_async_requests,
)
pbar.update(len(chunk))
results.append(res)
if use_async_requests:
cast_results = cast(list[PineconeGrpcFuture], results)
results = [
async_result.result()
for async_result in tqdm(
iterable=cast_results,
disable=not show_progress,
desc="collecting async responses",
)
]
upserted_count = 0
last_result = None
for res in results:
if hasattr(res, "upserted_count") and isinstance(res.upserted_count, int):
upserted_count += res.upserted_count
last_result = res
response_info = None
if last_result and hasattr(last_result, "_response_info"):
response_info = last_result._response_info
else:
from pinecone.utils.response_info import extract_response_info
response_info = extract_response_info({})
return UpsertResponse(upserted_count=upserted_count, _response_info=response_info)
@staticmethod
def _iter_dataframe(df: Any, batch_size: int) -> Iterator[list[dict[str, Any]]]:
for i in range(0, len(df), batch_size):
batch = df.iloc[i : i + batch_size].to_dict(orient="records")
yield batch
def delete(
self,
ids: list[str] | None = None,
delete_all: bool | None = None,
namespace: str | None = None,
filter: FilterTypedDict | None = None,
async_req: bool = False,
**kwargs,
) -> dict[str, Any] | PineconeGrpcFuture:
"""
The Delete operation deletes vectors from the index, from a single namespace.
No error is raised if a vector id does not exist.
Args:
ids (list[str]): Vector ids to delete [optional]
delete_all (bool): This indicates that all vectors in the index namespace should be deleted. [optional]
Default is False.
namespace (str): The namespace to delete vectors from [optional]
If not specified, the default namespace is used.
filter (FilterTypedDict):
If specified, the metadata filter here will be used to select the vectors to delete.
This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True.
See `metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>`_ [optional]
async_req (bool): If True, the delete operation will be performed asynchronously.
Defaults to False. [optional]
Returns: DeleteResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
.. admonition:: Note
For any delete call, if namespace is not specified, the default namespace is used.
Delete can occur in the following mutual exclusive ways:
1. Delete by ids from a single namespace
2. Delete all vectors from a single namespace by setting delete_all to True
3. Delete all vectors from a single namespace by specifying a metadata filter
(note that for this option ``delete_all`` must be set to False)
Examples:
.. code-block:: python
>>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.delete(delete_all=True, namespace='my_namespace')
>>> index.delete(filter={'key': 'value'}, namespace='my_namespace', async_req=True)
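When ``async_req=True``, a future is returned that can be resolved later (a sketch):
.. code-block:: python
>>> future = index.delete(ids=['id1', 'id2'], namespace='my_namespace', async_req=True)
>>> future.result()  # blocks until the delete completes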
"""
if filter is not None:
filter_struct = dict_to_proto_struct(filter)
else:
filter_struct = None
args_dict = self._parse_non_empty_args(
[
("ids", ids),
("delete_all", delete_all),
("namespace", namespace),
("filter", filter_struct),
]
)
timeout = kwargs.pop("timeout", None)
request = DeleteRequest(**args_dict, **kwargs)
if async_req:
future_result = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
# For .future calls, runner returns (future, None, None) since .future doesn't support with_call
future = future_result[0] if isinstance(future_result, tuple) else future_result
return PineconeGrpcFuture(
future, timeout=timeout, result_transformer=parse_delete_response
)
else:
response, initial_metadata = self.runner.run(self.stub.Delete, request, timeout=timeout)
return parse_delete_response(response, initial_metadata=initial_metadata)
def fetch(
self,
ids: list[str] | None,
namespace: str | None = None,
async_req: bool | None = False,
**kwargs,
) -> FetchResponse | PineconeGrpcFuture:
"""
The fetch operation looks up and returns vectors, by ID, from a single namespace.
The returned vectors include the vector data and/or metadata.
Examples:
.. code-block:: python
>>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.fetch(ids=['id1', 'id2'])
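When ``async_req=True``, fetch returns a future instead of a FetchResponse (a sketch):
.. code-block:: python
>>> future = index.fetch(ids=['id1', 'id2'], async_req=True)
>>> response = future.result()  # resolves to a FetchResponse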
Args:
ids (list[str]): The vector IDs to fetch.
namespace (str): The namespace to fetch vectors from.
If not specified, the default namespace is used. [optional]
Returns: FetchResponse object which contains the list of Vector objects and the namespace name.
"""
timeout = kwargs.pop("timeout", None)
args_dict = self._parse_non_empty_args([("namespace", namespace)])
request = FetchRequest(ids=ids, **args_dict, **kwargs)
if async_req:
future_result = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
# For .future calls, runner returns (future, None, None) since .future doesn't support with_call
future = future_result[0] if isinstance(future_result, tuple) else future_result
return PineconeGrpcFuture(
future, result_transformer=parse_fetch_response, timeout=timeout
)
else:
response, initial_metadata = self.runner.run(self.stub.Fetch, request, timeout=timeout)
return parse_fetch_response(response, initial_metadata=initial_metadata)
def _query(
self,
vector: list[float] | None = None,
id: str | None = None,
namespace: str | None = None,
top_k: int | None = None,
filter: FilterTypedDict | None = None,
include_values: bool | None = None,
include_metadata: bool | None = None,
sparse_vector: (SparseValues | GRPCSparseValues | SparseVectorTypedDict) | None = None,
**kwargs,
) -> tuple[ProtoQueryResponse, dict[str, str] | None]:
"""
Low-level query method that returns the raw protobuf message and initial metadata without parsing.
Used internally by query() and query_namespaces() for performance.
Returns:
Tuple of (protobuf_message, initial_metadata). initial_metadata may be None.
"""
if vector is not None and id is not None:
raise ValueError("Cannot specify both `id` and `vector`")
if filter is not None:
filter_struct = dict_to_proto_struct(filter)
else:
filter_struct = None
sparse_vector = SparseValuesFactory.build(sparse_vector)
args_dict = self._parse_non_empty_args(
[
("vector", vector),
("id", id),
("namespace", namespace),
("top_k", top_k),
("filter", filter_struct),
("include_values", include_values),
("include_metadata", include_metadata),
("sparse_vector", sparse_vector),
]
)
request = QueryRequest(**args_dict)
timeout = kwargs.pop("timeout", None)
response, initial_metadata = self.runner.run(self.stub.Query, request, timeout=timeout)
return response, initial_metadata
def query(
self,
vector: list[float] | None = None,
id: str | None = None,
namespace: str | None = None,
top_k: int | None = None,
filter: FilterTypedDict | None = None,
include_values: bool | None = None,
include_metadata: bool | None = None,
sparse_vector: (SparseValues | GRPCSparseValues | SparseVectorTypedDict) | None = None,
async_req: bool | None = False,
**kwargs,
) -> "QueryResponse" | PineconeGrpcFuture:
"""
The Query operation searches a namespace, using a query vector.
It retrieves the ids of the most similar items in a namespace, along with their similarity scores.
Examples:
.. code-block:: python
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
>>> index.query(id='id1', top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
>>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
>>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
>>> top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], sparse_vector=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
>>> top_k=10, namespace='my_namespace')
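The returned QueryResponse exposes the scored matches (a sketch; assuming the
``matches`` attribute of the QueryResponse dataclass):
.. code-block:: python
>>> response = index.query(vector=[1, 2, 3], top_k=10, include_metadata=True)
>>> for match in response.matches:
...     print(match.id, match.score, match.metadata)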
Args:
vector (list[float]): The query vector. This should be the same length as the dimension of the index
being queried. Each ``query()`` request can contain only one of the parameters
``id`` or ``vector``. [optional]
id (str): The unique ID of the vector to be used as a query vector.
Each ``query()`` request can contain only one of the parameters
``vector`` or ``id``. [optional]
top_k (int): The number of results to return for each query. Must be an integer greater than 1.
namespace (str): The namespace to fetch vectors from.
If not specified, the default namespace is used. [optional]
filter (dict[str, Union[str, float, int, bool, List, dict]]):
The filter to apply. You can use vector metadata to limit your search.
See `metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>`_ [optional]
include_values (bool): Indicates whether vector values are included in the response.
If omitted the server will use the default value of False [optional]
include_metadata (bool): Indicates whether metadata is included in the response as well as the ids.
If omitted the server will use the default value of False [optional]
sparse_vector: (Union[SparseValues, dict[str, Union[list[float], list[int]]]]): sparse values of the query vector.
Expected to be either a SparseValues object or a dict of the form:
{'indices': list[int], 'values': list[float]}, where the lists each have the same length.
Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects,
and the namespace name.
"""
timeout = kwargs.pop("timeout", None)
if async_req:
# For async requests, we need to build the request manually
if vector is not None and id is not None:
raise ValueError("Cannot specify both `id` and `vector`")
if filter is not None:
filter_struct = dict_to_proto_struct(filter)
else:
filter_struct = None
sparse_vector = SparseValuesFactory.build(sparse_vector)
args_dict = self._parse_non_empty_args(
[
("vector", vector),
("id", id),
("namespace", namespace),
("top_k", top_k),
("filter", filter_struct),
("include_values", include_values),
("include_metadata", include_metadata),
("sparse_vector", sparse_vector),
]
)
request = QueryRequest(**args_dict)
future_result = self.runner.run(self.stub.Query.future, request, timeout=timeout)
# For .future calls, runner returns (future, None, None) since .future doesn't support with_call
future = future_result[0] if isinstance(future_result, tuple) else future_result
return PineconeGrpcFuture(
future, result_transformer=parse_query_response, timeout=timeout
)
else:
# For sync requests, use _query to get protobuf Message and metadata, then parse it
response, initial_metadata = self._query(
vector=vector,
id=id,
namespace=namespace,
top_k=top_k,
filter=filter,
include_values=include_values,
include_metadata=include_metadata,
sparse_vector=sparse_vector,
timeout=timeout,
**kwargs,
)
return parse_query_response(
response, _check_type=False, initial_metadata=initial_metadata
)
def query_namespaces(
self,
vector: list[float],
namespaces: list[str],
metric: Literal["cosine", "euclidean", "dotproduct"],
top_k: int | None = None,
filter: FilterTypedDict | None = None,
include_values: bool | None = None,
include_metadata: bool | None = None,
sparse_vector: (GRPCSparseValues | SparseVectorTypedDict) | None = None,
**kwargs,
) -> QueryNamespacesResults:
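"""
The query_namespaces operation runs the same query over several namespaces in parallel
and aggregates the per-namespace results into a single ranked result set.
Examples:
.. code-block:: python
>>> # A sketch; the aggregated results expose scored matches like query()
>>> results = index.query_namespaces(vector=[1, 2, 3],
>>> namespaces=['ns1', 'ns2'],
>>> metric='cosine',
>>> top_k=10,
>>> include_metadata=True)
Args:
vector (list[float]): The query vector. Must not be empty.
namespaces (list[str]): The namespaces to query. At least one must be specified; duplicates are removed.
metric (Literal["cosine", "euclidean", "dotproduct"]): The distance metric of the index, used to merge
and rank results across namespaces.
top_k (Optional[int]): The overall number of results to return after aggregation. Defaults to 10. [optional]
filter, include_values, include_metadata, sparse_vector: Same meaning as in ``query()``. [optional]
Returns: QueryNamespacesResults containing the aggregated top_k matches across all queried namespaces.
"""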
if namespaces is None or len(namespaces) == 0:
raise ValueError("At least one namespace must be specified")
if len(vector) == 0:
raise ValueError("Query vector must not be empty")
overall_topk = top_k if top_k is not None else 10
aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)
target_namespaces = set(namespaces) # dedup namespaces
futures = [
self.threadpool_executor.submit(
self._query,
vector=vector,
namespace=ns,
top_k=overall_topk,
filter=filter,
include_values=include_values,
include_metadata=include_metadata,
sparse_vector=sparse_vector,
**kwargs,
)
for ns in target_namespaces
]
only_futures = cast(Iterable[Future], futures)
for response in as_completed(only_futures):
proto_response, _ = response.result() # Ignore initial_metadata for query_namespaces
# Convert protobuf Message to dict format for aggregator using optimized helper
json_response = query_response_to_dict(proto_response)
aggregator.add_results(json_response)
final_results = aggregator.get_results()
return final_results
def update(
self,
id: str | None = None,
async_req: bool = False,
values: list[float] | None = None,
set_metadata: VectorMetadataTypedDict | None = None,
namespace: str | None = None,
sparse_values: (GRPCSparseValues | SparseVectorTypedDict) | None = None,
filter: FilterTypedDict | None = None,
dry_run: bool | None = None,
**kwargs,
) -> UpdateResponse | PineconeGrpcFuture:
"""
The Update operation updates vectors in a namespace.
This method supports two update modes:
1. **Single vector update by ID**: Provide `id` to update a specific vector.
- Updates the vector with the given ID
- If `values` is included, it will overwrite the previous vector values
- If `set_metadata` is included, the metadata will be merged with existing metadata on the vector.
Fields specified in `set_metadata` will overwrite existing fields with the same key, while
fields not in `set_metadata` will remain unchanged.
2. **Bulk update by metadata filter**: Provide `filter` to update all vectors matching the filter criteria.
- Updates all vectors in the namespace that match the filter expression
- Useful for updating metadata across multiple vectors at once
- If `set_metadata` is included, the metadata will be merged with existing metadata on each vector.
Fields specified in `set_metadata` will overwrite existing fields with the same key, while
fields not in `set_metadata` will remain unchanged.
- The response includes `matched_records` indicating how many vectors were updated
Either `id` or `filter` must be provided (but not both in the same call).
Examples:
**Single vector update by ID:**
.. code-block:: python
>>> # Update vector values
>>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
>>> # Update vector metadata
>>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace', async_req=True)
>>> # Update vector values and sparse values
>>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
>>> namespace='my_namespace')
>>> index.update(id='id1', values=[1, 2, 3], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
>>> namespace='my_namespace')
**Bulk update by metadata filter:**
.. code-block:: python
>>> # Update metadata for all vectors matching the filter
>>> response = index.update(set_metadata={'status': 'active'}, filter={'genre': {'$eq': 'drama'}},
>>> namespace='my_namespace')
>>> print(f"Updated {response.matched_records} vectors")
>>> # Preview how many vectors would be updated (dry run)
>>> response = index.update(set_metadata={'status': 'active'}, filter={'genre': {'$eq': 'drama'}},
>>> namespace='my_namespace', dry_run=True)
>>> print(f"Would update {response.matched_records} vectors")
Args:
id (str): Vector's unique id. Required for single vector updates. Must not be provided when using filter. [optional]
async_req (bool): If True, the update operation will be performed asynchronously.
Defaults to False. [optional]
values (list[float]): Vector values to set. [optional]
set_metadata (dict[str, Union[str, float, int, bool, list[int], list[float], list[str]]]]):
Metadata to merge with existing metadata on the vector(s). Fields specified will overwrite
existing fields with the same key, while fields not specified will remain unchanged. [optional]
namespace (str): Namespace name where to update the vector(s). [optional]
sparse_values: (dict[str, Union[list[float], list[int]]]): Sparse values to update for the vector.
Expected to be either a GRPCSparseValues object or a dict of the form:
{'indices': list[int], 'values': list[float]} where the lists each have the same length. [optional]
filter (dict[str, Union[str, float, int, bool, List, dict]]): A metadata filter expression.
When provided, updates all vectors in the namespace that match the filter criteria.
See `metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>`_.
Must not be provided when using id. Either `id` or `filter` must be provided. [optional]
dry_run (bool): If `True`, return the number of records that match the `filter` without executing
the update. Only meaningful when using `filter` (not with `id`). Useful for previewing
the impact of a bulk update before applying changes. Defaults to `False`. [optional]
Returns:
UpdateResponse or PineconeGrpcFuture: When using filter-based updates, the UpdateResponse includes
`matched_records` indicating the number of vectors that were updated (or would be updated if
`dry_run=True`). If `async_req=True`, returns a PineconeGrpcFuture object instead.
"""
# Validate that exactly one of id or filter is provided
if id is None and filter is None:
raise ValueError("Either 'id' or 'filter' must be provided to update vectors.")
if id is not None and filter is not None:
raise ValueError(
"Cannot provide both 'id' and 'filter' in the same update call. Use 'id' for single vector updates or 'filter' for bulk updates."
)
if set_metadata is not None:
set_metadata_struct = dict_to_proto_struct(set_metadata)
else:
set_metadata_struct = None
if filter is not None:
filter_struct = dict_to_proto_struct(filter)
else:
filter_struct = None
timeout = kwargs.pop("timeout", None)
sparse_values = SparseValuesFactory.build(sparse_values)
args_dict = self._parse_non_empty_args(
[
("id", id),
("values", values),
("set_metadata", set_metadata_struct),
("namespace", namespace),
("sparse_values", sparse_values),
("filter", filter_struct),
("dry_run", dry_run),
]
)
request = UpdateRequest(**args_dict)
if async_req:
future_result = self.runner.run(self.stub.Update.future, request, timeout=timeout)
# For .future calls, runner returns (future, None, None) since .future doesn't support with_call
future = future_result[0] if isinstance(future_result, tuple) else future_result
return PineconeGrpcFuture(
future, timeout=timeout, result_transformer=parse_update_response
)
else:
response, initial_metadata = self.runner.run(self.stub.Update, request, timeout=timeout)
return parse_update_response(response, initial_metadata=initial_metadata)
def list_paginated(
self,
prefix: str | None = None,
limit: int | None = None,
pagination_token: str | None = None,
namespace: str | None = None,
**kwargs,
) -> SimpleListResponse:
"""
The list_paginated operation finds vectors based on an id prefix within a single namespace.
It returns matching ids in a paginated form, with a pagination token to fetch the next page of results.
This id list can then be passed to fetch or delete operations, depending on your use case.
Consider using the ``list`` method to avoid having to handle pagination tokens manually.
Examples:
.. code-block:: python
>>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
>>> [v.id for v in results.vectors]
['99', '990', '991', '992', '993']
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)
Args:
prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
be used with the effect of listing all ids in a namespace [optional]
limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
in the response if additional results are available. [optional]
namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: SimpleListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
"""
args_dict = self._parse_non_empty_args(
[
("prefix", prefix),
("limit", limit),
("namespace", namespace),
("pagination_token", pagination_token),
]
)
timeout = kwargs.pop("timeout", None)
request = ListRequest(**args_dict, **kwargs)
response, _ = self.runner.run(self.stub.List, request, timeout=timeout)
if response.pagination and response.pagination.next != "":
pagination = Pagination(next=response.pagination.next)
else:
pagination = None
return SimpleListResponse(
namespace=response.namespace, vectors=response.vectors, pagination=pagination
)
def list(self, **kwargs) -> Iterator[list[str]]:
"""
The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields
a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your
behalf.
Examples:
.. code-block:: python
>>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
>>> print(ids)
['99', '990', '991', '992', '993']
['994', '995', '996', '997', '998']
['999']
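Because each page is a plain list of ids, the generator composes directly with other
operations (a sketch using the delete() method above):
.. code-block:: python
>>> for ids in index.list(prefix='99', namespace='my_namespace'):
...     index.delete(ids=ids, namespace='my_namespace')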
Args:
prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
be used with the effect of listing all ids in a namespace [optional]
limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
in the response if additional results are available. [optional]
namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
"""
done = False
while not done:
results = self.list_paginated(**kwargs)
if len(results.vectors) > 0:
yield [v.id for v in results.vectors]
if results.pagination and results.pagination.next:
kwargs.update({"pagination_token": results.pagination.next})
else:
done = True
def describe_index_stats(
self, filter: FilterTypedDict | None = None, **kwargs
) -> DescribeIndexStatsResponse:
"""
The DescribeIndexStats operation returns statistics about the index's contents.
For example: The vector count per namespace and the number of dimensions.
Examples:
.. code-block:: python
>>> index.describe_index_stats()
>>> index.describe_index_stats(filter={'key': 'value'})
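Per-namespace counts can be read off the response (a sketch; assuming the
``total_vector_count`` and per-namespace ``vector_count`` fields of the stats response):
.. code-block:: python
>>> stats = index.describe_index_stats()
>>> stats.total_vector_count
>>> stats.namespaces['my_namespace'].vector_count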
Args:
filter (dict[str, Union[str, float, int, bool, List, dict]]):
If this parameter is present, the operation only returns statistics for vectors that satisfy the filter.
See `metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>`_ [optional]
Returns: DescribeIndexStatsResponse object which contains stats about the index.
"""
if filter is not None:
filter_struct = dict_to_proto_struct(filter)
else:
filter_struct = None
args_dict = self._parse_non_empty_args([("filter", filter_struct)])
timeout = kwargs.pop("timeout", None)
request = DescribeIndexStatsRequest(**args_dict)
response, _ = self.runner.run(self.stub.DescribeIndexStats, request, timeout=timeout)
return parse_stats_response(response)
@require_kwargs
def create_namespace(
self, name: str, schema: dict[str, Any] | None = None, async_req: bool = False, **kwargs
) -> NamespaceDescription | PineconeGrpcFuture:
"""
The create_namespace operation creates a namespace in a serverless index.
Examples:
.. code-block:: python
>>> index.create_namespace(name='my_namespace')
>>> # Create namespace asynchronously
>>> future = index.create_namespace(name='my_namespace', async_req=True)
>>> namespace = future.result()
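A schema dictionary can mark metadata fields as filterable; this mirrors the
dict-to-MetadataSchema conversion performed below (a sketch):
.. code-block:: python
>>> index.create_namespace(name='my_namespace',
>>> schema={'fields': {'genre': {'filterable': True}}})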
Args:
name (str): The name of the namespace to create.
schema (Optional[dict[str, Any]]): Optional schema configuration for the namespace as a dictionary. [optional]
async_req (bool): If True, the create_namespace operation will be performed asynchronously. [optional]
Returns: NamespaceDescription object which contains information about the created namespace, or a PineconeGrpcFuture object if async_req is True.
"""
timeout = kwargs.pop("timeout", None)
# Build MetadataSchema from dict if provided
metadata_schema = None
if schema is not None:
if isinstance(schema, dict):
# Convert dict to MetadataSchema
fields = {}
for key, value in schema.get("fields", {}).items():
if isinstance(value, dict):
filterable = value.get("filterable", False)
fields[key] = MetadataFieldProperties(filterable=filterable)
else:
# If value is already a MetadataFieldProperties, use it directly
fields[key] = value
metadata_schema = MetadataSchema(fields=fields)
else:
# Assume it's already a MetadataSchema
metadata_schema = schema
request_kwargs: dict[str, Any] = {"name": name}
if metadata_schema is not None:
request_kwargs["schema"] = metadata_schema
request = CreateNamespaceRequest(**request_kwargs)
if async_req:
future_result = self.runner.run(
self.stub.CreateNamespace.future, request, timeout=timeout
)
# For .future calls, runner returns (future, None, None) since .future doesn't support with_call
future = future_result[0] if isinstance(future_result, tuple) else future_result
return PineconeGrpcFuture(
future, timeout=timeout, result_transformer=parse_namespace_description
)
response, initial_metadata = self.runner.run(
self.stub.CreateNamespace, request, timeout=timeout
)
return parse_namespace_description(response, initial_metadata=initial_metadata)
@require_kwargs
def describe_namespace(self, namespace: str, **kwargs) -> NamespaceDescription:
"""
The describe_namespace operation returns information about a specific namespace,
including the total number of vectors in the namespace.
Examples:
.. code-block:: python
>>> index.describe_namespace(namespace='my_namespace')
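The returned description can be inspected by attribute (a sketch; ``name`` as used
in the list_namespaces examples below):
.. code-block:: python
>>> description = index.describe_namespace(namespace='my_namespace')
>>> description.name
'my_namespace'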
Args:
namespace (str): The namespace to describe.
Returns: NamespaceDescription object which contains information about the namespace.
"""
timeout = kwargs.pop("timeout", None)
request = DescribeNamespaceRequest(namespace=namespace)
response, initial_metadata = self.runner.run(
self.stub.DescribeNamespace, request, timeout=timeout
)
return parse_namespace_description(response, initial_metadata=initial_metadata)
@require_kwargs
def delete_namespace(self, namespace: str, **kwargs) -> dict[str, Any]:
"""
The delete_namespace operation deletes a namespace from an index.
This operation is irreversible and will permanently delete all data in the namespace.
Examples:
.. code-block:: python
>>> index.delete_namespace(namespace='my_namespace')
Args:
namespace (str): The namespace to delete.
Returns: Empty dictionary indicating successful deletion.
"""
timeout = kwargs.pop("timeout", None)
request = DeleteNamespaceRequest(namespace=namespace)
response, initial_metadata = self.runner.run(
self.stub.DeleteNamespace, request, timeout=timeout
)
return parse_delete_response(response, initial_metadata=initial_metadata)
@require_kwargs
def list_namespaces_paginated(
self, limit: int | None = None, pagination_token: str | None = None, **kwargs
) -> ListNamespacesResponse:
"""
The list_namespaces_paginated operation returns a list of all namespaces in a serverless index.
It returns namespaces in a paginated form, with a pagination token to fetch the next page of results.
Examples:
.. code-block:: python
>>> results = index.list_namespaces_paginated(limit=10)
>>> [ns.name for ns in results.namespaces]
['namespace1', 'namespace2', 'namespace3']
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> next_results = index.list_namespaces_paginated(limit=10, pagination_token=results.pagination.next)
Args:
limit (Optional[int]): The maximum number of namespaces to return. If unspecified, the server will use a default value. [optional]
pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
in the response if additional results are available. [optional]
Returns: ListNamespacesResponse object which contains the list of namespaces and pagination information.
"""
args_dict = self._parse_non_empty_args(
[("limit", limit), ("pagination_token", pagination_token)]
)
timeout = kwargs.pop("timeout", None)
request = ListNamespacesRequest(**args_dict, **kwargs)
response, _ = self.runner.run(self.stub.ListNamespaces, request, timeout=timeout)
return parse_list_namespaces_response(response)
@require_kwargs
def list_namespaces(self, limit: int | None = None, **kwargs):
"""
The list_namespaces operation accepts all of the same arguments as list_namespaces_paginated, and returns a generator that yields
each namespace. It automatically handles pagination tokens on your behalf.
Args:
limit (Optional[int]): The maximum number of namespaces to fetch in each network call. If unspecified, the server will use a default value. [optional]
Returns:
A generator that yields each namespace. Pagination tokens are handled automatically on your behalf so you can
easily iterate over all results. The ``list_namespaces`` method accepts all of the same arguments as list_namespaces_paginated.
Examples:
.. code-block:: python
>>> for namespace in index.list_namespaces():
>>> print(namespace.name)
namespace1
namespace2
namespace3
You can convert the generator into a list by wrapping the generator in a call to the built-in ``list`` function:
.. code-block:: python
namespaces = list(index.list_namespaces())
Be cautious with this approach: it fetches all namespaces at once, which may require a large
number of network calls and a lot of memory to hold the results.
"""
done = False
while not done:
results = self.list_namespaces_paginated(limit=limit, **kwargs)
if results.namespaces and len(results.namespaces) > 0:
for namespace in results.namespaces:
yield namespace
if results.pagination and results.pagination.next:
kwargs.update({"pagination_token": results.pagination.next})
else:
done = True
@staticmethod
def _parse_non_empty_args(args: List[tuple[str, Any]]) -> dict[str, Any]:
return {arg_name: val for arg_name, val in args if val is not None}