Source code for pinecone.async_client.assistants

"""Async Assistants namespace — control-plane operations for Pinecone assistants."""

from __future__ import annotations

import asyncio
import builtins
import io
import logging
import os
import time
from collections.abc import AsyncIterator
from typing import IO, TYPE_CHECKING, Any

import anyio
import msgspec
import msgspec.structs
import orjson

from pinecone._internal.adapters.assistants_adapter import AssistantsAdapter
from pinecone._internal.constants import (
    ASSISTANT_API_VERSION,
    ASSISTANT_API_VERSION_2026_04,
    ASSISTANT_EVALUATION_BASE_URL,
    DEFAULT_BASE_URL,
)
from pinecone.async_client._assistants_legacy import AsyncAssistantsLegacyNamespaceMixin
from pinecone.errors.exceptions import (
    NotFoundError,
    PineconeError,
    PineconeTimeoutError,
    PineconeValueError,
)
from pinecone.models.assistant.chat import ChatCompletionResponse, ChatResponse
from pinecone.models.assistant.context import ContextResponse
from pinecone.models.assistant.evaluation import AlignmentResult
from pinecone.models.assistant.file_model import AssistantFileModel
from pinecone.models.assistant.list import ListAssistantsResponse, ListFilesResponse
from pinecone.models.assistant.message import Message
from pinecone.models.assistant.model import AssistantModel
from pinecone.models.assistant.options import ContextOptions
from pinecone.models.assistant.streaming import (
    AsyncChatCompletionStream,
    AsyncChatStream,
    ChatCompletionStreamChunk,
    ChatStreamChunk,
)
from pinecone.models.pagination import AsyncPaginator, Page

if TYPE_CHECKING:
    from pinecone._internal.config import PineconeConfig
    from pinecone._internal.http_client import AsyncHTTPClient

# Module-level logger; all namespace methods log through this.
logger = logging.getLogger(__name__)

# Regions an assistant may be deployed to; validated (case-sensitively) in create().
_VALID_REGIONS = ("us", "eu")
# Poll cadence while waiting for a new assistant to become ready
# (presumably consumed by _poll_until_ready, which is defined outside this view — TODO confirm).
_CREATE_POLL_INTERVAL_SECONDS = 0.5
# Poll cadence (seconds) while confirming assistant/file deletion via describe 404s.
_DELETE_POLL_INTERVAL_SECONDS = 5
# Poll cadence (seconds) while waiting for uploaded-file processing / upsert operations.
_UPLOAD_POLL_INTERVAL_SECONDS = 5


