PineconeAsyncio

class pinecone.PineconeAsyncio(api_key: str | None = None, host: str | None = None, proxy_url: str | None = None, ssl_ca_certs: str | None = None, ssl_verify: bool | None = None, additional_headers: dict[str, str] | None = {}, **kwargs)[source]

PineconeAsyncio is an asyncio client for interacting with Pinecone’s control plane API.

This class implements methods for managing and interacting with Pinecone resources such as collections and indexes.

To perform data operations such as inserting and querying vectors, use the IndexAsyncio class.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        async with pc.IndexAsyncio(host="my-index.pinecone.io") as idx:
            await idx.upsert(vectors=[("1", [1, 2, 3]), ("2", [4, 5, 6])])

asyncio.run(main())
PineconeAsyncio.__init__(api_key: str | None = None, host: str | None = None, proxy_url: str | None = None, ssl_ca_certs: str | None = None, ssl_verify: bool | None = None, additional_headers: dict[str, str] | None = {}, **kwargs) None[source]

Initialize the PineconeAsyncio client.

Parameters:
  • api_key (str, optional) – The API key to use for authentication. If not passed via kwarg, the API key will be read from the environment variable PINECONE_API_KEY.

  • host (str, optional) – The control plane host. If unspecified, the host api.pinecone.io will be used.

  • proxy_url (str, optional) – The URL of the proxy to use for the connection.

  • ssl_ca_certs (str, optional) – The path to the SSL CA certificate bundle to use for the connection. This path should point to a file in PEM format. When not passed, the SDK will use the certificate bundle returned from certifi.where().

  • ssl_verify (bool, optional) – SSL verification is performed by default, but can be disabled using the boolean flag when testing with Pinecone Local or troubleshooting a proxy setup. You should never run with SSL verification disabled in production.

  • additional_headers (dict[str, str], optional) – Additional headers to pass to the API. This is mainly to support internal testing at Pinecone. End users should not need to use this unless following specific instructions to do so.

Note

The proxy_headers parameter is not currently supported for PineconeAsyncio.
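The api_key fallback described above can be pictured with a minimal sketch. The helper name resolve_api_key and its env argument are hypothetical and exist only for illustration; the ValueError is this sketch's own choice, not documented SDK behavior.

```python
import os

def resolve_api_key(api_key=None, env=None):
    """Illustrative only: an explicit argument wins, otherwise use the environment."""
    env = os.environ if env is None else env
    key = api_key or env.get("PINECONE_API_KEY")
    if not key:
        # Assumption: this sketch raises ValueError when no key is available.
        raise ValueError("No API key: pass api_key= or set PINECONE_API_KEY")
    return key
```

Passing env explicitly keeps the helper easy to exercise without touching the real process environment.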

PineconeAsyncio.IndexAsyncio(host: str, **kwargs) _IndexAsyncio[source]

Build an asyncio-compatible client for index data operations.

Parameters:

host – The host url of the index.

import os
import asyncio

from pinecone import PineconeAsyncio

api_key = os.environ.get("PINECONE_API_KEY")
index_host = os.environ.get("PINECONE_INDEX_HOST")

async def main():
    async with PineconeAsyncio(api_key=api_key) as pc:
        async with pc.IndexAsyncio(host=index_host) as idx:
            # Now you're ready to perform data operations
            await idx.query(vector=[...], top_k=10)

asyncio.run(main())

To find your host URL, you can call describe_index. Alternatively, the host is displayed in the Pinecone web console.

import os
import asyncio

from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio(
        api_key=os.environ.get("PINECONE_API_KEY")
    ) as pc:
        # Await the call first, then read the attribute from the result
        description = await pc.describe_index('index-name')
        host = description.host

asyncio.run(main())

Alternative setup

Like instances of the PineconeAsyncio class, instances of IndexAsyncio hold an async context that must be cleaned up when you are done with them; otherwise aiohttp reports errors about unclosed sessions. Nesting the two contexts is a bit cumbersome, so if you only plan to perform data operations you may prefer to set up the IndexAsyncio object via the Pinecone class, which avoids creating an outer async context.

import os
import asyncio
from pinecone import Pinecone

api_key = os.environ.get("PINECONE_API_KEY")

async def main():
    pc = Pinecone(api_key=api_key) # sync client, so no async context to worry about

    async with pc.IndexAsyncio(host='your_index_host') as idx:
        # Now you're ready to perform data operations
        await idx.query(vector=[...], top_k=10)

asyncio.run(main())
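If you do want both async clients, a single async with statement can hold both contexts, which avoids the extra indentation level. The sketch below uses hypothetical FakeClient and FakeIndex stand-ins (not part of the SDK) so that it runs without network access. The same comma-separated form should apply to PineconeAsyncio and IndexAsyncio, since both are used as async context managers in the examples above.

```python
import asyncio

events = []

class FakeIndex:
    """Hypothetical stand-in for an index client used as an async context manager."""
    async def __aenter__(self):
        events.append("index open")
        return self

    async def __aexit__(self, *exc_info):
        events.append("index closed")

class FakeClient:
    """Hypothetical stand-in for a control plane client."""
    async def __aenter__(self):
        events.append("client open")
        return self

    async def __aexit__(self, *exc_info):
        events.append("client closed")

    def IndexAsyncio(self, host):
        return FakeIndex()

async def main():
    # One statement, two contexts: the second expression may use the name
    # bound by the first. Contexts are exited in reverse order.
    async with FakeClient() as pc, pc.IndexAsyncio(host="example-host") as idx:
        events.append("working")

asyncio.run(main())
```

The inner context is closed before the outer one, just as with two nested async with blocks.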
async PineconeAsyncio.close() None[source]

Clean up resources used by the Pinecone client.

Call this method when the client is no longer needed so that it can clean up the aiohttp session and other resources.

After close has been called, the client instance should not be used.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    pc = PineconeAsyncio()
    desc = await pc.describe_index(name="my-index")
    await pc.close()

asyncio.run(main())

If you are using the client as a context manager, the close method is called automatically when exiting.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        desc = await pc.describe_index(name="my-index")

    # No need to call close in this case because the "async with" syntax
    # automatically calls close when exiting the block.

asyncio.run(main())
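When you call close yourself, note that the first example above only reaches close if describe_index succeeds. A try/finally block is one way to make sure the cleanup always runs. The sketch below demonstrates the pattern with a hypothetical FakeClient whose describe_index always fails; it is a stand-in for illustration, not the real client.

```python
import asyncio

class FakeClient:
    """Hypothetical stand-in for a client that exposes an async close() method."""
    def __init__(self):
        self.closed = False

    async def describe_index(self, name):
        raise RuntimeError(f"simulated failure describing {name!r}")

    async def close(self):
        self.closed = True

async def describe_then_close(client, name):
    try:
        return await client.describe_index(name)
    finally:
        # Runs whether describe_index returned or raised.
        await client.close()

async def demo():
    client = FakeClient()
    try:
        await describe_then_close(client, "my-index")
    except RuntimeError:
        pass  # the simulated failure still propagates to the caller
    return client.closed
```

Here demo() returns True, showing that close ran even though the call raised.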

DB Control Plane

Indexes

async PineconeAsyncio.create_index(name: str, spec: Dict | 'ServerlessSpec' | 'PodSpec' | 'ByocSpec', dimension: int | None = None, metric: 'Metric' | str | None = 'cosine', timeout: int | None = None, deletion_protection: 'DeletionProtection' | str | None = 'disabled', vector_type: 'VectorType' | str | None = 'dense', tags: dict[str, str] | None = None) IndexModel[source]

Creates a Pinecone index.

Parameters:
  • name (str) – The name of the index to create. Must be unique within your project and cannot be changed once created. Allowed characters are lowercase letters, numbers, and hyphens and the name may not begin or end with hyphens. Maximum length is 45 characters.

  • metric (str, optional) – Type of similarity metric used in the vector index when querying, one of {"cosine", "dotproduct", "euclidean"}.

  • spec (Dict) – A dictionary containing configurations describing how the index should be deployed. For serverless indexes, specify region and cloud. Optionally, you can specify read_capacity to configure dedicated read capacity mode (OnDemand or Dedicated) and schema to configure which metadata fields are filterable. For pod indexes, specify replicas, shards, pods, pod_type, metadata_config, and source_collection. Alternatively, use the ServerlessSpec or PodSpec objects to specify these configurations.

  • dimension (int) – If you are creating an index with vector_type="dense" (which is the default), you need to specify dimension to indicate the size of your vectors. This should match the dimension of the embeddings you will be inserting. For example, if you are using OpenAI’s text-embedding-3-small model, which produces 1536-dimensional embeddings by default, you should use dimension=1536. Dimension is a required field when creating an index with vector_type="dense" and should not be passed when vector_type="sparse".

  • timeout (int, optional) – Specify the number of seconds to wait until index gets ready. If None, wait indefinitely; if >=0, time out after this many seconds; if -1, return immediately and do not wait.

  • deletion_protection (Optional[Literal["enabled", "disabled"]]) – If enabled, the index cannot be deleted. If disabled, the index can be deleted.

  • vector_type (str, optional) – The type of vectors to be stored in the index. One of {"dense", "sparse"}.

  • tags (Optional[dict[str, str]]) – Tags are key-value pairs you can attach to indexes to better understand, organize, and identify your resources. Some example use cases include tagging indexes with the name of the model that generated the embeddings, the date the index was created, or the purpose of the index.

Returns:

An IndexModel instance containing a description of the index that was created.
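The naming and dimension rules listed in the parameters can be checked locally before sending a request. The following is a minimal sketch of such a check; check_create_index_args is a hypothetical helper written for this page, not an SDK function, and the API remains the source of truth for validation.

```python
import re

# Lowercase letters, digits, and hyphens; no leading or trailing hyphen.
_NAME_RE = re.compile(r"[a-z0-9]([a-z0-9-]*[a-z0-9])?")
MAX_NAME_LENGTH = 45

def check_create_index_args(name, dimension=None, vector_type="dense"):
    """Return a list of problems found; an empty list means the arguments look valid."""
    problems = []
    if len(name) > MAX_NAME_LENGTH:
        problems.append("name is longer than 45 characters")
    if not _NAME_RE.fullmatch(name):
        problems.append(
            "name must use lowercase letters, digits, and hyphens, "
            "and must not begin or end with a hyphen"
        )
    if vector_type == "dense" and dimension is None:
        problems.append("dimension is required for dense indexes")
    if vector_type == "sparse" and dimension is not None:
        problems.append("dimension should not be passed for sparse indexes")
    return problems
```

For example, check_create_index_args("my_index", dimension=1536) reports one problem because of the underscore, while "my-index" passes.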

Creating a serverless index

import os
import asyncio

from pinecone import (
    PineconeAsyncio,
    ServerlessSpec,
    CloudProvider,
    AwsRegion,
    Metric,
    DeletionProtection,
    VectorType
)

async def main():
    async with PineconeAsyncio(api_key=os.environ.get("PINECONE_API_KEY")) as pc:
        await pc.create_index(
            name="my-index",
            dimension=1536,
            metric=Metric.COSINE,
            spec=ServerlessSpec(
                cloud=CloudProvider.AWS,
                region=AwsRegion.US_WEST_2,
                read_capacity={
                    "mode": "Dedicated",
                    "dedicated": {
                        "node_type": "t1",
                        "scaling": "Manual",
                        "manual": {"shards": 2, "replicas": 2},
                    },
                },
                schema={
                    "genre": {"filterable": True},
                    "year": {"filterable": True},
                    "rating": {"filterable": True},
                },
            ),
            deletion_protection=DeletionProtection.DISABLED,
            vector_type=VectorType.DENSE,
            tags={
                "model": "clip",
                "app": "image-search",
                "env": "production"
            }
        )

asyncio.run(main())

Creating a pod index

import os
import asyncio

from pinecone import (
    PineconeAsyncio,
    PodSpec,
    PodIndexEnvironment,
    PodType,
    Metric,
    DeletionProtection,
    VectorType
)

async def main():
    async with PineconeAsyncio(api_key=os.environ.get("PINECONE_API_KEY")) as pc:
        await pc.create_index(
            name="my-index",
            dimension=1536,
            metric=Metric.COSINE,
            spec=PodSpec(
                environment=PodIndexEnvironment.US_EAST4_GCP,
                pod_type=PodType.P1_X1
            ),
            deletion_protection=DeletionProtection.DISABLED,
            tags={
                "model": "clip",
                "app": "image-search",
                "env": "testing"
            }
        )

asyncio.run(main())
async PineconeAsyncio.create_index_for_model(name: str, cloud: 'CloudProvider' | str, region: 'AwsRegion' | 'GcpRegion' | 'AzureRegion' | str, embed: 'IndexEmbed' | 'CreateIndexForModelEmbedTypedDict', tags: dict[str, str] | None = None, deletion_protection: 'DeletionProtection' | str | None = 'disabled', read_capacity: 'ReadCapacityDict' | 'ReadCapacity' | 'ReadCapacityOnDemandSpec' | 'ReadCapacityDedicatedSpec' | None = None, schema: dict[str, 'MetadataSchemaFieldConfig'] | dict[str, dict[str, Any]] | 'BackupModelSchema' | None = None, timeout: int | None = None) IndexModel[source]
Parameters:
  • name (str) – The name of the index to create. Must be unique within your project and cannot be changed once created. Allowed characters are lowercase letters, numbers, and hyphens and the name may not begin or end with hyphens. Maximum length is 45 characters.

  • cloud (str) – The cloud provider to use for the index. One of {"aws", "gcp", "azure"}.

  • region (str) – The region to use for the index. Enum objects AwsRegion, GcpRegion, and AzureRegion are also available to help you quickly set these parameters, but may not be up to date as new regions become available.

  • embed (Union[Dict, IndexEmbed]) – The embedding configuration for the index. This param accepts a dictionary or an instance of the IndexEmbed object.

  • tags (Optional[dict[str, str]]) – Tags are key-value pairs you can attach to indexes to better understand, organize, and identify your resources. Some example use cases include tagging indexes with the name of the model that generated the embeddings, the date the index was created, or the purpose of the index.

  • deletion_protection (Optional[Literal["enabled", "disabled"]]) – If enabled, the index cannot be deleted. If disabled, the index can be deleted. This setting can be changed with configure_index.

  • read_capacity (Optional[Union[ReadCapacityDict, ReadCapacity, ReadCapacityOnDemandSpec, ReadCapacityDedicatedSpec]]) – Optional read capacity configuration. You can specify read_capacity to configure dedicated read capacity mode (OnDemand or Dedicated). See ServerlessSpec documentation for details on read capacity configuration.

  • schema (Optional[Union[dict[str, MetadataSchemaFieldConfig], dict[str, dict[str, Any]], BackupModelSchema]]) – Optional metadata schema configuration. You can specify schema to configure which metadata fields are filterable. The schema can be provided as a dictionary mapping field names to their configurations (e.g., {"genre": {"filterable": True}}) or as a dictionary with a fields key (e.g., {"fields": {"genre": {"filterable": True}}}).

  • timeout (Optional[int]) – Specify the number of seconds to wait until index is ready to receive data. If None, wait indefinitely; if >=0, time out after this many seconds; if -1, return immediately and do not wait.

Returns:

A description of the index that was created.

Return type:

IndexModel

This method is used to create a Serverless index that is configured for use with Pinecone’s integrated inference models.

The resulting index can be described, listed, configured, and deleted like any other Pinecone index with the describe_index, list_indexes, configure_index, and delete_index methods.

After the index is created, you can upsert records into the index with the upsert_records method, and search your records with the search method.

import asyncio

from pinecone import (
    PineconeAsyncio,
    IndexEmbed,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    Metric,
)

async def main():
    async with PineconeAsyncio() as pc:
        if not await pc.has_index("book-search"):
            desc = await pc.create_index_for_model(
                name="book-search",
                cloud=CloudProvider.AWS,
                region=AwsRegion.US_EAST_1,
                embed=IndexEmbed(
                    model=EmbedModel.Multilingual_E5_Large,
                    metric=Metric.COSINE,
                    field_map={
                        "text": "description",
                    },
                )
            )

asyncio.run(main())

Creating an index for model with schema and dedicated read capacity

import asyncio

from pinecone import (
    PineconeAsyncio,
    IndexEmbed,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    Metric,
)

async def main():
    async with PineconeAsyncio() as pc:
        if not await pc.has_index("book-search"):
            desc = await pc.create_index_for_model(
                name="book-search",
                cloud=CloudProvider.AWS,
                region=AwsRegion.US_EAST_1,
                embed=IndexEmbed(
                    model=EmbedModel.Multilingual_E5_Large,
                    metric=Metric.COSINE,
                    field_map={
                        "text": "description",
                    },
                ),
                read_capacity={
                    "mode": "Dedicated",
                    "dedicated": {
                        "node_type": "t1",
                        "scaling": "Manual",
                        "manual": {"shards": 2, "replicas": 2},
                    },
                },
                schema={
                    "genre": {"filterable": True},
                    "year": {"filterable": True},
                    "rating": {"filterable": True},
                },
            )

asyncio.run(main())
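The schema parameter accepts two dictionary shapes: a flat mapping of field names to configurations, or the same mapping wrapped in a fields key. The sketch below shows how the two shapes relate by reducing both to the flat form. normalize_schema is a hypothetical helper for illustration only and says nothing about how the SDK parses the argument internally.

```python
def normalize_schema(schema):
    """Return the {field_name: config} mapping for either dictionary shape."""
    # Assumption of this sketch: a dict whose only key is "fields" and whose
    # value is itself a dict is treated as the wrapped form.
    if set(schema) == {"fields"} and isinstance(schema["fields"], dict):
        return dict(schema["fields"])
    return dict(schema)

flat = {"genre": {"filterable": True}, "year": {"filterable": True}}
wrapped = {"fields": flat}
```

Both normalize_schema(flat) and normalize_schema(wrapped) yield the same mapping. Note that this simple rule cannot distinguish the wrapped form from a flat schema whose only metadata field happens to be named fields.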


PineconeAsyncio.create_index_from_backup(*, name: str, backup_id: str, deletion_protection: 'DeletionProtection' | str | None = 'disabled', tags: dict[str, str] | None = None, timeout: int | None = None) IndexModel[source]

Create an index from a backup.

Call list_backups to get a list of backups for your project.

Parameters:
  • name (str) – The name of the index to create.

  • backup_id (str) – The ID of the backup to restore.

  • deletion_protection (Optional[Literal["enabled", "disabled"]]) – If enabled, the index cannot be deleted. If disabled, the index can be deleted. This setting can be changed with configure_index.

  • tags (Optional[dict[str, str]]) – Tags are key-value pairs you can attach to indexes to better understand, organize, and identify your resources. Some example use cases include tagging indexes with the name of the model that generated the embeddings, the date the index was created, or the purpose of the index.

  • timeout (Optional[int]) – Specify the number of seconds to wait until index is ready to receive data. If None, wait indefinitely; if >=0, time out after this many seconds; if -1, return immediately and do not wait.

Returns:

A description of the index that was created.

Return type:

IndexModel

async PineconeAsyncio.list_indexes() IndexList[source]
Returns:

An IndexList object, which is iterable and contains IndexModel objects. The IndexList also has a convenience method, names(), which returns a list of index names for situations where you only need the names.

Lists all indexes in your project.

The results include a description of all indexes in your project, including the index name, dimension, metric, status, and spec.

If you simply want to check whether an index exists, see the has_index() convenience method.

You can use the list_indexes() method to iterate over descriptions of every index in your project.

import asyncio

from pinecone import PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    available_indexes = await pc.list_indexes()
    for index in available_indexes:
        print(index.name)
        print(index.dimension)
        print(index.metric)
        print(index.status)
        print(index.host)
        print(index.spec)

    await pc.close()

asyncio.run(main())
async PineconeAsyncio.describe_index(name: str) IndexModel[source]
Parameters:

name – the name of the index to describe.

Returns:

An IndexModel object, which gives access to properties such as the index name, dimension, metric, host URL, status, and spec.

Describes a Pinecone index.

Getting your index host url

In a real production situation, you probably want to store the host url in an environment variable so you don’t have to call describe_index and re-fetch it every time you want to use the index. But this example shows how to get the value from the API using describe_index.

import asyncio
from pinecone import Pinecone, PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    index_name="my_index"
    description = await pc.describe_index(name=index_name)
    print(description)
    # {
    #     "name": "my_index",
    #     "metric": "cosine",
    #     "host": "my_index-dojoi3u.svc.aped-4627-b74a.pinecone.io",
    #     "spec": {
    #         "serverless": {
    #             "cloud": "aws",
    #             "region": "us-east-1"
    #         }
    #     },
    #     "status": {
    #         "ready": true,
    #         "state": "Ready"
    #     },
    #     "vector_type": "dense",
    #     "dimension": 1024,
    #     "deletion_protection": "enabled",
    #     "tags": {
    #         "environment": "production"
    #     }
    # }

    print(f"Your index is hosted at {description.host}")
    await pc.close()

    async with Pinecone().IndexAsyncio(host=description.host) as idx:
        await idx.upsert(vectors=[...])

asyncio.run(main())
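The advice above about storing the host in an environment variable can be sketched as a small lookup helper. resolve_index_host and fake_describe are hypothetical names introduced for illustration; the environment variable name PINECONE_INDEX_HOST follows the earlier example on this page, and describe stands for any coroutine that returns an object with a host attribute, as describe_index is shown to do above.

```python
import asyncio
import os
from types import SimpleNamespace

async def resolve_index_host(describe, index_name, env=None,
                             env_var="PINECONE_INDEX_HOST"):
    """Prefer a host stored in the environment; otherwise look it up via `describe`."""
    env = os.environ if env is None else env
    host = env.get(env_var)
    if host:
        return host
    description = await describe(index_name)
    return description.host

async def fake_describe(name):
    # Hypothetical stand-in so the sketch runs without network access.
    return SimpleNamespace(host=f"{name}.example.invalid")
```

With a real client, pc.describe_index could be passed as describe, so the API is only called when the environment variable is unset.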
async PineconeAsyncio.configure_index(name: str, replicas: int | None = None, pod_type: 'PodType' | str | None = None, deletion_protection: 'DeletionProtection' | str | None = None, tags: dict[str, str] | None = None, embed: 'ConfigureIndexEmbed' | Dict | None = None, read_capacity: 'ReadCapacityDict' | 'ReadCapacity' | 'ReadCapacityOnDemandSpec' | 'ReadCapacityDedicatedSpec' | None = None) None[source]
Parameters:
  • name (str) – The name of the index.

  • replicas (int, optional) – The desired number of replicas. The lowest value is 0.

  • pod_type (PodType or str, optional) – The new pod_type for the index. To learn more about the available pod types, please see Understanding Indexes.

  • deletion_protection (DeletionProtection or str, optional) – If set to "enabled", the index cannot be deleted. If "disabled", the index can be deleted.

  • tags (dict[str, str], optional) – A dictionary of tags to apply to the index. Tags are key-value pairs that can be used to organize and manage indexes. To remove a tag, set the value to "". Tags passed to configure_index will be merged with existing tags and any with the value empty string will be removed.

  • embed (Optional[Union[ConfigureIndexEmbed, Dict]], optional) – configures the integrated inference embedding settings for the index. You can convert an existing index to an integrated index by specifying the embedding model and field_map. The index vector type and dimension must match the model vector type and dimension, and the index similarity metric must be supported by the model. You can later change the embedding configuration to update the field_map, read_parameters, or write_parameters. Once set, the model cannot be changed.

  • read_capacity (Optional[Union[ReadCapacityDict, ReadCapacity, ReadCapacityOnDemandSpec, ReadCapacityDedicatedSpec]]) – Optional read capacity configuration for serverless indexes. You can specify read_capacity to configure dedicated read capacity mode (OnDemand or Dedicated). See ServerlessSpec documentation for details on read capacity configuration. Note that read capacity configuration is only available for serverless indexes.

This method is used to modify an index’s configuration. It can be used to:

  • Configure read capacity for serverless indexes using read_capacity

  • Scale a pod-based index horizontally using replicas

  • Scale a pod-based index vertically using pod_type

  • Enable or disable deletion protection using deletion_protection

  • Add, change, or remove tags using tags

Configuring read capacity for serverless indexes

To configure read capacity for serverless indexes, pass the read_capacity parameter to the configure_index method. You can configure either OnDemand or Dedicated read capacity mode.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # Configure to OnDemand read capacity (default)
        await pc.configure_index(
            name="my_index",
            read_capacity={"mode": "OnDemand"}
        )

        # Configure to Dedicated read capacity with manual scaling
        await pc.configure_index(
            name="my_index",
            read_capacity={
                "mode": "Dedicated",
                "dedicated": {
                    "node_type": "t1",
                    "scaling": "Manual",
                    "manual": {"shards": 1, "replicas": 1}
                }
            }
        )

        # Verify the configuration was applied
        desc = await pc.describe_index("my_index")
        assert desc.spec.serverless.read_capacity.mode == "Dedicated"

asyncio.run(main())

Scaling pod-based indexes

To scale your pod-based index, you pass a replicas and/or pod_type param to the configure_index method. pod_type may be a string or a value from the PodType enum.

import asyncio
from pinecone import PineconeAsyncio, PodType

async def main():
    async with PineconeAsyncio() as pc:
        await pc.configure_index(
            name="my_index",
            replicas=2,
            pod_type=PodType.P1_X2
        )

asyncio.run(main())

After providing these new configurations, call describe_index to check the status of the index as the changes are applied.

Enabling or disabling deletion protection

To enable or disable deletion protection, pass the deletion_protection parameter to the configure_index method. When deletion protection is enabled, the index cannot be deleted with the delete_index method.

import asyncio
from pinecone import PineconeAsyncio, DeletionProtection

async def main():
    async with PineconeAsyncio() as pc:
        # Enable deletion protection
        await pc.configure_index(
            name="my_index",
            deletion_protection=DeletionProtection.ENABLED
        )

        # Call describe_index to see the change was applied.
        desc = await pc.describe_index("my_index")
        assert desc.deletion_protection == "enabled"

        # Disable deletion protection
        await pc.configure_index(
            name="my_index",
            deletion_protection=DeletionProtection.DISABLED
        )

asyncio.run(main())

Adding, changing, or removing tags

To add, change, or remove tags, pass the tags parameter to the configure_index method. When tags are passed using configure_index, they are merged with any existing tags already on the index. To remove a tag, set the value of the key to an empty string.

import asyncio

from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # Add a tag
        await pc.configure_index(name="my_index", tags={"environment": "staging"})

        # Change a tag
        await pc.configure_index(name="my_index", tags={"environment": "production"})

        # Remove a tag
        await pc.configure_index(name="my_index", tags={"environment": ""})

        # Call describe_index to verify that the tags were changed
        desc = await pc.describe_index("my_index")
        print(desc.tags)

asyncio.run(main())
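The merge rule for tags (new values overwrite, empty strings remove, everything else is kept) can be modeled locally. The sketch below is a plain-Python illustration of that rule as described above; merge_tags is a hypothetical helper, and the actual merge is performed by the service.

```python
def merge_tags(existing, updates):
    """Model the documented merge: overwrite on conflict, drop keys set to ""."""
    merged = dict(existing)
    for key, value in updates.items():
        if value == "":
            merged.pop(key, None)  # an empty string removes the tag
        else:
            merged[key] = value    # add a new tag or change an existing one
    return merged
```

For example, merging {"environment": ""} into {"environment": "staging", "team": "search"} leaves only the team tag.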
async PineconeAsyncio.delete_index(name: str, timeout: int | None = None) None[source]
Parameters:
  • name (str) – the name of the index.

  • timeout (int, optional) – Number of seconds to poll status checking whether the index has been deleted. If None, wait indefinitely; if >=0, time out after this many seconds; if -1, return immediately and do not wait.

Deletes a Pinecone index.

Deleting an index is an irreversible operation. All data in the index will be lost. When you use this command, a request is sent to the Pinecone control plane to delete the index, but the termination is not synchronous because resources take a few moments to be released.

By default the delete_index method will block until polling of the describe_index method shows that the delete operation has completed. If you prefer to return immediately and not wait for the index to be deleted, you can pass timeout=-1 to the method.

After the delete request is submitted, polling describe_index will show that the index transitions into a Terminating state before eventually resulting in a 404 after it has been removed.

This operation can fail if the index is configured with deletion_protection="enabled". In this case, you will need to call configure_index to disable deletion protection before you can delete the index.

import asyncio

from pinecone import PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    index_name = "my_index"
    desc = await pc.describe_index(name=index_name)

    if desc.deletion_protection == "enabled":
        # If for some reason deletion protection is enabled, you will need to disable it first
        # before you can delete the index. But use caution as this operation is not reversible
        # and if somebody enabled deletion protection, they probably had a good reason.
        await pc.configure_index(name=index_name, deletion_protection="disabled")

    await pc.delete_index(name=index_name)
    await pc.close()

asyncio.run(main())
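The timeout convention used by delete_index (and by the create methods) can be sketched as a generic polling loop. This is an illustration of the three documented cases only; wait_until, its interval argument, and the choice to raise TimeoutError are assumptions of the sketch, not a description of the SDK's internals.

```python
import asyncio
import time

async def wait_until(check, timeout=None, interval=0.01):
    """Poll the coroutine function `check` until it returns True.

    timeout=None  -> wait indefinitely
    timeout=-1    -> return immediately without waiting
    timeout>=0    -> raise TimeoutError after that many seconds
    """
    if timeout == -1:
        return False  # caller chose not to wait
    deadline = None if timeout is None else time.monotonic() + timeout
    while not await check():
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        await asyncio.sleep(interval)
    return True

async def never_ready():
    # Hypothetical check used to exercise the timeout paths.
    return False
```

In this sketch, check plays the role of polling describe_index until the index is gone (or ready).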
async PineconeAsyncio.has_index(name: str) bool[source]
Parameters:

name – The name of the index to check for existence.

Returns:

True if the index exists, False otherwise.

Checks if a Pinecone index exists.

import asyncio
from pinecone import PineconeAsyncio, ServerlessSpec

async def main():
    async with PineconeAsyncio() as pc:
        index_name = "my-index"
        if not await pc.has_index(index_name):
            print("Index does not exist, creating...")
            await pc.create_index(
                name=index_name,
                dimension=768,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-west-2")
            )

asyncio.run(main())

Backups

PineconeAsyncio.create_backup(*, index_name: str, backup_name: str, description: str = '') BackupModel[source]

Create a backup of an index.

Parameters:
  • index_name (str) – The name of the index to backup.

  • backup_name (str) – The name to give the backup.

  • description (str) – Optional description of the backup.

PineconeAsyncio.list_backups(*, index_name: str | None = None, limit: int | None = 10, pagination_token: str | None = None) BackupList[source]

List backups.

If index_name is provided, the backups will be filtered by index. If no index_name is provided, all backups in the project will be returned.

Parameters:
  • index_name (str) – The name of the index to list backups for.

  • limit (int) – The maximum number of backups to return.

  • pagination_token (str) – The pagination token to use for pagination.
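The limit and pagination_token parameters imply a loop that keeps requesting pages until no further token is returned. The sketch below shows that loop in generic form. How the returned BackupList exposes the next token is not covered on this page, so fetch_page is a hypothetical adapter returning an (items, next_token) pair, and make_fake_fetcher is an in-memory stand-in used to run the example.

```python
import asyncio

async def collect_all(fetch_page, limit=10):
    """Follow pagination tokens until the listing is exhausted."""
    items, token = [], None
    while True:
        page, token = await fetch_page(limit=limit, pagination_token=token)
        items.extend(page)
        if token is None:  # no token means this was the last page
            return items

def make_fake_fetcher(all_items):
    """Hypothetical in-memory source; the token is just the next start offset."""
    async def fetch_page(limit, pagination_token):
        start = int(pagination_token or 0)
        end = start + limit
        next_token = str(end) if end < len(all_items) else None
        return all_items[start:end], next_token
    return fetch_page
```

With a real client, fetch_page would wrap list_backups and extract the items and the next token from its response.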

PineconeAsyncio.describe_backup(*, backup_id: str) BackupModel[source]

Describe a backup.

Parameters:

backup_id (str) – The ID of the backup to describe.

PineconeAsyncio.delete_backup(*, backup_id: str) None[source]

Delete a backup.

Parameters:

backup_id (str) – The ID of the backup to delete.

Collections

async PineconeAsyncio.create_collection(name: str, source: str) None[source]

Create a collection from a pod-based index

Parameters:
  • name – Name of the collection

  • source – Name of the source index

async PineconeAsyncio.list_collections() CollectionList[source]

List all collections

import asyncio
from pinecone import PineconeAsyncio

async def main():
    pc = PineconeAsyncio()

    collections = await pc.list_collections()
    for collection in collections:
        print(collection.name)
        print(collection.source)

    # You can also iterate specifically over
    # a list of collection names by calling
    # the .names() helper.
    collection_name = "my_collection"
    collections = await pc.list_collections()
    if collection_name in collections.names():
        print('Collection exists')

    await pc.close()

asyncio.run(main())
async PineconeAsyncio.describe_collection(name: str) dict[str, Any][source]

Describes a collection.

Parameters:

name – The name of the collection.

Returns:

Description of the collection.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        description = await pc.describe_collection("my_collection")
        print(description.name)
        print(description.source)
        print(description.status)
        print(description.size)

asyncio.run(main())
async PineconeAsyncio.delete_collection(name: str) None[source]

Deletes a collection.

Parameters:

name – The name of the collection to delete.

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:

        await pc.delete_collection("my_collection")

asyncio.run(main())

Restore Jobs

PineconeAsyncio.list_restore_jobs(*, limit: int | None = 10, pagination_token: str | None = None) RestoreJobList[source]

List restore jobs.

Parameters:
  • limit (int) – The maximum number of restore jobs to return.

  • pagination_token (str) – The pagination token to use for pagination.

PineconeAsyncio.describe_restore_job(*, job_id: str) RestoreJobModel[source]

Describe a restore job.

Parameters:

job_id (str) – The ID of the restore job to describe.

DB Data Plane

pinecone.db_data.IndexAsyncio

alias of _IndexAsyncio

IndexAsyncio.__init__(api_key: str, host: str, additional_headers: dict[str, str] | None = {}, openapi_config=None, **kwargs) None
IndexAsyncio.describe_index_stats(filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, **kwargs) IndexDescription

The DescribeIndexStats operation returns statistics about the index’s contents, for example the vector count per namespace and the number of dimensions.

Parameters:
  • filter (dict[str, Union[str, float, int, bool, List, dict]]) – If this parameter is present, the operation only returns statistics for vectors that satisfy the filter. See metadata filtering (https://www.pinecone.io/docs/metadata-filtering/). [optional]

Returns: DescribeIndexStatsResponse object which contains stats about the index.

import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        print(await idx.describe_index_stats())

asyncio.run(main())

Vectors

IndexAsyncio.upsert(vectors: list[Vector] | list[tuple[str, list[float]]] | list[tuple[str, list[float], dict[str, str | int | float | list[str] | list[int] | list[float]]]] | list[VectorTypedDict], namespace: str | None = None, batch_size: int | None = None, show_progress: bool = True, **kwargs) UpsertResponse
Parameters:
  • vectors (Union[list[Vector], list[VectorTuple], list[VectorTupleWithMetadata], list[VectorTypedDict]]) – A list of vectors to upsert.

  • namespace (str) – The namespace to write to. If not specified, the default namespace is used. [optional]

  • batch_size (int) – The number of vectors to upsert in each batch. If not specified, all vectors will be upserted in a single batch. [optional]

  • show_progress (bool) – Whether to show a progress bar using tqdm. Applied only if batch_size is provided. Default is True.

Returns:

UpsertResponse, includes the number of vectors upserted.

The upsert operation writes vectors into a namespace. If a new value is upserted for an existing vector id, it will overwrite the previous value.

To upsert in parallel follow this link.
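With the asyncio client, one way to upsert in parallel is to split the vectors into chunks and await several upsert calls concurrently. The following is a minimal sketch under stated assumptions, not the SDK's own mechanism: chunked, upsert_concurrently, chunk_size, and max_in_flight are hypothetical names, and FakeIndex is a stand-in for an IndexAsyncio instance so that the snippet runs without network access.

```python
import asyncio
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


async def upsert_concurrently(idx, vectors, namespace, chunk_size=100, max_in_flight=4):
    """Upsert chunks concurrently, with at most `max_in_flight` requests at once."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def upsert_chunk(chunk):
        async with semaphore:
            return await idx.upsert(vectors=chunk, namespace=namespace)

    tasks = (upsert_chunk(chunk) for chunk in chunked(vectors, chunk_size))
    return await asyncio.gather(*tasks)


class FakeIndex:
    """Hypothetical stand-in for IndexAsyncio; records calls instead of sending them."""

    def __init__(self):
        self.calls = []

    async def upsert(self, vectors, namespace=None):
        await asyncio.sleep(0)  # yield to the event loop, as a real request would
        self.calls.append((namespace, len(vectors)))
        return {"upserted_count": len(vectors)}


async def demo():
    idx = FakeIndex()
    vectors = [(f"id{i}", [0.1, 0.2, 0.3, 0.4]) for i in range(250)]
    responses = await upsert_concurrently(idx, vectors, "my-namespace")
    return idx, responses


idx, responses = asyncio.run(demo())
```

With a real index, idx would be the object returned by pc.IndexAsyncio(...). The semaphore bounds the number of in-flight requests so that a large input does not open an unbounded number of connections.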

Upserting dense vectors

Note

The dimension of each dense vector must match the dimension of the index.

A vector can be represented in a variety of ways.

import asyncio
from pinecone import Pinecone, Vector

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # A Vector object
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                Vector(id='id1', values=[0.1, 0.2, 0.3, 0.4], metadata={'metadata_key': 'metadata_value'}),
            ]
        )

        # A vector tuple
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                ('id1', [0.1, 0.2, 0.3, 0.4]),
            ]
        )

        # A vector tuple with metadata
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                ('id1', [0.1, 0.2, 0.3, 0.4], {'metadata_key': 'metadata_value'}),
            ]
        )

        # A vector dictionary
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                {"id": "id1", "values": [0.1, 0.2, 0.3, 0.4], "metadata": {"metadata_key": "metadata_value"}},
            ]
        )

asyncio.run(main())

Upserting sparse vectors

import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # A Vector object
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                Vector(id='id1', sparse_values=SparseValues(indices=[1, 2], values=[0.2, 0.4])),
            ]
        )

        # A dictionary
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                {"id": "id1", "sparse_values": {"indices": [1, 2], "values": [0.2, 0.4]}},
            ]
        )

asyncio.run(main())

Batch upsert

If you have a large number of vectors, you can upsert them in batches.

import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # The client splits the list into requests of at most batch_size vectors
        await idx.upsert(
            namespace = 'my-namespace',
            vectors = [
                {'id': 'id1', 'values': [0.1, 0.2, 0.3, 0.4]},
                {'id': 'id2', 'values': [0.2, 0.3, 0.4, 0.5]},
                {'id': 'id3', 'values': [0.3, 0.4, 0.5, 0.6]},
                {'id': 'id4', 'values': [0.4, 0.5, 0.6, 0.7]},
                {'id': 'id5', 'values': [0.5, 0.6, 0.7, 0.8]},
                # More vectors here
            ],
            batch_size = 50
        )

asyncio.run(main())

Visual progress bar with tqdm

To see a progress bar when upserting in batches, you will need to separately install the tqdm package. If tqdm is present, the client will detect and use it to display progress when show_progress=True.

IndexAsyncio.query(*args, top_k: int, vector: list[float] | None = None, id: str | None = None, namespace: str | None = None, filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, include_values: bool | None = None, include_metadata: bool | None = None, sparse_vector: SparseValues | SparseVectorTypedDict | None = None, **kwargs) QueryResponse

The Query operation searches a namespace, using a query vector. It retrieves the ids of the most similar items in a namespace, along with their similarity scores.

Querying with dense vectors

import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        query_embedding = [0.1, 0.2, 0.3, ...] # An embedding that matches the index dimension

        # Query by vector values
        results = await idx.query(
            vector=query_embedding,
            top_k=10,
            filter={'genre': {"$eq": "drama"}}, # Optionally filter by metadata
            namespace='my_namespace',
            include_values=False,
            include_metadata=True
        )

        # Query using vector id (the values from this stored vector will be used to query)
        results = await idx.query(
            id='1',
            top_k=10,
            filter={"year": {"$gt": 2000}},
            namespace='my_namespace',
        )

asyncio.run(main())

Query with sparse vectors

import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        query_embedding = [0.1, 0.2, 0.3, ...] # An embedding that matches the index dimension

        # Query with a sparse vector passed as a dictionary
        results = await idx.query(
            vector=query_embedding,
            sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
            top_k=10,
            namespace='my_namespace',
        )

        # Query with a sparse vector passed as a SparseValues object
        results = await idx.query(
            vector=query_embedding,
            sparse_vector=SparseValues(indices=[1, 2], values=[0.2, 0.4]),
            top_k=10,
            namespace='my_namespace',
        )

asyncio.run(main())

Examples:

>>> await index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace')
>>> await index.query(id='id1', top_k=10, namespace='my_namespace')
>>> await index.query(vector=[1, 2, 3], top_k=10, namespace='my_namespace', filter={'key': 'value'})
>>> await index.query(id='id1', top_k=10, namespace='my_namespace', include_metadata=True, include_values=True)
>>> await index.query(vector=[1, 2, 3], sparse_vector={'indices': [1, 2], 'values': [0.2, 0.4]},
...                   top_k=10, namespace='my_namespace')
>>> await index.query(vector=[1, 2, 3], sparse_vector=SparseValues([1, 2], [0.2, 0.4]),
...                   top_k=10, namespace='my_namespace')
Parameters:
  • vector (list[float]) – The query vector. This should be the same length as the dimension of the index being queried. Each query() request can contain only one of the parameters id or vector. [optional]

  • id (str) – The unique ID of the vector to be used as a query vector. Each query() request can contain only one of the parameters vector or id. [optional]

  • top_k (int) – The number of results to return for each query. Must be a positive integer (1 or greater).

  • namespace (str) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

  • filter (dict[str, Union[str, float, int, bool, List, dict]]) – The filter to apply. You can use vector metadata to limit your search. See metadata filtering (https://www.pinecone.io/docs/metadata-filtering/). [optional]

  • include_values (bool) – Indicates whether vector values are included in the response. If omitted, the server will use the default value of False. [optional]

  • include_metadata (bool) – Indicates whether metadata is included in the response as well as the ids. If omitted, the server will use the default value of False. [optional]

  • sparse_vector (Union[SparseValues, dict[str, Union[list[float], list[int]]]]) – Sparse values of the query vector. Expected to be either a SparseValues object or a dict of the form {'indices': list[int], 'values': list[float]}, where the lists each have the same length. [optional]

Returns: QueryResponse object which contains the list of the closest vectors as ScoredVector objects, and namespace name.
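Filter expressions are plain dictionaries, so they can be assembled programmatically before being passed as the filter argument. The sketch below is one possible way to do that; where, all_of, and any_of are hypothetical helper names and not part of the SDK. The operator set is taken from the type signature above.

```python
# Operators that appear in the filter type signature.
FIELD_OPERATORS = {"$eq", "$ne", "$gt", "$gte", "$lt", "$lte", "$in", "$nin", "$exists"}


def where(field, op, value):
    """Build a single-field clause such as {'year': {'$gt': 2000}}."""
    if op not in FIELD_OPERATORS:
        raise ValueError(f"unsupported operator: {op}")
    return {field: {op: value}}


def all_of(*clauses):
    """Combine clauses so that every one of them must match."""
    return {"$and": list(clauses)}


def any_of(*clauses):
    """Combine clauses so that at least one of them must match."""
    return {"$or": list(clauses)}


drama_after_2000 = all_of(
    where("genre", "$eq", "drama"),
    where("year", "$gt", 2000),
)
comedy_or_drama = any_of(
    where("genre", "$eq", "comedy"),
    where("genre", "$eq", "drama"),
)
```

The resulting dictionaries can then be used in a call such as await idx.query(vector=..., top_k=10, filter=drama_after_2000).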

IndexAsyncio.query_namespaces(namespaces: list[str], metric: Literal['cosine', 'euclidean', 'dotproduct'], top_k: int | None = None, filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, include_values: bool | None = None, include_metadata: bool | None = None, vector: list[float] | None = None, sparse_vector: SparseValues | SparseVectorTypedDict | None = None, **kwargs) QueryNamespacesResults

The query_namespaces() method is used to make a query to multiple namespaces in parallel and combine the results into one result set.

Parameters:
  • vector (list[float]) – The query vector, must be the same length as the dimension of the index being queried.

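  • namespaces (list[str]) – The list of namespaces to query.

  • metric (Literal['cosine', 'euclidean', 'dotproduct']) – The similarity metric of the index being queried. This argument is required, since the signature gives it no default.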

  • top_k (Optional[int], optional) – The number of results you would like to request from each namespace. Defaults to 10.

  • filter (Optional[dict[str, Union[str, float, int, bool, List, dict]]], optional) – Pass an optional filter to filter results based on metadata. Defaults to None.

  • include_values (Optional[bool], optional) – Boolean field indicating whether vector values should be included with results. Defaults to None.

  • include_metadata (Optional[bool], optional) – Boolean field indicating whether vector metadata should be included with results. Defaults to None.

  • sparse_vector (Optional[ Union[SparseValues, dict[str, Union[list[float], list[int]]]] ], optional) – If you are working with a dotproduct index, you can pass a sparse vector as part of your hybrid search. Defaults to None.

Returns:

A QueryNamespacesResults object containing the combined results from all namespaces, as well as the combined usage cost in read units.

Return type:

QueryNamespacesResults

Examples:

import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone(api_key="your-api-key")
    idx = pc.IndexAsyncio(
        host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io",
    )

    query_vec = [0.1, 0.2, 0.3] # An embedding that matches the index dimension
    combined_results = await idx.query_namespaces(
        vector=query_vec,
        namespaces=['ns1', 'ns2', 'ns3', 'ns4'],
        metric='cosine',  # Required by the signature; assumed here to be the metric of the index
        top_k=10,
        filter={'genre': {"$eq": "drama"}},
        include_values=True,
        include_metadata=True
    )
    for vec in combined_results.matches:
        print(vec.id, vec.score)
    print(combined_results.usage)

    await idx.close()

asyncio.run(main())
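
Combining results from several namespaces means ranking scores against one another, and whether a higher or a lower score is better depends on the metric. The sketch below illustrates that idea only and is not the SDK's implementation: merge_matches and Match are hypothetical names, and it assumes that higher scores are better for cosine and dotproduct while lower scores are better for euclidean.

```python
import heapq
from typing import NamedTuple


class Match(NamedTuple):
    """Hypothetical, simplified stand-in for a scored result."""
    id: str
    score: float
    namespace: str


def merge_matches(per_namespace, metric, top_k):
    """Combine per-namespace match lists into one ranked list of at most top_k."""
    if metric not in ("cosine", "dotproduct", "euclidean"):
        raise ValueError(f"unknown metric: {metric}")
    all_matches = [m for matches in per_namespace for m in matches]
    if metric == "euclidean":
        # Assumption: a smaller distance means a closer match.
        return heapq.nsmallest(top_k, all_matches, key=lambda m: m.score)
    # Assumption: a larger similarity means a closer match.
    return heapq.nlargest(top_k, all_matches, key=lambda m: m.score)


ns1 = [Match("a", 0.9, "ns1"), Match("b", 0.4, "ns1")]
ns2 = [Match("c", 0.7, "ns2"), Match("d", 0.1, "ns2")]

best_cosine = merge_matches([ns1, ns2], metric="cosine", top_k=3)
best_euclidean = merge_matches([ns1, ns2], metric="euclidean", top_k=3)
```

The same four matches produce different rankings under the two metrics, which is why the method needs to be told which one the index uses.
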
IndexAsyncio.delete(ids: list[str] | None = None, delete_all: bool | None = None, namespace: str | None = None, filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, **kwargs) dict[str, Any]
Parameters:
  • ids (list[str]) – Vector ids to delete [optional]

  • delete_all (bool) – Indicates that all vectors in the index namespace should be deleted. Default is False. [optional]

  • namespace (str) – The namespace to delete vectors from. If not specified, the default namespace is used. [optional]

  • filter (dict[str, Union[str, float, int, bool, List, dict]]) – If specified, the metadata filter here will be used to select the vectors to delete. This is mutually exclusive with specifying ids to delete in the ids param or using delete_all=True. See metadata filtering (https://www.pinecone.io/docs/metadata-filtering/). [optional]

The Delete operation deletes vectors from the index, from a single namespace.

No error is raised if the vector id does not exist.

Note: For any delete call, if namespace is not specified, the default namespace “” is used. Since the delete operation does not error when ids are not present, this means you may not receive an error if you delete from the wrong namespace.

Delete can occur in the following mutually exclusive ways:

  1. Delete by ids from a single namespace

  2. Delete all vectors from a single namespace by setting delete_all to True

  3. Delete all vectors from a single namespace that match a metadata filter (note that for this option delete_all must be set to False)
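
Because the three modes are mutually exclusive, it can help to check the arguments on the client side before calling delete. The helper below is a hypothetical sketch (delete_kwargs is not part of the SDK) that insists on exactly one mode and returns keyword arguments suitable for passing on.

```python
def delete_kwargs(ids=None, delete_all=False, filter=None, namespace=None):
    """Return keyword arguments for delete(), insisting on exactly one mode."""
    modes = [bool(ids), bool(delete_all), filter is not None]
    if sum(modes) != 1:
        raise ValueError("specify exactly one of ids, delete_all=True, or filter")

    kwargs = {}
    if ids:
        kwargs["ids"] = list(ids)
    elif delete_all:
        kwargs["delete_all"] = True
    else:
        kwargs["filter"] = filter
    if namespace is not None:
        kwargs["namespace"] = namespace
    return kwargs


by_ids = delete_kwargs(ids=["id1", "id2"], namespace="my_namespace")
by_filter = delete_kwargs(filter={"key": "value"}, namespace="my_namespace")
everything = delete_kwargs(delete_all=True, namespace="my_namespace")
```

A call would then look like await idx.delete(**by_ids). Passing the namespace explicitly also guards against the silent wrong-namespace case described in the note above.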

import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Delete specific ids
        await idx.delete(
            ids=['id1', 'id2'],
            namespace='my_namespace'
        )

        # Delete everything in a namespace
        await idx.delete(
            delete_all=True,
            namespace='my_namespace'
        )

        # Delete by metadata filter
        await idx.delete(
            filter={'key': 'value'},
            namespace='my_namespace'
        )

asyncio.run(main())

Returns: An empty dictionary if the delete operation was successful.

IndexAsyncio.fetch(ids: list[str], namespace: str | None = None, **kwargs) FetchResponse

The fetch operation looks up and returns vectors, by ID, from a single namespace. The returned vectors include the vector data and/or metadata.

import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Fetch specific ids in namespace
        fetched = await idx.fetch(
            ids=['id1', 'id2'],
            namespace='my_namespace'
        )
        for vec_id in fetched.vectors:
            vector = fetched.vectors[vec_id]
            print(vector.id)
            print(vector.metadata)
            print(vector.values)

asyncio.run(main())
Parameters:
  • ids (list[str]) – The vector IDs to fetch.

  • namespace (str) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

Returns: FetchResponse object which contains the list of Vector objects, and namespace name.

IndexAsyncio.list(**kwargs) AsyncIterator[list[str]]

The list operation accepts all of the same arguments as list_paginated, and returns an async generator that yields a list of the matching vector ids in each page of results. It automatically handles pagination tokens on your behalf.

Examples

>>> async for ids in index.list(prefix='99', limit=5, namespace='my_namespace'):
...     print(ids)
['99', '990', '991', '992', '993']
['994', '995', '996', '997', '998']
['999']
Parameters:
  • prefix (Optional[str]) – The id prefix to match. If unspecified, an empty string prefix will be used with the effect of listing all ids in a namespace [optional]

  • limit (Optional[int]) – The maximum number of ids to return. If unspecified, the server will use a default value. [optional]

  • pagination_token (Optional[str]) – A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]

  • namespace (Optional[str]) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

IndexAsyncio.list_paginated(prefix: str | None = None, limit: int | None = None, pagination_token: str | None = None, namespace: str | None = None, **kwargs) ListResponse

The list_paginated operation finds vectors based on an id prefix within a single namespace. It returns matching ids in a paginated form, with a pagination token to fetch the next page of results. This id list can then be passed to fetch or delete operations, depending on your use case.

Consider using the list method to avoid having to handle pagination tokens manually.

Examples

>>> results = await index.list_paginated(prefix='99', limit=5, namespace='my_namespace')
>>> [v.id for v in results.vectors]
['99', '990', '991', '992', '993']
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> next_results = await index.list_paginated(prefix='99', limit=5, namespace='my_namespace', pagination_token=results.pagination.next)
Parameters:
  • prefix (Optional[str]) – The id prefix to match. If unspecified, an empty string prefix will be used with the effect of listing all ids in a namespace [optional]

  • limit (Optional[int]) – The maximum number of ids to return. If unspecified, the server will use a default value. [optional]

  • pagination_token (Optional[str]) – A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]

  • namespace (Optional[str]) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

Returns: ListResponse object which contains the list of ids, the namespace name, pagination information, and usage showing the number of read_units consumed.
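
When list_paginated is called directly, the caller has to feed each pagination token back in until no further page is reported. The loop below sketches that pattern. It relies on the results.vectors and results.pagination.next attributes shown in the example above and assumes that pagination is None (or has no next value) on the last page. collect_ids is a hypothetical helper, and FakeIndex is a stand-in that serves ids from memory so the snippet runs without network access.

```python
import asyncio
from types import SimpleNamespace


async def collect_ids(index, **list_kwargs):
    """Follow pagination tokens until no further page is reported."""
    ids = []
    token = None
    while True:
        page = await index.list_paginated(pagination_token=token, **list_kwargs)
        ids.extend(v.id for v in page.vectors)
        # Assumption: `pagination` is None (or lacks `next`) on the last page.
        token = getattr(page.pagination, "next", None)
        if not token:
            return ids


class FakeIndex:
    """Hypothetical stand-in for IndexAsyncio that pages through in-memory ids."""

    def __init__(self, ids):
        self._ids = sorted(ids)

    async def list_paginated(self, prefix="", limit=5, pagination_token=None, namespace=None):
        matching = [i for i in self._ids if i.startswith(prefix)]
        start = int(pagination_token) if pagination_token else 0
        end = start + limit
        pagination = SimpleNamespace(next=str(end)) if end < len(matching) else None
        vectors = [SimpleNamespace(id=i) for i in matching[start:end]]
        return SimpleNamespace(vectors=vectors, pagination=pagination)


fake = FakeIndex(["100", "99"] + [str(n) for n in range(990, 1000)])
all_ids = asyncio.run(collect_ids(fake, prefix="99", limit=5, namespace="my_namespace"))
```

In most cases the list method is the simpler choice, since it performs this loop on your behalf.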

IndexAsyncio.fetch_by_metadata(filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]], namespace: str | None = None, limit: int | None = None, pagination_token: str | None = None, **kwargs) FetchByMetadataResponse

Fetch vectors by metadata filter.

Look up and return vectors by metadata filter from a single namespace. The returned vectors include the vector data and/or metadata.

Examples:

import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-host") as idx:
        result = await idx.fetch_by_metadata(
            filter={'genre': {'$in': ['comedy', 'drama']}, 'year': {'$eq': 2019}},
            namespace='my_namespace',
            limit=50
        )
        for vec_id in result.vectors:
            vector = result.vectors[vec_id]
            print(vector.id)
            print(vector.metadata)

asyncio.run(main())
Parameters:
  • filter (dict[str, str | float | int | bool | List | dict]) – Metadata filter expression to select vectors. See metadata filtering (https://www.pinecone.io/docs/metadata-filtering/).

  • namespace (str) – The namespace to fetch vectors from. If not specified, the default namespace is used. [optional]

  • limit (int) – Max number of vectors to return. Defaults to 100. [optional]

  • pagination_token (str) – Pagination token to continue a previous listing operation. [optional]

Returns:

Object containing the fetched vectors, namespace, usage, and pagination token.

Return type:

FetchByMetadataResponse

IndexAsyncio.update(id: str | None = None, values: list[float] | None = None, set_metadata: dict[str, str | int | float | list[str] | list[int] | list[float]] | None = None, namespace: str | None = None, sparse_values: SparseValues | SparseVectorTypedDict | None = None, filter: dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool] | dict[Literal['$and'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | dict[Literal['$or'], list[dict[str, str | int | float | bool] | dict[Literal['$eq'], str | int | float | bool] | dict[Literal['$ne'], str | int | float | bool] | dict[Literal['$gt'], int | float] | dict[Literal['$gte'], int | float] | dict[Literal['$lt'], int | float] | dict[Literal['$lte'], int | float] | dict[Literal['$in'], list[str | int | float | bool]] | dict[Literal['$nin'], list[str | int | float | bool]] | dict[Literal['$exists'], bool]]] | None = None, dry_run: bool | None = None, **kwargs) UpdateResponse

The Update operation updates vectors in a namespace.

This method supports two update modes:

  1. Single vector update by ID: Provide id to update a specific vector.

    • Updates the vector with the given ID.

    • If values is included, it will overwrite the previous vector values.

    • If set_metadata is included, the metadata will be merged with existing metadata on the vector. Fields specified in set_metadata will overwrite existing fields with the same key, while fields not in set_metadata will remain unchanged.

  2. Bulk update by metadata filter: Provide filter to update all vectors matching the filter criteria.

    • Updates all vectors in the namespace that match the filter expression.

    • Useful for updating metadata across multiple vectors at once.

    • If set_metadata is included, the metadata will be merged with existing metadata on each vector. Fields specified in set_metadata will overwrite existing fields with the same key, while fields not in set_metadata will remain unchanged.

    • The response includes matched_records indicating how many vectors were updated.

Either id or filter must be provided (but not both in the same call).
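
The merge rule for set_metadata can be modelled locally with an ordinary dictionary update. The sketch below only illustrates the rule as described above; the actual merge is performed by the service, and merge_metadata is a hypothetical name.

```python
def merge_metadata(existing, set_metadata):
    """Return the metadata a vector would carry after an update with set_metadata."""
    merged = dict(existing)      # fields not mentioned in set_metadata stay unchanged
    merged.update(set_metadata)  # fields with the same key are overwritten
    return merged


before = {"genre": "drama", "year": 2019}
after = merge_metadata(before, {"year": 2020, "status": "active"})
```

Here genre is left untouched, year is overwritten, and status is added.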

Examples:

Single vector update by ID:

import asyncio
from pinecone import Pinecone, Vector, SparseValues

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Update vector values
        await idx.update(
            id='id1',
            values=[0.1, 0.2, 0.3, ...],
            namespace='my_namespace'
        )

        # Update metadata
        await idx.update(
            id='id1',
            set_metadata={'key': 'value'},
            namespace='my_namespace'
        )

        # Update sparse values
        await idx.update(
            id='id1',
            sparse_values={'indices': [1, 2], 'values': [0.2, 0.4]},
            namespace='my_namespace'
        )

        # Update sparse values with SparseValues object
        await idx.update(
            id='id1',
            sparse_values=SparseValues(indices=[234781, 5432], values=[0.2, 0.4]),
            namespace='my_namespace'
        )

asyncio.run(main())

Bulk update by metadata filter:

import asyncio
from pinecone import Pinecone

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # Update metadata for all vectors matching the filter
        response = await idx.update(
            set_metadata={'status': 'active'},
            filter={'genre': {'$eq': 'drama'}},
            namespace='my_namespace'
        )
        print(f"Updated {response.matched_records} vectors")

        # Preview how many vectors would be updated (dry run)
        response = await idx.update(
            set_metadata={'status': 'active'},
            filter={'genre': {'$eq': 'drama'}},
            namespace='my_namespace',
            dry_run=True
        )
        print(f"Would update {response.matched_records} vectors")

asyncio.run(main())
Parameters:
  • id (str) – Vector’s unique id. Required for single vector updates. Must not be provided when using filter. [optional]

  • values (list[float]) – Vector values to set. [optional]

  • set_metadata (dict[str, Union[str, float, int, bool, list[int], list[float], list[str]]]) – Metadata to merge with existing metadata on the vector(s). Fields specified will overwrite existing fields with the same key, while fields not specified will remain unchanged. [optional]

  • namespace (str) – Namespace name where to update the vector(s). [optional]

  • sparse_values (Union[SparseValues, dict[str, Union[list[float], list[int]]]]) – Sparse values to update for the vector. Expected to be either a SparseValues object or a dict of the form {'indices': list[int], 'values': list[float]}, where the lists each have the same length. [optional]

  • filter (dict[str, Union[str, float, int, bool, List, dict]]) – A metadata filter expression. When provided, updates all vectors in the namespace that match the filter criteria. See metadata filtering (https://www.pinecone.io/docs/metadata-filtering/). Must not be provided when using id. Either id or filter must be provided. [optional]

  • dry_run (bool) – If True, return the number of records that match the filter without executing the update. Only meaningful when using filter (not with id). Useful for previewing the impact of a bulk update before applying changes. Defaults to False. [optional]

Returns:

An UpdateResponse object. When using filter-based updates, the response includes matched_records indicating the number of vectors that were updated (or would be updated if dry_run=True).

Return type:

UpdateResponse

IndexAsyncio.upsert_from_dataframe(df, namespace: str | None = None, batch_size: int = 500, show_progress: bool = True)

This method has not been implemented yet for the IndexAsyncio class.

Bulk Import

async IndexAsyncio.start_import(uri: str, integration_id: str | None = None, error_mode: Literal['CONTINUE', 'ABORT'] | None = 'CONTINUE') StartImportResponse
Parameters:
  • uri (str) – The URI of the data to import. The URI must start with the scheme of a supported storage provider.

  • integration_id (Optional[str], optional) – If your bucket requires authentication to access, you need to pass the id of your storage integration using this property. Defaults to None.

  • error_mode – Defaults to “CONTINUE”. If set to “CONTINUE”, the import operation will continue even if some records fail to import. Pass “ABORT” to stop the import operation if any records fail to import.

Returns:

Contains the id of the import operation.

Return type:

StartImportResponse

Import data from a storage provider into an index. The uri must start with the scheme of a supported storage provider. For buckets that are not publicly readable, you will also need to separately configure a storage integration and pass the integration id.

Examples

>>> from pinecone import Pinecone
>>> index = Pinecone().IndexAsyncio(host="example-index.svc.aped-4627-b74a.pinecone.io")
>>> await index.start_import(uri="s3://bucket-name/path/to/data.parquet")
{ id: "1" }
async IndexAsyncio.list_imports(**kwargs) AsyncIterator['ImportModel']
Parameters:
  • limit (Optional[int]) – The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]

  • pagination_token (Optional[str]) – When there are multiple pages of results, a pagination token is returned in the response. The token can be used to fetch the next page of results. [optional]

Returns an async generator that yields each import operation. It automatically handles pagination tokens on your behalf so you can easily iterate over all results. The list_imports method accepts all of the same arguments as list_imports_paginated.

```python
async for op in index.list_imports():
    print(op)
```
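
Because list_imports yields one operation at a time, it composes with async comprehensions. The sketch below filters operations by status. fake_list_imports is a hypothetical stand-in for index.list_imports() so the code runs without network access. The id and status fields follow the sample output shown for list_imports_paginated; the "Failed" status value is an assumption.

```python
import asyncio
from types import SimpleNamespace


async def fake_list_imports():
    """Hypothetical stand-in for index.list_imports(); yields ImportModel-like objects."""
    for op_id, status in [("1", "Completed"), ("2", "Failed"), ("3", "Completed")]:
        yield SimpleNamespace(id=op_id, status=status)


async def import_ids_with_status(operations, status):
    """Collect the ids of operations whose status equals `status`."""
    return [op.id async for op in operations if op.status == status]
```

With a real client, the call would look like `await import_ids_with_status(index.list_imports(), "Completed")`.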

async IndexAsyncio.list_imports_paginated(limit: int | None = None, pagination_token: str | None = None, **kwargs) ListImportsResponse
Parameters:
  • limit (Optional[int]) – The maximum number of ids to return. If unspecified, the server will use a default value. [optional]

  • pagination_token (Optional[str]) – A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]

Returns:

ListImportsResponse object which contains the list of operations as ImportModel objects, pagination information, and usage showing the number of read_units consumed.

The list_imports_paginated operation returns information about import operations. It returns operations in a paginated form, with a pagination token to fetch the next page of results.

Consider using the list_imports method to avoid having to handle pagination tokens manually.

Examples

>>> results = await index.list_imports_paginated(limit=5)
>>> results.pagination.next
eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
>>> results.data[0]
{
    "id": "6",
    "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
    "status": "Completed",
    "percent_complete": 100.0,
    "records_imported": 10,
    "created_at": "2024-09-06T14:52:02.567776+00:00",
    "finished_at": "2024-09-06T14:52:28.130717+00:00"
}
>>> next_results = await index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
async IndexAsyncio.describe_import(id: str) ImportModel
Parameters:
  • id (str) – The id of the import operation. This value is returned when starting an import, and can be looked up using list_imports.

Returns:

An object containing operation id, status, and other details.

Return type:

ImportModel

describe_import is used to get detailed information about a specific import operation.
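
A common use of describe_import is polling until an import finishes. The sketch below is one possible approach under stated assumptions: wait_for_import, poll_seconds, and max_polls are illustrative names, StubIndex is a hypothetical stand-in for IndexAsyncio, and only the "Completed" status appears in this reference. The other status strings are assumptions to verify against the API.

```python
import asyncio
from types import SimpleNamespace

# "Completed" appears in the sample output in this reference; the rest are assumptions.
TERMINAL_STATUSES = {"Completed", "Failed", "Cancelled"}


async def wait_for_import(index, import_id, poll_seconds=5.0, max_polls=120):
    """Poll describe_import until the operation reaches a terminal status."""
    for _ in range(max_polls):
        op = await index.describe_import(id=import_id)
        if op.status in TERMINAL_STATUSES:
            return op
        await asyncio.sleep(poll_seconds)
    raise TimeoutError(f"import {import_id} did not finish after {max_polls} polls")


class StubIndex:
    """Hypothetical stand-in for IndexAsyncio; reports progress over three polls."""

    def __init__(self):
        self._statuses = iter(["Pending", "InProgress", "Completed"])

    async def describe_import(self, id):
        return SimpleNamespace(id=id, status=next(self._statuses))
```

Bounding the loop with max_polls keeps a stalled import from blocking the caller indefinitely.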

async IndexAsyncio.cancel_import(id: str)

Cancel an import operation.

Parameters:

id (str) – The id of the import operation to cancel.

Records

If you have created an index using integrated inference, you can use the following methods to search and retrieve records.

async IndexAsyncio.upsert_records(namespace: str, records: List[Dict]) UpsertResponse
Parameters:
  • namespace (str, required) – The namespace of the index to upsert records to.

  • records (list[Dict], required) – The records to upsert into the index.

Upsert records to a namespace. A record is a dictionary that contains either an id or _id field along with other fields that will be stored as metadata. The id or _id field is used as the unique identifier for the record. At least one field in the record should correspond to a field mapping in the index’s embed configuration.

When records are upserted, Pinecone converts mapped fields into embeddings and upserts them into the specified namespace of the index.

import asyncio
from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed,
)

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # upsert records
        await idx.upsert_records(
            namespace="my-namespace",
            records=[
                {
                    "_id": "test1",
                    "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
                },
                {
                    "_id": "test2",
                    "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
                },
                {
                    "_id": "test3",
                    "my_text_field": "Many people enjoy eating apples as a healthy snack.",
                },
                {
                    "_id": "test4",
                    "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
                },
                {
                    "_id": "test5",
                    "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
                },
                {
                    "_id": "test6",
                    "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
                },
            ],
        )

        from pinecone import SearchQuery, SearchRerank, RerankModel

        # search for similar records
        response = await idx.search_records(
            namespace="my-namespace",
            query=SearchQuery(
                inputs={
                    "text": "Apple corporation",
                },
                top_k=3,
            ),
            rerank=SearchRerank(
                model=RerankModel.Bge_Reranker_V2_M3,
                rank_fields=["my_text_field"],
                top_n=3,
            ),
        )

asyncio.run(main())
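
For larger datasets, records can be sent in several upsert_records calls. The following is a hedged sketch of client-side batching: chunked and upsert_records_in_batches are illustrative helpers, StubIndex is a hypothetical stand-in for IndexAsyncio, and the default batch_size of 96 is an assumption that should be checked against the current service limits.

```python
import asyncio


def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


async def upsert_records_in_batches(index, namespace, records, batch_size=96):
    """Call upsert_records once per batch and return the number of calls made."""
    calls = 0
    for batch in chunked(records, batch_size):
        await index.upsert_records(namespace=namespace, records=batch)
        calls += 1
    return calls


class StubIndex:
    """Hypothetical stand-in for IndexAsyncio that records the batch sizes it receives."""

    def __init__(self):
        self.batch_sizes = []

    async def upsert_records(self, namespace, records):
        self.batch_sizes.append(len(records))
```

Batches are sent sequentially here, which keeps ordering simple at the cost of throughput.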
async IndexAsyncio.search(namespace: str, query: SearchQueryTypedDict | SearchQuery, rerank: SearchRerankTypedDict | SearchRerank | None = None, fields: List[str] | None = ['*']) SearchRecordsResponse
Parameters:
  • namespace (str, required) – The namespace in the index to search.

  • query (Union[Dict, SearchQuery], required) – The SearchQuery to use for the search. The query can include a match_terms field to specify which terms must be present in the text of each search hit. The match_terms should be a dict with strategy (str) and terms (list[str]) keys, e.g. {"strategy": "all", "terms": ["term1", "term2"]}. Currently only “all” strategy is supported, which means all specified terms must be present. Note: match_terms is only supported for sparse indexes with integrated embedding configured to use the pinecone-sparse-english-v0 model.

  • rerank (Union[Dict, SearchRerank], optional) – The SearchRerank to use with the search request.

Returns:

The records that match the search.

Search for records.

This operation converts a query to a vector embedding and then searches a namespace. You can optionally provide a reranking operation as part of the search.

import asyncio
from pinecone import (
    Pinecone,
    CloudProvider,
    AwsRegion,
    EmbedModel,
    IndexEmbed,
)

async def main():
    pc = Pinecone()
    async with pc.IndexAsyncio(host="example-dojoi3u.svc.aped-4627-b74a.pinecone.io") as idx:
        # upsert records
        await idx.upsert_records(
            namespace="my-namespace",
            records=[
                {
                    "_id": "test1",
                    "my_text_field": "Apple is a popular fruit known for its sweetness and crisp texture.",
                },
                {
                    "_id": "test2",
                    "my_text_field": "The tech company Apple is known for its innovative products like the iPhone.",
                },
                {
                    "_id": "test3",
                    "my_text_field": "Many people enjoy eating apples as a healthy snack.",
                },
                {
                    "_id": "test4",
                    "my_text_field": "Apple Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
                },
                {
                    "_id": "test5",
                    "my_text_field": "An apple a day keeps the doctor away, as the saying goes.",
                },
                {
                    "_id": "test6",
                    "my_text_field": "Apple Computer Company was founded on April 1, 1976, by Steve Jobs, Steve Wozniak, and Ronald Wayne as a partnership.",
                },
            ],
        )

        from pinecone import SearchQuery, SearchRerank, RerankModel

        # search for similar records
        response = await idx.search_records(
            namespace="my-namespace",
            query=SearchQuery(
                inputs={
                    "text": "Apple corporation",
                },
                top_k=3,
            ),
            rerank=SearchRerank(
                model=RerankModel.Bge_Reranker_V2_M3,
                rank_fields=["my_text_field"],
                top_n=3,
            ),
        )

asyncio.run(main())
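
The query parameter also accepts a plain dict, which is a convenient way to attach match_terms. The helper below is a small sketch of building such a dict. build_search_query is an illustrative name, and the inputs and top_k keys are assumed to mirror the SearchQuery fields used in the example above.

```python
def build_search_query(text, top_k, required_terms=None):
    """Build a dict-style query; match_terms is added only when terms are given."""
    query = {"inputs": {"text": text}, "top_k": top_k}
    if required_terms:
        # "all" is the only strategy the parameter description lists as supported.
        query["match_terms"] = {"strategy": "all", "terms": list(required_terms)}
    return query
```

The result would then be passed as `query=build_search_query("Apple corporation", 3, ["apple"])` to search. Per the parameter description, match_terms applies only to sparse indexes using the pinecone-sparse-english-v0 model.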
async IndexAsyncio.search_records(namespace: str, query: SearchQueryTypedDict | SearchQuery, rerank: SearchRerankTypedDict | SearchRerank | None = None, fields: List[str] | None = ['*']) SearchRecordsResponse

Alias of the search() method.

Namespaces

IndexAsyncio.create_namespace(name: str, schema: dict[str, Any] | None = None, **kwargs) NamespaceDescription

Create a namespace in a serverless index.

Parameters:
  • name (str) – The name of the namespace to create

  • schema (Optional[dict[str, Any]]) – Optional schema configuration for the namespace as a dictionary. [optional]

Returns:

Information about the created namespace including vector count

Return type:

NamespaceDescription

Create a namespace in a serverless index. For guidance and examples, see Manage namespaces.

Note: This operation is not supported for pod-based indexes.

Examples

>>> # Create a namespace with just a name
>>> import asyncio
>>> from pinecone import Pinecone
>>>
>>> async def main():
...     pc = Pinecone()
...     async with pc.IndexAsyncio(host="example-index-dojoi3u.svc.eu-west1-gcp.pinecone.io") as idx:
...         namespace = await idx.create_namespace(name="my-namespace")
...         print(f"Created namespace: {namespace.name}, Vector count: {namespace.vector_count}")
>>>
>>> asyncio.run(main())

>>> # Create a namespace with schema configuration
>>> from pinecone.core.openapi.db_data.model.create_namespace_request_schema import CreateNamespaceRequestSchema
>>> schema = CreateNamespaceRequestSchema(fields={...})
>>> namespace = await idx.create_namespace(name="my-namespace", schema=schema)
IndexAsyncio.describe_namespace(namespace: str, **kwargs) NamespaceDescription

Describe a namespace within an index, showing the vector count within the namespace.

Parameters:

namespace (str) – The namespace to describe

Returns:

Information about the namespace including vector count

Return type:

NamespaceDescription

IndexAsyncio.delete_namespace(namespace: str, **kwargs) dict[str, Any]

Delete a namespace from an index.

Parameters:

namespace (str) – The namespace to delete

Returns:

Response from the delete operation

Return type:

dict[str, Any]

IndexAsyncio.list_namespaces(limit: int | None = None, **kwargs) AsyncIterator[ListNamespacesResponse]

List all namespaces in an index. This method automatically handles pagination to return all results.

Parameters:

limit (Optional[int]) – The maximum number of namespaces to return. If unspecified, the server will use a default value. [optional]

Returns:

Object containing the list of namespaces.

Return type:

ListNamespacesResponse

Examples
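
A minimal sketch of consuming the async iterator, assuming each yielded item exposes name and vector_count in the same way as NamespaceDescription. fake_list_namespaces is a hypothetical stand-in for index.list_namespaces() so the code runs offline.

```python
import asyncio
from types import SimpleNamespace


async def fake_list_namespaces(limit=None):
    """Hypothetical stand-in for index.list_namespaces()."""
    for name, count in [("ns-a", 12), ("ns-b", 5)]:
        yield SimpleNamespace(name=name, vector_count=count)


async def vector_counts_by_namespace(namespaces):
    """Map each namespace name to its reported vector count."""
    return {ns.name: ns.vector_count async for ns in namespaces}
```

With a real client, the call would look like `await vector_counts_by_namespace(idx.list_namespaces())`.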

IndexAsyncio.list_namespaces_paginated(limit: int | None = None, pagination_token: str | None = None, **kwargs) ListNamespacesResponse

List all namespaces in an index with pagination support. The response includes pagination information if there are more results available.

Consider using the list_namespaces method to avoid having to handle pagination tokens manually.

Parameters:
  • limit (Optional[int]) – The maximum number of namespaces to return. If unspecified, the server will use a default value. [optional]

  • pagination_token (Optional[str]) – A token needed to fetch the next page of results. This token is returned in the response if additional results are available. [optional]

Returns:

Object containing the list of namespaces and pagination information.

Return type:

ListNamespacesResponse

Examples
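
A sketch of following pagination tokens by hand. It assumes the response exposes a namespaces list and a pagination object with a next token, and that pagination is absent on the last page. These field names mirror the list_imports_paginated example and should be verified against your SDK version. StubIndex is a hypothetical stand-in for IndexAsyncio.

```python
import asyncio
from types import SimpleNamespace


async def collect_namespace_names(index, page_size=2):
    """Follow pagination tokens until the server reports no further page."""
    names, token = [], None
    while True:
        page = await index.list_namespaces_paginated(limit=page_size, pagination_token=token)
        names.extend(ns.name for ns in page.namespaces)
        # Assumption: pagination is None (or has no `next`) on the last page.
        token = getattr(page.pagination, "next", None)
        if not token:
            return names


class StubIndex:
    """Hypothetical stand-in for IndexAsyncio serving two fixed pages."""

    _pages = {
        None: (["ns-a", "ns-b"], "token-2"),
        "token-2": (["ns-c"], None),
    }

    async def list_namespaces_paginated(self, limit=None, pagination_token=None):
        names, next_token = self._pages[pagination_token]
        pagination = SimpleNamespace(next=next_token) if next_token else None
        return SimpleNamespace(
            namespaces=[SimpleNamespace(name=n) for n in names],
            pagination=pagination,
        )
```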

Inference

async AsyncioInference.embed(model: str, inputs: str | list[Dict] | list[str], parameters: dict[str, Any] | None = None) EmbeddingsList[source]

Generates embeddings for the provided inputs using the specified model and (optional) parameters.

Parameters:
  • model (str, required) – The model to use for generating embeddings.

  • inputs (str or list, required) – A single string or a list of items to generate embeddings for.

  • parameters (dict, optional) – A dictionary of parameters to use when generating embeddings.

Returns:

EmbeddingsList object with keys data, model, and usage. The data key contains a list of n embeddings, where n = len(inputs). Precision of returned embeddings is either float16 or float32, with float32 being the default. The model key is the model used to generate the embeddings. The usage key contains the total number of tokens used at request-time.

Return type:

EmbeddingsList

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        inputs = ["Who created the first computer?"]
        outputs = await pc.inference.embed(
            model="multilingual-e5-large",
            inputs=inputs,
            parameters={"input_type": "passage", "truncate": "END"}
        )
        print(outputs)
        # EmbeddingsList(
        #     model='multilingual-e5-large',
        #     data=[
        #         {'values': [0.1, ...., 0.2]},
        #     ],
        #     usage={'total_tokens': 6}
        # )

asyncio.run(main())

You can also use a single string input:

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        output = await pc.inference.embed(
            model="text-embedding-3-small",
            inputs="Hello, world!"
        )

asyncio.run(main())

Or use the EmbedModel enum:

import asyncio
from pinecone import PineconeAsyncio
from pinecone.inference import EmbedModel

async def main():
    async with PineconeAsyncio() as pc:
        outputs = await pc.inference.embed(
            model=EmbedModel.TEXT_EMBEDDING_3_SMALL,
            inputs=["Document 1", "Document 2"]
        )

asyncio.run(main())
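
Since embed is a coroutine, several batches can be embedded concurrently. The following is a sketch of one way to do that with a concurrency cap, not a documented SDK feature: embed_in_batches, batch_size, and max_concurrency are illustrative names, and StubInference is a hypothetical stand-in for pc.inference that returns one fake embedding per input.

```python
import asyncio
from types import SimpleNamespace


async def embed_in_batches(inference, model, texts, batch_size, max_concurrency=4):
    """Embed `texts` in batches, running at most `max_concurrency` requests at once."""
    gate = asyncio.Semaphore(max_concurrency)

    async def embed_batch(batch):
        async with gate:
            return await inference.embed(model=model, inputs=batch)

    batches = [texts[i:i + batch_size] for i in range(0, len(texts), batch_size)]
    # gather returns results in the order the batches were submitted.
    return await asyncio.gather(*(embed_batch(b) for b in batches))


class StubInference:
    """Hypothetical stand-in for pc.inference; returns one fake embedding per input."""

    async def embed(self, model, inputs, parameters=None):
        return SimpleNamespace(model=model, data=[{"values": [float(len(t))]} for t in inputs])
```

The helper returns one response per batch, in submission order, so results can be matched back to their inputs by position.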
async AsyncioInference.rerank(model: str, query: str, documents: list[str] | list[dict[str, Any]], rank_fields: list[str] = ['text'], return_documents: bool = True, top_n: int | None = None, parameters: dict[str, Any] | None = None) RerankResult[source]

Rerank documents with associated relevance scores that represent the relevance of each document to the provided query using the specified model.

Parameters:
  • model (str, required) – The model to use for reranking.

  • query (str, required) – The query to compare with documents.

  • documents (list, required) – A list of documents or strings to rank.

  • rank_fields (list, optional) – A list of document fields to use for ranking. Defaults to [“text”].

  • return_documents (bool, optional) – Whether to include the documents in the response. Defaults to True.

  • top_n (int, optional) – How many documents to return. Defaults to len(documents).

  • parameters (dict, optional) – A dictionary of parameters to use when ranking documents.

Returns:

RerankResult object with keys data and usage. The data key contains a list of n documents, where n = top_n. The documents are sorted in order of relevance, with the first being the most relevant. The index field can be used to locate the document relative to the list of documents specified in the request. Each document contains a score key representing how closely the document relates to the query.

Return type:

RerankResult

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        result = await pc.inference.rerank(
            model="bge-reranker-v2-m3",
            query="Tell me about tech companies",
            documents=[
                "Apple is a popular fruit known for its sweetness and crisp texture.",
                "Software is still eating the world.",
                "Many people enjoy eating apples as a healthy snack.",
                "Acme Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
                "An apple a day keeps the doctor away, as the saying goes.",
            ],
            top_n=2,
            return_documents=True,
        )
        print(result)
        # RerankResult(
        #     model='bge-reranker-v2-m3',
        #     data=[
        #         { index=3, score=0.020980744,
        #           document={text="Acme Inc. has rev..."} },
        #         { index=1, score=0.00034015716,
        #           document={text="Software is still..."} }
        #     ],
        #     usage={'rerank_units': 1}
        # )

asyncio.run(main())

You can also use document dictionaries with custom fields:

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        result = await pc.inference.rerank(
            model="pinecone-rerank-v0",
            query="What is machine learning?",
            documents=[
                {"text": "Machine learning is a subset of AI.", "category": "tech"},
                {"text": "Cooking recipes for pasta.", "category": "food"},
            ],
            rank_fields=["text"],
            top_n=1
        )

asyncio.run(main())

Or use the RerankModel enum:

import asyncio
from pinecone import PineconeAsyncio
from pinecone.inference import RerankModel

async def main():
    async with PineconeAsyncio() as pc:
        result = await pc.inference.rerank(
            model=RerankModel.PINECONE_RERANK_V0,
            query="Your query here",
            documents=["doc1", "doc2", "doc3"]
        )

asyncio.run(main())
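
The index field in each result row points back into the documents list that was sent with the request. The sketch below illustrates that mapping. ranked_documents is an illustrative helper, and sample_result is a hand-built stand-in that mirrors the sample output shown earlier rather than a real RerankResult.

```python
from types import SimpleNamespace

documents = [
    "Apple is a popular fruit known for its sweetness and crisp texture.",
    "Software is still eating the world.",
    "Many people enjoy eating apples as a healthy snack.",
    "Acme Inc. has revolutionized the tech industry with its sleek designs and user-friendly interfaces.",
    "An apple a day keeps the doctor away, as the saying goes.",
]


def ranked_documents(documents, result):
    """Pair each reranked row with the original document it refers to."""
    return [(documents[row.index], row.score) for row in result.data]


# Stand-in mirroring the sample output (indexes 3 and 1, most relevant first).
sample_result = SimpleNamespace(
    data=[
        SimpleNamespace(index=3, score=0.020980744),
        SimpleNamespace(index=1, score=0.00034015716),
    ]
)

ranked = ranked_documents(documents, sample_result)
```

This lookup is mainly useful when return_documents is set to False, because the response then carries only indexes and scores.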
AsyncioInference.list_models(*, type: str | None = None, vector_type: str | None = None) ModelInfoList[source]

List all available models.

Parameters:
  • type (str, optional) – The type of model to list. Either “embed” or “rerank”.

  • vector_type (str, optional) – The type of vector to list. Either “dense” or “sparse”.

Returns:

A list of models.

Return type:

ModelInfoList

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        # List all models
        models = await pc.inference.list_models()

        # List models, with model type filtering
        models = await pc.inference.list_models(type="embed")
        models = await pc.inference.list_models(type="rerank")

        # List models, with vector type filtering
        models = await pc.inference.list_models(vector_type="dense")
        models = await pc.inference.list_models(vector_type="sparse")

        # List models, with both type and vector type filtering
        models = await pc.inference.list_models(type="rerank", vector_type="dense")

asyncio.run(main())
AsyncioInference.get_model(model_name: str) ModelInfo[source]

Get details on a specific model.

Parameters:

model_name (str, required) – The name of the model to get details on.

Returns:

A ModelInfo object.

Return type:

ModelInfo

import asyncio
from pinecone import PineconeAsyncio

async def main():
    async with PineconeAsyncio() as pc:
        model_info = await pc.inference.get_model(model_name="text-embedding-3-small")
        print(model_info)
        # {
        #     "model": "text-embedding-3-small",
        #     "short_description": "...",
        #     "type": "embed",
        #     ...
        # }

asyncio.run(main())