"""Assistants namespace — control-plane operations for Pinecone assistants."""
from __future__ import annotations
import builtins
import logging
import os
import time
from collections.abc import Iterator
from typing import IO, TYPE_CHECKING, Any
import msgspec
import msgspec.structs
import orjson
from pinecone._internal.adapters.assistants_adapter import AssistantsAdapter
from pinecone._internal.constants import (
ASSISTANT_API_VERSION,
ASSISTANT_API_VERSION_2026_04,
ASSISTANT_EVALUATION_BASE_URL,
DEFAULT_BASE_URL,
)
from pinecone.client._assistants_legacy import AssistantsLegacyNamespaceMixin
from pinecone.errors.exceptions import (
NotFoundError,
PineconeError,
PineconeTimeoutError,
PineconeValueError,
)
from pinecone.models.assistant.chat import ChatCompletionResponse, ChatResponse
from pinecone.models.assistant.context import ContextResponse
from pinecone.models.assistant.evaluation import AlignmentResult
from pinecone.models.assistant.file_model import AssistantFileModel
from pinecone.models.assistant.list import ListAssistantsResponse, ListFilesResponse
from pinecone.models.assistant.message import Message
from pinecone.models.assistant.model import AssistantModel
from pinecone.models.assistant.options import ContextOptions
from pinecone.models.assistant.streaming import (
ChatCompletionStream,
ChatCompletionStreamChunk,
ChatStream,
ChatStreamChunk,
)
from pinecone.models.pagination import Page, Paginator
if TYPE_CHECKING:
from pinecone._internal.config import PineconeConfig
from pinecone._internal.http_client import HTTPClient
logger = logging.getLogger(__name__)
_VALID_REGIONS = ("us", "eu")
_CREATE_POLL_INTERVAL_SECONDS = 0.5
_DELETE_POLL_INTERVAL_SECONDS = 5
_UPLOAD_POLL_INTERVAL_SECONDS = 5
[docs]
class Assistants(AssistantsLegacyNamespaceMixin):
"""Control-plane operations for Pinecone assistants.
Args:
config (PineconeConfig): SDK configuration used to construct an
HTTP client targeting the assistant API version.
Examples:
.. code-block:: python
from pinecone import Pinecone
pc = Pinecone(api_key="your-api-key")
assistants = pc.assistants
"""
[docs]
def __init__(self, config: PineconeConfig) -> None:
    """Initialize the assistants namespace.

    Builds two HTTP clients from *config*: one for the control-plane
    assistant host and one for the evaluation service. Per-assistant
    data-plane clients are created lazily (see ``_data_plane_http``).

    Args:
        config: SDK configuration used to construct HTTP clients
            targeting the assistant API version.
    """
    from pinecone._internal.config import PineconeConfig as _PineconeConfig
    from pinecone._internal.http_client import HTTPClient as _HTTPClient

    self._config = config

    def _derive_config(host: str) -> _PineconeConfig:
        # Every derived config shares the SDK-wide settings and differs
        # only in the target host (previously duplicated field-by-field).
        return _PineconeConfig(
            api_key=config.api_key,
            host=host,
            timeout=config.timeout,
            additional_headers=config.additional_headers,
            source_tag=config.source_tag or "",
            proxy_url=config.proxy_url or "",
            proxy_headers=config.proxy_headers,
            ssl_ca_certs=config.ssl_ca_certs,
            ssl_verify=config.ssl_verify,
            connection_pool_maxsize=config.connection_pool_maxsize,
            retry_config=config.retry_config,
        )

    cp_host = (config.host or DEFAULT_BASE_URL).rstrip("/")
    self._http = _HTTPClient(
        _derive_config(f"{cp_host}/assistant"), ASSISTANT_API_VERSION
    )
    self._adapter = AssistantsAdapter()
    # Data-plane clients cached per assistant name; closed in close().
    self._data_plane_clients: dict[str, HTTPClient] = {}
    self._eval_http = _HTTPClient(
        _derive_config(ASSISTANT_EVALUATION_BASE_URL), ASSISTANT_API_VERSION
    )
def _attach_ref(self, model: AssistantModel) -> AssistantModel:
    """Store a back-reference to this namespace on *model* and return it.

    Legacy instance methods on :class:`AssistantModel` (e.g.
    ``assistant.chat(...)``) delegate to this namespace through the
    ``_assistants`` attribute. ``AssistantModel`` is a msgspec struct
    declared with ``dict=True``, so writing through its ``__dict__``
    sidesteps msgspec's field-restricted ``__setattr__`` while keeping
    normal attribute-lookup semantics.
    """
    vars(model)["_assistants"] = self
    return model
[docs]
def close(self) -> None:
    """Release every HTTP client owned by this namespace.

    Closes the control-plane and evaluation clients, then closes and
    forgets each cached per-assistant data-plane client.
    """
    self._http.close()
    self._eval_http.close()
    while self._data_plane_clients:
        _name, client = self._data_plane_clients.popitem()
        client.close()
def __repr__(self) -> str:
    """Return a short, developer-friendly representation."""
    return "Assistants()"