[docs] class AsyncAssistants(AsyncAssistantsLegacyNamespaceMixin): """Async control-plane operations for Pinecone assistants. Args: config (PineconeConfig): SDK configuration used to construct an HTTP client targeting the assistant API version. Examples: .. code-block:: python from pinecone import AsyncPinecone async with AsyncPinecone(api_key="your-api-key") as pc: assistants = pc.assistants """
[docs] def __init__(self, config: PineconeConfig) -> None: from pinecone._internal.config import PineconeConfig as _PineconeConfig from pinecone._internal.http_client import AsyncHTTPClient as _AsyncHTTPClient self._config = config cp_host = (config.host or DEFAULT_BASE_URL).rstrip("/") cp_config = _PineconeConfig( api_key=config.api_key, host=f"{cp_host}/assistant", timeout=config.timeout, additional_headers=config.additional_headers, source_tag=config.source_tag or "", proxy_url=config.proxy_url or "", proxy_headers=config.proxy_headers, ssl_ca_certs=config.ssl_ca_certs, ssl_verify=config.ssl_verify, connection_pool_maxsize=config.connection_pool_maxsize, retry_config=config.retry_config, ) self._http = _AsyncHTTPClient(cp_config, ASSISTANT_API_VERSION) self._adapter = AssistantsAdapter() self._data_plane_clients: dict[str, AsyncHTTPClient] = {} eval_config = _PineconeConfig( api_key=config.api_key, host=ASSISTANT_EVALUATION_BASE_URL, timeout=config.timeout, additional_headers=config.additional_headers, source_tag=config.source_tag or "", proxy_url=config.proxy_url or "", proxy_headers=config.proxy_headers, ssl_ca_certs=config.ssl_ca_certs, ssl_verify=config.ssl_verify, connection_pool_maxsize=config.connection_pool_maxsize, retry_config=config.retry_config, ) self._eval_http = _AsyncHTTPClient(eval_config, ASSISTANT_API_VERSION)
[docs] async def close(self) -> None: """Close the underlying HTTP client and any cached data-plane clients.""" await self._http.close() await self._eval_http.close() for client in self._data_plane_clients.values(): await client.close() self._data_plane_clients.clear()
async def _data_plane_http(self, assistant_name: str) -> AsyncHTTPClient: """Return an AsyncHTTPClient targeting the assistant's data-plane host. Caches clients by assistant name to avoid repeated describe calls. """ if assistant_name not in self._data_plane_clients: from pinecone._internal.config import PineconeConfig as _PineconeConfig from pinecone._internal.http_client import AsyncHTTPClient as _AsyncHTTPClient assistant = await self.describe(name=assistant_name) if not assistant.host: raise PineconeValueError(f"Assistant '{assistant_name}' has no data-plane host") data_config = _PineconeConfig( api_key=self._config.api_key, host=f"{assistant.host.rstrip('/')}/assistant", timeout=self._config.timeout, additional_headers=self._config.additional_headers, source_tag=self._config.source_tag or "", proxy_url=self._config.proxy_url or "", proxy_headers=self._config.proxy_headers, ssl_ca_certs=self._config.ssl_ca_certs, ssl_verify=self._config.ssl_verify, connection_pool_maxsize=self._config.connection_pool_maxsize, retry_config=self._config.retry_config, ) self._data_plane_clients[assistant_name] = _AsyncHTTPClient( data_config, ASSISTANT_API_VERSION ) return self._data_plane_clients[assistant_name] def _attach_ref(self, model: AssistantModel) -> AssistantModel: """Attach a back-reference to *self* on *model* for legacy method detection. Called after every API response that constructs an :class:`AssistantModel` so that ``_resolve_assistants`` can detect that the model came from an async namespace and raise a clear :exc:`TypeError` directing callers to the async namespace method. Uses the same ``__dict__`` write technique as sync :class:`Assistants` to bypass msgspec's field-restricted ``__setattr__``. """ model.__dict__["_assistants"] = self return model def __repr__(self) -> str: """Return developer-friendly representation.""" return "AsyncAssistants()"
async def create(
    self,
    *,
    name: str | None = None,
    instructions: str | None = None,
    metadata: dict[str, Any] | None = None,
    region: str = "us",
    timeout: float | None = None,
    **kwargs: Any,
) -> AssistantModel:
    """Create a new Pinecone assistant, optionally waiting until it is ready.

    The assistant starts in ``"Initializing"`` status; unless *timeout* is
    ``-1`` this method polls until the status reaches ``"Ready"``.

    Args:
        name (str): Assistant name — 1-63 characters, lowercase
            alphanumerics or hyphens, starting and ending alphanumeric.
            Legacy alias: ``assistant_name``.
        instructions (str | None): Optional directive, maximum 16 KB.
        metadata (dict[str, Any] | None): Optional metadata; an empty dict
            is sent when omitted.
        region (str): Deployment region, ``"us"`` or ``"eu"``
            (case-sensitive). Defaults to ``"us"``.
        timeout (float | None): ``None`` (default) polls indefinitely;
            ``-1`` returns immediately without polling; ``0`` or positive
            polls with that deadline in seconds.

    Returns:
        :class:`AssistantModel` describing the created assistant.

    Raises:
        :exc:`PineconeValueError`: If *region* is invalid or *name* is
            missing / supplied via both spellings.
        :exc:`PineconeTimeoutError`: If the assistant is not ready before
            the deadline.
        :exc:`ApiError`: If the API returns an error response.

    Examples:
        .. code-block:: python

            from pinecone import AsyncPinecone

            async with AsyncPinecone(api_key="your-api-key") as pc:
                assistant = await pc.assistants.create(name="my-assistant")
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    legacy = remap_legacy_kwargs(
        kwargs, aliases={"assistant_name": "name"}, method_name="create"
    )
    reject_unknown_kwargs(legacy, allowed={"name"}, method_name="create")
    if "name" in legacy:
        if name is not None:
            raise PineconeValueError(
                "create() received both 'assistant_name' (legacy) and 'name'. "
                "Pass only one — prefer 'name'."
            )
        name = legacy["name"]
    if name is None:
        raise PineconeValueError(
            "create() missing required argument: 'name' (or legacy alias 'assistant_name')."
        )
    if region not in _VALID_REGIONS:
        raise PineconeValueError(f"region must be one of {_VALID_REGIONS!r}, got {region!r}")

    payload: dict[str, Any] = {
        "name": name,
        "instructions": instructions,
        "metadata": metadata if metadata is not None else {},
        "region": region,
    }
    logger.info("Creating assistant %r", name)
    resp = await self._http.post("/assistants", json=payload)
    created = self._attach_ref(self._adapter.to_assistant(resp.content))
    logger.debug("Created assistant %r (status=%s)", name, created.status)
    if timeout == -1:
        # Caller opted out of polling: return the just-created model as-is.
        return created
    return await self._poll_until_ready(name, timeout)
[docs] async def describe(self, *, name: str | None = None, **kwargs: Any) -> AssistantModel: """Get detailed information about a named assistant. Args: name (str): The name of the assistant to describe. Returns: :class:`AssistantModel` with name, status, created_at, updated_at, metadata, instructions, and host. Raises: :exc:`ApiError`: If the API returns an error response (e.g. 404 when the assistant does not exist). Examples: .. code-block:: python assistant = await pc.assistants.describe(name="my-assistant") print(assistant.status) """ from pinecone._internal.kwargs_aliases import ( reject_unknown_kwargs, remap_legacy_kwargs, ) remapped = remap_legacy_kwargs( kwargs, aliases={"assistant_name": "name"}, method_name="describe", ) reject_unknown_kwargs(remapped, allowed={"name"}, method_name="describe") if "name" in remapped: if name is not None: raise PineconeValueError( "describe() received both 'assistant_name' (legacy) and 'name'. " "Pass only one — prefer 'name'." ) name = remapped["name"] if name is None: raise PineconeValueError( "describe() missing required argument: 'name' (or legacy alias 'assistant_name')." ) logger.info("Describing assistant %r", name) response = await self._http.get(f"/assistants/{name}") model = self._attach_ref(self._adapter.to_assistant(response.content)) logger.debug("Described assistant %r (status=%s)", name, model.status) return model
[docs] def list( self, *, limit: int | None = None, pagination_token: str | None = None, ) -> AsyncPaginator[AssistantModel]: """List assistants in the project with transparent lazy pagination. Args: limit (int | None): Maximum number of assistants to yield across all pages. ``None`` (default) yields all assistants. pagination_token (str | None): Token to resume pagination from a previous call. Returns: :class:`AsyncPaginator` over :class:`AssistantModel` objects. Supports ``async for`` loops, ``.to_list()``, ``.pages()``, and ``limit``. Raises: :exc:`ApiError`: If the API returns an error response. Examples: .. code-block:: python async for a in pc.assistants.list(): print(a.name, a.status) all_assistants = await pc.assistants.list().to_list() """ logger.info("Listing assistants") async def fetch_page(token: str | None) -> Page[AssistantModel]: result = await self.list_page(pagination_token=token) return Page(items=result.assistants, pagination_token=result.next) return AsyncPaginator(fetch_page=fetch_page, initial_token=pagination_token, limit=limit)
[docs] async def list_page( self, *, page_size: int | None = None, pagination_token: str | None = None, **kwargs: Any, ) -> ListAssistantsResponse: """List one page of assistants with explicit pagination control. Args: page_size (int | None): Maximum number of assistants per page. pagination_token (str | None): Token from a previous response to fetch the next page. Returns: :class:`ListAssistantsResponse` with an ``assistants`` list and an optional ``next`` continuation token. Raises: :exc:`ApiError`: If the API returns an error response. Examples: .. code-block:: python page = await pc.assistants.list_page(page_size=10) for a in page.assistants: print(a.name) if page.next: next_page = await pc.assistants.list_page(pagination_token=page.next) """ from pinecone._internal.kwargs_aliases import ( reject_unknown_kwargs, remap_legacy_kwargs, ) remapped = remap_legacy_kwargs( kwargs, aliases={"limit": "page_size"}, method_name="list_page", ) reject_unknown_kwargs(remapped, allowed={"page_size"}, method_name="list_page") if "page_size" in remapped: if page_size is not None: raise PineconeValueError( "list_page() received both 'limit' (legacy) and 'page_size'. " "Pass only one — prefer 'page_size'." ) page_size = remapped["page_size"] params: dict[str, str | int] = {} if page_size is not None: params["pageSize"] = page_size if pagination_token is not None: params["paginationToken"] = pagination_token logger.info("Listing assistants page") response = await self._http.get("/assistants", params=params) result = self._adapter.to_assistant_list(response.content) for item in result.assistants: self._attach_ref(item) logger.debug( "Listed %d assistants (has_next=%s)", len(result.assistants), result.next is not None, ) return result
async def update(
    self,
    *,
    name: str | None = None,
    instructions: str | None = None,
    metadata: dict[str, Any] | None = None,
    **kwargs: Any,
) -> AssistantModel:
    """Update an existing Pinecone assistant.

    Updates the assistant's instructions and/or metadata. Metadata is fully
    replaced (not merged) when provided.

    Args:
        name (str): Name of the assistant to update. Legacy alias:
            ``assistant_name``.
        instructions (str | None): New instructions; pass an empty string
            to clear existing instructions.
        metadata (dict[str, Any] | None): New metadata dictionary; fully
            replaces any existing metadata.

    Returns:
        :class:`AssistantModel` describing the updated assistant.

    Raises:
        :exc:`PineconeValueError`: If *name* is missing or supplied via
            both spellings.
        :exc:`ApiError`: If the API returns an error response (e.g. 404
            when the assistant does not exist).

    Examples:
        .. code-block:: python

            assistant = await pc.assistants.update(
                name="my-assistant",
                instructions="You are a helpful research assistant.",
            )
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    legacy = remap_legacy_kwargs(
        kwargs, aliases={"assistant_name": "name"}, method_name="update"
    )
    reject_unknown_kwargs(legacy, allowed={"name"}, method_name="update")
    if "name" in legacy:
        if name is not None:
            raise PineconeValueError(
                "update() received both 'assistant_name' (legacy) and 'name'. "
                "Pass only one — prefer 'name'."
            )
        name = legacy["name"]
    if name is None:
        raise PineconeValueError(
            "update() missing required argument: 'name' (or legacy alias 'assistant_name')."
        )

    # Only include fields the caller actually supplied.
    patch: dict[str, Any] = {}
    if instructions is not None:
        patch["instructions"] = instructions
    if metadata is not None:
        patch["metadata"] = metadata

    logger.info("Updating assistant %r", name)
    resp = await self._http.patch(f"/assistants/{name}", json=patch)
    updated = self._attach_ref(self._adapter.to_assistant(resp.content))
    logger.debug("Updated assistant %r", name)
    return updated
