# pinecone.grpc

Connecting to Pinecone with GRPC

The `pinecone.grpc` submodule provides an alternative version of the Pinecone
client that uses gRPC instead of HTTP for data operations, which can provide a
significant performance boost.

### Installing the gRPC client

You must install extra dependencies in order to use the gRPC client.

#### Installing with pip

```bash
# Install the latest version
pip3 install "pinecone[grpc]"

# Install a specific version
pip3 install "pinecone[grpc]"==3.0.0
```

#### Installing with poetry

```bash
# Install the latest version
poetry add pinecone --extras grpc

# Install a specific version
poetry add pinecone==3.0.0 --extras grpc
```

### Using the gRPC client

```python
import os
from pinecone.grpc import PineconeGRPC

client = PineconeGRPC(api_key=os.environ.get("PINECONE_API_KEY"))

# From this point on, usage is identical to the HTTP client.
index = client.Index(host=os.environ.get("PINECONE_INDEX_HOST"))
index.query(vector=[...], top_k=10)
```
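If you are unsure whether the gRPC extras are available in a given environment, you can probe for them before importing `PineconeGRPC`. A minimal sketch, assuming that the extra installs the `grpcio` package (whose import name is `grpc`) — the helper names here are hypothetical, not part of the SDK:

```python
import importlib.util


def extra_installed(module_name: str) -> bool:
    """Return True if `module_name` can be imported in this environment."""
    return importlib.util.find_spec(module_name) is not None


def pick_client_flavor() -> str:
    # Hypothetical guard: fall back to the HTTP client when gRPC deps are absent.
    # "grpc" is assumed to be the import name provided by the `grpcio` package.
    return "grpc" if extra_installed("grpc") else "http"
```

This avoids catching `ImportError` at call sites and lets application code select the client flavor once, at startup.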
1""" 2Connecting to Pinecone with GRPC 3 4The `pinecone.grpc` submodule provides an alternative version of the Pinecone 5client that uses gRPC instead of HTTP for data operations. This provides a 6significant performance boost for data operations. 7 8### Installing the gRPC client 9 10You must install extra dependencies in order to install the GRPC client. 11 12#### Installing with pip 13 14```bash 15# Install the latest version 16pip3 install pinecone[grpc] 17 18# Install a specific version 19pip3 install "pinecone[grpc]"==3.0.0 20``` 21 22#### Installing with poetry 23 24```bash 25# Install the latest version 26poetry add pinecone --extras grpc 27 28# Install a specific version 29poetry add pinecone==3.0.0 --extras grpc 30``` 31 32### Using the gRPC client 33 34```python 35import os 36from pinecone.grpc import PineconeGRPC 37 38client = PineconeGRPC(api_key=os.environ.get("PINECONE_API_KEY")) 39 40# From this point on, usage is identical to the HTTP client. 41index = client.Index(host=os.environ("PINECONE_INDEX_HOST")) 42index.query(vector=[...], top_k=10) 43``` 44 45""" 46 47from .index_grpc import GRPCIndex 48from .pinecone import PineconeGRPC 49from .config import GRPCClientConfig 50from .future import PineconeGrpcFuture 51 52from pinecone.core.grpc.protos.vector_service_pb2 import ( 53 Vector as GRPCVector, 54 SparseValues as GRPCSparseValues, 55 Vector, 56 SparseValues, 57 DeleteResponse as GRPCDeleteResponse, 58) 59 60__all__ = [ 61 "GRPCIndex", 62 "PineconeGRPC", 63 "GRPCDeleteResponse", 64 "GRPCClientConfig", 65 "GRPCVector", 66 "GRPCSparseValues", 67 "Vector", 68 "SparseValues", 69 "PineconeGrpcFuture", 70]
`GRPCIndex` source:

```python
class GRPCIndex(GRPCIndexBase):
    """A client for interacting with a Pinecone index via GRPC API."""

    @property
    def stub_class(self):
        return VectorServiceStub

    def upsert(
        self,
        vectors: Union[List[GRPCVector], List[NonGRPCVector], List[tuple], List[dict]],
        async_req: bool = False,
        namespace: Optional[str] = None,
        batch_size: Optional[int] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> Union[UpsertResponse, PineconeGrpcFuture]:
        """
        The upsert operation writes vectors into a namespace.
        If a new value is upserted for an existing vector id, it will overwrite the previous value.

        Examples:
            >>> index.upsert([('id1', [1.0, 2.0, 3.0], {'key': 'value'}),
            ...               ('id2', [1.0, 2.0, 3.0])],
            ...              namespace='ns1', async_req=True)
            >>> index.upsert([{'id': 'id1', 'values': [1.0, 2.0, 3.0], 'metadata': {'key': 'value'}},
            ...               {'id': 'id2',
            ...                'values': [1.0, 2.0, 3.0],
            ...                'sparse_values': {'indices': [1, 8], 'values': [0.2, 0.4]}}])
            >>> index.upsert([GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
            ...               GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
            ...               GRPCVector(id='id3',
            ...                          values=[1.0, 2.0, 3.0],
            ...                          sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))])

        Args:
            vectors (Union[List[Vector], List[Tuple]]): A list of vectors to upsert.

                A vector can be represented by 1) a GRPCVector object, 2) a tuple, or 3) a dictionary.

                1) If a tuple is used, it must be of the form (id, values, metadata) or (id, values),
                   where id is a string, values is a list of floats, and metadata is a dict.
                   Examples: ('id1', [1.0, 2.0, 3.0], {'key': 'value'}), ('id2', [1.0, 2.0, 3.0])

                2) If a GRPCVector object is used, it must be of the form
                   GRPCVector(id, values, metadata), where metadata is an optional argument of type
                   Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]].
                   Examples: GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
                             GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
                             GRPCVector(id='id3',
                                        values=[1.0, 2.0, 3.0],
                                        sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))

                3) If a dictionary is used, it must be of the form
                   {'id': str, 'values': List[float], 'sparse_values': {'indices': List[int], 'values': List[float]},
                    'metadata': dict}

                Note: the dimension of each vector must match the dimension of the index.
            async_req (bool): If True, the upsert operation will be performed asynchronously.
                Cannot be used with batch_size.
                Defaults to False. See: https://docs.pinecone.io/docs/performance-tuning [optional]
            namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
            batch_size (int): The number of vectors to upsert in each batch.
                Cannot be used with async_req=True.
                If not specified, all vectors will be upserted in a single batch. [optional]
            show_progress (bool): Whether to show a progress bar using tqdm.
                Applied only if batch_size is provided. Default is True.

        Returns: UpsertResponse, contains the number of vectors upserted.
        """
        if async_req and batch_size is not None:
            raise ValueError(
                "async_req is not supported when batch_size is provided. "
                "To upsert in parallel, please follow: "
                "https://docs.pinecone.io/docs/performance-tuning"
            )

        timeout = kwargs.pop("timeout", None)

        vectors = list(map(VectorFactoryGRPC.build, vectors))
        if async_req:
            args_dict = self._parse_non_empty_args([("namespace", namespace)])
            request = UpsertRequest(vectors=vectors, **args_dict, **kwargs)
            future = self.runner.run(self.stub.Upsert.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)

        if batch_size is None:
            return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)

        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        pbar = tqdm(total=len(vectors), disable=not show_progress, desc="Upserted vectors")
        total_upserted = 0
        for i in range(0, len(vectors), batch_size):
            batch_result = self._upsert_batch(
                vectors[i : i + batch_size], namespace, timeout=timeout, **kwargs
            )
            pbar.update(batch_result.upserted_count)
            # we can't use pbar.n here, because it is not updated when show_progress=False
            total_upserted += batch_result.upserted_count

        return UpsertResponse(upserted_count=total_upserted)

    def _upsert_batch(
        self, vectors: List[GRPCVector], namespace: Optional[str], timeout: Optional[int], **kwargs
    ) -> UpsertResponse:
        args_dict = self._parse_non_empty_args([("namespace", namespace)])
        request = UpsertRequest(vectors=vectors, **args_dict)
        return self.runner.run(self.stub.Upsert, request, timeout=timeout, **kwargs)

    def upsert_from_dataframe(
        self,
        df,
        namespace: str = "",
        batch_size: int = 500,
        use_async_requests: bool = True,
        show_progress: bool = True,
    ) -> UpsertResponse:
        """Upserts a dataframe into the index.

        Args:
            df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
            namespace: The namespace to upsert into.
            batch_size: The number of rows to upsert in a single batch.
            use_async_requests: Whether to upsert multiple requests at the same time using the asynchronous
                request mechanism. Set to `False` to send each batch synchronously.
            show_progress: Whether to show a progress bar.
        """
        try:
            import pandas as pd
        except ImportError:
            raise RuntimeError(
                "The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`"
            )

        if not isinstance(df, pd.DataFrame):
            raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

        pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
        results = []
        for chunk in self._iter_dataframe(df, batch_size=batch_size):
            res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
            pbar.update(len(chunk))
            results.append(res)

        if use_async_requests:
            cast_results = cast(List[PineconeGrpcFuture], results)
            results = [
                async_result.result()
                for async_result in tqdm(
                    cast_results, disable=not show_progress, desc="collecting async responses"
                )
            ]

        upserted_count = 0
        for res in results:
            if hasattr(res, "upserted_count") and isinstance(res.upserted_count, int):
                upserted_count += res.upserted_count

        return UpsertResponse(upserted_count=upserted_count)

    @staticmethod
    def _iter_dataframe(df, batch_size):
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i : i + batch_size].to_dict(orient="records")
            yield batch

    def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        async_req: bool = False,
        **kwargs,
    ) -> Union[DeleteResponse, PineconeGrpcFuture]:
        """
        The Delete operation deletes vectors from the index, from a single namespace.
        No error is raised if the vector id does not exist.
        Note: for any delete call, if namespace is not specified, the default namespace is used.

        Delete can occur in the following mutually exclusive ways:
        1. Delete by ids from a single namespace
        2. Delete all vectors from a single namespace by setting delete_all to True
        3. Delete all vectors from a single namespace by specifying a metadata filter
           (note that for this option delete_all must be set to False)

        Examples:
            >>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
            >>> index.delete(delete_all=True, namespace='my_namespace')
            >>> index.delete(filter={'key': 'value'}, namespace='my_namespace', async_req=True)

        Args:
            ids (List[str]): Vector ids to delete. [optional]
            delete_all (bool): This indicates that all vectors in the index namespace should be deleted.
                Default is False. [optional]
            namespace (str): The namespace to delete vectors from.
                If not specified, the default namespace is used. [optional]
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                If specified, the metadata filter here will be used to select the vectors to delete.
                This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True.
                See https://www.pinecone.io/docs/metadata-filtering/. [optional]
            async_req (bool): If True, the delete operation will be performed asynchronously.
                Defaults to False. [optional]

        Returns: DeleteResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
        """
        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None

        args_dict = self._parse_non_empty_args(
            [
                ("ids", ids),
                ("delete_all", delete_all),
                ("namespace", namespace),
                ("filter", filter_struct),
            ]
        )
        timeout = kwargs.pop("timeout", None)

        request = DeleteRequest(**args_dict, **kwargs)
        if async_req:
            future = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            return self.runner.run(self.stub.Delete, request, timeout=timeout)

    def fetch(
        self,
        ids: Optional[List[str]],
        namespace: Optional[str] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> Union[FetchResponse, PineconeGrpcFuture]:
        """
        The fetch operation looks up and returns vectors, by ID, from a single namespace.
        The returned vectors include the vector data and/or metadata.

        Examples:
            >>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
            >>> index.fetch(ids=['id1', 'id2'])

        Args:
            ids (List[str]): The vector IDs to fetch.
            namespace (str): The namespace to fetch vectors from.
                If not specified, the default namespace is used. [optional]

        Returns: FetchResponse object which contains the list of Vector objects, and namespace name.
        """
        timeout = kwargs.pop("timeout", None)

        args_dict = self._parse_non_empty_args([("namespace", namespace)])

        request = FetchRequest(ids=ids, **args_dict, **kwargs)

        if async_req:
            future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
            return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
        else:
            response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
            return parse_fetch_response(response)

    def query(
        self,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        top_k: Optional[int] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> Union[QueryResponse, PineconeGrpcFuture]:
        """
        The Query operation searches a namespace, using a query vector.
        It retrieves the ids of the most similar items in a namespace, along with their similarity scores.

        Examples:
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
            >>> index.query(id='id1', top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
            >>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
            >>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
            ...             top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], sparse_vector=GRPCSparseValues([1, 2], [0.2, 0.4]),
            ...             top_k=10, namespace='my_namespace')

        Args:
            vector (List[float]): The query vector. This should be the same length as the dimension of the index
                being queried. Each `query()` request can contain only one of the parameters
                `id` or `vector`. [optional]
            id (str): The unique ID of the vector to be used as a query vector.
                Each `query()` request can contain only one of the parameters
                `vector` or `id`. [optional]
            top_k (int): The number of results to return for each query. Must be an integer greater than 1.
            namespace (str): The namespace to fetch vectors from.
                If not specified, the default namespace is used. [optional]
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                The filter to apply. You can use vector metadata to limit your search.
                See https://www.pinecone.io/docs/metadata-filtering/. [optional]
            include_values (bool): Indicates whether vector values are included in the response.
                If omitted the server will use the default value of False. [optional]
            include_metadata (bool): Indicates whether metadata is included in the response as well as the ids.
                If omitted the server will use the default value of False. [optional]
            sparse_vector (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): Sparse values of the query vector.
                Expected to be either a GRPCSparseValues object or a dict of the form:
                {'indices': List[int], 'values': List[float]}, where the lists each have the same length.

        Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects,
            and namespace name.
        """
        if vector is not None and id is not None:
            raise ValueError("Cannot specify both `id` and `vector`")

        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None

        sparse_vector = self._parse_sparse_values_arg(sparse_vector)
        args_dict = self._parse_non_empty_args(
            [
                ("vector", vector),
                ("id", id),
                ("namespace", namespace),
                ("top_k", top_k),
                ("filter", filter_struct),
                ("include_values", include_values),
                ("include_metadata", include_metadata),
                ("sparse_vector", sparse_vector),
            ]
        )

        request = QueryRequest(**args_dict)

        timeout = kwargs.pop("timeout", None)

        if async_req:
            future = self.runner.run(self.stub.Query.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            response = self.runner.run(self.stub.Query, request, timeout=timeout)
            json_response = json_format.MessageToDict(response)
            return parse_query_response(json_response, _check_type=False)

    def query_namespaces(
        self,
        vector: List[float],
        namespaces: List[str],
        top_k: Optional[int] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryNamespacesResults:
        if namespaces is None or len(namespaces) == 0:
            raise ValueError("At least one namespace must be specified")
        if len(vector) == 0:
            raise ValueError("Query vector must not be empty")

        overall_topk = top_k if top_k is not None else 10
        aggregator = QueryResultsAggregator(top_k=overall_topk)

        target_namespaces = set(namespaces)  # dedup namespaces
        futures = [
            self.threadpool_executor.submit(
                self.query,
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_req=False,
                **kwargs,
            )
            for ns in target_namespaces
        ]

        only_futures = cast(Iterable[Future], futures)
        for response in as_completed(only_futures):
            aggregator.add_results(response.result())

        final_results = aggregator.get_results()
        return final_results

    def update(
        self,
        id: str,
        async_req: bool = False,
        values: Optional[List[float]] = None,
        set_metadata: Optional[
            Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]
        ] = None,
        namespace: Optional[str] = None,
        sparse_values: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Union[UpdateResponse, PineconeGrpcFuture]:
        """
        The Update operation updates a vector in a namespace.
        If a value is included, it will overwrite the previous value.
        If set_metadata is included, the values of the fields specified in it
        will be added or will overwrite the previous values.

        Examples:
            >>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
            >>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace', async_req=True)
            >>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
            ...              namespace='my_namespace')
            >>> index.update(id='id1', values=[1, 2, 3], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
            ...              namespace='my_namespace')

        Args:
            id (str): Vector's unique id.
            async_req (bool): If True, the update operation will be performed asynchronously.
                Defaults to False. [optional]
            values (List[float]): Vector values to set. [optional]
            set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]):
                Metadata to set for the vector. [optional]
            namespace (str): Namespace name where the vector will be updated. [optional]
            sparse_values (Dict[str, Union[List[float], List[int]]]): Sparse values to update for the vector.
                Expected to be either a GRPCSparseValues object or a dict of the form:
                {'indices': List[int], 'values': List[float]} where the lists each have the same length.

        Returns: UpdateResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
        """
        if set_metadata is not None:
            set_metadata_struct = dict_to_proto_struct(set_metadata)
        else:
            set_metadata_struct = None

        timeout = kwargs.pop("timeout", None)
        sparse_values = self._parse_sparse_values_arg(sparse_values)
        args_dict = self._parse_non_empty_args(
            [
                ("values", values),
                ("set_metadata", set_metadata_struct),
                ("namespace", namespace),
                ("sparse_values", sparse_values),
            ]
        )

        request = UpdateRequest(id=id, **args_dict)
        if async_req:
            future = self.runner.run(self.stub.Update.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            return self.runner.run(self.stub.Update, request, timeout=timeout)

    def list_paginated(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        namespace: Optional[str] = None,
        **kwargs,
    ) -> SimpleListResponse:
        """
        The list_paginated operation finds vectors based on an id prefix within a single namespace.
        It returns matching ids in a paginated form, with a pagination token to fetch the next page of results.
        This id list can then be passed to fetch or delete operations, depending on your use case.

        Consider using the `list` method to avoid having to handle pagination tokens manually.

        Examples:
            >>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
            >>> [v.id for v in results.vectors]
            ['99', '990', '991', '992', '993']
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)

        Args:
            prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
                be used, with the effect of listing all ids in a namespace. [optional]
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]
            namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

        Returns: SimpleListResponse object which contains the list of ids, the namespace name, pagination
            information, and usage showing the number of read_units consumed.
        """
        args_dict = self._parse_non_empty_args(
            [
                ("prefix", prefix),
                ("limit", limit),
                ("namespace", namespace),
                ("pagination_token", pagination_token),
            ]
        )
        # Pop the timeout before building the request so that it is not
        # forwarded to ListRequest as an unexpected field.
        timeout = kwargs.pop("timeout", None)
        request = ListRequest(**args_dict, **kwargs)
        response = self.runner.run(self.stub.List, request, timeout=timeout)

        if response.pagination and response.pagination.next != "":
            pagination = Pagination(next=response.pagination.next)
        else:
            pagination = None

        return SimpleListResponse(
            namespace=response.namespace, vectors=response.vectors, pagination=pagination
        )

    def list(self, **kwargs):
        """
        The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields
        a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your
        behalf.

        Examples:
            >>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
            ...     print(ids)
            ['99', '990', '991', '992', '993']
            ['994', '995', '996', '997', '998']
            ['999']

        Args:
            prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
                be used, with the effect of listing all ids in a namespace. [optional]
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]
            namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
        """
        done = False
        while not done:
            results = self.list_paginated(**kwargs)

            if len(results.vectors) > 0:
                yield [v.id for v in results.vectors]

            if results.pagination and results.pagination.next:
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True

    def describe_index_stats(
        self, filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, **kwargs
    ) -> DescribeIndexStatsResponse:
        """
        The DescribeIndexStats operation returns statistics about the index's contents.
        For example: the vector count per namespace and the number of dimensions.

        Examples:
            >>> index.describe_index_stats()
            >>> index.describe_index_stats(filter={'key': 'value'})

        Args:
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                If this parameter is present, the operation only returns statistics for vectors that satisfy the filter.
                See https://www.pinecone.io/docs/metadata-filtering/. [optional]

        Returns: DescribeIndexStatsResponse object which contains stats about the index.
        """
        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None
        args_dict = self._parse_non_empty_args([("filter", filter_struct)])
        timeout = kwargs.pop("timeout", None)

        request = DescribeIndexStatsRequest(**args_dict)
        response = self.runner.run(self.stub.DescribeIndexStats, request, timeout=timeout)
        json_response = json_format.MessageToDict(response)
        return parse_stats_response(json_response)

    @staticmethod
    def _parse_non_empty_args(args: List[Tuple[str, Any]]) -> Dict[str, Any]:
        return {arg_name: val for arg_name, val in args if val is not None}

    @staticmethod
    def _parse_sparse_values_arg(
        sparse_values: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]],
    ) -> Optional[GRPCSparseValues]:
        if sparse_values is None:
            return None

        if isinstance(sparse_values, GRPCSparseValues):
            return sparse_values

        if (
            not isinstance(sparse_values, dict)
            or "indices" not in sparse_values
            or "values" not in sparse_values
        ):
            raise ValueError(
                "Invalid sparse values argument. Expected a dict of: {'indices': List[int], 'values': List[float]}. "
                f"Received: {sparse_values}"
            )

        return GRPCSparseValues(indices=sparse_values["indices"], values=sparse_values["values"])
```
### GRPCIndex

A client for interacting with a Pinecone index via the gRPC API.
The upsert operation writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.

Examples:

>>> index.upsert([('id1', [1.0, 2.0, 3.0], {'key': 'value'}),
>>>               ('id2', [1.0, 2.0, 3.0])],
>>>              namespace='ns1', async_req=True)
>>> index.upsert([{'id': 'id1', 'values': [1.0, 2.0, 3.0], 'metadata': {'key': 'value'}},
>>>               {'id': 'id2', 'values': [1.0, 2.0, 3.0],
>>>                'sparse_values': {'indices': [1, 8], 'values': [0.2, 0.4]}}])
>>> index.upsert([GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
>>>               GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
>>>               GRPCVector(id='id3', values=[1.0, 2.0, 3.0],
>>>                          sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))])

Arguments:

- vectors (Union[List[Vector], List[Tuple]]): A list of vectors to upsert. A vector can be represented as 1) a GRPCVector object, 2) a tuple, or 3) a dictionary.
  1) If a tuple is used, it must be of the form (id, values, metadata) or (id, values), where id is a string, values is a list of floats, and metadata is a dict. Examples: ('id1', [1.0, 2.0, 3.0], {'key': 'value'}), ('id2', [1.0, 2.0, 3.0])
  2) If a GRPCVector object is used, it must be of the form GRPCVector(id, values, metadata), where metadata is an optional argument of type Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]. Examples: GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}), GRPCVector(id='id2', values=[1.0, 2.0, 3.0]), GRPCVector(id='id3', values=[1.0, 2.0, 3.0], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))
  3) If a dictionary is used, it must be of the form {'id': str, 'values': List[float], 'sparse_values': {'indices': List[int], 'values': List[float]}, 'metadata': dict}
  Note: the dimension of each vector must match the dimension of the index.
- async_req (bool): If True, the upsert operation will be performed asynchronously. Cannot be used with batch_size. Defaults to False. See: https://docs.pinecone.io/docs/performance-tuning [optional]
- namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
- batch_size (int): The number of vectors to upsert in each batch. Cannot be used with async_req=True. If not specified, all vectors will be upserted in a single batch. [optional]
- show_progress (bool): Whether to show a progress bar using tqdm. Applied only if batch_size is provided. Default is True.

Returns: UpsertResponse, which contains the number of vectors upserted.
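When batch_size is set, the client slices the vector list into fixed-size batches and upserts each slice in turn. The slicing logic can be sketched in plain Python (the `chunked` helper and sample data below are illustrative, not part of the client):

```python
def chunked(vectors, batch_size):
    """Yield successive slices of at most batch_size vectors."""
    for i in range(0, len(vectors), batch_size):
        yield vectors[i : i + batch_size]

# Ten (id, values) tuples with batch_size=4 produce batches of 4, 4, and 2.
vectors = [(f"id{n}", [0.1, 0.2, 0.3]) for n in range(10)]
batch_sizes = [len(batch) for batch in chunked(vectors, 4)]
print(batch_sizes)  # [4, 4, 2]
```

This mirrors the `vectors[i : i + batch_size]` loop in the method body above, which is why the last batch may be smaller than batch_size.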
```python
    def upsert_from_dataframe(
        self,
        df,
        namespace: str = "",
        batch_size: int = 500,
        use_async_requests: bool = True,
        show_progress: bool = True,
    ) -> UpsertResponse:
        """Upserts a dataframe into the index.

        Args:
            df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
            namespace: The namespace to upsert into.
            batch_size: The number of rows to upsert in a single batch.
            use_async_requests: Whether to upsert multiple requests at the same time using asynchronous request mechanism.
                Set to `False`
            show_progress: Whether to show a progress bar.
        """
        try:
            import pandas as pd
        except ImportError:
            raise RuntimeError(
                "The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`"
            )

        if not isinstance(df, pd.DataFrame):
            raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

        pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
        results = []
        for chunk in self._iter_dataframe(df, batch_size=batch_size):
            res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
            pbar.update(len(chunk))
            results.append(res)

        if use_async_requests:
            cast_results = cast(List[PineconeGrpcFuture], results)
            results = [
                async_result.result()
                for async_result in tqdm(
                    cast_results, disable=not show_progress, desc="collecting async responses"
                )
            ]

        upserted_count = 0
        for res in results:
            if hasattr(res, "upserted_count") and isinstance(res.upserted_count, int):
                upserted_count += res.upserted_count

        return UpsertResponse(upserted_count=upserted_count)
```
Upserts a dataframe into the index.

Arguments:

- df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
- namespace: The namespace to upsert into.
- batch_size: The number of rows to upsert in a single batch.
- use_async_requests: Whether to issue multiple upsert requests at the same time using the asynchronous request mechanism. Set to `False`
- show_progress: Whether to show a progress bar.
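With use_async_requests=True, each upsert call returns a future, and the per-batch counts are summed only after every future resolves. The same pattern can be sketched with a stand-in upsert function and a stdlib thread pool (nothing below is the client's real API):

```python
from concurrent.futures import ThreadPoolExecutor

def fake_upsert(batch):
    # Stand-in for index.upsert(vectors=batch, async_req=True).
    return {"upserted_count": len(batch)}

batches = [["a", "b"], ["c"], ["d", "e", "f"]]
with ThreadPoolExecutor(max_workers=4) as pool:
    # Submit all batches first, then block on each result to sum the totals.
    futures = [pool.submit(fake_upsert, batch) for batch in batches]
    total = sum(f.result()["upserted_count"] for f in futures)
print(total)  # 6
```

Submitting all requests before collecting any results is what lets the batches overlap in flight, which is the point of the asynchronous mode.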
```python
    def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        async_req: bool = False,
        **kwargs,
    ) -> Union[DeleteResponse, PineconeGrpcFuture]:
        """
        The Delete operation deletes vectors from the index, from a single namespace.
        No error is raised if the vector id does not exist.
        Note: for any delete call, if namespace is not specified, the default namespace is used.

        Delete can occur in the following mutually exclusive ways:
        1. Delete by ids from a single namespace
        2. Delete all vectors from a single namespace by setting delete_all to True
        3. Delete all vectors from a single namespace by specifying a metadata filter
            (note that for this option delete_all must be set to False)

        Examples:
            >>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
            >>> index.delete(delete_all=True, namespace='my_namespace')
            >>> index.delete(filter={'key': 'value'}, namespace='my_namespace', async_req=True)

        Args:
            ids (List[str]): Vector ids to delete [optional]
            delete_all (bool): This indicates that all vectors in the index namespace should be deleted. [optional]
                Default is False.
            namespace (str): The namespace to delete vectors from [optional]
                If not specified, the default namespace is used.
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                If specified, the metadata filter here will be used to select the vectors to delete.
                This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True.
                See https://www.pinecone.io/docs/metadata-filtering/ [optional]
            async_req (bool): If True, the delete operation will be performed asynchronously.
                Defaults to False. [optional]

        Returns: DeleteResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
        """

        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None

        args_dict = self._parse_non_empty_args(
            [
                ("ids", ids),
                ("delete_all", delete_all),
                ("namespace", namespace),
                ("filter", filter_struct),
            ]
        )
        timeout = kwargs.pop("timeout", None)

        request = DeleteRequest(**args_dict, **kwargs)
        if async_req:
            future = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            return self.runner.run(self.stub.Delete, request, timeout=timeout)
```
The Delete operation deletes vectors from the index, from a single namespace. No error is raised if the vector id does not exist. Note: for any delete call, if namespace is not specified, the default namespace is used.

Delete can occur in the following mutually exclusive ways:

- Delete by ids from a single namespace
- Delete all vectors from a single namespace by setting delete_all to True
- Delete all vectors from a single namespace by specifying a metadata filter (note that for this option delete_all must be set to False)

Examples:

>>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.delete(delete_all=True, namespace='my_namespace')
>>> index.delete(filter={'key': 'value'}, namespace='my_namespace', async_req=True)

Arguments:

- ids (List[str]): Vector ids to delete. [optional]
- delete_all (bool): Indicates that all vectors in the index namespace should be deleted. Default is False. [optional]
- namespace (str): The namespace to delete vectors from. If not specified, the default namespace is used. [optional]
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): If specified, the metadata filter here will be used to select the vectors to delete. This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True. See https://www.pinecone.io/docs/metadata-filtering/ [optional]
- async_req (bool): If True, the delete operation will be performed asynchronously. Defaults to False. [optional]

Returns: DeleteResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
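The three deletion modes are mutually exclusive. A small validation helper makes the rule concrete (this function is hypothetical, written only to illustrate the constraint, not part of the client):

```python
def delete_mode(ids=None, delete_all=None, filter=None):
    """Return which of the three mutually exclusive delete modes was requested."""
    chosen = [
        name
        for name, value in (("ids", ids), ("delete_all", delete_all), ("filter", filter))
        if value  # treat None/False/empty as "not requested"
    ]
    if len(chosen) != 1:
        raise ValueError("Specify exactly one of: ids, delete_all=True, or filter")
    return chosen[0]

print(delete_mode(ids=["id1", "id2"]))       # ids
print(delete_mode(delete_all=True))          # delete_all
print(delete_mode(filter={"key": "value"}))  # filter
```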
```python
    def fetch(
        self,
        ids: Optional[List[str]],
        namespace: Optional[str] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> Union[FetchResponse, PineconeGrpcFuture]:
        """
        The fetch operation looks up and returns vectors, by ID, from a single namespace.
        The returned vectors include the vector data and/or metadata.

        Examples:
            >>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
            >>> index.fetch(ids=['id1', 'id2'])

        Args:
            ids (List[str]): The vector IDs to fetch.
            namespace (str): The namespace to fetch vectors from.
                If not specified, the default namespace is used. [optional]

        Returns: FetchResponse object which contains the list of Vector objects, and namespace name.
        """
        timeout = kwargs.pop("timeout", None)

        args_dict = self._parse_non_empty_args([("namespace", namespace)])

        request = FetchRequest(ids=ids, **args_dict, **kwargs)

        if async_req:
            future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
            return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
        else:
            response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
            return parse_fetch_response(response)
```
The fetch operation looks up and returns vectors, by ID, from a single namespace. The returned vectors include the vector data and/or metadata.
Examples:
>>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.fetch(ids=['id1', 'id2'])
Arguments:
- ids (List[str]): The vector IDs to fetch.
- namespace (str): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: FetchResponse object which contains the list of Vector objects, and namespace name.
```python
    def query(
        self,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        top_k: Optional[int] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> Union[QueryResponse, PineconeGrpcFuture]:
        """
        The Query operation searches a namespace, using a query vector.
        It retrieves the ids of the most similar items in a namespace, along with their similarity scores.

        Examples:
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
            >>> index.query(id='id1', top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
            >>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
            >>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
            >>>             top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], sparse_vector=GRPCSparseValues([1, 2], [0.2, 0.4]),
            >>>             top_k=10, namespace='my_namespace')

        Args:
            vector (List[float]): The query vector. This should be the same length as the dimension of the index
                being queried. Each `query()` request can contain only one of the parameters
                `id` or `vector`. [optional]
            id (str): The unique ID of the vector to be used as a query vector.
                Each `query()` request can contain only one of the parameters
                `vector` or `id`. [optional]
            top_k (int): The number of results to return for each query. Must be an integer greater than 1.
            namespace (str): The namespace to fetch vectors from.
                If not specified, the default namespace is used. [optional]
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                The filter to apply. You can use vector metadata to limit your search.
                See https://www.pinecone.io/docs/metadata-filtering/ [optional]
            include_values (bool): Indicates whether vector values are included in the response.
                If omitted the server will use the default value of False [optional]
            include_metadata (bool): Indicates whether metadata is included in the response as well as the ids.
                If omitted the server will use the default value of False [optional]
            sparse_vector: (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): sparse values of the query vector.
                Expected to be either a GRPCSparseValues object or a dict of the form:
                {'indices': List[int], 'values': List[float]}, where the lists each have the same length.

        Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects,
            and namespace name.
        """

        if vector is not None and id is not None:
            raise ValueError("Cannot specify both `id` and `vector`")

        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None

        sparse_vector = self._parse_sparse_values_arg(sparse_vector)
        args_dict = self._parse_non_empty_args(
            [
                ("vector", vector),
                ("id", id),
                ("namespace", namespace),
                ("top_k", top_k),
                ("filter", filter_struct),
                ("include_values", include_values),
                ("include_metadata", include_metadata),
                ("sparse_vector", sparse_vector),
            ]
        )

        request = QueryRequest(**args_dict)

        timeout = kwargs.pop("timeout", None)

        if async_req:
            future = self.runner.run(self.stub.Query.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            response = self.runner.run(self.stub.Query, request, timeout=timeout)
            json_response = json_format.MessageToDict(response)
            return parse_query_response(json_response, _check_type=False)
```
The Query operation searches a namespace, using a query vector. It retrieves the ids of the most similar items in a namespace, along with their similarity scores.
Examples:
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
>>> index.query(id='id1', top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
>>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
>>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
>>>             top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], sparse_vector=GRPCSparseValues([1, 2], [0.2, 0.4]),
>>>             top_k=10, namespace='my_namespace')
Arguments:
- vector (List[float]): The query vector. This should be the same length as the dimension of the index being queried. Each `query()` request can contain only one of the parameters `id` or `vector`. [optional]
- id (str): The unique ID of the vector to be used as a query vector. Each `query()` request can contain only one of the parameters `vector` or `id`. [optional]
- top_k (int): The number of results to return for each query. Must be an integer greater than 1.
- namespace (str): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): The filter to apply. You can use vector metadata to limit your search. See https://www.pinecone.io/docs/metadata-filtering/ [optional]
- include_values (bool): Indicates whether vector values are included in the response. If omitted the server will use the default value of False [optional]
- include_metadata (bool): Indicates whether metadata is included in the response as well as the ids. If omitted the server will use the default value of False [optional]
- sparse_vector: (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): sparse values of the query vector. Expected to be either a GRPCSparseValues object or a dict of the form: {'indices': List[int], 'values': List[float]}, where the lists each have the same length.
Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects, and namespace name.
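A sparse_vector dict must keep its two lists aligned: indices[i] names a dimension and values[i] holds its weight, so the lists must have the same length. A quick check, written here only for illustration:

```python
def as_sparse_pairs(sparse):
    """Pair up a {'indices': [...], 'values': [...]} dict, enforcing equal lengths."""
    indices, values = sparse["indices"], sparse["values"]
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    return list(zip(indices, values))

print(as_sparse_pairs({"indices": [1, 2], "values": [0.2, 0.4]}))  # [(1, 0.2), (2, 0.4)]
```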
```python
    def query_namespaces(
        self,
        vector: List[float],
        namespaces: List[str],
        top_k: Optional[int] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryNamespacesResults:
        if namespaces is None or len(namespaces) == 0:
            raise ValueError("At least one namespace must be specified")
        if len(vector) == 0:
            raise ValueError("Query vector must not be empty")

        overall_topk = top_k if top_k is not None else 10
        aggregator = QueryResultsAggregator(top_k=overall_topk)

        target_namespaces = set(namespaces)  # dedup namespaces
        futures = [
            self.threadpool_executor.submit(
                self.query,
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_req=False,
                **kwargs,
            )
            for ns in target_namespaces
        ]

        only_futures = cast(Iterable[Future], futures)
        for response in as_completed(only_futures):
            aggregator.add_results(response.result())

        final_results = aggregator.get_results()
        return final_results
```
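As the body above shows, query_namespaces fans the same query out to each namespace on a thread pool and aggregates the per-namespace results into a single global top_k. The aggregation step can be sketched like this (the dict shape and the assumption that a higher score means a closer match are illustrative; the real client uses its QueryResultsAggregator):

```python
import heapq

def merge_top_k(per_namespace_matches, top_k):
    """Keep the top_k highest-scoring matches across all namespaces."""
    all_matches = [m for matches in per_namespace_matches for m in matches]
    return heapq.nlargest(top_k, all_matches, key=lambda m: m["score"])

results = merge_top_k(
    [
        [{"id": "a", "score": 0.9}, {"id": "b", "score": 0.4}],  # e.g. namespace ns1
        [{"id": "c", "score": 0.7}],                             # e.g. namespace ns2
    ],
    top_k=2,
)
print([m["id"] for m in results])  # ['a', 'c']
```

Note that each per-namespace query already requests the full top_k, since any single namespace could in principle hold all of the global best matches.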
```python
    def update(
        self,
        id: str,
        async_req: bool = False,
        values: Optional[List[float]] = None,
        set_metadata: Optional[
            Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]
        ] = None,
        namespace: Optional[str] = None,
        sparse_values: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Union[UpdateResponse, PineconeGrpcFuture]:
        """
        The Update operation updates a vector in a namespace.
        If values is included, it will overwrite the previous value.
        If set_metadata is included,
        the values of the fields specified in it will be added or will overwrite the previous value.

        Examples:
            >>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
            >>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace', async_req=True)
            >>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
            >>>              namespace='my_namespace')
            >>> index.update(id='id1', values=[1, 2, 3], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
            >>>              namespace='my_namespace')

        Args:
            id (str): Vector's unique id.
            async_req (bool): If True, the update operation will be performed asynchronously.
                Defaults to False. [optional]
            values (List[float]): vector values to set. [optional]
            set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]):
                metadata to set for vector. [optional]
            namespace (str): Namespace name where to update the vector. [optional]
            sparse_values: (Dict[str, Union[List[float], List[int]]]): sparse values to update for the vector.
                Expected to be either a GRPCSparseValues object or a dict of the form:
                {'indices': List[int], 'values': List[float]} where the lists each have the same length.

        Returns: UpdateResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
        """
        if set_metadata is not None:
            set_metadata_struct = dict_to_proto_struct(set_metadata)
        else:
            set_metadata_struct = None

        timeout = kwargs.pop("timeout", None)
        sparse_values = self._parse_sparse_values_arg(sparse_values)
        args_dict = self._parse_non_empty_args(
            [
                ("values", values),
                ("set_metadata", set_metadata_struct),
                ("namespace", namespace),
                ("sparse_values", sparse_values),
            ]
        )

        request = UpdateRequest(id=id, **args_dict)
        if async_req:
            future = self.runner.run(self.stub.Update.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            return self.runner.run(self.stub.Update, request, timeout=timeout)
```
The Update operation updates a vector in a namespace. If values is included, it will overwrite the previous value. If set_metadata is included, the values of the fields specified in it will be added or will overwrite the previous value.

Examples:

>>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
>>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace', async_req=True)
>>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
>>>              namespace='my_namespace')
>>> index.update(id='id1', values=[1, 2, 3], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
>>>              namespace='my_namespace')

Arguments:

- id (str): Vector's unique id.
- async_req (bool): If True, the update operation will be performed asynchronously. Defaults to False. [optional]
- values (List[float]): Vector values to set. [optional]
- set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]): Metadata to set for the vector. [optional]
- namespace (str): Namespace name where the vector should be updated. [optional]
- sparse_values (Union[GRPCSparseValues, Dict[str, Union[List[float], List[int]]]]): Sparse values to update for the vector. Expected to be either a GRPCSparseValues object or a dict of the form {'indices': List[int], 'values': List[float]}, where the lists each have the same length. [optional]

Returns: UpdateResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
```python
    def list_paginated(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        namespace: Optional[str] = None,
        **kwargs,
    ) -> SimpleListResponse:
        """
        The list_paginated operation finds vectors based on an id prefix within a single namespace.
        It returns matching ids in a paginated form, with a pagination token to fetch the next page of results.
        This id list can then be passed to fetch or delete operations, depending on your use case.

        Consider using the `list` method to avoid having to handle pagination tokens manually.

        Examples:
            >>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
            >>> [v.id for v in results.vectors]
            ['99', '990', '991', '992', '993']
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)

        Args:
            prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
                be used with the effect of listing all ids in a namespace [optional]
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]
            namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

        Returns: SimpleListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
        """
        args_dict = self._parse_non_empty_args(
            [
                ("prefix", prefix),
                ("limit", limit),
                ("namespace", namespace),
                ("pagination_token", pagination_token),
            ]
        )
        request = ListRequest(**args_dict, **kwargs)
        timeout = kwargs.pop("timeout", None)
        response = self.runner.run(self.stub.List, request, timeout=timeout)

        if response.pagination and response.pagination.next != "":
            pagination = Pagination(next=response.pagination.next)
        else:
            pagination = None

        return SimpleListResponse(
            namespace=response.namespace, vectors=response.vectors, pagination=pagination
        )
```
The list_paginated operation finds vectors based on an id prefix within a single namespace. It returns matching ids in a paginated form, with a pagination token to fetch the next page of results. This id list can then be passed to fetch or delete operations, depending on your use case.
Consider using the `list` method to avoid having to handle pagination tokens manually.
Examples:
>>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
>>> [v.id for v in results.vectors]
['99', '990', '991', '992', '993']
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)
Arguments:
- prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used with the effect of listing all ids in a namespace [optional]
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: SimpleListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
```python
    def list(self, **kwargs):
        """
        The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields
        a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your
        behalf.

        Examples:
            >>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
            >>>     print(ids)
            ['99', '990', '991', '992', '993']
            ['994', '995', '996', '997', '998']
            ['999']

        Args:
            prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
                be used with the effect of listing all ids in a namespace [optional]
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]
            namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
        """
        done = False
        while not done:
            try:
                results = self.list_paginated(**kwargs)
            except Exception as e:
                raise e

            if len(results.vectors) > 0:
                yield [v.id for v in results.vectors]

            if results.pagination and results.pagination.next:
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True
```
The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your behalf.
Examples:
>>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
>>>     print(ids)
['99', '990', '991', '992', '993']
['994', '995', '996', '997', '998']
['999']
Arguments:
- prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used with the effect of listing all ids in a namespace [optional]
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
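The loop inside list follows a pattern that works against any paginated source: fetch a page, yield its ids, and feed the returned token back until none remains. A sketch with an in-memory fake (the page shape below is simplified, not the real SimpleListResponse):

```python
# Fake paginated backend: token -> page. None is the first request's token.
PAGES = {
    None: {"ids": ["99", "990"], "next": "t1"},
    "t1": {"ids": ["991"], "next": None},
}

def list_ids(pagination_token=None):
    """Yield each page of ids, following pagination tokens automatically."""
    while True:
        page = PAGES[pagination_token]
        if page["ids"]:
            yield list(page["ids"])
        pagination_token = page["next"]
        if not pagination_token:
            break

print(list(list_ids()))  # [['99', '990'], ['991']]
```

An empty or missing next token is the termination signal, which matches how the method checks results.pagination before continuing.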
```python
    def describe_index_stats(
        self, filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, **kwargs
    ) -> DescribeIndexStatsResponse:
        """
        The DescribeIndexStats operation returns statistics about the index's contents.
        For example: the vector count per namespace and the number of dimensions.

        Examples:
            >>> index.describe_index_stats()
            >>> index.describe_index_stats(filter={'key': 'value'})

        Args:
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                If this parameter is present, the operation only returns statistics for vectors that satisfy the filter.
                See https://www.pinecone.io/docs/metadata-filtering/ [optional]

        Returns: DescribeIndexStatsResponse object which contains stats about the index.
        """
        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None
        args_dict = self._parse_non_empty_args([("filter", filter_struct)])
        timeout = kwargs.pop("timeout", None)

        request = DescribeIndexStatsRequest(**args_dict)
        response = self.runner.run(self.stub.DescribeIndexStats, request, timeout=timeout)
        json_response = json_format.MessageToDict(response)
        return parse_stats_response(json_response)
```
The DescribeIndexStats operation returns statistics about the index's contents. For example: The vector count per namespace and the number of dimensions.
Examples:
>>> index.describe_index_stats()
>>> index.describe_index_stats(filter={'key': 'value'})
Arguments:
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See https://www.pinecone.io/docs/metadata-filtering/ [optional]
Returns: DescribeIndexStatsResponse object which contains stats about the index.
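The response groups vector counts per namespace, so summing them gives the index total. The dict below only sketches that shape; the real response object's attribute names may differ:

```python
# Illustrative stand-in for a DescribeIndexStatsResponse.
stats = {
    "namespaces": {
        "ns1": {"vector_count": 120},
        "": {"vector_count": 30},  # "" is the default namespace
    },
    "dimension": 1536,
}

total = sum(ns["vector_count"] for ns in stats["namespaces"].values())
print(total)  # 150
```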
Inherited Members
- pinecone.grpc.base.GRPCIndexBase
- GRPCIndexBase
- config
- grpc_client_config
- pool_threads
- runner
- channel_factory
- stub
- threadpool_executor
- channel
- grpc_server_on
- close
````python
class PineconeGRPC(Pinecone):
    """
    An alternative version of the Pinecone client that uses gRPC instead of HTTP for
    data operations.

    ### Installing the gRPC client

    You must install extra dependencies in order to install the GRPC client.

    #### Installing with pip

    ```bash
    # Install the latest version
    pip3 install pinecone[grpc]

    # Install a specific version
    pip3 install "pinecone[grpc]"==3.0.0
    ```

    #### Installing with poetry

    ```bash
    # Install the latest version
    poetry add pinecone --extras grpc

    # Install a specific version
    poetry add pinecone==3.0.0 --extras grpc
    ```

    ### Using the gRPC client

    ```python
    import os
    from pinecone.grpc import PineconeGRPC

    client = PineconeGRPC(api_key=os.environ.get("PINECONE_API_KEY"))

    # From this point on, usage is identical to the HTTP client.
    index = client.Index("my-index", host=os.environ.get("PINECONE_INDEX_HOST"))
    index.query(...)
    ```

    """

    def Index(self, name: str = "", host: str = "", **kwargs):
        """
        Target an index for data operations.

        ### Target an index by host url

        In production situations, you want to upsert or query your data as quickly
        as possible. If you know in advance the host url of your index, you can
        eliminate a round trip to the Pinecone control plane by specifying the
        host of the index.

        ```python
        import os
        from pinecone.grpc import PineconeGRPC

        api_key = os.environ.get("PINECONE_API_KEY")
        index_host = os.environ.get("PINECONE_INDEX_HOST")

        pc = PineconeGRPC(api_key=api_key)
        index = pc.Index(host=index_host)

        # Now you're ready to perform data operations
        index.query(vector=[...], top_k=10)
        ```

        To find your host url, you can use the Pinecone control plane to describe
        the index. The host url is returned in the response. Or, alternatively, the
        host is displayed in the Pinecone web console.

        ```python
        import os
        from pinecone import Pinecone

        pc = Pinecone(
            api_key=os.environ.get("PINECONE_API_KEY")
        )

        host = pc.describe_index('index-name').host
        ```

        ### Target an index by name (not recommended for production)

        For more casual usage, such as when you are playing and exploring with Pinecone
        in a notebook setting, you can also target an index by name. If you use this
        approach, the client may need to perform an extra call to the Pinecone control
        plane to get the index host on your behalf.

        The client will cache the index host for future use whenever it is seen, so you
        will only incur the overhead of one call. But this approach is not
        recommended for production usage.

        ```python
        import os
        from pinecone import ServerlessSpec
        from pinecone.grpc import PineconeGRPC

        api_key = os.environ.get("PINECONE_API_KEY")

        pc = PineconeGRPC(api_key=api_key)
        pc.create_index(
            name='my-index',
            dimension=1536,
            metric='cosine',
            spec=ServerlessSpec(cloud='aws', region='us-west-2')
        )
        index = pc.Index('my-index')

        # Now you're ready to perform data operations
        index.query(vector=[...], top_k=10)
        ```
        """
        if name == "" and host == "":
            raise ValueError("Either name or host must be specified")

        # Use host if it is provided, otherwise get host from describe_index
        index_host = host or self.index_host_store.get_host(self.index_api, self.config, name)

        pt = kwargs.pop("pool_threads", None) or self.pool_threads

        config = ConfigBuilder.build(
            api_key=self.config.api_key,
            host=index_host,
            source_tag=self.config.source_tag,
            proxy_url=self.config.proxy_url,
            ssl_ca_certs=self.config.ssl_ca_certs,
        )
        return GRPCIndex(index_name=name, config=config, pool_threads=pt, **kwargs)
````
An alternative version of the Pinecone client that uses gRPC instead of HTTP for data operations.

### Installing the gRPC client

You must install extra dependencies in order to install the GRPC client.

#### Installing with pip

```bash
# Install the latest version
pip3 install pinecone[grpc]

# Install a specific version
pip3 install "pinecone[grpc]"==3.0.0
```

#### Installing with poetry

```bash
# Install the latest version
poetry add pinecone --extras grpc

# Install a specific version
poetry add pinecone==3.0.0 --extras grpc
```

### Using the gRPC client

```python
import os
from pinecone.grpc import PineconeGRPC

client = PineconeGRPC(api_key=os.environ.get("PINECONE_API_KEY"))

# From this point on, usage is identical to the HTTP client.
index = client.Index("my-index", host=os.environ["PINECONE_INDEX_HOST"])
index.query(...)
```
````python
def Index(self, name: str = "", host: str = "", **kwargs):
    """
    Target an index for data operations.

    ### Target an index by host url

    In production situations, you want to upsert or query your data as quickly
    as possible. If you know in advance the host url of your index, you can
    eliminate a round trip to the Pinecone control plane by specifying the
    host of the index.

    ```python
    import os
    from pinecone.grpc import PineconeGRPC

    api_key = os.environ.get("PINECONE_API_KEY")
    index_host = os.environ.get("PINECONE_INDEX_HOST")

    pc = PineconeGRPC(api_key=api_key)
    index = pc.Index(host=index_host)

    # Now you're ready to perform data operations
    index.query(vector=[...], top_k=10)
    ```

    To find your host url, you can use the Pinecone control plane to describe
    the index. The host url is returned in the response. Alternatively, the
    host is displayed in the Pinecone web console.

    ```python
    import os
    from pinecone import Pinecone

    pc = Pinecone(
        api_key=os.environ.get("PINECONE_API_KEY")
    )

    host = pc.describe_index('index-name').host
    ```

    ### Target an index by name (not recommended for production)

    For more casual usage, such as when you are exploring Pinecone in a
    notebook setting, you can also target an index by name. If you use this
    approach, the client may need to make an extra call to the Pinecone
    control plane to look up the index host on your behalf.

    The client caches the index host once it has been seen, so you will only
    incur this overhead on the first call. Even so, this approach is not
    recommended for production usage.

    ```python
    import os
    from pinecone import ServerlessSpec
    from pinecone.grpc import PineconeGRPC

    api_key = os.environ.get("PINECONE_API_KEY")

    pc = PineconeGRPC(api_key=api_key)
    pc.create_index(
        name='my-index',
        dimension=1536,
        metric='cosine',
        spec=ServerlessSpec(cloud='aws', region='us-west-2')
    )
    index = pc.Index('my-index')

    # Now you're ready to perform data operations
    index.query(vector=[...], top_k=10)
    ```
    """
    if name == "" and host == "":
        raise ValueError("Either name or host must be specified")

    # Use host if it is provided, otherwise get host from describe_index
    index_host = host or self.index_host_store.get_host(self.index_api, self.config, name)

    pt = kwargs.pop("pool_threads", None) or self.pool_threads

    config = ConfigBuilder.build(
        api_key=self.config.api_key,
        host=index_host,
        source_tag=self.config.source_tag,
        proxy_url=self.config.proxy_url,
        ssl_ca_certs=self.config.ssl_ca_certs,
    )
    return GRPCIndex(index_name=name, config=config, pool_threads=pt, **kwargs)
````
Target an index for data operations.

### Target an index by host url

In production situations, you want to upsert or query your data as quickly as possible. If you know in advance the host url of your index, you can eliminate a round trip to the Pinecone control plane by specifying the host of the index.

```python
import os
from pinecone.grpc import PineconeGRPC

api_key = os.environ.get("PINECONE_API_KEY")
index_host = os.environ.get("PINECONE_INDEX_HOST")

pc = PineconeGRPC(api_key=api_key)
index = pc.Index(host=index_host)

# Now you're ready to perform data operations
index.query(vector=[...], top_k=10)
```

To find your host url, you can use the Pinecone control plane to describe the index. The host url is returned in the response. Alternatively, the host is displayed in the Pinecone web console.

```python
import os
from pinecone import Pinecone

pc = Pinecone(
    api_key=os.environ.get("PINECONE_API_KEY")
)

host = pc.describe_index('index-name').host
```

### Target an index by name (not recommended for production)

For more casual usage, such as when you are exploring Pinecone in a notebook setting, you can also target an index by name. If you use this approach, the client may need to make an extra call to the Pinecone control plane to look up the index host on your behalf.

The client caches the index host once it has been seen, so you will only incur this overhead on the first call. Even so, this approach is not recommended for production usage.

```python
import os
from pinecone import ServerlessSpec
from pinecone.grpc import PineconeGRPC

api_key = os.environ.get("PINECONE_API_KEY")

pc = PineconeGRPC(api_key=api_key)
pc.create_index(
    name='my-index',
    dimension=1536,
    metric='cosine',
    spec=ServerlessSpec(cloud='aws', region='us-west-2')
)
index = pc.Index('my-index')

# Now you're ready to perform data operations
index.query(vector=[...], top_k=10)
```
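The host-caching behavior described above can be sketched in isolation. Everything in this snippet (the `IndexHostStore` name, the fake `describe_index` callable, the host url format) is illustrative and not the actual pinecone implementation:

```python
# Minimal sketch of the host-caching idea: the first lookup for an index
# name pays the cost of a control-plane call, later lookups are served
# from a local dict. All names here are illustrative.

class IndexHostStore:
    def __init__(self, describe_index):
        self._describe_index = describe_index  # callable: name -> host url
        self._cache = {}

    def get_host(self, name: str) -> str:
        # Only the first call for a given name hits the control plane.
        if name not in self._cache:
            self._cache[name] = self._describe_index(name)
        return self._cache[name]


calls = []

def fake_describe_index(name):
    calls.append(name)  # record each simulated control-plane call
    return f"{name}-abc123.svc.pinecone.io"

store = IndexHostStore(fake_describe_index)
host1 = store.get_host("my-index")
host2 = store.get_host("my-index")  # served from cache, no second call
```

Only one entry ends up in `calls`, which is why targeting by name costs a single extra round trip rather than one per operation.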
```python
class GRPCClientConfig(NamedTuple):
    """
    GRPC client configuration options.

    :param secure: Whether to use encrypted protocol (SSL). Defaults to True.
    :type secure: bool, optional
    :param timeout: Defaults to 20 seconds. Fail if gateway doesn't receive response within timeout.
    :type timeout: int, optional
    :param conn_timeout: Defaults to 1. Timeout to retry connection if gRPC is unavailable. 0 is no retry.
    :type conn_timeout: int, optional
    :param reuse_channel: Whether to reuse the same grpc channel for multiple requests
    :type reuse_channel: bool, optional
    :param retry_config: RetryConfig indicating how requests should be retried
    :type retry_config: RetryConfig, optional
    :param grpc_channel_options: A dict of gRPC channel arguments
    :type grpc_channel_options: Dict[str, str]
    :param additional_metadata: Additional metadata to be sent to the server with each request. Note that this
        metadata refers to [gRPC metadata](https://grpc.io/docs/guides/metadata/) which is a concept similar
        to HTTP headers. This is unrelated to the metadata that can be stored with a vector in the index.
    :type additional_metadata: Dict[str, str]
    """

    secure: bool = True
    timeout: int = 20
    conn_timeout: int = 1
    reuse_channel: bool = True
    retry_config: Optional[RetryConfig] = None
    grpc_channel_options: Optional[Dict[str, str]] = None
    additional_metadata: Optional[Dict[str, str]] = None

    @classmethod
    def _from_dict(cls, kwargs: dict):
        cls_kwargs = {kk: vv for kk, vv in kwargs.items() if kk in cls._fields}
        return cls(**cls_kwargs)
```
GRPC client configuration options.
Parameters
- secure: Whether to use encrypted protocol (SSL). Defaults to True.
- timeout: Defaults to 20 seconds. Fail if gateway doesn't receive response within timeout.
- conn_timeout: Defaults to 1. Timeout to retry connection if gRPC is unavailable. 0 is no retry.
- reuse_channel: Whether to reuse the same grpc channel for multiple requests.
- retry_config: RetryConfig indicating how requests should be retried.
- grpc_channel_options: A dict of gRPC channel arguments.
- additional_metadata: Additional metadata to be sent to the server with each request. Note that this metadata refers to gRPC metadata, which is a concept similar to HTTP headers. This is unrelated to the metadata that can be stored with a vector in the index.
Create new instance of GRPCClientConfig(secure, timeout, conn_timeout, reuse_channel, retry_config, grpc_channel_options, additional_metadata)
Inherited Members
- builtins.tuple
- index
- count
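The `_from_dict` helper shown in the source keeps only the keys that match the NamedTuple's `_fields` and silently drops everything else, so unknown options never reach the constructor. The same pattern can be exercised standalone; the `ConfigSketch` class below is illustrative and carries only a subset of the real fields:

```python
from typing import NamedTuple

# Illustrative mirror of the filtering pattern used by GRPCClientConfig._from_dict:
# unknown keys are dropped, known keys override the NamedTuple defaults.
class ConfigSketch(NamedTuple):
    secure: bool = True
    timeout: int = 20
    conn_timeout: int = 1

    @classmethod
    def _from_dict(cls, kwargs: dict):
        # _fields lists the NamedTuple's field names, in declaration order.
        cls_kwargs = {kk: vv for kk, vv in kwargs.items() if kk in cls._fields}
        return cls(**cls_kwargs)


cfg = ConfigSketch._from_dict({"timeout": 5, "unrelated_key": "ignored"})
```

Here `cfg.timeout` is overridden to 5 while `secure` and `conn_timeout` keep their defaults, and the unrelated key causes no error.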
A ProtocolMessage
Inherited Members
- google._upb._message.Message
- Message
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- FindInitializationErrors
- Extensions
- google.protobuf.message.Message
- RegisterExtension
A ProtocolMessage
Inherited Members
- google._upb._message.Message
- Message
- MergeFrom
- CopyFrom
- Clear
- SetInParent
- IsInitialized
- MergeFromString
- ParseFromString
- SerializeToString
- SerializePartialToString
- ListFields
- HasField
- ClearField
- WhichOneof
- HasExtension
- ClearExtension
- UnknownFields
- DiscardUnknownFields
- ByteSize
- FromString
- FindInitializationErrors
- Extensions
- google.protobuf.message.Message
- RegisterExtension
```python
class PineconeGrpcFuture(ConcurrentFuture):
    def __init__(
        self, grpc_future: GrpcFuture, timeout: Optional[int] = None, result_transformer=None
    ):
        super().__init__()
        self._grpc_future = grpc_future
        self._result_transformer = result_transformer
        if timeout is not None:
            self._default_timeout = timeout  # seconds
        else:
            self._default_timeout = 5  # seconds

        # Sync initial state, in case the gRPC future is already done
        self._sync_state(self._grpc_future)

        # Add callback to subscribe to updates from the gRPC future
        self._grpc_future.add_done_callback(self._sync_state)

    @property
    def grpc_future(self):
        return self._grpc_future

    def _sync_state(self, grpc_future):
        if self.done():
            return

        if grpc_future.cancelled():
            self.cancel()
        elif grpc_future.exception(timeout=self._default_timeout):
            self.set_exception(grpc_future.exception())
        elif grpc_future.done():
            try:
                result = grpc_future.result(timeout=self._default_timeout)
                self.set_result(result)
            except Exception as e:
                self.set_exception(e)
        elif grpc_future.running():
            self.set_running_or_notify_cancel()

    def set_result(self, result):
        if self._result_transformer:
            result = self._result_transformer(result)
        return super().set_result(result)

    def cancel(self):
        self._grpc_future.cancel()
        return super().cancel()

    def exception(self, timeout=None):
        exception = super().exception(timeout=self._timeout(timeout))
        if isinstance(exception, RpcError):
            return self._wrap_rpc_exception(exception)
        return exception

    def traceback(self, timeout=None):
        # This is not part of the ConcurrentFuture interface, but keeping it for
        # backward compatibility
        return self._grpc_future.traceback(timeout=self._timeout(timeout))

    def result(self, timeout=None):
        try:
            return super().result(timeout=self._timeout(timeout))
        except RpcError as e:
            raise self._wrap_rpc_exception(e) from e

    def _timeout(self, timeout: Optional[int] = None) -> int:
        if timeout is not None:
            return timeout
        else:
            return self._default_timeout

    def _wrap_rpc_exception(self, e):
        if e._state and e._state.debug_error_string:
            return PineconeException(e._state.debug_error_string)
        else:
            return PineconeException("Unknown GRPC error")

    def __del__(self):
        self._grpc_future.cancel()
        self = None  # release the reference to the grpc future
```
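The core pattern here (mirroring one future's state into a `concurrent.futures.Future` via `add_done_callback`, with an optional result transformer) can be sketched using only the standard library. The `wrap_future` helper below is illustrative, not part of the package, and omits the gRPC-specific error wrapping:

```python
from concurrent.futures import Future, ThreadPoolExecutor

# Standard-library sketch of the bridging pattern: an outer Future mirrors
# an inner one via add_done_callback, applying a result transformer.
def wrap_future(inner: Future, result_transformer=None) -> Future:
    outer: Future = Future()

    def _sync_state(f: Future):
        if outer.done():
            return
        if f.cancelled():
            outer.cancel()
        elif f.exception() is not None:
            outer.set_exception(f.exception())
        else:
            result = f.result()
            if result_transformer:
                result = result_transformer(result)
            outer.set_result(result)

    # Fires immediately if the inner future is already done.
    inner.add_done_callback(_sync_state)
    return outer


with ThreadPoolExecutor() as pool:
    inner = pool.submit(lambda: 21)
    outer = wrap_future(inner, result_transformer=lambda r: r * 2)
    value = outer.result(timeout=5)  # 21 transformed to 42
```

Callers block on the familiar `Future.result()` API while the transformer converts the raw payload before it is observed, which is how the gRPC client returns parsed responses from `async_req`-style calls.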
Represents the result of an asynchronous computation.
```python
def __init__(
    self, grpc_future: GrpcFuture, timeout: Optional[int] = None, result_transformer=None
):
    super().__init__()
    self._grpc_future = grpc_future
    self._result_transformer = result_transformer
    if timeout is not None:
        self._default_timeout = timeout  # seconds
    else:
        self._default_timeout = 5  # seconds

    # Sync initial state, in case the gRPC future is already done
    self._sync_state(self._grpc_future)

    # Add callback to subscribe to updates from the gRPC future
    self._grpc_future.add_done_callback(self._sync_state)
```
Initializes the future. Should not be called by clients.
```python
def set_result(self, result):
    if self._result_transformer:
        result = self._result_transformer(result)
    return super().set_result(result)
```
Sets the return value of work associated with the future.
Should only be used by Executor implementations and unit tests.
Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.
```python
def exception(self, timeout=None):
    exception = super().exception(timeout=self._timeout(timeout))
    if isinstance(exception, RpcError):
        return self._wrap_rpc_exception(exception)
    return exception
```
Return the exception raised by the call that the future represents.
Arguments:
- timeout: The number of seconds to wait for the exception if the future isn't done. If None, then there is no limit on the wait time.
Returns:
The exception raised by the call that the future represents or None if the call completed without raising.
Raises:
- CancelledError: If the future was cancelled.
- TimeoutError: If the future didn't finish executing before the given timeout.
```python
def result(self, timeout=None):
    try:
        return super().result(timeout=self._timeout(timeout))
    except RpcError as e:
        raise self._wrap_rpc_exception(e) from e
```
Return the result of the call that the future represents.
Arguments:
- timeout: The number of seconds to wait for the result if the future isn't done. If None, then there is no limit on the wait time.
Returns:
The result of the call that the future represents.
Raises:
- CancelledError: If the future was cancelled.
- TimeoutError: If the future didn't finish executing before the given timeout.
- Exception: If the call raised then that exception will be raised.
Inherited Members
- concurrent.futures._base.Future
- cancelled
- running
- done
- add_done_callback
- set_running_or_notify_cancel
- set_exception