pinecone.grpc.retry

 1import abc
 2import logging
 3import random
 4import time
 5from typing import Optional, Tuple, NamedTuple
 6
 7import grpc
 8
 9
10_logger = logging.getLogger(__name__)
11
12
class SleepPolicy(abc.ABC):
    """Interface for strategies that wait between retry attempts."""

    @abc.abstractmethod
    def sleep(self, try_i: int):
        """
        Wait before the next retry; durations are expressed in milliseconds.

        This base implementation performs no waiting itself: it only checks
        that the retry index is not negative. Subclasses provide the delay.

        :param try_i: zero-based index of the retry being attempted
        """
        assert 0 <= try_i
21
22
class ExponentialBackoff(SleepPolicy):
    """Sleep policy that waits a random time below an exponentially growing cap.

    For retry ``try_i`` the cap is
    ``min(init_backoff * multiplier ** try_i, max_backoff)`` milliseconds, and
    the actual delay is drawn uniformly from ``[0, cap]``.
    """

    def __init__(self, *, init_backoff_ms: int, max_backoff_ms: int, multiplier: int):
        """
        :param init_backoff_ms: cap, in milliseconds, on the first retry's delay
        :param max_backoff_ms: upper bound, in milliseconds, on any delay cap
        :param multiplier: factor by which the cap grows with each further retry
        :raises ValueError: if ``init_backoff_ms`` is negative
        """
        if init_backoff_ms < 0:
            raise ValueError("init_backoff_ms must be non-negative")
        # Store the configured value unchanged. Randomising it here in addition
        # to the per-call jitter in sleep() could pick 0, which pins every
        # later cap to 0 and disables backoff for the lifetime of this
        # (possibly shared) instance.
        self.init_backoff = init_backoff_ms
        self.max_backoff = max_backoff_ms
        self.multiplier = multiplier

    def sleep(self, try_i: int):
        """
        Block for a randomly chosen delay before the next retry.

        :param try_i: zero-based index of the retry being attempted
        """
        cap_ms = min(self.init_backoff * self.multiplier**try_i, self.max_backoff)
        # random.randint accepts only integers; truncate so that a fractional
        # multiplier or max_backoff does not make it raise.
        sleep_ms = random.randint(0, int(cap_ms))
        _logger.debug(f"gRPC retry. Sleeping for {sleep_ms}ms")
        # time.sleep takes seconds; the policy works in milliseconds.
        time.sleep(sleep_ms / 1000)
34
35
class RetryOnRpcErrorClientInterceptor(
    grpc.UnaryUnaryClientInterceptor,
    grpc.UnaryStreamClientInterceptor,
    grpc.StreamUnaryClientInterceptor,
    grpc.StreamStreamClientInterceptor,
):
    """Client interceptor that re-issues an RPC that ended with a retryable status.

    Reference: https://github.com/grpc/grpc/issues/19514#issuecomment-531700657
    """

    def __init__(self, retry_config: "RetryConfig"):
        """
        :param retry_config: supplies ``max_attempts``, ``sleep_policy`` and
            ``retryable_status`` for this interceptor
        """
        self.max_attempts = retry_config.max_attempts
        self.sleep_policy = retry_config.sleep_policy
        self.retryable_status = retry_config.retryable_status

    def _is_retryable_error(self, response_or_error):
        """Determine if a response is a retryable error.

        :param response_or_error: the object returned by the continuation
        :return: True only for a ``grpc.RpcError`` whose status code is listed
            in ``retryable_status``
        """
        if not isinstance(response_or_error, grpc.RpcError):
            return False
        # Objects whose class name contains "_MultiThreadedRendezvous" are
        # never retried.
        # NOTE(review): presumably these are gRPC's streaming call objects,
        # which cannot simply be replayed - confirm.
        if "_MultiThreadedRendezvous" in response_or_error.__class__.__name__:
            return False
        # retryable_status is declared Optional; treat None as "nothing is
        # retryable" instead of raising TypeError on the membership test.
        return response_or_error.code() in (self.retryable_status or ())

    def _intercept_call(self, continuation, client_call_details, request_or_iterator):
        """Invoke the RPC, repeating it while the outcome is a retryable error.

        :param continuation: callable that proceeds with the invocation
        :param client_call_details: details of the outgoing RPC, passed through
        :param request_or_iterator: request value or request iterator, passed
            through unchanged on every attempt
        :return: whatever the last call to ``continuation`` returned
        """
        # NOTE(review): the same request_or_iterator object is handed to every
        # attempt; an iterator consumed by an earlier attempt would presumably
        # be exhausted on a retry - confirm for the stream-request methods.

        # Always issue the RPC at least once so the caller receives the
        # continuation's result rather than None when max_attempts <= 0.
        attempts = max(1, self.max_attempts)
        response = None
        for try_i in range(attempts):
            response = continuation(client_call_details, request_or_iterator)
            if not self._is_retryable_error(response):
                break
            # Waiting after the final attempt would only delay the caller.
            if try_i < attempts - 1:
                self.sleep_policy.sleep(try_i)
        return response

    def intercept_unary_unary(self, continuation, client_call_details, request):
        """Apply the retry loop to a unary-unary invocation."""
        return self._intercept_call(continuation, client_call_details, request)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        """Apply the retry loop to a unary-stream invocation."""
        return self._intercept_call(continuation, client_call_details, request)

    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        """Apply the retry loop to a stream-unary invocation."""
        return self._intercept_call(continuation, client_call_details, request_iterator)

    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        """Apply the retry loop to a stream-stream invocation."""
        return self._intercept_call(continuation, client_call_details, request_iterator)