async def delete(
    self,
    *,
    name: str | None = None,
    timeout: float | None = None,
    **kwargs: Any,
) -> None:
    """Delete a Pinecone assistant by name.

    Sends a DELETE request, then polls every 5 seconds until describe()
    raises :exc:`NotFoundError`. Other errors during polling propagate
    immediately.

    Args:
        name (str): Name of the assistant to delete. Legacy alias:
            ``assistant_name``.
        timeout (float | None): ``None`` (default) polls indefinitely;
            ``-1`` returns immediately without polling; a positive value
            polls with that deadline in seconds.
        **kwargs (Any): Accepted for backwards compatibility only; unknown
            kwargs raise :exc:`PineconeValueError`.

    Returns:
        None.

    Raises:
        :exc:`PineconeTimeoutError`: If the assistant still exists after
            *timeout* seconds.
        :exc:`ApiError`: If the API returns an error response.

    Examples:
        .. code-block:: python

            await pc.assistants.delete(name="my-assistant")

            # Return immediately without waiting for deletion
            await pc.assistants.delete(name="my-assistant", timeout=-1)
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    legacy = remap_legacy_kwargs(
        kwargs, aliases={"assistant_name": "name"}, method_name="delete"
    )
    reject_unknown_kwargs(legacy, allowed={"name"}, method_name="delete")
    if "name" in legacy:
        if name is not None:
            raise PineconeValueError(
                "delete() received both 'assistant_name' (legacy) and 'name'. "
                "Pass only one — prefer 'name'."
            )
        name = legacy["name"]
    if name is None:
        raise PineconeValueError(
            "delete() missing required argument: 'name' (or legacy alias 'assistant_name')."
        )

    logger.info("Deleting assistant %r", name)
    await self._http.delete(f"/assistants/{name}")
    logger.debug("Deleted assistant %r", name)
    if timeout == -1:
        return

    # Poll until describe() 404s or the deadline passes.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            await self.describe(name=name)
        except NotFoundError:
            return
        if deadline is not None and time.monotonic() >= deadline:
            raise PineconeTimeoutError(f"Assistant '{name}' still exists after {timeout}s")
        await asyncio.sleep(_DELETE_POLL_INTERVAL_SECONDS)
[docs] async def describe_file( self, *, assistant_name: str, file_id: str, include_url: bool = False, ) -> AssistantFileModel: """Get the status and metadata of a file uploaded to an assistant. Args: assistant_name: Name of the assistant that owns the file. file_id: Unique identifier of the file to retrieve. include_url: If ``True``, include a signed download URL in the response. Defaults to ``False``. Returns: :class:`AssistantFileModel` with file metadata and status. Raises: :exc:`NotFoundError`: If the file does not exist. :exc:`ApiError`: If the API returns an error response. Examples: .. code-block:: python file = await pc.assistants.describe_file( assistant_name="my-assistant", file_id="file-abc123", ) print(file.status) """ data_http = await self._data_plane_http(assistant_name) params: dict[str, str] = {} if include_url: params["include_url"] = "true" logger.info("Describing file %r in assistant %r", file_id, assistant_name) response = await data_http.get(f"/files/{assistant_name}/{file_id}", params=params) return self._adapter.to_file(response.content)
async def list_files_page(
    self,
    *,
    assistant_name: str,
    page_size: int | None = None,
    pagination_token: str | None = None,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ListFilesResponse:
    """List one page of files for an assistant with explicit pagination control.

    Args:
        assistant_name: Name of the assistant whose files to list.
        page_size: Maximum number of files per page. Legacy alias:
            ``limit``.
        pagination_token: Token from a previous response to fetch the next
            page.
        filter: Optional metadata filter expression; serialized to a JSON
            string before being sent.

    Returns:
        :class:`ListFilesResponse` with a ``files`` list and an optional
        ``next`` continuation token.

    Raises:
        :exc:`PineconeValueError`: If both ``limit`` and ``page_size`` are
            supplied.
        :exc:`ApiError`: If the API returns an error response.

    Examples:
        .. code-block:: python

            page = await pc.assistants.list_files_page(
                assistant_name="my-assistant",
            )
            for f in page.files:
                print(f.name)
            if page.next:
                next_page = await pc.assistants.list_files_page(
                    assistant_name="my-assistant",
                    pagination_token=page.next,
                )
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    legacy = remap_legacy_kwargs(
        kwargs, aliases={"limit": "page_size"}, method_name="list_files_page"
    )
    reject_unknown_kwargs(legacy, allowed={"page_size"}, method_name="list_files_page")
    if "page_size" in legacy:
        if page_size is not None:
            raise PineconeValueError(
                "list_files_page() received both 'limit' (legacy) and 'page_size'. "
                "Pass only one — prefer 'page_size'."
            )
        page_size = legacy["page_size"]

    import json as _json

    http = await self._data_plane_http(assistant_name)
    query: dict[str, str | int] = {}
    if page_size is not None:
        query["pageSize"] = page_size
    if pagination_token is not None:
        query["paginationToken"] = pagination_token
    if filter is not None:
        query["filter"] = _json.dumps(filter)

    logger.info("Listing files page for assistant %r", assistant_name)
    resp = await http.get(f"/files/{assistant_name}", params=query)
    page = self._adapter.to_file_list(resp.content)
    logger.debug(
        "Listed %d files for assistant %r (has_next=%s)",
        len(page.files),
        assistant_name,
        page.next is not None,
    )
    return page