def _data_plane_http(self, assistant_name: str) -> HTTPClient:
    """Return a cached HTTPClient bound to the assistant's data-plane host.

    The first call for a given assistant performs a ``describe`` to
    learn its host and builds a client; later calls reuse the cached
    client, avoiding repeated describe round-trips.

    Raises:
        PineconeValueError: If the assistant reports no data-plane host.
    """
    cached = self._data_plane_clients.get(assistant_name)
    if cached is not None:
        return cached

    from pinecone._internal.config import PineconeConfig as _PineconeConfig
    from pinecone._internal.http_client import HTTPClient as _HTTPClient

    assistant = self.describe(name=assistant_name)
    if not assistant.host:
        raise PineconeValueError(f"Assistant '{assistant_name}' has no data-plane host")
    # Mirror the SDK-wide settings, overriding only the host.
    cfg = _PineconeConfig(
        api_key=self._config.api_key,
        host=f"{assistant.host.rstrip('/')}/assistant",
        timeout=self._config.timeout,
        additional_headers=self._config.additional_headers,
        source_tag=self._config.source_tag or "",
        proxy_url=self._config.proxy_url or "",
        proxy_headers=self._config.proxy_headers,
        ssl_ca_certs=self._config.ssl_ca_certs,
        ssl_verify=self._config.ssl_verify,
        connection_pool_maxsize=self._config.connection_pool_maxsize,
        retry_config=self._config.retry_config,
    )
    client = _HTTPClient(cfg, ASSISTANT_API_VERSION)
    self._data_plane_clients[assistant_name] = client
    return client
def _upsert_http(self, assistant_name: str) -> HTTPClient:
    """Return an HTTPClient targeting the assistant's data-plane host for 2026-04 upserts.

    NOTE(review): unlike :meth:`_data_plane_http`, the returned client is
    *not* cached — every call performs a fresh ``describe`` and constructs
    a new client, so the caller is responsible for closing it.
    """
    from pinecone._internal.config import PineconeConfig as _PineconeConfig
    from pinecone._internal.http_client import HTTPClient as _HTTPClient
    # Resolve the assistant's data-plane host via the control plane.
    assistant = self.describe(name=assistant_name)
    if not assistant.host:
        raise PineconeValueError(f"Assistant '{assistant_name}' has no data-plane host")
    # Clone the SDK-wide settings, overriding only the host.
    data_config = _PineconeConfig(
        api_key=self._config.api_key,
        host=f"{assistant.host.rstrip('/')}/assistant",
        timeout=self._config.timeout,
        additional_headers=self._config.additional_headers,
        source_tag=self._config.source_tag or "",
        proxy_url=self._config.proxy_url or "",
        proxy_headers=self._config.proxy_headers,
        ssl_ca_certs=self._config.ssl_ca_certs,
        ssl_verify=self._config.ssl_verify,
        connection_pool_maxsize=self._config.connection_pool_maxsize,
        retry_config=self._config.retry_config,
    )
    # Pinned to the 2026-04 API version required by the upsert endpoints.
    return _HTTPClient(data_config, ASSISTANT_API_VERSION_2026_04)
def _poll_operation_until_done(
    self,
    upsert_http: HTTPClient,
    assistant_name: str,
    operation_id: str,
    timeout: float | None,
) -> None:
    """Poll ``GET /operations/{assistant_name}/{operation_id}`` until done.

    Args:
        upsert_http: Client bound to the assistant's data-plane host.
        assistant_name: Name of the assistant that owns the operation.
        operation_id: Identifier of the upsert operation to poll.
        timeout: Seconds to wait. ``None`` polls indefinitely.

    Raises:
        PineconeError: If the operation ends in ``"Failed"`` status.
        PineconeTimeoutError: If still processing when *timeout* elapses.
    """
    start = time.monotonic()
    while True:
        response = upsert_http.get(f"/operations/{assistant_name}/{operation_id}")
        op_model = self._adapter.to_operation(response.content)
        if op_model.status != "Processing":
            if op_model.status == "Failed":
                error_msg = op_model.error or "Unknown operation error"
                raise PineconeError(
                    f"Upsert operation failed for operation '{operation_id}': {error_msg}"
                )
            return
        sleep_for: float = _UPLOAD_POLL_INTERVAL_SECONDS
        if timeout is not None:
            elapsed = time.monotonic() - start
            if elapsed >= timeout:
                raise PineconeTimeoutError(
                    f"Upsert operation timed out after {timeout}s (operation_id={operation_id})"
                )
            # Bug fix: never sleep past the caller's deadline — the old
            # code always slept the full interval, overshooting short
            # timeouts by up to 5 seconds.
            sleep_for = min(sleep_for, timeout - elapsed)
        time.sleep(sleep_for)
