Skip to content

Pydantic AI

Use pydynox with Pydantic AI to build AI agents backed by DynamoDB.

Key features

  • Type-safe tools with Pydantic validation
  • Dependency injection for shared state
  • Full async support with pydynox async methods
  • S3 storage for large documents
  • Works with Amazon Bedrock models

Getting started

Installation

pip install pydynox pydantic-ai

Full example

"""Pydantic AI integration with pydynox.

Use case: Document management agent with S3 storage.
Uses async methods for better performance.
"""

from __future__ import annotations

from dataclasses import dataclass

from pydantic_ai import Agent
from pydantic_ai.models.bedrock import BedrockConverseModel
from pydynox import DynamoDBClient, Model, ModelConfig, set_default_client
from pydynox.attributes import NumberAttribute, S3Attribute, S3File, StringAttribute

# Create client
# A single DynamoDBClient for us-east-1 is built at import time and registered
# via set_default_client — presumably so the models below can reach DynamoDB
# without being handed a client explicitly (confirm against the pydynox docs).
client = DynamoDBClient(region="us-east-1")
set_default_client(client)


# Define models
class Document(Model):
    """One stored version of a document, kept in the "documents" table.

    The hash key carries the document id and the range key carries the
    version, following the key patterns noted next to each attribute.
    """

    model_config = ModelConfig(table="documents")

    pk = StringAttribute(hash_key=True)  # DOC#<id>
    sk = StringAttribute(range_key=True)  # VERSION#<version>
    title = StringAttribute()
    author = StringAttribute()
    size_bytes = NumberAttribute(default=0)  # content size in bytes; 0 when not set
    # Document body; presumably stored in the given S3 bucket under the prefix
    # rather than in the DynamoDB item itself — confirm against S3Attribute docs.
    content = S3Attribute(bucket="my-docs-bucket", prefix="documents/")


class Project(Model):
    """Project metadata record, kept in the "projects" table."""

    model_config = ModelConfig(table="projects")

    pk = StringAttribute(hash_key=True)  # PROJECT#<id>
    sk = StringAttribute(range_key=True)  # METADATA
    name = StringAttribute()
    owner = StringAttribute()
    doc_count = NumberAttribute(default=0)  # number of documents; 0 when not set


# Dependencies for the agent
@dataclass
class DocAgentDeps:
    """Dependencies passed to all tools."""

    # DynamoDB client handed to the agent run. The tools in this example do not
    # read it from ctx; they call the model classes directly.
    client: DynamoDBClient


# Create the agent with Bedrock
# Only the model id is given here; no region or credentials are passed, so they
# presumably come from the ambient AWS configuration — confirm for your setup.
model = BedrockConverseModel(
    model_name="us.anthropic.claude-sonnet-4-20250514-v1:0",
)

# deps_type names the dependency container (DocAgentDeps) that callers supply
# through the deps= argument of agent.run(); tools are attached below with
# @agent.tool.
agent = Agent(
    model,
    deps_type=DocAgentDeps,
    system_prompt="""You are a document management assistant.
You can search documents, get document info, and manage projects.
Always provide clear information about document sizes and versions.""",
)


@agent.tool
async def search_documents(ctx, query: str, limit: int = 10) -> list:
    """Search documents by title.

    Args:
        ctx: The run context with dependencies.
        query: Search term to match in document titles.
        limit: Maximum results to return.

    Returns:
        List of matching documents with title, author, and size.
    """
    # Scan the documents table, keeping only items whose title contains the
    # search term; `limit` is forwarded to the scan unchanged.
    title_matches = Document.title.contains(query)

    summaries = []
    async for hit in Document.async_scan(filter_condition=title_matches, limit=limit):
        summaries.append(
            {
                # Drop the "DOC#" key marker to expose the bare document id.
                "doc_id": hit.pk.replace("DOC#", ""),
                "title": hit.title,
                "author": hit.author,
                # bytes -> kilobytes, two decimals
                "size_kb": round(hit.size_bytes / 1024, 2),
            }
        )
    return summaries