[docs] def list_files( self, *, assistant_name: str, filter: dict[str, Any] | None = None, limit: int | None = None, pagination_token: str | None = None, ) -> AsyncPaginator[AssistantFileModel]: """List files for an assistant with lazy async pagination. Args: assistant_name: Name of the assistant whose files to list. filter: Optional metadata filter expression. Serialized to a JSON string before being sent to the API. limit: Maximum number of files to yield across all pages. ``None`` (default) yields all files. pagination_token: Token to resume pagination from a previous call. Returns: :class:`AsyncPaginator` over :class:`AssistantFileModel` objects. Supports ``async for`` loops, ``.to_list()``, ``.pages()``, and ``limit``. Raises: :exc:`ApiError`: If the API returns an error response. Examples: .. code-block:: python async for f in pc.assistants.list_files(assistant_name="my-assistant"): print(f.name, f.status) files = await pc.assistants.list_files(assistant_name="my-assistant").to_list() """ logger.info("Listing files for assistant %r", assistant_name) async def fetch_page(token: str | None) -> Page[AssistantFileModel]: result = await self.list_files_page( assistant_name=assistant_name, pagination_token=token, filter=filter, ) return Page(items=result.files, pagination_token=result.next) return AsyncPaginator(fetch_page=fetch_page, initial_token=pagination_token, limit=limit)
async def _poll_file_until_processed(
    self,
    data_http: AsyncHTTPClient,
    assistant_name: str,
    file_id: str,
    timeout: float | None,
) -> AssistantFileModel:
    """Poll ``GET /files/{assistant_name}/{file_id}`` until processing completes.

    Returns the file model once its status leaves ``"Processing"``; raises
    :exc:`PineconeError` if processing failed and :exc:`PineconeTimeoutError`
    when the deadline passes.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        resp = await data_http.get(f"/files/{assistant_name}/{file_id}")
        current = self._adapter.to_file(resp.content)
        if current.status != "Processing":
            if current.status == "ProcessingFailed":
                detail = current.error_message or "Unknown processing error"
                raise PineconeError(f"File processing failed for '{file_id}': {detail}")
            return current
        if deadline is not None and time.monotonic() >= deadline:
            raise PineconeTimeoutError(
                f"File processing timed out after {timeout}s (operation_id={file_id})"
            )
        await asyncio.sleep(_UPLOAD_POLL_INTERVAL_SECONDS)

async def _upsert_http(self, assistant_name: str) -> AsyncHTTPClient:
    """Return an AsyncHTTPClient for the assistant's data-plane host using API 2026-04.

    Not cached — built fresh per call, unlike :meth:`_data_plane_http`.
    """
    from pinecone._internal.config import PineconeConfig as _PineconeConfig
    from pinecone._internal.http_client import AsyncHTTPClient as _AsyncHTTPClient

    assistant = await self.describe(name=assistant_name)
    if not assistant.host:
        raise PineconeValueError(f"Assistant '{assistant_name}' has no data-plane host")
    cfg = _PineconeConfig(
        api_key=self._config.api_key,
        host=f"{assistant.host.rstrip('/')}/assistant",
        timeout=self._config.timeout,
        additional_headers=self._config.additional_headers,
        source_tag=self._config.source_tag or "",
        proxy_url=self._config.proxy_url or "",
        proxy_headers=self._config.proxy_headers,
        ssl_ca_certs=self._config.ssl_ca_certs,
        ssl_verify=self._config.ssl_verify,
        connection_pool_maxsize=self._config.connection_pool_maxsize,
        retry_config=self._config.retry_config,
    )
    return _AsyncHTTPClient(cfg, ASSISTANT_API_VERSION_2026_04)

async def _poll_operation_until_done(
    self,
    upsert_http: AsyncHTTPClient,
    assistant_name: str,
    operation_id: str,
    timeout: float | None,
) -> None:
    """Poll ``GET /operations/{assistant_name}/{operation_id}`` until done.

    Returns once the operation leaves ``"Processing"``; raises
    :exc:`PineconeError` on ``"Failed"`` and :exc:`PineconeTimeoutError`
    when the deadline passes.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        resp = await upsert_http.get(f"/operations/{assistant_name}/{operation_id}")
        op = self._adapter.to_operation(resp.content)
        if op.status != "Processing":
            if op.status == "Failed":
                detail = op.error or "Unknown operation error"
                raise PineconeError(
                    f"Upsert operation failed for operation '{operation_id}': {detail}"
                )
            return
        if deadline is not None and time.monotonic() >= deadline:
            raise PineconeTimeoutError(
                f"Upsert operation timed out after {timeout}s (operation_id={operation_id})"
            )
        await asyncio.sleep(_UPLOAD_POLL_INTERVAL_SECONDS)
