pinecone.grpc.retry
1import abc 2import logging 3import random 4import time 5from typing import Optional, Tuple, NamedTuple 6 7import grpc 8 9 10_logger = logging.getLogger(__name__) 11 12 13class SleepPolicy(abc.ABC): 14 @abc.abstractmethod 15 def sleep(self, try_i: int): 16 """ 17 How long to sleep in milliseconds. 18 :param try_i: the number of retry (starting from zero) 19 """ 20 assert try_i >= 0 21 22 23class ExponentialBackoff(SleepPolicy): 24 def __init__(self, *, init_backoff_ms: int, max_backoff_ms: int, multiplier: int): 25 self.init_backoff = random.randint(0, init_backoff_ms) 26 self.max_backoff = max_backoff_ms 27 self.multiplier = multiplier 28 29 def sleep(self, try_i: int): 30 sleep_range = min(self.init_backoff * self.multiplier**try_i, self.max_backoff) 31 sleep_ms = random.randint(0, sleep_range) 32 _logger.debug(f"gRPC retry. Sleeping for {sleep_ms}ms") 33 time.sleep(sleep_ms / 1000) 34 35 36class RetryOnRpcErrorClientInterceptor( 37 grpc.UnaryUnaryClientInterceptor, 38 grpc.UnaryStreamClientInterceptor, 39 grpc.StreamUnaryClientInterceptor, 40 grpc.StreamStreamClientInterceptor, 41): 42 """gRPC retry. 43 44 Referece: https://github.com/grpc/grpc/issues/19514#issuecomment-531700657 45 """ 46 47 def __init__(self, retry_config: "RetryConfig"): 48 self.max_attempts = retry_config.max_attempts 49 self.sleep_policy = retry_config.sleep_policy 50 self.retryable_status = retry_config.retryable_status 51 52 def _is_retryable_error(self, response_or_error): 53 """Determine if a response is a retryable error.""" 54 return ( 55 isinstance(response_or_error, grpc.RpcError) 56 and "_MultiThreadedRendezvous" not in response_or_error.__class__.__name__ 57 and response_or_error.code() in self.retryable_status 58 ) 59 60 def _intercept_call(self, continuation, client_call_details, request_or_iterator): 61 response = None 62 for try_i in range(self.max_attempts): 63 response = continuation(client_call_details, request_or_iterator) 64 if not self._is_retryable_error(response): 65 break 66 self.sleep_policy.sleep(try_i) 67 return response 68 69 def intercept_unary_unary(self, continuation, client_call_details, request): 70 return self._intercept_call(continuation, client_call_details, request) 71 72 def intercept_unary_stream(self, continuation, client_call_details, request): 73 return self._intercept_call(continuation, client_call_details, request) 74 75 def intercept_stream_unary(self, continuation, client_call_details, request_iterator): 76 return self._intercept_call(continuation, client_call_details, request_iterator) 77 78 def intercept_stream_stream(self, continuation, client_call_details, request_iterator): 79 return self._intercept_call(continuation, client_call_details, request_iterator) 80 81 82class RetryConfig(NamedTuple): 83 """Config settings related to retry""" 84 85 max_attempts: int = 4 86 sleep_policy: SleepPolicy = ExponentialBackoff(init_backoff_ms=100, max_backoff_ms=1600, multiplier=2) 87 retryable_status: Optional[Tuple[grpc.StatusCode, ...]] = (grpc.StatusCode.UNAVAILABLE,)
14class SleepPolicy(abc.ABC): 15 @abc.abstractmethod 16 def sleep(self, try_i: int): 17 """ 18 How long to sleep in milliseconds. 19 :param try_i: the number of retry (starting from zero) 20 """ 21 assert try_i >= 0
Helper class that provides a standard way to create an ABC using inheritance.
15 @abc.abstractmethod 16 def sleep(self, try_i: int): 17 """ 18 How long to sleep in milliseconds. 19 :param try_i: the number of retry (starting from zero) 20 """ 21 assert try_i >= 0
How long to sleep in milliseconds.
Parameters
- try_i: the number of retry (starting from zero)
24class ExponentialBackoff(SleepPolicy): 25 def __init__(self, *, init_backoff_ms: int, max_backoff_ms: int, multiplier: int): 26 self.init_backoff = random.randint(0, init_backoff_ms) 27 self.max_backoff = max_backoff_ms 28 self.multiplier = multiplier 29 30 def sleep(self, try_i: int): 31 sleep_range = min(self.init_backoff * self.multiplier**try_i, self.max_backoff) 32 sleep_ms = random.randint(0, sleep_range) 33 _logger.debug(f"gRPC retry. Sleeping for {sleep_ms}ms") 34 time.sleep(sleep_ms / 1000)
Helper class that provides a standard way to create an ABC using inheritance.
30 def sleep(self, try_i: int): 31 sleep_range = min(self.init_backoff * self.multiplier**try_i, self.max_backoff) 32 sleep_ms = random.randint(0, sleep_range) 33 _logger.debug(f"gRPC retry. Sleeping for {sleep_ms}ms") 34 time.sleep(sleep_ms / 1000)
How long to sleep in milliseconds.
Parameters
- try_i: the number of retry (starting from zero)
37class RetryOnRpcErrorClientInterceptor( 38 grpc.UnaryUnaryClientInterceptor, 39 grpc.UnaryStreamClientInterceptor, 40 grpc.StreamUnaryClientInterceptor, 41 grpc.StreamStreamClientInterceptor, 42): 43 """gRPC retry. 44 45 Referece: https://github.com/grpc/grpc/issues/19514#issuecomment-531700657 46 """ 47 48 def __init__(self, retry_config: "RetryConfig"): 49 self.max_attempts = retry_config.max_attempts 50 self.sleep_policy = retry_config.sleep_policy 51 self.retryable_status = retry_config.retryable_status 52 53 def _is_retryable_error(self, response_or_error): 54 """Determine if a response is a retryable error.""" 55 return ( 56 isinstance(response_or_error, grpc.RpcError) 57 and "_MultiThreadedRendezvous" not in response_or_error.__class__.__name__ 58 and response_or_error.code() in self.retryable_status 59 ) 60 61 def _intercept_call(self, continuation, client_call_details, request_or_iterator): 62 response = None 63 for try_i in range(self.max_attempts): 64 response = continuation(client_call_details, request_or_iterator) 65 if not self._is_retryable_error(response): 66 break 67 self.sleep_policy.sleep(try_i) 68 return response 69 70 def intercept_unary_unary(self, continuation, client_call_details, request): 71 return self._intercept_call(continuation, client_call_details, request) 72 73 def intercept_unary_stream(self, continuation, client_call_details, request): 74 return self._intercept_call(continuation, client_call_details, request) 75 76 def intercept_stream_unary(self, continuation, client_call_details, request_iterator): 77 return self._intercept_call(continuation, client_call_details, request_iterator) 78 79 def intercept_stream_stream(self, continuation, client_call_details, request_iterator): 80 return self._intercept_call(continuation, client_call_details, request_iterator)
gRPC retry.
Referece: https://github.com/grpc/grpc/issues/19514#issuecomment-531700657
70 def intercept_unary_unary(self, continuation, client_call_details, request): 71 return self._intercept_call(continuation, client_call_details, request)
Intercepts a unary-unary invocation asynchronously.
Arguments:
- continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
response_future = continuation(client_call_details, request)
to continue with the RPC.continuation
returns an object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError. - client_call_details: A ClientCallDetails object describing the outgoing RPC.
- request: The request value for the RPC.
Returns:
An object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.
73 def intercept_unary_stream(self, continuation, client_call_details, request): 74 return self._intercept_call(continuation, client_call_details, request)
Intercepts a unary-stream invocation.
Arguments:
- continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
response_iterator = continuation(client_call_details, request)
to continue with the RPC.continuation
returns an object that is both a Call for the RPC and an iterator for response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. - client_call_details: A ClientCallDetails object describing the outgoing RPC.
- request: The request value for the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. This object should also fulfill the Future interface, though it may not.
76 def intercept_stream_unary(self, continuation, client_call_details, request_iterator): 77 return self._intercept_call(continuation, client_call_details, request_iterator)
Intercepts a stream-unary invocation asynchronously.
Arguments:
- continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
response_future = continuation(client_call_details, request_iterator)
to continue with the RPC.continuation
returns an object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError. - client_call_details: A ClientCallDetails object describing the outgoing RPC.
- request_iterator: An iterator that yields request values for the RPC.
Returns:
An object that is both a Call for the RPC and a Future. In the event of RPC completion, the return Call-Future's result value will be the response message of the RPC. Should the event terminate with non-OK status, the returned Call-Future's exception value will be an RpcError.
79 def intercept_stream_stream(self, continuation, client_call_details, request_iterator): 80 return self._intercept_call(continuation, client_call_details, request_iterator)
Intercepts a stream-stream invocation.
Arguments:
- continuation: A function that proceeds with the invocation by
executing the next interceptor in chain or invoking the
actual RPC on the underlying Channel. It is the interceptor's
responsibility to call it if it decides to move the RPC forward.
The interceptor can use
response_iterator = continuation(client_call_details, request_iterator)
to continue with the RPC.continuation
returns an object that is both a Call for the RPC and an iterator for response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. - client_call_details: A ClientCallDetails object describing the outgoing RPC.
- request_iterator: An iterator that yields request values for the RPC.
Returns:
An object that is both a Call for the RPC and an iterator of response values. Drawing response values from the returned Call-iterator may raise RpcError indicating termination of the RPC with non-OK status. This object should also fulfill the Future interface, though it may not.
83class RetryConfig(NamedTuple): 84 """Config settings related to retry""" 85 86 max_attempts: int = 4 87 sleep_policy: SleepPolicy = ExponentialBackoff(init_backoff_ms=100, max_backoff_ms=1600, multiplier=2) 88 retryable_status: Optional[Tuple[grpc.StatusCode, ...]] = (grpc.StatusCode.UNAVAILABLE,)
Config settings related to retry
Create new instance of RetryConfig(max_attempts, sleep_policy, retryable_status)
Inherited Members
- builtins.tuple
- index
- count