Logging Architecture
agent-actions uses an event-based logging architecture providing both user-friendly CLI output and detailed structured logs for debugging and analytics.
Overview
The logging system has two primary goals:
- User-facing output - Clean, formatted progress messages during workflow execution
- Structured logs - Complete execution trace for debugging, analytics, and integration
All logging flows through a central EventManager that dispatches events to registered handlers.
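The dispatch pattern can be sketched in plain Python. This is a simplified model, not the real `EventManager`: the class names mirror the architecture, but the fields and method signatures are assumed for illustration.

```python
from dataclasses import dataclass, field

@dataclass
class Event:
    """Toy event; the real system uses typed dataclasses per event."""
    message: str
    category: str = "system"
    meta: dict = field(default_factory=dict)

class EventManager:
    """Toy dispatcher: enriches events with context, then routes to handlers."""
    def __init__(self):
        self._handlers = []
        self._context = {}

    def register(self, handler):
        self._handlers.append(handler)

    def set_context(self, **kwargs):
        self._context.update(kwargs)

    def fire(self, event):
        event.meta.update(self._context)   # enrichment step
        for handler in self._handlers:     # routing step
            if handler.accepts(event):
                handler.handle(event)

class ListHandler:
    """Captures events for inspection; accepts everything."""
    def __init__(self):
        self.events = []
    def accepts(self, event):
        return True
    def handle(self, event):
        self.events.append(event)

manager = EventManager()
sink = ListHandler()
manager.register(sink)
manager.set_context(workflow_name="demo")
manager.fire(Event(message="Starting workflow", category="workflow"))
print(sink.events[0].meta["workflow_name"])  # demo
```

The same two hooks (`accepts` for filtering, `handle` for output) are what the real handlers below implement.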
Architecture Diagram
┌─────────────────────────────────────────────────────────┐
│ Application Code │
│ │
│ ┌──────────────────┐ ┌──────────────────┐ │
│ │ logger.info() │ │ fire_event() │ │
│ │ logger.error() │ │ (Direct events) │ │
│ └────────┬─────────┘ └────────┬─────────┘ │
│ │ │ │
└───────────┼─────────────────────────┼────────────────────┘
│ │
▼ │
┌──────────────────┐ │
│ LoggingBridge │ │
│ Handler │ │
│ (converts to │ │
│ events) │ │
└────────┬─────────┘ │
│ │
└─────────────┬───────────┘
▼
┌──────────────────┐
│ EventManager │
│ (singleton) │
│ │
│ - Enriches with │
│ context │
│ - Routes by │
│ category │
└────────┬─────────┘
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌──────────────┐
│Console │ │JSON File│ │RunResults │
│Handler │ │Handler │ │Collector │
│ │ │ │ │ │
│Rich CLI │ │events. │ │run_results. │
│Output │ │json │ │json │
└─────────┘ └─────────┘ └──────────────┘
Core Components
EventManager
Location: agent_actions/logging/core/manager.py
The EventManager is a singleton that:
- Receives all events from the application
- Enriches events with correlation context (workflow_name, correlation_id, etc.)
- Routes events to registered handlers
- Manages handler lifecycle
Key Methods:
manager = EventManager.get()  # Get singleton instance

# Register handlers
manager.register(handler)

# Set correlation context
manager.set_context(workflow_name="my_workflow", correlation_id="abc123")

# Or use a context manager
with manager.context(workflow_name="my_workflow"):
    fire_event(WorkflowStartEvent(...))
Event Types
Location: agent_actions/logging/events/types.py
Events are dataclasses that represent specific occurrences in the system:
Event Categories:
- workflow - Workflow lifecycle events
- action - Action execution events
- batch - Batch job operations
- validation - Validation results
- progress - Progress updates
- system - System-level events
Common Events:
- WorkflowStartEvent - Workflow execution begins
- WorkflowCompleteEvent - Workflow execution completes
- WorkflowFailedEvent - Workflow execution fails
- ActionStartEvent - Action begins processing
- ActionCompleteEvent - Action completes successfully
- ActionSkipEvent - Action skipped (guard condition)
- ActionFailedEvent - Action fails with error
- BatchSubmittedEvent - Batch job submitted
- BatchCompleteEvent - Batch job completes
- ValidationStartEvent - Validation begins
- ValidationCompleteEvent - Validation passes
- ValidationErrorEvent - Validation fails
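Defining a typed event can be sketched with standard dataclass inheritance. The field names here (`category`, `event_type`, `data`) follow the JSON examples later in this document, but the exact base-class layout is an assumption, not the project's actual definition.

```python
from dataclasses import dataclass, field

@dataclass
class BaseEvent:
    """Assumed base shape: message plus routing metadata and a data payload."""
    message: str
    category: str = "system"
    event_type: str = "log"
    data: dict = field(default_factory=dict)

@dataclass
class WorkflowStartEvent(BaseEvent):
    """Typed event: defaults pin down the category and event_type."""
    category: str = "workflow"
    event_type: str = "workflow_start"
    workflow_name: str = ""
    agent_count: int = 0

event = WorkflowStartEvent(
    message="Starting workflow",
    workflow_name="my_workflow",
    agent_count=5,
)
print(event.event_type)  # workflow_start
```

Because the category and type live in class defaults, call sites only supply the payload fields, which is what makes typed events preferable to raw `BaseEvent` (see Best Practices below).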
Handlers
Handlers receive events and perform actions (output to console, write to file, etc.).
ConsoleEventHandler
Location: agent_actions/logging/core/handlers/console.py
Formats events for user-facing CLI output using Rich.
Features:
- Color-coded output by event type
- Timestamp display
- Category filtering (show only workflow/action/batch events by default)
- Verbose mode shows all events
Example Output:
19:40:16 | ▶ WORKFLOW qanalabs_quiz_gen started (5 actions, parallel)
19:40:17 | 1/5 START action: extract_raw_qa...
19:40:32 | 1/5 DONE extract_raw_qa (15.23s, 1.2K tokens)
19:40:32 | 2/5 START action: flatten_raw_questions...
19:40:33 | 2/5 DONE flatten_raw_questions (0.12s, tool)
19:42:15 | ✓ WORKFLOW complete (1m 59s, 12.5K tokens, 5 actions)
JSONFileHandler
Location: agent_actions/logging/core/handlers/json_file.py
Writes events to a JSON file in NDJSON format (one JSON object per line).
Features:
- Buffered writes (configurable buffer size)
- Creates parent directories automatically
- NDJSON format for easy streaming and parsing
- Full event payload with metadata
Output Location: {workflow}/agent_io/target/events.json
Example Entry:
{
  "event_id": "evt_abc123",
  "timestamp": "2024-01-15T19:40:16.123Z",
  "event_type": "workflow_start",
  "category": "workflow",
  "message": "Starting workflow qanalabs_quiz_gen",
  "meta": {
    "invocation_id": "run_xyz789",
    "workflow_name": "qanalabs_quiz_gen",
    "correlation_id": "corr_def456"
  },
  "data": {
    "agent_count": 5,
    "execution_mode": "parallel"
  }
}
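Because the file is NDJSON, it can be consumed line by line with nothing but the standard library. The sketch below parses an in-memory stand-in (so it is self-contained); in practice you would open `{workflow}/agent_io/target/events.json` instead, and the sample field values are illustrative.

```python
import json
from io import StringIO

# StringIO stands in for the events.json file on disk.
ndjson = StringIO(
    '{"event_type": "workflow_start", "category": "workflow", "data": {"agent_count": 5}}\n'
    '{"event_type": "action_complete", "category": "action", "data": {"execution_time": 15.23}}\n'
)

# One JSON object per line: parse each non-empty line independently.
events = [json.loads(line) for line in ndjson if line.strip()]
workflow_events = [e for e in events if e["category"] == "workflow"]
print(len(workflow_events))  # 1
```

This streaming-friendly shape is the point of NDJSON: a consumer never needs the whole file in memory to start filtering.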
RunResultsCollector
Location: agent_actions/logging/events/handlers/run_results.py
Collects workflow execution data and outputs a run_results.json artifact.
Features:
- Tracks action execution status, timing, and token usage
- Aggregates total token counts
- Records output folders for each action
- Captures error messages and skip reasons
Output Location: {workflow}/agent_io/target/run_results.json
Output Schema:
{
  "metadata": {
    "invocation_id": "run_xyz789",
    "workflow_name": "qanalabs_quiz_gen",
    "agent_count": 5,
    "execution_mode": "parallel",
    "started_at": "2024-01-15T19:40:16.000Z",
    "completed_at": "2024-01-15T19:42:15.456Z",
    "elapsed_time": 119.456,
    "status": "success"
  },
  "results": [
    {
      "unique_id": "qanalabs_quiz_gen.extract_raw_qa",
      "agent_name": "extract_raw_qa",
      "agent_index": 1,
      "status": "success",
      "execution_time": 15.23,
      "output_folder": "qanalabs_quiz_gen/agent_io/target/extract_raw_qa",
      "record_count": 100,
      "tokens": {
        "prompt_tokens": 800,
        "completion_tokens": 400,
        "total_tokens": 1200
      },
      "timing": {
        "started_at": "2024-01-15T19:40:17.000Z",
        "completed_at": "2024-01-15T19:40:32.234Z"
      }
    }
  ],
  "elapsed_time": 119.456,
  "tokens": {
    "prompt_tokens": 10000,
    "completion_tokens": 2500,
    "total_tokens": 12500
  }
}
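A downstream consumer can work against this schema directly. The sketch below uses a trimmed inline payload (so it runs standalone) to pull out failed actions and total token usage; the field names match the schema above, while the specific values are illustrative.

```python
import json

# Trimmed run_results.json payload, following the documented schema.
run_results = json.loads("""
{
  "metadata": {"status": "success", "elapsed_time": 119.456},
  "results": [
    {"agent_name": "extract_raw_qa", "status": "success",
     "tokens": {"total_tokens": 1200}},
    {"agent_name": "flatten_raw_questions", "status": "success",
     "tokens": {"total_tokens": 0}}
  ]
}
""")

# Actions that did not finish successfully.
failed = [r["agent_name"] for r in run_results["results"] if r["status"] != "success"]
# Sum per-action token counts (the real file also carries a pre-aggregated total).
total_tokens = sum(r["tokens"]["total_tokens"] for r in run_results["results"])
print(failed, total_tokens)  # [] 1200
```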
Event Flow
1. Initialization
When a workflow runs, LoggerFactory initializes the logging system:
# In cli/run.py
LoggerFactory.initialize(
    output_dir=agent_folder,  # {workflow}/agent_io
    workflow_name=workflow_name,
    invocation_id=run_id,
    verbose=args.verbose,
    force=True,
)
This:
- Creates the EventManager singleton
- Registers ConsoleEventHandler
- Registers JSONFileHandler (writes to {output_dir}/target/events.json)
- Registers RunResultsCollector (writes to {output_dir}/target/run_results.json)
- Sets up LoggingBridgeHandler to convert logger.* calls to events
2. Event Emission
Events can be emitted two ways:
Direct Events:
from agent_actions.logging import fire_event
from agent_actions.logging.events import WorkflowStartEvent
fire_event(WorkflowStartEvent(
    message="Starting workflow",
    workflow_name="my_workflow",
    agent_count=5,
))
Python Logging (automatically converted to events):
import logging
logger = logging.getLogger(__name__)
logger.info("Processing item 123") # Becomes LogEvent
logger.error("Failed to process") # Becomes LogEvent with ERROR level
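The bridge mechanism can be sketched as a stdlib `logging.Handler` whose `emit()` turns each record into an event. This is a self-contained model, not the project's actual `LoggingBridgeHandler`: the `captured` list stands in for `fire_event()`, and the dict shape is assumed.

```python
import logging

captured = []  # stands in for fire_event(); the real bridge builds a LogEvent

class LoggingBridgeHandler(logging.Handler):
    """Sketch: convert stdlib log records into event-shaped dicts."""
    def emit(self, record: logging.LogRecord) -> None:
        captured.append({
            "category": "system",
            "event_type": "log",
            "level": record.levelname,
            "message": record.getMessage(),
        })

logger = logging.getLogger("bridge_demo")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo out of the root logger's output
logger.addHandler(LoggingBridgeHandler())

logger.info("Processing item 123")  # becomes an INFO-level event
logger.error("Failed to process")   # becomes an ERROR-level event
print([e["level"] for e in captured])  # ['INFO', 'ERROR']
```

Because the handler sits on the logger, existing `logger.*` call sites need no changes to participate in the event stream.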
3. Event Enrichment
EventManager automatically enriches events with correlation context:
# Set context once
manager.set_context(
    workflow_name="my_workflow",
    correlation_id="abc123",
    agent_name="extract_data",
)
# All events inherit this context
fire_event(AgentStartEvent(message="Starting"))
# Event will have workflow_name, correlation_id, agent_name populated
4. Handler Dispatch
EventManager routes events to handlers based on acceptance criteria:
class ConsoleEventHandler:
    def accepts(self, event: BaseEvent) -> bool:
        # Accept only specific categories
        return event.category in self.categories

    def handle(self, event: BaseEvent) -> None:
        # Format and print event
        self.console.print(self.formatter.format(event))
5. Output Generation
Handlers process events and generate output:
- ConsoleHandler → Rich-formatted terminal output
- JSONFileHandler → Append to events.json
- RunResultsCollector → Aggregate data, write run_results.json on flush
Context Propagation
Context is automatically propagated through nested workflow executions:
# In dependency.py (upstream workflow execution)
with manager.context(
    workflow_name=upstream_name,
    correlation_id=str(uuid4())[:8],
):
    # Execute upstream workflow
    # All events in this scope inherit the context
    upstream_wf.run()
# Context automatically restores after block
This ensures proper attribution of events to the correct workflow in the execution hierarchy.
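The save-and-restore behavior can be sketched with `contextlib`. This is a model of the mechanism, not the real implementation: the `ContextStore` class and its `_context` dict are assumed names.

```python
from contextlib import contextmanager

class ContextStore:
    """Sketch of how a correlation context survives nested executions."""
    def __init__(self):
        self._context = {}

    @contextmanager
    def context(self, **overrides):
        saved = dict(self._context)       # snapshot the outer context
        self._context.update(overrides)   # apply this scope's values
        try:
            yield self._context
        finally:
            self._context = saved         # restore on exit, even on error

store = ContextStore()
store._context["workflow_name"] = "downstream"
with store.context(workflow_name="upstream"):
    inside = store._context["workflow_name"]   # "upstream"
outside = store._context["workflow_name"]      # "downstream"
print(inside, outside)
```

The `finally` clause is the important part: even if the upstream workflow raises, events fired afterwards are attributed to the outer workflow again.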
Testing
Testing Event Emission
from agent_actions.logging import fire_event
from agent_actions.logging.core import EventManager
from agent_actions.logging.events import WorkflowStartEvent

def test_workflow_event():
    manager = EventManager.get()
    events = []

    # Register test handler
    def capture(event):
        events.append(event)

    manager.register_function(capture)

    # Fire event
    fire_event(WorkflowStartEvent(
        message="Test",
        workflow_name="test",
    ))

    # Verify
    assert len(events) == 1
    assert events[0].workflow_name == "test"
Testing Handlers
from agent_actions.logging.core.handlers import ConsoleEventHandler
from agent_actions.logging.events import AgentCompleteEvent
def test_console_handler():
    handler = ConsoleEventHandler(categories={"agent"})
    event = AgentCompleteEvent(
        message="Completed",
        agent_name="test",
        execution_time=1.5,
    )

    # Test acceptance
    assert handler.accepts(event) is True

    # Test handling (use mock console to capture output)
    handler.handle(event)
Configuration
Logging Levels
Console verbosity can be controlled:
# Default: INFO level, workflow/agent/batch categories
agac run
# Verbose: DEBUG level, all categories
AGENT_ACTIONS_LOG_LEVEL=DEBUG agac run
# Quiet: WARN level and above (coming in next release)
agac run --quiet
File Handler
File handler writes to:
- {workflow}/agent_io/target/events.json (when output_dir is set)
- logs/agent_actions.log (fallback, if configured)
Configure via environment or LoggingConfig:
from agent_actions.logging.config import FileHandlerConfig, LoggingConfig

config = LoggingConfig(
    default_level="INFO",
    file_handler=FileHandlerConfig(
        enabled=True,
        path="logs/events.json",
    ),
)
LoggerFactory.initialize(config=config)
Best Practices
1. Use Typed Events
Don't use BaseEvent directly. Create specific event classes:
# Good
fire_event(AgentCompleteEvent(
    message="Agent completed",
    agent_name="extract_data",
    execution_time=15.2,
))

# Bad
fire_event(BaseEvent(
    message="Agent completed",
    category="agent",
    event_type="agent_complete",
))
2. Clear Messages
Event messages should be human-readable and actionable:
# Good
message="Agent extract_data completed in 15.2s"
# Bad
message="done"
3. Structured Data
Put machine-readable data in the data dict:
fire_event(AgentCompleteEvent(
    message="Agent completed",
    agent_name="extract_data",
    execution_time=15.2,
    tokens={"prompt": 800, "completion": 400},
))
The data dict will contain:
{
  "agent_name": "extract_data",
  "execution_time": 15.2,
  "tokens": {"prompt": 800, "completion": 400}
}
4. Use Context Managers
For nested execution contexts:
with manager.context(workflow_name="upstream"):
    # All events here get workflow_name="upstream"
    execute_upstream()
# Context automatically restores
5. Flush Before Exit
Always flush handlers before exiting:
try:
    run_workflow()
finally:
    LoggerFactory.flush()  # Ensure all buffered events are written
Migration from Legacy Logging
The event system replaces several legacy components:
ServiceLogger (Deprecated)
Before:
from agent_actions.cli.utils.service_logger import ServiceLogger
logger = ServiceLogger()
logger.print("Processing item", style="info")
After:
from agent_actions.logging import fire_event
from agent_actions.logging.events import ProgressEvent
fire_event(ProgressEvent(
    message="Processing item",
    current=1,
    total=100,
))
Direct console.print() (Avoid)
Before:
console.print("[green]✓[/green] Completed")
After:
fire_event(AgentCompleteEvent(
    message="Agent completed",
    agent_name="my_agent",
))
The ConsoleEventHandler will format it appropriately.
Performance Considerations
Buffering
JSONFileHandler buffers events before writing:
json_handler = JSONFileHandler(
    file_path=log_file,
    buffer_size=10,  # Write every 10 events
)
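The buffering strategy itself can be sketched in a few lines. This is a toy model, not the real `JSONFileHandler`: the `written` list stands in for the file on disk, and the class name is assumed.

```python
class BufferedWriter:
    """Sketch: batch incoming events, flush every buffer_size events."""
    def __init__(self, buffer_size=10):
        self.buffer_size = buffer_size
        self._buffer = []
        self.written = []  # stands in for the file on disk

    def handle(self, event):
        self._buffer.append(event)
        if len(self._buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        self.written.extend(self._buffer)  # one write call per batch
        self._buffer.clear()

writer = BufferedWriter(buffer_size=3)
for i in range(7):
    writer.handle({"event_id": i})
print(len(writer.written))  # 6 -- one event is still buffered
writer.flush()              # this is why "flush before exit" matters
print(len(writer.written))  # 7
```

The tail of the buffer only reaches disk on an explicit flush, which is exactly why the Best Practices section insists on `LoggerFactory.flush()` before exit.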
Handler Acceptance
Handlers filter events early to avoid unnecessary processing:
def accepts(self, event: BaseEvent) -> bool:
    # Quick check before handle() is called
    return event.category in self.categories
Context Copying
EventManager copies context to avoid mutation issues:
# Context is copied to each event
event.meta.invocation_id = self._context["invocation_id"]
event.meta.workflow_name = self._context["workflow_name"]
See Also
- API Reference - Detailed API documentation