80
81
class RetryConfig(NamedTuple):
    """Immutable bundle of the settings that control gRPC call retries."""

    # Upper bound on how many times a call is issued.
    max_attempts: int = 4
    # Strategy used to wait between attempts. This default instance is created
    # once, when the class is defined, and is shared by every RetryConfig that
    # does not supply its own policy.
    sleep_policy: SleepPolicy = ExponentialBackoff(
        init_backoff_ms=100,
        max_backoff_ms=1600,
        multiplier=2,
    )
    # Status codes for which a failed call is retried.
    retryable_status: Optional[Tuple[grpc.StatusCode, ...]] = (
        grpc.StatusCode.UNAVAILABLE,
    )
class SleepPolicy(abc.ABC):
14class SleepPolicy(abc.ABC):
15    @abc.abstractmethod
16    def sleep(self, try_i: int):
17        """
18        How long to sleep in milliseconds.
19        :param try_i: the number of retry (starting from zero)
20        """
21        assert try_i >= 0

Helper class that provides a standard way to create an ABC using inheritance.

@abc.abstractmethod
def sleep(self, try_i: int):
15    @abc.abstractmethod
16    def sleep(self, try_i: int):
17        """
18        How long to sleep in milliseconds.
19        :param try_i: the number of retry (starting from zero)
20        """
21        assert try_i >= 0

How long to sleep in milliseconds.

Parameters
  • try_i: the zero-based index of the retry attempt
class ExponentialBackoff(SleepPolicy):
24class ExponentialBackoff(SleepPolicy):
25    def __init__(self, *, init_backoff_ms: int, max_backoff_ms: int, multiplier: int):
26        self.init_backoff = random.randint(0, init_backoff_ms)
27        self.max_backoff = max_backoff_ms
28        self.multiplier = multiplier
29
30    def sleep(self, try_i: int):
31        sleep_range = min(self.init_backoff * self.multiplier**try_i, self.max_backoff)
32        sleep_ms = random.randint(0, sleep_range)
33        _logger.debug(f"gRPC retry. Sleeping for {sleep_ms}ms")
34        time.sleep(sleep_ms / 1000)

Helper class that provides a standard way to create an ABC using inheritance.

ExponentialBackoff(*, init_backoff_ms: int, max_backoff_ms: int, multiplier: int)
25    def __init__(self, *, init_backoff_ms: int, max_backoff_ms: int, multiplier: int):
26        self.init_backoff = random.randint(0, init_backoff_ms)
27        self.max_backoff = max_backoff_ms
28        self.multiplier = multiplier
init_backoff
max_backoff
multiplier
def sleep(self, try_i: int):
30    def sleep(self, try_i: int):
31        sleep_range = min(self.init_backoff * self.multiplier**try_i, self.max_backoff)
32        sleep_ms = random.randint(0, sleep_range)
33        _logger.debug(f"gRPC retry. Sleeping for {sleep_ms}ms")
34        time.sleep(sleep_ms / 1000)

Sleeps for a random number of milliseconds, between zero and an exponentially growing cap that never exceeds max_backoff.

Parameters
  • try_i: the zero-based index of the retry attempt