@agent.tool
async def get_document(ctx, doc_id: str, version: str = "latest") -> dict:
    """Get document details and the S3 key of its content.

    Args:
        ctx: The run context with dependencies.
        doc_id: The document ID.
        version: Version number or "latest".

    Returns:
        Document info (version, title, author, size in KB) with the S3 key of
        the stored content, or an error message if the document or version
        does not exist.
    """
    if version == "latest":
        # Read the document's partition in descending sort-key order and keep
        # only the first item.
        # NOTE(review): sk is a string attribute, so ordering is presumably
        # lexicographic ("VERSION#10" sorts before "VERSION#9"). Zero-pad the
        # version numbers if more than nine versions are expected — confirm.
        query_result = Document.async_query(
            hash_key=f"DOC#{doc_id}",
            scan_index_forward=False,
            limit=1,
        )
        docs = [doc async for doc in query_result]
        if not docs:
            return {"error": f"Document {doc_id} not found"}
        doc = docs[0]
    else:
        # An explicit version maps directly onto the full primary key.
        doc = await Document.async_get(pk=f"DOC#{doc_id}", sk=f"VERSION#{version}")
        if not doc:
            return {"error": f"Document {doc_id} version {version} not found"}

    return {
        "doc_id": doc_id,
        "version": doc.sk.replace("VERSION#", ""),
        "title": doc.title,
        "author": doc.author,
        "size_kb": round(doc.size_bytes / 1024, 2),
        # Only the S3 object key is exposed; no download URL is generated here.
        "s3_key": doc.content.key if doc.content else None,
    }


@agent.tool
async def list_document_versions(ctx, doc_id: str) -> list:
    """List all versions of a document.

    Args:
        ctx: The run context with dependencies.
        doc_id: The document ID.

    Returns:
        List of versions with size and author.
    """
    versions = []
    # Every item in the document's partition is one version; the query walks
    # them in descending sort-key order (scan_index_forward=False).
    async for entry in Document.async_query(
        hash_key=f"DOC#{doc_id}",
        scan_index_forward=False,
    ):
        versions.append(
            {
                "version": entry.sk.replace("VERSION#", ""),
                "author": entry.author,
                "size_kb": round(entry.size_bytes / 1024, 2),
            }
        )
    return versions


@agent.tool
async def get_project_stats(ctx, project_id: str) -> dict:
    """Get project statistics.

    Args:
        ctx: The run context with dependencies.
        project_id: The project ID.

    Returns:
        Project info with document count.
    """
    # A project is a single item: PROJECT#<id> / METADATA.
    record = await Project.async_get(pk=f"PROJECT#{project_id}", sk="METADATA")

    if record:
        return {
            "project_id": project_id,
            "name": record.name,
            "owner": record.owner,
            "document_count": record.doc_count,
        }

    # Report the miss as data so the agent can relay it.
    return {"error": f"Project {project_id} not found"}


@agent.tool
async def upload_document(
    ctx,
    doc_id: str,
    title: str,
    author: str,
    content: str,
    version: str = "1",
) -> dict:
    """Upload a new document.

    Args:
        ctx: The run context with dependencies.
        doc_id: The document ID.
        title: Document title.
        author: Author name.
        content: Document content as text.
        version: Version number.

    Returns:
        Upload confirmation with S3 location.
    """
    # The text is stored as UTF-8; its encoded length is the recorded size.
    payload = content.encode("utf-8")

    record = Document(
        pk=f"DOC#{doc_id}",
        sk=f"VERSION#{version}",
        title=title,
        author=author,
        size_bytes=len(payload),
    )
    # Attach the body as a plain-text file named after the document id.
    attachment = S3File(payload, name=f"{doc_id}.txt", content_type="text/plain")
    record.content = attachment
    await record.async_save()

    confirmation = {
        "success": True,
        "doc_id": doc_id,
        "version": version,
        # Read back from the saved record.
        "s3_key": record.content.key,
        "size_kb": round(len(payload) / 1024, 2),
    }
    return confirmation