async def upload_file(
    self,
    *,
    assistant_name: str,
    file_path: str | None = None,
    file_stream: IO[bytes] | None = None,
    file_name: str | None = None,
    metadata: dict[str, Any] | None = None,
    multimodal: bool | None = None,
    file_id: str | None = None,
    timeout: float | None = None,
) -> AssistantFileModel:
    """Upload a file to a Pinecone assistant.

    Uploads from a local path or an in-memory byte stream, then polls until
    server-side processing completes. When *file_id* is supplied, the
    2026-04 upsert endpoint (``PUT /files/{assistant_name}/{file_id}``) is
    used instead of the plain upload endpoint.

    Args:
        assistant_name: Name of the target assistant.
        file_path: Path to a local file. Mutually exclusive with
            *file_stream*.
        file_stream: An open byte stream to upload. Mutually exclusive
            with *file_path*; use *file_name* to set the filename.
        file_name: Filename for *file_stream*; ignored when *file_path* is
            provided. Defaults to ``"upload"``.
        metadata: Optional metadata dictionary, sent as a JSON string.
        multimodal: Whether to enable multimodal processing for PDFs.
        file_id: Optional caller-specified identifier for upsert behavior.
        timeout: Seconds to wait for processing. ``None`` (default) polls
            indefinitely; ``-1`` returns immediately after upload with a
            single describe call.

    Returns:
        :class:`AssistantFileModel` fetched fresh from the API after
        processing completes.

    Raises:
        :exc:`PineconeValueError`: If both or neither of *file_path* and
            *file_stream* are given, or *file_path* does not exist.
        :exc:`PineconeTimeoutError`: If processing is not done before
            *timeout*.
        :exc:`PineconeError`: If server-side processing fails.
    """
    import json as _json

    if (file_path is None) == (file_stream is None):
        raise PineconeValueError("Exactly one of file_path or file_stream must be provided")

    handle: IO[bytes]
    if file_path is not None:
        # Build the async path once and reuse it for both filesystem calls.
        source = anyio.Path(file_path)
        if not await source.is_file():
            raise PineconeValueError(f"File not found: {file_path}")
        handle = io.BytesIO(await source.read_bytes())
        upload_name = os.path.basename(file_path)
    else:
        if file_stream is None:  # unreachable; narrows the type for checkers
            raise PineconeValueError("Exactly one of file_path or file_stream must be provided")
        handle = file_stream
        upload_name = file_name or "upload"

    data_http = await self._data_plane_http(assistant_name)
    params: dict[str, str] = {}
    if metadata is not None:
        params["metadata"] = _json.dumps(metadata)
    if multimodal is not None:
        params["multimodal"] = str(multimodal).lower()

    if file_id is not None:
        # Use the 2026-04 upsert endpoint: PUT /files/{assistant_name}/{file_id}
        upsert_http = await self._upsert_http(assistant_name)
        logger.info(
            "Upserting file %r (id=%s) to assistant %r",
            upload_name,
            file_id,
            assistant_name,
        )
        upsert_response = await upsert_http.put(
            f"/files/{assistant_name}/{file_id}",
            files={"file": (upload_name, handle)},
            params=params,
        )
        op_model = self._adapter.to_operation(upsert_response.content)
        operation_id = op_model.operation_id
        if timeout == -1:
            return await self.describe_file(assistant_name=assistant_name, file_id=file_id)
        await self._poll_operation_until_done(
            upsert_http, assistant_name, operation_id, timeout
        )
        return await self.describe_file(assistant_name=assistant_name, file_id=file_id)

    logger.info("Uploading file %r to assistant %r", upload_name, assistant_name)
    response = await data_http.post(
        f"/files/{assistant_name}",
        files={"file": (upload_name, handle)},
        params=params,
    )
    file_model = self._adapter.to_file(response.content)
    logger.debug(
        "Uploaded file %r (id=%s, status=%s)",
        upload_name,
        file_model.id,
        file_model.status,
    )
    if timeout == -1:
        return await self.describe_file(assistant_name=assistant_name, file_id=file_model.id)
    return await self._poll_file_until_processed(
        data_http, assistant_name, file_model.id, timeout
    )