class RetryOnRpcErrorClientInterceptor(grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor, grpc.StreamUnaryClientInterceptor, grpc.StreamStreamClientInterceptor):
37class RetryOnRpcErrorClientInterceptor(
38    grpc.UnaryUnaryClientInterceptor,
39    grpc.UnaryStreamClientInterceptor,
40    grpc.StreamUnaryClientInterceptor,
41    grpc.StreamStreamClientInterceptor,
42):
43    """gRPC retry.
44
45    Reference: https://github.com/grpc/grpc/issues/19514#issuecomment-531700657
46    """
47
48    def __init__(self, retry_config: "RetryConfig"):
49        self.max_attempts = retry_config.max_attempts
50        self.sleep_policy = retry_config.sleep_policy
51        self.retryable_status = retry_config.retryable_status
52
53    def _is_retryable_error(self, response_or_error):
54        """Determine if a response is a retryable error."""
55        return (
56            isinstance(response_or_error, grpc.RpcError)
57            and "_MultiThreadedRendezvous" not in response_or_error.__class__.__name__
58            and response_or_error.code() in self.retryable_status
59        )
60
61    def _intercept_call(self, continuation, client_call_details, request_or_iterator):
62        response = None
63        for try_i in range(self.max_attempts):
64            response = continuation(client_call_details, request_or_iterator)
65            if not self._is_retryable_error(response):
66                break
67            self.sleep_policy.sleep(try_i)
68        return response
69
70    def intercept_unary_unary(self, continuation, client_call_details, request):
71        return self._intercept_call(continuation, client_call_details, request)
72
73    def intercept_unary_stream(self, continuation, client_call_details, request):
74        return self._intercept_call(continuation, client_call_details, request)
75
76    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
77        return self._intercept_call(continuation, client_call_details, request_iterator)
78
79    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
80        return self._intercept_call(continuation, client_call_details, request_iterator)
RetryOnRpcErrorClientInterceptor(retry_config: RetryConfig)
48    def __init__(self, retry_config: "RetryConfig"):
49        self.max_attempts = retry_config.max_attempts
50        self.sleep_policy = retry_config.sleep_policy
51        self.retryable_status = retry_config.retryable_status
max_attempts
sleep_policy
retryable_status
def intercept_unary_unary(self, continuation, client_call_details, request):
70    def intercept_unary_unary(self, continuation, client_call_details, request):
71        return self._intercept_call(continuation, client_call_details, request)

Intercepts a unary-unary invocation asynchronously.

Arguments:
  • continuation: A function that proceeds with the invocation by executing the next interceptor in chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use response_future = continuation(client_call_details, request) to continue with the RPC. continuation returns an object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.
  • client_call_details: A ClientCallDetails object describing the outgoing RPC.
  • request: The request value for the RPC.
Returns:

An object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.

def intercept_unary_stream(self, continuation, client_call_details, request):
73    def intercept_unary_stream(self, continuation, client_call_details, request):
74        return self._intercept_call(continuation, client_call_details, request)

Intercepts a unary-stream invocation.

Arguments:
  • continuation: A function that proceeds with the invocation by executing the next interceptor in chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use response_iterator = continuation(client_call_details, request) to continue with the RPC. continuation returns an object that is both a Call for the RPC and an iterator for response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status.
  • client_call_details: A ClientCallDetails object describing the outgoing RPC.
  • request: The request value for the RPC.
Returns:

An object that is both a Call for the RPC and an iterator of response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. This object should also fulfill the Future interface, though it may not.

def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
76    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
77        return self._intercept_call(continuation, client_call_details, request_iterator)

Intercepts a stream-unary invocation asynchronously.

Arguments:
  • continuation: A function that proceeds with the invocation by executing the next interceptor in chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use response_future = continuation(client_call_details, request_iterator) to continue with the RPC. continuation returns an object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.
  • client_call_details: A ClientCallDetails object describing the outgoing RPC.
  • request_iterator: An iterator that yields request values for the RPC.
Returns:

An object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.

def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
79    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
80        return self._intercept_call(continuation, client_call_details, request_iterator)

Intercepts a stream-stream invocation.

Arguments:
  • continuation: A function that proceeds with the invocation by executing the next interceptor in chain or invoking the actual RPC on the underlying Channel. It is the interceptor's responsibility to call it if it decides to move the RPC forward. The interceptor can use response_iterator = continuation(client_call_details, request_iterator) to continue with the RPC. continuation returns an object that is both a Call for the RPC and an iterator for response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status.
  • client_call_details: A ClientCallDetails object describing the outgoing RPC.
  • request_iterator: An iterator that yields request values for the RPC.
Returns:

An object that is both a Call for the RPC and an iterator of response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. This object should also fulfill the Future interface, though it may not.

class RetryConfig(typing.NamedTuple):
83class RetryConfig(NamedTuple):
84    """Config settings related to retry"""
85
86    max_attempts: int = 4
87    sleep_policy: SleepPolicy = ExponentialBackoff(init_backoff_ms=100, max_backoff_ms=1600, multiplier=2)
88    retryable_status: Optional[Tuple[grpc.StatusCode, ...]] = (grpc.StatusCode.UNAVAILABLE,)

Config settings related to retry

RetryConfig( max_attempts: int = 4, sleep_policy: SleepPolicy = <ExponentialBackoff object>, retryable_status: Optional[Tuple[grpc.StatusCode, ...]] = (<StatusCode.UNAVAILABLE: (14, 'unavailable')>,))

Create new instance of RetryConfig(max_attempts, sleep_policy, retryable_status)

max_attempts: int

Alias for field number 0

sleep_policy: SleepPolicy

Alias for field number 1

retryable_status: Optional[Tuple[grpc.StatusCode, ...]]

Alias for field number 2

Inherited Members
builtins.tuple
index
count