[docs]
def upload_file(
    self,
    *,
    assistant_name: str,
    file_path: str | None = None,
    file_stream: IO[bytes] | None = None,
    file_name: str | None = None,
    metadata: dict[str, Any] | None = None,
    multimodal: bool | None = None,
    file_id: str | None = None,
    timeout: float | None = None,
) -> AssistantFileModel:
    """Upload a file to a Pinecone assistant and wait for processing.

    Uploads from a local path or an in-memory byte stream, then polls
    until server-side processing completes. When *file_id* is given the
    2026-04 upsert endpoint is used instead of a plain upload.

    Args:
        assistant_name: Name of the target assistant.
        file_path: Path to a local file to upload. Mutually exclusive
            with *file_stream*.
        file_stream: An open byte stream to upload. Mutually exclusive
            with *file_path*. Use *file_name* to set the filename.
        file_name: Filename to associate with *file_stream*; ignored
            when *file_path* is provided. Defaults to ``"upload"``.
        metadata: Optional metadata dictionary. Sent as a JSON string.
        multimodal: Whether to enable multimodal processing for PDFs.
        file_id: Optional caller-specified file identifier for upsert
            behavior (``PUT /files/{assistant_name}/{file_id}``).
        timeout: Seconds to wait for processing. ``None`` (default)
            polls indefinitely; ``-1`` returns immediately after upload
            with one describe call.

    Returns:
        :class:`AssistantFileModel` fetched fresh from the API after
        processing completes.

    Raises:
        PineconeValueError: If both or neither of *file_path* and
            *file_stream* are provided, or *file_path* does not exist.
        PineconeTimeoutError: If processing misses the deadline.
        PineconeError: If server-side processing fails.
    """
    import json as _json

    if (file_path is None) == (file_stream is None):
        raise PineconeValueError("Exactly one of file_path or file_stream must be provided")
    opened_file: IO[bytes] | None = None
    if file_path is not None:
        if not os.path.isfile(file_path):
            raise PineconeValueError(f"File not found: {file_path}")
        opened_file = open(file_path, "rb")  # noqa: SIM115
        handle: IO[bytes] = opened_file
        upload_name = os.path.basename(file_path)
    else:
        if file_stream is None:  # defensive; excluded by the XOR check above
            raise PineconeValueError("Exactly one of file_path or file_stream must be provided")
        handle = file_stream
        upload_name = file_name or "upload"
    try:
        data_http = self._data_plane_http(assistant_name)
        params: dict[str, str] = {}
        if metadata is not None:
            params["metadata"] = _json.dumps(metadata)
        if multimodal is not None:
            params["multimodal"] = str(multimodal).lower()
        if file_id is not None:
            # Upsert path (2026-04): PUT /files/{assistant_name}/{file_id}.
            upsert_http = self._upsert_http(assistant_name)
            try:
                logger.info(
                    "Upserting file %r (id=%s) to assistant %r",
                    upload_name,
                    file_id,
                    assistant_name,
                )
                upsert_response = upsert_http.put(
                    f"/files/{assistant_name}/{file_id}",
                    files={"file": (upload_name, handle)},
                    params=params,
                )
                op_model = self._adapter.to_operation(upsert_response.content)
                operation_id = op_model.operation_id
                if timeout == -1:
                    return self.describe_file(assistant_name=assistant_name, file_id=file_id)
                self._poll_operation_until_done(
                    upsert_http, assistant_name, operation_id, timeout
                )
            finally:
                # Bug fix: _upsert_http clients are not cached, so close
                # here to avoid leaking the connection pool.
                upsert_http.close()
            return self.describe_file(assistant_name=assistant_name, file_id=file_id)
        logger.info("Uploading file %r to assistant %r", upload_name, assistant_name)
        response = data_http.post(
            f"/files/{assistant_name}",
            files={"file": (upload_name, handle)},
            params=params,
        )
        file_model = self._adapter.to_file(response.content)
        logger.debug(
            "Uploaded file %r (id=%s, status=%s)",
            upload_name,
            file_model.id,
            file_model.status,
        )
    finally:
        if opened_file is not None:
            opened_file.close()
    if timeout == -1:
        return self.describe_file(assistant_name=assistant_name, file_id=file_model.id)
    return self._poll_file_until_processed(data_http, assistant_name, file_model.id, timeout)
def _poll_file_until_processed(
    self,
    data_http: HTTPClient,
    assistant_name: str,
    file_id: str,
    timeout: float | None,
) -> AssistantFileModel:
    """Poll ``GET /files/{assistant_name}/{file_id}`` until processing completes.

    Args:
        data_http: Client bound to the assistant's data-plane host.
        assistant_name: Name of the assistant that owns the file.
        file_id: Identifier of the file being processed.
        timeout: Seconds to wait. ``None`` polls indefinitely.

    Returns:
        The final :class:`AssistantFileModel` once the file leaves
        ``"Processing"`` status.

    Raises:
        PineconeError: If the file ends in ``"ProcessingFailed"``.
        PineconeTimeoutError: If still processing when *timeout* elapses.
    """
    start = time.monotonic()
    while True:
        response = data_http.get(f"/files/{assistant_name}/{file_id}")
        file_model = self._adapter.to_file(response.content)
        if file_model.status != "Processing":
            if file_model.status == "ProcessingFailed":
                error_msg = file_model.error_message or "Unknown processing error"
                raise PineconeError(f"File processing failed for '{file_id}': {error_msg}")
            return file_model
        sleep_for: float = _UPLOAD_POLL_INTERVAL_SECONDS
        if timeout is not None:
            elapsed = time.monotonic() - start
            if elapsed >= timeout:
                # Bug fix: the message mislabeled this as "operation_id"
                # (copy-paste from the operation poller); it is a file id.
                raise PineconeTimeoutError(
                    f"File processing timed out after {timeout}s (file_id={file_id})"
                )
            # Don't sleep past the caller's deadline.
            sleep_for = min(sleep_for, timeout - elapsed)
        time.sleep(sleep_for)
[docs]
def describe_file(
    self,
    *,
    assistant_name: str,
    file_id: str,
    include_url: bool = False,
) -> AssistantFileModel:
    """Fetch status and metadata for one file owned by an assistant.

    Args:
        assistant_name: Assistant that owns the file.
        file_id: Identifier of the file to look up.
        include_url: When ``True``, request a signed download URL in
            the response.

    Returns:
        :class:`AssistantFileModel` describing the file.

    Raises:
        NotFoundError: If no such file exists.
        ApiError: On any other API error response.
    """
    data_http = self._data_plane_http(assistant_name)
    params: dict[str, str] = {"include_url": "true"} if include_url else {}
    logger.info("Describing file %r in assistant %r", file_id, assistant_name)
    response = data_http.get(f"/files/{assistant_name}/{file_id}", params=params)
    return self._adapter.to_file(response.content)
