Logging API Reference
Complete API reference for agent-actions' event-based logging system.
Public API
fire_event()
Emit an event to all registered handlers.
from agent_actions.logging import fire_event
from agent_actions.logging.events import WorkflowStartEvent
fire_event(WorkflowStartEvent(
workflow_name="my_workflow",
agent_count=5,
))
Note: The message field is auto-generated in __post_init__ based on other fields.
Parameters:
event(BaseEvent): Event instance to fire
Returns: None
Context Enrichment: Events are automatically enriched with correlation context from the EventManager before being dispatched to handlers.
get_manager()
Get the EventManager singleton instance.
from agent_actions.logging import get_manager
manager = get_manager()
manager.set_context(workflow_name="my_workflow")
Returns: EventManager instance
LoggerFactory
Factory class for initializing and managing the logging system.
LoggerFactory.initialize()
Initialize the unified logging system with event handlers.
from agent_actions.logging import LoggerFactory
manager = LoggerFactory.initialize(
output_dir="/path/to/workflow/agent_io",
workflow_name="my_workflow",
invocation_id="run_abc123",
verbose=True,
force=True,
)
Parameters:
config(Optional[LoggingConfig]): Logging configuration. If None, uses defaults from environment.
output_dir(Optional[str | Path]): Directory for run_results.json and event logs. Defaults to None.
workflow_name(str): Name of the workflow being executed. Defaults to "".
invocation_id(Optional[str]): Unique ID for this invocation. Auto-generated if not provided.
verbose(bool): Show DEBUG level events on console. Defaults to False.
quiet(bool): Only show WARN and ERROR events on console. Defaults to False.
force(bool): Reinitialize even if already initialized. Defaults to False.
Returns: EventManager instance
Side Effects:
- Creates and registers ConsoleEventHandler
- Creates and registers JSONFileHandler (if output_dir provided)
- Creates and registers RunResultsCollector (if output_dir provided)
- Sets up LoggingBridgeHandler to convert Python logging to events
- Sets correlation context (invocation_id, workflow_name)
Example:
# Typical CLI initialization
manager = LoggerFactory.initialize(
output_dir=agent_folder,
workflow_name="product_analysis",
invocation_id="run_xyz789",
verbose=args.verbose,
force=True,
)
# Events are now routed to:
# - Console (Rich-formatted output)
# - agent_folder/target/events.json
# - agent_folder/target/run_results.json
LoggerFactory.get_logger()
Get a Python logger that routes through the event system.
logger = LoggerFactory.get_logger("my_module")
logger.info("Processing item") # Converted to LogEvent
Parameters:
name(str): Logger name. Auto-prefixed with 'agent_actions.' if not already.
Returns: logging.Logger instance
Note: All logging calls are converted to LogEvent instances and routed through EventManager.
LoggerFactory.set_context()
Set shared context values for all events.
LoggerFactory.set_context(
workflow_name="my_workflow",
correlation_id="abc123",
agent_name="extract_data",
)
Parameters:
**kwargs: Arbitrary context key-value pairs
Returns: None
Note: Context is automatically attached to all events emitted after this call.
LoggerFactory.flush()
Flush all event handlers (write buffered data).
try:
run_workflow()
finally:
LoggerFactory.flush() # Ensure all events are written
Returns: None
Important: Always call before process exit to ensure buffered events are written.
LoggerFactory.get_event_manager()
Get the current EventManager instance.
manager = LoggerFactory.get_event_manager()
Returns: Optional[EventManager]
Returns None if: LoggerFactory hasn't been initialized yet.
LoggerFactory.get_run_results_collector()
Get the RunResultsCollector instance.
collector = LoggerFactory.get_run_results_collector()
Returns: Optional[RunResultsCollector]
Returns None if: LoggerFactory hasn't been initialized or no output_dir was provided.
LoggerFactory.reset()
Reset the factory state (for testing).
LoggerFactory.reset()
Returns: None
Warning: This clears all handlers and state. Only use in tests.
EventManager
Central event dispatcher that routes events to handlers.
EventManager.get()
Get the EventManager singleton instance.
from agent_actions.logging.core import EventManager
manager = EventManager.get()
Returns: EventManager instance
EventManager.initialize()
Mark the EventManager as initialized.
manager.initialize()
Returns: None
Note: This is called automatically by LoggerFactory.initialize().
EventManager.register()
Register an event handler.
from agent_actions.logging.core import ConsoleEventHandler
handler = ConsoleEventHandler()
manager.register(handler)
Parameters:
handler(EventHandler): Handler instance with accepts() and handle() methods
Returns: None
EventManager.register_function()
Register a simple function as a handler.
def my_handler(event):
print(f"Event: {event.message}")
manager.register_function(my_handler)
Parameters:
func(Callable[[BaseEvent], None]): Function that takes an event
Returns: None
Note: Function will receive all events (no filtering).
EventManager.fire()
Fire an event to all registered handlers.
from agent_actions.logging.events import WorkflowStartEvent
event = WorkflowStartEvent(workflow_name="my_workflow", agent_count=3)
manager.fire(event)
Parameters:
event(BaseEvent): Event to fire
Returns: None
Note: Event is enriched with context before being dispatched.
EventManager.set_context()
Set correlation context for all events.
manager.set_context(
workflow_name="my_workflow",
correlation_id="abc123",
)
Parameters:
**kwargs: Context key-value pairs
Returns: None
EventManager.context()
Context manager for temporary context override.
with manager.context(workflow_name="upstream"):
fire_event(WorkflowStartEvent(workflow_name="upstream", agent_count=2))
# Event has workflow_name="upstream"
# Context restored to previous value
Parameters:
**kwargs: Temporary context values
Returns: Context manager
Yields: None
EventManager.flush()
Flush all registered handlers.
manager.flush()
Returns: None
EventManager.reset()
Reset EventManager state (for testing).
EventManager.reset()
Returns: None
Warning: Clears all handlers and context. Only use in tests.
Event Classes
All event classes inherit from BaseEvent and are dataclasses.
BaseEvent
Base class for all events.
from agent_actions.logging.events.base import BaseEvent, EventCategory
@dataclass
class MyEvent(BaseEvent):
category: EventCategory = EventCategory.SYSTEM
event_type: str = "my_event"
# Custom fields
custom_field: str = ""
def __post_init__(self):
super().__post_init__()
self.data["custom_field"] = self.custom_field
Attributes:
event_id(str): Unique event identifier (auto-generated)
timestamp(datetime): Event timestamp (auto-generated)
category(EventCategory): Event category
event_type(str): Specific event type
message(str): Human-readable message
data(Dict[str, Any]): Structured event data
meta(EventMetadata): Metadata (invocation_id, workflow_name, etc.)
Workflow Events
WorkflowStartEvent
Emitted when a workflow starts executing.
from agent_actions.logging.events import WorkflowStartEvent
fire_event(WorkflowStartEvent(
workflow_name="my_workflow",
agent_count=5,
execution_mode="parallel",
))
Note: The message field is auto-generated based on workflow_name and agent_count.
Fields:
workflow_name(str): Workflow name
agent_count(int): Number of agents in workflow
execution_mode(str): "sequential" or "parallel"
WorkflowCompleteEvent
Emitted when a workflow completes successfully.
from agent_actions.logging.events import WorkflowCompleteEvent
fire_event(WorkflowCompleteEvent(
workflow_name="my_workflow",
elapsed_time=119.5,
agents_completed=5,
agents_skipped=0,
agents_failed=0,
total_tokens=12500,
))
Fields:
workflow_name(str): Workflow name
elapsed_time(float): Total execution time in seconds
agents_completed(int): Number of agents completed
agents_skipped(int): Number of agents skipped
agents_failed(int): Number of agents failed
total_tokens(int): Total tokens used
WorkflowFailedEvent
Emitted when a workflow fails.
from agent_actions.logging.events import WorkflowFailedEvent
fire_event(WorkflowFailedEvent(
workflow_name="my_workflow",
error_message="Dependency cycle detected",
error_type="CyclicDependencyError",
elapsed_time=5.2,
failed_action="extract_data",
))
Fields:
workflow_name(str): Workflow name
error_message(str): Error description
error_type(str): Exception type name
elapsed_time(float): Time elapsed before failure
failed_action(str): Name of the action that failed
Action Events
ActionStartEvent
Emitted when an action starts processing.
from agent_actions.logging.events import ActionStartEvent
fire_event(ActionStartEvent(
action_name="extract_data",
action_index=0, # 0-based index
total_actions=5,
action_type="llm",
input_path="/path/to/input.json",
))
Fields:
action_name(str): Action name
action_index(int): Action position in workflow (0-based)
total_actions(int): Total number of actions
action_type(str): Type of action ("llm", "tool", etc.)
input_path(str): Path to input file
ActionCompleteEvent
Emitted when an action completes successfully.
from agent_actions.logging.events import ActionCompleteEvent
fire_event(ActionCompleteEvent(
action_name="extract_data",
action_index=0,
total_actions=5,
execution_time=15.2,
output_path="/path/to/output.json",
record_count=100,
tokens={"prompt_tokens": 800, "completion_tokens": 400, "total_tokens": 1200},
))
Fields:
action_name(str): Action name
action_index(int): Action position in workflow (0-based)
total_actions(int): Total number of actions
execution_time(float): Execution time in seconds
output_path(str): Path to output file
record_count(int): Number of records processed
tokens(Dict[str, int]): Token usage breakdown
ActionSkipEvent
Emitted when an action is skipped (guard condition, cache hit, etc.).
from agent_actions.logging.events import ActionSkipEvent
fire_event(ActionSkipEvent(
action_name="optional_enrichment",
action_index=2,
total_actions=5,
skip_reason="Guard condition failed: confidence < 0.8",
))
Fields:
action_name(str): Action name
action_index(int): Action position in workflow (0-based)
total_actions(int): Total number of actions
skip_reason(str): Why action was skipped
ActionFailedEvent
Emitted when an action fails.
from agent_actions.logging.events import ActionFailedEvent
fire_event(ActionFailedEvent(
action_name="extract_data",
action_index=0,
total_actions=5,
error_message="API rate limit exceeded",
error_type="RateLimitError",
execution_time=2.5,
suggestion="Try again after rate limit resets",
))
Fields:
action_name(str): Action name
action_index(int): Action position in workflow (0-based)
total_actions(int): Total number of actions
error_message(str): Error description
error_type(str): Exception class name
execution_time(float): Time elapsed before failure
suggestion(str): Suggested fix or next step
Batch Events
BatchSubmittedEvent
Emitted when a batch job is submitted.
from agent_actions.logging.events import BatchSubmittedEvent
fire_event(BatchSubmittedEvent(
batch_id="batch_abc123",
action_name="extract_data",
request_count=1000,
provider="openai",
))
Fields:
batch_id(str): Batch job ID
action_name(str): Action name
request_count(int): Number of requests in batch
provider(str): LLM provider
BatchCompleteEvent
Emitted when a batch job completes.
from agent_actions.logging.events import BatchCompleteEvent
fire_event(BatchCompleteEvent(
batch_id="batch_abc123",
action_name="extract_data",
total=1000,
completed=995,
failed=5,
elapsed_time=3600.0,
total_tokens=75000,
))
Fields:
batch_id(str): Batch job ID
action_name(str): Action name
total(int): Total requests in batch
completed(int): Successfully completed requests
failed(int): Failed requests
elapsed_time(float): Total time in seconds
total_tokens(int): Total tokens used
BatchErrorEvent
Emitted when a batch job fails.
from agent_actions.logging.events import BatchErrorEvent
fire_event(BatchErrorEvent(
message="Batch failed",
action_name="extract_data",
batch_id="batch_abc123",
error_message="Batch expired",
error_type="BatchExpiredError",
))
Fields:
action_name(str): Action name
batch_id(str): Batch job ID
error_message(str): Error description
error_type(str): Exception class name
Validation Events
ValidationStartEvent
Emitted when validation starts.
from agent_actions.logging.events import ValidationStartEvent
fire_event(ValidationStartEvent(
message="Starting validation",
target="extract_data_schema",
))
Fields:
target(str): What is being validated
ValidationCompleteEvent
Emitted when validation completes successfully.
from agent_actions.logging.events import ValidationCompleteEvent
fire_event(ValidationCompleteEvent(
message="Validation passed",
target="extract_data_schema",
))
Fields:
target(str): What was validated
ValidationErrorEvent
Emitted when validation finds an error.
from agent_actions.logging.events import ValidationErrorEvent
fire_event(ValidationErrorEvent(
target="extract_data_schema",
field="title",
error="Missing required field",
))
Fields:
target(str): What was validated
field(str): Which field failed
error(str): Error description
value(Any): The invalid value (optional)
ValidationWarningEvent
Emitted for validation warnings.
from agent_actions.logging.events import ValidationWarningEvent
fire_event(ValidationWarningEvent(
target="extract_data_schema",
field="description",
warning="Optional field is missing",
))
Fields:
target(str): What was validated
field(str): Which field triggered the warning
warning(str): Warning message
value(Any): The problematic value (optional)
Event Handlers
EventHandler (Base Class)
Base class for all event handlers.
from agent_actions.logging.core.handlers import EventHandler
class MyHandler(EventHandler):
def accepts(self, event: BaseEvent) -> bool:
"""Return True if this handler processes the event."""
return event.category == "workflow"
def handle(self, event: BaseEvent) -> None:
"""Process the event."""
print(f"Event: {event.message}")
def flush(self) -> None:
"""Flush any buffered data."""
pass
Methods:
accepts(event) → bool: Return True if handler processes this event
handle(event) → None: Process the event
flush() → None: Flush buffered data
ConsoleEventHandler
Outputs events to the console using Rich formatting.
from agent_actions.logging.core import ConsoleEventHandler, EventLevel
handler = ConsoleEventHandler(
min_level=EventLevel.INFO,
show_timestamp=True,
categories={"workflow", "action", "batch"},
)
Parameters:
min_level(EventLevel): Minimum event level to display
show_timestamp(bool): Include timestamp in output
formatter(Optional[Callable]): Custom formatter function
categories(Optional[Set[str]]): Event categories to display (None = all)
JSONFileHandler
Writes events to a JSON file in NDJSON format.
from agent_actions.logging.core import JSONFileHandler, EventLevel
from pathlib import Path
handler = JSONFileHandler(
file_path=Path("logs/events.json"),
min_level=EventLevel.DEBUG,
buffer_size=10,
)
Parameters:
file_path(Path): Output file path
min_level(EventLevel): Minimum event level to log
buffer_size(int): Number of events to buffer before writing
Output Format: NDJSON (one JSON object per line)
RunResultsCollector
Collects workflow execution results into run_results.json.
from agent_actions.logging.events.handlers import RunResultsCollector
collector = RunResultsCollector(
output_dir="/path/to/agent_io",
workflow_name="my_workflow",
)
Parameters:
output_dir(Optional[str | Path]): Directory to write run_results.json
workflow_name(str): Name of workflow being executed
Output: {output_dir}/target/run_results.json
Methods:
accepts(event): Returns True for workflow/action category events
handle(event): Process event and update internal state
flush(): Write run_results.json to disk
Configuration
LoggingConfig
Configuration for the logging system.
from agent_actions.logging.config import LoggingConfig, FileHandlerConfig
config = LoggingConfig(
default_level="INFO",
file_handler=FileHandlerConfig(
enabled=True,
path="logs/events.json",
),
)
LoggerFactory.initialize(config=config)
Fields:
default_level(str): Default log level ("DEBUG", "INFO", "WARN", "ERROR")
file_handler(FileHandlerConfig): File handler configuration
FileHandlerConfig
from agent_actions.logging.config import FileHandlerConfig
file_config = FileHandlerConfig(
enabled=True,
path="logs/events.json",
)
Fields:
enabled(bool): Enable file logging
path(str): Log file path (relative to project root)
Constants
EventLevel
Event severity levels.
from agent_actions.logging.core import EventLevel
EventLevel.DEBUG # Detailed diagnostic information
EventLevel.INFO # General informational messages
EventLevel.WARN # Warning messages
EventLevel.ERROR # Error messages
EventCategory
Event categories for filtering.
from agent_actions.logging.events.base import EventCategory
EventCategory.WORKFLOW # Workflow lifecycle events
EventCategory.ACTION # Action execution events
EventCategory.BATCH # Batch processing events
EventCategory.VALIDATION # Validation events
EventCategory.PROGRESS # Progress updates
EventCategory.SYSTEM # System-level events
Examples
Basic Workflow Logging
from agent_actions.logging import LoggerFactory, fire_event
from agent_actions.logging.events import (
WorkflowStartEvent,
ActionStartEvent,
ActionCompleteEvent,
WorkflowCompleteEvent,
)
# Initialize
LoggerFactory.initialize(
output_dir="my_workflow/agent_io",
workflow_name="my_workflow",
)
# Workflow start
fire_event(WorkflowStartEvent(
message="Starting workflow",
workflow_name="my_workflow",
agent_count=3,
))
# Action 1
fire_event(ActionStartEvent(
message="Starting action",
action_name="extract",
action_index=0,  # 0-based: first action in the workflow
))
# ... do work ...
fire_event(ActionCompleteEvent(
message="Action completed",
action_name="extract",
execution_time=5.2,
))
# Workflow complete
fire_event(WorkflowCompleteEvent(
message="Workflow completed",
workflow_name="my_workflow",
elapsed_time=15.6,
))
# Flush
LoggerFactory.flush()
Custom Event Handler
from agent_actions.logging import get_manager
from agent_actions.logging.core.handlers import EventHandler
from agent_actions.logging.events.base import BaseEvent
class MetricsHandler(EventHandler):
def __init__(self):
self.metrics = []
def accepts(self, event: BaseEvent) -> bool:
return event.category in ("workflow", "action")
def handle(self, event: BaseEvent) -> None:
if event.event_type == "ActionCompleteEvent":
self.metrics.append({
"action": event.data.get("action_name"),
"duration": event.data.get("execution_time"),
"tokens": event.data.get("tokens", {}),
})
def flush(self) -> None:
import json
with open("metrics.json", "w") as f:
json.dump(self.metrics, f, indent=2)
# Register
handler = MetricsHandler()
manager = get_manager()
manager.register(handler)
Context Scoping
from agent_actions.logging import get_manager, fire_event
from agent_actions.logging.events import ActionStartEvent
manager = get_manager()
# Set global context
manager.set_context(workflow_name="main")
# All events inherit workflow_name="main"
fire_event(ActionStartEvent(message="Action 1", action_name="extract"))
# Override for nested context
with manager.context(workflow_name="upstream"):
fire_event(ActionStartEvent(message="Upstream action", action_name="fetch"))
# Event has workflow_name="upstream"
# Back to workflow_name="main"
fire_event(ActionStartEvent(message="Action 2", action_name="transform"))
See Also
- Logging Architecture - System design and flow