pinecone.grpc.base

  1import logging
  2from abc import ABC, abstractmethod
  3from functools import wraps
  4from typing import Dict, Optional
  5
  6import certifi
  7import grpc
  8from grpc._channel import _InactiveRpcError, Channel
  9import json
 10
 11from .retry import RetryConfig
 12
 13from pinecone import Config
 14from .utils import _generate_request_id
 15from .config import GRPCClientConfig
 16from pinecone.utils.constants import MAX_MSG_SIZE, REQUEST_ID, CLIENT_VERSION
 17from pinecone.utils.user_agent import get_user_agent_grpc
 18from pinecone.exceptions.exceptions import PineconeException
 19
 20_logger = logging.getLogger(__name__)
 21
 22
 23class GRPCIndexBase(ABC):
 24    """
 25    Base class for grpc-based interaction with Pinecone indexes
 26    """
 27
 28    _pool = None
 29
 30    def __init__(
 31        self,
 32        index_name: str,
 33        config: Config,
 34        channel: Optional[Channel] = None,
 35        grpc_config: Optional[GRPCClientConfig] = None,
 36        _endpoint_override: Optional[str] = None,
 37    ):
 38        self.name = index_name
 39
 40        self.config = config
 41        self.grpc_client_config = grpc_config or GRPCClientConfig()
 42        self.retry_config = self.grpc_client_config.retry_config or RetryConfig()
 43
 44        self.fixed_metadata = {
 45            "api-key": config.api_key,
 46            "service-name": index_name,
 47            "client-version": CLIENT_VERSION,
 48        }
 49        if self.grpc_client_config.additional_metadata:
 50            self.fixed_metadata.update(self.grpc_client_config.additional_metadata)
 51
 52        self._endpoint_override = _endpoint_override
 53
 54        self.method_config = json.dumps(
 55            {
 56                "methodConfig": [
 57                    {
 58                        "name": [{"service": "VectorService.Upsert"}],
 59                        "retryPolicy": {
 60                            "maxAttempts": 5,
 61                            "initialBackoff": "0.1s",
 62                            "maxBackoff": "1s",
 63                            "backoffMultiplier": 2,
 64                            "retryableStatusCodes": ["UNAVAILABLE"],
 65                        },
 66                    },
 67                    {
 68                        "name": [{"service": "VectorService"}],
 69                        "retryPolicy": {
 70                            "maxAttempts": 5,
 71                            "initialBackoff": "0.1s",
 72                            "maxBackoff": "1s",
 73                            "backoffMultiplier": 2,
 74                            "retryableStatusCodes": ["UNAVAILABLE"],
 75                        },
 76                    },
 77                ]
 78            }
 79        )
 80
 81        options = {"grpc.primary_user_agent": get_user_agent_grpc(config)}
 82        self._channel = channel or self._gen_channel(options=options)
 83        self.stub = self.stub_class(self._channel)
 84
 85    @property
 86    @abstractmethod
 87    def stub_class(self):
 88        pass
 89
 90    def _endpoint(self):
 91        grpc_host = self.config.host.replace("https://", "")
 92        if ":" not in grpc_host:
 93            grpc_host = f"{grpc_host}:443"
 94        return self._endpoint_override if self._endpoint_override else grpc_host
 95
 96    def _gen_channel(self, options=None):
 97        target = self._endpoint()
 98        default_options = {
 99            "grpc.max_send_message_length": MAX_MSG_SIZE,
100            "grpc.max_receive_message_length": MAX_MSG_SIZE,
101            "grpc.service_config": self.method_config,
102            "grpc.enable_retries": True,
103            "grpc.per_rpc_retry_buffer_size": MAX_MSG_SIZE,
104        }
105        if self.grpc_client_config.secure:
106            default_options["grpc.ssl_target_name_override"] = target.split(":")[0]
107        if self.config.proxy_url:
108            default_options["grpc.http_proxy"] = self.config.proxy_url
109        user_provided_options = options or {}
110        _options = tuple((k, v) for k, v in {**default_options, **user_provided_options}.items())
111        _logger.debug(
112            "creating new channel with endpoint %s options %s and config %s",
113            target,
114            _options,
115            self.grpc_client_config,
116        )
117        if not self.grpc_client_config.secure:
118            channel = grpc.insecure_channel(target, options=_options)
119        else:
120            ca_certs = self.config.ssl_ca_certs if self.config.ssl_ca_certs else certifi.where()
121            root_cas = open(ca_certs, "rb").read()
122            tls = grpc.ssl_channel_credentials(root_certificates=root_cas)
123            channel = grpc.secure_channel(target, tls, options=_options)
124
125        return channel
126
127    @property
128    def channel(self):
129        """Creates GRPC channel."""
130        if self.grpc_client_config.reuse_channel and self._channel and self.grpc_server_on():
131            return self._channel
132        self._channel = self._gen_channel()
133        return self._channel
134
135    def grpc_server_on(self) -> bool:
136        try:
137            grpc.channel_ready_future(self._channel).result(timeout=self.grpc_client_config.conn_timeout)
138            return True
139        except grpc.FutureTimeoutError:
140            return False
141
142    def close(self):
143        """Closes the connection to the index."""
144        try:
145            self._channel.close()
146        except TypeError:
147            pass
148
149    def _wrap_grpc_call(
150        self,
151        func,
152        request,
153        timeout=None,
154        metadata=None,
155        credentials=None,
156        wait_for_ready=None,
157        compression=None,
158    ):
159        @wraps(func)
160        def wrapped():
161            user_provided_metadata = metadata or {}
162            _metadata = tuple(
163                (k, v)
164                for k, v in {
165                    **self.fixed_metadata,
166                    **self._request_metadata(),
167                    **user_provided_metadata,
168                }.items()
169            )
170            try:
171                return func(
172                    request,
173                    timeout=timeout,
174                    metadata=_metadata,
175                    credentials=credentials,
176                    wait_for_ready=wait_for_ready,
177                    compression=compression,
178                )
179            except _InactiveRpcError as e:
180                raise PineconeException(e._state.debug_error_string) from e
181
182        return wrapped()
183
184    def _request_metadata(self) -> Dict[str, str]:
185        return {REQUEST_ID: _generate_request_id()}
186
    def __enter__(self):
        """Enter a ``with`` block; the object itself is the bound value."""
        return self
189
    def __exit__(self, exc_type, exc_value, traceback):
        """Close the channel on leaving a ``with`` block; returns None, so exceptions propagate."""
        self.close()
class GRPCIndexBase(abc.ABC):
 24class GRPCIndexBase(ABC):
 25    """
 26    Base class for grpc-based interaction with Pinecone indexes
 27    """
 28
 29    _pool = None
 30
 31    def __init__(
 32        self,
 33        index_name: str,
 34        config: Config,
 35        channel: Optional[Channel] = None,
 36        grpc_config: Optional[GRPCClientConfig] = None,
 37        _endpoint_override: Optional[str] = None,
 38    ):
 39        self.name = index_name
 40
 41        self.config = config
 42        self.grpc_client_config = grpc_config or GRPCClientConfig()
 43        self.retry_config = self.grpc_client_config.retry_config or RetryConfig()
 44
 45        self.fixed_metadata = {
 46            "api-key": config.api_key,
 47            "service-name": index_name,
 48            "client-version": CLIENT_VERSION,
 49        }
 50        if self.grpc_client_config.additional_metadata:
 51            self.fixed_metadata.update(self.grpc_client_config.additional_metadata)
 52
 53        self._endpoint_override = _endpoint_override
 54
 55        self.method_config = json.dumps(
 56            {
 57                "methodConfig": [
 58                    {
 59                        "name": [{"service": "VectorService.Upsert"}],
 60                        "retryPolicy": {
 61                            "maxAttempts": 5,
 62                            "initialBackoff": "0.1s",
 63                            "maxBackoff": "1s",
 64                            "backoffMultiplier": 2,
 65                            "retryableStatusCodes": ["UNAVAILABLE"],
 66                        },
 67                    },
 68                    {
 69                        "name": [{"service": "VectorService"}],
 70                        "retryPolicy": {
 71                            "maxAttempts": 5,
 72                            "initialBackoff": "0.1s",
 73                            "maxBackoff": "1s",
 74                            "backoffMultiplier": 2,
 75                            "retryableStatusCodes": ["UNAVAILABLE"],
 76                        },
 77                    },
 78                ]
 79            }
 80        )
 81
 82        options = {"grpc.primary_user_agent": get_user_agent_grpc(config)}
 83        self._channel = channel or self._gen_channel(options=options)
 84        self.stub = self.stub_class(self._channel)
 85
 86    @property
 87    @abstractmethod
 88    def stub_class(self):
 89        pass
 90
 91    def _endpoint(self):
 92        grpc_host = self.config.host.replace("https://", "")
 93        if ":" not in grpc_host:
 94            grpc_host = f"{grpc_host}:443"
 95        return self._endpoint_override if self._endpoint_override else grpc_host
 96
 97    def _gen_channel(self, options=None):
 98        target = self._endpoint()
 99        default_options = {
100            "grpc.max_send_message_length": MAX_MSG_SIZE,
101            "grpc.max_receive_message_length": MAX_MSG_SIZE,
102            "grpc.service_config": self.method_config,
103            "grpc.enable_retries": True,
104            "grpc.per_rpc_retry_buffer_size": MAX_MSG_SIZE,
105        }
106        if self.grpc_client_config.secure:
107            default_options["grpc.ssl_target_name_override"] = target.split(":")[0]
108        if self.config.proxy_url:
109            default_options["grpc.http_proxy"] = self.config.proxy_url
110        user_provided_options = options or {}
111        _options = tuple((k, v) for k, v in {**default_options, **user_provided_options}.items())
112        _logger.debug(
113            "creating new channel with endpoint %s options %s and config %s",
114            target,
115            _options,
116            self.grpc_client_config,
117        )
118        if not self.grpc_client_config.secure:
119            channel = grpc.insecure_channel(target, options=_options)
120        else:
121            ca_certs = self.config.ssl_ca_certs if self.config.ssl_ca_certs else certifi.where()
122            root_cas = open(ca_certs, "rb").read()
123            tls = grpc.ssl_channel_credentials(root_certificates=root_cas)
124            channel = grpc.secure_channel(target, tls, options=_options)
125
126        return channel
127
128    @property
129    def channel(self):
130        """Creates GRPC channel."""
131        if self.grpc_client_config.reuse_channel and self._channel and self.grpc_server_on():
132            return self._channel
133        self._channel = self._gen_channel()
134        return self._channel
135
136    def grpc_server_on(self) -> bool:
137        try:
138            grpc.channel_ready_future(self._channel).result(timeout=self.grpc_client_config.conn_timeout)
139            return True
140        except grpc.FutureTimeoutError:
141            return False
142
143    def close(self):
144        """Closes the connection to the index."""
145        try:
146            self._channel.close()
147        except TypeError:
148            pass
149
150    def _wrap_grpc_call(
151        self,
152        func,
153        request,
154        timeout=None,
155        metadata=None,
156        credentials=None,
157        wait_for_ready=None,
158        compression=None,
159    ):
160        @wraps(func)
161        def wrapped():
162            user_provided_metadata = metadata or {}
163            _metadata = tuple(
164                (k, v)
165                for k, v in {
166                    **self.fixed_metadata,
167                    **self._request_metadata(),
168                    **user_provided_metadata,
169                }.items()
170            )
171            try:
172                return func(
173                    request,
174                    timeout=timeout,
175                    metadata=_metadata,
176                    credentials=credentials,
177                    wait_for_ready=wait_for_ready,
178                    compression=compression,
179                )
180            except _InactiveRpcError as e:
181                raise PineconeException(e._state.debug_error_string) from e
182
183        return wrapped()
184
185    def _request_metadata(self) -> Dict[str, str]:
186        return {REQUEST_ID: _generate_request_id()}
187
188    def __enter__(self):
189        return self
190
191    def __exit__(self, exc_type, exc_value, traceback):
192        self.close()

Base class for grpc-based interaction with Pinecone indexes

name
config
grpc_client_config
retry_config
fixed_metadata
method_config
stub
stub_class
86    @property
87    @abstractmethod
88    def stub_class(self):
89        pass
channel
128    @property
129    def channel(self):
130        """Creates GRPC channel."""
131        if self.grpc_client_config.reuse_channel and self._channel and self.grpc_server_on():
132            return self._channel
133        self._channel = self._gen_channel()
134        return self._channel

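Returns the gRPC channel, reusing the existing channel when it is configured for reuse and ready, and creating a new one otherwise.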

def grpc_server_on(self) -> bool:
136    def grpc_server_on(self) -> bool:
137        try:
138            grpc.channel_ready_future(self._channel).result(timeout=self.grpc_client_config.conn_timeout)
139            return True
140        except grpc.FutureTimeoutError:
141            return False
def close(self):
143    def close(self):
144        """Closes the connection to the index."""
145        try:
146            self._channel.close()
147        except TypeError:
148            pass

Closes the connection to the index.