[docs]
def list_files(
    self,
    *,
    assistant_name: str,
    filter: dict[str, Any] | None = None,
    limit: int | None = None,
    pagination_token: str | None = None,
) -> Paginator[AssistantFileModel]:
    """Iterate over an assistant's files with lazy pagination.

    Args:
        assistant_name: Name of the assistant whose files to list.
        filter: Optional metadata filter; serialized to JSON before
            being sent.
        limit: Maximum number of files yielded in total; ``None``
            yields everything.
        pagination_token: Resume token from a previous call.

    Returns:
        :class:`Paginator` over :class:`AssistantFileModel` objects
        (supports ``for`` loops, ``.to_list()``, ``.pages()``).

    Raises:
        ApiError: On an API error response.
    """
    logger.info("Listing files for assistant %r", assistant_name)

    def _fetch(token: str | None) -> Page[AssistantFileModel]:
        page = self.list_files_page(
            assistant_name=assistant_name,
            pagination_token=token,
            filter=filter,
        )
        return Page(items=page.files, pagination_token=page.next)

    return Paginator(fetch_page=_fetch, initial_token=pagination_token, limit=limit)
[docs]
def list_files_page(
    self,
    *,
    assistant_name: str,
    pagination_token: str | None = None,
    filter: dict[str, Any] | None = None,
) -> ListFilesResponse:
    """Fetch a single page of an assistant's files.

    Only explicitly provided parameters are sent as query params.

    Args:
        assistant_name: Name of the assistant whose files to list.
        pagination_token: Continuation token from a previous response.
        filter: Optional metadata filter; serialized to JSON before
            being sent.

    Returns:
        :class:`ListFilesResponse` with a ``files`` list and an
        optional ``next`` continuation token.

    Raises:
        ApiError: On an API error response.
    """
    import json as _json

    data_http = self._data_plane_http(assistant_name)
    raw = {
        "paginationToken": pagination_token,
        "filter": None if filter is None else _json.dumps(filter),
    }
    # Drop the params the caller did not provide.
    params: dict[str, str | int] = {k: v for k, v in raw.items() if v is not None}
    logger.info("Listing files page for assistant %r", assistant_name)
    response = data_http.get(f"/files/{assistant_name}", params=params)
    result = self._adapter.to_file_list(response.content)
    logger.debug(
        "Listed %d files for assistant %r (has_next=%s)",
        len(result.files),
        assistant_name,
        result.next is not None,
    )
    return result
[docs]
def delete_file(
    self,
    *,
    assistant_name: str,
    file_id: str,
    timeout: float | None = None,
) -> None:
    """Delete a file from an assistant and optionally wait until it is gone.

    Issues the DELETE, then polls ``describe_file`` every 5 seconds
    until a 404 confirms removal. Other polling errors propagate.

    Args:
        assistant_name: Assistant that owns the file.
        file_id: Identifier of the file to delete.
        timeout: ``None`` (default) polls indefinitely; ``-1`` skips
            polling entirely; a positive value imposes a deadline.

    Raises:
        PineconeError: If server-side file deletion fails.
        PineconeTimeoutError: If the file still exists at the deadline.
        ApiError: On an API error response.
    """
    data_http = self._data_plane_http(assistant_name)
    logger.info("Deleting file %r from assistant %r", file_id, assistant_name)
    data_http.delete(f"/files/{assistant_name}/{file_id}")
    logger.debug("Deleted file %r from assistant %r", file_id, assistant_name)
    if timeout == -1:
        return
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            file_model = self.describe_file(assistant_name=assistant_name, file_id=file_id)
        except NotFoundError:
            return  # 404 confirms the file is gone
        # Any status other than "Deleting" (or unset) means deletion failed.
        if file_model.status not in ("Deleting", None):
            error_msg = file_model.error_message or "Unknown deletion error"
            raise PineconeError(f"File deletion failed for '{file_id}': {error_msg}")
        if deadline is not None and time.monotonic() >= deadline:
            raise PineconeTimeoutError(f"File '{file_id}' still exists after {timeout}s")
        time.sleep(_DELETE_POLL_INTERVAL_SECONDS)
