Source code for pinecone.grpc.future
"""PineconeFuture — a thin wrapper around concurrent.futures.Future.
Provides SDK-specific timeout defaults and exception translation so that
callers get :class:`~pinecone.errors.PineconeTimeoutError` instead of the
stdlib ``TimeoutError`` when a result is not ready in time.
"""
from __future__ import annotations
from collections.abc import Callable
from concurrent.futures import Future
from concurrent.futures import TimeoutError as _FuturesTimeoutError
from typing import Any, TypeVar
from pinecone.errors.exceptions import PineconeTimeoutError
_T = TypeVar("_T")
_DEFAULT_TIMEOUT: float = 5.0
[docs]
class PineconeFuture(Future["_T"]):
"""Future returned by ``GrpcIndex.*_async()`` methods.
Wraps a :class:`concurrent.futures.Future` and is fully compatible with
:func:`concurrent.futures.as_completed` and
:func:`concurrent.futures.wait`.
The default :meth:`result` timeout is **5 seconds**. When the timeout
elapses, :class:`~pinecone.errors.PineconeTimeoutError` is raised with
the message ``"deadline exceeded"``.
Examples:
.. code-block:: python
from pinecone.grpc import GrpcIndex
idx = GrpcIndex(host="article-search-abc123.svc.pinecone.io", api_key="your-api-key")
future = idx.upsert_async(vectors=[("article-101", [0.012, -0.087, 0.153, ...])])
result = future.result() # blocks up to 5 seconds
result.upserted_count
# 1
.. code-block:: python
from concurrent.futures import as_completed
futures = [
idx.upsert_async(vectors=[("article-101", [0.012, -0.087, 0.153, ...])]),
idx.upsert_async(vectors=[("article-102", [0.045, 0.021, -0.064, ...])]),
]
for future in as_completed(futures):
print(future.result().upserted_count)
"""
[docs]
def __init__(self, underlying: Future[_T]) -> None:
# Do NOT call super().__init__() — we delegate everything to the
# underlying future. We *do* need the internal state that Future
# expects however, so we initialise ourselves as a bare Future and
# then wire up callbacks so our own state mirrors the underlying one.
super().__init__()
self._underlying = underlying
# Mirror terminal state from the underlying future into *self* so
# that concurrent.futures infrastructure (as_completed / wait) which
# inspects our internal condition/state sees the correct values.
self._underlying.add_done_callback(self._propagate_state)
# ------------------------------------------------------------------
# State propagation
# ------------------------------------------------------------------
def _propagate_state(self, _fut: Future[_T]) -> None:
"""Copy the terminal state of the underlying future into *self*."""
if self._underlying.cancelled():
# Mark ourselves cancelled so wait/as_completed see it.
super().cancel()
super().set_running_or_notify_cancel()
elif self._underlying.exception() is not None:
try:
super().set_exception(self._underlying.exception())
except Exception:
pass # already in terminal state
else:
try:
super().set_result(self._underlying.result(timeout=0))
except Exception:
pass # already in terminal state
# ------------------------------------------------------------------
# Public interface — delegates to the underlying future
# ------------------------------------------------------------------
[docs]
def result(self, timeout: float | None = _DEFAULT_TIMEOUT) -> _T:
"""Return the result of the call that the future represents.
Args:
timeout: Maximum seconds to wait. Defaults to 5.0.
Pass ``None`` to block indefinitely.
Returns:
The result value set by the underlying future.
Raises:
PineconeTimeoutError: If *timeout* seconds elapse before the
result is available.
Examples:
.. code-block:: python
future = idx.upsert_async(vectors=[("article-101", [0.012, -0.087, 0.153, ...])])
result = future.result()
result.upserted_count # 1
.. code-block:: python
future = idx.upsert_async(vectors=large_batch)
result = future.result(timeout=30.0)
.. code-block:: python
result = future.result(timeout=None)
"""
try:
return self._underlying.result(timeout=timeout)
except _FuturesTimeoutError:
raise PineconeTimeoutError("deadline exceeded") from None
[docs]
def exception(self, timeout: float | None = _DEFAULT_TIMEOUT) -> BaseException | None:
"""Return the exception raised by the call, or ``None``.
Args:
timeout: Maximum seconds to wait. Defaults to 5.0.
Raises:
PineconeTimeoutError: If *timeout* seconds elapse.
"""
try:
return self._underlying.exception(timeout=timeout)
except _FuturesTimeoutError:
raise PineconeTimeoutError("deadline exceeded") from None
[docs]
def cancel(self) -> bool:
"""Attempt to cancel the underlying call.
Returns ``True`` if the call was successfully cancelled, ``False``
if the call has already completed or is running.
"""
return self._underlying.cancel()
[docs]
def cancelled(self) -> bool:
"""Return ``True`` if the call was successfully cancelled."""
return self._underlying.cancelled()
[docs]
def done(self) -> bool:
"""Return ``True`` if the call has completed or was cancelled."""
return self._underlying.done()
[docs]
def running(self) -> bool:
"""Return ``True`` if the call is currently being executed."""
return self._underlying.running()
[docs]
def add_done_callback(self, fn: Callable[..., Any]) -> None:
"""Attach a callable to be called when the future finishes.
The callable will be called with the future as its only argument.
"""
self._underlying.add_done_callback(lambda _underlying: fn(self))