Source code for pinecone.grpc

"""Synchronous gRPC data plane client for a Pinecone index."""

from __future__ import annotations

import builtins
import logging
import os
from collections.abc import Iterator, Sequence
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    import pandas as pd  # type: ignore[import-untyped]

from pinecone._internal.adapters.vectors_adapter import VectorsAdapter, extract_response_info
from pinecone._internal.batch import batch_execute
from pinecone._internal.batching import chunked, validate_batch_size, with_progress
from pinecone._internal.config import PineconeConfig
from pinecone._internal.constants import DATA_PLANE_API_VERSION
from pinecone._internal.data_plane_helpers import _validate_host
from pinecone._internal.validation import require_in_range
from pinecone._internal.vector_factory import VectorFactory
from pinecone.errors.exceptions import (
    PineconeValueError,
    ValidationError,
)
from pinecone.grpc._protocol import GrpcChannelProtocol
from pinecone.grpc.future import PineconeFuture
from pinecone.models.vectors.responses import (
    DescribeIndexStatsResponse,
    FetchResponse,
    ListItem,
    ListResponse,
    NamespaceSummary,
    Pagination,
    QueryResponse,
    UpdateResponse,
    UpsertRecordsResponse,
    UpsertResponse,
)
from pinecone.models.vectors.search import RerankConfig, SearchInputs, SearchRecordsResponse
from pinecone.models.vectors.sparse import SparseValues
from pinecone.models.vectors.usage import Usage
from pinecone.models.vectors.vector import ScoredVector, Vector

logger = logging.getLogger(__name__)


def _build_grpc_endpoint(host: str, secure: bool) -> str:
    """Build a gRPC endpoint URL from a host string.

    Strips any existing scheme and applies the correct one for gRPC.
    """
    bare = host
    for prefix in ("https://", "http://"):
        if bare.startswith(prefix):
            bare = bare[len(prefix) :]
            break

    scheme = "https" if secure else "http"
    return f"{scheme}://{bare}"


def _vector_to_grpc_dict(v: Vector) -> dict[str, Any]:
    """Serialize a Vector to a dict matching GrpcChannel's expected input format."""
    d: dict[str, Any] = {"id": v.id, "values": v.values}
    if v.sparse_values is not None:
        d["sparse_values"] = {
            "indices": v.sparse_values.indices,
            "values": v.sparse_values.values,
        }
    if v.metadata is not None:
        d["metadata"] = v.metadata
    return d


def _dict_to_vector(vid: str, data: dict[str, Any]) -> Vector:
    """Convert a GrpcChannel vector dict to a Vector model."""
    sparse = None
    sv = data.get("sparse_values")
    if sv is not None:
        sparse = SparseValues(sv["indices"], sv["values"])
    return Vector(
        id=vid,
        values=data.get("values", []),
        sparse_values=sparse,
        metadata=data.get("metadata"),
    )


def _dict_to_scored_vector(data: dict[str, Any]) -> ScoredVector:
    """Convert a GrpcChannel scored vector dict to a ScoredVector model."""
    sparse = None
    sv = data.get("sparse_values")
    if sv is not None:
        sparse = SparseValues(sv["indices"], sv["values"])
    return ScoredVector(
        id=data["id"],
        score=data.get("score", 0.0),
        values=data.get("values", []),
        sparse_values=sparse,
        metadata=data.get("metadata"),
    )


def _dict_to_usage(data: dict[str, Any] | None) -> Usage | None:
    """Convert a usage dict to a Usage model, or None."""
    if data is None:
        return None
    return Usage(read_units=data.get("read_units", 0))