[docs]
def create(
    self,
    *,
    name: str | None = None,
    instructions: str | None = None,
    metadata: dict[str, Any] | None = None,
    region: str = "us",
    timeout: float | None = None,
    **kwargs: Any,
) -> AssistantModel:
    """Create a new Pinecone assistant.

    The assistant starts in ``"Initializing"`` status; unless
    ``timeout=-1`` this method then polls until it is ``"Ready"``.

    Args:
        name: Assistant name (1-63 chars, lowercase alphanumerics and
            hyphens, starting and ending alphanumeric). The legacy
            keyword ``assistant_name`` is accepted as an alias.
        instructions: Optional directive applied to every response
            (max 16 KB).
        metadata: Optional metadata dictionary; defaults to empty.
        region: Deployment region, ``"us"`` or ``"eu"`` (case-sensitive).
        timeout: Seconds to wait for readiness. ``None`` polls
            indefinitely; ``-1`` returns immediately; ``0`` or positive
            polls with a deadline.

    Returns:
        :class:`AssistantModel` describing the created assistant.

    Raises:
        PineconeValueError: On an invalid *region*, or a missing or
            doubly-specified name.
        PineconeTimeoutError: If the assistant is not ready in time.
        ApiError: On an API error response.
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    remapped = remap_legacy_kwargs(
        kwargs, aliases={"assistant_name": "name"}, method_name="create"
    )
    reject_unknown_kwargs(remapped, allowed={"name"}, method_name="create")
    # Sentinel distinguishes "alias absent" from "alias passed as None".
    _missing = object()
    legacy_name = remapped.get("name", _missing)
    if legacy_name is not _missing:
        if name is not None:
            raise PineconeValueError(
                "create() received both 'assistant_name' (legacy) and 'name'. "
                "Pass only one — prefer 'name'."
            )
        name = legacy_name
    if name is None:
        raise PineconeValueError(
            "create() missing required argument: 'name' (or legacy alias 'assistant_name')."
        )
    if region not in _VALID_REGIONS:
        raise PineconeValueError(f"region must be one of {_VALID_REGIONS!r}, got {region!r}")
    payload: dict[str, Any] = {
        "name": name,
        "instructions": instructions,
        "metadata": {} if metadata is None else metadata,
        "region": region,
    }
    logger.info("Creating assistant %r", name)
    response = self._http.post("/assistants", json=payload)
    model = self._attach_ref(self._adapter.to_assistant(response.content))
    logger.debug("Created assistant %r (status=%s)", name, model.status)
    return model if timeout == -1 else self._poll_until_ready(name, timeout)
[docs]
def describe(self, *, name: str | None = None, **kwargs: Any) -> AssistantModel:
    """Fetch detailed information about one assistant.

    Args:
        name: Name of the assistant to describe. The legacy keyword
            ``assistant_name`` is accepted as an alias.

    Returns:
        :class:`AssistantModel` with name, status, created_at,
        updated_at, metadata, instructions, and host.

    Raises:
        ApiError: On an API error response (e.g. 404 when the
            assistant does not exist).
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    remapped = remap_legacy_kwargs(
        kwargs, aliases={"assistant_name": "name"}, method_name="describe"
    )
    reject_unknown_kwargs(remapped, allowed={"name"}, method_name="describe")
    # Sentinel distinguishes "alias absent" from "alias passed as None".
    _missing = object()
    legacy_name = remapped.get("name", _missing)
    if legacy_name is not _missing:
        if name is not None:
            raise PineconeValueError(
                "describe() received both 'assistant_name' (legacy) and 'name'. "
                "Pass only one — prefer 'name'."
            )
        name = legacy_name
    if name is None:
        raise PineconeValueError(
            "describe() missing required argument: 'name' (or legacy alias 'assistant_name')."
        )
    logger.info("Describing assistant %r", name)
    response = self._http.get(f"/assistants/{name}")
    model = self._attach_ref(self._adapter.to_assistant(response.content))
    logger.debug("Described assistant %r (status=%s)", name, model.status)
    return model
[docs]
def list(
    self,
    *,
    limit: int | None = None,
    pagination_token: str | None = None,
) -> Paginator[AssistantModel]:
    """Iterate over the project's assistants with lazy pagination.

    Args:
        limit: Maximum number of assistants yielded in total; ``None``
            yields everything.
        pagination_token: Resume token from a previous call.

    Returns:
        :class:`Paginator` over :class:`AssistantModel` objects
        (supports ``for`` loops, ``.to_list()``, ``.pages()``).

    Raises:
        ApiError: On an API error response.
    """
    logger.info("Listing assistants")

    def _fetch(token: str | None) -> Page[AssistantModel]:
        page = self.list_page(pagination_token=token)
        return Page(items=page.assistants, pagination_token=page.next)

    return Paginator(fetch_page=_fetch, initial_token=pagination_token, limit=limit)
[docs]
def list_page(
    self,
    *,
    page_size: int | None = None,
    pagination_token: str | None = None,
    **kwargs: Any,
) -> ListAssistantsResponse:
    """Fetch a single page of assistants.

    Only explicitly provided parameters are sent as query params.

    Args:
        page_size: Maximum assistants per page. The legacy keyword
            ``limit`` is accepted as an alias.
        pagination_token: Continuation token from a previous response.

    Returns:
        :class:`ListAssistantsResponse` with an ``assistants`` list
        and an optional ``next`` continuation token.

    Raises:
        ApiError: On an API error response.
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    remapped = remap_legacy_kwargs(
        kwargs, aliases={"limit": "page_size"}, method_name="list_page"
    )
    reject_unknown_kwargs(remapped, allowed={"page_size"}, method_name="list_page")
    # Sentinel distinguishes "alias absent" from "alias passed as None".
    _missing = object()
    legacy_size = remapped.get("page_size", _missing)
    if legacy_size is not _missing:
        if page_size is not None:
            raise PineconeValueError(
                "list_page() received both 'limit' (legacy) and 'page_size'. "
                "Pass only one — prefer 'page_size'."
            )
        page_size = legacy_size
    params: dict[str, str | int] = {}
    if page_size is not None:
        params["pageSize"] = page_size
    if pagination_token is not None:
        params["paginationToken"] = pagination_token
    logger.info("Listing assistants page")
    response = self._http.get("/assistants", params=params)
    result = self._adapter.to_assistant_list(response.content)
    for assistant in result.assistants:
        self._attach_ref(assistant)
    logger.debug(
        "Listed %d assistants (has_next=%s)",
        len(result.assistants),
        result.next is not None,
    )
    return result
[docs]
def update(
    self,
    *,
    name: str | None = None,
    instructions: str | None = None,
    metadata: dict[str, Any] | None = None,
    **kwargs: Any,
) -> AssistantModel:
    """Update an assistant's instructions and/or metadata.

    Metadata, when provided, fully replaces the existing metadata
    rather than merging with it.

    Args:
        name: Name of the assistant to update. The legacy keyword
            ``assistant_name`` is accepted as an alias.
        instructions: New instructions; pass ``""`` to clear them.
        metadata: Replacement metadata dictionary.

    Returns:
        :class:`AssistantModel` describing the updated assistant.

    Raises:
        ApiError: On an API error response (e.g. 404 when the
            assistant does not exist).
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    remapped = remap_legacy_kwargs(
        kwargs, aliases={"assistant_name": "name"}, method_name="update"
    )
    reject_unknown_kwargs(remapped, allowed={"name"}, method_name="update")
    # Sentinel distinguishes "alias absent" from "alias passed as None".
    _missing = object()
    legacy_name = remapped.get("name", _missing)
    if legacy_name is not _missing:
        if name is not None:
            raise PineconeValueError(
                "update() received both 'assistant_name' (legacy) and 'name'. "
                "Pass only one — prefer 'name'."
            )
        name = legacy_name
    if name is None:
        raise PineconeValueError(
            "update() missing required argument: 'name' (or legacy alias 'assistant_name')."
        )
    payload: dict[str, Any] = {}
    if instructions is not None:
        payload["instructions"] = instructions
    if metadata is not None:
        payload["metadata"] = metadata
    logger.info("Updating assistant %r", name)
    response = self._http.patch(f"/assistants/{name}", json=payload)
    model = self._attach_ref(self._adapter.to_assistant(response.content))
    logger.debug("Updated assistant %r", name)
    return model