[docs] async def delete_file( self, *, assistant_name: str, file_id: str, timeout: float | None = None, ) -> None: """Delete a file from a Pinecone assistant. Sends a DELETE request, then polls every 5 seconds until the file is confirmed gone (404 from describe_file). Other errors during polling propagate immediately. Args: assistant_name (str): Name of the assistant that owns the file. file_id (str): Unique identifier of the file to delete. timeout (float | None): Seconds to wait for the file to be deleted. Use ``None`` (default) to poll indefinitely. Use ``-1`` to return immediately without polling. Use a positive value to poll with a deadline. Raises :exc:`PineconeTimeoutError` if the file is not gone before the deadline. Raises: :exc:`PineconeError`: If server-side file deletion fails. :exc:`PineconeTimeoutError`: If the file still exists after *timeout* seconds. :exc:`ApiError`: If the API returns an error response. :rtype: None Examples: .. code-block:: python await pc.assistants.delete_file( assistant_name="my-assistant", file_id="file-abc123", ) """ data_http = await self._data_plane_http(assistant_name) logger.info("Deleting file %r from assistant %r", file_id, assistant_name) await data_http.delete(f"/files/{assistant_name}/{file_id}") logger.debug("Deleted file %r from assistant %r", file_id, assistant_name) if timeout == -1: return start = time.monotonic() while True: try: file_model = await self.describe_file( assistant_name=assistant_name, file_id=file_id ) except NotFoundError: return if file_model.status not in ("Deleting", None): error_msg = file_model.error_message or "Unknown deletion error" raise PineconeError(f"File deletion failed for '{file_id}': {error_msg}") if timeout is not None: elapsed = time.monotonic() - start if elapsed >= timeout: raise PineconeTimeoutError(f"File '{file_id}' still exists after {timeout}s") await asyncio.sleep(_DELETE_POLL_INTERVAL_SECONDS)
async def context(
    self,
    *,
    assistant_name: str,
    query: str | None = None,
    messages: builtins.list[Message | dict[str, str]] | None = None,
    filter: dict[str, Any] | None = None,
    top_k: int | None = None,
    snippet_size: int | None = None,
    multimodal: bool | None = None,
    include_binary_content: bool | None = None,
) -> ContextResponse:
    """Retrieve relevant context snippets from a Pinecone assistant.

    Exactly one of *query* or *messages* must be provided and non-empty.

    Args:
        assistant_name: Name of the assistant to retrieve context from.
        query: Text query for retrieval; mutually exclusive with
            *messages*. Empty string counts as not provided.
        messages: Conversation messages for retrieval; mutually exclusive
            with *query*. Empty list counts as not provided. Dicts are
            converted to :class:`Message` objects.
        filter: Metadata filter restricting contributing documents.
            Omitted from the request when ``None``.
        top_k: Maximum number of context snippets to return. Omitted when
            ``None``.
        snippet_size: Maximum snippet size in tokens. Omitted when ``None``.
        multimodal: Whether to include image-related context snippets.
            Omitted when ``None``.
        include_binary_content: Whether image snippets include base64 image
            data; only meaningful when *multimodal* is ``True``. Omitted
            when ``None``.

    Returns:
        :class:`ContextResponse` containing the matching context snippets.

    Raises:
        :exc:`PineconeValueError`: If both or neither of *query* and
            *messages* are provided (or both are empty).
        :exc:`ApiError`: If the API returns an error response.

    Examples:
        .. code-block:: python

            response = await pc.assistants.context(
                assistant_name="my-assistant",
                query="What is Pinecone?",
            )
            for snippet in response.snippets:
                print(snippet.content)
    """
    has_query = query is not None and query != ""
    has_messages = messages is not None and len(messages) > 0
    if has_query and has_messages:
        raise PineconeValueError("Exactly one of query or messages must be provided, not both.")
    if not has_query and not has_messages:
        raise PineconeValueError("Exactly one of query or messages must be provided.")

    body: dict[str, Any] = {}
    if has_query:
        body["query"] = query
    else:
        if messages is None:  # unreachable; narrows the type for checkers
            raise PineconeValueError("Exactly one of query or messages must be provided.")
        normalized = [
            m if isinstance(m, Message) else Message.from_dict(m) for m in messages
        ]
        body["messages"] = [{"role": m.role, "content": m.content} for m in normalized]

    # Optional request fields are included only when explicitly provided.
    for key, value in (
        ("filter", filter),
        ("top_k", top_k),
        ("snippet_size", snippet_size),
        ("multimodal", multimodal),
        ("include_binary_content", include_binary_content),
    ):
        if value is not None:
            body[key] = value

    http = await self._data_plane_http(assistant_name)
    resp = await http.post(f"/chat/{assistant_name}/context", json=body)
    return self._adapter.to_context_response(resp.content)
