pinecone.data.index
```python
from pinecone.utils.tqdm import tqdm

import logging
import json
from typing import Union, List, Optional, Dict, Any, Literal

from pinecone.config import ConfigBuilder

from pinecone.openapi_support import ApiClient
from pinecone.core.openapi.db_data.api.vector_operations_api import VectorOperationsApi
from pinecone.core.openapi.db_data import API_VERSION
from pinecone.core.openapi.db_data.models import (
    QueryResponse,
    IndexDescription as DescribeIndexStatsResponse,
    UpsertResponse,
    ListResponse,
    SearchRecordsResponse,
)
from .dataclasses import Vector, SparseValues, FetchResponse, SearchQuery, SearchRerank
from .interfaces import IndexInterface
from .request_factory import IndexRequestFactory
from .features.bulk_import import ImportFeatureMixin
from .types import (
    SparseVectorTypedDict,
    VectorTypedDict,
    VectorMetadataTypedDict,
    VectorTuple,
    VectorTupleWithMetadata,
    FilterTypedDict,
    SearchRerankTypedDict,
    SearchQueryTypedDict,
)
from ..utils import (
    setup_openapi_client,
    parse_non_empty_args,
    validate_and_convert_errors,
    filter_dict,
    PluginAware,
)
from .query_results_aggregator import QueryResultsAggregator, QueryNamespacesResults
from pinecone.openapi_support import OPENAPI_ENDPOINT_PARAMS

from multiprocessing.pool import ApplyResult
from multiprocessing import cpu_count
from concurrent.futures import as_completed


logger = logging.getLogger(__name__)
""" @private """


def parse_query_response(response: QueryResponse):
    """@private"""
    response._data_store.pop("results", None)
    return response


class Index(IndexInterface, ImportFeatureMixin, PluginAware):
    """
    A client for interacting with a Pinecone index via REST API.
    For improved performance, use the Pinecone GRPC index client.
    """

    def __init__(
        self,
        api_key: str,
        host: str,
        pool_threads: Optional[int] = None,
        additional_headers: Optional[Dict[str, str]] = {},
        openapi_config=None,
        **kwargs,
    ):
        self.config = ConfigBuilder.build(
            api_key=api_key, host=host, additional_headers=additional_headers, **kwargs
        )
        """ @private """
        self.openapi_config = ConfigBuilder.build_openapi_config(self.config, openapi_config)
        """ @private """

        if pool_threads is None:
            self.pool_threads = 5 * cpu_count()
            """ @private """
        else:
            self.pool_threads = pool_threads
            """ @private """

        if kwargs.get("connection_pool_maxsize", None):
            self.openapi_config.connection_pool_maxsize = kwargs.get("connection_pool_maxsize")

        self._vector_api = setup_openapi_client(
            api_client_klass=ApiClient,
            api_klass=VectorOperationsApi,
            config=self.config,
            openapi_config=self.openapi_config,
            pool_threads=pool_threads,
            api_version=API_VERSION,
        )

        self._api_client = self._vector_api.api_client

        # Pass the same api_client to the ImportFeatureMixin
        super().__init__(api_client=self._api_client)

        self.load_plugins(
            config=self.config, openapi_config=self.openapi_config, pool_threads=self.pool_threads
        )

    def _openapi_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        return filter_dict(kwargs, OPENAPI_ENDPOINT_PARAMS)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._vector_api.api_client.close()

    def close(self):
        self._vector_api.api_client.close()

    @validate_and_convert_errors
    def upsert(
        self,
        vectors: Union[
            List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]
        ],
        namespace: Optional[str] = None,
        batch_size: Optional[int] = None,
        show_progress: bool = True,
        **kwargs,
    ) -> UpsertResponse:
        _check_type = kwargs.pop("_check_type", True)

        if kwargs.get("async_req", False) and batch_size is not None:
            raise ValueError(
                "async_req is not supported when batch_size is provided."
                "To upsert in parallel, please follow: "
                "https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel"
            )

        if batch_size is None:
            return self._upsert_batch(vectors, namespace, _check_type, **kwargs)

        if not isinstance(batch_size, int) or batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")

        pbar = tqdm(total=len(vectors), disable=not show_progress, desc="Upserted vectors")
        total_upserted = 0
        for i in range(0, len(vectors), batch_size):
            batch_result = self._upsert_batch(
                vectors[i : i + batch_size], namespace, _check_type, **kwargs
            )
            pbar.update(batch_result.upserted_count)
            # we can't use here pbar.n for the case show_progress=False
            total_upserted += batch_result.upserted_count

        return UpsertResponse(upserted_count=total_upserted)

    def _upsert_batch(
        self,
        vectors: Union[
            List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]
        ],
        namespace: Optional[str],
        _check_type: bool,
        **kwargs,
    ) -> UpsertResponse:
        return self._vector_api.upsert_vectors(
            IndexRequestFactory.upsert_request(vectors, namespace, _check_type, **kwargs),
            **self._openapi_kwargs(kwargs),
        )

    @staticmethod
    def _iter_dataframe(df, batch_size):
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i : i + batch_size].to_dict(orient="records")
            yield batch

    @validate_and_convert_errors
    def upsert_from_dataframe(
        self, df, namespace: Optional[str] = None, batch_size: int = 500, show_progress: bool = True
    ) -> UpsertResponse:
        try:
            import pandas as pd
        except ImportError:
            raise RuntimeError(
                "The `pandas` package is not installed. Please install pandas to use `upsert_from_dataframe()`"
            )

        if not isinstance(df, pd.DataFrame):
            raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

        pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
        results = []
        for chunk in self._iter_dataframe(df, batch_size=batch_size):
            res = self.upsert(vectors=chunk, namespace=namespace)
            pbar.update(len(chunk))
            results.append(res)

        upserted_count = 0
        for res in results:
            upserted_count += res.upserted_count

        return UpsertResponse(upserted_count=upserted_count)

    def upsert_records(self, namespace: str, records: List[Dict]):
        args = IndexRequestFactory.upsert_records_args(namespace=namespace, records=records)
        self._vector_api.upsert_records_namespace(**args)

    @validate_and_convert_errors
    def search(
        self,
        namespace: str,
        query: Union[SearchQueryTypedDict, SearchQuery],
        rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None,
        fields: Optional[List[str]] = ["*"],  # Default to returning all fields
    ) -> SearchRecordsResponse:
        if namespace is None:
            raise Exception("Namespace is required when searching records")

        request = IndexRequestFactory.search_request(query=query, rerank=rerank, fields=fields)

        return self._vector_api.search_records_namespace(namespace, request)

    @validate_and_convert_errors
    def search_records(
        self,
        namespace: str,
        query: Union[SearchQueryTypedDict, SearchQuery],
        rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None,
        fields: Optional[List[str]] = ["*"],  # Default to returning all fields
    ) -> SearchRecordsResponse:
        return self.search(namespace, query=query, rerank=rerank, fields=fields)

    @validate_and_convert_errors
    def delete(
        self,
        ids: Optional[List[str]] = None,
        delete_all: Optional[bool] = None,
        namespace: Optional[str] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        return self._vector_api.delete_vectors(
            IndexRequestFactory.delete_request(
                ids=ids, delete_all=delete_all, namespace=namespace, filter=filter, **kwargs
            ),
            **self._openapi_kwargs(kwargs),
        )

    @validate_and_convert_errors
    def fetch(self, ids: List[str], namespace: Optional[str] = None, **kwargs) -> FetchResponse:
        args_dict = parse_non_empty_args([("namespace", namespace)])
        result = self._vector_api.fetch_vectors(ids=ids, **args_dict, **kwargs)
        return FetchResponse(
            namespace=result.namespace,
            vectors={k: Vector.from_dict(v) for k, v in result.vectors.items()},
            usage=result.usage,
        )

    @validate_and_convert_errors
    def query(
        self,
        *args,
        top_k: int,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Union[QueryResponse, ApplyResult]:
        response = self._query(
            *args,
            top_k=top_k,
            vector=vector,
            id=id,
            namespace=namespace,
            filter=filter,
            include_values=include_values,
            include_metadata=include_metadata,
            sparse_vector=sparse_vector,
            **kwargs,
        )

        if kwargs.get("async_req", False) or kwargs.get("async_threadpool_executor", False):
            return response
        else:
            return parse_query_response(response)

    def _query(
        self,
        *args,
        top_k: int,
        vector: Optional[List[float]] = None,
        id: Optional[str] = None,
        namespace: Optional[str] = None,
        filter: Optional[FilterTypedDict] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> QueryResponse:
        if len(args) > 0:
            raise ValueError(
                "The argument order for `query()` has changed; please use keyword arguments instead of positional arguments. Example: index.query(vector=[0.1, 0.2, 0.3], top_k=10, namespace='my_namespace')"
            )

        if top_k < 1:
            raise ValueError("top_k must be a positive integer")

        request = IndexRequestFactory.query_request(
            top_k=top_k,
            vector=vector,
            id=id,
            namespace=namespace,
            filter=filter,
            include_values=include_values,
            include_metadata=include_metadata,
            sparse_vector=sparse_vector,
            **kwargs,
        )
        return self._vector_api.query_vectors(request, **self._openapi_kwargs(kwargs))

    @validate_and_convert_errors
    def query_namespaces(
        self,
        vector: Optional[List[float]],
        namespaces: List[str],
        metric: Literal["cosine", "euclidean", "dotproduct"],
        top_k: Optional[int] = None,
        filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None,
        include_values: Optional[bool] = None,
        include_metadata: Optional[bool] = None,
        sparse_vector: Optional[
            Union[SparseValues, Dict[str, Union[List[float], List[int]]]]
        ] = None,
        **kwargs,
    ) -> QueryNamespacesResults:
        if namespaces is None or len(namespaces) == 0:
            raise ValueError("At least one namespace must be specified")
        if sparse_vector is None and vector is not None and len(vector) == 0:
            # If querying with a vector, it must not be empty
            raise ValueError("Query vector must not be empty")

        overall_topk = top_k if top_k is not None else 10
        aggregator = QueryResultsAggregator(top_k=overall_topk, metric=metric)

        target_namespaces = set(namespaces)  # dedup namespaces
        async_futures = [
            self.query(
                vector=vector,
                namespace=ns,
                top_k=overall_topk,
                filter=filter,
                include_values=include_values,
                include_metadata=include_metadata,
                sparse_vector=sparse_vector,
                async_threadpool_executor=True,
                _preload_content=False,
                **kwargs,
            )
            for ns in target_namespaces
        ]

        for result in as_completed(async_futures):
            raw_result = result.result()
            response = json.loads(raw_result.data.decode("utf-8"))
            aggregator.add_results(response)

        final_results = aggregator.get_results()
        return final_results

    @validate_and_convert_errors
    def update(
        self,
        id: str,
        values: Optional[List[float]] = None,
        set_metadata: Optional[VectorMetadataTypedDict] = None,
        namespace: Optional[str] = None,
        sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        return self._vector_api.update_vector(
            IndexRequestFactory.update_request(
                id=id,
                values=values,
                set_metadata=set_metadata,
                namespace=namespace,
                sparse_values=sparse_values,
                **kwargs,
            ),
            **self._openapi_kwargs(kwargs),
        )

    @validate_and_convert_errors
    def describe_index_stats(
        self, filter: Optional[FilterTypedDict] = None, **kwargs
    ) -> DescribeIndexStatsResponse:
        return self._vector_api.describe_index_stats(
            IndexRequestFactory.describe_index_stats_request(filter, **kwargs),
            **self._openapi_kwargs(kwargs),
        )

    @validate_and_convert_errors
    def list_paginated(
        self,
        prefix: Optional[str] = None,
        limit: Optional[int] = None,
        pagination_token: Optional[str] = None,
        namespace: Optional[str] = None,
        **kwargs,
    ) -> ListResponse:
        args_dict = IndexRequestFactory.list_paginated_args(
            prefix=prefix,
            limit=limit,
            pagination_token=pagination_token,
            namespace=namespace,
            **kwargs,
        )
        return self._vector_api.list_vectors(**args_dict, **kwargs)

    @validate_and_convert_errors
    def list(self, **kwargs):
        done = False
        while not done:
            results = self.list_paginated(**kwargs)
            if len(results.vectors) > 0:
                yield [v.id for v in results.vectors]

            if results.pagination:
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True
```
class Index(IndexInterface, ImportFeatureMixin, PluginAware)
A client for interacting with a Pinecone index via REST API. For improved performance, use the Pinecone GRPC index client.
def __init__(self, api_key: str, host: str, pool_threads: Optional[int] = None, additional_headers: Optional[Dict[str, str]] = {}, openapi_config=None, **kwargs)
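You typically don't construct Index directly; the top-level Pinecone client builds one for you with the api_key and host filled in. A minimal sketch (the host below is a placeholder), showing that the client can also be used as a context manager so its HTTP connections are closed when you are done:

```python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")

# pc.Index returns a configured Index client; using it as a context
# manager closes the underlying api_client on exit.
with pc.Index(host="your-index-host.svc.pinecone.io") as idx:
    stats = idx.describe_index_stats()
    print(stats.total_vector_count)
```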
@validate_and_convert_errors
def upsert(self, vectors: Union[List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]], namespace: Optional[str] = None, batch_size: Optional[int] = None, show_progress: bool = True, **kwargs) -> UpsertResponse
Arguments:
- vectors (Union[List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict]]): A list of vectors to upsert.
- namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
- batch_size (int): The number of vectors to upsert in each batch. If not specified, all vectors will be upserted in a single batch. [optional]
- show_progress (bool): Whether to show a progress bar using tqdm. Applied only if batch_size is provided. Default is True.
Returns:
UpsertResponse, which includes the number of vectors upserted.
The upsert operation writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.
To upsert in parallel, follow: https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel
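That guide pairs async_req=True with the client's thread pool: each call returns a multiprocessing ApplyResult handle immediately, and you wait on the handles afterwards. A hedged sketch of the pattern (the host, dimension, and chunking helper below are illustrative):

```python
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")

def chunks(items, size):
    # Illustrative helper: yield successive fixed-size slices.
    for i in range(0, len(items), size):
        yield items[i : i + size]

vectors = [(f"id-{i}", [0.1] * 1024) for i in range(1000)]  # dimension must match the index

# pool_threads controls how many requests can be in flight at once.
with pc.Index(host="your-index-host.svc.pinecone.io", pool_threads=30) as idx:
    # async_req=True returns ApplyResult handles instead of blocking.
    async_results = [
        idx.upsert(vectors=chunk, namespace="my-namespace", async_req=True)
        for chunk in chunks(vectors, 200)
    ]
    # .get() blocks until the request finishes and re-raises any error.
    total = sum(r.get().upserted_count for r in async_results)
    print(total)
```

Note that async_req=True cannot be combined with the batch_size parameter; chunk the vectors yourself as above.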
Upserting dense vectors

Note: the dimension of each dense vector must match the dimension of the index.

A vector can be represented in a variety of ways.

```python
from pinecone import Pinecone, Vector

pc = Pinecone()
idx = pc.Index("index-name")

# A Vector object
idx.upsert(
    namespace='my-namespace',
    vectors=[
        Vector(id='id1', values=[0.1, 0.2, 0.3, 0.4], metadata={'metadata_key': 'metadata_value'}),
    ],
)

# A vector tuple
idx.upsert(
    namespace='my-namespace',
    vectors=[
        ('id1', [0.1, 0.2, 0.3, 0.4]),
    ],
)

# A vector tuple with metadata
idx.upsert(
    namespace='my-namespace',
    vectors=[
        ('id1', [0.1, 0.2, 0.3, 0.4], {'metadata_key': 'metadata_value'}),
    ],
)

# A vector dictionary
idx.upsert(
    namespace='my-namespace',
    vectors=[
        {"id": "id1", "values": [0.1, 0.2, 0.3, 0.4], "metadata": {"metadata_key": "metadata_value"}},
    ],
)
```
Upserting sparse vectors

```python
from pinecone import Pinecone, Vector, SparseValues

pc = Pinecone()
idx = pc.Index("index-name")

# A Vector object
idx.upsert(
    namespace='my-namespace',
    vectors=[
        Vector(id='id1', sparse_values=SparseValues(indices=[1, 2], values=[0.2, 0.4])),
    ],
)

# A dictionary
idx.upsert(
    namespace='my-namespace',
    vectors=[
        {"id": "id1", "sparse_values": {"indices": [1, 2], "values": [0.2, 0.4]}},
    ],
)
```
Batch upsert

If you have a large number of vectors, you can upsert them in batches.

```python
from pinecone import Pinecone

pc = Pinecone()
idx = pc.Index("index-name")

idx.upsert(
    namespace='my-namespace',
    vectors=[
        {'id': 'id1', 'values': [0.1, 0.2, 0.3, 0.4]},
        {'id': 'id2', 'values': [0.2, 0.3, 0.4, 0.5]},
        {'id': 'id3', 'values': [0.3, 0.4, 0.5, 0.6]},
        {'id': 'id4', 'values': [0.4, 0.5, 0.6, 0.7]},
        {'id': 'id5', 'values': [0.5, 0.6, 0.7, 0.8]},
        # More vectors here
    ],
    batch_size=50,
)
```
Visual progress bar with tqdm

To see a progress bar when upserting in batches, you will need to separately install the tqdm package. If tqdm is present, the client will detect and use it to display progress when show_progress=True.
@validate_and_convert_errors
def upsert_from_dataframe(self, df, namespace: Optional[str] = None, batch_size: int = 500, show_progress: bool = True) -> UpsertResponse
Upserts a dataframe into the index.
Arguments:
- df: A pandas DataFrame with the following columns: id, values, sparse_values, and metadata, as in the sketch below.
- namespace: The namespace to upsert into.
- batch_size: The number of rows to upsert in a single batch.
- show_progress: Whether to show a progress bar.
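A brief sketch, assuming a DataFrame laid out with those columns (the values are illustrative; the sparse_values column may be omitted for a dense-only index):

```python
import pandas as pd
from pinecone import Pinecone

pc = Pinecone(api_key="YOUR_API_KEY")
idx = pc.Index(host="your-index-host.svc.pinecone.io")

# Each row is converted to a dict and upserted as one vector.
df = pd.DataFrame(
    {
        "id": ["id1", "id2"],
        "values": [[0.1, 0.2, 0.3, 0.4], [0.2, 0.3, 0.4, 0.5]],
        "metadata": [{"genre": "drama"}, {"genre": "comedy"}],
    }
)

response = idx.upsert_from_dataframe(df, namespace="my-namespace", batch_size=500)
print(response.upserted_count)
```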
def upsert_records(self, namespace: str, records: List[Dict])
Parameters
- namespace: The namespace of the index to upsert records to.
- records: The records to upsert into the index.
Upsert records to a namespace. A record is a dictionary that contains either an id or _id field along with other fields that will be stored as metadata. The id or _id field is used as the unique identifier for the record. At least one field in the record should correspond to a field mapping in the index's embed configuration.

When records are upserted, Pinecone converts mapped fields into embeddings and upserts them into the specified namespace of the index.
```python
from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed,
)

pc = Pinecone(api_key="<<PINECONE_API_KEY>>")

# Create an index for your embedding model
index_model = pc.create_index_for_model(
    name="my-model-index",
    cloud=CloudProvider.AWS,
    region=AwsRegion.US_WEST_2,
    embed=IndexEmbed(
        model=EmbedModel.Multilingual_E5_Large,
        field_map={"text": "my_text_field"},
    ),
)

# Instantiate the index client
idx = pc.Index(host=index_model.host)

# Upsert records
idx.upsert_records(
    namespace="my-namespace",
    records=[
        {
            "_id": "test1",
            "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
        },
        {
            "_id": "test2",
            "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
        },
        {
            "_id": "test3",
            "my_text_field": "Many people enjoy eating apples as a healthy snack.",
        },
        {
            "_id": "test4",
            "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
        },
        {
            "_id": "test5",
            "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
        },
        {
            "_id": "test6",
            "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
        },
    ],
)

from pinecone import SearchQuery, SearchRerank, RerankModel

# Search for similar records
response = idx.search_records(
    namespace="my-namespace",
    query=SearchQuery(
        inputs={
            "text": "Apple corporation",
        },
        top_k=3,
    ),
    rerank=SearchRerank(
        model=RerankModel.Bge_Reranker_V2_M3,
        rank_fields=["my_text_field"],
        top_n=3,
    ),
)
```
@validate_and_convert_errors
def search(self, namespace: str, query: Union[SearchQueryTypedDict, SearchQuery], rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None, fields: Optional[List[str]] = ["*"]) -> SearchRecordsResponse
Search for records.

This operation converts a query to a vector embedding and then searches a namespace. You can optionally provide a reranking operation as part of the search.

Parameters
- namespace: The namespace in the index to search.
- query: The SearchQuery to use for the search.
- rerank: The SearchRerank to use with the search request.

Returns:
The records that match the search.
```python
from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed,
)

pc = Pinecone(api_key="<<PINECONE_API_KEY>>")

# Create an index for your embedding model
index_model = pc.create_index_for_model(
    name="my-model-index",
    cloud=CloudProvider.AWS,
    region=AwsRegion.US_WEST_2,
    embed=IndexEmbed(
        model=EmbedModel.Multilingual_E5_Large,
        field_map={"text": "my_text_field"},
    ),
)

# Instantiate the index client
idx = pc.Index(host=index_model.host)

# Upsert records
idx.upsert_records(
    namespace="my-namespace",
    records=[
        {
            "_id": "test1",
            "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
        },
        {
            "_id": "test2",
            "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
        },
        {
            "_id": "test3",
            "my_text_field": "Many people enjoy eating apples as a healthy snack.",
        },
        {
            "_id": "test4",
            "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
        },
        {
            "_id": "test5",
            "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
        },
        {
            "_id": "test6",
            "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
        },
    ],
)

from pinecone import SearchQuery, SearchRerank, RerankModel

# Search for similar records
response = idx.search_records(
    namespace="my-namespace",
    query=SearchQuery(
        inputs={
            "text": "Apple corporation",
        },
        top_k=3,
    ),
    rerank=SearchRerank(
        model=RerankModel.Bge_Reranker_V2_M3,
        rank_fields=["my_text_field"],
        top_n=3,
    ),
)
```
@validate_and_convert_errors
def search_records(self, namespace: str, query: Union[SearchQueryTypedDict, SearchQuery], rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None, fields: Optional[List[str]] = ["*"]) -> SearchRecordsResponse
Alias of the search() method.
@validate_and_convert_errors
def delete(self, ids: Optional[List[str]] = None, delete_all: Optional[bool] = None, namespace: Optional[str] = None, filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, **kwargs) -> Dict[str, Any]
Arguments:
- ids (List[str]): Vector ids to delete [optional]
- delete_all (bool): Indicates that all vectors in the index namespace should be deleted. [optional] Default is False.
- namespace (str): The namespace to delete vectors from [optional] If not specified, the default namespace is used.
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): If specified, the metadata filter here will be used to select the vectors to delete. This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True. See https://www.pinecone.io/docs/metadata-filtering/. [optional]
The Delete operation deletes vectors from the index, from a single namespace. No error is raised if the vector id does not exist.

Note: For any delete call, if namespace is not specified, the default namespace "" is used. Since the delete operation does not error when ids are not present, this means you may not receive an error if you delete from the wrong namespace.
Delete can occur in the following mutually exclusive ways:
- Delete by ids from a single namespace
- Delete all vectors from a single namespace by setting delete_all to True
- Delete all vectors from a single namespace by specifying a metadata filter (note that for this option delete_all must be set to False)
API reference: https://docs.pinecone.io/reference/delete_post
Examples:
>>> index.delete(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.delete(delete_all=True, namespace='my_namespace')
>>> index.delete(filter={'key': 'value'}, namespace='my_namespace')
Returns: An empty dictionary if the delete operation was successful.
@validate_and_convert_errors
def fetch(self, ids: List[str], namespace: Optional[str] = None, **kwargs) -> FetchResponse
The fetch operation looks up and returns vectors, by ID, from a single namespace. The returned vectors include the vector data and/or metadata.
API reference: https://docs.pinecone.io/reference/fetch
Examples:
>>> index.fetch(ids=['id1', 'id2'], namespace='my_namespace')
>>> index.fetch(ids=['id1', 'id2'])
Arguments:
- ids (List[str]): The vector IDs to fetch.
- namespace (str): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: FetchResponse object which contains the list of Vector objects, and namespace name.
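For example, reading the returned vectors off the FetchResponse (the ids and fields here are illustrative):

```python
response = index.fetch(ids=['id1', 'id2'], namespace='my_namespace')

# response.vectors maps each id to a Vector object.
for vector_id, vector in response.vectors.items():
    print(vector_id, vector.values[:4], vector.metadata)
```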
@validate_and_convert_errors
def query(self, *args, top_k: int, vector: Optional[List[float]] = None, id: Optional[str] = None, namespace: Optional[str] = None, filter: Optional[FilterTypedDict] = None, include_values: Optional[bool] = None, include_metadata: Optional[bool] = None, sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None, **kwargs) -> Union[QueryResponse, ApplyResult]
The Query operation searches a namespace, using a query vector. It retrieves the ids of the most similar items in a namespace, along with their similarity scores.
API reference: https://docs.pinecone.io/reference/query
Examples:
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
>>> index.query(id='id1', top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
>>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
>>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
...             top_k=10, namespace='my_namespace')
>>> index.query(vector=[1, 2, 3], sparse_vector=SparseValues([1, 2], [0.2, 0.4]),
...             top_k=10, namespace='my_namespace')
Arguments:
- vector (List[float]): The query vector. This should be the same length as the dimension of the index being queried. Each query() request can contain only one of the parameters id or vector. [optional]
- id (str): The unique ID of the vector to be used as a query vector. Each query() request can contain only one of the parameters vector or id. [optional]
- top_k (int): The number of results to return for each query. Must be a positive integer.
- namespace (str): The namespace to query vectors from. If not specified, the default namespace is used. [optional]
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): The filter to apply. You can use vector metadata to limit your search. See https://www.pinecone.io/docs/metadata-filtering/. [optional]
- include_values (bool): Indicates whether vector values are included in the response. If omitted, the server will use the default value of False. [optional]
- include_metadata (bool): Indicates whether metadata is included in the response as well as the ids. If omitted, the server will use the default value of False. [optional]
- sparse_vector: (Union[SparseValues, Dict[str, Union[List[float], List[int]]]]): sparse values of the query vector. Expected to be either a SparseValues object or a dict of the form: {'indices': List[int], 'values': List[float]}, where the lists each have the same length.
Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects, and namespace name.
@validate_and_convert_errors
def query_namespaces(self, vector: Optional[List[float]], namespaces: List[str], metric: Literal["cosine", "euclidean", "dotproduct"], top_k: Optional[int] = None, filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, include_values: Optional[bool] = None, include_metadata: Optional[bool] = None, sparse_vector: Optional[Union[SparseValues, Dict[str, Union[List[float], List[int]]]]] = None, **kwargs) -> QueryNamespacesResults
The query_namespaces() method is used to make a query to multiple namespaces in parallel and combine the results into one result set.
Since several asynchronous calls are made on your behalf when calling this method, you will need to tune the pool_threads and connection_pool_maxsize parameters of the Index constructor to suit your workload.
Examples:

```python
from pinecone import Pinecone

pc = Pinecone(api_key="your-api-key")
index = pc.Index(
    host="index-name",
    pool_threads=32,
    connection_pool_maxsize=32,
)

query_vec = [0.1, 0.2, 0.3]  # An embedding that matches the index dimension

combined_results = index.query_namespaces(
    vector=query_vec,
    namespaces=['ns1', 'ns2', 'ns3', 'ns4'],
    metric="cosine",
    top_k=10,
    filter={'genre': {"$eq": "drama"}},
    include_values=True,
    include_metadata=True,
)

for vec in combined_results.matches:
    print(vec.id, vec.score)

print(combined_results.usage)
```
Arguments:
- vector (List[float]): The query vector, must be the same length as the dimension of the index being queried.
- namespaces (List[str]): The list of namespaces to query.
- top_k (Optional[int], optional): The number of results you would like to request from each namespace. Defaults to 10.
- metric (str): Must be one of 'cosine', 'euclidean', 'dotproduct'. This is needed in order to merge results across namespaces, since the interpretation of score depends on the index metric type.
- filter (Optional[Dict[str, Union[str, float, int, bool, List, dict]]], optional): Pass an optional filter to filter results based on metadata. Defaults to None.
- include_values (Optional[bool], optional): Boolean field indicating whether vector values should be included with results. Defaults to None.
- include_metadata (Optional[bool], optional): Boolean field indicating whether vector metadata should be included with results. Defaults to None.
- sparse_vector (Optional[ Union[SparseValues, Dict[str, Union[List[float], List[int]]]] ], optional): If you are working with a dotproduct index, you can pass a sparse vector as part of your hybrid search. Defaults to None.
Returns:
QueryNamespacesResults: A QueryNamespacesResults object containing the combined results from all namespaces, as well as the combined usage cost in read units.
@validate_and_convert_errors
def update(self, id: str, values: Optional[List[float]] = None, set_metadata: Optional[VectorMetadataTypedDict] = None, namespace: Optional[str] = None, sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None, **kwargs) -> Dict[str, Any]
The Update operation updates a vector in a namespace. If values is included, it will overwrite the previous vector values. If set_metadata is included, the fields specified in it will be added to or overwrite the existing metadata.
API reference: https://docs.pinecone.io/reference/update
Examples:
>>> index.update(id='id1', values=[1, 2, 3], namespace='my_namespace')
>>> index.update(id='id1', set_metadata={'key': 'value'}, namespace='my_namespace')
>>> index.update(id='id1', values=[1, 2, 3], sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
...              namespace='my_namespace')
>>> index.update(id='id1', values=[1, 2, 3], sparse_values=SparseValues(indices=[1, 2], values=[0.2, 0.4]),
...              namespace='my_namespace')
Arguments:
- id (str): Vector's unique id.
- values (List[float]): vector values to set. [optional]
- set_metadata (Dict[str, Union[str, float, int, bool, List[int], List[float], List[str]]]): Metadata to set for the vector. [optional]
- namespace (str): The namespace in which to update the vector. [optional]
- sparse_values: (Dict[str, Union[List[float], List[int]]]): sparse values to update for the vector. Expected to be either a SparseValues object or a dict of the form: {'indices': List[int], 'values': List[float]} where the lists each have the same length.
Returns: An empty dictionary if the update was successful.
@validate_and_convert_errors
def describe_index_stats(self, filter: Optional[FilterTypedDict] = None, **kwargs) -> DescribeIndexStatsResponse
The DescribeIndexStats operation returns statistics about the index's contents, such as the vector count per namespace and the number of dimensions.
API reference: https://docs.pinecone.io/reference/describe_index_stats_post
Examples:
>>> index.describe_index_stats()
>>> index.describe_index_stats(filter={'key': 'value'})
Arguments:
- filter (Dict[str, Union[str, float, int, bool, List, dict]]): If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See https://www.pinecone.io/docs/metadata-filtering/. [optional]
Returns: DescribeIndexStatsResponse object which contains stats about the index.
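For instance, a short sketch of reading the per-namespace counts off the response:

```python
stats = index.describe_index_stats()

print(stats.dimension)           # dimension of the index
print(stats.total_vector_count)  # total vectors across all namespaces

# stats.namespaces maps each namespace name to a summary with a vector_count.
for name, summary in stats.namespaces.items():
    print(name, summary.vector_count)
```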
@validate_and_convert_errors
def list_paginated(self, prefix: Optional[str] = None, limit: Optional[int] = None, pagination_token: Optional[str] = None, namespace: Optional[str] = None, **kwargs) -> ListResponse
The list_paginated operation finds vectors based on an id prefix within a single namespace. It returns matching ids in a paginated form, with a pagination token to fetch the next page of results. This id list can then be passed to fetch or delete operations, depending on your use case.
Consider using the list method to avoid having to handle pagination tokens manually.
Examples:
>>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
>>> [v.id for v in results.vectors]
['99', '990', '991', '992', '993']
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)
Arguments:
- prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used, with the effect of listing all ids in a namespace. [optional]
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: ListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
@validate_and_convert_errors
def list(self, **kwargs)
The list operation accepts all of the same arguments as list_paginated, and returns a generator that yields a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your behalf.
Examples:
>>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
...     print(ids)
['99', '990', '991', '992', '993']
['994', '995', '996', '997', '998']
['999']
Arguments:
- prefix (Optional[str]): The id prefix to match. If unspecified, an empty string prefix will be used, with the effect of listing all ids in a namespace. [optional]
- limit (Optional[int]): The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
- pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- namespace (Optional[str]): The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
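Because each yielded page is a plain list of ids, the generator composes directly with the fetch or delete operations. A brief sketch of removing every vector that shares an id prefix (the prefix is illustrative):

```python
# Delete all vectors whose ids start with 'doc1#', page by page.
for ids in index.list(prefix='doc1#', namespace='my_namespace'):
    index.delete(ids=ids, namespace='my_namespace')
```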