pinecone.grpc.base
1import logging 2from abc import ABC, abstractmethod 3from functools import wraps 4from typing import Dict, Optional 5 6import certifi 7import grpc 8from grpc._channel import _InactiveRpcError, Channel 9import json 10 11from .retry import RetryConfig 12 13from pinecone import Config 14from .utils import _generate_request_id 15from .config import GRPCClientConfig 16from pinecone.utils.constants import MAX_MSG_SIZE, REQUEST_ID, CLIENT_VERSION 17from pinecone.utils.user_agent import get_user_agent_grpc 18from pinecone.exceptions.exceptions import PineconeException 19 20_logger = logging.getLogger(__name__) 21 22 23class GRPCIndexBase(ABC): 24 """ 25 Base class for grpc-based interaction with Pinecone indexes 26 """ 27 28 _pool = None 29 30 def __init__( 31 self, 32 index_name: str, 33 config: Config, 34 channel: Optional[Channel] = None, 35 grpc_config: Optional[GRPCClientConfig] = None, 36 _endpoint_override: Optional[str] = None, 37 ): 38 self.name = index_name 39 40 self.config = config 41 self.grpc_client_config = grpc_config or GRPCClientConfig() 42 self.retry_config = self.grpc_client_config.retry_config or RetryConfig() 43 44 self.fixed_metadata = { 45 "api-key": config.api_key, 46 "service-name": index_name, 47 "client-version": CLIENT_VERSION, 48 } 49 if self.grpc_client_config.additional_metadata: 50 self.fixed_metadata.update(self.grpc_client_config.additional_metadata) 51 52 self._endpoint_override = _endpoint_override 53 54 self.method_config = json.dumps( 55 { 56 "methodConfig": [ 57 { 58 "name": [{"service": "VectorService.Upsert"}], 59 "retryPolicy": { 60 "maxAttempts": 5, 61 "initialBackoff": "0.1s", 62 "maxBackoff": "1s", 63 "backoffMultiplier": 2, 64 "retryableStatusCodes": ["UNAVAILABLE"], 65 }, 66 }, 67 { 68 "name": [{"service": "VectorService"}], 69 "retryPolicy": { 70 "maxAttempts": 5, 71 "initialBackoff": "0.1s", 72 "maxBackoff": "1s", 73 "backoffMultiplier": 2, 74 "retryableStatusCodes": ["UNAVAILABLE"], 75 }, 76 }, 77 ] 78 } 79 ) 80 81 options = {"grpc.primary_user_agent": get_user_agent_grpc(config)} 82 self._channel = channel or self._gen_channel(options=options) 83 self.stub = self.stub_class(self._channel) 84 85 @property 86 @abstractmethod 87 def stub_class(self): 88 pass 89 90 def _endpoint(self): 91 grpc_host = self.config.host.replace("https://", "") 92 if ":" not in grpc_host: 93 grpc_host = f"{grpc_host}:443" 94 return self._endpoint_override if self._endpoint_override else grpc_host 95 96 def _gen_channel(self, options=None): 97 target = self._endpoint() 98 default_options = { 99 "grpc.max_send_message_length": MAX_MSG_SIZE, 100 "grpc.max_receive_message_length": MAX_MSG_SIZE, 101 "grpc.service_config": self.method_config, 102 "grpc.enable_retries": True, 103 "grpc.per_rpc_retry_buffer_size": MAX_MSG_SIZE, 104 } 105 if self.grpc_client_config.secure: 106 default_options["grpc.ssl_target_name_override"] = target.split(":")[0] 107 if self.config.proxy_url: 108 default_options["grpc.http_proxy"] = self.config.proxy_url 109 user_provided_options = options or {} 110 _options = tuple((k, v) for k, v in {**default_options, **user_provided_options}.items()) 111 _logger.debug( 112 "creating new channel with endpoint %s options %s and config %s", 113 target, 114 _options, 115 self.grpc_client_config, 116 ) 117 if not self.grpc_client_config.secure: 118 channel = grpc.insecure_channel(target, options=_options) 119 else: 120 ca_certs = self.config.ssl_ca_certs if self.config.ssl_ca_certs else certifi.where() 121 root_cas = open(ca_certs, "rb").read() 122 tls = grpc.ssl_channel_credentials(root_certificates=root_cas) 123 channel = grpc.secure_channel(target, tls, options=_options) 124 125 return channel 126 127 @property 128 def channel(self): 129 """Creates GRPC channel.""" 130 if self.grpc_client_config.reuse_channel and self._channel and self.grpc_server_on(): 131 return self._channel 132 self._channel = self._gen_channel() 133 return self._channel 134 135 def grpc_server_on(self) -> bool: 136 try: 137 grpc.channel_ready_future(self._channel).result(timeout=self.grpc_client_config.conn_timeout) 138 return True 139 except grpc.FutureTimeoutError: 140 return False 141 142 def close(self): 143 """Closes the connection to the index.""" 144 try: 145 self._channel.close() 146 except TypeError: 147 pass 148 149 def _wrap_grpc_call( 150 self, 151 func, 152 request, 153 timeout=None, 154 metadata=None, 155 credentials=None, 156 wait_for_ready=None, 157 compression=None, 158 ): 159 @wraps(func) 160 def wrapped(): 161 user_provided_metadata = metadata or {} 162 _metadata = tuple( 163 (k, v) 164 for k, v in { 165 **self.fixed_metadata, 166 **self._request_metadata(), 167 **user_provided_metadata, 168 }.items() 169 ) 170 try: 171 return func( 172 request, 173 timeout=timeout, 174 metadata=_metadata, 175 credentials=credentials, 176 wait_for_ready=wait_for_ready, 177 compression=compression, 178 ) 179 except _InactiveRpcError as e: 180 raise PineconeException(e._state.debug_error_string) from e 181 182 return wrapped() 183 184 def _request_metadata(self) -> Dict[str, str]: 185 return {REQUEST_ID: _generate_request_id()} 186 187 def __enter__(self): 188 return self 189 190 def __exit__(self, exc_type, exc_value, traceback): 191 self.close()
class
GRPCIndexBase(abc.ABC):
24class GRPCIndexBase(ABC): 25 """ 26 Base class for grpc-based interaction with Pinecone indexes 27 """ 28 29 _pool = None 30 31 def __init__( 32 self, 33 index_name: str, 34 config: Config, 35 channel: Optional[Channel] = None, 36 grpc_config: Optional[GRPCClientConfig] = None, 37 _endpoint_override: Optional[str] = None, 38 ): 39 self.name = index_name 40 41 self.config = config 42 self.grpc_client_config = grpc_config or GRPCClientConfig() 43 self.retry_config = self.grpc_client_config.retry_config or RetryConfig() 44 45 self.fixed_metadata = { 46 "api-key": config.api_key, 47 "service-name": index_name, 48 "client-version": CLIENT_VERSION, 49 } 50 if self.grpc_client_config.additional_metadata: 51 self.fixed_metadata.update(self.grpc_client_config.additional_metadata) 52 53 self._endpoint_override = _endpoint_override 54 55 self.method_config = json.dumps( 56 { 57 "methodConfig": [ 58 { 59 "name": [{"service": "VectorService.Upsert"}], 60 "retryPolicy": { 61 "maxAttempts": 5, 62 "initialBackoff": "0.1s", 63 "maxBackoff": "1s", 64 "backoffMultiplier": 2, 65 "retryableStatusCodes": ["UNAVAILABLE"], 66 }, 67 }, 68 { 69 "name": [{"service": "VectorService"}], 70 "retryPolicy": { 71 "maxAttempts": 5, 72 "initialBackoff": "0.1s", 73 "maxBackoff": "1s", 74 "backoffMultiplier": 2, 75 "retryableStatusCodes": ["UNAVAILABLE"], 76 }, 77 }, 78 ] 79 } 80 ) 81 82 options = {"grpc.primary_user_agent": get_user_agent_grpc(config)} 83 self._channel = channel or self._gen_channel(options=options) 84 self.stub = self.stub_class(self._channel) 85 86 @property 87 @abstractmethod 88 def stub_class(self): 89 pass 90 91 def _endpoint(self): 92 grpc_host = self.config.host.replace("https://", "") 93 if ":" not in grpc_host: 94 grpc_host = f"{grpc_host}:443" 95 return self._endpoint_override if self._endpoint_override else grpc_host 96 97 def _gen_channel(self, options=None): 98 target = self._endpoint() 99 default_options = { 100 "grpc.max_send_message_length": MAX_MSG_SIZE, 101 "grpc.max_receive_message_length": MAX_MSG_SIZE, 102 "grpc.service_config": self.method_config, 103 "grpc.enable_retries": True, 104 "grpc.per_rpc_retry_buffer_size": MAX_MSG_SIZE, 105 } 106 if self.grpc_client_config.secure: 107 default_options["grpc.ssl_target_name_override"] = target.split(":")[0] 108 if self.config.proxy_url: 109 default_options["grpc.http_proxy"] = self.config.proxy_url 110 user_provided_options = options or {} 111 _options = tuple((k, v) for k, v in {**default_options, **user_provided_options}.items()) 112 _logger.debug( 113 "creating new channel with endpoint %s options %s and config %s", 114 target, 115 _options, 116 self.grpc_client_config, 117 ) 118 if not self.grpc_client_config.secure: 119 channel = grpc.insecure_channel(target, options=_options) 120 else: 121 ca_certs = self.config.ssl_ca_certs if self.config.ssl_ca_certs else certifi.where() 122 root_cas = open(ca_certs, "rb").read() 123 tls = grpc.ssl_channel_credentials(root_certificates=root_cas) 124 channel = grpc.secure_channel(target, tls, options=_options) 125 126 return channel 127 128 @property 129 def channel(self): 130 """Creates GRPC channel.""" 131 if self.grpc_client_config.reuse_channel and self._channel and self.grpc_server_on(): 132 return self._channel 133 self._channel = self._gen_channel() 134 return self._channel 135 136 def grpc_server_on(self) -> bool: 137 try: 138 grpc.channel_ready_future(self._channel).result(timeout=self.grpc_client_config.conn_timeout) 139 return True 140 except grpc.FutureTimeoutError: 141 return False 142 143 def close(self): 144 """Closes the connection to the index.""" 145 try: 146 self._channel.close() 147 except TypeError: 148 pass 149 150 def _wrap_grpc_call( 151 self, 152 func, 153 request, 154 timeout=None, 155 metadata=None, 156 credentials=None, 157 wait_for_ready=None, 158 compression=None, 159 ): 160 @wraps(func) 161 def wrapped(): 162 user_provided_metadata = metadata or {} 163 _metadata = tuple( 164 (k, v) 165 for k, v in { 166 **self.fixed_metadata, 167 **self._request_metadata(), 168 **user_provided_metadata, 169 }.items() 170 ) 171 try: 172 return func( 173 request, 174 timeout=timeout, 175 metadata=_metadata, 176 credentials=credentials, 177 wait_for_ready=wait_for_ready, 178 compression=compression, 179 ) 180 except _InactiveRpcError as e: 181 raise PineconeException(e._state.debug_error_string) from e 182 183 return wrapped() 184 185 def _request_metadata(self) -> Dict[str, str]: 186 return {REQUEST_ID: _generate_request_id()} 187 188 def __enter__(self): 189 return self 190 191 def __exit__(self, exc_type, exc_value, traceback): 192 self.close()
Base class for grpc-based interaction with Pinecone indexes
channel
128 @property 129 def channel(self): 130 """Creates GRPC channel.""" 131 if self.grpc_client_config.reuse_channel and self._channel and self.grpc_server_on(): 132 return self._channel 133 self._channel = self._gen_channel() 134 return self._channel
Creates GRPC channel.