async def chat(
    self,
    *,
    assistant_name: str,
    messages: builtins.list[Message | dict[str, str]],
    model: str = "gpt-4o",
    stream: bool = False,
    temperature: float | None = None,
    filter: dict[str, Any] | None = None,
    json_response: bool = False,
    include_highlights: bool = False,
    context_options: ContextOptions | dict[str, Any] | None = None,
) -> ChatResponse | AsyncChatStream:
    """Chat with an assistant, receiving citations in Pinecone-native format.

    Args:
        assistant_name: Assistant to converse with.
        messages: Conversation history; plain dicts are promoted to
            :class:`Message` (role defaults to ``"user"`` when absent).
        model: Large language model to use. Defaults to ``"gpt-4o"``.
        stream: When ``True``, return an :class:`AsyncChatStream` instead
            of a complete response.
        temperature: Sampling randomness; lower is more deterministic.
            Omitted from the request when ``None``.
        filter: Metadata filter limiting context documents. Omitted when
            ``None``.
        json_response: Ask the assistant for a JSON answer. Incompatible
            with streaming.
        include_highlights: Include highlight snippets from cited documents.
        context_options: Context-retrieval options. Omitted when ``None``.

    Returns:
        A :class:`ChatResponse`, or an :class:`AsyncChatStream` when
        *stream* is ``True``.

    Raises:
        PineconeValueError: If ``stream=True`` is combined with
            ``json_response=True``.

    Examples:
        .. code-block:: python

            # Streaming chat
            stream = await pc.assistants.chat(
                assistant_name="my-assistant",
                messages=[{"content": "What is Pinecone?"}],
                stream=True,
            )
            async for text in stream.text():
                print(text, end="", flush=True)
    """
    if stream and json_response:
        raise PineconeValueError("json_response cannot be used with stream=True")

    normalized = [
        item if isinstance(item, Message) else Message.from_dict(item)
        for item in messages
    ]
    payload: dict[str, Any] = {
        "messages": [{"role": m.role, "content": m.content} for m in normalized],
        "model": model,
        "stream": stream,
    }
    if temperature is not None:
        payload["temperature"] = temperature
    if filter is not None:
        payload["filter"] = filter
    if json_response:
        payload["json_response"] = json_response
    # NOTE(review): include_highlights is always serialized when streaming,
    # even when False — presumably the streaming endpoint requires the key;
    # confirm against the API contract.
    if stream or include_highlights:
        payload["include_highlights"] = include_highlights
    if context_options is not None:
        if isinstance(context_options, dict):
            payload["context_options"] = context_options
        else:
            # Drop unset struct fields so they are omitted from the request.
            payload["context_options"] = {
                field: value
                for field, value in msgspec.structs.asdict(context_options).items()
                if value is not None
            }

    http = await self._data_plane_http(assistant_name)
    if stream:
        return AsyncChatStream(
            self._chat_streaming(data_http=http, url=f"/chat/{assistant_name}", body=payload)
        )
    response = await http.post(f"/chat/{assistant_name}", json=payload)
    return self._adapter.to_chat_response(response.content)
async def _chat_streaming(
    self,
    *,
    data_http: AsyncHTTPClient,
    url: str,
    body: dict[str, Any],
) -> AsyncIterator[ChatStreamChunk]:
    """Yield Pinecone-native chat chunks from an SSE response.

    POSTs *body* (which must carry ``stream=True``) to *url*, reads the
    response line by line, and converts each ``data:`` payload into a typed
    chunk, stopping at the ``[DONE]`` sentinel. Payloads whose ``type`` is
    unknown to the :class:`ChatStreamChunk` union are skipped with a debug
    log rather than raised.

    Args:
        data_http: Client targeting the assistant's data-plane host.
        url: Request path, e.g. ``/chat/{assistant_name}``.
        body: Pre-built request body including ``stream=True``.

    Yields:
        :class:`ChatStreamChunk` members dispatched on the ``type`` field.
    """
    async with data_http.stream(
        "POST",
        url,
        content=orjson.dumps(body),
        headers={"Content-Type": "application/json"},
    ) as response:
        async for raw in response.aiter_lines():
            # Only "data:" fields carry payloads; blank keep-alive lines
            # and other SSE fields are ignored.
            if not raw.startswith("data:"):
                continue
            payload = raw[5:].lstrip()
            if not payload:
                continue
            if payload == "[DONE]":
                break
            event: dict[str, Any] = orjson.loads(payload)
            try:
                yield msgspec.convert(event, ChatStreamChunk)
            except msgspec.ValidationError:
                logger.debug("Skipping unknown chunk type: %s", event.get("type"))
