pinecone.data.index_asyncio
````python
from pinecone.utils.tqdm import tqdm


import logging
import asyncio
import json

from .index_asyncio_interface import IndexAsyncioInterface
from .query_results_aggregator import QueryResultsAggregator
from typing import Union, List, Optional, Dict, Any, Literal

from pinecone.config import ConfigBuilder

from pinecone.openapi_support import AsyncioApiClient
from pinecone.core.openapi.db_data.api.vector_operations_api import AsyncioVectorOperationsApi
from pinecone.core.openapi.db_data import API_VERSION
from pinecone.core.openapi.db_data.models import (
    QueryResponse,
    IndexDescription as DescribeIndexStatsResponse,
    UpsertRequest,
    UpsertResponse,
    DeleteRequest,
    ListResponse,
    SearchRecordsResponse,
)

from ..utils import (
    setup_async_openapi_client,
    parse_non_empty_args,
    validate_and_convert_errors,
    filter_dict,
)
from .types import (
    SparseVectorTypedDict,
    VectorTypedDict,
    VectorMetadataTypedDict,
    VectorTuple,
    VectorTupleWithMetadata,
    FilterTypedDict,
    SearchQueryTypedDict,
    SearchRerankTypedDict,
)
from .dataclasses import Vector, SparseValues, FetchResponse, SearchQuery, SearchRerank

from pinecone.openapi_support import OPENAPI_ENDPOINT_PARAMS
from .index import IndexRequestFactory

from .vector_factory import VectorFactory
from .query_results_aggregator import QueryNamespacesResults
from .features.bulk_import import ImportFeatureMixinAsyncio


logger = logging.getLogger(__name__)
""" @private """

__all__ = ["_IndexAsyncio"]

_OPENAPI_ENDPOINT_PARAMS = (
    "_return_http_data_only",
    "_preload_content",
    "_request_timeout",
    "_check_input_type",
    "_check_return_type",
)
""" @private """


def parse_query_response(response: QueryResponse):
    if hasattr(response, "_data_store"):
        # I'm not sure, but I think this is no longer needed. At some point
        # in the past the query response returned "results" instead of matches
        # and then for some time it returned both keys even though "results"
        # was always empty. I'm leaving this here just in case.
        response._data_store.pop("results", None)
    return response


class _IndexAsyncio(IndexAsyncioInterface, ImportFeatureMixinAsyncio):
    """
    The `IndexAsyncio` class provides an asynchronous interface to interact with Pinecone indexes.

    Like the `Index` class, it provides methods to upsert, delete, fetch, and query vectors in a Pinecone index.

    The `IndexAsyncio` class is instantiated through a helper method of the `Pinecone` class. It is not meant to be instantiated directly.
    This is to ensure that configuration is handled consistently across all Pinecone objects.

    ## Managing the async context

    The `IndexAsyncio` class relies on an underlying `aiohttp` `ClientSession` to make asynchronous HTTP requests. To ensure that the session is properly closed, you
    should use the `async with` syntax when creating an `IndexAsyncio` object to use it as an async context manager. This will ensure that the session is properly
    closed when the context is exited.

    ```python
    import asyncio
    from pinecone import Pinecone

    async def main():
        pc = Pinecone(api_key='YOUR_API_KEY')
        async with pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io') as idx:
            # Do async things
            await idx.upsert(
                vectors=[
                    ...
                ]
            )

    asyncio.run(main())
    ```

    As an alternative, if you prefer to avoid code with a nested appearance and are willing to manage cleanup yourself, you can await the `close()` method to close the session when you are done.

    ```python
    import asyncio
    from pinecone import Pinecone

    async def main():
        pc = Pinecone(api_key='YOUR_API_KEY')
        idx = pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io')

        # Do async things
        await idx.describe_index_stats()

        # After you're done, you're responsible for calling this yourself
        await idx.close()

    asyncio.run(main())
    ```

    Failing to do this may result in error messages appearing from the underlying aiohttp library.
    """

    def __init__(
        self,
        api_key: str,
        host: str,
        additional_headers: Optional[Dict[str, str]] = {},
        openapi_config=None,
        **kwargs,
    ):
        self.config = ConfigBuilder.build(
            api_key=api_key, host=host, additional_headers=additional_headers, **kwargs
        )
        """ @private """
        self._openapi_config = ConfigBuilder.build_openapi_config(self.config, openapi_config)
        """ @private """

        if kwargs.get("connection_pool_maxsize", None):
            self._openapi_config.connection_pool_maxsize = kwargs.get("connection_pool_maxsize")

        self._vector_api = setup_async_openapi_client(
            api_client_klass=AsyncioApiClient,
            api_klass=AsyncioVectorOperationsApi,
            config=self.config,
            openapi_config=self._openapi_config,
            api_version=API_VERSION,
        )
        """ @private """

        self._api_client = self._vector_api.api_client
        """ @private """

        # Pass the same api_client to the ImportFeatureMixinAsyncio
        # This is important for async context management to work correctly
        super().__init__(api_client=self._api_client)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self._api_client.close()

    async def close(self):
        """Cleanup resources used by the Pinecone Index client.

        This method should be called when the client is no longer needed so that
        it can clean up the aiohttp session and other resources.

        After close has been called, the client instance should not be used.

        ```python
        import asyncio
        from pinecone import Pinecone

        async def main():
            pc = Pinecone()
            idx = pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io')
            await idx.upsert_records(
                namespace='my-namespace',
                records=[
                    ...
                ]
            )

            # Close the client when done
            await idx.close()

        asyncio.run(main())
        ```

        If you are using the client as a context manager, the close method is called automatically
        when exiting.

        ```python
        import asyncio
        from pinecone import Pinecone

        async def main():
            pc = Pinecone()
            async with pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io') as idx:
                await idx.upsert_records(
                    namespace='my-namespace',
                    records=[
                        ...
                    ]
                )

            # No need to call close in this case because the "async with" syntax
            # automatically calls close when exiting the block.

        asyncio.run(main())
        ```

        """
        await self._api_client.close()

    @validate_and_convert_errors
    async def upsert(
        self,
        vectors: Union[
            List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]
        ],
        namespace: Optional[str] = None,
        batch_size: Optional[int] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> UpsertResponse:
        _check_type = kwargs.pop("_check_type", True)

        if batch_size is None:
            return await self._upsert_batch(vectors, namespace, _check_type, **kwargs)

        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        upsert_tasks = [
            self._upsert_batch(vectors[i : i + batch_size], namespace, _check_type, **kwargs)
            for i in range(0, len(vectors), batch_size)
        ]

        total_upserted = 0
        with tqdm(total=len(vectors), desc="Upserted vectors", disable=not show_progress) as pbar:
            for task in asyncio.as_completed(upsert_tasks):
                res = await task
                pbar.update(res.upserted_count)
                total_upserted += res.upserted_count

        return UpsertResponse(upserted_count=total_upserted)

    @validate_and_convert_errors
    async def _upsert_batch(
        self,
        vectors: Union[
            List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]
        ],
        namespace: Optional[str],
        _check_type: bool,
        **kwargs,
    ) -> UpsertResponse:
        args_dict = parse_non_empty_args([("namespace", namespace)])

        def vec_builder(v):
            return VectorFactory.build(v, check_type=_check_type)

        return await self._vector_api.upsert_vectors(
            UpsertRequest(
                vectors=list(map(vec_builder, vectors)),
                **args_dict,
                _check_type=_check_type,
                **{k: v for k, v in kwargs.items() if k not in _OPENAPI_ENDPOINT_PARAMS},
            ),
            **{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS},
        )

    @validate_and_convert_errors
    async def upsert_from_dataframe(
        self, df, namespace: Optional[str] = None, batch_size: int = 500, show_progress: bool = True
    ):
        raise NotImplementedError("upsert_from_dataframe is not implemented for asyncio")

    @validate_and_convert_errors
    async def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        _check_type = kwargs.pop("_check_type", False)
        args_dict = parse_non_empty_args(
            [("ids", ids), ("delete_all", delete_all), ("namespace", namespace), ("filter", filter)]
        )

        return await self._vector_api.delete_vectors(
            DeleteRequest(
                **args_dict,
                **{
                    k: v
                    for k, v in kwargs.items()
                    if k not in _OPENAPI_ENDPOINT_PARAMS and v is not None
                },
                _check_type=_check_type,
            ),
            **{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS},
        )

    @validate_and_convert_errors
    async def fetch(
        self, ids: List[str], namespace: Optional[str] = None, **kwargs
    ) -> FetchResponse:
        args_dict = parse_non_empty_args([("namespace", namespace)])
        return await self._vector_api.fetch_vectors(ids=ids, **args_dict, **kwargs)

    @validate_and_convert_errors
    async def query(
        self,
        *args,
        top_k: int,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryResponse:
        response = await self._query(
            *args,
            top_k=top_k,
            vector=vector,
            id=id,
            namespace=namespace,
            filter=filter,
            include_values=include_values,
            include_metadata=include_metadata,
            sparse_vector=sparse_vector,
            **kwargs,
        )
        return parse_query_response(response)

    async def _query(
        self,
        *args,
        top_k: int,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryResponse:
        if len(args) > 0:
            raise ValueError(
                "Please use keyword arguments instead of positional arguments. Example: index.query(vector=[0.1, 0.2, 0.3], top_k=10, namespace='my_namespace')"
            )

        request = IndexRequestFactory.query_request(
            top_k=top_k,
            vector=vector,
            id=id,
            namespace=namespace,
            filter=filter,
            include_values=include_values,
            include_metadata=include_metadata,
            sparse_vector=sparse_vector,
            **kwargs,
        )
        return await self._vector_api.query_vectors(
            request, **{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS}
        )

    @validate_and_convert_errors
    async def query_namespaces(
        self,
        namespaces: List[str],
        metric: Literal["cosine", "euclidean", "dotproduct"],
        top_k: Optional[int] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        vector: Optional[List[float]] = None,
        sparse_vector: Optional[
            Union[SparseValues, Dict[str, Union[List[float], List[int]]]]
        ] = None,
        **kwargs,
    ) -> QueryNamespacesResults:
        if namespaces is None or len(namespaces) == 0:
            raise ValueError("At least one namespace must be specified")
        if sparse_vector is None and vector is not None and len(vector) == 0:
            # If querying with a vector, it must not be empty
            raise ValueError("Query vector must not be empty")

        overall_topk = top_k if top_k is not None else 10
        aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)

        target_namespaces = set(namespaces)  # dedup namespaces
        tasks = [
            self.query(
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_threadpool_executor=True,
                _preload_content=False,
                **kwargs,
            )
            for ns in target_namespaces
        ]

        for task in asyncio.as_completed(tasks):
            raw_result = await task
            response = json.loads(raw_result.data.decode("utf-8"))
            aggregator.add_results(response)

        final_results = aggregator.get_results()
        return final_results

    @validate_and_convert_errors
    async def update(
        self,
        id: str,
        values: Optional[List[float]] = None,
        set_metadata: Optional[VectorMetadataTypedDict] = None,
        namespace: Optional[str] = None,
        sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        return await self._vector_api.update_vector(
            IndexRequestFactory.update_request(
                id=id,
                values=values,
                set_metadata=set_metadata,
                namespace=namespace,
                sparse_values=sparse_values,
                **kwargs,
            ),
            **self._openapi_kwargs(kwargs),
        )

    @validate_and_convert_errors
    async def describe_index_stats(
        self, filter: Optional[FilterTypedDict] = None, **kwargs
    ) -> DescribeIndexStatsResponse:
        return await self._vector_api.describe_index_stats(
            IndexRequestFactory.describe_index_stats_request(filter, **kwargs),
            **self._openapi_kwargs(kwargs),
        )

    @validate_and_convert_errors
    async def list_paginated(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        namespace: Optional[str] = None,
        **kwargs,
    ) -> ListResponse:
        args_dict = IndexRequestFactory.list_paginated_args(
            prefix=prefix,
            limit=limit,
            pagination_token=pagination_token,
            namespace=namespace,
            **kwargs,
        )
        return await self._vector_api.list_vectors(**args_dict, **kwargs)

    @validate_and_convert_errors
    async def list(self, **kwargs):
        done = False
        while not done:
            results = await self.list_paginated(**kwargs)
            if len(results.vectors) > 0:
                yield [v.id for v in results.vectors]

            if results.pagination:
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True

    async def upsert_records(self, namespace: str, records: List[Dict]):
        args = IndexRequestFactory.upsert_records_args(namespace=namespace, records=records)
        await self._vector_api.upsert_records_namespace(**args)

    async def search(
        self,
        namespace: str,
        query: Union[SearchQueryTypedDict, SearchQuery],
        rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None,
        fields: Optional[List[str]] = ["*"],  # Default to returning all fields
    ) -> SearchRecordsResponse:
        if namespace is None:
            raise Exception("Namespace is required when searching records")

        request = IndexRequestFactory.search_request(query=query, rerank=rerank, fields=fields)

        return await self._vector_api.search_records_namespace(namespace, request)

    async def search_records(
        self,
        namespace: str,
        query: Union[SearchQueryTypedDict, SearchQuery],
        rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None,
        fields: Optional[List[str]] = ["*"],  # Default to returning all fields
    ) -> SearchRecordsResponse:
        return await self.search(namespace, query=query, rerank=rerank, fields=fields)

    def _openapi_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        return filter_dict(kwargs, OPENAPI_ENDPOINT_PARAMS)
````
The `IndexAsyncio` class provides an asynchronous interface to interact with Pinecone indexes.

Like the `Index` class, it provides methods to upsert, delete, fetch, and query vectors in a Pinecone index.

The `IndexAsyncio` class is instantiated through a helper method of the `Pinecone` class. It is not meant to be instantiated directly. This is to ensure that configuration is handled consistently across all Pinecone objects.

## Managing the async context

The `IndexAsyncio` class relies on an underlying `aiohttp` `ClientSession` to make asynchronous HTTP requests. To ensure that the session is properly closed, you should use the `async with` syntax when creating an `IndexAsyncio` object to use it as an async context manager. This will ensure that the session is properly closed when the context is exited.
```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone(api_key='YOUR_API_KEY')
    async with pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io') as idx:
        # Do async things
        await idx.upsert(
            vectors=[
                ...
            ]
        )

asyncio.run(main())
```
As an alternative, if you prefer to avoid code with a nested appearance and are willing to manage cleanup yourself, you can await the `close()` method to close the session when you are done.
```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone(api_key='YOUR_API_KEY')
    idx = pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io')

    # Do async things
    await idx.describe_index_stats()

    # After you're done, you're responsible for calling this yourself
    await idx.close()

asyncio.run(main())
```

Failing to do this may result in error messages appearing from the underlying aiohttp library.
```python
    def __init__(
        self,
        api_key: str,
        host: str,
        additional_headers: Optional[Dict[str, str]] = {},
        openapi_config=None,
        **kwargs,
    ):
        self.config = ConfigBuilder.build(
            api_key=api_key, host=host, additional_headers=additional_headers, **kwargs
        )
        """ @private """
        self._openapi_config = ConfigBuilder.build_openapi_config(self.config, openapi_config)
        """ @private """

        if kwargs.get("connection_pool_maxsize", None):
            self._openapi_config.connection_pool_maxsize = kwargs.get("connection_pool_maxsize")

        self._vector_api = setup_async_openapi_client(
            api_client_klass=AsyncioApiClient,
            api_klass=AsyncioVectorOperationsApi,
            config=self.config,
            openapi_config=self._openapi_config,
            api_version=API_VERSION,
        )
        """ @private """

        self._api_client = self._vector_api.api_client
        """ @private """

        # Pass the same api_client to the ImportFeatureMixinAsyncio
        # This is important for async context management to work correctly
        super().__init__(api_client=self._api_client)
```
````python
    async def close(self):
        """Cleanup resources used by the Pinecone Index client.

        This method should be called when the client is no longer needed so that
        it can clean up the aiohttp session and other resources.

        After close has been called, the client instance should not be used.

        ```python
        import asyncio
        from pinecone import Pinecone

        async def main():
            pc = Pinecone()
            idx = pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io')
            await idx.upsert_records(
                namespace='my-namespace',
                records=[
                    ...
                ]
            )

            # Close the client when done
            await idx.close()

        asyncio.run(main())
        ```

        If you are using the client as a context manager, the close method is called automatically
        when exiting.

        ```python
        import asyncio
        from pinecone import Pinecone

        async def main():
            pc = Pinecone()
            async with pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io') as idx:
                await idx.upsert_records(
                    namespace='my-namespace',
                    records=[
                        ...
                    ]
                )

            # No need to call close in this case because the "async with" syntax
            # automatically calls close when exiting the block.

        asyncio.run(main())
        ```

        """
        await self._api_client.close()
````
Cleanup resources used by the Pinecone Index client.

This method should be called when the client is no longer needed so that it can clean up the aiohttp session and other resources.

After close has been called, the client instance should not be used.
```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    idx = pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io')
    await idx.upsert_records(
        namespace='my-namespace',
        records=[
            ...
        ]
    )

    # Close the client when done
    await idx.close()

asyncio.run(main())
```
If you are using the client as a context manager, the `close()` method is called automatically when exiting.
```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host='example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io') as idx:
        await idx.upsert_records(
            namespace='my-namespace',
            records=[
                ...
            ]
        )

    # No need to call close in this case because the "async with" syntax
    # automatically calls close when exiting the block.

asyncio.run(main())
```
```python
    @validate_and_convert_errors
    async def upsert(
        self,
        vectors: Union[
            List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]
        ],
        namespace: Optional[str] = None,
        batch_size: Optional[int] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> UpsertResponse:
        _check_type = kwargs.pop("_check_type", True)

        if batch_size is None:
            return await self._upsert_batch(vectors, namespace, _check_type, **kwargs)

        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        upsert_tasks = [
            self._upsert_batch(vectors[i : i + batch_size], namespace, _check_type, **kwargs)
            for i in range(0, len(vectors), batch_size)
        ]

        total_upserted = 0
        with tqdm(total=len(vectors), desc="Upserted vectors", disable=not show_progress) as pbar:
            for task in asyncio.as_completed(upsert_tasks):
                res = await task
                pbar.update(res.upserted_count)
                total_upserted += res.upserted_count

        return UpsertResponse(upserted_count=total_upserted)
```
Arguments:

- vectors (Union[List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]]): A list of vectors to upsert.
- namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
- batch_size (int): The number of vectors to upsert in each batch. If not specified, all vectors will be upserted in a single batch. [optional]
- show_progress (bool): Whether to show a progress bar using tqdm. Applied only if batch_size is provided. Default is True.

Returns: `UpsertResponse`, which includes the number of vectors upserted.
The upsert operation writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.

To upsert in parallel, see https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel
## Upserting dense vectors

Note: the dimension of each dense vector must match the dimension of the index.

A vector can be represented in a variety of ways.
```python
import asyncio
from pinecone import Pinecone, Vector

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # A Vector object
        await idx.upsert(
            namespace='my-namespace',
            vectors=[
                Vector(id='id1', values=[0.1, 0.2, 0.3, 0.4], metadata={'metadata_key': 'metadata_value'}),
            ]
        )

        # A vector tuple
        await idx.upsert(
            namespace='my-namespace',
            vectors=[
                ('id1', [0.1, 0.2, 0.3, 0.4]),
            ]
        )

        # A vector tuple with metadata
        await idx.upsert(
            namespace='my-namespace',
            vectors=[
                ('id1', [0.1, 0.2, 0.3, 0.4], {'metadata_key': 'metadata_value'}),
            ]
        )

        # A vector dictionary
        await idx.upsert(
            namespace='my-namespace',
            vectors=[
                {"id": "id1", "values": [0.1, 0.2, 0.3, 0.4], "metadata": {"metadata_key": "metadata_value"}},
            ]
        )

asyncio.run(main())
```
## Upserting sparse vectors

```python
import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # A Vector object
        await idx.upsert(
            namespace='my-namespace',
            vectors=[
                Vector(id='id1', sparse_values=SparseValues(indices=[1, 2], values=[0.2, 0.4])),
            ]
        )

        # A dictionary
        await idx.upsert(
            namespace='my-namespace',
            vectors=[
                {"id": "id1", "sparse_values": {"indices": [1, 2], "values": [0.2, 0.4]}},
            ]
        )

asyncio.run(main())
```
## Batch upsert

If you have a large number of vectors, you can upsert them in batches.

```python
import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        await idx.upsert(
            namespace='my-namespace',
            vectors=[
                {'id': 'id1', 'values': [0.1, 0.2, 0.3, 0.4]},
                {'id': 'id2', 'values': [0.2, 0.3, 0.4, 0.5]},
                {'id': 'id3', 'values': [0.3, 0.4, 0.5, 0.6]},
                {'id': 'id4', 'values': [0.4, 0.5, 0.6, 0.7]},
                {'id': 'id5', 'values': [0.5, 0.6, 0.7, 0.8]},
                # More vectors here
            ],
            batch_size=50
        )

asyncio.run(main())
```
## Visual progress bar with tqdm

To see a progress bar when upserting in batches, you will need to separately install the `tqdm` package. If `tqdm` is present, the client will detect and use it to display progress when `show_progress=True`.
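For example, a minimal sketch of a batched upsert with the progress bar enabled (assumes `tqdm` has been installed, e.g. with `pip install tqdm`; the vector values and namespace are illustrative):

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Illustrative data; real values must match the index dimension
        vectors = [{'id': f'id{i}', 'values': [0.1, 0.2, 0.3, 0.4]} for i in range(1000)]

        # With batch_size set, batches are upserted concurrently and the
        # progress bar advances as each batch completes.
        await idx.upsert(
            namespace='my-namespace',
            vectors=vectors,
            batch_size=100,
            show_progress=True,
        )

asyncio.run(main())
```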
```python
    @validate_and_convert_errors
    async def upsert_from_dataframe(
        self, df, namespace: Optional[str] = None, batch_size: int = 500, show_progress: bool = True
    ):
        raise NotImplementedError("upsert_from_dataframe is not implemented for asyncio")
```
This method has not been implemented yet for the IndexAsyncio class.
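In the meantime, one possible workaround is to convert the DataFrame rows to dictionaries yourself and pass them to `upsert`. A minimal sketch, assuming a pandas DataFrame whose `id`, `values`, and `metadata` columns (hypothetical names) already match the dictionary vector format:

```python
import asyncio
import pandas as pd
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    # Hypothetical frame; your column names and data will differ
    df = pd.DataFrame([
        {'id': 'id1', 'values': [0.1, 0.2, 0.3, 0.4], 'metadata': {'genre': 'drama'}},
        {'id': 'id2', 'values': [0.2, 0.3, 0.4, 0.5], 'metadata': {'genre': 'comedy'}},
    ])
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # to_dict('records') yields one dict per row, matching the
        # dictionary vector format accepted by upsert
        await idx.upsert(
            namespace='my-namespace',
            vectors=df.to_dict('records'),
            batch_size=500,
        )

asyncio.run(main())
```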
```python
    @validate_and_convert_errors
    async def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        _check_type = kwargs.pop("_check_type", False)
        args_dict = parse_non_empty_args(
            [("ids", ids), ("delete_all", delete_all), ("namespace", namespace), ("filter", filter)]
        )

        return await self._vector_api.delete_vectors(
            DeleteRequest(
                **args_dict,
                **{
                    k: v
                    for k, v in kwargs.items()
                    if k not in _OPENAPI_ENDPOINT_PARAMS and v is not None
                },
                _check_type=_check_type,
            ),
            **{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS},
        )
```
Arguments:

- ids (List[str]): Vector ids to delete. [optional]
- delete_all (bool): Indicates that all vectors in the index namespace should be deleted. [optional] Default is False.
- namespace (str): The namespace to delete vectors from. If not specified, the default namespace is used. [optional]
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): If specified, the metadata filter here will be used to select the vectors to delete. This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True. See https://www.pinecone.io/docs/metadata-filtering/ [optional]

The Delete operation deletes vectors from the index, from a single namespace. No error is raised if the vector id does not exist.

Note: For any delete call, if namespace is not specified, the default namespace `""` is used. Since the delete operation does not raise an error when ids are not present, you may not receive an error if you delete from the wrong namespace.

Delete can occur in the following mutually exclusive ways:

- Delete by ids from a single namespace
- Delete all vectors from a single namespace by setting delete_all to True
- Delete all vectors from a single namespace by specifying a metadata filter (for this option, delete_all must be set to False)
API reference: https://docs.pinecone.io/reference/delete_post
```python
import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Delete specific ids
        await idx.delete(
            ids=['id1', 'id2'],
            namespace='my_namespace'
        )

        # Delete everything in a namespace
        await idx.delete(
            delete_all=True,
            namespace='my_namespace'
        )

        # Delete by metadata filter
        await idx.delete(
            filter={'key': 'value'},
            namespace='my_namespace'
        )

asyncio.run(main())
```
Returns: An empty dictionary if the delete operation was successful.
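A small sketch illustrating that deleting nonexistent ids does not raise (the id is illustrative):

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Deleting an id that does not exist is not an error; the call
        # still returns an empty dictionary on success
        resp = await idx.delete(ids=['no-such-id'], namespace='my_namespace')
        print(resp)  # {}

asyncio.run(main())
```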
```python
    @validate_and_convert_errors
    async def fetch(
        self, ids: List[str], namespace: Optional[str] = None, **kwargs
    ) -> FetchResponse:
        args_dict = parse_non_empty_args([("namespace", namespace)])
        return await self._vector_api.fetch_vectors(ids=ids, **args_dict, **kwargs)
```
The fetch operation looks up and returns vectors, by ID, from a single namespace. The returned vectors include the vector data and/or metadata.
API reference: https://docs.pinecone.io/reference/fetch
```python
import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Fetch specific ids in namespace
        fetched = await idx.fetch(
            ids=['id1', 'id2'],
            namespace='my_namespace'
        )
        for vec_id in fetched.vectors:
            vector = fetched.vectors[vec_id]
            print(vector.id)
            print(vector.metadata)
            print(vector.values)

asyncio.run(main())
```
Arguments:
- ids (List[str]): The vector IDs to fetch.
- namespace (str): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: FetchResponse object which contains the list of Vector objects, and namespace name.
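Requested ids that are not found are not included in the response, so you can compare against the request to spot missing ids. A short sketch, assuming `fetched.vectors` is the id-keyed mapping shown in the example above:

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        requested = ['id1', 'id2', 'id-that-does-not-exist']
        fetched = await idx.fetch(ids=requested, namespace='my_namespace')

        # Ids absent from the response were not found in the namespace
        missing = set(requested) - set(fetched.vectors.keys())
        print(missing)

asyncio.run(main())
```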
```python
    @validate_and_convert_errors
    async def query(
        self,
        *args,
        top_k: int,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryResponse:
        response = await self._query(
            *args,
            top_k=top_k,
            vector=vector,
            id=id,
            namespace=namespace,
            filter=filter,
            include_values=include_values,
            include_metadata=include_metadata,
            sparse_vector=sparse_vector,
            **kwargs,
        )
        return parse_query_response(response)
```
The Query operation searches a namespace, using a query vector. It retrieves the ids of the most similar items in a namespace, along with their similarity scores.
API reference: https://docs.pinecone.io/reference/query
## Querying with dense vectors

```python
import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        query_embedding = [0.1, 0.2, 0.3, ...]  # An embedding that matches the index dimension

        # Query by vector values
        results = await idx.query(
            vector=query_embedding,
            top_k=10,
            filter={'genre': {"$eq": "drama"}},  # Optionally filter by metadata
            namespace='my_namespace',
            include_values=False,
            include_metadata=True
        )

        # Query using vector id (the values from this stored vector will be used to query)
        results = await idx.query(
            id='1',
            top_k=10,
            filter={"year": {"$gt": 2000}},
            namespace='my_namespace',
        )

asyncio.run(main())
```
## Query with sparse vectors

```python
import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Query with a sparse vector represented as a dict
        results = await idx.query(
            top_k=10,
            sparse_vector={'indices': [10, 45, 16], 'values': [0.5, 0.5, 0.2]},
            namespace='my_namespace',
            include_metadata=True
        )

        # Query with a SparseValues object
        results = await idx.query(
            top_k=10,
            sparse_vector=SparseValues(indices=[10, 45, 16], values=[0.5, 0.5, 0.2]),
            namespace='my_namespace',
        )

asyncio.run(main())
```
Examples:

```python
>>> await index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
>>> await index.query(id='id1', top_k=10, namespace='my_namespace')
>>> await index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
>>> await index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
>>> await index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
...                   top_k=10, namespace='my_namespace')
>>> await index.query(vector=[1, 2, 3], sparse_vector=SparseValues([1, 2], [0.2, 0.4]),
...                   top_k=10, namespace='my_namespace')
```
Arguments:

- vector (List[float]): The query vector. This should be the same length as the dimension of the index being queried. Each `query()` request can contain only one of the parameters `id` or `vector`. [optional]
- id (str): The unique ID of the vector to be used as a query vector. Each `query()` request can contain only one of the parameters `vector` or `id`. [optional]
- top_k (int): The number of results to return for each query. Must be an integer greater than 1.
- namespace (str): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): The filter to apply. You can use vector metadata to limit your search. See https://www.pinecone.io/docs/metadata-filtering/ [optional]
- include_values (bool): Indicates whether vector values are included in the response. If omitted the server will use the default value of False. [optional]
- include_metadata (bool): Indicates whether metadata is included in the response as well as the ids. If omitted the server will use the default value of False. [optional]
- sparse_vector (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): Sparse values of the query vector. Expected to be either a SparseValues object or a dict of the form {'indices': List[int], 'values': List[float]}, where the lists each have the same length.
Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects, and namespace name.
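A short sketch of consuming the response (the ScoredVector fields are as described above; the query values are illustrative):

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        results = await idx.query(
            vector=[0.1, 0.2, 0.3, 0.4],  # must match the index dimension
            top_k=3,
            include_metadata=True,
            namespace='my_namespace',
        )
        # Each match is a ScoredVector with id, score, and (optionally) metadata
        for match in results.matches:
            print(match.id, match.score, match.metadata)

asyncio.run(main())
```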
```python
    @validate_and_convert_errors
    async def query_namespaces(
        self,
        namespaces: List[str],
        metric: Literal["cosine", "euclidean", "dotproduct"],
        top_k: Optional[int] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        vector: Optional[List[float]] = None,
        sparse_vector: Optional[
            Union[SparseValues, Dict[str, Union[List[float], List[int]]]]
        ] = None,
        **kwargs,
    ) -> QueryNamespacesResults:
        if namespaces is None or len(namespaces) == 0:
            raise ValueError("At least one namespace must be specified")
        if sparse_vector is None and vector is not None and len(vector) == 0:
            # If querying with a vector, it must not be empty
            raise ValueError("Query vector must not be empty")

        overall_topk = top_k if top_k is not None else 10
        aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)

        target_namespaces = set(namespaces)  # dedup namespaces
        tasks = [
            self.query(
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_threadpool_executor=True,
                _preload_content=False,
                **kwargs,
            )
            for ns in target_namespaces
        ]

        for task in asyncio.as_completed(tasks):
            raw_result = await task
            response = json.loads(raw_result.data.decode("utf-8"))
            aggregator.add_results(response)

        final_results = aggregator.get_results()
        return final_results
```
The query_namespaces() method is used to make a query to multiple namespaces in parallel and combine the results into one result set.
Arguments:

- vector (List[float]): The query vector, must be the same length as the dimension of the index being queried.
- namespaces (List[str]): The list of namespaces to query.
- metric (Literal["cosine", "euclidean", "dotproduct"]): The distance metric of the index being queried; it is used when merging results across namespaces.
- top_k (Optional[int], optional): The number of results you would like to request from each namespace. Defaults to 10.
- filter (Optional[Dict[str, Union[str, float, int, bool, List, dict]]], optional): Pass an optional filter to filter results based on metadata. Defaults to None.
- include_values (Optional[bool], optional): Boolean field indicating whether vector values should be included with results. Defaults to None.
- include_metadata (Optional[bool], optional): Boolean field indicating whether vector metadata should be included with results. Defaults to None.
- sparse_vector (Optional[Union[SparseValues, Dict[str, Union[List[float], List[int]]]]], optional): If you are working with a dotproduct index, you can pass a sparse vector as part of your hybrid search. Defaults to None.

Returns:

QueryNamespacesResults: A QueryNamespacesResults object containing the combined results from all namespaces, as well as the combined usage cost in read units.
Examples:

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone(api_key="your-api-key")
    idx = pc.IndexAsyncio(
        host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io",
    )
    query_vec = [0.1, 0.2, 0.3]  # An embedding that matches the index dimension
    combined_results = await idx.query_namespaces(
        vector=query_vec,
        namespaces=['ns1', 'ns2', 'ns3', 'ns4'],
        metric="cosine",  # the metric of the index being queried
        top_k=10,
        filter={'genre': {"$eq": "drama"}},
        include_values=True,
        include_metadata=True
    )
    for vec in combined_results.matches:
        print(vec.id, vec.score)
    print(combined_results.usage)

    await idx.close()

asyncio.run(main())
```
```python
    @validate_and_convert_errors
    async def update(
        self,
        id: str,
        values: Optional[List[float]] = None,
        set_metadata: Optional[VectorMetadataTypedDict] = None,
        namespace: Optional[str] = None,
        sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        return await self._vector_api.update_vector(
            IndexRequestFactory.update_request(
                id=id,
                values=values,
                set_metadata=set_metadata,
                namespace=namespace,
                sparse_values=sparse_values,
                **kwargs,
            ),
            **self._openapi_kwargs(kwargs),
        )
```
The Update operation updates a vector in a namespace.

Arguments:

- id (str): Vector's unique id.
- values (List[float]): Vector values to set. [optional]
- set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]): Metadata to set for the vector. [optional]
- namespace (str): Namespace name where the vector will be updated. [optional]
- sparse_values (Dict[str, Union[List[float], List[int]]]): Sparse values to update for the vector. Expected to be either a SparseValues object or a dict of the form {'indices': List[int], 'values': List[float]}, where the lists each have the same length.

If values are included, they will overwrite the previous values. If set_metadata is included, the values of the fields specified in it will be added to or overwrite the previous metadata.
API reference: https://docs.pinecone.io/reference/update
Examples:

```python
import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Update vector values
        await idx.update(
            id='id1',
            values=[0.1, 0.2, 0.3, ...],
            namespace='my_namespace'
        )

        # Update metadata
        await idx.update(
            id='id1',
            set_metadata={'key': 'value'},
            namespace='my_namespace'
        )

        # Update sparse values
        await idx.update(
            id='id1',
            sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
            namespace='my_namespace'
        )

        # Update sparse values with SparseValues object
        await idx.update(
            id='id1',
            sparse_values=SparseValues(indices=[234781, 5432], values=[0.2, 0.4]),
            namespace='my_namespace'
        )

asyncio.run(main())
```
```python
    @validate_and_convert_errors
    async def describe_index_stats(
        self, filter: Optional[FilterTypedDict] = None, **kwargs
    ) -> DescribeIndexStatsResponse:
        return await self._vector_api.describe_index_stats(
            IndexRequestFactory.describe_index_stats_request(filter, **kwargs),
            **self._openapi_kwargs(kwargs),
        )
```
The DescribeIndexStats operation returns statistics about the index's contents. For example: the vector count per namespace and the number of dimensions.

API reference: https://docs.pinecone.io/reference/describe_index_stats_post

Arguments:

- filter (Dict[str, Union[str, float, int, bool, List, dict]]): If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See https://www.pinecone.io/docs/metadata-filtering/ [optional]

Returns: DescribeIndexStatsResponse object which contains stats about the index.
```python
import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        print(await idx.describe_index_stats())

asyncio.run(main())
```
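If you pass a filter, the returned statistics are restricted to vectors whose metadata satisfies it. A short sketch (the metadata field is hypothetical, and filtered stats may not be supported by every index type):

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Statistics restricted to vectors whose metadata matches the filter
        stats = await idx.describe_index_stats(filter={'genre': {'$eq': 'drama'}})
        print(stats)

asyncio.run(main())
```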
```python
    @validate_and_convert_errors
    async def list_paginated(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        namespace: Optional[str] = None,
        **kwargs,
    ) -> ListResponse:
        args_dict = IndexRequestFactory.list_paginated_args(
            prefix=prefix,
            limit=limit,
            pagination_token=pagination_token,
            namespace=namespace,
            **kwargs,
        )
        return await self._vector_api.list_vectors(**args_dict, **kwargs)
```
The list_paginated operation finds vectors based on an id prefix within a single namespace. It returns matching ids in a paginated form, with a pagination token to fetch the next page of results. This id list can then be passed to fetch or delete operations, depending on your use case.

Consider using the `list` method to avoid having to handle pagination tokens manually; a manual token-handling loop is sketched below.
Examples:
```python
>>> results = await index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
>>> [v.id for v in results.vectors]
['99', '990', '991', '992', '993']
>>> results.pagination.next
'eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9'
>>> next_results = await index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)
```
Arguments:
- prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used, with the effect of listing all ids in a namespace. [optional]
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: ListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
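For completeness, a minimal sketch of walking pages manually inside the async context (this mirrors what the `list` helper below does for you; the prefix and limit values are arbitrary):

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        token = None
        while True:
            # Fetch one page of ids matching the prefix
            page = await idx.list_paginated(
                prefix='99', limit=100, namespace='my_namespace', pagination_token=token
            )
            print([v.id for v in page.vectors])
            if not page.pagination:
                break  # no further pages
            token = page.pagination.next

asyncio.run(main())
```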
```python
@validate_and_convert_errors
async def list(self, **kwargs):
    done = False
    while not done:
        results = await self.list_paginated(**kwargs)
        if len(results.vectors) > 0:
            yield [v.id for v in results.vectors]

        if results.pagination:
            kwargs.update({"pagination_token": results.pagination.next})
        else:
            done = True
```
The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your behalf.
Examples:

```python
>>> async for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
...     print(ids)
['99', '990', '991', '992', '993']
['994', '995', '996', '997', '998']
['999']
```
Arguments:
- prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used, with the effect of listing all ids in a namespace. [optional]
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
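Because `list` is implemented as an async generator, it must be consumed with `async for`. A minimal runnable sketch, feeding each page of ids into a delete call (deleting by prefix is one of the use cases mentioned above):

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Each iteration yields one page of matching ids
        async for ids in idx.list(prefix='99', namespace='my_namespace'):
            await idx.delete(ids=ids, namespace='my_namespace')

asyncio.run(main())
```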
```python
async def upsert_records(self, namespace: str, records: List[Dict]):
    args = IndexRequestFactory.upsert_records_args(namespace=namespace, records=records)
    await self._vector_api.upsert_records_namespace(**args)
```
Upsert records to a namespace. A record is a dictionary that contains either an `id` or `_id` field along with other fields that will be stored as metadata. The `id` or `_id` field is used as the unique identifier for the record. At least one field in the record should correspond to a field mapping in the index's embed configuration.

When records are upserted, Pinecone converts the mapped fields into embeddings and upserts them into the specified namespace of the index.

Parameters:
- namespace: The namespace of the index to upsert records to.
- records: The records to upsert into the index.
Examples:

```python
import asyncio
from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed,
)

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # upsert records
        await idx.upsert_records(
            namespace="my-namespace",
            records=[
                {
                    "_id": "test1",
                    "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
                },
                {
                    "_id": "test2",
                    "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
                },
                {
                    "_id": "test3",
                    "my_text_field": "Many people enjoy eating apples as a healthy snack.",
                },
                {
                    "_id": "test4",
                    "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
                },
                {
                    "_id": "test5",
                    "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
                },
                {
                    "_id": "test6",
                    "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
                },
            ],
        )

        from pinecone import SearchQuery, SearchRerank, RerankModel

        # search for similar records
        response = await idx.search_records(
            namespace="my-namespace",
            query=SearchQuery(
                inputs={
                    "text": "Apple corporation",
                },
                top_k=3,
            ),
            rerank=SearchRerank(
                model=RerankModel.Bge_Reranker_V2_M3,
                rank_fields=["my_text_field"],
                top_n=3,
            ),
        )

asyncio.run(main())
```
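Upserted records are indexed asynchronously, so reads can briefly lag writes. A hedged sketch of confirming the write by polling index stats (the expected count of 6 and the one-second interval are illustrative choices, not API requirements):

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Poll until the namespace reports the six records upserted above
        while True:
            stats = await idx.describe_index_stats()
            ns = stats.namespaces.get("my-namespace")
            if ns is not None and ns.vector_count >= 6:
                break
            await asyncio.sleep(1)  # illustrative backoff interval

asyncio.run(main())
```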
```python
async def search(
    self,
    namespace: str,
    query: Union[SearchQueryTypedDict, SearchQuery],
    rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None,
    fields: Optional[List[str]] = ["*"],  # Default to returning all fields
) -> SearchRecordsResponse:
    if namespace is None:
        raise Exception("Namespace is required when searching records")

    request = IndexRequestFactory.search_request(query=query, rerank=rerank, fields=fields)

    return await self._vector_api.search_records_namespace(namespace, request)
```
Search for records.

This operation converts a query to a vector embedding and then searches a namespace. You can optionally provide a reranking operation as part of the search.

Parameters:
- namespace: The namespace in the index to search.
- query: The SearchQuery to use for the search.
- rerank: The SearchRerank to use with the search request.

Returns: The records that match the search.
Examples:

```python
import asyncio
from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed,
)

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # upsert records
        await idx.upsert_records(
            namespace="my-namespace",
            records=[
                {
                    "_id": "test1",
                    "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
                },
                {
                    "_id": "test2",
                    "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
                },
                {
                    "_id": "test3",
                    "my_text_field": "Many people enjoy eating apples as a healthy snack.",
                },
                {
                    "_id": "test4",
                    "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
                },
                {
                    "_id": "test5",
                    "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
                },
                {
                    "_id": "test6",
                    "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
                },
            ],
        )

        from pinecone import SearchQuery, SearchRerank, RerankModel

        # search for similar records
        response = await idx.search_records(
            namespace="my-namespace",
            query=SearchQuery(
                inputs={
                    "text": "Apple corporation",
                },
                top_k=3,
            ),
            rerank=SearchRerank(
                model=RerankModel.Bge_Reranker_V2_M3,
                rank_fields=["my_text_field"],
                top_n=3,
            ),
        )

asyncio.run(main())
```
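The returned SearchRecordsResponse exposes the ranked matches; a short continuation of the example above (attribute names follow the generated response model; treat the exact shape as an assumption to verify against your SDK version):

```python
# Continuing inside main() from the example above:
# each hit carries the record id, a relevance score, and the selected fields.
for hit in response.result.hits:
    print(hit)
```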
```python
async def search_records(
    self,
    namespace: str,
    query: Union[SearchQueryTypedDict, SearchQuery],
    rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None,
    fields: Optional[List[str]] = ["*"],  # Default to returning all fields
) -> SearchRecordsResponse:
    return await self.search(namespace, query=query, rerank=rerank, fields=fields)
```
An alias of the `search()` method.
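Because it simply delegates, the two entry points are interchangeable; a minimal sketch, assuming the same example host used throughout:

```python
import asyncio
from pinecone import Pinecone, SearchQuery

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        query = SearchQuery(inputs={"text": "Apple corporation"}, top_k=3)
        # search_records delegates directly to search, so these calls are equivalent
        r1 = await idx.search(namespace="my-namespace", query=query)
        r2 = await idx.search_records(namespace="my-namespace", query=query)

asyncio.run(main())
```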