[docs]
def delete(
    self,
    *,
    name: str | None = None,
    timeout: float | None = None,
    **kwargs: Any,
) -> None:
    """Delete an assistant and optionally wait until it is gone.

    Issues the DELETE, then polls ``describe`` every 5 seconds until a
    404 confirms removal. Other polling errors propagate.

    Args:
        name: Name of the assistant to delete. The legacy keyword
            ``assistant_name`` is accepted as an alias.
        timeout: ``None`` (default) polls indefinitely; ``-1`` skips
            polling entirely; a positive value imposes a deadline.

    Raises:
        PineconeTimeoutError: If the assistant still exists at the
            deadline.
        ApiError: On an API error response.
    """
    from pinecone._internal.kwargs_aliases import (
        reject_unknown_kwargs,
        remap_legacy_kwargs,
    )

    remapped = remap_legacy_kwargs(
        kwargs, aliases={"assistant_name": "name"}, method_name="delete"
    )
    reject_unknown_kwargs(remapped, allowed={"name"}, method_name="delete")
    # Sentinel distinguishes "alias absent" from "alias passed as None".
    _missing = object()
    legacy_name = remapped.get("name", _missing)
    if legacy_name is not _missing:
        if name is not None:
            raise PineconeValueError(
                "delete() received both 'assistant_name' (legacy) and 'name'. "
                "Pass only one — prefer 'name'."
            )
        name = legacy_name
    if name is None:
        raise PineconeValueError(
            "delete() missing required argument: 'name' (or legacy alias 'assistant_name')."
        )
    logger.info("Deleting assistant %r", name)
    self._http.delete(f"/assistants/{name}")
    logger.debug("Deleted assistant %r", name)
    if timeout == -1:
        return
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            self.describe(name=name)
        except NotFoundError:
            return  # 404 confirms the assistant is gone
        if deadline is not None and time.monotonic() >= deadline:
            raise PineconeTimeoutError(f"Assistant '{name}' still exists after {timeout}s")
        time.sleep(_DELETE_POLL_INTERVAL_SECONDS)
[docs]
def context(
    self,
    *,
    assistant_name: str,
    query: str | None = None,
    messages: builtins.list[Message | dict[str, str]] | None = None,
    filter: dict[str, Any] | None = None,
    top_k: int | None = None,
    snippet_size: int | None = None,
    multimodal: bool | None = None,
    include_binary_content: bool | None = None,
) -> ContextResponse:
    """Retrieve relevant context snippets from an assistant.

    Exactly one of *query* or *messages* must be provided and
    non-empty; empty strings and empty lists count as not provided.

    Args:
        assistant_name: Name of the assistant to retrieve context from.
        query: Text query. Mutually exclusive with *messages*.
        messages: Conversation messages; dicts are converted to
            :class:`Message`. Mutually exclusive with *query*.
        filter: Metadata filter restricting which documents contribute
            context. Omitted from the request when ``None``.
        top_k: Maximum number of snippets to return. Omitted when ``None``.
        snippet_size: Maximum snippet size in tokens. Omitted when ``None``.
        multimodal: Whether to include image-related snippets. Omitted
            when ``None``.
        include_binary_content: Whether image snippets include base64
            data (meaningful only with *multimodal*). Omitted when ``None``.

    Returns:
        :class:`ContextResponse` with the matching context snippets.

    Raises:
        PineconeValueError: If both or neither of *query* and
            *messages* are provided (or both are empty).
        ApiError: On an API error response.
    """
    # bool() folds None and empty ("" / []) into "not provided".
    has_query = bool(query)
    has_messages = bool(messages)
    if has_query and has_messages:
        raise PineconeValueError("Exactly one of query or messages must be provided, not both.")
    if not (has_query or has_messages):
        raise PineconeValueError("Exactly one of query or messages must be provided.")
    body: dict[str, Any] = {}
    if has_query:
        body["query"] = query
    else:
        if messages is None:  # unreachable; narrows the type for checkers
            raise PineconeValueError("Exactly one of query or messages must be provided.")
        normalized = [
            m if isinstance(m, Message) else Message.from_dict(m) for m in messages
        ]
        body["messages"] = [{"role": m.role, "content": m.content} for m in normalized]
    for key, value in (
        ("filter", filter),
        ("top_k", top_k),
        ("snippet_size", snippet_size),
        ("multimodal", multimodal),
        ("include_binary_content", include_binary_content),
    ):
        if value is not None:
            body[key] = value
    http = self._data_plane_http(assistant_name)
    response = http.post(f"/chat/{assistant_name}/context", json=body)
    return self._adapter.to_context_response(response.content)