async def chat_completions(
    self,
    *,
    assistant_name: str,
    messages: builtins.list[Message | dict[str, str]],
    model: str = "gpt-4o",
    stream: bool = False,
    temperature: float | None = None,
    filter: dict[str, Any] | None = None,
) -> ChatCompletionResponse | AsyncChatCompletionStream:
    """Chat with an assistant through an OpenAI-compatible interface.

    Responses follow the OpenAI chat-completion schema, which is useful for
    inline citations or drop-in OpenAI compatibility. Compared with
    :meth:`chat`, this endpoint does not accept ``include_highlights``,
    ``context_options``, or ``json_response``.

    Args:
        assistant_name: Assistant to converse with.
        messages: Conversation history; plain dicts are promoted to
            :class:`Message` (role defaults to ``"user"`` when absent).
        model: Large language model to use; not validated client-side.
            Defaults to ``"gpt-4o"``.
        stream: When ``True``, return an async streaming iterator.
        temperature: Sampling randomness; omitted from the request when
            ``None``.
        filter: Metadata filter limiting context documents. Omitted when
            ``None``.

    Returns:
        A :class:`ChatCompletionResponse`, or an
        :class:`AsyncChatCompletionStream` when *stream* is ``True``.

    Examples:
        .. code-block:: python

            response = await pc.assistants.chat_completions(
                assistant_name="research-assistant",
                messages=[{"content": "Explain quantum entanglement briefly."}],
            )
            print(response.choices[0].message.content)
    """
    normalized = [
        item if isinstance(item, Message) else Message.from_dict(item)
        for item in messages
    ]
    payload: dict[str, Any] = {
        "messages": [{"role": m.role, "content": m.content} for m in normalized],
        "model": model,
        "stream": stream,
    }
    if temperature is not None:
        payload["temperature"] = temperature
    if filter is not None:
        payload["filter"] = filter

    http = await self._data_plane_http(assistant_name)
    endpoint = f"/chat/{assistant_name}/chat/completions"
    if stream:
        return AsyncChatCompletionStream(
            self._chat_completions_streaming(data_http=http, url=endpoint, body=payload)
        )
    response = await http.post(endpoint, json=payload)
    return self._adapter.to_chat_completion_response(response.content)
async def _chat_completions_streaming(
    self,
    *,
    data_http: AsyncHTTPClient,
    url: str,
    body: dict[str, Any],
) -> AsyncIterator[ChatCompletionStreamChunk]:
    """Yield OpenAI-compatible completion chunks from an SSE response.

    POSTs *body* (which must carry ``stream=True``) to *url* and converts
    each ``data:`` SSE payload into a :class:`ChatCompletionStreamChunk`,
    stopping at the ``[DONE]`` sentinel.

    Args:
        data_http: Client targeting the assistant's data-plane host.
        url: Request path, e.g. ``/chat/{assistant_name}/chat/completions``.
        body: Pre-built request body including ``stream=True``.

    Yields:
        One :class:`ChatCompletionStreamChunk` per non-empty data line.
    """
    async with data_http.stream(
        "POST",
        url,
        content=orjson.dumps(body),
        headers={"Content-Type": "application/json"},
    ) as response:
        async for raw in response.aiter_lines():
            # Only "data:" fields carry payloads; blank keep-alive lines
            # and other SSE fields are ignored.
            if not raw.startswith("data:"):
                continue
            payload = raw[5:].lstrip()
            if not payload:
                continue
            if payload == "[DONE]":
                break
            yield msgspec.convert(orjson.loads(payload), ChatCompletionStreamChunk)
async def evaluate_alignment(
    self,
    *,
    question: str,
    answer: str,
    ground_truth_answer: str,
) -> AlignmentResult:
    """Score a generated answer against a ground-truth answer.

    Alignment is the harmonic mean of correctness (precision) and
    completeness (recall) of the generated answer with respect to the
    ground truth.

    Args:
        question: Question the answer was generated for.
        answer: Generated answer under evaluation.
        ground_truth_answer: Reference answer to compare against.

    Returns:
        An :class:`AlignmentResult` with aggregate scores, per-fact
        entailment results, and token usage.

    Examples:
        .. code-block:: python

            result = await pc.assistants.evaluate_alignment(
                question="What is the capital of Spain?",
                answer="Barcelona.",
                ground_truth_answer="Madrid.",
            )
            print(result.scores.alignment)
    """
    payload = {
        "question": question,
        "answer": answer,
        "ground_truth_answer": ground_truth_answer,
    }
    logger.info("Evaluating alignment for question %r", question)
    response = await self._eval_http.post("/evaluation/metrics/alignment", json=payload)
    outcome = self._adapter.to_alignment_result(response.content)
    logger.debug("Alignment evaluation complete (alignment=%.3f)", outcome.scores.alignment)
    return outcome
async def _poll_until_ready(self, name: str, timeout: float | None) -> AssistantModel:
    """Poll ``GET /assistants/{name}`` until the assistant reports ``"Ready"``.

    Args:
        name: Assistant to poll.
        timeout: ``None`` polls indefinitely; otherwise a deadline in
            seconds.

    Returns:
        The ready :class:`AssistantModel`.

    Raises:
        PineconeError: If the assistant enters a terminal failure state.
        PineconeTimeoutError: If the deadline passes before readiness.
    """
    started = time.monotonic()
    while True:
        response = await self._http.get(f"/assistants/{name}")
        assistant = self._attach_ref(self._adapter.to_assistant(response.content))

        if assistant.status == "Ready":
            return assistant
        if assistant.status in ("Failed", "InitializationFailed"):
            raise PineconeError(
                f"Assistant '{name}' entered terminal state '{assistant.status}'. "
                f"Check status with pc.assistants.describe(name='{name}')."
            )
        if timeout is not None and time.monotonic() - started >= timeout:
            raise PineconeTimeoutError(
                f"Assistant '{name}' not ready after {timeout}s. "
                f"Check status with pc.assistants.describe(name='{name}')."
            )

        await asyncio.sleep(_CREATE_POLL_INTERVAL_SECONDS)