pinecone .data .request_factory
1import logging 2from typing import Union, List, Optional, Dict, Any, cast 3 4from pinecone.core.openapi.db_data.models import ( 5 QueryRequest, 6 UpsertRequest, 7 DeleteRequest, 8 UpdateRequest, 9 DescribeIndexStatsRequest, 10 SearchRecordsRequest, 11 SearchRecordsRequestQuery, 12 SearchRecordsRequestRerank, 13 VectorValues, 14 SearchRecordsVector, 15 UpsertRecord, 16) 17from ..utils import parse_non_empty_args, convert_enum_to_string 18from .vector_factory import VectorFactory 19from .sparse_values_factory import SparseValuesFactory 20from pinecone.openapi_support import OPENAPI_ENDPOINT_PARAMS 21from .types import ( 22 VectorTypedDict, 23 SparseVectorTypedDict, 24 VectorMetadataTypedDict, 25 VectorTuple, 26 VectorTupleWithMetadata, 27 FilterTypedDict, 28 SearchQueryTypedDict, 29 SearchRerankTypedDict, 30 SearchQueryVectorTypedDict, 31) 32 33from .dataclasses import Vector, SparseValues, SearchQuery, SearchRerank, SearchQueryVector 34 35logger = logging.getLogger(__name__) 36""" @private """ 37 38 39def non_openapi_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]: 40 return {k: v for k, v in kwargs.items() if k not in OPENAPI_ENDPOINT_PARAMS} 41 42 43class IndexRequestFactory: 44 @staticmethod 45 def query_request( 46 top_k: int, 47 vector: Optional[List[float]] = None, 48 id: Optional[str] = None, 49 namespace: Optional[str] = None, 50 filter: Optional[FilterTypedDict] = None, 51 include_values: Optional[bool] = None, 52 include_metadata: Optional[bool] = None, 53 sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None, 54 **kwargs, 55 ) -> QueryRequest: 56 if vector is not None and id is not None: 57 raise ValueError("Cannot specify both `id` and `vector`") 58 59 sparse_vector_normalized = SparseValuesFactory.build(sparse_vector) 60 args_dict = parse_non_empty_args( 61 [ 62 ("vector", vector), 63 ("id", id), 64 ("queries", None), 65 ("top_k", top_k), 66 ("namespace", namespace), 67 ("filter", filter), 68 ("include_values", include_values), 69 ("include_metadata", include_metadata), 70 ("sparse_vector", sparse_vector_normalized), 71 ] 72 ) 73 74 return QueryRequest( 75 **args_dict, _check_type=kwargs.pop("_check_type", False), **non_openapi_kwargs(kwargs) 76 ) 77 78 @staticmethod 79 def upsert_request( 80 vectors: Union[ 81 List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict] 82 ], 83 namespace: Optional[str], 84 _check_type: bool, 85 **kwargs, 86 ) -> UpsertRequest: 87 args_dict = parse_non_empty_args([("namespace", namespace)]) 88 89 def vec_builder(v): 90 return VectorFactory.build(v, check_type=_check_type) 91 92 return UpsertRequest( 93 vectors=list(map(vec_builder, vectors)), 94 **args_dict, 95 _check_type=_check_type, 96 **non_openapi_kwargs(kwargs), 97 ) 98 99 @staticmethod 100 def delete_request( 101 ids: Optional[List[str]] = None, 102 delete_all: Optional[bool] = None, 103 namespace: Optional[str] = None, 104 filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, 105 **kwargs, 106 ) -> DeleteRequest: 107 _check_type = kwargs.pop("_check_type", False) 108 args_dict = parse_non_empty_args( 109 [("ids", ids), ("delete_all", delete_all), ("namespace", namespace), ("filter", filter)] 110 ) 111 return DeleteRequest(**args_dict, **non_openapi_kwargs(kwargs), _check_type=_check_type) 112 113 @staticmethod 114 def update_request( 115 id: str, 116 values: Optional[List[float]] = None, 117 set_metadata: Optional[VectorMetadataTypedDict] = None, 118 namespace: Optional[str] = None, 119 sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None, 120 **kwargs, 121 ) -> UpdateRequest: 122 _check_type = kwargs.pop("_check_type", False) 123 sparse_values_normalized = SparseValuesFactory.build(sparse_values) 124 args_dict = parse_non_empty_args( 125 [ 126 ("values", values), 127 ("set_metadata", set_metadata), 128 ("namespace", namespace), 129 ("sparse_values", sparse_values_normalized), 130 ] 131 ) 132 133 return UpdateRequest( 134 id=id, **args_dict, _check_type=_check_type, **non_openapi_kwargs(kwargs) 135 ) 136 137 @staticmethod 138 def describe_index_stats_request( 139 filter: Optional[FilterTypedDict] = None, **kwargs 140 ) -> DescribeIndexStatsRequest: 141 _check_type = kwargs.pop("_check_type", False) 142 args_dict = parse_non_empty_args([("filter", filter)]) 143 144 return DescribeIndexStatsRequest( 145 **args_dict, **non_openapi_kwargs(kwargs), _check_type=_check_type 146 ) 147 148 @staticmethod 149 def list_paginated_args( 150 prefix: Optional[str] = None, 151 limit: Optional[int] = None, 152 pagination_token: Optional[str] = None, 153 namespace: Optional[str] = None, 154 **kwargs, 155 ) -> Dict[str, Any]: 156 return parse_non_empty_args( 157 [ 158 ("prefix", prefix), 159 ("limit", limit), 160 ("namespace", namespace), 161 ("pagination_token", pagination_token), 162 ] 163 ) 164 165 @staticmethod 166 def search_request( 167 query: Union[SearchQueryTypedDict, SearchQuery], 168 rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None, 169 fields: Optional[List[str]] = ["*"], # Default to returning all fields 170 ) -> SearchRecordsRequest: 171 request_args = parse_non_empty_args( 172 [ 173 ("query", IndexRequestFactory._parse_search_query(query)), 174 ("fields", fields), 175 ("rerank", IndexRequestFactory._parse_search_rerank(rerank)), 176 ] 177 ) 178 179 return SearchRecordsRequest(**request_args) 180 181 @staticmethod 182 def _parse_search_query( 183 query: Union[SearchQueryTypedDict, SearchQuery], 184 ) -> SearchRecordsRequestQuery: 185 if isinstance(query, SearchQuery): 186 query_dict = query.as_dict() 187 else: 188 query_dict = cast(dict[str, Any], query) 189 190 required_fields = {"top_k"} 191 for key in required_fields: 192 if query_dict.get(key, None) is None: 193 raise ValueError(f"Missing required field '{key}' in search query.") 194 195 # User-provided dict could contain object that need conversion 196 if isinstance(query_dict.get("vector", None), SearchQueryVector): 197 query_dict["vector"] = query_dict["vector"].as_dict() 198 199 srrq = SearchRecordsRequestQuery( 200 **{k: v for k, v in query_dict.items() if k not in {"vector"}} 201 ) 202 if query_dict.get("vector", None) is not None: 203 srrq.vector = IndexRequestFactory._parse_search_vector(query_dict["vector"]) 204 return srrq 205 206 @staticmethod 207 def _parse_search_vector( 208 vector: Optional[Union[SearchQueryVectorTypedDict, SearchQueryVector]], 209 ): 210 if vector is None: 211 return None 212 213 if isinstance(vector, SearchQueryVector): 214 if vector.values is None and vector.sparse_values is None: 215 return None 216 vector_dict = vector.as_dict() 217 else: 218 vector_dict = cast(dict[str, Any], vector) 219 if ( 220 vector_dict.get("values", None) is None 221 and vector_dict.get("sparse_values", None) is None 222 ): 223 return None 224 225 srv = SearchRecordsVector(**{k: v for k, v in vector_dict.items() if k not in {"values"}}) 226 227 values = vector_dict.get("values", None) 228 if values is not None: 229 srv.values = VectorValues(value=values) 230 231 return srv 232 233 @staticmethod 234 def _parse_search_rerank(rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None): 235 if rerank is None: 236 return None 237 238 if isinstance(rerank, SearchRerank): 239 rerank_dict = rerank.as_dict() 240 else: 241 rerank_dict = cast(dict[str, Any], rerank) 242 243 required_fields = {"model", "rank_fields"} 244 for key in required_fields: 245 if rerank_dict.get(key, None) is None: 246 raise ValueError(f"Missing required field '{key}' in rerank.") 247 248 rerank_dict["model"] = convert_enum_to_string(rerank_dict["model"]) 249 250 return SearchRecordsRequestRerank(**rerank_dict) 251 252 @staticmethod 253 def upsert_records_args(namespace: str, records: List[Dict]): 254 if namespace is None: 255 raise ValueError("namespace is required when upserting records") 256 if not records or len(records) == 0: 257 raise ValueError("No records provided") 258 259 records_to_upsert = [] 260 for record in records: 261 if not record.get("_id") and not record.get("id"): 262 raise ValueError("Each record must have an '_id' or 'id' value") 263 264 records_to_upsert.append( 265 UpsertRecord( 266 record.get("_id", record.get("id")), 267 **{k: v for k, v in record.items() if k not in {"_id", "id"}}, 268 ) 269 ) 270 271 return {"namespace": namespace, "upsert_record": records_to_upsert}
def
non_openapi_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
class
IndexRequestFactory:
44class IndexRequestFactory: 45 @staticmethod 46 def query_request( 47 top_k: int, 48 vector: Optional[List[float]] = None, 49 id: Optional[str] = None, 50 namespace: Optional[str] = None, 51 filter: Optional[FilterTypedDict] = None, 52 include_values: Optional[bool] = None, 53 include_metadata: Optional[bool] = None, 54 sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None, 55 **kwargs, 56 ) -> QueryRequest: 57 if vector is not None and id is not None: 58 raise ValueError("Cannot specify both `id` and `vector`") 59 60 sparse_vector_normalized = SparseValuesFactory.build(sparse_vector) 61 args_dict = parse_non_empty_args( 62 [ 63 ("vector", vector), 64 ("id", id), 65 ("queries", None), 66 ("top_k", top_k), 67 ("namespace", namespace), 68 ("filter", filter), 69 ("include_values", include_values), 70 ("include_metadata", include_metadata), 71 ("sparse_vector", sparse_vector_normalized), 72 ] 73 ) 74 75 return QueryRequest( 76 **args_dict, _check_type=kwargs.pop("_check_type", False), **non_openapi_kwargs(kwargs) 77 ) 78 79 @staticmethod 80 def upsert_request( 81 vectors: Union[ 82 List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict] 83 ], 84 namespace: Optional[str], 85 _check_type: bool, 86 **kwargs, 87 ) -> UpsertRequest: 88 args_dict = parse_non_empty_args([("namespace", namespace)]) 89 90 def vec_builder(v): 91 return VectorFactory.build(v, check_type=_check_type) 92 93 return UpsertRequest( 94 vectors=list(map(vec_builder, vectors)), 95 **args_dict, 96 _check_type=_check_type, 97 **non_openapi_kwargs(kwargs), 98 ) 99 100 @staticmethod 101 def delete_request( 102 ids: Optional[List[str]] = None, 103 delete_all: Optional[bool] = None, 104 namespace: Optional[str] = None, 105 filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, 106 **kwargs, 107 ) -> DeleteRequest: 108 _check_type = kwargs.pop("_check_type", False) 109 args_dict = parse_non_empty_args( 110 [("ids", ids), ("delete_all", delete_all), ("namespace", namespace), ("filter", filter)] 111 ) 112 return DeleteRequest(**args_dict, **non_openapi_kwargs(kwargs), _check_type=_check_type) 113 114 @staticmethod 115 def update_request( 116 id: str, 117 values: Optional[List[float]] = None, 118 set_metadata: Optional[VectorMetadataTypedDict] = None, 119 namespace: Optional[str] = None, 120 sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None, 121 **kwargs, 122 ) -> UpdateRequest: 123 _check_type = kwargs.pop("_check_type", False) 124 sparse_values_normalized = SparseValuesFactory.build(sparse_values) 125 args_dict = parse_non_empty_args( 126 [ 127 ("values", values), 128 ("set_metadata", set_metadata), 129 ("namespace", namespace), 130 ("sparse_values", sparse_values_normalized), 131 ] 132 ) 133 134 return UpdateRequest( 135 id=id, **args_dict, _check_type=_check_type, **non_openapi_kwargs(kwargs) 136 ) 137 138 @staticmethod 139 def describe_index_stats_request( 140 filter: Optional[FilterTypedDict] = None, **kwargs 141 ) -> DescribeIndexStatsRequest: 142 _check_type = kwargs.pop("_check_type", False) 143 args_dict = parse_non_empty_args([("filter", filter)]) 144 145 return DescribeIndexStatsRequest( 146 **args_dict, **non_openapi_kwargs(kwargs), _check_type=_check_type 147 ) 148 149 @staticmethod 150 def list_paginated_args( 151 prefix: Optional[str] = None, 152 limit: Optional[int] = None, 153 pagination_token: Optional[str] = None, 154 namespace: Optional[str] = None, 155 **kwargs, 156 ) -> Dict[str, Any]: 157 return parse_non_empty_args( 158 [ 159 ("prefix", prefix), 160 ("limit", limit), 161 ("namespace", namespace), 162 ("pagination_token", pagination_token), 163 ] 164 ) 165 166 @staticmethod 167 def search_request( 168 query: Union[SearchQueryTypedDict, SearchQuery], 169 rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None, 170 fields: Optional[List[str]] = ["*"], # Default to returning all fields 171 ) -> SearchRecordsRequest: 172 request_args = parse_non_empty_args( 173 [ 174 ("query", IndexRequestFactory._parse_search_query(query)), 175 ("fields", fields), 176 ("rerank", IndexRequestFactory._parse_search_rerank(rerank)), 177 ] 178 ) 179 180 return SearchRecordsRequest(**request_args) 181 182 @staticmethod 183 def _parse_search_query( 184 query: Union[SearchQueryTypedDict, SearchQuery], 185 ) -> SearchRecordsRequestQuery: 186 if isinstance(query, SearchQuery): 187 query_dict = query.as_dict() 188 else: 189 query_dict = cast(dict[str, Any], query) 190 191 required_fields = {"top_k"} 192 for key in required_fields: 193 if query_dict.get(key, None) is None: 194 raise ValueError(f"Missing required field '{key}' in search query.") 195 196 # User-provided dict could contain object that need conversion 197 if isinstance(query_dict.get("vector", None), SearchQueryVector): 198 query_dict["vector"] = query_dict["vector"].as_dict() 199 200 srrq = SearchRecordsRequestQuery( 201 **{k: v for k, v in query_dict.items() if k not in {"vector"}} 202 ) 203 if query_dict.get("vector", None) is not None: 204 srrq.vector = IndexRequestFactory._parse_search_vector(query_dict["vector"]) 205 return srrq 206 207 @staticmethod 208 def _parse_search_vector( 209 vector: Optional[Union[SearchQueryVectorTypedDict, SearchQueryVector]], 210 ): 211 if vector is None: 212 return None 213 214 if isinstance(vector, SearchQueryVector): 215 if vector.values is None and vector.sparse_values is None: 216 return None 217 vector_dict = vector.as_dict() 218 else: 219 vector_dict = cast(dict[str, Any], vector) 220 if ( 221 vector_dict.get("values", None) is None 222 and vector_dict.get("sparse_values", None) is None 223 ): 224 return None 225 226 srv = SearchRecordsVector(**{k: v for k, v in vector_dict.items() if k not in {"values"}}) 227 228 values = vector_dict.get("values", None) 229 if values is not None: 230 srv.values = VectorValues(value=values) 231 232 return srv 233 234 @staticmethod 235 def _parse_search_rerank(rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None): 236 if rerank is None: 237 return None 238 239 if isinstance(rerank, SearchRerank): 240 rerank_dict = rerank.as_dict() 241 else: 242 rerank_dict = cast(dict[str, Any], rerank) 243 244 required_fields = {"model", "rank_fields"} 245 for key in required_fields: 246 if rerank_dict.get(key, None) is None: 247 raise ValueError(f"Missing required field '{key}' in rerank.") 248 249 rerank_dict["model"] = convert_enum_to_string(rerank_dict["model"]) 250 251 return SearchRecordsRequestRerank(**rerank_dict) 252 253 @staticmethod 254 def upsert_records_args(namespace: str, records: List[Dict]): 255 if namespace is None: 256 raise ValueError("namespace is required when upserting records") 257 if not records or len(records) == 0: 258 raise ValueError("No records provided") 259 260 records_to_upsert = [] 261 for record in records: 262 if not record.get("_id") and not record.get("id"): 263 raise ValueError("Each record must have an '_id' or 'id' value") 264 265 records_to_upsert.append( 266 UpsertRecord( 267 record.get("_id", record.get("id")), 268 **{k: v for k, v in record.items() if k not in {"_id", "id"}}, 269 ) 270 ) 271 272 return {"namespace": namespace, "upsert_record": records_to_upsert}
@staticmethod
def
query_request( top_k: int, vector: Optional[List[float]] = None, id: Optional[str] = None, namespace: Optional[str] = None, filter: Union[Dict[str, Union[str, int, float, bool]], Dict[Literal['$eq'], Union[str, int, float, bool]], Dict[Literal['$ne'], Union[str, int, float, bool]], Dict[Literal['$gt'], Union[int, float]], Dict[Literal['$gte'], Union[int, float]], Dict[Literal['$lt'], Union[int, float]], Dict[Literal['$lte'], Union[int, float]], Dict[Literal['$in'], List[Union[str, int, float, bool]]], Dict[Literal['$nin'], List[Union[str, int, float, bool]]], Dict[Literal['$and'], List[Union[Dict[str, Union[str, int, float, bool]], Dict[Literal['$eq'], Union[str, int, float, bool]], Dict[Literal['$ne'], Union[str, int, float, bool]], Dict[Literal['$gt'], Union[int, float]], Dict[Literal['$gte'], Union[int, float]], Dict[Literal['$lt'], Union[int, float]], Dict[Literal['$lte'], Union[int, float]], Dict[Literal['$in'], List[Union[str, int, float, bool]]], Dict[Literal['$nin'], List[Union[str, int, float, bool]]]]]], NoneType] = None, include_values: Optional[bool] = None, include_metadata: Optional[bool] = None, sparse_vector: Union[pinecone.data.dataclasses.sparse_values.SparseValues, pinecone.data.types.sparse_vector_typed_dict.SparseVectorTypedDict, NoneType] = None, **kwargs) -> pinecone.core.openapi.db_data.model.query_request.QueryRequest:
45 @staticmethod 46 def query_request( 47 top_k: int, 48 vector: Optional[List[float]] = None, 49 id: Optional[str] = None, 50 namespace: Optional[str] = None, 51 filter: Optional[FilterTypedDict] = None, 52 include_values: Optional[bool] = None, 53 include_metadata: Optional[bool] = None, 54 sparse_vector: Optional[Union[SparseValues, SparseVectorTypedDict]] = None, 55 **kwargs, 56 ) -> QueryRequest: 57 if vector is not None and id is not None: 58 raise ValueError("Cannot specify both `id` and `vector`") 59 60 sparse_vector_normalized = SparseValuesFactory.build(sparse_vector) 61 args_dict = parse_non_empty_args( 62 [ 63 ("vector", vector), 64 ("id", id), 65 ("queries", None), 66 ("top_k", top_k), 67 ("namespace", namespace), 68 ("filter", filter), 69 ("include_values", include_values), 70 ("include_metadata", include_metadata), 71 ("sparse_vector", sparse_vector_normalized), 72 ] 73 ) 74 75 return QueryRequest( 76 **args_dict, _check_type=kwargs.pop("_check_type", False), **non_openapi_kwargs(kwargs) 77 )
@staticmethod
def
upsert_request( vectors: Union[List[pinecone.data.dataclasses.vector.Vector], List[Tuple[str, List[float]]], List[Tuple[str, List[float], Dict[str, Union[str, int, float, List[str], List[int], List[float]]]]], List[pinecone.data.types.vector_typed_dict.VectorTypedDict]], namespace: Optional[str], _check_type: bool, **kwargs) -> pinecone.core.openapi.db_data.model.upsert_request.UpsertRequest:
79 @staticmethod 80 def upsert_request( 81 vectors: Union[ 82 List[Vector], List[VectorTuple], List[VectorTupleWithMetadata], List[VectorTypedDict] 83 ], 84 namespace: Optional[str], 85 _check_type: bool, 86 **kwargs, 87 ) -> UpsertRequest: 88 args_dict = parse_non_empty_args([("namespace", namespace)]) 89 90 def vec_builder(v): 91 return VectorFactory.build(v, check_type=_check_type) 92 93 return UpsertRequest( 94 vectors=list(map(vec_builder, vectors)), 95 **args_dict, 96 _check_type=_check_type, 97 **non_openapi_kwargs(kwargs), 98 )
@staticmethod
def
delete_request( ids: Optional[List[str]] = None, delete_all: Optional[bool] = None, namespace: Optional[str] = None, filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, **kwargs) -> pinecone.core.openapi.db_data.model.delete_request.DeleteRequest:
100 @staticmethod 101 def delete_request( 102 ids: Optional[List[str]] = None, 103 delete_all: Optional[bool] = None, 104 namespace: Optional[str] = None, 105 filter: Optional[Dict[str, Union[str, float, int, bool, List, dict]]] = None, 106 **kwargs, 107 ) -> DeleteRequest: 108 _check_type = kwargs.pop("_check_type", False) 109 args_dict = parse_non_empty_args( 110 [("ids", ids), ("delete_all", delete_all), ("namespace", namespace), ("filter", filter)] 111 ) 112 return DeleteRequest(**args_dict, **non_openapi_kwargs(kwargs), _check_type=_check_type)
@staticmethod
def
update_request( id: str, values: Optional[List[float]] = None, set_metadata: Optional[Dict[str, Union[str, int, float, List[str], List[int], List[float]]]] = None, namespace: Optional[str] = None, sparse_values: Union[pinecone.data.dataclasses.sparse_values.SparseValues, pinecone.data.types.sparse_vector_typed_dict.SparseVectorTypedDict, NoneType] = None, **kwargs) -> pinecone.core.openapi.db_data.model.update_request.UpdateRequest:
114 @staticmethod 115 def update_request( 116 id: str, 117 values: Optional[List[float]] = None, 118 set_metadata: Optional[VectorMetadataTypedDict] = None, 119 namespace: Optional[str] = None, 120 sparse_values: Optional[Union[SparseValues, SparseVectorTypedDict]] = None, 121 **kwargs, 122 ) -> UpdateRequest: 123 _check_type = kwargs.pop("_check_type", False) 124 sparse_values_normalized = SparseValuesFactory.build(sparse_values) 125 args_dict = parse_non_empty_args( 126 [ 127 ("values", values), 128 ("set_metadata", set_metadata), 129 ("namespace", namespace), 130 ("sparse_values", sparse_values_normalized), 131 ] 132 ) 133 134 return UpdateRequest( 135 id=id, **args_dict, _check_type=_check_type, **non_openapi_kwargs(kwargs) 136 )
@staticmethod
def
describe_index_stats_request( filter: Union[Dict[str, Union[str, int, float, bool]], Dict[Literal['$eq'], Union[str, int, float, bool]], Dict[Literal['$ne'], Union[str, int, float, bool]], Dict[Literal['$gt'], Union[int, float]], Dict[Literal['$gte'], Union[int, float]], Dict[Literal['$lt'], Union[int, float]], Dict[Literal['$lte'], Union[int, float]], Dict[Literal['$in'], List[Union[str, int, float, bool]]], Dict[Literal['$nin'], List[Union[str, int, float, bool]]], Dict[Literal['$and'], List[Union[Dict[str, Union[str, int, float, bool]], Dict[Literal['$eq'], Union[str, int, float, bool]], Dict[Literal['$ne'], Union[str, int, float, bool]], Dict[Literal['$gt'], Union[int, float]], Dict[Literal['$gte'], Union[int, float]], Dict[Literal['$lt'], Union[int, float]], Dict[Literal['$lte'], Union[int, float]], Dict[Literal['$in'], List[Union[str, int, float, bool]]], Dict[Literal['$nin'], List[Union[str, int, float, bool]]]]]], NoneType] = None, **kwargs) -> pinecone.core.openapi.db_data.model.describe_index_stats_request.DescribeIndexStatsRequest:
138 @staticmethod 139 def describe_index_stats_request( 140 filter: Optional[FilterTypedDict] = None, **kwargs 141 ) -> DescribeIndexStatsRequest: 142 _check_type = kwargs.pop("_check_type", False) 143 args_dict = parse_non_empty_args([("filter", filter)]) 144 145 return DescribeIndexStatsRequest( 146 **args_dict, **non_openapi_kwargs(kwargs), _check_type=_check_type 147 )
@staticmethod
def
list_paginated_args( prefix: Optional[str] = None, limit: Optional[int] = None, pagination_token: Optional[str] = None, namespace: Optional[str] = None, **kwargs) -> Dict[str, Any]:
149 @staticmethod 150 def list_paginated_args( 151 prefix: Optional[str] = None, 152 limit: Optional[int] = None, 153 pagination_token: Optional[str] = None, 154 namespace: Optional[str] = None, 155 **kwargs, 156 ) -> Dict[str, Any]: 157 return parse_non_empty_args( 158 [ 159 ("prefix", prefix), 160 ("limit", limit), 161 ("namespace", namespace), 162 ("pagination_token", pagination_token), 163 ] 164 )
@staticmethod
def
search_request( query: Union[pinecone.data.types.search_query_typed_dict.SearchQueryTypedDict, pinecone.data.dataclasses.search_query.SearchQuery], rerank: Union[pinecone.data.types.search_rerank_typed_dict.SearchRerankTypedDict, pinecone.data.dataclasses.search_rerank.SearchRerank, NoneType] = None, fields: Optional[List[str]] = ['*']) -> pinecone.core.openapi.db_data.model.search_records_request.SearchRecordsRequest:
166 @staticmethod 167 def search_request( 168 query: Union[SearchQueryTypedDict, SearchQuery], 169 rerank: Optional[Union[SearchRerankTypedDict, SearchRerank]] = None, 170 fields: Optional[List[str]] = ["*"], # Default to returning all fields 171 ) -> SearchRecordsRequest: 172 request_args = parse_non_empty_args( 173 [ 174 ("query", IndexRequestFactory._parse_search_query(query)), 175 ("fields", fields), 176 ("rerank", IndexRequestFactory._parse_search_rerank(rerank)), 177 ] 178 ) 179 180 return SearchRecordsRequest(**request_args)
@staticmethod
def
upsert_records_args(namespace: str, records: List[Dict]):
253 @staticmethod 254 def upsert_records_args(namespace: str, records: List[Dict]): 255 if namespace is None: 256 raise ValueError("namespace is required when upserting records") 257 if not records or len(records) == 0: 258 raise ValueError("No records provided") 259 260 records_to_upsert = [] 261 for record in records: 262 if not record.get("_id") and not record.get("id"): 263 raise ValueError("Each record must have an '_id' or 'id' value") 264 265 records_to_upsert.append( 266 UpsertRecord( 267 record.get("_id", record.get("id")), 268 **{k: v for k, v in record.items() if k not in {"_id", "id"}}, 269 ) 270 ) 271 272 return {"namespace": namespace, "upsert_record": records_to_upsert}