pinecone.grpc

Connecting to Pinecone with GRPC

The `pinecone.grpc` submodule provides an alternative version of the Pinecone
client that uses gRPC instead of HTTP for data operations, which can provide a
significant performance boost.
### Installing the gRPC client

You must install extra dependencies in order to use the gRPC client.

#### Installing with pip

```bash
# Install the latest version
pip3 install "pinecone[grpc]"

# Install a specific version
pip3 install "pinecone[grpc]"==3.0.0
```
#### Installing with poetry

```bash
# Install the latest version
poetry add pinecone --extras grpc

# Install a specific version
poetry add pinecone==3.0.0 --extras grpc
```
### Using the gRPC client

```python
import os
from pinecone.grpc import PineconeGRPC

client = PineconeGRPC(api_key=os.environ.get("PINECONE_API_KEY"))

# From this point on, usage is identical to the HTTP client.
index = client.Index(host=os.environ.get("PINECONE_INDEX_HOST"))
index.query(vector=[...], top_k=10)
```
````python
"""
Connecting to Pinecone with GRPC

The `pinecone.grpc` submodule provides an alternative version of the Pinecone
client that uses gRPC instead of HTTP for data operations, which can provide a
significant performance boost.

### Installing the gRPC client

You must install extra dependencies in order to use the gRPC client.

#### Installing with pip

```bash
# Install the latest version
pip3 install "pinecone[grpc]"

# Install a specific version
pip3 install "pinecone[grpc]"==3.0.0
```

#### Installing with poetry

```bash
# Install the latest version
poetry add pinecone --extras grpc

# Install a specific version
poetry add pinecone==3.0.0 --extras grpc
```

### Using the gRPC client

```python
import os
from pinecone.grpc import PineconeGRPC

client = PineconeGRPC(api_key=os.environ.get("PINECONE_API_KEY"))

# From this point on, usage is identical to the HTTP client.
index = client.Index(host=os.environ.get("PINECONE_INDEX_HOST"))
index.query(vector=[...], top_k=10)
```

"""

from .index_grpc import GRPCIndex
from .pinecone import PineconeGRPC
from .config import GRPCClientConfig
from .future import PineconeGrpcFuture

from pinecone.data.dataclasses import Vector, SparseValues

from pinecone.core.grpc.protos.db_data_2025_01_pb2 import (
    Vector as GRPCVector,
    SparseValues as GRPCSparseValues,
    DeleteResponse as GRPCDeleteResponse,
)

__all__ = [
    "GRPCIndex",
    "PineconeGRPC",
    "GRPCDeleteResponse",
    "GRPCClientConfig",
    "GRPCVector",
    "GRPCSparseValues",
    "Vector",
    "SparseValues",
    "PineconeGrpcFuture",
]
````
```python
class GRPCIndex(GRPCIndexBase):
    """A client for interacting with a Pinecone index via GRPC API."""

    @property
    def stub_class(self):
        """@private"""
        return VectorServiceStub

    def upsert(
        self,
        vectors: Union[List[Vector], List[GRPCVector], List[VectorTuple], List[VectorTypedDict]],
        async_req: bool = False,
        namespace: Optional[str] = None,
        batch_size: Optional[int] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> Union[UpsertResponse, PineconeGrpcFuture]:
        """
        The upsert operation writes vectors into a namespace.
        If a new value is upserted for an existing vector id, it will overwrite the previous value.

        Examples:
            >>> index.upsert([('id1', [1.0, 2.0, 3.0], {'key': 'value'}),
                              ('id2', [1.0, 2.0, 3.0])],
                             namespace='ns1', async_req=True)
            >>> index.upsert([{'id': 'id1', 'values': [1.0, 2.0, 3.0], 'metadata': {'key': 'value'}},
                              {'id': 'id2',
                               'values': [1.0, 2.0, 3.0],
                               'sparse_values': {'indices': [1, 8], 'values': [0.2, 0.4]}}])
            >>> index.upsert([GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
                              GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
                              GRPCVector(id='id3',
                                         values=[1.0, 2.0, 3.0],
                                         sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))])

        Args:
            vectors (Union[List[Vector], List[Tuple]]): A list of vectors to upsert.

                A vector can be represented by 1) a GRPCVector object, 2) a tuple, or 3) a dictionary:
                1) if a tuple is used, it must be of the form (id, values, metadata) or (id, values),
                   where id is a string, values is a list of floats, and metadata is a dict.
                   Examples: ('id1', [1.0, 2.0, 3.0], {'key': 'value'}), ('id2', [1.0, 2.0, 3.0])

                2) if a GRPCVector object is used, it must be of the form
                   GRPCVector(id, values, metadata), where metadata is an optional argument of type
                   Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]].
                   Examples: GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}),
                             GRPCVector(id='id2', values=[1.0, 2.0, 3.0]),
                             GRPCVector(id='id3',
                                        values=[1.0, 2.0, 3.0],
                                        sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))

                3) if a dictionary is used, it must be in the form
                   {'id': str, 'values': List[float], 'sparse_values': {'indices': List[int], 'values': List[float]},
                    'metadata': dict}

                Note: the dimension of each vector must match the dimension of the index.
            async_req (bool): If True, the upsert operation will be performed asynchronously.
                Cannot be used with batch_size.
                Defaults to False. See: https://docs.pinecone.io/docs/performance-tuning [optional]
            namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
            batch_size (int): The number of vectors to upsert in each batch.
                Cannot be used with async_req=True.
                If not specified, all vectors will be upserted in a single batch. [optional]
            show_progress (bool): Whether to show a progress bar using tqdm.
                Applied only if batch_size is provided. Default is True.

        Returns: UpsertResponse, which contains the number of vectors upserted.
        """
        if async_req and batch_size is not None:
            raise ValueError(
                "async_req is not supported when batch_size is provided. "
                "To upsert in parallel, please follow: "
                "https://docs.pinecone.io/docs/performance-tuning"
            )

        timeout = kwargs.pop("timeout", None)

        vectors = list(map(VectorFactoryGRPC.build, vectors))
        if async_req:
            args_dict = self._parse_non_empty_args([("namespace", namespace)])
            request = UpsertRequest(vectors=vectors, **args_dict, **kwargs)
            future = self.runner.run(self.stub.Upsert.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)

        if batch_size is None:
            return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)

        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        pbar = tqdm(total=len(vectors), disable=not show_progress, desc="Upserted vectors")
        total_upserted = 0
        for i in range(0, len(vectors), batch_size):
            batch_result = self._upsert_batch(
                vectors[i : i + batch_size], namespace, timeout=timeout, **kwargs
            )
            pbar.update(batch_result.upserted_count)
            # we can't use pbar.n here because of the show_progress=False case
            total_upserted += batch_result.upserted_count

        return UpsertResponse(upserted_count=total_upserted)

    def _upsert_batch(
        self, vectors: List[GRPCVector], namespace: Optional[str], timeout: Optional[int], **kwargs
    ) -> UpsertResponse:
        args_dict = self._parse_non_empty_args([("namespace", namespace)])
        request = UpsertRequest(vectors=vectors, **args_dict)
        return self.runner.run(self.stub.Upsert, request, timeout=timeout, **kwargs)

    def upsert_from_dataframe(
        self,
        df,
        namespace: str = "",
        batch_size: int = 500,
        use_async_requests: bool = True,
        show_progress: bool = True,
    ) -> UpsertResponse:
        """Upserts a dataframe into the index.

        Args:
            df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
            namespace: The namespace to upsert into.
            batch_size: The number of rows to upsert in a single batch.
            use_async_requests: Whether to issue multiple upsert requests concurrently using the
                asynchronous request mechanism. Set to `False` to send requests sequentially.
            show_progress: Whether to show a progress bar.
        """
        try:
            import pandas as pd
        except ImportError:
            raise RuntimeError(
                "The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`"
            )

        if not isinstance(df, pd.DataFrame):
            raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

        pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
        results = []
        for chunk in self._iter_dataframe(df, batch_size=batch_size):
            res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
            pbar.update(len(chunk))
            results.append(res)

        if use_async_requests:
            cast_results = cast(List[PineconeGrpcFuture], results)
            results = [
                async_result.result()
                for async_result in tqdm(
                    iterable=cast_results,
                    disable=not show_progress,
                    desc="collecting async responses",
                )
            ]

        upserted_count = 0
        for res in results:
            if hasattr(res, "upserted_count") and isinstance(res.upserted_count, int):
                upserted_count += res.upserted_count

        return UpsertResponse(upserted_count=upserted_count)

    @staticmethod
    def _iter_dataframe(df, batch_size):
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i : i + batch_size].to_dict(orient="records")
            yield batch

    def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        async_req: bool = False,
        **kwargs,
    ) -> Union[DeleteResponse, PineconeGrpcFuture]:
        """
        The Delete operation deletes vectors from the index, from a single namespace.
        No error is raised if a vector id does not exist.
        Note: for any delete call, if namespace is not specified, the default namespace is used.

        Delete can occur in the following mutually exclusive ways:
        1. Delete by ids from a single namespace
        2. Delete all vectors from a single namespace by setting delete_all to True
        3. Delete all vectors from a single namespace by specifying a metadata filter
           (note that for this option delete_all must be set to False)

        Examples:
            >>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
            >>> index.delete(delete_all=True, namespace='my_namespace')
            >>> index.delete(filter={'key': 'value'}, namespace='my_namespace', async_req=True)

        Args:
            ids (List[str]): Vector ids to delete. [optional]
            delete_all (bool): This indicates that all vectors in the index namespace should be deleted.
                Default is False. [optional]
            namespace (str): The namespace to delete vectors from.
                If not specified, the default namespace is used. [optional]
            filter (FilterTypedDict):
                If specified, the metadata filter here will be used to select the vectors to delete.
                This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True.
                See https://www.pinecone.io/docs/metadata-filtering/. [optional]
            async_req (bool): If True, the delete operation will be performed asynchronously.
                Defaults to False. [optional]

        Returns: DeleteResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
        """

        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None

        args_dict = self._parse_non_empty_args(
            [
                ("ids", ids),
                ("delete_all", delete_all),
                ("namespace", namespace),
                ("filter", filter_struct),
            ]
        )
        timeout = kwargs.pop("timeout", None)

        request = DeleteRequest(**args_dict, **kwargs)
        if async_req:
            future = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            return self.runner.run(self.stub.Delete, request, timeout=timeout)

    def fetch(
        self,
        ids: Optional[List[str]],
        namespace: Optional[str] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> Union[FetchResponse, PineconeGrpcFuture]:
        """
        The fetch operation looks up and returns vectors, by ID, from a single namespace.
        The returned vectors include the vector data and/or metadata.

        Examples:
            >>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
            >>> index.fetch(ids=['id1', 'id2'])

        Args:
            ids (List[str]): The vector IDs to fetch.
            namespace (str): The namespace to fetch vectors from.
                If not specified, the default namespace is used. [optional]
            async_req (bool): If True, the fetch operation will be performed asynchronously.
                Defaults to False. [optional]

        Returns: FetchResponse object which contains the list of Vector objects, and namespace name.
        """
        timeout = kwargs.pop("timeout", None)

        args_dict = self._parse_non_empty_args([("namespace", namespace)])

        request = FetchRequest(ids=ids, **args_dict, **kwargs)

        if async_req:
            future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
            return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
        else:
            response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
            return parse_fetch_response(response)

    def query(
        self,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        top_k: Optional[int] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[
            Union[SparseValues, GRPCSparseValues, SparseVectorTypedDict]
        ] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> Union[QueryResponse, PineconeGrpcFuture]:
        """
        The Query operation searches a namespace, using a query vector.
        It retrieves the ids of the most similar items in a namespace, along with their similarity scores.

        Examples:
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
            >>> index.query(id='id1', top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
            >>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
            >>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
            >>>             top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], sparse_vector=GRPCSparseValues([1, 2], [0.2, 0.4]),
            >>>             top_k=10, namespace='my_namespace')

        Args:
            vector (List[float]): The query vector. This should be the same length as the dimension of the index
                being queried. Each `query()` request can contain only one of the parameters
                `id` or `vector`. [optional]
            id (str): The unique ID of the vector to be used as a query vector.
                Each `query()` request can contain only one of the parameters
                `vector` or `id`. [optional]
            top_k (int): The number of results to return for each query. Must be an integer greater than 1.
            namespace (str): The namespace to query vectors from.
                If not specified, the default namespace is used. [optional]
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                The filter to apply. You can use vector metadata to limit your search.
                See https://www.pinecone.io/docs/metadata-filtering/. [optional]
            include_values (bool): Indicates whether vector values are included in the response.
                If omitted the server will use the default value of False. [optional]
            include_metadata (bool): Indicates whether metadata is included in the response as well as the ids.
                If omitted the server will use the default value of False. [optional]
            sparse_vector (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): Sparse values of the query vector.
                Expected to be either a SparseValues object or a dict of the form:
                {'indices': List[int], 'values': List[float]}, where the lists each have the same length. [optional]
            async_req (bool): If True, the query operation will be performed asynchronously.
                Defaults to False. [optional]

        Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects,
            and namespace name.
        """

        if vector is not None and id is not None:
            raise ValueError("Cannot specify both `id` and `vector`")

        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None

        sparse_vector = SparseValuesFactory.build(sparse_vector)
        args_dict = self._parse_non_empty_args(
            [
                ("vector", vector),
                ("id", id),
                ("namespace", namespace),
                ("top_k", top_k),
                ("filter", filter_struct),
                ("include_values", include_values),
                ("include_metadata", include_metadata),
                ("sparse_vector", sparse_vector),
            ]
        )

        request = QueryRequest(**args_dict)

        timeout = kwargs.pop("timeout", None)

        if async_req:
            future = self.runner.run(self.stub.Query.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            response = self.runner.run(self.stub.Query, request, timeout=timeout)
            json_response = json_format.MessageToDict(response)
            return parse_query_response(json_response, _check_type=False)

    def query_namespaces(
        self,
        vector: List[float],
        namespaces: List[str],
        metric: Literal["cosine", "euclidean", "dotproduct"],
        top_k: Optional[int] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryNamespacesResults:
        """Runs the query against each of the given namespaces in parallel and
        aggregates the per-namespace results into a single top_k response."""
        if namespaces is None or len(namespaces) == 0:
            raise ValueError("At least one namespace must be specified")
        if len(vector) == 0:
            raise ValueError("Query vector must not be empty")

        overall_topk = top_k if top_k is not None else 10
        aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)

        target_namespaces = set(namespaces)  # dedup namespaces
        futures = [
            self.threadpool_executor.submit(
                self.query,
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_req=False,
                **kwargs,
            )
            for ns in target_namespaces
        ]

        only_futures = cast(Iterable[Future], futures)
        for response in as_completed(only_futures):
            aggregator.add_results(response.result())

        final_results = aggregator.get_results()
        return final_results

    def update(
        self,
        id: str,
        async_req: bool = False,
        values: Optional[List[float]] = None,
        set_metadata: Optional[VectorMetadataTypedDict] = None,
        namespace: Optional[str] = None,
        sparse_values: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Union[UpdateResponse, PineconeGrpcFuture]:
        """
        The Update operation updates a vector in a namespace.
        If values is included, it will overwrite the previous values.
        If set_metadata is included,
        the values of the fields specified in it will be added to or overwrite the previous metadata.

        Examples:
            >>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
            >>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace', async_req=True)
            >>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
            >>>              namespace='my_namespace')
            >>> index.update(id='id1', values=[1, 2, 3], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
            >>>              namespace='my_namespace')

        Args:
            id (str): Vector's unique id.
            async_req (bool): If True, the update operation will be performed asynchronously.
                Defaults to False. [optional]
            values (List[float]): Vector values to set. [optional]
            set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]):
                Metadata to set for the vector. [optional]
            namespace (str): Namespace name where to update the vector. [optional]
            sparse_values (Dict[str, Union[List[float], List[int]]]): Sparse values to update for the vector.
                Expected to be either a GRPCSparseValues object or a dict of the form:
                {'indices': List[int], 'values': List[float]} where the lists each have the same length. [optional]

        Returns: UpdateResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
        """
        if set_metadata is not None:
            set_metadata_struct = dict_to_proto_struct(set_metadata)
        else:
            set_metadata_struct = None

        timeout = kwargs.pop("timeout", None)
        sparse_values = SparseValuesFactory.build(sparse_values)
        args_dict = self._parse_non_empty_args(
            [
                ("values", values),
                ("set_metadata", set_metadata_struct),
                ("namespace", namespace),
                ("sparse_values", sparse_values),
            ]
        )

        request = UpdateRequest(id=id, **args_dict)
        if async_req:
            future = self.runner.run(self.stub.Update.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            return self.runner.run(self.stub.Update, request, timeout=timeout)

    def list_paginated(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        namespace: Optional[str] = None,
        **kwargs,
    ) -> SimpleListResponse:
        """
        The list_paginated operation finds vectors based on an id prefix within a single namespace.
        It returns matching ids in a paginated form, with a pagination token to fetch the next page of results.
        This id list can then be passed to fetch or delete operations, depending on your use case.

        Consider using the `list` method to avoid having to handle pagination tokens manually.

        Examples:
            >>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
            >>> [v.id for v in results.vectors]
            ['99', '990', '991', '992', '993']
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)

        Args:
            prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
                be used with the effect of listing all ids in a namespace. [optional]
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]
            namespace (Optional[str]): The namespace to list ids from. If not specified, the default namespace is used. [optional]

        Returns: SimpleListResponse object which contains the list of ids, the namespace name, pagination information,
            and usage showing the number of read_units consumed.
        """
        args_dict = self._parse_non_empty_args(
            [
                ("prefix", prefix),
                ("limit", limit),
                ("namespace", namespace),
                ("pagination_token", pagination_token),
            ]
        )
        timeout = kwargs.pop("timeout", None)  # pop timeout before building the request
        request = ListRequest(**args_dict, **kwargs)
        response = self.runner.run(self.stub.List, request, timeout=timeout)

        if response.pagination and response.pagination.next != "":
            pagination = Pagination(next=response.pagination.next)
        else:
            pagination = None

        return SimpleListResponse(
            namespace=response.namespace, vectors=response.vectors, pagination=pagination
        )

    def list(self, **kwargs):
        """
        The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields
        a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your
        behalf.

        Examples:
            >>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
            >>>     print(ids)
            ['99', '990', '991', '992', '993']
            ['994', '995', '996', '997', '998']
            ['999']

        Args:
            prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
                be used with the effect of listing all ids in a namespace. [optional]
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                in the response if additional results are available. [optional]
            namespace (Optional[str]): The namespace to list ids from. If not specified, the default namespace is used. [optional]
        """
        done = False
        while not done:
            results = self.list_paginated(**kwargs)

            if len(results.vectors) > 0:
                yield [v.id for v in results.vectors]

            if results.pagination and results.pagination.next:
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True

    def describe_index_stats(
        self, filter: Optional[FilterTypedDict] = None, **kwargs
    ) -> DescribeIndexStatsResponse:
        """
        The DescribeIndexStats operation returns statistics about the index's contents.
        For example: the vector count per namespace and the number of dimensions.

        Examples:
            >>> index.describe_index_stats()
            >>> index.describe_index_stats(filter={'key': 'value'})

        Args:
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                If this parameter is present, the operation only returns statistics for vectors that satisfy the filter.
                See https://www.pinecone.io/docs/metadata-filtering/. [optional]

        Returns: DescribeIndexStatsResponse object which contains stats about the index.
        """
        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None
        args_dict = self._parse_non_empty_args([("filter", filter_struct)])
        timeout = kwargs.pop("timeout", None)

        request = DescribeIndexStatsRequest(**args_dict)
        response = self.runner.run(self.stub.DescribeIndexStats, request, timeout=timeout)
        json_response = json_format.MessageToDict(response)
        return parse_stats_response(json_response)

    @staticmethod
    def _parse_non_empty_args(args: List[Tuple[str, Any]]) -> Dict[str, Any]:
        return {arg_name: val for arg_name, val in args if val is not None}
```
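When `batch_size` is set, `upsert` above walks the vector list in contiguous slices of at most `batch_size` items and sends one request per slice. A minimal standalone sketch of that slicing logic; the `chunked` helper name is illustrative and not part of the Pinecone API:

```python
from typing import Any, Iterator, List


def chunked(vectors: List[Any], batch_size: int) -> Iterator[List[Any]]:
    # Mirrors the validation and slicing in GRPCIndex.upsert: each yielded
    # slice holds at most batch_size items; the final slice may be shorter.
    if not isinstance(batch_size, int) or batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    for i in range(0, len(vectors), batch_size):
        yield vectors[i : i + batch_size]


batches = list(chunked(["id1", "id2", "id3", "id4", "id5"], 2))
# batches == [['id1', 'id2'], ['id3', 'id4'], ['id5']]
```

`_iter_dataframe` uses the same pattern, with `df.iloc[i : i + batch_size]` in place of the list slice.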
The upsert operation writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.
Examples:
>>> index.upsert([('id1', [1.0, 2.0, 3.0], {'key': 'value'}), ('id2', [1.0, 2.0, 3.0]) ], namespace='ns1', async_req=True) >>> index.upsert([{'id': 'id1', 'values': [1.0, 2.0, 3.0], 'metadata': {'key': 'value'}}, {'id': 'id2', 'values': [1.0, 2.0, 3.0], 'sparse_values': {'indices': [1, 8], 'values': [0.2, 0.4]}, ]) >>> index.upsert([GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}), GRPCVector(id='id2', values=[1.0, 2.0, 3.0]), GRPCVector(id='id3', values=[1.0, 2.0, 3.0], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]))])
Arguments:
- vectors (Union[List[Vector], List[Tuple]]): A list of vectors to upsert. A vector can be represented as (1) a GRPCVector object, (2) a tuple, or (3) a dictionary:
  1. If a tuple is used, it must be of the form (id, values, metadata) or (id, values), where id is a string, values is a list of floats, and metadata is a dict. Examples: ('id1', [1.0, 2.0, 3.0], {'key': 'value'}), ('id2', [1.0, 2.0, 3.0]).
  2. If a GRPCVector object is used, it must be of the form GRPCVector(id, values, metadata), where metadata is an optional argument of type Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]. Examples: GRPCVector(id='id1', values=[1.0, 2.0, 3.0], metadata={'key': 'value'}), GRPCVector(id='id2', values=[1.0, 2.0, 3.0]), GRPCVector(id='id3', values=[1.0, 2.0, 3.0], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4])).
  3. If a dictionary is used, it must be of the form {'id': str, 'values': List[float], 'sparse_values': {'indices': List[int], 'values': List[float]}, 'metadata': dict}.

  Note: the dimension of each vector must match the dimension of the index.
- async_req (bool): If True, the upsert operation will be performed asynchronously. Cannot be used with batch_size. Defaults to False. See: https://docs.pinecone.io/docs/performance-tuning [optional]
- namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
- batch_size (int): The number of vectors to upsert in each batch. Cannot be used with async_req=True. If not specified, all vectors will be upserted in a single batch. [optional]
- show_progress (bool): Whether to show a progress bar using tqdm. Applied only if batch_size is provided. Default is True.
Returns: UpsertResponse, which contains the number of vectors upserted.
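The listing above splits the vector list into consecutive slices of `batch_size` and sums the per-batch `upserted_count` values. The chunking arithmetic can be sketched in isolation (the `chunks` helper below is illustrative, not part of the client):

```python
from typing import List, Sequence

def chunks(items: Sequence, batch_size: int) -> List[Sequence]:
    """Split a sequence into consecutive slices of at most batch_size items."""
    if not isinstance(batch_size, int) or batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]

vectors = [(f"id{i}", [0.1, 0.2, 0.3]) for i in range(10)]
print([len(batch) for batch in chunks(vectors, 4)])  # [4, 4, 2]
```

One request is issued per slice, so a smaller `batch_size` trades fewer vectors per request for more round trips.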
```python
    def upsert_from_dataframe(
        self,
        df,
        namespace: str = "",
        batch_size: int = 500,
        use_async_requests: bool = True,
        show_progress: bool = True,
    ) -> UpsertResponse:
        """Upserts a dataframe into the index.

        Args:
            df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
            namespace: The namespace to upsert into.
            batch_size: The number of rows to upsert in a single batch.
            use_async_requests: Whether to upsert multiple requests at the same time using the asynchronous request mechanism.
                Set to `False` to send requests one at a time.
            show_progress: Whether to show a progress bar.
        """
        try:
            import pandas as pd
        except ImportError:
            raise RuntimeError(
                "The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`"
            )

        if not isinstance(df, pd.DataFrame):
            raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

        pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
        results = []
        for chunk in self._iter_dataframe(df, batch_size=batch_size):
            res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
            pbar.update(len(chunk))
            results.append(res)

        if use_async_requests:
            cast_results = cast(List[PineconeGrpcFuture], results)
            results = [
                async_result.result()
                for async_result in tqdm(
                    iterable=cast_results,
                    disable=not show_progress,
                    desc="collecting async responses",
                )
            ]

        upserted_count = 0
        for res in results:
            if hasattr(res, "upserted_count") and isinstance(res.upserted_count, int):
                upserted_count += res.upserted_count

        return UpsertResponse(upserted_count=upserted_count)
```
Upserts a dataframe into the index.
Arguments:
- df: A pandas dataframe with the following columns: id, values, sparse_values, and metadata.
- namespace: The namespace to upsert into.
- batch_size: The number of rows to upsert in a single batch.
- use_async_requests: Whether to upsert multiple requests at the same time using the asynchronous request mechanism. Set to `False` to send requests one at a time.
- show_progress: Whether to show a progress bar.
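When `use_async_requests=True`, each `upsert` call returns a future-like object and the totals are only collected at the end. That collect-and-sum step can be sketched with the standard library (here a plain `ThreadPoolExecutor` and a stub response stand in for the client's `PineconeGrpcFuture` and `UpsertResponse`):

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

@dataclass
class FakeUpsertResponse:
    upserted_count: int

def fake_upsert(batch):
    # Stands in for index.upsert(vectors=batch, async_req=True).
    return FakeUpsertResponse(upserted_count=len(batch))

batches = [["a", "b"], ["c"], ["d", "e", "f"]]
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(fake_upsert, batch) for batch in batches]
    results = [f.result() for f in futures]  # blocks until each request completes

total = sum(r.upserted_count for r in results)
print(total)  # 6
```

Issuing batches concurrently overlaps network latency, which is why the asynchronous mode is the default here.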
```python
    def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        async_req: bool = False,
        **kwargs,
    ) -> Union[DeleteResponse, PineconeGrpcFuture]:
        """
        The Delete operation deletes vectors from the index, from a single namespace.
        No error is raised if the vector id does not exist.
        Note: for any delete call, if namespace is not specified, the default namespace is used.

        Delete can occur in the following mutually exclusive ways:
        1. Delete by ids from a single namespace
        2. Delete all vectors from a single namespace by setting delete_all to True
        3. Delete all vectors from a single namespace by specifying a metadata filter
           (note that for this option delete_all must be set to False)

        Examples:
            >>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
            >>> index.delete(delete_all=True, namespace='my_namespace')
            >>> index.delete(filter={'key': 'value'}, namespace='my_namespace', async_req=True)

        Args:
            ids (List[str]): Vector ids to delete. [optional]
            delete_all (bool): Indicates that all vectors in the index namespace should be deleted.
                               Default is False. [optional]
            namespace (str): The namespace to delete vectors from.
                             If not specified, the default namespace is used. [optional]
            filter (FilterTypedDict):
                     If specified, the metadata filter here will be used to select the vectors to delete.
                     This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True.
                     See https://www.pinecone.io/docs/metadata-filtering/. [optional]
            async_req (bool): If True, the delete operation will be performed asynchronously.
                              Defaults to False. [optional]

        Returns: DeleteResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
        """

        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None

        args_dict = self._parse_non_empty_args(
            [
                ("ids", ids),
                ("delete_all", delete_all),
                ("namespace", namespace),
                ("filter", filter_struct),
            ]
        )
        timeout = kwargs.pop("timeout", None)

        request = DeleteRequest(**args_dict, **kwargs)
        if async_req:
            future = self.runner.run(self.stub.Delete.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            return self.runner.run(self.stub.Delete, request, timeout=timeout)
```
The Delete operation deletes vectors from the index, from a single namespace. No error is raised if the vector id does not exist. Note: for any delete call, if namespace is not specified, the default namespace is used.
Delete can occur in the following mutually exclusive ways:
- Delete by ids from a single namespace
- Delete all vectors from a single namespace by setting delete_all to True
- Delete all vectors from a single namespace by specifying a metadata filter (note that for this option delete_all must be set to False)
Examples:
```python
>>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.delete(delete_all=True, namespace='my_namespace')
>>> index.delete(filter={'key': 'value'}, namespace='my_namespace', async_req=True)
```
Arguments:
- ids (List[str]): Vector ids to delete. [optional]
- delete_all (bool): Indicates that all vectors in the index namespace should be deleted. Default is False. [optional]
- namespace (str): The namespace to delete vectors from. If not specified, the default namespace is used. [optional]
- filter (FilterTypedDict): If specified, the metadata filter here will be used to select the vectors to delete. This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True. See https://www.pinecone.io/docs/metadata-filtering/. [optional]
- async_req (bool): If True, the delete operation will be performed asynchronously. Defaults to False. [optional]
Returns: DeleteResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
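The three deletion modes are mutually exclusive. A hypothetical client-side check (the server enforces its own validation; this helper is not part of the library) makes the rule concrete:

```python
def validate_delete_args(ids=None, delete_all=None, filter=None):
    """Require exactly one of: ids, delete_all=True, or a metadata filter."""
    modes = [ids is not None, bool(delete_all), filter is not None]
    if sum(modes) != 1:
        raise ValueError("Specify exactly one of ids, delete_all=True, or filter")

validate_delete_args(ids=["id1", "id2"])          # ok: delete by ids
validate_delete_args(delete_all=True)             # ok: clear the namespace
validate_delete_args(filter={"genre": "comedy"})  # ok: delete by metadata filter
```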
```python
    def fetch(
        self,
        ids: Optional[List[str]],
        namespace: Optional[str] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> Union[FetchResponse, PineconeGrpcFuture]:
        """
        The fetch operation looks up and returns vectors, by ID, from a single namespace.
        The returned vectors include the vector data and/or metadata.

        Examples:
            >>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
            >>> index.fetch(ids=['id1', 'id2'])

        Args:
            ids (List[str]): The vector IDs to fetch.
            namespace (str): The namespace to fetch vectors from.
                             If not specified, the default namespace is used. [optional]

        Returns: FetchResponse object which contains the list of Vector objects, and namespace name.
        """
        timeout = kwargs.pop("timeout", None)

        args_dict = self._parse_non_empty_args([("namespace", namespace)])

        request = FetchRequest(ids=ids, **args_dict, **kwargs)

        if async_req:
            future = self.runner.run(self.stub.Fetch.future, request, timeout=timeout)
            return PineconeGrpcFuture(future, result_transformer=parse_fetch_response)
        else:
            response = self.runner.run(self.stub.Fetch, request, timeout=timeout)
            return parse_fetch_response(response)
```
The fetch operation looks up and returns vectors, by ID, from a single namespace. The returned vectors include the vector data and/or metadata.
Examples:
```python
>>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.fetch(ids=['id1', 'id2'])
```
Arguments:
- ids (List[str]): The vector IDs to fetch.
- namespace (str): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: FetchResponse object, which contains the list of Vector objects and the namespace name.
```python
    def query(
        self,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        top_k: Optional[int] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[
            Union[SparseValues, GRPCSparseValues, SparseVectorTypedDict]
        ] = None,
        async_req: Optional[bool] = False,
        **kwargs,
    ) -> Union[QueryResponse, PineconeGrpcFuture]:
        """
        The Query operation searches a namespace, using a query vector.
        It retrieves the ids of the most similar items in a namespace, along with their similarity scores.

        Examples:
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
            >>> index.query(id='id1', top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
            >>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
            >>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
            >>>             top_k=10, namespace='my_namespace')
            >>> index.query(vector=[1, 2, 3], sparse_vector=GRPCSparseValues([1, 2], [0.2, 0.4]),
            >>>             top_k=10, namespace='my_namespace')

        Args:
            vector (List[float]): The query vector. This should be the same length as the dimension of the index
                                  being queried. Each `query()` request can contain only one of the parameters
                                  `id` or `vector`. [optional]
            id (str): The unique ID of the vector to be used as a query vector.
                      Each `query()` request can contain only one of the parameters
                      `vector` or `id`. [optional]
            top_k (int): The number of results to return for each query. Must be an integer greater than 1.
            namespace (str): The namespace to fetch vectors from.
                             If not specified, the default namespace is used. [optional]
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                    The filter to apply. You can use vector metadata to limit your search.
                    See https://www.pinecone.io/docs/metadata-filtering/. [optional]
            include_values (bool): Indicates whether vector values are included in the response.
                                   If omitted, the server will use the default value of False. [optional]
            include_metadata (bool): Indicates whether metadata is included in the response as well as the ids.
                                     If omitted, the server will use the default value of False. [optional]
            sparse_vector (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): Sparse values of the query vector.
                    Expected to be either a SparseValues object or a dict of the form
                    {'indices': List[int], 'values': List[float]}, where the lists each have the same length.

        Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects,
                 and namespace name.
        """

        if vector is not None and id is not None:
            raise ValueError("Cannot specify both `id` and `vector`")

        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None

        sparse_vector = SparseValuesFactory.build(sparse_vector)
        args_dict = self._parse_non_empty_args(
            [
                ("vector", vector),
                ("id", id),
                ("namespace", namespace),
                ("top_k", top_k),
                ("filter", filter_struct),
                ("include_values", include_values),
                ("include_metadata", include_metadata),
                ("sparse_vector", sparse_vector),
            ]
        )

        request = QueryRequest(**args_dict)

        timeout = kwargs.pop("timeout", None)

        if async_req:
            future = self.runner.run(self.stub.Query.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            response = self.runner.run(self.stub.Query, request, timeout=timeout)
            json_response = json_format.MessageToDict(response)
            return parse_query_response(json_response, _check_type=False)
```
The Query operation searches a namespace, using a query vector. It retrieves the ids of the most similar items in a namespace, along with their similarity scores.
Examples:
```python
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
>>> index.query(id='id1', top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
>>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
>>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
>>>             top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], sparse_vector=GRPCSparseValues([1, 2], [0.2, 0.4]),
>>>             top_k=10, namespace='my_namespace')
```
Arguments:
- vector (List[float]): The query vector. This should be the same length as the dimension of the index being queried. Each `query()` request can contain only one of the parameters `id` or `vector`. [optional]
- id (str): The unique ID of the vector to be used as a query vector. Each `query()` request can contain only one of the parameters `vector` or `id`. [optional]
- top_k (int): The number of results to return for each query. Must be an integer greater than 1.
- namespace (str): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): The filter to apply. You can use vector metadata to limit your search. See https://www.pinecone.io/docs/metadata-filtering/. [optional]
- include_values (bool): Indicates whether vector values are included in the response. If omitted, the server will use the default value of False. [optional]
- include_metadata (bool): Indicates whether metadata is included in the response as well as the ids. If omitted, the server will use the default value of False. [optional]
- sparse_vector (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): Sparse values of the query vector. Expected to be either a SparseValues object or a dict of the form {'indices': List[int], 'values': List[float]}, where the lists each have the same length.
Returns: QueryResponse object, which contains the list of the closest vectors as ScoredVector objects, and the namespace name.
```python
    def query_namespaces(
        self,
        vector: List[float],
        namespaces: List[str],
        metric: Literal["cosine", "euclidean", "dotproduct"],
        top_k: Optional[int] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryNamespacesResults:
        if namespaces is None or len(namespaces) == 0:
            raise ValueError("At least one namespace must be specified")
        if len(vector) == 0:
            raise ValueError("Query vector must not be empty")

        overall_topk = top_k if top_k is not None else 10
        aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)

        target_namespaces = set(namespaces)  # dedup namespaces
        futures = [
            self.threadpool_executor.submit(
                self.query,
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_req=False,
                **kwargs,
            )
            for ns in target_namespaces
        ]

        only_futures = cast(Iterable[Future], futures)
        for response in as_completed(only_futures):
            aggregator.add_results(response.result())

        final_results = aggregator.get_results()
        return final_results
```
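`query_namespaces` deduplicates the namespaces, issues one `query` per namespace on the client's thread pool, and merges the per-namespace matches with `QueryResultsAggregator` into a single global top-k. The merge step amounts to the following (a simplified sketch using dict matches; the real aggregator also validates the shape of each result set):

```python
def merge_top_k(per_namespace_matches, top_k, metric="cosine"):
    """Combine per-namespace matches into one global top-k.
    For cosine/dotproduct, higher scores are better; for euclidean, lower."""
    merged = [m for matches in per_namespace_matches for m in matches]
    best_first = metric != "euclidean"
    return sorted(merged, key=lambda m: m["score"], reverse=best_first)[:top_k]

ns1 = [{"id": "a", "score": 0.9}, {"id": "b", "score": 0.4}]
ns2 = [{"id": "c", "score": 0.7}]
print([m["id"] for m in merge_top_k([ns1, ns2], top_k=2)])  # ['a', 'c']
```

This is why the metric must be supplied explicitly: scores from different namespaces are only comparable once the sort direction for the metric is known.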
```python
    def update(
        self,
        id: str,
        async_req: bool = False,
        values: Optional[List[float]] = None,
        set_metadata: Optional[VectorMetadataTypedDict] = None,
        namespace: Optional[str] = None,
        sparse_values: Optional[Union[GRPCSparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Union[UpdateResponse, PineconeGrpcFuture]:
        """
        The Update operation updates a vector in a namespace.
        If a value is included, it will overwrite the previous value.
        If set_metadata is included,
        the values of the fields specified in it will be added or will overwrite the previous value.

        Examples:
            >>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
            >>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace', async_req=True)
            >>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
            >>>              namespace='my_namespace')
            >>> index.update(id='id1', values=[1, 2, 3], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
            >>>              namespace='my_namespace')

        Args:
            id (str): Vector's unique id.
            async_req (bool): If True, the update operation will be performed asynchronously.
                              Defaults to False. [optional]
            values (List[float]): Vector values to set. [optional]
            set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]):
                          Metadata to set for the vector. [optional]
            namespace (str): Namespace name where the vector will be updated. [optional]
            sparse_values (Dict[str, Union[List[float], List[int]]]): Sparse values to update for the vector.
                           Expected to be either a GRPCSparseValues object or a dict of the form
                           {'indices': List[int], 'values': List[float]}, where the lists each have the same length.

        Returns: UpdateResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
        """
        if set_metadata is not None:
            set_metadata_struct = dict_to_proto_struct(set_metadata)
        else:
            set_metadata_struct = None

        timeout = kwargs.pop("timeout", None)
        sparse_values = SparseValuesFactory.build(sparse_values)
        args_dict = self._parse_non_empty_args(
            [
                ("values", values),
                ("set_metadata", set_metadata_struct),
                ("namespace", namespace),
                ("sparse_values", sparse_values),
            ]
        )

        request = UpdateRequest(id=id, **args_dict)
        if async_req:
            future = self.runner.run(self.stub.Update.future, request, timeout=timeout)
            return PineconeGrpcFuture(future)
        else:
            return self.runner.run(self.stub.Update, request, timeout=timeout)
```
The Update operation updates a vector in a namespace. If a value is included, it will overwrite the previous value. If set_metadata is included, the values of the fields specified in it will be added or will overwrite the previous value.
Examples:
```python
>>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
>>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace', async_req=True)
>>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
>>>              namespace='my_namespace')
>>> index.update(id='id1', values=[1, 2, 3], sparse_values=GRPCSparseValues(indices=[1, 2], values=[0.2, 0.4]),
>>>              namespace='my_namespace')
```
Arguments:
- id (str): Vector's unique id.
- async_req (bool): If True, the update operation will be performed asynchronously. Defaults to False. [optional]
- values (List[float]): Vector values to set. [optional]
- set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]): Metadata to set for the vector. [optional]
- namespace (str): Namespace name where the vector will be updated. [optional]
- sparse_values (Dict[str, Union[List[float], List[int]]]): Sparse values to update for the vector. Expected to be either a GRPCSparseValues object or a dict of the form {'indices': List[int], 'values': List[float]}, where the lists each have the same length.
Returns: UpdateResponse (contains no data) or a PineconeGrpcFuture object if async_req is True.
```python
    def list_paginated(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        namespace: Optional[str] = None,
        **kwargs,
    ) -> SimpleListResponse:
        """
        The list_paginated operation finds vectors based on an id prefix within a single namespace.
        It returns matching ids in a paginated form, with a pagination token to fetch the next page of results.
        This id list can then be passed to fetch or delete operations, depending on your use case.

        Consider using the `list` method to avoid having to handle pagination tokens manually.

        Examples:
            >>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
            >>> [v.id for v in results.vectors]
            ['99', '990', '991', '992', '993']
            >>> results.pagination.next
            eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
            >>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)

        Args:
            prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
                                    be used, with the effect of listing all ids in a namespace. [optional]
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                                              in the response if additional results are available. [optional]
            namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

        Returns: SimpleListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
        """
        args_dict = self._parse_non_empty_args(
            [
                ("prefix", prefix),
                ("limit", limit),
                ("namespace", namespace),
                ("pagination_token", pagination_token),
            ]
        )
        # Pop timeout before building the request so it is not passed to ListRequest.
        timeout = kwargs.pop("timeout", None)
        request = ListRequest(**args_dict, **kwargs)
        response = self.runner.run(self.stub.List, request, timeout=timeout)

        if response.pagination and response.pagination.next != "":
            pagination = Pagination(next=response.pagination.next)
        else:
            pagination = None

        return SimpleListResponse(
            namespace=response.namespace, vectors=response.vectors, pagination=pagination
        )
```
The list_paginated operation finds vectors based on an id prefix within a single namespace. It returns matching ids in a paginated form, with a pagination token to fetch the next page of results. This id list can then be passed to fetch or delete operations, depending on your use case.
Consider using the `list` method to avoid having to handle pagination tokens manually.
Examples:
```python
>>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
>>> [v.id for v in results.vectors]
['99', '990', '991', '992', '993']
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)
```
Arguments:
- prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used, with the effect of listing all ids in a namespace. [optional]
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: SimpleListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
```python
    def list(self, **kwargs):
        """
        The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields
        a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your
        behalf.

        Examples:
            >>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
            >>>     print(ids)
            ['99', '990', '991', '992', '993']
            ['994', '995', '996', '997', '998']
            ['999']

        Args:
            prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will
                                    be used, with the effect of listing all ids in a namespace. [optional]
            limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
            pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
                                              in the response if additional results are available. [optional]
            namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
        """
        done = False
        while not done:
            results = self.list_paginated(**kwargs)

            if len(results.vectors) > 0:
                yield [v.id for v in results.vectors]

            if results.pagination and results.pagination.next:
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True
```
The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your behalf.
Examples:
```python
>>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
>>>     print(ids)
['99', '990', '991', '992', '993']
['994', '995', '996', '997', '998']
['999']
```
Arguments:
- prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used, with the effect of listing all ids in a namespace. [optional]
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
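The generator simply re-issues `list_paginated` with each returned pagination token until none remains. The loop can be exercised against a stubbed paginated API (the stub below is illustrative; the real `list_paginated` returns a SimpleListResponse):

```python
def fake_list_paginated(pagination_token=None, limit=3):
    """Stub for index.list_paginated: pages over the ids '0'..'7'."""
    all_ids = [str(i) for i in range(8)]
    start = int(pagination_token) if pagination_token else 0
    page = all_ids[start : start + limit]
    next_token = str(start + limit) if start + limit < len(all_ids) else None
    return page, next_token

def list_all(**kwargs):
    # Same shape as the library's loop: yield each non-empty page,
    # then feed the token back in until the server stops returning one.
    while True:
        ids, token = fake_list_paginated(**kwargs)
        if ids:
            yield ids
        if token is None:
            break
        kwargs["pagination_token"] = token

print(list(list_all(limit=3)))  # [['0', '1', '2'], ['3', '4', '5'], ['6', '7']]
```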
```python
    def describe_index_stats(
        self, filter: Optional[FilterTypedDict] = None, **kwargs
    ) -> DescribeIndexStatsResponse:
        """
        The DescribeIndexStats operation returns statistics about the index's contents.
        For example: the vector count per namespace and the number of dimensions.

        Examples:
            >>> index.describe_index_stats()
            >>> index.describe_index_stats(filter={'key': 'value'})

        Args:
            filter (Dict[str, Union[str, float, int, bool, List, dict]]):
                    If this parameter is present, the operation only returns statistics for vectors that satisfy the filter.
                    See https://www.pinecone.io/docs/metadata-filtering/. [optional]

        Returns: DescribeIndexStatsResponse object which contains stats about the index.
        """
        if filter is not None:
            filter_struct = dict_to_proto_struct(filter)
        else:
            filter_struct = None
        args_dict = self._parse_non_empty_args([("filter", filter_struct)])
        timeout = kwargs.pop("timeout", None)

        request = DescribeIndexStatsRequest(**args_dict)
        response = self.runner.run(self.stub.DescribeIndexStats, request, timeout=timeout)
        json_response = json_format.MessageToDict(response)
        return parse_stats_response(json_response)
```
The DescribeIndexStats operation returns statistics about the index's contents. For example: the vector count per namespace and the number of dimensions.
Examples:
```python
>>> index.describe_index_stats()
>>> index.describe_index_stats(filter={'key': 'value'})
```
Arguments:
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See https://www.pinecone.io/docs/metadata-filtering/. [optional]
Returns: DescribeIndexStatsResponse object which contains stats about the index.
Inherited Members
- pinecone.grpc.base.GRPCIndexBase
- GRPCIndexBase
- config
- grpc_client_config
- pool_threads
- runner
- channel_factory
- stub
- threadpool_executor
- channel
- grpc_server_on
- close
````python
class PineconeGRPC(Pinecone):
    """
    An alternative version of the Pinecone client that uses gRPC instead of HTTP for
    data operations.

    ### Installing the gRPC client

    You must install extra dependencies in order to use the gRPC client.

    #### Installing with pip

    ```bash
    # Install the latest version
    pip3 install pinecone[grpc]

    # Install a specific version
    pip3 install "pinecone[grpc]"==3.0.0
    ```

    #### Installing with poetry

    ```bash
    # Install the latest version
    poetry add pinecone --extras grpc

    # Install a specific version
    poetry add pinecone==3.0.0 --extras grpc
    ```

    ### Using the gRPC client

    ```python
    import os
    from pinecone.grpc import PineconeGRPC

    client = PineconeGRPC(api_key=os.environ.get("PINECONE_API_KEY"))

    # From this point on, usage is identical to the HTTP client.
    index = client.Index("my-index", host=os.environ.get("PINECONE_INDEX_HOST"))
    index.query(...)
    ```

    """

    def Index(self, name: str = "", host: str = "", **kwargs):
        """
        Target an index for data operations.

        ### Target an index by host url

        In production situations, you want to upsert or query your data as quickly
        as possible. If you know in advance the host url of your index, you can
        eliminate a round trip to the Pinecone control plane by specifying the
        host of the index.

        ```python
        import os
        from pinecone.grpc import PineconeGRPC

        api_key = os.environ.get("PINECONE_API_KEY")
        index_host = os.environ.get("PINECONE_INDEX_HOST")

        pc = PineconeGRPC(api_key=api_key)
        index = pc.Index(host=index_host)

        # Now you're ready to perform data operations
        index.query(vector=[...], top_k=10)
        ```

        To find your host url, you can use the Pinecone control plane to describe
        the index. The host url is returned in the response. Or, alternatively, the
        host is displayed in the Pinecone web console.

        ```python
        import os
        from pinecone import Pinecone

        pc = Pinecone(
            api_key=os.environ.get("PINECONE_API_KEY")
        )

        host = pc.describe_index('index-name').host
        ```

        ### Target an index by name (not recommended for production)

        For more casual usage, such as when you are playing and exploring with Pinecone
        in a notebook setting, you can also target an index by name. If you use this
        approach, the client may need to perform an extra call to the Pinecone control
        plane to get the host url of the index on your behalf.

        The client will cache the index host for future use whenever it is seen, so you
        will only incur the overhead of one call. But this approach is not
        recommended for production usage.

        ```python
        import os
        from pinecone import ServerlessSpec
        from pinecone.grpc import PineconeGRPC

        api_key = os.environ.get("PINECONE_API_KEY")

        pc = PineconeGRPC(api_key=api_key)
        pc.create_index(
            name='my-index',
            dimension=1536,
            metric='cosine',
            spec=ServerlessSpec(cloud='aws', region='us-west-2')
        )
        index = pc.Index('my-index')

        # Now you're ready to perform data operations
        index.query(vector=[...], top_k=10)
        ```
        """
        if name == "" and host == "":
            raise ValueError("Either name or host must be specified")

        # Use host if it is provided, otherwise get host from describe_index
        index_host = host or self.index_host_store.get_host(self.index_api, self.config, name)

        pt = kwargs.pop("pool_threads", None) or self.pool_threads

        config = ConfigBuilder.build(
            api_key=self.config.api_key,
            host=index_host,
            source_tag=self.config.source_tag,
            proxy_url=self.config.proxy_url,
            ssl_ca_certs=self.config.ssl_ca_certs,
        )
        return GRPCIndex(index_name=name, config=config, pool_threads=pt, **kwargs)
````
```python
class GRPCClientConfig(NamedTuple):
    """
    GRPC client configuration options.

    :param secure: Whether to use an encrypted protocol (SSL). Defaults to True.
    :type secure: bool, optional
    :param timeout: Defaults to 20 seconds. Fail if the gateway doesn't receive a response within the timeout.
    :type timeout: int, optional
    :param conn_timeout: Defaults to 1. Timeout to retry connection if gRPC is unavailable. 0 is no retry.
    :type conn_timeout: int, optional
    :param reuse_channel: Whether to reuse the same gRPC channel for multiple requests
    :type reuse_channel: bool, optional
    :param retry_config: RetryConfig indicating how requests should be retried
    :type retry_config: RetryConfig, optional
    :param grpc_channel_options: A dict of gRPC channel arguments
    :type grpc_channel_options: Dict[str, str]
    :param additional_metadata: Additional metadata to be sent to the server with each request. Note that this
        metadata refers to [gRPC metadata](https://grpc.io/docs/guides/metadata/), which is a concept similar
        to HTTP headers. This is unrelated to the metadata that can be stored with a vector in the index.
    :type additional_metadata: Dict[str, str]
    """

    secure: bool = True
    timeout: int = 20
    conn_timeout: int = 1
    reuse_channel: bool = True
    retry_config: Optional[RetryConfig] = None
    grpc_channel_options: Optional[Dict[str, str]] = None
    additional_metadata: Optional[Dict[str, str]] = None

    @classmethod
    def _from_dict(cls, kwargs: dict):
        cls_kwargs = {kk: vv for kk, vv in kwargs.items() if kk in cls._fields}
        return cls(**cls_kwargs)
```
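The `_from_dict` helper silently drops any keys that are not fields of the tuple, so callers can pass a mixed kwargs dict without pre-filtering. A self-contained sketch of the same pattern (a minimal stand-in, not the real class, with only a few of its fields):

```python
from typing import NamedTuple

class ClientConfigSketch(NamedTuple):
    # Illustrative stand-in for GRPCClientConfig with the same defaults
    # for the fields shown here.
    secure: bool = True
    timeout: int = 20
    conn_timeout: int = 1

    @classmethod
    def _from_dict(cls, kwargs: dict):
        # Unknown keys are silently dropped, matching the real _from_dict
        return cls(**{k: v for k, v in kwargs.items() if k in cls._fields})

cfg = ClientConfigSketch._from_dict({"timeout": 5, "unknown_key": "ignored"})
print(cfg.timeout, cfg.secure)  # 5 True
```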
```python
@dataclass
class Vector(DictLike):
    id: str
    values: List[float] = field(default_factory=list)
    metadata: Optional[VectorMetadataTypedDict] = None
    sparse_values: Optional[SparseValues] = None

    def __post_init__(self):
        if self.sparse_values is None and len(self.values) == 0:
            raise ValueError("The values and sparse_values fields cannot both be empty")

    def to_dict(self) -> VectorTypedDict:
        vector_dict: VectorTypedDict = {"id": self.id, "values": self.values}
        if self.metadata is not None:
            vector_dict["metadata"] = self.metadata
        if self.sparse_values is not None:
            vector_dict["sparse_values"] = self.sparse_values.to_dict()
        return vector_dict

    @staticmethod
    def from_dict(vector_dict: VectorTypedDict) -> "Vector":
        passed_sparse_values = vector_dict.get("sparse_values")
        if passed_sparse_values is not None:
            parsed_sparse_values = SparseValues.from_dict(passed_sparse_values)
        else:
            parsed_sparse_values = None

        return Vector(
            id=vector_dict["id"],
            values=vector_dict["values"],
            metadata=vector_dict.get("metadata"),
            sparse_values=parsed_sparse_values,
        )
```
```python
@dataclass
class SparseValues(DictLike):
    indices: List[int]
    values: List[float]

    def to_dict(self) -> SparseVectorTypedDict:
        return {"indices": self.indices, "values": self.values}

    @staticmethod
    def from_dict(sparse_values_dict: SparseVectorTypedDict) -> "SparseValues":
        return SparseValues(
            indices=sparse_values_dict["indices"], values=sparse_values_dict["values"]
        )
```
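The `to_dict`/`from_dict` pair is a lossless round trip between the dataclass and its plain-dict wire shape. A self-contained sketch (a minimal stand-in for `SparseValues`, since `DictLike` and the TypedDict annotations are omitted here):

```python
from dataclasses import dataclass
from typing import List

@dataclass
class SparseValuesSketch:
    # Minimal stand-in mirroring SparseValues.to_dict / from_dict
    indices: List[int]
    values: List[float]

    def to_dict(self) -> dict:
        return {"indices": self.indices, "values": self.values}

    @staticmethod
    def from_dict(d: dict) -> "SparseValuesSketch":
        return SparseValuesSketch(indices=d["indices"], values=d["values"])

sv = SparseValuesSketch(indices=[1, 5, 9], values=[0.5, 0.25, 0.125])
assert SparseValuesSketch.from_dict(sv.to_dict()) == sv  # lossless round trip
```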
```python
class PineconeGrpcFuture(ConcurrentFuture):
    def __init__(
        self, grpc_future: GrpcFuture, timeout: Optional[int] = None, result_transformer=None
    ):
        super().__init__()
        self._grpc_future = grpc_future
        self._result_transformer = result_transformer
        if timeout is not None:
            self._default_timeout = timeout  # seconds
        else:
            self._default_timeout = 5  # seconds

        # Sync initial state, in case the gRPC future is already done
        self._sync_state(self._grpc_future)

        # Add callback to subscribe to updates from the gRPC future
        self._grpc_future.add_done_callback(self._sync_state)

    @property
    def grpc_future(self):
        return self._grpc_future

    def _sync_state(self, grpc_future):
        if self.done():
            return

        if grpc_future.cancelled():
            self.cancel()
        elif grpc_future.exception(timeout=self._default_timeout):
            self.set_exception(grpc_future.exception())
        elif grpc_future.done():
            try:
                result = grpc_future.result(timeout=self._default_timeout)
                self.set_result(result)
            except Exception as e:
                self.set_exception(e)
        elif grpc_future.running():
            self.set_running_or_notify_cancel()

    def set_result(self, result):
        if self._result_transformer:
            result = self._result_transformer(result)
        return super().set_result(result)

    def cancel(self):
        self._grpc_future.cancel()
        return super().cancel()

    def exception(self, timeout=None):
        exception = super().exception(timeout=self._timeout(timeout))
        if isinstance(exception, RpcError):
            return self._wrap_rpc_exception(exception)
        return exception

    def traceback(self, timeout=None):
        # This is not part of the ConcurrentFuture interface, but keeping it for
        # backward compatibility
        return self._grpc_future.traceback(timeout=self._timeout(timeout))

    def result(self, timeout=None):
        try:
            return super().result(timeout=self._timeout(timeout))
        except RpcError as e:
            raise self._wrap_rpc_exception(e) from e

    def _timeout(self, timeout: Optional[int] = None) -> int:
        if timeout is not None:
            return timeout
        else:
            return self._default_timeout

    def _wrap_rpc_exception(self, e):
        if e._state and e._state.debug_error_string:
            return PineconeException(e._state.debug_error_string)
        else:
            return PineconeException("Unknown GRPC error")

    def __del__(self):
        self._grpc_future.cancel()
        self = None  # release the reference to the grpc future
```
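The core of `PineconeGrpcFuture` is a state-bridging pattern: a `concurrent.futures.Future` subscribes to an inner future via a done callback and mirrors its terminal state. A stripped-down sketch of that pattern using only the standard library (no gRPC; the timeouts and RPC error wrapping of the real class are omitted):

```python
from concurrent.futures import Future

def bridge(inner: Future) -> Future:
    # Wrap `inner` in a new Future that mirrors its terminal state,
    # echoing the _sync_state logic above.
    outer: Future = Future()

    def sync_state(f: Future) -> None:
        if outer.done() or not f.done():
            return
        if f.cancelled():
            outer.cancel()
        elif f.exception() is not None:
            outer.set_exception(f.exception())
        else:
            outer.set_result(f.result())

    sync_state(inner)                    # sync initial state, in case inner is already done
    inner.add_done_callback(sync_state)  # subscribe to future updates
    return outer

inner: Future = Future()
outer = bridge(inner)
inner.set_result(42)
print(outer.result(timeout=1))  # 42
```

Because the callback fires as soon as the inner future completes, the outer future can be handed to code that only understands `concurrent.futures` while the inner future runs to completion elsewhere.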
Represents the result of an asynchronous computation.