[docs]
def chat(
    self,
    *,
    assistant_name: str,
    messages: builtins.list[Message | dict[str, str]],
    model: str = "gpt-4o",
    stream: bool = False,
    temperature: float | None = None,
    filter: dict[str, Any] | None = None,
    json_response: bool = False,
    include_highlights: bool = False,
    context_options: ContextOptions | dict[str, Any] | None = None,
) -> ChatResponse | ChatStream:
    """Chat with an assistant, returning citations in Pinecone-native format.

    Args:
        assistant_name (str): Name of the assistant to chat with.
        messages (list[Message | dict[str, str]]): Conversation messages.
            Dict entries are converted via :meth:`Message.from_dict`.
        model (str): Large language model to use. Defaults to ``"gpt-4o"``.
        stream (bool): When ``True``, a :class:`ChatStream` is returned
            instead of a fully-materialized response. Defaults to ``False``.
        temperature (float | None): Sampling temperature; omitted from the
            request body when ``None``.
        filter (dict[str, Any] | None): Metadata filter restricting which
            documents are used as context; omitted when ``None``.
        json_response (bool): Ask the assistant for a JSON response.
            Incompatible with ``stream=True``.
        include_highlights (bool): Include highlight snippets from
            referenced documents in citations.
        context_options (ContextOptions | dict[str, Any] | None): Context
            retrieval options; omitted when ``None``. A struct is converted
            to a dict with ``None`` fields dropped.

    Returns:
        :class:`ChatResponse` when not streaming, otherwise a
        :class:`ChatStream` yielding typed chunks.

    Raises:
        :exc:`PineconeValueError`: If ``stream=True`` and
            ``json_response=True`` are combined.
        :exc:`ApiError`: If the API returns an error response.
    """
    if json_response and stream:
        raise PineconeValueError("json_response cannot be used with stream=True")

    # Normalize every entry to a Message before serializing.
    normalized: list[Message] = []
    for entry in messages:
        normalized.append(entry if isinstance(entry, Message) else Message.from_dict(entry))

    payload: dict[str, Any] = {
        "messages": [{"role": msg.role, "content": msg.content} for msg in normalized],
        "model": model,
        "stream": stream,
    }
    if temperature is not None:
        payload["temperature"] = temperature
    if filter is not None:
        payload["filter"] = filter
    if json_response:
        payload["json_response"] = json_response
    # Streaming requests always carry include_highlights explicitly (even
    # when it is the default False); non-streaming ones only when True.
    if stream or include_highlights:
        payload["include_highlights"] = include_highlights
    if context_options is not None:
        if isinstance(context_options, dict):
            opts = context_options
        else:
            # Struct -> dict, dropping unset (None) fields.
            opts = {
                key: val
                for key, val in msgspec.structs.asdict(context_options).items()
                if val is not None
            }
        payload["context_options"] = opts

    client = self._data_plane_http(assistant_name)
    url = f"/chat/{assistant_name}"
    if not stream:
        response = client.post(url, json=payload)
        return self._adapter.to_chat_response(response.content)
    return ChatStream(self._chat_streaming(http=client, url=url, body=payload))
[docs]
def chat_completions(
    self,
    *,
    assistant_name: str,
    messages: builtins.list[Message | dict[str, str]],
    model: str = "gpt-4o",
    stream: bool = False,
    temperature: float | None = None,
    filter: dict[str, Any] | None = None,
) -> ChatCompletionResponse | ChatCompletionStream:
    """Chat with an assistant through an OpenAI-compatible interface.

    Responses follow the OpenAI chat-completion shape, which is useful for
    inline citations or drop-in OpenAI compatibility. Compared to
    :meth:`chat`, this endpoint does not accept ``include_highlights``,
    ``context_options``, or ``json_response``.

    The *model* string is passed through unvalidated; known values include
    ``"gpt-4o"``, ``"gpt-4.1"``, ``"o4-mini"``, ``"claude-3-5-sonnet"``,
    ``"claude-3-7-sonnet"``, and ``"gemini-2.5-pro"``.

    Args:
        assistant_name (str): Name of the assistant to chat with.
        messages (list[Message | dict[str, str]]): Conversation messages.
            Dict entries are converted via :meth:`Message.from_dict`.
        model (str): Large language model to use. Defaults to ``"gpt-4o"``.
        stream (bool): When ``True``, return a :class:`ChatCompletionStream`.
            Defaults to ``False``.
        temperature (float | None): Sampling temperature; omitted from the
            request body when ``None``.
        filter (dict[str, Any] | None): Metadata filter restricting which
            documents are used as context; omitted when ``None``.

    Returns:
        :class:`ChatCompletionResponse` when not streaming, otherwise a
        :class:`ChatCompletionStream`.

    Raises:
        :exc:`ApiError`: If the API returns an error response.
    """
    # Normalize every entry to a Message before serializing.
    normalized: list[Message] = []
    for entry in messages:
        normalized.append(entry if isinstance(entry, Message) else Message.from_dict(entry))

    payload: dict[str, Any] = {
        "messages": [{"role": msg.role, "content": msg.content} for msg in normalized],
        "model": model,
        "stream": stream,
    }
    # Optional fields are only sent when explicitly provided.
    for key, value in (("temperature", temperature), ("filter", filter)):
        if value is not None:
            payload[key] = value

    client = self._data_plane_http(assistant_name)
    url = f"/chat/{assistant_name}/chat/completions"
    if not stream:
        response = client.post(url, json=payload)
        return self._adapter.to_chat_completion_response(response.content)
    return ChatCompletionStream(
        self._chat_completions_streaming(http=client, url=url, body=payload)
    )