[docs] class GrpcIndex: """Synchronous gRPC data plane client targeting a specific Pinecone index. Provides the same interface as :class:`~pinecone.index.Index` but routes data-plane operations through a gRPC transport (via the Rust-backed :class:`~pinecone._grpc.GrpcChannel`) instead of HTTP/REST. Args: host (str): The index-specific data plane host URL. api_key (str | None): Pinecone API key. Falls back to ``PINECONE_API_KEY`` env var. api_version (str): API version string. Defaults to the current data plane version. source_tag (str | None): Tag appended to the User-Agent string for request attribution. secure (bool): Whether to use TLS encryption. Defaults to ``True``. timeout (float): Request timeout in seconds. Defaults to ``20.0``. connect_timeout (float): Connection timeout in seconds. Defaults to ``1.0``. Raises: :exc:`ValidationError`: If no API key can be resolved or the host is invalid. Examples: .. code-block:: python from pinecone.grpc import GrpcIndex idx = GrpcIndex(host="movie-recs-abc123.svc.pinecone.io", api_key="...") """
[docs] def __init__( self, *, host: str, api_key: str | None = None, api_version: str = DATA_PLANE_API_VERSION, source_tag: str | None = None, secure: bool = True, timeout: float = 20.0, connect_timeout: float = 1.0, ) -> None: # Resolve API key: explicit arg > env var resolved_key = api_key or os.environ.get("PINECONE_API_KEY", "") if not resolved_key: raise ValidationError( "No API key provided. Pass api_key='...' or set the " "PINECONE_API_KEY environment variable." ) # Validate and normalize host self._host = _validate_host(host) self._source_tag = source_tag # Build gRPC endpoint and create the Rust-backed channel endpoint = _build_grpc_endpoint(self._host, secure) from pinecone import __version__ from pinecone._grpc import GrpcChannel # type: ignore[import-not-found] self._channel: GrpcChannelProtocol = GrpcChannel( endpoint, resolved_key, api_version, __version__, secure, timeout, connect_timeout, source_tag=source_tag, ) self._executor = ThreadPoolExecutor() self._batch_executor: ThreadPoolExecutor | None = None self._batch_executor_workers: int = 0 # REST HTTP client for records operations (integrated inference). # upsert_records and search use REST endpoints with no gRPC equivalent. from pinecone._internal.http_client import HTTPClient rest_config = PineconeConfig( api_key=resolved_key, host=self._host, timeout=timeout, source_tag=source_tag or "", ssl_verify=secure, ) self._http = HTTPClient(rest_config, DATA_PLANE_API_VERSION) self._adapter = VectorsAdapter() logger.info("GrpcIndex client created for host %s", self._host)
@property def host(self) -> str: """The data plane host URL for this index.""" return self._host def _get_batch_executor(self, max_concurrency: int) -> ThreadPoolExecutor: if self._batch_executor is None or self._batch_executor_workers != max_concurrency: if self._batch_executor is not None: self._batch_executor.shutdown(wait=False) self._batch_executor = ThreadPoolExecutor( max_concurrency, thread_name_prefix="pinecone-grpc-batch-upsert", ) self._batch_executor_workers = max_concurrency return self._batch_executor
[docs] def upsert( self, *, vectors: Sequence[ Vector | tuple[str, builtins.list[float]] | tuple[str, builtins.list[float], dict[str, Any]] | dict[str, Any] ], namespace: str = "", batch_size: int | None = None, max_concurrency: int = 4, show_progress: bool = True, timeout: float | None = None, ) -> UpsertResponse: """Upsert a batch of vectors into a namespace. If a vector with the same ID already exists in the namespace, it is overwritten. Args: vectors: Sequence of vectors to upsert. Each element can be a ``Vector`` instance, a tuple of ``(id, values)`` or ``(id, values, metadata)``, or a dict with ``id``, ``values``, and optional ``sparse_values`` / ``metadata`` keys. namespace (str): Target namespace. Defaults to the default (empty-string) namespace. batch_size (int | None): If set, splits ``vectors`` into batches of this size and submits them in **parallel** via a ``ThreadPoolExecutor``. ``None`` (default) sends all vectors in a single channel call. Must be a positive integer when set. max_concurrency (int): Number of parallel threads used when ``batch_size`` is set. Default ``4``, range ``[1, 64]``. Ignored when ``batch_size`` is ``None``. show_progress (bool): If ``True`` and ``tqdm`` is installed, display a progress bar while submitting batches. Ignored when ``batch_size`` is ``None``. Defaults to ``True``. timeout (float | None): Per-call timeout in seconds. Applied per batch when batching. None uses the client-level default. Returns: :class:`UpsertResponse` with the count of vectors upserted. Raises: :exc:`TypeError`: If a vector element is not a recognized format. :exc:`ValueError`: If a vector element is malformed. :exc:`PineconeValueError`: If ``batch_size`` is not a positive integer or ``max_concurrency`` is outside ``[1, 64]``. :exc:`PineconeTimeoutError`: If the call exceeds *timeout* or the server returns CANCELLED with a timeout cause. Notes: When ``batch_size`` is set, batches are submitted **in parallel** via a ``ThreadPoolExecutor`` of ``max_concurrency`` workers (default 4, range 1–64). Per-batch retries are handled by the gRPC channel's own retry policy. **Partial failures do not raise** — the returned :class:`UpsertResponse` carries ``upserted_count``, ``failed_item_count``, ``errors``, and ``failed_items`` for inspection / retry. Pass ``response.failed_items`` back to ``upsert(...)`` to retry only the failures. Examples: .. code-block:: python from pinecone.grpc import GrpcIndex from pinecone.models.vectors.vector import Vector idx = GrpcIndex(host="article-search-abc123.svc.pinecone.io", api_key="...") response = idx.upsert( vectors=[ Vector( id="article-101", values=[0.012, -0.087, 0.153, ...], # 1536-dim ), ("article-102", [0.045, 0.021, -0.064, ...]), {"id": "article-103", "values": [0.091, -0.032, 0.178, ...]}, ], namespace="articles-en", ) print(response.upserted_count) """ if batch_size is None: built = [VectorFactory.build(v) for v in vectors] grpc_vectors = [_vector_to_grpc_dict(v) for v in built] logger.info("Upserting %d vectors via gRPC into namespace %r", len(built), namespace) result = self._channel.upsert(grpc_vectors, namespace or None, timeout_s=timeout) return UpsertResponse(upserted_count=result.get("upserted_count", 0)) validate_batch_size(batch_size) require_in_range("max_concurrency", max_concurrency, 1, 64) built = [VectorFactory.build(v) for v in vectors] items: builtins.list[dict[str, Any]] = [_vector_to_grpc_dict(v) for v in built] def _operation(chunk: builtins.list[dict[str, Any]]) -> dict[str, Any]: return self._channel.upsert(chunk, namespace or None, timeout_s=timeout) batch_result = batch_execute( items=items, operation=_operation, batch_size=batch_size, max_concurrency=max_concurrency, show_progress=show_progress, desc="Upserting", executor=self._get_batch_executor(max_concurrency), ) return UpsertResponse( upserted_count=batch_result.successful_item_count, total_item_count=batch_result.total_item_count, failed_item_count=batch_result.failed_item_count, total_batch_count=batch_result.total_batch_count, successful_batch_count=batch_result.successful_batch_count, failed_batch_count=batch_result.failed_batch_count, errors=batch_result.errors, )
[docs] def query( self, *, top_k: int, vector: list[float] | None = None, id: str | None = None, namespace: str = "", filter: dict[str, Any] | None = None, include_values: bool = False, include_metadata: bool = False, sparse_vector: SparseValues | dict[str, Any] | None = None, scan_factor: float | None = None, max_candidates: int | None = None, timeout: float | None = None, ) -> QueryResponse: """Query a namespace for the nearest neighbors of a vector. Args: top_k (int): Number of results to return (must be >= 1). vector (list[float] | None): Dense query vector values. id (str | None): ID of a stored vector to use as the query. namespace (str): Namespace to query. Defaults to the default namespace. filter (dict[str, Any] | None): Metadata filter expression. include_values (bool): Whether to include vector values in results. include_metadata (bool): Whether to include metadata in results. sparse_vector (SparseValues | dict[str, Any] | None): Sparse query vector with indices and values. scan_factor (float | None): DRN optimization — adjusts how much of the index is scanned. Range 0.5–4.0. Only supported for dedicated read node indexes. None uses server default. max_candidates (int | None): DRN optimization — caps candidate vectors to rerank. Range 1–100000. Only supported for dedicated read node indexes. None uses server default. timeout (float | None): Per-call timeout in seconds. None uses the client-level default. Returns: :class:`QueryResponse` with matches, namespace, and usage info. Raises: :exc:`ValidationError`: If top_k < 1, both vector and id are provided, or none of vector, id, or sparse_vector are provided. :exc:`PineconeTimeoutError`: If the call exceeds *timeout* or the server returns CANCELLED with a timeout cause. Examples: .. code-block:: python response = idx.query( top_k=10, vector=[0.012, -0.087, 0.153, ...], # 1536-dim embedding ) for match in response.matches: print(match.id, match.score) """ if top_k < 1: raise ValidationError(f"top_k must be a positive integer, got {top_k}") has_vector = vector is not None has_id = id is not None has_sparse = sparse_vector is not None if has_vector and has_id: raise ValidationError("Exactly one of vector or id must be provided, not both") if not has_vector and not has_id and not has_sparse: raise ValidationError("At least one of vector, id, or sparse_vector must be provided") # Convert SparseValues model to dict for GrpcChannel sv_dict: dict[str, Any] | None = None if sparse_vector is not None: if isinstance(sparse_vector, SparseValues): sv_dict = { "indices": sparse_vector.indices, "values": sparse_vector.values, } else: sv_dict = sparse_vector logger.info("Querying index via gRPC with top_k=%d", top_k) result = self._channel.query( top_k, vector=vector, id=id, namespace=namespace or None, filter=filter, include_values=include_values, include_metadata=include_metadata, sparse_vector=sv_dict, scan_factor=scan_factor, max_candidates=max_candidates, timeout_s=timeout, ) matches = [_dict_to_scored_vector(m) for m in result.get("matches", [])] usage = _dict_to_usage(result.get("usage")) return QueryResponse( matches=matches, namespace=result.get("namespace", ""), usage=usage, )
[docs] def fetch( self, *, ids: list[str], namespace: str = "", timeout: float | None = None, ) -> FetchResponse: """Fetch vectors by their IDs from a namespace. Args: ids (list[str]): List of vector IDs to fetch (must be non-empty). namespace (str): Namespace to fetch from. Defaults to the default namespace. timeout (float | None): Per-call timeout in seconds. None uses the client-level default. Returns: :class:`FetchResponse` with a map of vector IDs to Vector objects, namespace, and usage info. Raises: :exc:`ValidationError`: If ids is empty. :exc:`PineconeTimeoutError`: If the call exceeds *timeout* or the server returns CANCELLED with a timeout cause. Examples: .. code-block:: python response = idx.fetch(ids=["article-101", "article-102"]) for vid, vec in response.vectors.items(): print(vid, vec.values) """ if not ids: raise ValidationError("ids must be a non-empty list") logger.info("Fetching %d vectors via gRPC", len(ids)) result = self._channel.fetch(ids, namespace=namespace or None, timeout_s=timeout) vectors: dict[str, Vector] = {} for vid, vdata in result.get("vectors", {}).items(): vectors[vid] = _dict_to_vector(vid, vdata) usage = _dict_to_usage(result.get("usage")) return FetchResponse( vectors=vectors, namespace=result.get("namespace", ""), usage=usage, )
[docs] def delete( self, *, ids: list[str] | None = None, delete_all: bool = False, filter: dict[str, Any] | None = None, namespace: str = "", timeout: float | None = None, ) -> None: """Delete vectors from a namespace by ID, filter, or delete-all flag. Exactly one of ``ids``, ``delete_all``, or ``filter`` must be specified. Args: ids (list[str] | None): List of vector IDs to delete. delete_all (bool): If True, delete all vectors in the namespace. filter (dict[str, Any] | None): Metadata filter expression selecting vectors to delete. namespace (str): Namespace to delete from. Defaults to the default namespace. timeout (float | None): Per-call timeout in seconds. None uses the client-level default. Returns: None Raises: :exc:`ValidationError`: If zero or more than one deletion mode is specified. :exc:`PineconeTimeoutError`: If the call exceeds *timeout* or the server returns CANCELLED with a timeout cause. Examples: .. code-block:: python # Delete by IDs idx.delete(ids=["article-101", "article-102"]) # Delete all vectors in a namespace idx.delete(delete_all=True, namespace="articles-deprecated") # Delete by metadata filter idx.delete(filter={"category": {"$eq": "obsolete"}}) """ mode_count = sum([ids is not None, delete_all, filter is not None]) if mode_count == 0: raise ValidationError("Must specify one of ids, delete_all, or filter") if mode_count > 1: raise ValidationError( "Cannot combine ids, delete_all, and filter — specify exactly one" ) logger.info("Deleting vectors via gRPC from namespace %r", namespace) self._channel.delete( ids=ids, delete_all=delete_all, namespace=namespace or None, filter=filter, timeout_s=timeout, )
[docs] def update( self, *, id: str | None = None, values: list[float] | None = None, sparse_values: SparseValues | dict[str, Any] | None = None, set_metadata: dict[str, Any] | None = None, namespace: str = "", filter: dict[str, Any] | None = None, dry_run: bool = False, timeout: float | None = None, ) -> UpdateResponse: """Update vectors by ID or metadata filter. Args: id (str | None): ID of the vector to update. values (list[float] | None): New dense vector values. sparse_values (SparseValues | dict[str, Any] | None): New sparse vector. set_metadata (dict[str, Any] | None): Metadata fields to set or overwrite. namespace (str): Namespace to target. Defaults to the default namespace. filter (dict[str, Any] | None): Metadata filter expression selecting vectors to update. dry_run (bool): If True, return the count of records that would be affected without applying changes. timeout (float | None): Per-call timeout in seconds. None uses the client-level default. Returns: :class:`UpdateResponse` with matched_records count (when available). Raises: :exc:`ValidationError`: If both or neither of id and filter are provided. :exc:`PineconeTimeoutError`: If the call exceeds *timeout* or the server returns CANCELLED with a timeout cause. Examples: .. code-block:: python # Update by ID idx.update(id="article-101", values=[0.012, -0.087, 0.153, ...]) # Bulk-update metadata by filter idx.update( filter={"genre": {"$eq": "drama"}}, set_metadata={"year": 2020}, ) """ has_id = id is not None has_filter = filter is not None if has_id and has_filter: raise ValidationError("Exactly one of id or filter must be provided, not both") if not has_id and not has_filter: raise ValidationError("Exactly one of id or filter must be provided, got neither") # Convert SparseValues model to dict for GrpcChannel sv_dict: dict[str, Any] | None = None if sparse_values is not None: if isinstance(sparse_values, SparseValues): sv_dict = { "indices": sparse_values.indices, "values": sparse_values.values, } else: sv_dict = sparse_values logger.info("Updating vectors via gRPC in namespace %r", namespace) # The Rust channel's update() requires `id` as a positional string arg. # For filter-based updates id is None, so pass "" which the API ignores # when a filter is provided. result = self._channel.update( id if id is not None else "", values=values, sparse_values=sv_dict, set_metadata=set_metadata, namespace=namespace or None, filter=filter, dry_run=dry_run or None, timeout_s=timeout, ) return UpdateResponse(matched_records=result.get("matched_records"))
[docs] def list_paginated( self, *, prefix: str | None = None, limit: int | None = None, pagination_token: str | None = None, namespace: str = "", timeout: float | None = None, ) -> ListResponse: """Fetch a single page of vector IDs from a namespace. Args: prefix (str | None): Return only IDs starting with this prefix. limit (int | None): Maximum number of IDs to return in this page. pagination_token (str | None): Token from a previous response to fetch the next page. namespace (str): Namespace to list from. Defaults to the default namespace. timeout (float | None): Per-call timeout in seconds. None uses the client-level default. Returns: :class:`ListResponse` with vector IDs, pagination info, namespace, and usage. Raises: :exc:`PineconeTimeoutError`: If the call exceeds *timeout* or the server returns CANCELLED with a timeout cause. Examples: .. code-block:: python response = idx.list_paginated(prefix="doc1#", limit=50) for item in response.vectors: print(item.id) """ logger.info("Listing vectors via gRPC in namespace %r", namespace) result = self._channel.list( prefix=prefix, limit=limit, pagination_token=pagination_token, namespace=namespace or None, timeout_s=timeout, ) vectors = [ListItem(id=v.get("id")) for v in result.get("vectors", [])] pagination_data = result.get("pagination") pagination = None if pagination_data is not None: pagination = Pagination(next=pagination_data.get("next")) usage = _dict_to_usage(result.get("usage")) return ListResponse( vectors=vectors, pagination=pagination, namespace=result.get("namespace", ""), usage=usage, )
[docs] def list( self, *, prefix: str | None = None, limit: int | None = None, namespace: str = "", timeout: float | None = None, ) -> Iterator[ListResponse]: """List vector IDs in a namespace, automatically following pagination. Yields one ``ListResponse`` per page. Args: prefix (str | None): Return only IDs starting with this prefix. limit (int | None): Maximum number of IDs to return per page. namespace (str): Namespace to list from. Defaults to the default namespace. timeout (float | None): Per-call timeout in seconds applied to each page request. None uses the client-level default. Yields: :class:`ListResponse` for each page of results. Raises: :exc:`PineconeTimeoutError`: If any page call exceeds *timeout* or the server returns CANCELLED with a timeout cause. Examples: .. code-block:: python for page in idx.list(prefix="doc1#"): for item in page.vectors: print(item.id) """ pagination_token: str | None = None while True: page = self.list_paginated( prefix=prefix, limit=limit, pagination_token=pagination_token, namespace=namespace, timeout=timeout, ) if page.vectors: yield page if page.pagination is not None and page.pagination.next is not None: pagination_token = page.pagination.next else: break
[docs] def describe_index_stats( self, *, filter: dict[str, Any] | None = None, timeout: float | None = None, ) -> DescribeIndexStatsResponse: """Return statistics for this index. Args: filter (dict[str, Any] | None): Metadata filter expression. When provided, only vectors matching the filter are counted. Returns: :class:`DescribeIndexStatsResponse` with namespace summaries, dimension, total vector count, and fullness metrics. Examples: .. code-block:: python stats = idx.describe_index_stats() print(stats.total_vector_count, stats.dimension) # With filter — only count vectors matching the expression stats = idx.describe_index_stats( filter={"genre": {"$eq": "drama"}} ) """ logger.info("Describing index stats via gRPC") result = self._channel.describe_index_stats(filter=filter, timeout_s=timeout) namespaces: dict[str, NamespaceSummary] = {} for ns_name, ns_data in result.get("namespaces", {}).items(): namespaces[ns_name] = NamespaceSummary( vector_count=ns_data.get("vector_count", 0), ) return DescribeIndexStatsResponse( namespaces=namespaces, dimension=result.get("dimension"), index_fullness=result.get("index_fullness", 0.0), total_vector_count=result.get("total_vector_count", 0), metric=result.get("metric"), vector_type=result.get("vector_type"), memory_fullness=result.get("memory_fullness"), storage_fullness=result.get("storage_fullness"), )
[docs] def upsert_from_dataframe( self, df: pd.DataFrame, namespace: str = "", batch_size: int = 500, show_progress: bool = True, ) -> UpsertResponse: """Upsert vectors from a pandas DataFrame using async batching. Splits the DataFrame into batches of ``batch_size`` rows and submits each batch asynchronously via :meth:`upsert_async`, then aggregates the results. Args: df: A ``pandas.DataFrame`` with at least ``id`` and ``values`` columns. ``sparse_values`` and ``metadata`` columns are included when present and non-None. namespace: Target namespace. Defaults to the default namespace. batch_size: Number of rows per upsert batch. Defaults to 500. show_progress: If ``True`` and ``tqdm`` is installed, display a progress bar. If ``tqdm`` is not installed, silently falls back to no progress bar. Returns: :class:`UpsertResponse` with the total count of vectors upserted across all batches. Raises: :exc:`RuntimeError`: If ``pandas`` is not installed. :exc:`PineconeValueError`: If *df* is not a ``pandas.DataFrame``. :exc:`PineconeValueError`: If *batch_size* is not a positive integer. Examples: .. code-block:: python import pandas as pd from pinecone.grpc import GrpcIndex idx = GrpcIndex( host="article-search-abc123.svc.pinecone.io", api_key="your-api-key", ) df = pd.DataFrame([ {"id": "article-101", "values": [0.012, -0.087, 0.153]}, {"id": "article-102", "values": [0.045, 0.021, -0.064]}, ]) response = idx.upsert_from_dataframe(df) response.upserted_count .. code-block:: python df = pd.DataFrame([ { "id": "article-101", "values": [0.012, -0.087, 0.153], "metadata": {"topic": "science", "year": 2024}, }, { "id": "article-102", "values": [0.045, 0.021, -0.064], "metadata": {"topic": "technology", "year": 2024}, }, ]) response = idx.upsert_from_dataframe( df, namespace="articles-en", batch_size=100, ) """ try: import pandas as pd except ImportError: raise RuntimeError( "pandas is required for upsert_from_dataframe. Install it with: pip install pandas" ) from None if not isinstance(df, pd.DataFrame): raise PineconeValueError("df must be a pandas DataFrame") validate_batch_size(batch_size) has_sparse = "sparse_values" in df.columns has_metadata = "metadata" in df.columns records: builtins.list[dict[str, Any]] = [] for _, row in df.iterrows(): record: dict[str, Any] = {"id": row["id"], "values": row["values"]} if has_sparse and row["sparse_values"] is not None: record["sparse_values"] = row["sparse_values"] if has_metadata and row["metadata"] is not None: record["metadata"] = row["metadata"] records.append(record) batches = chunked(records, batch_size) futures: builtins.list[PineconeFuture[UpsertResponse]] = [ self.upsert_async(vectors=batch, namespace=namespace) for batch in with_progress(batches, show_progress=show_progress) ] total_count = 0 for future in futures: result = future.result() total_count += result.upserted_count return UpsertResponse(upserted_count=total_count)
# ------------------------------------------------------------------ # Async (future-returning) variants # ------------------------------------------------------------------
[docs] def upsert_async( self, *, vectors: Sequence[ Vector | tuple[str, builtins.list[float]] | tuple[str, builtins.list[float], dict[str, Any]] | dict[str, Any] ], namespace: str = "", timeout: float | None = None, ) -> PineconeFuture[UpsertResponse]: """Submit an upsert operation and return a :class:`PineconeFuture`. Same parameters as :meth:`upsert`, including ``timeout (float | None)`` which sets a per-call timeout in seconds. Returns: :class:`PineconeFuture` [:class:`UpsertResponse`] that resolves to the upsert result. Examples: .. code-block:: python future = index.upsert_async( vectors=[("doc-42", [0.012, -0.087, 0.153])], ) result = future.result() result.upserted_count # 1 """ future: PineconeFuture[UpsertResponse] = PineconeFuture( self._executor.submit( self.upsert, vectors=vectors, namespace=namespace, timeout=timeout ) ) return future
[docs] def query_async( self, *, top_k: int, vector: builtins.list[float] | None = None, id: str | None = None, namespace: str = "", filter: dict[str, Any] | None = None, include_values: bool = False, include_metadata: bool = False, sparse_vector: SparseValues | dict[str, Any] | None = None, scan_factor: float | None = None, max_candidates: int | None = None, timeout: float | None = None, ) -> PineconeFuture[QueryResponse]: """Submit a query operation and return a :class:`PineconeFuture`. Same parameters as :meth:`query`, including ``timeout (float | None)`` which sets a per-call timeout in seconds. Returns: :class:`PineconeFuture` [:class:`QueryResponse`] that resolves to the query result containing scored matches. Examples: .. code-block:: python future = index.query_async( vector=[0.012, -0.087, 0.153], top_k=5, ) result = future.result() result.matches[0].id # 'doc-42' result.matches[0].score # 0.95 """ future: PineconeFuture[QueryResponse] = PineconeFuture( self._executor.submit( self.query, top_k=top_k, vector=vector, id=id, namespace=namespace, filter=filter, include_values=include_values, include_metadata=include_metadata, sparse_vector=sparse_vector, scan_factor=scan_factor, max_candidates=max_candidates, timeout=timeout, ) ) return future
[docs] def fetch_async( self, *, ids: builtins.list[str], namespace: str = "", timeout: float | None = None, ) -> PineconeFuture[FetchResponse]: """Submit a fetch operation and return a :class:`PineconeFuture`. Same parameters as :meth:`fetch`, including ``timeout (float | None)`` which sets a per-call timeout in seconds. Returns: :class:`PineconeFuture` [:class:`FetchResponse`] that resolves to the fetched vectors keyed by ID. Examples: .. code-block:: python future = index.fetch_async(ids=["doc-42", "doc-43"]) result = future.result() result.vectors["doc-42"].values # [0.012, -0.087, 0.153] """ future: PineconeFuture[FetchResponse] = PineconeFuture( self._executor.submit(self.fetch, ids=ids, namespace=namespace, timeout=timeout) ) return future
[docs] def delete_async( self, *, ids: builtins.list[str] | None = None, delete_all: bool = False, filter: dict[str, Any] | None = None, namespace: str = "", timeout: float | None = None, ) -> PineconeFuture[None]: """Submit a delete operation and return a :class:`PineconeFuture`. Same parameters as :meth:`delete`, including ``timeout (float | None)`` which sets a per-call timeout in seconds. Returns: :class:`PineconeFuture` [None] that resolves when the delete operation completes. Examples: .. code-block:: python future = index.delete_async(ids=["doc-42", "doc-43"]) future.result() .. code-block:: python future = index.delete_async(delete_all=True, namespace="docs") future.result() """ future: PineconeFuture[None] = PineconeFuture( self._executor.submit( self.delete, ids=ids, delete_all=delete_all, filter=filter, namespace=namespace, timeout=timeout, ) ) return future
[docs] def update_async( self, *, id: str | None = None, values: builtins.list[float] | None = None, sparse_values: SparseValues | dict[str, Any] | None = None, set_metadata: dict[str, Any] | None = None, filter: dict[str, Any] | None = None, namespace: str = "", dry_run: bool = False, timeout: float | None = None, ) -> PineconeFuture[UpdateResponse]: """Submit an update call without blocking; returns a :class:`PineconeFuture`.""" return PineconeFuture( self._executor.submit( self.update, id=id, values=values, sparse_values=sparse_values, set_metadata=set_metadata, filter=filter, namespace=namespace, dry_run=dry_run, timeout=timeout, ) )
[docs] def upsert_records( self, *, records: builtins.list[dict[str, Any]], namespace: str, timeout: float | None = None, ) -> UpsertRecordsResponse: """Upsert records for indexes with integrated inference. Records are sent as newline-delimited JSON (NDJSON) over REST. Embeddings are generated server-side. This method delegates to the REST endpoint because the Pinecone gRPC API does not expose a records upsert operation. Args: records: List of record dicts. Each must contain an ``_id`` or ``id`` field. Additional fields are passed through for server-side embedding. namespace (str): Target namespace (required). Use ``""`` for the default namespace. Returns: :class:`UpsertRecordsResponse` with the count of records submitted. Raises: :exc:`PineconeValueError`: If namespace is not a string or is empty/whitespace, records is empty, or a record is missing an identifier field. :exc:`ApiError`: If the API returns an error response. :exc:`PineconeConnectionError`: If a network-level connection fails (DNS, refused, transport error). :exc:`PineconeTimeoutError`: If the request exceeds the configured timeout. Examples: .. code-block:: python pc = Pinecone(api_key="YOUR_API_KEY") idx = pc.index("my-index", grpc=True) response = idx.upsert_records( namespace="articles-en", records=[ {"_id": "article-101", "text": "Vector DBs enable similarity search."}, {"_id": "article-102", "text": "RAG combines search with LLMs."}, ], ) print(response.record_count) """ if not isinstance(namespace, str): raise ValidationError("namespace must be a string") if not namespace or not namespace.strip(): raise ValidationError("namespace must be a non-empty string") if not records: raise ValidationError("records must be a non-empty list") for i, record in enumerate(records): if "_id" not in record and "id" not in record: raise ValidationError(f"Record at index {i} must contain an '_id' or 'id' field") import orjson normalized: builtins.list[dict[str, Any]] = [] for record in records: r = dict(record) if "_id" not in r and "id" in r: r["_id"] = r.pop("id") normalized.append(r) ndjson_lines = [orjson.dumps(r).decode("utf-8") for r in normalized] ndjson_body = "\n".join(ndjson_lines) + "\n" logger.info( "Upserting %d records into namespace %r (NDJSON via REST)", len(records), namespace ) response = self._http.post( f"/records/namespaces/{namespace}/upsert", timeout=timeout, content=ndjson_body.encode("utf-8"), headers={"Content-Type": "application/x-ndjson"}, ) result = UpsertRecordsResponse(record_count=len(records)) result.response_info = extract_response_info(response) return result
[docs] def search( self, *, namespace: str, top_k: int, inputs: SearchInputs | dict[str, Any] | None = None, vector: builtins.list[float] | None = None, id: str | None = None, filter: dict[str, Any] | None = None, fields: builtins.list[str] | None = None, rerank: RerankConfig | dict[str, Any] | None = None, match_terms: dict[str, Any] | None = None, timeout: float | None = None, ) -> SearchRecordsResponse: """Search records by text, vector, or ID with optional reranking. Delegates to the REST endpoint because the Pinecone gRPC API does not expose a records search operation for integrated inference indexes. .. note:: Use this method for indexes with integrated inference. For classic indexes where you provide your own vectors, use :meth:`query`. Args: namespace (str): Namespace to search in (required). top_k (int): Number of results to return (must be >= 1). inputs (SearchInputs | dict[str, Any] | None): Inputs for server-side embedding (e.g. ``{"text": "query text"}``). vector (list[float] | None): Dense query vector values. id (str | None): ID of an existing record to use as the query. filter (dict[str, Any] | None): Metadata filter expression. fields (list[str] | None): Field names to include in results. When ``None``, the server returns all available fields. rerank (RerankConfig | dict[str, Any] | None): Reranking configuration with ``model`` (required), ``rank_fields`` (required), and optional ``top_n``, ``parameters``, ``query`` keys. Use :class:`RerankConfig` for IDE autocompletion. match_terms (dict[str, Any] | None): Term-matching constraint for sparse search. Requires keys ``"strategy"`` (currently only ``"all"``) and ``"terms"`` (list of strings). Only supported for sparse indexes using ``pinecone-sparse-english-v0``. ``None`` disables term matching. Returns: :class:`SearchRecordsResponse` with hits and usage statistics. Raises: :exc:`PineconeValueError`: If ``namespace`` is not a string, ``top_k < 1``, or ``rerank`` is missing required keys. :exc:`ApiError`: If the API returns an error response. :exc:`PineconeConnectionError`: If a network-level connection fails (DNS, refused, transport error). :exc:`PineconeTimeoutError`: If the request exceeds the configured timeout. Examples: .. code-block:: python response = idx.search( namespace="articles-en", top_k=10, inputs={"text": "benefits of vector databases for search"}, ) for hit in response.result.hits: print(hit.id, hit.score) Search with reranking: .. code-block:: python response = idx.search( namespace="articles-en", top_k=10, inputs={"text": "benefits of vector databases"}, rerank={ "model": "bge-reranker-v2-m3", "rank_fields": ["text"], "top_n": 5, }, ) for hit in response.result.hits: print(hit.id, hit.score) .. note:: Use inline ``rerank`` when searching and reranking in a single call. Use ``pc.inference.rerank()`` when reranking results from a different source or when you need to rerank without searching. """ if not isinstance(namespace, str): raise ValidationError("namespace must be a string") if not namespace or not namespace.strip(): raise ValidationError("namespace must be a non-empty string") if top_k < 1: raise ValidationError(f"top_k must be a positive integer, got {top_k}") if rerank is not None: if "model" not in rerank: raise ValidationError("rerank requires 'model' to be specified") if "rank_fields" not in rerank: raise ValidationError("rerank requires 'rank_fields' to be specified") if inputs is None and vector is None and id is None: raise ValidationError( "At least one of inputs, vector, or id must be provided as a query source" ) query_body: dict[str, Any] = {"top_k": top_k} if inputs is not None: query_body["inputs"] = inputs if vector is not None: query_body["vector"] = vector if id is not None: query_body["id"] = id if filter is not None: query_body["filter"] = filter if match_terms is not None: query_body["match_terms"] = match_terms body: dict[str, Any] = {"query": query_body} if fields is not None: body["fields"] = fields if rerank is not None: body["rerank"] = rerank logger.info("Searching namespace %r with top_k=%d (via REST)", namespace, top_k) response = self._http.post( f"/records/namespaces/{namespace}/search", timeout=timeout, json=body ) result = self._adapter.to_search_response(response.content) result.response_info = extract_response_info(response) return result
[docs] def search_records( self, *, namespace: str, top_k: int, inputs: SearchInputs | dict[str, Any] | None = None, vector: builtins.list[float] | None = None, id: str | None = None, filter: dict[str, Any] | None = None, fields: builtins.list[str] | None = None, rerank: RerankConfig | dict[str, Any] | None = None, match_terms: dict[str, Any] | None = None, timeout: float | None = None, ) -> SearchRecordsResponse: """Alias for :meth:`search`. Prefer calling :meth:`search` directly — this alias exists for backwards compatibility. """ return self.search( namespace=namespace, top_k=top_k, inputs=inputs, vector=vector, id=id, filter=filter, fields=fields, rerank=rerank, match_terms=match_terms, timeout=timeout, )
[docs] def close(self) -> None: """Close the underlying gRPC channel, REST client, and release resources.""" self._executor.shutdown(wait=True) if self._batch_executor is not None: self._batch_executor.shutdown(wait=False) self._http.close() if hasattr(self._channel, "close"): self._channel.close()
[docs] def __enter__(self) -> GrpcIndex: return self
[docs] def __exit__(self, *args: Any) -> None: self.close()
# Legacy capitalisation alias (BCG-141). GRPCIndex = GrpcIndex # Legacy name (renamed from PineconeGrpcFuture in the rewrite — BCG-143). PineconeGrpcFuture = PineconeFuture from pinecone.grpc.pinecone_grpc import PineconeGRPC # noqa: E402 __all__ = ["GRPCIndex", "GrpcIndex", "PineconeGRPC", "PineconeGrpcFuture"]