PineconeAsyncio¶
- class pinecone.PineconeAsyncio(api_key: str | None = None, host: str | None = None, proxy_url: str | None = None, ssl_ca_certs: str | None = None, ssl_verify: bool | None = None, additional_headers: dict[str, str] | None = {}, **kwargs)[source]¶
PineconeAsyncio is an asyncio client for interacting with Pinecone's control plane API. This class implements methods for managing and interacting with Pinecone resources such as collections and indexes.
To perform data operations such as inserting and querying vectors, use the IndexAsyncio class.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        async with pc.IndexAsyncio(host="my-index.pinecone.io") as idx:
            await idx.upsert(vectors=[("1", [1, 2, 3]), ("2", [4, 5, 6])])

asyncio.run(main())
- PineconeAsyncio.__init__(api_key: str | None = None, host: str | None = None, proxy_url: str | None = None, ssl_ca_certs: str | None = None, ssl_verify: bool | None = None, additional_headers: dict[str, str] | None = {}, **kwargs) None[source]¶
Initialize the PineconeAsyncio client.
- Parameters:
api_key (str, optional) – The API key to use for authentication. If not passed via kwarg, the API key will be read from the environment variable PINECONE_API_KEY.
host (str, optional) – The control plane host. If unspecified, the host api.pinecone.io will be used.
proxy_url (str, optional) – The URL of the proxy to use for the connection.
ssl_ca_certs (str, optional) – The path to the SSL CA certificate bundle to use for the connection. This path should point to a file in PEM format. When not passed, the SDK will use the certificate bundle returned from certifi.where().
ssl_verify (bool, optional) – SSL verification is performed by default, but can be disabled using this boolean flag when testing with Pinecone Local or troubleshooting a proxy setup. You should never run with SSL verification disabled in production.
additional_headers (dict[str, str], optional) – Additional headers to pass to the API. This is mainly to support internal testing at Pinecone. End users should not need to use this unless following specific instructions to do so.
Note
The proxy_headers parameter is not currently supported for PineconeAsyncio.
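For example, here is a minimal sketch of initializing the client to route traffic through a proxy with a custom CA bundle. The proxy URL and certificate path are placeholders; substitute your own values.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    # Hypothetical proxy URL and CA bundle path, shown for illustration only.
    async with PineconeAsyncio(
        api_key="YOUR_API_KEY",
        proxy_url="https://proxy.example.com:8080",
        ssl_ca_certs="/path/to/ca-bundle.pem",
    ) as pc:
        indexes = await pc.list_indexes()
        print(indexes.names())

asyncio.run(main())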
- PineconeAsyncio.IndexAsyncio(host: str, **kwargs) _IndexAsyncio[source]¶
Build an asyncio-compatible client for index data operations.
- Parameters:
host – The host url of the index.
import os
import asyncio
from pinecone import PineconeAsyncio

api_key = os.environ.get("PINECONE_API_KEY")
index_host = os.environ.get("PINECONE_INDEX_HOST")

async def main():
    async with PineconeAsyncio(api_key=api_key) as pc:
        async with pc.IndexAsyncio(host=index_host) as idx:
            # Now you're ready to perform data operations
            await idx.query(vector=[...], top_k=10)

asyncio.run(main())
To find your host url, you can use describe_index. Alternatively, the host is displayed in the Pinecone web console.

import os
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio(
        api_key=os.environ.get("PINECONE_API_KEY")
    ) as pc:
        description = await pc.describe_index('index-name')
        host = description.host

asyncio.run(main())
Alternative setup
Like instances of the PineconeAsyncio class, instances of IndexAsyncio have an async context that needs to be cleaned up when you are done with it in order to avoid error messages about unclosed sessions from aiohttp. Nesting these in code is a bit cumbersome, so if you are only planning to do data operations you may prefer to set up the IndexAsyncio object via the Pinecone class, which avoids creating an outer async context.

import os
import asyncio
from pinecone import Pinecone

api_key = os.environ.get("PINECONE_API_KEY")

async def main():
    pc = Pinecone(api_key=api_key)  # sync client, so no async context to worry about

    async with pc.IndexAsyncio(host='your_index_host') as idx:
        # Now you're ready to perform data operations
        await idx.query(vector=[...], top_k=10)

asyncio.run(main())
- async PineconeAsyncio.close() None[source]¶
Cleanup resources used by the Pinecone client.
This method should be called when the client is no longer needed so that it can clean up the aiohttp session and other resources.
After close has been called, the client instance should not be used.
import asyncio
from pinecone import PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    desc = await pc.describe_index(name="my-index")

    await pc.close()

asyncio.run(main())
If you are using the client as a context manager, the close method is called automatically when exiting.
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        desc = await pc.describe_index(name="my-index")

    # No need to call close in this case because the "async with" syntax
    # automatically calls close when exiting the block.

asyncio.run(main())
DB Control Plane¶
Indexes¶
- async PineconeAsyncio.create_index(name: str, spec: Dict | 'ServerlessSpec' | 'PodSpec' | 'ByocSpec', dimension: int | None = None, metric: 'Metric' | str | None = 'cosine', timeout: int | None = None, deletion_protection: 'DeletionProtection' | str | None = 'disabled', vector_type: 'VectorType' | str | None = 'dense', tags: dict[str, str] | None = None) IndexModel[source]¶
Creates a Pinecone index.
- Parameters:
name (str) – The name of the index to create. Must be unique within your project and cannot be changed once created. Allowed characters are lowercase letters, numbers, and hyphens and the name may not begin or end with hyphens. Maximum length is 45 characters.
metric (str, optional) – Type of similarity metric used in the vector index when querying, one of {"cosine", "dotproduct", "euclidean"}.
spec (Dict) – A dictionary containing configurations describing how the index should be deployed. For serverless indexes, specify region and cloud. Optionally, you can specify read_capacity to configure dedicated read capacity mode (OnDemand or Dedicated) and schema to configure which metadata fields are filterable. For pod indexes, specify replicas, shards, pods, pod_type, metadata_config, and source_collection. Alternatively, use the ServerlessSpec or PodSpec objects to specify these configurations.
dimension (int) – If you are creating an index with vector_type="dense" (which is the default), you need to specify dimension to indicate the size of your vectors. This should match the dimension of the embeddings you will be inserting. For example, if you are using OpenAI's text-embedding-ada-002 model, you should use dimension=1536. Dimension is a required field when creating an index with vector_type="dense" and should not be passed when vector_type="sparse".
timeout (int, optional) – Specify the number of seconds to wait until the index gets ready. If None, wait indefinitely; if >=0, time out after this many seconds; if -1, return immediately and do not wait.
deletion_protection (Optional[Literal["enabled", "disabled"]]) – If enabled, the index cannot be deleted. If disabled, the index can be deleted.
vector_type (str, optional) – The type of vectors to be stored in the index. One of {"dense", "sparse"}.
tags (Optional[dict[str, str]]) – Tags are key-value pairs you can attach to indexes to better understand, organize, and identify your resources. Some example use cases include tagging indexes with the name of the model that generated the embeddings, the date the index was created, or the purpose of the index.
- Returns:
An IndexModel instance containing a description of the index that was created.
Creating a serverless index
import os
import asyncio

from pinecone import (
    PineconeAsyncio,
    ServerlessSpec,
    CloudProvider,
    AwsRegion,
    Metric,
    DeletionProtection,
    VectorType
)

async def main():
    async with PineconeAsyncio(api_key=os.environ.get("PINECONE_API_KEY")) as pc:
        await pc.create_index(
            name="my-index",
            dimension=1536,
            metric=Metric.COSINE,
            spec=ServerlessSpec(
                cloud=CloudProvider.AWS,
                region=AwsRegion.US_WEST_2,
                read_capacity={
                    "mode": "Dedicated",
                    "dedicated": {
                        "node_type": "t1",
                        "scaling": "Manual",
                        "manual": {"shards": 2, "replicas": 2},
                    },
                },
                schema={
                    "genre": {"filterable": True},
                    "year": {"filterable": True},
                    "rating": {"filterable": True},
                },
            ),
            deletion_protection=DeletionProtection.DISABLED,
            vector_type=VectorType.DENSE,
            tags={
                "model": "clip",
                "app": "image-search",
                "env": "production"
            }
        )

asyncio.run(main())
Creating a pod index
import os
import asyncio

from pinecone import (
    PineconeAsyncio,
    PodSpec,
    PodIndexEnvironment,
    PodType,
    Metric,
    DeletionProtection
)

async def main():
    async with PineconeAsyncio(api_key=os.environ.get("PINECONE_API_KEY")) as pc:
        await pc.create_index(
            name="my-index",
            dimension=1536,
            metric=Metric.COSINE,
            spec=PodSpec(
                environment=PodIndexEnvironment.US_EAST4_GCP,
                pod_type=PodType.P1_X1
            ),
            deletion_protection=DeletionProtection.DISABLED,
            tags={
                "model": "clip",
                "app": "image-search",
                "env": "testing"
            }
        )

asyncio.run(main())
- async PineconeAsyncio.create_index_for_model(name: str, cloud: 'CloudProvider' | str, region: 'AwsRegion' | 'GcpRegion' | 'AzureRegion' | str, embed: 'IndexEmbed' | 'CreateIndexForModelEmbedTypedDict', tags: dict[str, str] | None = None, deletion_protection: 'DeletionProtection' | str | None = 'disabled', read_capacity: 'ReadCapacityDict' | 'ReadCapacity' | 'ReadCapacityOnDemandSpec' | 'ReadCapacityDedicatedSpec' | None = None, schema: dict[str, 'MetadataSchemaFieldConfig'] | dict[str, dict[str, Any]] | 'BackupModelSchema' | None = None, timeout: int | None = None) IndexModel[source]¶
- Parameters:
name (str) – The name of the index to create. Must be unique within your project and cannot be changed once created. Allowed characters are lowercase letters, numbers, and hyphens and the name may not begin or end with hyphens. Maximum length is 45 characters.
cloud (str) – The cloud provider to use for the index. One of {"aws", "gcp", "azure"}.
region (str) – The region to use for the index. Enum objects AwsRegion, GcpRegion, and AzureRegion are also available to help you quickly set these parameters, but may not be up to date as new regions become available.
embed (Union[Dict, IndexEmbed]) – The embedding configuration for the index. This param accepts a dictionary or an instance of the IndexEmbed object.
tags (Optional[dict[str, str]]) – Tags are key-value pairs you can attach to indexes to better understand, organize, and identify your resources. Some example use cases include tagging indexes with the name of the model that generated the embeddings, the date the index was created, or the purpose of the index.
deletion_protection (Optional[Literal["enabled", "disabled"]]) – If enabled, the index cannot be deleted. If disabled, the index can be deleted. This setting can be changed with configure_index.
read_capacity (Optional[Union[ReadCapacityDict, ReadCapacity, ReadCapacityOnDemandSpec, ReadCapacityDedicatedSpec]]) – Optional read capacity configuration. You can specify read_capacity to configure dedicated read capacity mode (OnDemand or Dedicated). See ServerlessSpec documentation for details on read capacity configuration.
schema (Optional[Union[dict[str, MetadataSchemaFieldConfig], dict[str, dict[str, Any]], BackupModelSchema]]) – Optional metadata schema configuration. You can specify schema to configure which metadata fields are filterable. The schema can be provided as a dictionary mapping field names to their configurations (e.g., {"genre": {"filterable": True}}) or as a dictionary with a fields key (e.g., {"fields": {"genre": {"filterable": True}}}).
timeout (Optional[int]) – Specify the number of seconds to wait until the index is ready to receive data. If None, wait indefinitely; if >=0, time out after this many seconds; if -1, return immediately and do not wait.
- Returns:
A description of the index that was created.
- Return type:
IndexModel
This method is used to create a Serverless index that is configured for use with Pinecone’s integrated inference models.
The resulting index can be described, listed, configured, and deleted like any other Pinecone index with the describe_index, list_indexes, configure_index, and delete_index methods.

After the index is created, you can upsert records into the index with the upsert_records method, and search your records with the search method.

import asyncio
from pinecone import (
    PineconeAsyncio,
    IndexEmbed,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    Metric,
)

async def main():
    async with PineconeAsyncio() as pc:
        if not await pc.has_index("book-search"):
            desc = await pc.create_index_for_model(
                name="book-search",
                cloud=CloudProvider.AWS,
                region=AwsRegion.US_EAST_1,
                embed=IndexEmbed(
                    model=EmbedModel.Multilingual_E5_Large,
                    metric=Metric.COSINE,
                    field_map={
                        "text": "description",
                    },
                )
            )

asyncio.run(main())
Creating an index for model with schema and dedicated read capacity
import asyncio
from pinecone import (
    PineconeAsyncio,
    IndexEmbed,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    Metric,
)

async def main():
    async with PineconeAsyncio() as pc:
        if not await pc.has_index("book-search"):
            desc = await pc.create_index_for_model(
                name="book-search",
                cloud=CloudProvider.AWS,
                region=AwsRegion.US_EAST_1,
                embed=IndexEmbed(
                    model=EmbedModel.Multilingual_E5_Large,
                    metric=Metric.COSINE,
                    field_map={
                        "text": "description",
                    },
                ),
                read_capacity={
                    "mode": "Dedicated",
                    "dedicated": {
                        "node_type": "t1",
                        "scaling": "Manual",
                        "manual": {"shards": 2, "replicas": 2},
                    },
                },
                schema={
                    "genre": {"filterable": True},
                    "year": {"filterable": True},
                    "rating": {"filterable": True},
                },
            )

asyncio.run(main())
See also:
See the Model Gallery to learn about available models
- PineconeAsyncio.create_index_from_backup(*, name: str, backup_id: str, deletion_protection: 'DeletionProtection' | str | None = 'disabled', tags: dict[str, str] | None = None, timeout: int | None = None) IndexModel[source]¶
Create an index from a backup.
Call list_backups to get a list of backups for your project.
- Parameters:
name (str) – The name of the index to create.
backup_id (str) – The ID of the backup to restore.
deletion_protection (Optional[Literal["enabled", "disabled"]]) – If enabled, the index cannot be deleted. If disabled, the index can be deleted. This setting can be changed with configure_index.
tags (Optional[dict[str, str]]) – Tags are key-value pairs you can attach to indexes to better understand, organize, and identify your resources. Some example use cases include tagging indexes with the name of the model that generated the embeddings, the date the index was created, or the purpose of the index.
timeout – Specify the number of seconds to wait until index is ready to receive data. If None, wait indefinitely; if >=0, time out after this many seconds; if -1, return immediately and do not wait.
- Returns:
A description of the index that was created.
- Return type:
IndexModel
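Here is a minimal sketch of restoring an index from a backup, assuming create_index_from_backup is awaited like the other PineconeAsyncio methods. The index name and backup id are placeholders; use an id returned by list_backups.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # Look up available backups for the source index (placeholder name).
        backups = await pc.list_backups(index_name="my-index")

        # Restore into a new index using a backup id from the listing above.
        await pc.create_index_from_backup(
            name="my-index-restored",
            backup_id="your-backup-id",
            tags={"restored_from": "my-index"},
        )

asyncio.run(main())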
- async PineconeAsyncio.list_indexes() IndexList[source]¶
- Returns:
Returns an IndexList object, which is iterable and contains a list of IndexModel objects. The IndexList also has a convenience method names() which returns a list of index names for situations where you just want to iterate over all index names.
Lists all indexes in your project.
The results include a description of all indexes in your project, including the index name, dimension, metric, status, and spec.
If you simply want to check whether an index exists, see the has_index() convenience method.

You can use the list_indexes() method to iterate over descriptions of every index in your project.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    available_indexes = await pc.list_indexes()
    for index in available_indexes:
        print(index.name)
        print(index.dimension)
        print(index.metric)
        print(index.status)
        print(index.host)
        print(index.spec)

    await pc.close()

asyncio.run(main())
- async PineconeAsyncio.describe_index(name: str) IndexModel[source]¶
- Parameters:
name – the name of the index to describe.
- Returns:
Returns an IndexModel object which gives access to properties such as the index name, dimension, metric, host url, status, and spec.
Describes a Pinecone index.
Getting your index host url
In a real production situation, you probably want to store the host url in an environment variable so you don’t have to call describe_index and re-fetch it every time you want to use the index. But this example shows how to get the value from the API using describe_index.
import asyncio
from pinecone import Pinecone, PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    index_name = "my-index"
    description = await pc.describe_index(name=index_name)
    print(description)
    # {
    #     "name": "my-index",
    #     "metric": "cosine",
    #     "host": "my-index-dojoi3u.svc.aped-4627-b74a.pinecone.io",
    #     "spec": {
    #         "serverless": {
    #             "cloud": "aws",
    #             "region": "us-east-1"
    #         }
    #     },
    #     "status": {
    #         "ready": true,
    #         "state": "Ready"
    #     },
    #     "vector_type": "dense",
    #     "dimension": 1024,
    #     "deletion_protection": "enabled",
    #     "tags": {
    #         "environment": "production"
    #     }
    # }

    print(f"Your index is hosted at {description.host}")
    await pc.close()

    async with Pinecone().IndexAsyncio(host=description.host) as idx:
        await idx.upsert(vectors=[...])

asyncio.run(main())
- async PineconeAsyncio.configure_index(name: str, replicas: int | None = None, pod_type: 'PodType' | str | None = None, deletion_protection: 'DeletionProtection' | str | None = None, tags: dict[str, str] | None = None, embed: 'ConfigureIndexEmbed' | Dict | None = None, read_capacity: 'ReadCapacityDict' | 'ReadCapacity' | 'ReadCapacityOnDemandSpec' | 'ReadCapacityDedicatedSpec' | None = None) None[source]¶
- Parameters:
name (str) – the name of the index.
replicas (int, optional) – the desired number of replicas; the lowest value is 0.
pod_type (str or PodType, optional) – the new pod_type for the index. To learn more about the available pod types, please see Understanding Indexes.
deletion_protection (str, optional) – If set to 'enabled', the index cannot be deleted. If 'disabled', the index can be deleted.
tags (dict[str, str], optional) – A dictionary of tags to apply to the index. Tags are key-value pairs that can be used to organize and manage indexes. To remove a tag, set the value to "". Tags passed to configure_index will be merged with existing tags and any with the value empty string will be removed.
embed (Optional[Union[ConfigureIndexEmbed, Dict]], optional) – configures the integrated inference embedding settings for the index. You can convert an existing index to an integrated index by specifying the embedding model and field_map. The index vector type and dimension must match the model vector type and dimension, and the index similarity metric must be supported by the model. You can later change the embedding configuration to update the field_map, read_parameters, or write_parameters. Once set, the model cannot be changed.
read_capacity (Optional[Union[ReadCapacityDict, ReadCapacity, ReadCapacityOnDemandSpec, ReadCapacityDedicatedSpec]]) – Optional read capacity configuration for serverless indexes. You can specify read_capacity to configure dedicated read capacity mode (OnDemand or Dedicated). See ServerlessSpec documentation for details on read capacity configuration. Note that read capacity configuration is only available for serverless indexes.
This method is used to modify an index’s configuration. It can be used to:
Configure read capacity for serverless indexes using read_capacity
Scale a pod-based index horizontally using replicas
Scale a pod-based index vertically using pod_type
Enable or disable deletion protection using deletion_protection
Add, change, or remove tags using tags
Configuring read capacity for serverless indexes
To configure read capacity for serverless indexes, pass the read_capacity parameter to the configure_index method. You can configure either OnDemand or Dedicated read capacity mode.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # Configure to OnDemand read capacity (default)
        await pc.configure_index(
            name="my-index",
            read_capacity={"mode": "OnDemand"}
        )

        # Configure to Dedicated read capacity with manual scaling
        await pc.configure_index(
            name="my-index",
            read_capacity={
                "mode": "Dedicated",
                "dedicated": {
                    "node_type": "t1",
                    "scaling": "Manual",
                    "manual": {"shards": 1, "replicas": 1}
                }
            }
        )

        # Verify the configuration was applied
        desc = await pc.describe_index("my-index")
        assert desc.spec.serverless.read_capacity.mode == "Dedicated"

asyncio.run(main())
Scaling pod-based indexes
To scale your pod-based index, you pass a replicas and/or pod_type param to the configure_index method. pod_type may be a string or a value from the PodType enum.

import asyncio
from pinecone import PineconeAsyncio, PodType

async def main():
    async with PineconeAsyncio() as pc:
        await pc.configure_index(
            name="my-index",
            replicas=2,
            pod_type=PodType.P1_X2
        )

asyncio.run(main())
After providing these new configurations, you must call describe_index to see the status of the index as the changes are applied.

Enabling or disabling deletion protection
To enable or disable deletion protection, pass the deletion_protection parameter to the configure_index method. When deletion protection is enabled, the index cannot be deleted with the delete_index method.

import asyncio
from pinecone import PineconeAsyncio, DeletionProtection

async def main():
    async with PineconeAsyncio() as pc:
        # Enable deletion protection
        await pc.configure_index(
            name="my-index",
            deletion_protection=DeletionProtection.ENABLED
        )

        # Call describe_index to see the change was applied.
        desc = await pc.describe_index("my-index")
        assert desc.deletion_protection == "enabled"

        # Disable deletion protection
        await pc.configure_index(
            name="my-index",
            deletion_protection=DeletionProtection.DISABLED
        )

asyncio.run(main())
Adding, changing, or removing tags
To add, change, or remove tags, pass the tags parameter to the configure_index method. When tags are passed using configure_index, they are merged with any existing tags already on the index. To remove a tag, set the value of the key to an empty string.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # Add a tag
        await pc.configure_index(name="my-index", tags={"environment": "staging"})

        # Change a tag
        await pc.configure_index(name="my-index", tags={"environment": "production"})

        # Remove a tag
        await pc.configure_index(name="my-index", tags={"environment": ""})

        # Call describe_index to view the changed tags
        desc = await pc.describe_index("my-index")
        print(desc.tags)

asyncio.run(main())
- async PineconeAsyncio.delete_index(name: str, timeout: int | None = None) None[source]¶
- Parameters:
name (str) – the name of the index.
timeout (int, optional) – Number of seconds to poll status checking whether the index has been deleted. If None, wait indefinitely; if >=0, time out after this many seconds; if -1, return immediately and do not wait.
Deletes a Pinecone index.
Deleting an index is an irreversible operation. All data in the index will be lost. When you use this command, a request is sent to the Pinecone control plane to delete the index, but the termination is not synchronous because resources take a few moments to be released.
By default, the delete_index method will block until polling of the describe_index method shows that the delete operation has completed. If you prefer to return immediately and not wait for the index to be deleted, you can pass timeout=-1 to the method.

After the delete request is submitted, polling describe_index will show that the index transitions into a Terminating state before eventually resulting in a 404 after it has been removed.

This operation can fail if the index is configured with deletion_protection="enabled". In this case, you will need to call configure_index to disable deletion protection before you can delete the index.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    index_name = "my-index"
    desc = await pc.describe_index(name=index_name)

    if desc.deletion_protection == "enabled":
        # If for some reason deletion protection is enabled, you will need to disable it first
        # before you can delete the index. But use caution as this operation is not reversible
        # and if somebody enabled deletion protection, they probably had a good reason.
        await pc.configure_index(name=index_name, deletion_protection="disabled")

    await pc.delete_index(name=index_name)
    await pc.close()

asyncio.run(main())
- async PineconeAsyncio.has_index(name: str) bool[source]¶
- Parameters:
name – The name of the index to check for existence.
- Returns:
Returns True if the index exists, False otherwise.
Checks if a Pinecone index exists.
import asyncio
from pinecone import PineconeAsyncio, ServerlessSpec

async def main():
    async with PineconeAsyncio() as pc:
        index_name = "my-index"
        if not await pc.has_index(index_name):
            print("Index does not exist, creating...")
            await pc.create_index(
                name=index_name,
                dimension=768,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-west-2")
            )

asyncio.run(main())
Backups¶
- PineconeAsyncio.create_backup(*, index_name: str, backup_name: str, description: str = '') BackupModel[source]¶
Create a backup of an index.
- Parameters:
index_name (str) – The name of the index to backup.
backup_name (str) – The name to give the backup.
description (str) – Optional description of the backup.
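For example, a minimal sketch of creating a backup, assuming create_backup is awaited like the other PineconeAsyncio methods; the index and backup names are placeholders.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # Placeholder index and backup names; substitute your own.
        backup = await pc.create_backup(
            index_name="my-index",
            backup_name="my-index-backup-1",
            description="Backup taken before a bulk metadata update"
        )
        print(backup)

asyncio.run(main())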
- PineconeAsyncio.list_backups(*, index_name: str | None = None, limit: int | None = 10, pagination_token: str | None = None) BackupList[source]¶
List backups.
If index_name is provided, the backups will be filtered by index. If no index_name is provided, all backups in the project will be returned.
- Parameters:
index_name (str) – The name of the index to list backups for.
limit (int) – The maximum number of backups to return.
pagination_token (str) – The pagination token to use for pagination.
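A short sketch of listing backups, assuming list_backups is awaited like the other PineconeAsyncio methods; the index name is a placeholder.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # Backups for one index (placeholder name)
        index_backups = await pc.list_backups(index_name="my-index", limit=5)
        print(index_backups)

        # All backups in the project
        project_backups = await pc.list_backups()
        print(project_backups)

asyncio.run(main())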
Collections¶
- async PineconeAsyncio.create_collection(name: str, source: str) None[source]¶
Create a collection from a pod-based index
- Parameters:
name – Name of the collection
source – Name of the source index
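A minimal sketch of creating a collection; the collection and source index names are placeholders.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # Create a collection from an existing pod-based index
        await pc.create_collection(
            name="my_collection",
            source="my-pod-index"
        )

asyncio.run(main())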
- async PineconeAsyncio.list_collections() CollectionList[source]¶
List all collections
import asyncio
from pinecone import PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    collections = await pc.list_collections()
    for collection in collections:
        print(collection.name)
        print(collection.source)

    # You can also iterate specifically over
    # a list of collection names by calling
    # the .names() helper.
    collection_name = "my_collection"
    collections = await pc.list_collections()
    if collection_name in collections.names():
        print('Collection exists')

    await pc.close()

asyncio.run(main())
- async PineconeAsyncio.describe_collection(name: str) dict[str, Any][source]¶
Describes a collection. :param name: The name of the collection. :return: Description of the collection.
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        description = await pc.describe_collection("my_collection")
        print(description.name)
        print(description.source)
        print(description.status)
        print(description.size)

asyncio.run(main())
- async PineconeAsyncio.delete_collection(name: str) None[source]¶
Deletes a collection. :param name: The name of the collection to delete.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        await pc.delete_collection("my_collection")

asyncio.run(main())
Restore Jobs¶
DB Data Plane¶
- pinecone.db_data.IndexAsyncio¶
alias of
_IndexAsyncio
- IndexAsyncio.__init__(api_key: str, host: str, additional_headers: dict[str, str] | None = {}, openapi_config=None, **kwargs) None¶
- IndexAsyncio.describe_index_stats(filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, **kwargs) IndexDescription¶
The DescribeIndexStats operation returns statistics about the index’s contents. For example: The vector count per namespace and the number of dimensions.
- Parameters:
filter (dict[str, Union[str, float, int, bool, List, dict]], optional) – If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See metadata filtering <https://www.pinecone.io/docs/metadata-filtering/> [optional]
Returns: DescribeIndexStatsResponse object which contains stats about the index.
import asyncio from pinecone import Pinecone, Vector, SparseValues async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx: print(await idx.describe_index_stats()) asyncio.run(main())
Vectors¶
- IndexAsyncio.upsert(vectors: list[Vector] | list[tuple[str, list[float]]] | list[tuple[str, list[float], dict[str, str | int | float | list[str] | list[int] | list[float]]]] | list[VectorTypedDict], namespace: str | None = None, batch_size: int | None = None, show_progress: bool = True, **kwargs) UpsertResponse¶
- Parameters:
vectors (Union[list[Vector], list[VectorTuple], list[VectorTupleWithMetadata], list[VectorTypedDict]]) – A list of vectors to upsert.
namespace (str) – The namespace to write to. If not specified, the default namespace is used. [optional]
batch_size (int) – The number of vectors to upsert in each batch. If not specified, all vectors will be upserted in a single batch. [optional]
show_progress (bool) – Whether to show a progress bar using tqdm. Applied only if batch_size is provided. Default is True.
- Returns:
UpsertResponse, includes the number of vectors upserted.
The upsert operation writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.
To upsert in parallel, see the Pinecone documentation on parallel upserts.
Upserting dense vectors
Note
The dimension of each dense vector must match the dimension of the index.
A vector can be represented in a variety of ways.
import asyncio
from pinecone import Pinecone, Vector

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # A Vector object
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                Vector(id='id1', values=[0.1, 0.2, 0.3, 0.4], metadata={'metadata_key': 'metadata_value'}),
            ]
        )

        # A vector tuple
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                ('id1', [0.1, 0.2, 0.3, 0.4]),
            ]
        )

        # A vector tuple with metadata
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                ('id1', [0.1, 0.2, 0.3, 0.4], {'metadata_key': 'metadata_value'}),
            ]
        )

        # A vector dictionary
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                {"id": "id1", "values": [0.1, 0.2, 0.3, 0.4], "metadata": {"metadata_key": "metadata_value"}},
            ]
        )

asyncio.run(main())
Upserting sparse vectors
import asyncio from pinecone import Pinecone, Vector, SparseValues async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx: # A Vector object await idx.upsert( namespace = 'my-namespace', vectors = [ Vector(id='id1', sparse_values=SparseValues(indices=[1, 2], values=[0.2, 0.4])), ] ) # A dictionary await idx.upsert( namespace = 'my-namespace', vectors = [ {"id": 1, "sparse_values": {"indices": [1, 2], "values": [0.2, 0.4]}}, ] ) asyncio.run(main())
Batch upsert
If you have a large number of vectors, you can upsert them in batches.
import asyncio from pinecone import Pinecone, Vector, SparseValues async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx: await idx.upsert( namespace = 'my-namespace', vectors = [ {'id': 'id1', 'values': [0.1, 0.2, 0.3, 0.4]}, {'id': 'id2', 'values': [0.2, 0.3, 0.4, 0.5]}, {'id': 'id3', 'values': [0.3, 0.4, 0.5, 0.6]}, {'id': 'id4', 'values': [0.4, 0.5, 0.6, 0.7]}, {'id': 'id5', 'values': [0.5, 0.6, 0.7, 0.8]}, # More vectors here ], batch_size = 50 ) asyncio.run(main())
Visual progress bar with tqdm
To see a progress bar when upserting in batches, you will need to separately install the tqdm package. If tqdm is present, the client will detect and use it to display progress when show_progress=True.
- IndexAsyncio.query(*args, top_k: int, vector: list[float] | None = None, id: str | None = None, namespace: str | None = None, filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, include_values: bool | None = None, include_metadata: bool | None = None, sparse_vector: SparseValues | SparseVectorTypedDict | None = None, **kwargs) QueryResponse¶
The Query operation searches a namespace, using a query vector. It retrieves the ids of the most similar items in a namespace, along with their similarity scores.
Querying with dense vectors
import asyncio from pinecone import Pinecone, Vector, SparseValues async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx: query_embedding = [0.1, 0.2, 0.3, ...] # An embedding that matches the index dimension # Query by vector values results = await idx.query( vector=query_embedding, top_k=10, filter={'genre': {"$eq": "drama"}}, # Optionally filter by metadata namespace='my_namespace', include_values=False, include_metadata=True ) # Query using vector id (the values from this stored vector will be used to query) results = await idx.query( id='1', top_k=10, filter={"year": {"$gt": 2000}}, namespace='my_namespace', ) asyncio.run(main())
Query with sparse vectors
import asyncio from pinecone import Pinecone, Vector, SparseValues async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx: query_embedding = [0.1, 0.2, 0.3, ...] # An embedding that matches the index dimension # Query by vector values results = await idx.query( vector=query_embedding, top_k=10, filter={'genre': {"$eq": "drama"}}, # Optionally filter by metadata namespace='my_namespace', include_values=False, include_metadata=True ) # Query using vector id (the values from this stored vector will be used to query) results = await idx.query( id='1', top_k=10, filter={"year": {"$gt": 2000}}, namespace='my_namespace', ) asyncio.run(main())
Examples:
>>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace') >>> index.query(id='id1', top_k=10, namespace='my_namespace') >>> index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'}) >>> index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True) >>> index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]}, >>> top_k=10, namespace='my_namespace') >>> index.query(vector=[1, 2, 3], sparse_vector=SparseValues([1, 2], [0.2, 0.4]), >>> top_k=10, namespace='my_namespace')
- Parameters:
vector (list[float]) – The query vector. This should be the same length as the dimension of the index being queried. Each query() request can contain only one of the parameters id or vector. [optional]
id (str) – The unique ID of the vector to be used as a query vector. Each query() request can contain only one of the parameters vector or id. [optional]
top_k (int) – The number of results to return for each query. Must be an integer greater than 1.
namespace (str) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
filter (dict[str, Union[str, float, int, bool, List, dict]) – The filter to apply. You can use vector metadata to limit your search. See metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>_ [optional]
include_values (bool) – Indicates whether vector values are included in the response. If omitted the server will use the default value of False [optional]
include_metadata (bool) – Indicates whether metadata is included in the response as well as the ids. If omitted the server will use the default value of False [optional]
sparse_vector (Union[SparseValues, dict[str, Union[list[float], list[int]]]]) – Sparse values of the query vector. Expected to be either a SparseValues object or a dict of the form: {'indices': list[int], 'values': list[float]}, where the lists each have the same length. [optional]
- Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects,
and namespace name.
- IndexAsyncio.query_namespaces(namespaces: list[str], metric: Literal['cosine', 'euclidean', 'dotproduct'], top_k: int | None = None, filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, include_values: bool | None = None, include_metadata: bool | None = None, vector: list[float] | None = None, sparse_vector: SparseValues | SparseVectorTypedDict | None = None, **kwargs) QueryNamespacesResults¶
The query_namespaces() method is used to make a query to multiple namespaces in parallel and combine the results into one result set.
- Parameters:
vector (list[float]) – The query vector, must be the same length as the dimension of the index being queried.
namespaces (list[str]) – The list of namespaces to query.
top_k (Optional[int], optional) – The number of results you would like to request from each namespace. Defaults to 10.
filter (Optional[dict[str, Union[str, float, int, bool, List, dict]]], optional) – Pass an optional filter to filter results based on metadata. Defaults to None.
include_values (Optional[bool], optional) – Boolean field indicating whether vector values should be included with results. Defaults to None.
include_metadata (Optional[bool], optional) – Boolean field indicating whether vector metadata should be included with results. Defaults to None.
sparse_vector (Optional[ Union[SparseValues, dict[str, Union[list[float], list[int]]]] ], optional) – If you are working with a dotproduct index, you can pass a sparse vector as part of your hybrid search. Defaults to None.
- Returns:
A QueryNamespacesResults object containing the combined results from all namespaces, as well as the combined usage cost in read units.
- Return type:
QueryNamespacesResults
Examples:
import asyncio from pinecone import Pinecone async def main(): pc = Pinecone(api_key="your-api-key") idx = pc.IndexAsyncio( host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io", ) query_vec = [0.1, 0.2, 0.3] # An embedding that matches the index dimension combined_results = await idx.query_namespaces( vector=query_vec, namespaces=['ns1', 'ns2', 'ns3', 'ns4'], top_k=10, filter={'genre': {"$eq": "drama"}}, include_values=True, include_metadata=True ) for vec in combined_results.matches: print(vec.id, vec.score) print(combined_results.usage) await idx.close() asyncio.run(main())
- IndexAsyncio.delete(ids: list[str] | None = None, delete_all: bool | None = None, namespace: str | None = None, filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, **kwargs) dict[str, Any]¶
- Parameters:
ids (list[str]) – Vector ids to delete [optional]
delete_all (bool) – This indicates that all vectors in the index namespace should be deleted. [optional] Default is False.
namespace (str) – The namespace to delete vectors from [optional] If not specified, the default namespace is used.
filter (dict[str, Union[str, float, int, bool, List, dict]]) – If specified, the metadata filter here will be used to select the vectors to delete. This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True. See metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>_ [optional]
The Delete operation deletes vectors from the index, from a single namespace.
No error is raised if the vector id does not exist.
Note: For any delete call, if namespace is not specified, the default namespace “” is used. Since the delete operation does not error when ids are not present, this means you may not receive an error if you delete from the wrong namespace.
Delete can occur in the following mutually exclusive ways:
Delete by ids from a single namespace
Delete all vectors from a single namespace by setting delete_all to True
Delete vectors from a single namespace by specifying a metadata filter (note that for this option delete_all must be set to False)
import asyncio from pinecone import Pinecone, Vector, SparseValues async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx: # Delete specific ids await idx.delete( ids=['id1', 'id2'], namespace='my_namespace' ) # Delete everything in a namespace await idx.delete( delete_all=True, namespace='my_namespace' ) # Delete by metadata filter await idx.delete( filter={'key': 'value'}, namespace='my_namespace' ) asyncio.run(main())
Returns: An empty dictionary if the delete operation was successful.
- IndexAsyncio.fetch(ids: list[str], namespace: str | None = None, **kwargs) FetchResponse¶
The fetch operation looks up and returns vectors, by ID, from a single namespace. The returned vectors include the vector data and/or metadata.
import asyncio from pinecone import Pinecone, Vector, SparseValues async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx: # Fetch specific ids in namespace fetched = await idx.fetch( ids=['id1', 'id2'], namespace='my_namespace' ) for vec_id in fetched.vectors: vector = fetched.vectors[vec_id] print(vector.id) print(vector.metadata) print(vector.values) asyncio.run(main())
- Parameters:
ids (list[str]) – The vector IDs to fetch.
namespace (str) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: FetchResponse object which contains the list of Vector objects, and namespace name.
- IndexAsyncio.list(**kwargs) AsyncIterator[list[str]]¶
The list operation accepts all of the same arguments as list_paginated, and returns an async generator that yields a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your behalf.
Examples
>>> for ids in index.list(prefix='99', limit=5, namespace='my_namespace'): >>> print(ids) ['99', '990', '991', '992', '993'] ['994', '995', '996', '997', '998'] ['999']
- Parameters:
prefix (Optional[str]) – The id prefix to match. If unspecified, an empty string prefix will be used with the effect of listing all ids in a namespace [optional]
limit (Optional[int]) – The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
pagination_token (Optional[str]) – A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
namespace (Optional[str]) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
- IndexAsyncio.list_paginated(prefix: str | None = None, limit: int | None = None, pagination_token: str | None = None, namespace: str | None = None, **kwargs) ListResponse¶
The list_paginated operation finds vectors based on an id prefix within a single namespace. It returns matching ids in a paginated form, with a pagination token to fetch the next page of results. This id list can then be passed to fetch or delete operations, depending on your use case.
Consider using the list method to avoid having to handle pagination tokens manually.
Examples
>>> results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace') >>> [v.id for v in results.vectors] ['99', '990', '991', '992', '993'] >>> results.pagination.next eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9 >>> next_results = index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)
- Parameters:
prefix (Optional[str]) – The id prefix to match. If unspecified, an empty string prefix will be used with the effect of listing all ids in a namespace [optional]
limit (Optional[int]) – The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
pagination_token (Optional[str]) – A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
namespace (Optional[str]) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
Returns: ListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
- IndexAsyncio.fetch_by_metadata(filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]], namespace: str | None = None, limit: int | None = None, pagination_token: str | None = None, **kwargs) FetchByMetadataResponse¶
Fetch vectors by metadata filter.
Look up and return vectors by metadata filter from a single namespace. The returned vectors include the vector data and/or metadata.
Examples:
import asyncio from pinecone import Pinecone async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-host") as idx: result = await idx.fetch_by_metadata( filter={'genre': {'$in': ['comedy', 'drama']}, 'year': {'$eq': 2019}}, namespace='my_namespace', limit=50 ) for vec_id in result.vectors: vector = result.vectors[vec_id] print(vector.id) print(vector.metadata) asyncio.run(main())
- Parameters:
filter (dict[str, str | float | int | bool | List | dict]) – Metadata filter expression to select vectors. See metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>_
namespace (str) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]
limit (int) – Max number of vectors to return. Defaults to 100. [optional]
pagination_token (str) – Pagination token to continue a previous listing operation. [optional]
- Returns:
Object containing the fetched vectors, namespace, usage, and pagination token.
- Return type:
FetchByMetadataResponse
- IndexAsyncio.update(id: str | None = None, values: list[float] | None = None, set_metadata: dict[str, str | int | float | list[str] | list[int] | list[float]] | None = None, namespace: str | None = None, sparse_values: SparseValues | SparseVectorTypedDict | None = None, filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, dry_run: bool | None = None, **kwargs) UpdateResponse¶
The Update operation updates vectors in a namespace.
This method supports two update modes:

Single vector update by ID: Provide id to update a specific vector.
- Updates the vector with the given ID
- If values is included, it will overwrite the previous vector values
- If set_metadata is included, the metadata will be merged with existing metadata on the vector. Fields specified in set_metadata will overwrite existing fields with the same key, while fields not in set_metadata will remain unchanged.

Bulk update by metadata filter: Provide filter to update all vectors matching the filter criteria.
- Updates all vectors in the namespace that match the filter expression
- Useful for updating metadata across multiple vectors at once
- If set_metadata is included, the metadata will be merged with existing metadata on each vector. Fields specified in set_metadata will overwrite existing fields with the same key, while fields not in set_metadata will remain unchanged.
- The response includes matched_records indicating how many vectors were updated

Either id or filter must be provided (but not both in the same call).
Examples:
Single vector update by ID:
import asyncio from pinecone import Pinecone, Vector, SparseValues async def main(): pc = Pinecone() async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx: # Update vector values await idx.update( id='id1', values=[0.1, 0.2, 0.3, ...], namespace='my_namespace' ) # Update metadata await idx.update( id='id1', set_metadata={'key': 'value'}, namespace='my_namespace' ) # Update sparse values await idx.update( id='id1', sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]}, namespace='my_namespace' ) # Update sparse values with SparseValues object await idx.update( id='id1', sparse_values=SparseValues(indices=[234781, 5432], values=[0.2, 0.4]), namespace='my_namespace' )
Bulk update by metadata filter:
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Update metadata for all vectors matching the filter
        response = await idx.update(
            set_metadata={'status': 'active'},
            filter={'genre': {'$eq': 'drama'}},
            namespace='my_namespace'
        )
        print(f"Updated {response.matched_records} vectors")

        # Preview how many vectors would be updated (dry run)
        response = await idx.update(
            set_metadata={'status': 'active'},
            filter={'genre': {'$eq': 'drama'}},
            namespace='my_namespace',
            dry_run=True
        )
        print(f"Would update {response.matched_records} vectors")

asyncio.run(main())
- Parameters:
id (str) – Vector’s unique id. Required for single vector updates. Must not be provided when using filter. [optional]
values (list[float]) – Vector values to set. [optional]
set_metadata (dict[str, Union[str, float, int, bool, list[int], list[float], list[str]]]) – Metadata to merge with existing metadata on the vector(s). Fields specified will overwrite existing fields with the same key, while fields not specified will remain unchanged. [optional]
namespace (str) – Namespace name where to update the vector(s). [optional]
sparse_values (Union[SparseValues, dict[str, Union[list[float], list[int]]]]) – Sparse values to update for the vector. Expected to be either a SparseValues object or a dict of the form: {'indices': list[int], 'values': list[float]} where the lists each have the same length. [optional]
filter (dict[str, Union[str, float, int, bool, List, dict]]) – A metadata filter expression. When provided, updates all vectors in the namespace that match the filter criteria. See metadata filtering <https://www.pinecone.io/docs/metadata-filtering/>_. Must not be provided when using id. Either id or filter must be provided. [optional]
dry_run (bool) – If True, return the number of records that match the filter without executing the update. Only meaningful when using filter (not with id). Useful for previewing the impact of a bulk update before applying changes. Defaults to False. [optional]
- Returns:
An UpdateResponse object. When using filter-based updates, the response includes matched_records indicating the number of vectors that were updated (or would be updated if dry_run=True).
- Return type:
UpdateResponse
- IndexAsyncio.upsert_from_dataframe(df, namespace: str | None = None, batch_size: int = 500, show_progress: bool = True)¶
This method has not been implemented yet for the IndexAsyncio class.
Bulk Import¶
- async IndexAsyncio.start_import(uri: str, integration_id: str | None = None, error_mode: Literal['CONTINUE', 'ABORT'] | None = 'CONTINUE') StartImportResponse¶
- Parameters:
uri (str) – The URI of the data to import. The URI must start with the scheme of a supported storage provider.
integration_id (Optional[str], optional) – If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.
error_mode – Defaults to “CONTINUE”. If set to “CONTINUE”, the import operation will continue even if some records fail to import. Pass “ABORT” to stop the import operation if any records fail to import.
- Returns:
Contains the id of the import operation.
- Return type:
StartImportResponse
Import data from a storage provider into an index. The uri must start with the scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to separately configure a storage integration and pass the integration id.
Examples
>>> from pinecone import Pinecone
>>> index = Pinecone().IndexAsyncio(host="example-index.svc.aped-4627-b74a.pinecone.io")
>>> await index.start_import(uri="s3://bucket-name/path/to/data.parquet")
{ id: "1" }
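For a bucket that is not publicly readable, the call looks the same except that the storage integration id is passed alongside the uri. The integration id below is a hypothetical placeholder, and error_mode is shown only to illustrate the "ABORT" option.

```python
>>> await index.start_import(
...     uri="s3://private-bucket/path/to/data.parquet",
...     integration_id="your-storage-integration-id",  # placeholder: id of a storage integration you have configured
...     error_mode="ABORT",  # stop the whole import if any record fails
... )
```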
- async IndexAsyncio.list_imports(**kwargs) AsyncIterator['ImportModel']¶
- Parameters:
limit (Optional[int]) – The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
pagination_token (Optional[str]) – When there are multiple pages of results, a pagination token is returned in the response. The token can be used to fetch the next page of results. [optional]
Returns an async generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can easily iterate over all results. The list_imports method accepts all of the same arguments as list_imports_paginated.
```python
async for op in index.list_imports():
    print(op)
```
- async IndexAsyncio.list_imports_paginated(limit: int | None = None, pagination_token: str | None = None, **kwargs) ListImportsResponse¶
- Parameters:
limit (Optional[int]) – The maximum number of ids to return. If unspecified, the server will use a default value. [optional]
pagination_token (Optional[str]) – A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- Returns:
ListImportsResponse object which contains the list of operations as ImportModel objects, pagination information, and usage showing the number of read_units consumed.
The list_imports_paginated operation returns information about import operations. It returns operations in a paginated form, with a pagination token to fetch the next page of results.
Consider using the list_imports method to avoid having to handle pagination tokens manually.
Examples
>>> results = await index.list_imports_paginated(limit=5)
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> results.data[0]
{
    "id": "6",
    "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
    "status": "Completed",
    "percent_complete": 100.0,
    "records_imported": 10,
    "created_at": "2024-09-06T14:52:02.567776+00:00",
    "finished_at": "2024-09-06T14:52:28.130717+00:00"
}
>>> next_results = await index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
- async IndexAsyncio.describe_import(id: str) ImportModel¶
- Parameters:
id (str) – The id of the import operation. This value is returned when starting an import, and can be looked up using list_imports.
- Returns:
An object containing operation id, status, and other details.
- Return type:
ImportModel
describe_import is used to get detailed information about a specific import operation.
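No example is shown above, so here is a minimal doctest-style sketch in the same spirit as the other import examples; the id "1" is simply the value returned by the earlier start_import call, and the fields accessed mirror those shown in the list_imports_paginated output.

```python
>>> import_op = await index.describe_import(id="1")
>>> print(import_op.status, import_op.percent_complete)
```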
- async IndexAsyncio.cancel_import(id: str)¶
Cancel an import operation.
- Parameters:
id (str) – The id of the import operation to cancel.
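cancel_import has no example above either; a minimal sketch, reusing the import id from the earlier examples:

```python
>>> await index.cancel_import(id="1")
```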
Records¶
If you have created an index using integrated inference, you can use the following methods to search and retrieve records.
- async IndexAsyncio.upsert_records(namespace: str, records: List[Dict]) UpsertResponse¶
- Parameters:
namespace (str, required) – The namespace of the index to upsert records to.
records (list[Dict], required) – The records to upsert into the index.
Upsert records to a namespace. A record is a dictionary that contains either an id or _id field along with other fields that will be stored as metadata. The id or _id field is used as the unique identifier for the record. At least one field in the record should correspond to a field mapping in the index’s embed configuration.
When records are upserted, Pinecone converts mapped fields into embeddings and upserts them into the specified namespace of the index.
import asyncio
from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed
)

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # upsert records
        await idx.upsert_records(
            namespace="my-namespace",
            records=[
                {
                    "_id": "test1",
                    "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
                },
                {
                    "_id": "test2",
                    "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
                },
                {
                    "_id": "test3",
                    "my_text_field": "Many people enjoy eating apples as a healthy snack.",
                },
                {
                    "_id": "test4",
                    "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
                },
                {
                    "_id": "test5",
                    "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
                },
                {
                    "_id": "test6",
                    "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
                },
            ],
        )

        from pinecone import SearchQuery, SearchRerank, RerankModel

        # search for similar records
        response = await idx.search_records(
            namespace="my-namespace",
            query=SearchQuery(
                inputs={
                    "text": "Apple corporation",
                },
                top_k=3,
            ),
            rerank=SearchRerank(
                model=RerankModel.Bge_Reranker_V2_M3,
                rank_fields=["my_text_field"],
                top_n=3,
            ),
        )

asyncio.run(main())
- async IndexAsyncio.search(namespace: str, query: SearchQueryTypedDict | SearchQuery, rerank: SearchRerankTypedDict | SearchRerank | None = None, fields: List[str] | None = ['*']) SearchRecordsResponse¶
- Parameters:
namespace (str, required) – The namespace in the index to search.
query (Union[Dict, SearchQuery], required) – The SearchQuery to use for the search. The query can include a match_terms field to specify which terms must be present in the text of each search hit. match_terms should be a dict with strategy (str) and terms (list[str]) keys, e.g. {"strategy": "all", "terms": ["term1", "term2"]}. Currently only the "all" strategy is supported, which means all specified terms must be present. Note: match_terms is only supported for sparse indexes with integrated embedding configured to use the pinecone-sparse-english-v0 model (see the match_terms sketch after the example below).
rerank (Union[Dict, SearchRerank], optional) – The SearchRerank to use with the search request.
- Returns:
The records that match the search.
Search for records.
This operation converts a query to a vector embedding and then searches a namespace. You can optionally provide a reranking operation as part of the search.
import asyncio
from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed
)

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # upsert records
        await idx.upsert_records(
            namespace="my-namespace",
            records=[
                {
                    "_id": "test1",
                    "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
                },
                {
                    "_id": "test2",
                    "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
                },
                {
                    "_id": "test3",
                    "my_text_field": "Many people enjoy eating apples as a healthy snack.",
                },
                {
                    "_id": "test4",
                    "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
                },
                {
                    "_id": "test5",
                    "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
                },
                {
                    "_id": "test6",
                    "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
                },
            ],
        )

        from pinecone import SearchQuery, SearchRerank, RerankModel

        # search for similar records
        response = await idx.search_records(
            namespace="my-namespace",
            query=SearchQuery(
                inputs={
                    "text": "Apple corporation",
                },
                top_k=3,
            ),
            rerank=SearchRerank(
                model=RerankModel.Bge_Reranker_V2_M3,
                rank_fields=["my_text_field"],
                top_n=3,
            ),
        )

asyncio.run(main())
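The match_terms option described above is not exercised in this example. The following is a minimal sketch, assuming a sparse index with integrated embedding on pinecone-sparse-english-v0 and passing the query as a plain dict; whether the SearchQuery object also accepts a match_terms keyword is not shown here.

```python
# Only return hits whose text contains all of the listed terms
# ("all" is currently the only supported strategy).
response = await idx.search_records(
    namespace="my-namespace",
    query={
        "inputs": {"text": "Apple corporation"},
        "top_k": 3,
        "match_terms": {"strategy": "all", "terms": ["apple", "company"]},
    },
)
```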
- async IndexAsyncio.search_records(namespace: str, query: SearchQueryTypedDict | SearchQuery, rerank: SearchRerankTypedDict | SearchRerank | None = None, fields: List[str] | None = ['*']) SearchRecordsResponse¶
Alias of the search() method.
Namespaces¶
- IndexAsyncio.create_namespace(name: str, schema: dict[str, Any] | None = None, **kwargs) NamespaceDescription¶
Create a namespace in a serverless index.
- Parameters:
name (str) – The name of the namespace to create
schema (Optional[dict[str, Any]]) – Optional schema configuration for the namespace as a dictionary. [optional]
- Returns:
Information about the created namespace including vector count
- Return type:
NamespaceDescription
Create a namespace in a serverless index. For guidance and examples, see Manage namespaces.
Note: This operation is not supported for pod-based indexes.
Examples
>>> # Create a namespace with just a name
>>> import asyncio
>>> from pinecone import Pinecone
>>>
>>> async def main():
...     pc = Pinecone()
...     async with pc.IndexAsyncio(host="example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io") as idx:
...         namespace = await idx.create_namespace(name="my-namespace")
...         print(f"Created namespace: {namespace.name}, Vector count: {namespace.vector_count}")
>>>
>>> asyncio.run(main())

>>> # Create a namespace with schema configuration
>>> from pinecone.core.openapi.db_data.model.create_namespace_request_schema import CreateNamespaceRequestSchema
>>> schema = CreateNamespaceRequestSchema(fields={...})
>>> namespace = await idx.create_namespace(name="my-namespace", schema=schema)
- IndexAsyncio.describe_namespace(namespace: str, **kwargs) NamespaceDescription¶
Describe a namespace within an index, showing the vector count within the namespace.
- Parameters:
namespace (str) – The namespace to describe
- Returns:
Information about the namespace including vector count
- Return type:
NamespaceDescription
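A short sketch of describing a namespace, assuming an open IndexAsyncio context named idx as in the create_namespace example; name and vector_count are the fields shown there.

```python
ns = await idx.describe_namespace(namespace="my-namespace")
print(f"Namespace: {ns.name}, Vector count: {ns.vector_count}")
```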
- IndexAsyncio.delete_namespace(namespace: str, **kwargs) dict[str, Any]¶
Delete a namespace from an index.
- Parameters:
namespace (str) – The namespace to delete
- Returns:
Response from the delete operation
- Return type:
dict[str, Any]
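Deleting a namespace follows the same pattern, again assuming an open IndexAsyncio context named idx:

```python
await idx.delete_namespace(namespace="my-namespace")
```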
- IndexAsyncio.list_namespaces(limit: int | None = None, **kwargs) AsyncIterator[ListNamespacesResponse]¶
List all namespaces in an index. This method automatically handles pagination to return all results.
- Parameters:
limit (Optional[int]) – The maximum number of namespaces to return. If unspecified, the server will use a default value. [optional]
- Returns:
Object containing the list of namespaces.
- Return type:
ListNamespacesResponse
Examples
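The examples section above is empty in the source docstring. The following is a plausible sketch, assuming that the items yielded by the iterator can simply be printed; whether each item is an individual namespace description or a page object is not spelled out above.

```python
import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-index.svc.aped-4627-b74a.pinecone.io") as idx:
        # Pagination is handled automatically; just iterate.
        async for namespace in idx.list_namespaces():
            print(namespace)

asyncio.run(main())
```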
- IndexAsyncio.list_namespaces_paginated(limit: int | None = None, pagination_token: str | None = None, **kwargs) ListNamespacesResponse¶
List all namespaces in an index with pagination support. The response includes pagination information if there are more results available.
Consider using the list_namespaces method to avoid having to handle pagination tokens manually.
- Parameters:
limit (Optional[int]) – The maximum number of namespaces to return. If unspecified, the server will use a default value. [optional]
pagination_token (Optional[str]) – A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]
- Returns:
Object containing the list of namespaces and pagination information.
- Return type:
ListNamespacesResponse
Examples
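This examples section is also empty; below is a sketch of manual pagination modeled on the list_imports_paginated example above. The namespaces attribute name on the response is an assumption, so treat it as illustrative only.

```python
# First page of namespaces
results = await idx.list_namespaces_paginated(limit=5)
for ns in results.namespaces:  # assumed attribute name for the list of namespaces
    print(ns.name)

# Fetch the next page only if a pagination token was returned
if results.pagination and results.pagination.next:
    next_results = await idx.list_namespaces_paginated(
        limit=5,
        pagination_token=results.pagination.next,
    )
```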
Inference¶
- async AsyncioInference.embed(model: str, inputs: str | list[Dict] | list[str], parameters: dict[str, Any] | None = None) EmbeddingsList[source]¶
Generates embeddings for the provided inputs using the specified model and (optional) parameters.
- Parameters:
model (str, required) – The model to use for generating embeddings.
inputs (list, required) – A list of items to generate embeddings for.
parameters (dict, optional) – A dictionary of parameters to use when generating embeddings.
- Returns:
EmbeddingsList object with keys data, model, and usage. The data key contains a list of n embeddings, where n = len(inputs). Precision of returned embeddings is either float16 or float32, with float32 being the default. The model key is the model used to generate the embeddings. The usage key contains the total number of tokens used at request-time.
- Return type:
EmbeddingsList
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        inputs = ["Who created the first computer?"]
        outputs = await pc.inference.embed(
            model="multilingual-e5-large",
            inputs=inputs,
            parameters={"input_type": "passage", "truncate": "END"}
        )
        print(outputs)
        # EmbeddingsList(
        #     model='multilingual-e5-large',
        #     data=[
        #         {'values': [0.1, ...., 0.2]},
        #     ],
        #     usage={'total_tokens': 6}
        # )

asyncio.run(main())
You can also use a single string input:
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        output = await pc.inference.embed(
            model="text-embedding-3-small",
            inputs="Hello, world!"
        )

asyncio.run(main())
Or use the EmbedModel enum:
import asyncio
from pinecone import PineconeAsyncio
from pinecone.inference import EmbedModel

async def main():
    async with PineconeAsyncio() as pc:
        outputs = await pc.inference.embed(
            model=EmbedModel.TEXT_EMBEDDING_3_SMALL,
            inputs=["Document 1", "Document 2"]
        )

asyncio.run(main())
- async AsyncioInference.rerank(model: str, query: str, documents: list[str] | list[dict[str, Any]], rank_fields: list[str] = ['text'], return_documents: bool = True, top_n: int | None = None, parameters: dict[str, Any] | None = None) RerankResult[source]¶
Rerank documents with associated relevance scores that represent the relevance of each document to the provided query using the specified model.
- Parameters:
model (str, required) – The model to use for reranking.
query (str, required) – The query to compare with documents.
documents (list, required) – A list of documents or strings to rank.
rank_fields (list, optional) – A list of document fields to use for ranking. Defaults to [“text”].
return_documents (bool, optional) – Whether to include the documents in the response. Defaults to True.
top_n (int, optional) – How many documents to return. Defaults to len(documents).
parameters (dict, optional) – A dictionary of parameters to use when ranking documents.
- Returns:
RerankResult object with keys data and usage. The data key contains a list of n documents, where n = top_n. The documents are sorted in order of relevance, with the first being the most relevant. The index field can be used to locate the document relative to the list of documents specified in the request. Each document contains a score key representing how close the document relates to the query.
- Return type:
RerankResult
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        result = await pc.inference.rerank(
            model="bge-reranker-v2-m3",
            query="Tell me about tech companies",
            documents=[
                "Apple is a popular fruit known for its sweetness and crisp texture.",
                "Software is still eating the world.",
                "Many people enjoy eating apples as a healthy snack.",
                "Acme Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
                "An apple a day keeps the doctor away, as the saying goes.",
            ],
            top_n=2,
            return_documents=True,
        )
        print(result)
        # RerankResult(
        #     model='bge-reranker-v2-m3',
        #     data=[
        #         { index=3, score=0.020980744,
        #           document={text="Acme Inc. has rev..."} },
        #         { index=1, score=0.00034015716,
        #           document={text="Software is still..."} }
        #     ],
        #     usage={'rerank_units': 1}
        # )

asyncio.run(main())
You can also use document dictionaries with custom fields:
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        result = await pc.inference.rerank(
            model="pinecone-rerank-v0",
            query="What is machine learning?",
            documents=[
                {"text": "Machine learning is a subset of AI.", "category": "tech"},
                {"text": "Cooking recipes for pasta.", "category": "food"},
            ],
            rank_fields=["text"],
            top_n=1
        )

asyncio.run(main())
Or use the RerankModel enum:
import asyncio
from pinecone import PineconeAsyncio
from pinecone.inference import RerankModel

async def main():
    async with PineconeAsyncio() as pc:
        result = await pc.inference.rerank(
            model=RerankModel.PINECONE_RERANK_V0,
            query="Your query here",
            documents=["doc1", "doc2", "doc3"]
        )

asyncio.run(main())
- AsyncioInference.list_models(*, type: str | None = None, vector_type: str | None = None) ModelInfoList[source]¶
List all available models.
- Parameters:
type (str, optional) – The type of model to list. Either “embed” or “rerank”.
vector_type (str, optional) – The type of vector to list. Either “dense” or “sparse”.
- Returns:
A list of models.
- Return type:
ModelInfoList
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # List all models
        models = await pc.inference.list_models()

        # List models, with model type filtering
        models = await pc.inference.list_models(type="embed")
        models = await pc.inference.list_models(type="rerank")

        # List models, with vector type filtering
        models = await pc.inference.list_models(vector_type="dense")
        models = await pc.inference.list_models(vector_type="sparse")

        # List models, with both type and vector type filtering
        models = await pc.inference.list_models(type="rerank", vector_type="dense")

asyncio.run(main())
- AsyncioInference.get_model(model_name: str) ModelInfo[source]¶
Get details on a specific model.
- Parameters:
model_name (str, required) – The name of the model to get details on.
- Returns:
A ModelInfo object.
- Return type:
ModelInfo
import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        model_info = await pc.inference.get_model(model_name="text-embedding-3-small")
        print(model_info)
        # {
        #     "model": "text-embedding-3-small",
        #     "short_description": "...",
        #     "type": "embed",
        #     ...
        # }

asyncio.run(main())