def _chat_streaming(
    self,
    *,
    http: HTTPClient,
    url: str,
    body: dict[str, Any],
) -> Iterator[ChatStreamChunk]:
    """Yield Pinecone-native chat chunks from an SSE response.

    POSTs *body* (which must already contain ``stream=True``) to *url*,
    reads the response line by line, and converts each ``data:`` payload
    into a typed chunk. Payloads that fail :class:`ChatStreamChunk`
    validation (unknown ``type`` values) are skipped with a debug log
    rather than raised.

    Args:
        http: Pre-resolved data-plane HTTP client for the assistant.
        url: Request URL path (e.g. ``/chat/{assistant_name}``).
        body: Pre-built request body (must include ``stream=True``).

    Yields:
        :class:`ChatStreamChunk` members, dispatched by the ``type`` field
        of each SSE chunk.

    Raises:
        :exc:`ApiError`: If the server returns an HTTP error.
    """
    with http.stream(
        "POST",
        url,
        content=orjson.dumps(body),
        headers={"Content-Type": "application/json"},
    ) as response:
        for raw in response.iter_lines():
            # SSE framing: only non-empty "data:" lines carry payloads.
            if not raw or not raw.startswith("data:"):
                continue
            payload = raw[len("data:"):].lstrip()
            if not payload:
                continue
            if payload == "[DONE]":
                break
            parsed: dict[str, Any] = orjson.loads(payload)
            try:
                yield msgspec.convert(parsed, ChatStreamChunk)
            except msgspec.ValidationError:
                # Forward-compatible: tolerate chunk types this SDK
                # version does not know about.
                logger.debug("Skipping unknown chunk type: %s", parsed.get("type"))
def _chat_completions_streaming(
    self,
    *,
    http: HTTPClient,
    url: str,
    body: dict[str, Any],
) -> Iterator[ChatCompletionStreamChunk]:
    """Yield OpenAI-compatible completion chunks from an SSE response.

    POSTs *body* (which must already contain ``stream=True``) to *url* and
    parses every non-empty ``data:`` payload into a
    :class:`ChatCompletionStreamChunk`.

    Args:
        http: Pre-resolved data-plane HTTP client for the assistant.
        url: Request URL path (e.g. ``/chat/{assistant_name}/chat/completions``).
        body: Pre-built request body (must include ``stream=True``).

    Yields:
        :class:`ChatCompletionStreamChunk` for each non-empty SSE line.

    Raises:
        :exc:`ApiError`: If the server returns an HTTP error.
    """
    with http.stream(
        "POST",
        url,
        content=orjson.dumps(body),
        headers={"Content-Type": "application/json"},
    ) as response:
        for raw in response.iter_lines():
            # SSE framing: only non-empty "data:" lines carry payloads.
            if not raw or not raw.startswith("data:"):
                continue
            payload = raw[len("data:"):].lstrip()
            if not payload:
                continue
            if payload == "[DONE]":
                break
            yield msgspec.convert(orjson.loads(payload), ChatCompletionStreamChunk)
[docs]
def evaluate_alignment(
    self,
    *,
    question: str,
    answer: str,
    ground_truth_answer: str,
) -> AlignmentResult:
    """Score a generated answer against a ground-truth answer.

    Measures correctness and completeness of *answer* with respect to
    *ground_truth_answer*; alignment is the harmonic mean of correctness
    (precision) and completeness (recall).

    Args:
        question: The question for which the answer was generated.
        answer: The generated answer to evaluate.
        ground_truth_answer: The ground truth answer to compare against.

    Returns:
        :class:`AlignmentResult` with aggregate scores, per-fact entailment
        results, and token usage statistics.

    Raises:
        :exc:`ApiError`: If the API returns an error response.

    Examples:
        >>> result = pc.assistants.evaluate_alignment(
        ...     question="What is the capital of Spain?",
        ...     answer="Barcelona.",
        ...     ground_truth_answer="Madrid.",
        ... )
    """
    payload = {
        "question": question,
        "answer": answer,
        "ground_truth_answer": ground_truth_answer,
    }
    logger.info("Evaluating alignment for question %r", question)
    response = self._eval_http.post("/evaluation/metrics/alignment", json=payload)
    outcome = self._adapter.to_alignment_result(response.content)
    logger.debug("Alignment evaluation complete (alignment=%.3f)", outcome.scores.alignment)
    return outcome
def _poll_until_ready(self, name: str, timeout: float | None) -> AssistantModel:
    """Poll ``GET /assistants/{name}`` until status is ``"Ready"`` or timeout.

    Args:
        name: Name of the assistant to poll.
        timeout: Maximum seconds to wait, measured with a monotonic clock.
            ``None`` polls indefinitely.

    Returns:
        The assistant model once its ``status`` is ``"Ready"``.

    Raises:
        PineconeError: If the assistant enters a terminal failure state
            (``"Failed"`` or ``"InitializationFailed"``).
        PineconeTimeoutError: If the assistant is not ready within
            *timeout* seconds.
    """
    start = time.monotonic()
    while True:
        response = self._http.get(f"/assistants/{name}")
        model = self._attach_ref(self._adapter.to_assistant(response.content))
        if model.status == "Ready":
            return model
        if model.status in ("Failed", "InitializationFailed"):
            raise PineconeError(
                f"Assistant '{name}' entered terminal state '{model.status}'. "
                f"Check status with pc.assistants.describe(name='{name}')."
            )
        sleep_for = _CREATE_POLL_INTERVAL_SECONDS
        if timeout is not None:
            elapsed = time.monotonic() - start
            if elapsed >= timeout:
                raise PineconeTimeoutError(
                    f"Assistant '{name}' not ready after {timeout}s. "
                    f"Check status with pc.assistants.describe(name='{name}')."
                )
            # Fix: cap the sleep at the remaining budget so a fixed
            # interval cannot overshoot the caller's deadline by up to a
            # full _CREATE_POLL_INTERVAL_SECONDS.
            sleep_for = min(sleep_for, timeout - elapsed)
        time.sleep(sleep_for)