# Example usage
if __name__ == "__main__":
    import asyncio

    def create_tables():
        """Create DynamoDB tables if they don't exist.

        Both tables use a string hash key ``pk`` and a string range key ``sk``.
        ``wait=True`` is passed to ``create_table`` — presumably to block until
        the table is usable (confirm against the pydynox client docs).
        """
        if not client.table_exists("documents"):
            client.create_table(
                table_name="documents",
                hash_key=("pk", "S"),
                range_key=("sk", "S"),
                wait=True,
            )
            print("Table 'documents' created!")

        if not client.table_exists("projects"):
            client.create_table(
                table_name="projects",
                hash_key=("pk", "S"),
                range_key=("sk", "S"),
                wait=True,
            )
            print("Table 'projects' created!")

    async def seed_data():
        """Insert sample documents for testing.

        Saves four document versions (two of them for DOC#001) and one project
        record. No S3 content is attached to the sample documents.
        """
        sample_docs = [
            Document(
                pk="DOC#001",
                sk="VERSION#1",
                title="Q1 2025 quarterly report",
                author="Maria Silva",
                size_bytes=15360,
            ),
            Document(
                pk="DOC#001",
                sk="VERSION#2",
                title="Q1 2025 quarterly report - revised",
                author="Maria Silva",
                size_bytes=18432,
            ),
            Document(
                pk="DOC#002",
                sk="VERSION#1",
                title="Q4 2024 quarterly report",
                author="João Santos",
                size_bytes=12288,
            ),
            Document(
                pk="DOC#003",
                sk="VERSION#1",
                title="Annual budget proposal",
                author="Ana Costa",
                size_bytes=25600,
            ),
        ]

        sample_projects = [
            Project(
                pk="PROJECT#finance",
                sk="METADATA",
                name="Finance Reports",
                owner="Maria Silva",
                doc_count=3,
            ),
        ]

        for doc in sample_docs:
            await doc.async_save()
        for proj in sample_projects:
            await proj.async_save()

        print("Sample data inserted!")

    async def main():
        """Prepare the tables and sample data, then run one agent query."""
        # Create tables first
        create_tables()

        # Seed data
        await seed_data()

        deps = DocAgentDeps(client=client)

        result = await agent.run(
            "Find all documents about 'quarterly report' and show me the latest version",
            deps=deps,
        )
        # The agent's final answer is exposed as `output` on the run result.
        print(result.output)

    asyncio.run(main())

Tool patterns

Async CRUD with S3

"""Pydantic AI async CRUD with S3 example."""

from pydantic_ai import Agent
from pydantic_ai.models.bedrock import BedrockConverseModel
from pydynox import Model, ModelConfig
from pydynox.attributes import NumberAttribute, S3Attribute, S3File, StringAttribute


class Document(Model):
    """One stored version of a document, kept in the "documents" table."""

    model_config = ModelConfig(table="documents")

    pk = StringAttribute(hash_key=True)  # written as DOC#<doc_id> by the tools below
    sk = StringAttribute(range_key=True)  # written as VERSION#<version> by the tools below
    title = StringAttribute()
    author = StringAttribute()
    size_bytes = NumberAttribute(default=0)  # content size in bytes; 0 when not set
    # Document body; presumably stored in the given S3 bucket under the prefix
    # rather than in the DynamoDB item itself — confirm against S3Attribute docs.
    content = S3Attribute(bucket="my-docs-bucket", prefix="documents/")


# BedrockConverseModel takes the model id as `model_name`; the AWS region is
# configured on the provider rather than on the model itself.
from pydantic_ai.providers.bedrock import BedrockProvider

model = BedrockConverseModel(
    model_name="us.anthropic.claude-sonnet-4-20250514-v1:0",
    provider=BedrockProvider(region_name="us-east-1"),
)

agent = Agent(model, system_prompt="You are a document assistant.")


@agent.tool
async def upload_document(
    ctx,
    doc_id: str,
    title: str,
    author: str,
    content: str,
    version: str = "1",
) -> dict:
    """Upload a new document to S3.

    Args:
        ctx: The run context.
        doc_id: The document ID.
        title: Document title.
        author: Author name.
        content: Document content as text.
        version: Version number.

    Returns:
        Upload confirmation with the S3 key of the stored content.
    """
    # The text is stored as UTF-8; its encoded length is the recorded size.
    content_bytes = content.encode("utf-8")

    doc = Document(
        pk=f"DOC#{doc_id}",
        sk=f"VERSION#{version}",
        title=title,
        author=author,
        size_bytes=len(content_bytes),
    )
    doc.content = S3File(content_bytes, name=f"{doc_id}.txt", content_type="text/plain")
    # Same async save call as the full example on this page (async_ prefix).
    await doc.async_save()

    return {
        "success": True,
        "doc_id": doc_id,
        "s3_key": doc.content.key,
    }


