Memproof

memproof.Memproof is the single entry point for the library. It assembles the internal pipeline (risk engine, policy engine, Trailproof audit trail, quarantine store, approval broker, and memory adapter) and exposes a concise async API.
from memproof import Memproof

mp = Memproof(policy="./memproof.yaml")

Constructor

Memproof(
    policy: str | None = None,
    adapter: str = "in_memory",
    config: MemproofConfig | None = None,
    *,
    policy_schema_path: str | None = None,
    langgraph_url: str = "",
    langgraph_api_key: str = "",
    openai_api_key: str = "",
    openai_organization: str = "",
    mcp_server_url: str = "",
    trail_store: str = "memory",
    trail_store_path: str | None = None,
    trail_signing_key: str | None = None,
    attesta_enabled: bool = False,
    attesta_url: str = "",
    attesta_token: str = "",
)
You can either pass a fully constructed MemproofConfig object via config, or use the convenience keyword arguments — they are forwarded to a new MemproofConfig internally. If config is provided, the keyword arguments are ignored (except policy_schema_path).

Parameters

policy
str | None
default:"None"
Path to a Memproof policy YAML file. If None and no config is provided, defaults to "memproof.yaml" in the current working directory.
adapter
str
default:"\"in_memory\""
Which memory adapter to use. Supported values:
  • "in_memory" — ephemeral, dictionary-backed store (useful for testing).
  • "langgraph" — LangGraph checkpoint-backed store.
  • "openai_sessions" — OpenAI Sessions API-backed store.
  • "mcp" — MCP memory server-backed store.
config
MemproofConfig | None
default:"None"
A pre-built configuration object. When provided, all other keyword arguments (except policy_schema_path) are ignored. See Configuration.
policy_schema_path
str | None
default:"None"
Path to the JSON Schema file used to validate the policy YAML. If None, Memproof looks for schemas/memproof-policy.schema.json relative to the package root.
langgraph_url
str
default:"\"\""
Base URL for the LangGraph checkpoint API. Only used when adapter="langgraph".
langgraph_api_key
str
default:"\"\""
API key for authenticating with the LangGraph checkpoint API.
openai_api_key
str
default:"\"\""
OpenAI API key. Only used when adapter="openai_sessions".
openai_organization
str
default:"\"\""
OpenAI organization ID. Only used when adapter="openai_sessions".
mcp_server_url
str
default:"\"\""
URL of the MCP memory server. Only used when adapter="mcp".
trail_store
str
default:"\"memory\""
Trailproof audit trail storage backend. Supported values:
  • "memory" — in-memory store (events are lost on process exit).
  • "jsonl" — JSONL file-backed persistent store.
trail_store_path
str | None
default:"None"
File path for the JSONL trail store. Required when trail_store="jsonl".
trail_signing_key
str | None
default:"None"
HMAC-SHA256 secret key for signing trail events. When provided, each event includes a cryptographic signature for tamper detection.
attesta_enabled
bool
default:"False"
Enable the Attesta external approval service for require_approval policy decisions.
attesta_url
str
default:"\"\""
Base URL for the Attesta approval service.
attesta_token
str
default:"\"\""
Bearer token for authenticating with the Attesta approval service.
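
The trail_signing_key parameter enables tamper-evident signatures on trail events. Conceptually, this works like the following sketch: an HMAC-SHA256 tag computed over each serialized event lets anyone holding the key detect later modification. This is an illustration of the idea only, not Trailproof's actual serialization or signature format.

```python
import hashlib
import hmac
import json

def sign_event(key: bytes, event: dict) -> str:
    # Canonicalize the event (sorted keys) so the same event
    # always produces the same signature.
    payload = json.dumps(event, sort_keys=True).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

key = b"my-hmac-secret"
event = {"event_type": "memproof.pipeline.committed", "operation_id": "op-1"}
sig = sign_event(key, event)

# An unmodified event verifies; any change to the payload does not.
assert hmac.compare_digest(sig, sign_event(key, event))
assert sig != sign_event(key, {**event, "operation_id": "op-2"})
```

Anyone without the key can still read events, but cannot forge a valid signature after altering one.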

Example

from memproof import Memproof

# Minimal -- uses in-memory adapter and default policy path
mp = Memproof(policy="./memproof.yaml")

# With LangGraph backend and JSONL audit trail
mp = Memproof(
    policy="./memproof.yaml",
    adapter="langgraph",
    langgraph_url="https://langgraph.example.com",
    langgraph_api_key="lg-key-abc",
    trail_store="jsonl",
    trail_store_path="./audit.jsonl",
    trail_signing_key="my-hmac-secret",
)

# With a pre-built config object
from memproof import MemproofConfig

cfg = MemproofConfig(
    policy_path="./memproof.yaml",
    adapter="mcp",
    mcp_server_url="http://localhost:8200",
)
mp = Memproof(config=cfg)

Methods

remember()

Create a new memory through the full control pipeline (risk assessment, policy evaluation, event logging, and adapter persistence).
async def remember(
    content: str,
    scope: dict[str, Any] | MemoryScope,
    context: dict[str, Any] | OperationContext,
    *,
    tags: list[str] | None = None,
    metadata: dict[str, Any] | None = None,
    ttl_seconds: int | None = None,
    idempotency_key: str | None = None,
) -> MemoryOperationResponse
content
str
required
The memory content to store. Must be non-empty.
scope
dict | MemoryScope
required
Identifies where this memory belongs. Accepts either a MemoryScope instance or a dictionary with the keys tenant_id, project_id, agent_id, and optionally session_id and subject_id.
context
dict | OperationContext
required
Describes who is performing the operation and when. Accepts either an OperationContext instance or a dictionary with the keys actor_type, actor_id, source, timestamp, and optionally request_id, correlation_id, and metadata.
tags
list[str] | None
default:"None"
Optional list of tags to associate with the memory.
metadata
dict[str, Any] | None
default:"None"
Arbitrary key-value metadata to attach to the memory record.
ttl_seconds
int | None
default:"None"
Time-to-live in seconds. If set, the memory will be considered expired after this duration. Must be greater than 0.
idempotency_key
str | None
default:"None"
Client-supplied idempotency key. If the same key is reused, the original response is returned without re-executing the operation. If None, a unique key is generated automatically.
return
MemoryOperationResponse
Contains operation_id, status, the created memory record (if committed), risk_assessment, and the decision from the policy engine. See Models.

Example

result = await mp.remember(
    content="user prefers dark mode",
    scope={"tenant_id": "acme", "project_id": "assistant", "agent_id": "a1"},
    context={
        "actor_type": "agent",
        "actor_id": "a1",
        "source": "langgraph",
        "timestamp": "2026-01-15T10:30:00Z",
    },
    tags=["preference", "ui"],
    metadata={"confidence": 0.95},
    ttl_seconds=86400,
)

print(result.status)          # OperationStatus.committed
print(result.memory.memory_id)
print(result.decision.action) # DecisionAction.allow

get()

Retrieve a single memory record by its ID.
async def get(memory_id: str) -> MemoryRecord
memory_id
str
required
The unique identifier of the memory to retrieve.
return
MemoryRecord
The full memory record. See Models.

Errors

  • NotFoundError — raised if no memory with the given ID exists.

Example

record = await mp.get("mem-abc123")
print(record.content)
print(record.scope.tenant_id)

update()

Update an existing memory through the full control pipeline. Only the fields you provide are changed; omitted fields remain unchanged.
async def update(
    memory_id: str,
    context: dict[str, Any] | OperationContext,
    *,
    content: str | None = None,
    tags: list[str] | None = None,
    metadata_patch: dict[str, Any] | None = None,
    ttl_seconds: int | None = None,
    idempotency_key: str | None = None,
) -> MemoryOperationResponse
memory_id
str
required
The ID of the memory to update.
context
dict | OperationContext
required
Operation context. Same format as remember().
content
str | None
default:"None"
New content for the memory. If None, the content is not changed.
tags
list[str] | None
default:"None"
Replacement tag list. If None, tags are not changed.
metadata_patch
dict[str, Any] | None
default:"None"
Key-value pairs to merge into the existing metadata. Existing keys not present in the patch are preserved.
ttl_seconds
int | None
default:"None"
New TTL value in seconds. Must be greater than 0 if provided.
idempotency_key
str | None
default:"None"
Client-supplied idempotency key.
return
MemoryOperationResponse
Contains the updated memory, risk assessment, and policy decision. See Models.

Example

result = await mp.update(
    memory_id="mem-abc123",
    context={
        "actor_type": "user",
        "actor_id": "user-42",
        "source": "web-ui",
        "timestamp": "2026-01-15T11:00:00Z",
    },
    content="user prefers light mode",
    tags=["preference", "ui", "updated"],
)

print(result.status)  # OperationStatus.committed
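
The metadata_patch merge semantics described above (patch keys overwrite or extend; untouched keys survive) can be illustrated with a plain dictionary merge. This is a conceptual sketch, not Memproof's implementation.

```python
def apply_metadata_patch(existing: dict, patch: dict) -> dict:
    merged = dict(existing)  # start from the existing metadata
    merged.update(patch)     # patch keys overwrite or add; others are preserved
    return merged

existing = {"confidence": 0.95, "source_note": "onboarding"}
patched = apply_metadata_patch(existing, {"confidence": 0.99, "reviewed": True})
# "source_note" survives; "confidence" is overwritten; "reviewed" is added.
```

Note that this is a shallow merge: to remove a key or replace the metadata wholesale, you would need to consult the library's behavior rather than rely on this sketch.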

forget()

Delete a memory through the full control pipeline. The operation still flows through risk assessment and policy evaluation before the adapter removes the record.
async def forget(
    memory_id: str,
    context: dict[str, Any] | OperationContext,
    *,
    idempotency_key: str | None = None,
) -> OperationStatusResponse
memory_id
str
required
The ID of the memory to delete.
context
dict | OperationContext
required
Operation context. Same format as remember().
idempotency_key
str | None
default:"None"
Client-supplied idempotency key.
return
OperationStatusResponse
Contains the operation ID, status, and decision. See Models.

Example

result = await mp.forget(
    memory_id="mem-abc123",
    context={
        "actor_type": "user",
        "actor_id": "user-42",
        "source": "web-ui",
        "timestamp": "2026-01-15T12:00:00Z",
    },
)

print(result.status)  # OperationStatus.committed

search()

Search for memories within a given scope. The search query is forwarded to the adapter, which returns ranked results.
async def search(
    query: str,
    scope: dict[str, Any] | MemoryScope,
    context: dict[str, Any] | OperationContext,
    *,
    limit: int = 20,
    filters: dict[str, Any] | None = None,
) -> MemorySearchResponse
query
str
required
The search query string. Must be non-empty.
scope
dict | MemoryScope
required
Restricts the search to memories within this scope.
context
dict | OperationContext
required
Operation context for auditing the search request.
limit
int
default:"20"
Maximum number of results to return. Must be between 1 and 100 (inclusive).
filters
dict[str, Any] | None
default:"None"
Additional key-value filters passed to the adapter. The available filter keys depend on the adapter implementation.
return
MemorySearchResponse
Contains a hits list of MemorySearchHit objects, each with a memory and a relevance score. See Models.

Example

results = await mp.search(
    query="dark mode",
    scope={"tenant_id": "acme", "project_id": "assistant", "agent_id": "a1"},
    context={
        "actor_type": "agent",
        "actor_id": "a1",
        "source": "langgraph",
        "timestamp": "2026-01-15T10:31:00Z",
    },
    limit=5,
)

for hit in results.hits:
    print(f"{hit.memory.content} (score={hit.score})")

get_operation_status()

Retrieve the current status of a previously submitted operation. This is a synchronous method.
def get_operation_status(operation_id: str) -> OperationStatusResponse
operation_id
str
required
The operation ID returned by remember(), update(), or forget().
return
OperationStatusResponse
Current status of the operation. See Models.

Example

status = mp.get_operation_status(result.operation_id)
print(status.status)          # e.g. OperationStatus.pending_approval
print(status.operation_type)  # e.g. OperationType.remember

approve()

Approve a pending operation that was held by a require_approval policy decision.
async def approve(
    operation_id: str,
    actor_id: str,
    notes: str | None = None,
) -> OperationStatusResponse
operation_id
str
required
The ID of the operation to approve.
actor_id
str
required
Identifier of the actor (human reviewer) performing the approval.
notes
str | None
default:"None"
Optional free-text notes to attach to the approval decision.
return
OperationStatusResponse
Updated status of the operation after approval. See Models.

Example

approved = await mp.approve(
    operation_id="op-xyz789",
    actor_id="reviewer-1",
    notes="Reviewed and approved -- content is safe.",
)

print(approved.status)  # OperationStatus.committed

deny()

Deny a pending operation that was held by a require_approval policy decision.
async def deny(
    operation_id: str,
    actor_id: str,
    notes: str | None = None,
) -> OperationStatusResponse
operation_id
str
required
The ID of the operation to deny.
actor_id
str
required
Identifier of the actor performing the denial.
notes
str | None
default:"None"
Optional free-text notes explaining the denial.
return
OperationStatusResponse
Updated status of the operation after denial. See Models.

Example

denied = await mp.deny(
    operation_id="op-xyz789",
    actor_id="reviewer-1",
    notes="Content contains PII -- denied per policy.",
)

print(denied.status)  # OperationStatus.blocked
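
Taken together, the approve() and deny() examples imply a small state machine for held operations: a pending_approval operation commits on approval and is blocked on denial, and no other transition applies. The sketch below mirrors the status names used in these docs; it is illustrative only, not Memproof's internals.

```python
# Valid transitions for an operation held by a require_approval decision.
TRANSITIONS = {
    ("pending_approval", "approve"): "committed",
    ("pending_approval", "deny"): "blocked",
}

def resolve(status: str, action: str) -> str:
    try:
        return TRANSITIONS[(status, action)]
    except KeyError:
        raise ValueError(f"cannot {action} an operation in status {status!r}")

assert resolve("pending_approval", "approve") == "committed"
assert resolve("pending_approval", "deny") == "blocked"
```

In particular, approving or denying an operation that is not pending (already committed, blocked, or expired) should be expected to fail rather than silently succeed.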

verify_audit_trail()

Verify the integrity of the Trailproof audit trail by validating the SHA-256 hash chain. If any event has been tampered with, the chain breaks and verification fails.
def verify_audit_trail() -> TrailVerificationResult
return
TrailVerificationResult
Contains valid (bool), event_count (int), and error (a str describing the failure, or None when the trail is valid). See Trailproof documentation for details.

Example

result = mp.verify_audit_trail()

if result.valid:
    print(f"Audit trail intact: {result.event_count} events verified")
else:
    print(f"Tampering detected: {result.error}")
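
The hash-chain idea behind this check can be shown in miniature: each event's hash incorporates its predecessor's hash, so altering any event invalidates every subsequent link. The sketch below illustrates the principle only; Trailproof's actual event format and chaining scheme may differ.

```python
import hashlib
import json

def chain_hash(prev_hash: str, event: dict) -> str:
    # Each link commits to both the event and the previous link.
    payload = prev_hash + json.dumps(event, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

GENESIS = "0" * 64

def build_chain(events: list[dict]) -> list[str]:
    hashes, prev = [], GENESIS
    for ev in events:
        prev = chain_hash(prev, ev)
        hashes.append(prev)
    return hashes

def verify(events: list[dict], hashes: list[str]) -> bool:
    prev = GENESIS
    for ev, h in zip(events, hashes):
        prev = chain_hash(prev, ev)
        if prev != h:
            return False  # chain broken at this event
    return True

events = [{"type": "remember", "id": 1}, {"type": "update", "id": 2}]
hashes = build_chain(events)
assert verify(events, hashes)

events[0]["id"] = 99  # tamper with the first event
assert not verify(events, hashes)  # every later link is now invalid too
```

Because each link depends on all prior links, an attacker who edits one event would have to recompute every subsequent hash, which the stored chain exposes.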

query_audit_trail()

Query the Trailproof audit trail with optional filters for event type and metadata fields.
def query_audit_trail(
    event_type: str | None = None,
    metadata: dict[str, Any] | None = None,
    limit: int | None = None,
) -> list[TrailEvent]
event_type
str | None
default:"None"
Filter by event type (e.g. "memproof.pipeline.committed").
metadata
dict[str, Any] | None
default:"None"
Filter by metadata fields (e.g. {"operation_id": "op-abc123"} or {"tenant_id": "acme"}).
limit
int | None
default:"None"
Maximum number of events to return.
return
list[TrailEvent]
A list of Trailproof TrailEvent objects matching the query criteria.

Example

# All committed events for a tenant
events = mp.query_audit_trail(
    event_type="memproof.pipeline.committed",
    metadata={"tenant_id": "acme"},
    limit=50,
)

for event in events:
    print(f"{event.event_type} at {event.timestamp}")

# Full trace for a single operation
trace = mp.query_audit_trail(
    metadata={"operation_id": "op-abc123"},
)

Scope and Context Flexibility

Every method that accepts scope or context parameters can receive either a Pydantic model instance or a plain dictionary. Memproof coerces dictionaries internally:
from memproof import MemoryScope, OperationContext

# Using model instances
scope = MemoryScope(tenant_id="acme", project_id="p1", agent_id="a1")
context = OperationContext(
    actor_type="agent",
    actor_id="a1",
    source="langgraph",
    timestamp="2026-01-15T10:30:00Z",
)
result = await mp.remember("hello", scope=scope, context=context)

# Using plain dicts (equivalent)
result = await mp.remember(
    "hello",
    scope={"tenant_id": "acme", "project_id": "p1", "agent_id": "a1"},
    context={
        "actor_type": "agent",
        "actor_id": "a1",
        "source": "langgraph",
        "timestamp": "2026-01-15T10:30:00Z",
    },
)
Both forms are fully supported and produce identical results.