64 changes: 64 additions & 0 deletions examples/memory/compaction_session_example.py
@@ -0,0 +1,64 @@
"""
Example demonstrating OpenAI responses.compact session functionality.

This example shows how to use OpenAIResponsesCompactionSession to automatically
compact conversation history when it grows too large, reducing token usage
while preserving context.
"""

import asyncio

from agents import Agent, OpenAIResponsesCompactionSession, Runner, SQLiteSession


async def main():
    # Create an underlying session for storage
    underlying = SQLiteSession(":memory:")

    # Wrap it with a compaction session; history compacts automatically once the trigger fires
    session = OpenAIResponsesCompactionSession(
        session_id="demo-session",
        underlying_session=underlying,
        model="gpt-4.1",
        # Custom compaction trigger (the default fires at 10 candidate items)
        should_trigger_compaction=lambda ctx: len(ctx["compaction_candidate_items"]) >= 4,
    )

    agent = Agent(
        name="Assistant",
        instructions="Reply concisely. Keep answers to 1-2 sentences.",
    )

    print("=== Compaction Session Example ===\n")

    prompts = [
        "What is the tallest mountain in the world?",
        "How tall is it in feet?",
        "When was it first climbed?",
        "Who was on that expedition?",
        "What country is the mountain in?",
    ]

    for i, prompt in enumerate(prompts, 1):
        print(f"Turn {i}:")
        print(f"User: {prompt}")
        result = await Runner.run(agent, prompt, session=session)
        print(f"Assistant: {result.final_output}\n")

    # Show final session state
    items = await session.get_items()
    print("=== Final Session State ===")
    print(f"Total items: {len(items)}")
    for item in items:
        item_type = item.get("type", "unknown")
        if item_type == "compaction":
            print("  - compaction (encrypted content)")
        elif item_type == "message":
            role = item.get("role", "unknown")
            print(f"  - message ({role})")
        else:
            print(f"  - {item_type}")


if __name__ == "__main__":
    asyncio.run(main())
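
The decision hook shown above receives a dict with `response_id`, `compaction_candidate_items`, and `session_items` keys (see `run_compaction` later in this PR). A minimal sketch of a size-based trigger, assuming a crude character-count heuristic; `approx_chars` and the 20k cutoff are illustrative, not part of the SDK:

import json


def approx_chars(items) -> int:
    # Crude proxy for token usage: total serialized length of the history.
    return sum(len(json.dumps(item)) for item in items)


def size_based_trigger(ctx: dict) -> bool:
    # Compact once the stored history exceeds ~20k characters, or when
    # candidate items pile up regardless of size.
    return (
        approx_chars(ctx["session_items"]) > 20_000
        or len(ctx["compaction_candidate_items"]) >= 20
    )


# session = OpenAIResponsesCompactionSession(
#     session_id="demo-session",
#     underlying_session=underlying,
#     should_trigger_compaction=size_based_trigger,
# )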
10 changes: 10 additions & 0 deletions src/agents/__init__.py
@@ -49,6 +49,7 @@
    set_conversation_history_wrappers,
)
from .items import (
    CompactionItem,
    HandoffCallItem,
    HandoffOutputItem,
    ItemHelpers,
@@ -63,9 +64,13 @@
from .lifecycle import AgentHooks, RunHooks
from .memory import (
    OpenAIConversationsSession,
    OpenAIResponsesCompactionArgs,
    OpenAIResponsesCompactionAwareSession,
    OpenAIResponsesCompactionSession,
    Session,
    SessionABC,
    SQLiteSession,
    is_openai_responses_compaction_aware_session,
)
from .model_settings import ModelSettings
from .models.interface import Model, ModelProvider, ModelTracing
@@ -291,6 +296,11 @@ def enable_verbose_stdout_logging():
    "SessionABC",
    "SQLiteSession",
    "OpenAIConversationsSession",
    "OpenAIResponsesCompactionSession",
    "OpenAIResponsesCompactionArgs",
    "OpenAIResponsesCompactionAwareSession",
    "is_openai_responses_compaction_aware_session",
    "CompactionItem",
    "AgentHookContext",
    "RunContextWrapper",
    "TContext",
4 changes: 4 additions & 0 deletions src/agents/_run_impl.py
@@ -57,6 +57,7 @@
from .guardrail import InputGuardrail, InputGuardrailResult, OutputGuardrail, OutputGuardrailResult
from .handoffs import Handoff, HandoffInputData, nest_handoff_history
from .items import (
    CompactionItem,
    HandoffCallItem,
    HandoffOutputItem,
    ItemHelpers,
@@ -539,6 +540,9 @@ def process_model_response(
            logger.debug("Queuing shell_call %s", call_identifier)
            shell_calls.append(ToolRunShellCall(tool_call=output, shell_tool=shell_tool))
            continue
        if output_type == "compaction":
            # Route compaction outputs into a dedicated run item so callers can
            # tell them apart from tool calls and messages.
            items.append(CompactionItem(raw_item=cast(dict[str, Any], output), agent=agent))
            continue
        if output_type == "apply_patch_call":
            items.append(ToolCallItem(raw_item=cast(Any, output), agent=agent))
            if apply_patch_tool:
18 changes: 18 additions & 0 deletions src/agents/items.py
@@ -327,6 +327,23 @@ class MCPApprovalResponseItem(RunItemBase[McpApprovalResponse]):
    type: Literal["mcp_approval_response_item"] = "mcp_approval_response_item"


@dataclass
class CompactionItem:
    """Represents a compaction item from responses.compact."""

    agent: Agent[Any]
    """The agent whose run caused this item to be generated."""

    raw_item: dict[str, Any]
    """The raw compaction item containing encrypted_content."""

    type: Literal["compaction_item"] = "compaction_item"

    def to_input_item(self) -> TResponseInputItem:
        """Converts this item into an input item suitable for passing to the model."""
        return cast(TResponseInputItem, self.raw_item)


RunItem: TypeAlias = Union[
    MessageOutputItem,
    HandoffCallItem,
@@ -337,6 +354,7 @@ class MCPApprovalResponseItem(RunItemBase[McpApprovalResponse]):
    MCPListToolsItem,
    MCPApprovalRequestItem,
    MCPApprovalResponseItem,
    CompactionItem,
]
"""An item generated by an agent."""
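
For illustration, `to_input_item` is a passthrough, so a compaction item can be replayed to the model as-is. The raw payload below is hypothetical (the real `encrypted_content` is an opaque blob) and `agent` is assumed to be an existing `Agent` instance:

# Hypothetical raw payload; the real encrypted_content is opaque.
raw = {"type": "compaction", "encrypted_content": "<opaque>"}
item = CompactionItem(raw_item=raw, agent=agent)  # assumes an existing `agent`

assert item.type == "compaction_item"
assert item.to_input_item() is raw  # passthrough, no transformation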
13 changes: 12 additions & 1 deletion src/agents/memory/__init__.py
@@ -1,5 +1,12 @@
from .openai_conversations_session import OpenAIConversationsSession
from .session import Session, SessionABC
from .openai_responses_compaction_session import OpenAIResponsesCompactionSession
from .session import (
    OpenAIResponsesCompactionArgs,
    OpenAIResponsesCompactionAwareSession,
    Session,
    SessionABC,
    is_openai_responses_compaction_aware_session,
)
from .sqlite_session import SQLiteSession
from .util import SessionInputCallback

@@ -9,4 +16,8 @@
    "SessionInputCallback",
    "SQLiteSession",
    "OpenAIConversationsSession",
    "OpenAIResponsesCompactionSession",
    "OpenAIResponsesCompactionArgs",
    "OpenAIResponsesCompactionAwareSession",
    "is_openai_responses_compaction_aware_session",
]
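
As a usage sketch (not part of this PR), the exported guard can gate compaction on arbitrary sessions. This assumes it acts as a runtime check for the `OpenAIResponsesCompactionAwareSession` protocol and that `OpenAIResponsesCompactionArgs` allows omitting `force`, as `run_compaction` below suggests:

from agents import (
    OpenAIResponsesCompactionArgs,
    Session,
    is_openai_responses_compaction_aware_session,
)


async def maybe_compact(session: Session, response_id: str) -> None:
    # Only compaction-aware sessions expose run_compaction.
    if is_openai_responses_compaction_aware_session(session):
        args: OpenAIResponsesCompactionArgs = {"response_id": response_id}
        await session.run_compaction(args)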
215 changes: 215 additions & 0 deletions src/agents/memory/openai_responses_compaction_session.py
@@ -0,0 +1,215 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Callable

from openai import AsyncOpenAI

from ..models._openai_shared import get_default_openai_client
from .openai_conversations_session import OpenAIConversationsSession
from .session import (
    OpenAIResponsesCompactionArgs,
    OpenAIResponsesCompactionAwareSession,
    SessionABC,
)

if TYPE_CHECKING:
    from ..items import TResponseInputItem
    from .session import Session

logger = logging.getLogger("openai-agents.openai.compaction")

DEFAULT_COMPACTION_THRESHOLD = 10


def select_compaction_candidate_items(
    items: list[TResponseInputItem],
) -> list[TResponseInputItem]:
    """Select compaction candidate items.

    Excludes user messages and compaction items.
    """
    return [
        item
        for item in items
        if not (
            (item.get("type") == "message" and item.get("role") == "user")
            or item.get("type") == "compaction"
        )
    ]


def default_should_trigger_compaction(context: dict[str, Any]) -> bool:
    """Default decision: compact when >= DEFAULT_COMPACTION_THRESHOLD candidate items exist."""
    return len(context["compaction_candidate_items"]) >= DEFAULT_COMPACTION_THRESHOLD


def is_openai_model_name(model: str) -> bool:
    """Validate that a model name follows OpenAI conventions."""
    trimmed = model.strip()
    if not trimmed:
        return False

    # Handle fine-tuned models: ft:gpt-4.1:org:proj:suffix
    without_ft_prefix = trimmed[3:] if trimmed.startswith("ft:") else trimmed
    root = without_ft_prefix.split(":", 1)[0]

    # Allow gpt-* and o<digit>* models
    if root.startswith("gpt-"):
        return True
    if root.startswith("o") and root[1:2].isdigit():
        return True

    return False


class OpenAIResponsesCompactionSession(SessionABC, OpenAIResponsesCompactionAwareSession):
    """Session decorator that triggers responses.compact when stored history grows.

    Works with OpenAI Responses API models only. Wraps any Session (except
    OpenAIConversationsSession) and automatically calls the OpenAI responses.compact
    API after each turn when the decision hook returns True.
    """

    def __init__(
        self,
        session_id: str,
        underlying_session: Session,
        *,
        client: AsyncOpenAI | None = None,
        model: str = "gpt-4.1",
        should_trigger_compaction: Callable[[dict[str, Any]], bool] | None = None,
    ):
        """Initialize the compaction session.

        Args:
            session_id: Identifier for this session.
            underlying_session: Session store that holds the compacted history. Cannot be
                OpenAIConversationsSession.
            client: OpenAI client for responses.compact API calls. Defaults to
                get_default_openai_client() or a new AsyncOpenAI().
            model: Model to use for responses.compact. Defaults to "gpt-4.1". Must be an
                OpenAI model name (gpt-*, o*, or ft:gpt-*).
            should_trigger_compaction: Custom decision hook. Defaults to triggering when
                10+ compaction candidates exist.
        """
        if isinstance(underlying_session, OpenAIConversationsSession):
            raise ValueError(
                "OpenAIResponsesCompactionSession cannot wrap OpenAIConversationsSession "
                "because it manages its own history on the server."
            )

        if not is_openai_model_name(model):
            raise ValueError(f"Unsupported model for OpenAI responses compaction: {model}")

        self.session_id = session_id
        self.underlying_session = underlying_session
        self._client = client
        self.model = model
        self.should_trigger_compaction = (
            should_trigger_compaction or default_should_trigger_compaction
        )

        # Cache for incremental candidate tracking
        self._compaction_candidate_items: list[TResponseInputItem] | None = None
        self._session_items: list[TResponseInputItem] | None = None
        self._response_id: str | None = None

    @property
    def client(self) -> AsyncOpenAI:
        if self._client is None:
            self._client = get_default_openai_client() or AsyncOpenAI()
        return self._client

    async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None) -> None:
        """Run compaction using the responses.compact API."""
        if args and args.get("response_id"):
            self._response_id = args["response_id"]

        if not self._response_id:
            raise ValueError(
                "OpenAIResponsesCompactionSession.run_compaction requires a response_id"
            )

        compaction_candidate_items, session_items = await self._ensure_compaction_candidates()

        force = args.get("force", False) if args else False
        should_compact = force or self.should_trigger_compaction(
            {
                "response_id": self._response_id,
                "compaction_candidate_items": compaction_candidate_items,
                "session_items": session_items,
            }
        )

        if not should_compact:
            logger.debug(f"skip: decision hook declined compaction for {self._response_id}")
            return

        logger.debug(f"compact: start for {self._response_id} using {self.model}")

        compacted = await self.client.responses.compact(
            previous_response_id=self._response_id,
            model=self.model,
        )

        # Replace the stored history with the compacted output.
        await self.underlying_session.clear_session()
        output_items: list[TResponseInputItem] = []
        if compacted.output:
            for item in compacted.output:
                if isinstance(item, dict):
                    output_items.append(item)
                else:
                    output_items.append(item.model_dump(exclude_unset=True))  # type: ignore

        if output_items:
            await self.underlying_session.add_items(output_items)

        self._compaction_candidate_items = select_compaction_candidate_items(output_items)
        self._session_items = output_items

        logger.debug(
            f"compact: done for {self._response_id} "
            f"(output={len(output_items)}, candidates={len(self._compaction_candidate_items)})"
        )

    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        return await self.underlying_session.get_items(limit)

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        await self.underlying_session.add_items(items)
        if self._compaction_candidate_items is not None:
            new_candidates = select_compaction_candidate_items(items)
            if new_candidates:
                self._compaction_candidate_items.extend(new_candidates)
        if self._session_items is not None:
            self._session_items.extend(items)

    async def pop_item(self) -> TResponseInputItem | None:
        popped = await self.underlying_session.pop_item()
        if popped:
            # Popping invalidates the incremental cache; it is rebuilt lazily.
            self._compaction_candidate_items = None
            self._session_items = None
        return popped

    async def clear_session(self) -> None:
        await self.underlying_session.clear_session()
        self._compaction_candidate_items = []
        self._session_items = []

    async def _ensure_compaction_candidates(
        self,
    ) -> tuple[list[TResponseInputItem], list[TResponseInputItem]]:
        """Lazy-load and cache compaction candidates."""
        if self._compaction_candidate_items is not None and self._session_items is not None:
            return (self._compaction_candidate_items[:], self._session_items[:])

        history = await self.underlying_session.get_items()
        candidates = select_compaction_candidate_items(history)
        self._compaction_candidate_items = candidates
        self._session_items = history

        logger.debug(
            f"candidates: initialized (history={len(history)}, candidates={len(candidates)})"
        )
        return (candidates[:], history[:])
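
To make the candidate filter concrete, a rough sketch of its behavior; the item dicts are abbreviated, and real Responses items carry more fields:

history = [
    {"type": "message", "role": "user", "content": "What is the tallest mountain?"},
    {"type": "message", "role": "assistant", "content": "Mount Everest."},
    {"type": "compaction", "encrypted_content": "<opaque>"},
    {"type": "function_call", "name": "lookup", "arguments": "{}"},
]

# User messages and prior compaction items are excluded; the assistant
# message and the tool call remain as candidates.
candidates = select_compaction_candidate_items(history)
assert [i["type"] for i in candidates] == ["message", "function_call"]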