@agent.tool
async def get_document(ctx, doc_id: str, version: str = "latest") -> dict:
    """Get document details.

    Args:
        ctx: The run context.
        doc_id: The document ID.
        version: Version number or "latest".

    Returns:
        Document info (version, title, size in KB), or an error message if the
        document or version does not exist.
    """
    if version == "latest":
        # Same query shape as the full example on this page: read the
        # document's partition in descending sort-key order, keep the first
        # item, and consume the result with `async for`.
        query_result = Document.async_query(
            hash_key=f"DOC#{doc_id}",
            scan_index_forward=False,
            limit=1,
        )
        docs = [doc async for doc in query_result]
        if not docs:
            return {"error": f"Document {doc_id} not found"}
        doc = docs[0]
    else:
        # An explicit version maps directly onto the full primary key.
        doc = await Document.async_get(pk=f"DOC#{doc_id}", sk=f"VERSION#{version}")
        if not doc:
            return {"error": f"Document {doc_id} version {version} not found"}

    return {
        "doc_id": doc_id,
        "version": doc.sk.replace("VERSION#", ""),
        "title": doc.title,
        "size_kb": round(doc.size_bytes / 1024, 2),
    }

Using dependencies

"""Pydantic AI dependencies example."""

from dataclasses import dataclass

from pydantic_ai import Agent
from pydantic_ai.models.bedrock import BedrockConverseModel
from pydynox import DynamoDBClient, Model, ModelConfig
from pydynox.attributes import StringAttribute


class Document(Model):
    """Document record kept in the "documents" table."""

    model_config = ModelConfig(table="documents")

    pk = StringAttribute(hash_key=True)  # read as DOC#<doc_id> by the tools below
    sk = StringAttribute(range_key=True)  # read as VERSION#latest by the tools below
    title = StringAttribute()
    author = StringAttribute()


@dataclass
class AppDeps:
    """Dependencies shared by the agent's tools through ``ctx.deps``."""

    client: DynamoDBClient  # DynamoDB client supplied by the caller
    cache: dict  # Simple in-memory cache


# BedrockConverseModel takes the model id as `model_name`; the AWS region is
# configured on the provider rather than on the model itself.
from pydantic_ai.providers.bedrock import BedrockProvider

model = BedrockConverseModel(
    model_name="us.anthropic.claude-sonnet-4-20250514-v1:0",
    provider=BedrockProvider(region_name="us-east-1"),
)

# deps_type names the dependency container (AppDeps) that tools read via ctx.deps.
agent = Agent(model, deps_type=AppDeps, system_prompt="You are a document assistant.")


@agent.tool
async def get_document_cached(ctx, doc_id: str) -> dict:
    """Get document with caching.

    Args:
        ctx: The run context; ``ctx.deps.cache`` is the shared in-memory cache.
        doc_id: The document ID.

    Returns:
        The document's title and author, or an error if it does not exist.
    """
    cache = ctx.deps.cache
    cache_key = f"doc:{doc_id}"

    if cache_key in cache:
        return cache[cache_key]

    # NOTE(review): this reads the item whose sort key is literally
    # "VERSION#latest" — confirm such an item is maintained for each document.
    doc = await Document.async_get(pk=f"DOC#{doc_id}", sk="VERSION#latest")
    if not doc:
        # Misses are not cached, so a later call looks the document up again.
        return {"error": "Not found"}

    result = {"title": doc.title, "author": doc.author}
    cache[cache_key] = result
    return result


@agent.tool
async def safe_delete(ctx, doc_id: str) -> dict:
    """Safely delete a document.

    Args:
        ctx: The run context; ``ctx.deps.cache`` is the shared in-memory cache.
        doc_id: The document ID.

    Returns:
        ``{"success": True}`` on success, otherwise ``success: False`` with an
        error message. Failures are reported as data instead of being raised.
    """
    try:
        doc = await Document.async_get(pk=f"DOC#{doc_id}", sk="VERSION#latest")
        if not doc:
            return {"success": False, "error": "Not found"}

        await doc.async_delete()
        # Evict the entry written by get_document_cached so a deleted document
        # is not served from the cache afterwards.
        ctx.deps.cache.pop(f"doc:{doc_id}", None)
        return {"success": True}

    except Exception as e:
        # Deliberately broad: this is the tool boundary, and the error text is
        # handed back to the agent rather than aborting the run.
        return {"success": False, "error": str(e)}

Tips

Use async methods

Pydantic AI is async-first. Use pydynox async methods for better performance:

Sync Async
Model.get() Model.async_get()
Model.query() Model.async_query()
Model.scan() Model.async_scan()
model.save() model.async_save()
model.delete() model.async_delete()

The ctx parameter

Every tool registered with @agent.tool receives a context object (ctx) as its first parameter. Use ctx.deps to access the dependencies passed to agent.run().

Handle errors

Return error info instead of raising exceptions.

Next steps

  • Strands - AWS agent framework for customer support
  • Smolagents - HuggingFace's agent framework with encrypted data
  • Async - Learn more about async operations
  • S3 attribute - Learn more about S3 storage