feat: add dashboard data processing + WebSocket agent events (Phase 2 complete)

Dashboard Data Processing (data_processing.py):
- ModelName() — maps raw provider/model strings to display names
- BuildDailyChart() — aggregates cost/token/call data into daily chart buckets
- BuildAlerts() — evaluates cost, cron, context, gateway, memory alert conditions
- BuildCostBreakdown() — ranks models by cost descending
- FmtTokens() — formats token counts (1.2M, 1.5K, etc.)
- round2(), sum_bucket_costs(), TitleCase() — utility functions
- All pure Python, no I/O or RPC — transforms data from DB

WebSocket Agent Events (ws.py + event_parser.py):
- WebSocket endpoint at /ws/agents for real-time agent event broadcasting
- On connect: sends last 50 session events as initial state
- Background task polls SessionEvent table every 2s for new events
- Parses events into dashboard format (agentStatus, agentToolStart, etc.)
- broadcast_event() for other services to push events in real-time
- Lifespan integration: start/stop broadcast task with app startup/shutdown

Fixed in Ripley review:
- Removed duplicate method definitions in WebSocketConnectionManager
- Fixed broken import in main.py (dangling tuple)
- Removed inline import re in event_parser.py (already at module level)
- Fixed duplicate memory_search/memory_get entries in format_tool_status
- Used asyncio.get_running_loop() instead of deprecated get_event_loop()
- Cleaned up broadcast_to_all with proper broken connection cleanup
This commit is contained in:
Ripley 2026-05-10 21:01:38 -05:00
parent 22bc6bc36e
commit 461ccbcb88
5 changed files with 937 additions and 1 deletions

246
src/backend/app/api/ws.py Normal file
View File

@ -0,0 +1,246 @@
"""WebSocket API for real-time agent events.
This module implements a WebSocket endpoint at /ws/agents that broadcasts
real-time agent activity events to connected browser clients.
The WebSocket:
- Accepts connections from browser clients
- Sends initial state snapshot on connect
- Background task polls SessionEvent table every 2 seconds
- Broadcasts parsed events to all connected clients
- Cleans up on disconnect
"""
from __future__ import annotations
import asyncio
import json
from datetime import datetime, timedelta, timezone
from typing import Any
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from sqlmodel import select
from app.core.logging import get_logger
from app.db.session import async_session_maker
from app.models.monitoring import SessionEvent
from app.services.monitoring.event_parser import parse_session_event
logger = get_logger(__name__)
router = APIRouter()
class WebSocketConnectionManager:
    """Manages WebSocket connections and event broadcasting.

    A single module-level instance serves all browser clients: open sockets
    are tracked in a set, and one background task polls the SessionEvent
    table and fans new events out to every connection.
    """

    def __init__(self) -> None:
        # All currently-open client sockets.
        self.active_connections: set[WebSocket] = set()
        # Handle for the polling task created by start(); None until started.
        self._broadcast_task: asyncio.Task[None] | None = None

    async def connect(self, websocket: WebSocket) -> None:
        """Accept a new WebSocket connection and send initial state."""
        await websocket.accept()
        self.active_connections.add(websocket)
        logger.info(
            "app.websocket.client_connected total=%d",
            len(self.active_connections),
        )
        await self._send_initial_state(websocket)

    async def disconnect(self, websocket: WebSocket) -> None:
        """Remove a WebSocket connection."""
        # discard() keeps a double-disconnect harmless.
        self.active_connections.discard(websocket)
        logger.info(
            "app.websocket.client_disconnected total=%d",
            len(self.active_connections),
        )

    async def broadcast_to_all(self, event: dict[str, Any]) -> None:
        """Send an event to all connected clients.

        Sockets that fail to accept the send are dropped from the
        active set after the loop completes.
        """
        data = json.dumps(event)
        disconnected: set[WebSocket] = set()
        # Iterate a copy: the set may change while we await sends.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(data)
            except Exception:
                # Any send failure marks the socket for removal.
                disconnected.add(connection)
        # Clean up broken connections
        for conn in disconnected:
            self.active_connections.discard(conn)

    async def _send_initial_state(self, websocket: WebSocket) -> None:
        """Send recent session events as initial state snapshot.

        Replays the last 50 SessionEvent rows (oldest first) to the newly
        connected client, then sends an `initialState` sentinel.
        """
        try:
            async with async_session_maker() as session:
                # Newest 50 rows; reversed below so the client sees
                # events in chronological order.
                stmt = (
                    select(SessionEvent)
                    .order_by(SessionEvent.created_at.desc())
                    .limit(50)
                )
                result = await session.execute(stmt)
                events = result.scalars().all()
                # Send parsed events oldest first
                for event in reversed(events):
                    try:
                        dashboard_events = parse_session_event(event)
                        for de in dashboard_events:
                            await websocket.send_json(de)
                    except Exception as exc:
                        # One unparseable event must not abort the snapshot.
                        logger.warning(
                            "app.websocket.parse_error event_id=%s error=%s",
                            event.id,
                            exc,
                        )
                # Sentinel: tells the client the snapshot is complete.
                await websocket.send_json({
                    "type": "initialState",
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                })
        except Exception as exc:
            logger.error(
                "app.websocket.initial_state_error error=%s", exc, exc_info=True
            )

    async def _broadcast_loop(self) -> None:
        """Background task: poll for new session events and broadcast them.

        Runs until cancelled. Keeps a `last_poll_time` cursor so each poll
        only fetches rows newer than the previous batch.
        """
        last_poll_time: datetime | None = None
        poll_interval = 2  # seconds
        while True:
            try:
                await asyncio.sleep(poll_interval)
                async with async_session_maker() as session:
                    if last_poll_time:
                        stmt = (
                            select(SessionEvent)
                            .where(SessionEvent.created_at > last_poll_time)
                            .order_by(SessionEvent.created_at.asc())
                        )
                    else:
                        # First poll: get events from last hour
                        # NOTE(review): cutoff is made naive — presumably
                        # created_at is stored as naive UTC; confirm.
                        cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1)
                        stmt = (
                            select(SessionEvent)
                            .where(SessionEvent.created_at > cutoff)
                            .order_by(SessionEvent.created_at.asc())
                        )
                    result = await session.execute(stmt)
                    events = result.scalars().all()
                    if events:
                        # Advance the cursor to the newest event seen.
                        last_poll_time = max(e.created_at for e in events)
                    for event in events:
                        try:
                            dashboard_events = parse_session_event(event)
                            for de in dashboard_events:
                                await self.broadcast_to_all(de)
                        except Exception as exc:
                            logger.warning(
                                "app.websocket.parse_error event_id=%s error=%s",
                                event.id,
                                exc,
                            )
            except asyncio.CancelledError:
                # Normal shutdown path — propagate so stop() can await us.
                raise
            except Exception as exc:
                # Keep the loop alive across transient DB/parse failures.
                logger.error(
                    "app.websocket.broadcast_loop_error error=%s",
                    exc,
                    exc_info=True,
                )

    def start(self) -> None:
        """Start the background broadcast task.

        Must be called from within a running event loop. Idempotent: a
        live task is never replaced.
        """
        loop = asyncio.get_running_loop()
        if self._broadcast_task is None or self._broadcast_task.done():
            self._broadcast_task = loop.create_task(self._broadcast_loop())
            logger.info("app.websocket.broadcast_started")

    async def stop(self) -> None:
        """Stop the background broadcast task and await its cancellation."""
        if self._broadcast_task:
            self._broadcast_task.cancel()
            try:
                await self._broadcast_task
            except asyncio.CancelledError:
                pass
            self._broadcast_task = None
            logger.info("app.websocket.broadcast_stopped")
# Global connection manager instance — shared by the /ws/agents endpoint,
# broadcast_event(), and the lifespan start/stop hooks below.
ws_manager = WebSocketConnectionManager()
@router.websocket("/ws/agents")
async def websocket_endpoint(websocket: WebSocket) -> None:
    """WebSocket endpoint for real-time agent events.

    Connect at: ws://localhost:8080/ws/agents

    Event types sent:
    - agentStatus: agent became active/idle/stalled
    - agentToolStart: agent started using a tool
    - agentToolDone: agent finished using a tool
    - agentChat: agent sent a text message
    - agentTask: new user task arrived
    - sessionStart: new session started
    - subagentSpawned: sub-agent spawned
    - subagentDespawned: sub-agent completed
    - initialState: initial state snapshot on connect
    """
    await ws_manager.connect(websocket)
    try:
        # Receive loop: client traffic is mostly ignored, but reading keeps
        # the connection alive and lets us detect disconnects promptly.
        while True:
            try:
                message = await asyncio.wait_for(
                    websocket.receive_text(),
                    timeout=30,
                )
            except asyncio.TimeoutError:
                # No client traffic for 30s — send a keep-alive ping;
                # a failed ping means the socket is gone.
                try:
                    await websocket.send_text("ping")
                except Exception:
                    break
                continue
            if message:
                logger.debug("app.websocket.recv data=%s", message[:100])
    except WebSocketDisconnect:
        pass
    except Exception as exc:
        logger.error("app.websocket.error error=%s", exc, exc_info=True)
    finally:
        await ws_manager.disconnect(websocket)
def broadcast_event(event: dict[str, Any]) -> None:
    """Queue an event for broadcasting to all connected clients.

    Synchronous entry point usable from any context (including background
    tasks): schedules the broadcast on the running event loop. When no
    loop is running, the event is silently dropped with a debug log.

    Args:
        event: Dashboard event dict to broadcast
    """
    try:
        asyncio.get_running_loop().create_task(
            ws_manager.broadcast_to_all(event)
        )
    except RuntimeError:
        logger.debug("app.websocket.no_loop skipping broadcast")
def start_websocket_broadcast() -> None:
    """Start the WebSocket broadcast background task.

    Thin module-level wrapper around ws_manager.start() for use in the
    application lifespan hook.
    """
    ws_manager.start()
async def stop_websocket_broadcast() -> None:
    """Stop the WebSocket broadcast background task.

    Thin module-level wrapper around ws_manager.stop() for use in the
    application lifespan hook.
    """
    await ws_manager.stop()

View File

@ -33,6 +33,12 @@ from app.api.tags import router as tags_router
from app.api.task_custom_fields import router as task_custom_fields_router
from app.api.tasks import router as tasks_router
from app.api.users import router as users_router
from app.api.ws import (
broadcast_event,
router as ws_router,
start_websocket_broadcast,
stop_websocket_broadcast,
)
from app.core.config import settings
from app.core.error_handling import install_error_handling
from app.core.logging import configure_logging, get_logger
@ -467,6 +473,10 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
else:
logger.info("app.lifecycle.gateway_collector.no_gateways")
# Start WebSocket broadcast task
start_websocket_broadcast()
logger.info("app.lifecycle.websocket_broadcast.start")
if settings.rate_limit_backend == RateLimitBackend.REDIS:
validate_rate_limit_redis(settings.rate_limit_redis_url)
logger.info("app.lifecycle.rate_limit backend=redis")
@ -487,6 +497,16 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]:
exc,
exc_info=True,
)
# Shutdown WebSocket broadcast
logger.info("app.lifecycle.websocket_broadcast.shutdown")
try:
await stop_websocket_broadcast()
except Exception as exc:
logger.error(
"app.lifecycle.websocket_broadcast.shutdown_error error=%s",
exc,
exc_info=True,
)
logger.info("app.lifecycle.stopped")
@ -600,5 +620,8 @@ api_v1.include_router(tags_router)
api_v1.include_router(users_router)
app.include_router(api_v1)
# Include WebSocket router at root path (no /api prefix)
app.include_router(ws_router)
add_pagination(app)
logger.debug("app.routes.registered count=%s", len(app.routes))

View File

@ -5,6 +5,34 @@ OpenClaw Gateway RPC endpoints and stores the results in Mission Control's
monitoring models (CostSnapshot, CronJobStatus, SessionEvent, etc.).
"""
from app.services.monitoring.data_processing import (
BuildAlerts,
BuildCostBreakdown,
BuildDailyChart,
FmtTokens,
ModelName,
TitleCase,
round2,
sum_bucket_costs,
)
from app.services.monitoring.event_parser import (
extract_task_from_message,
format_tool_status,
parse_session_event,
)
from app.services.monitoring.gateway_collector import GatewayCollectorService
__all__ = (
"GatewayCollectorService",
"BuildAlerts",
"BuildCostBreakdown",
"BuildDailyChart",
"FmtTokens",
"ModelName",
"TitleCase",
"round2",
"sum_bucket_costs",
"extract_task_from_message",
"format_tool_status",
"parse_session_event",
)

View File

@ -0,0 +1,320 @@
"""Dashboard data processing utilities for Mission Control monitoring.
This module provides pure utility functions to transform raw monitoring data
into dashboard-ready formats. These functions are pure (no I/O, no RPC calls)
and are called by API endpoints to transform database-stored data.
"""
from __future__ import annotations
import math
from collections import defaultdict
def ModelName(model: str) -> str:
    """Map a raw provider/model identifier to a human-friendly display name.

    Ported from Go's apprefresh.ModelName().
    Example: "anthropic/claude-opus-4-6" -> "Claude Opus 4.6"
    Unknown identifiers are returned unchanged.
    """
    key = model.lower()
    # Drop the provider prefix ("anthropic/claude-x" -> "claude-x").
    if "/" in key:
        key = key.split("/", 2)[1]
    # Ordered rules: the first matching substring wins, so more specific
    # patterns must appear before more generic ones.
    rules: list[tuple[tuple[str, ...], str]] = [
        (("opus-4-6",), "Claude Opus 4.6"),
        (("opus",), "Claude Opus 4.5"),
        (("sonnet",), "Claude Sonnet"),
        (("haiku",), "Claude Haiku"),
        (("grok-4-fast",), "Grok 4 Fast"),
        (("grok-4", "grok4"), "Grok 4"),
        (("gemini-2.5-pro", "gemini-pro"), "Gemini 2.5 Pro"),
        (("gemini-3-flash",), "Gemini 3 Flash"),
        (("gemini-2.5-flash",), "Gemini 2.5 Flash"),
        (("gemini", "flash"), "Gemini Flash"),
        (("minimax-m2.5",), "MiniMax M2.5"),
        (("minimax-m2", "minimax"), "MiniMax"),
        (("glm-5",), "GLM-5"),
        (("glm-4",), "GLM-4"),
        (("k2p5", "kimi"), "Kimi K2.5"),
        (("gpt-5.3-codex",), "GPT-5.3 Codex"),
        (("gpt-5",), "GPT-5"),
        (("gpt-4o",), "GPT-4o"),
        (("gpt-4",), "GPT-4"),
        (("o1",), "O1"),
        (("o3",), "O3"),
    ]
    for needles, display in rules:
        if any(needle in key for needle in needles):
            return display
    return model
def TitleCase(s: str) -> str:
    """Capitalize the first character when it is an ASCII lowercase letter.

    Ported from Go's apprefresh.TitleCase(). Empty strings, non-letters,
    and non-ASCII first characters pass through unchanged.
    """
    if s and "a" <= s[0] <= "z":
        return s[0].upper() + s[1:]
    return s
def round2(v: float) -> float:
    """Round to 2 decimal places by scaling, rounding, and rescaling."""
    scaled = round(v * 100)
    return scaled / 100
def sum_bucket_costs(buckets: dict[str, dict]) -> float:
    """Sum the `cost` field across a mapping of TokenBucket-like dicts.

    Non-dict values are skipped; dicts without a 'cost' key contribute 0.

    Args:
        buckets: Dict mapping model names to TokenBucket-like dicts with 'cost' field

    Returns:
        Total cost from all buckets
    """
    return sum(
        (b.get("cost", 0.0) for b in buckets.values() if isinstance(b, dict)),
        0.0,
    )
def BuildDailyChart(
    daily_costs: dict[str, dict[str, float]],
    daily_tokens: dict[str, dict[str, int]],
    daily_calls: dict[str, dict[str, int]],
    daily_subagent_costs: dict[str, float],
    daily_subagent_count: dict[str, int],
    days: int = 30
) -> list[dict]:
    """Aggregate cost/token/call data into daily buckets for chart rendering.

    Ported from Go's apprefresh.BuildDailyChart().

    Args:
        daily_costs: Dict of {date: {model: cost}}
        daily_tokens: Dict of {date: {model: tokens}}
        daily_calls: Dict of {date: {model: calls}}
        daily_subagent_costs: Dict of {date: subagent_cost}
        daily_subagent_count: Dict of {date: subagent_count}
        days: Number of days to include (default 30)

    Returns:
        List of dicts with keys: date, label, total, tokens, calls,
        subagent_cost, subagent_runs, models
    """
    from datetime import date, timedelta

    # Oldest-first list of the last `days` ISO-formatted dates.
    today = date.today()
    window = [
        (today - timedelta(days=offset)).strftime("%Y-%m-%d")
        for offset in range(days - 1, -1, -1)
    ]

    # Rank models by their total cost over the window; only the top 6
    # keep their own series, everything else folds into "Other".
    totals: dict[str, float] = defaultdict(float)
    for day in window:
        for model, cost in daily_costs.get(day, {}).items():
            totals[model] += cost
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    top_models = {model for model, _ in ranked[:6]}

    rows: list[dict] = []
    for day in window:
        costs = daily_costs.get(day, {})
        per_model: dict[str, float] = {}
        other = 0.0
        for model, cost in costs.items():
            if model in top_models:
                per_model[model] = round2(cost)
            else:
                other += cost
        if other > 0:
            per_model["Other"] = round2(other)
        rows.append({
            "date": day,
            "label": day[5:],  # MM-DD format
            "total": round2(sum(costs.values())),
            "tokens": sum(daily_tokens.get(day, {}).values()),
            "calls": sum(daily_calls.get(day, {}).values()),
            "subagent_cost": round2(daily_subagent_costs.get(day, 0.0)),
            "subagent_runs": daily_subagent_count.get(day, 0),
            "models": per_model,
        })
    return rows
def BuildAlerts(
    total_cost_today: float,
    cost_high: float,
    cost_warn: float,
    crons: list[dict],
    sessions: list[dict],
    context_threshold: float,
    gateway_status: dict,
    memory_threshold_kb: int
) -> list[dict]:
    """Evaluate alert conditions and return list of alerts.

    Ported from Go's apprefresh.BuildAlerts().

    Args:
        total_cost_today: Current day's total cost
        cost_high: High cost threshold
        cost_warn: Warning cost threshold
        crons: List of cron job status dicts with 'lastStatus' field
        sessions: List of session dicts with 'contextPct' field
        context_threshold: Context usage percentage threshold
        gateway_status: Gateway health dict with 'status' and 'rss' fields
        memory_threshold_kb: Memory threshold in KB

    Returns:
        List of alert dicts with keys: type, icon, message, severity
    """
    alerts: list[dict] = []

    def add(type_: str, icon: str, message: str, severity: str) -> None:
        # Small helper so every alert carries the same four keys.
        alerts.append({
            "type": type_,
            "icon": icon,
            "message": message,
            "severity": severity,
        })

    # Daily spend: at most one alert, the higher threshold wins.
    if total_cost_today > cost_high:
        add("warning", "💰", f"High daily cost: ${total_cost_today:.2f}", "high")
    elif total_cost_today > cost_warn:
        add(
            "info",
            "💵",
            f"Daily cost above ${cost_warn:.0f}: ${total_cost_today:.2f}",
            "medium",
        )

    # One alert per failed cron job.
    for cron in crons:
        if cron.get("lastStatus") == "error":
            name = cron.get("name", "Unknown")
            add("error", "", f"Cron failed: {name}", "high")

    # Sessions whose context usage exceeds the threshold (name capped at 30).
    for session in sessions:
        pct = session.get("contextPct", 0.0)
        if pct > context_threshold:
            name = session.get("name", "Unknown")[:30]
            add("warning", "⚠️", f"High context: {name} ({pct:.0f}%)", "medium")

    # Gateway health.
    if gateway_status.get("status") == "offline":
        add("error", "🔴", "Gateway is offline", "critical")

    # Gateway memory footprint.
    rss = gateway_status.get("rss", 0)
    if isinstance(rss, int) and rss > memory_threshold_kb:
        memory = gateway_status.get("memory", f"{rss} KB")
        add("warning", "🧠", f"High memory usage: {memory}", "medium")

    return alerts
def BuildCostBreakdown(model_costs: dict[str, dict]) -> list[dict]:
    """Rank models by spend, most expensive first.

    Ported from Go's apprefresh.BuildCostBreakdown(). Entries that are not
    dicts, or whose cost is zero or negative, are omitted.

    Args:
        model_costs: Dict mapping model names to TokenBucket-like dicts

    Returns:
        Sorted list of {model, cost} dicts, descending by cost
    """
    priced = [
        (name, bucket.get("cost", 0.0))
        for name, bucket in model_costs.items()
        if isinstance(bucket, dict)
    ]
    ranked = sorted(
        ((name, cost) for name, cost in priced if cost > 0),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return [{"model": name, "cost": round2(cost)} for name, cost in ranked]
def FmtTokens(count: int) -> str:
    """Format token counts for display.

    Ported from Go's apprefresh.FmtTokens().

    Examples:
        1234567 -> "1.2M"
        500 -> "500"
        1500 -> "1.5K"
        1000000 -> "1.0M"
        1000000000 -> "1.0B"
    """
    # Largest threshold first; counts below 1K are shown verbatim.
    for threshold, suffix in (
        (1_000_000_000, "B"),
        (1_000_000, "M"),
        (1_000, "K"),
    ):
        if count >= threshold:
            return f"{count / threshold:.1f}{suffix}"
    return str(count)

View File

@ -0,0 +1,319 @@
"""Event parsing utilities for WebSocket agent events.
This module provides functions to parse SessionEvent records from the database
and convert them into dashboard-ready event formats for real-time broadcasting.
Ported from TypeScript in sources/pixel-agents/server/openclawParser.ts
"""
from __future__ import annotations
import json
import re
import asyncio
from typing import Any
def format_tool_status(tool_name: str, args: dict[str, Any]) -> str:
    """Format a tool call into a human-readable status string.

    Ported from TypeScript formatToolStatus().

    Args:
        tool_name: The tool name (e.g., "Read", "Edit", "exec")
        args: Tool arguments dictionary

    Returns:
        Human-readable status string
    """
    # File-oriented tools: append the target path when one is supplied.
    file_verbs = {
        "Read": "Reading", "read": "Reading",
        "Edit": "Editing", "edit": "Editing",
        "Write": "Writing", "write": "Writing",
    }
    verb = file_verbs.get(tool_name)
    if verb is not None:
        file_path = args.get("file_path") or args.get("path")
        return f"{verb} {file_path}" if file_path else verb

    if tool_name == "web_fetch":
        # Truncate long URLs so the status line stays readable.
        url = args.get("url", "")
        if isinstance(url, str):
            url = url[:60] if len(url) > 60 else url
        return f"Fetching {url}"

    # Fixed status strings for the remaining known tools.
    fixed = {
        "exec": "Running command",
        "browser": "Using browser",
        "sessions_spawn": "Spawning sub-agent",
        "memory_search": "Searching memory",
        "memory_get": "Reading memory",
        "message": "Sending message",
        "image": "Analyzing image",
        "pdf": "Analyzing PDF",
        "tts": "Speaking",
    }
    return fixed.get(tool_name, f"Using {tool_name}")
def extract_task_from_message(content: Any) -> str:
    """Extract a meaningful task description from a user message.

    Ported from TypeScript extractTaskFromUserMessage().
    OpenClaw wraps user messages in metadata - we strip that to find actual content.

    Args:
        content: The message content (string or array of blocks)

    Returns:
        Cleaned task text, or empty string if no content found
    """
    # Pull plain text from either a bare string or the first text block.
    if isinstance(content, str):
        text = content
    else:
        text = ""
        if isinstance(content, list):
            for block in content:
                if (
                    isinstance(block, dict)
                    and block.get("type") == "text"
                    and isinstance(block.get("text"), str)
                ):
                    text = block["text"]
                    break
    if not text:
        return ""

    cleaned = text
    # Metadata wrappers OpenClaw prepends to the real message. Each pattern
    # captures the remainder after its wrapper in group 1; applied in order,
    # each to the result of the previous strip.
    wrapper_patterns = (
        r"\[Inter-session message\].*?\n.*?\n([\s\S]*)",
        r"Conversation info \(untrusted.*?\n```json\n[\s\S]*?```\n([\s\S]*)",
        r"Sender \(untrusted.*?\n```json\n[\s\S]*?```\n([\s\S]*)",
        r"Chat history.*?\n```json\n[\s\S]*?```\n([\s\S]*)",
    )
    for pattern in wrapper_patterns:
        match = re.match(pattern, cleaned, re.MULTILINE)
        if match:
            cleaned = match.group(1).strip()

    # Leading @mention, then untrusted-context markers.
    cleaned = re.sub(r"^@\S+\s*", "", cleaned).strip()
    cleaned = re.sub(r"Untrusted context \(metadata[^)]*\):\s*", "", cleaned).strip()
    cleaned = re.sub(
        r"<<<EXTERNAL_UNTRUSTED_CONTENT[\s\S]*?<<<END_EXTERNAL_UNTRUSTED_CONTENT[^>]*>>>",
        "",
        cleaned
    ).strip()

    # If it's a sessions_send payload, try to get the message body
    try:
        parsed = json.loads(cleaned)
        if isinstance(parsed, dict) and (parsed.get("messages") or parsed.get("message")):
            cleaned = parsed.get("message", "")
            if isinstance(cleaned, (list, dict)):
                cleaned = ""
    except json.JSONDecodeError:
        pass

    # Truncate to something reasonable
    if len(cleaned) > 150:
        cleaned = cleaned[:147] + "..."
    return cleaned
def parse_session_event(event: Any) -> list[dict[str, Any]]:
    """Parse a SessionEvent model instance into dashboard events.

    Ported from TypeScript parseOpenClawLine().
    Takes a SessionEvent (SQLModel instance) and returns a list of dashboard events.

    Args:
        event: SessionEvent model instance with fields:
            - event_type: "message", "session", "custom", etc.
            - metadata_: JSON dict containing the full record data

    Returns:
        List of dashboard event dicts with type and relevant fields
    """
    events = []
    try:
        # Parse metadata as the record
        metadata = event.metadata_ or {}
        record = metadata if isinstance(metadata, dict) else {}
        if not record:
            return events
        if record.get("type") == "message":
            msg = record.get("message") or {}
            if isinstance(msg, dict):
                role = msg.get("role")
                content = msg.get("content")
                if role == "assistant" and isinstance(content, list):
                    # Assistant turns are a list of blocks mixing text,
                    # tool calls, and (sometimes) inline tool results.
                    has_tool_call = False
                    # NOTE(review): only the LAST text block survives the
                    # loop below — presumably matches the TS port; confirm.
                    chat_text = ""
                    for block in content:
                        if not isinstance(block, dict):
                            continue
                        block_type = block.get("type")
                        if block_type == "text" and isinstance(block.get("text"), str):
                            chat_text = block["text"].strip()
                            # Strip any leading [[reply_to ...]] directive.
                            chat_text = re.sub(r"^\[\[reply_to[^\]]*\]\]\s*", "", chat_text).strip()
                        if block_type == "toolCall" and block.get("id"):
                            has_tool_call = True
                            tool_id = block["id"]
                            tool_name = block.get("name", "unknown")
                            # `or {}` guards against an explicit null arguments value.
                            args = block.get("arguments", {}) or {}
                            status = format_tool_status(tool_name, args)
                            events.append({
                                "type": "agentToolStart",
                                "id": event.agent_id or "unknown",
                                "toolId": tool_id,
                                "status": status,
                            })
                            # Detect sub-agent spawns
                            if tool_name == "sessions_spawn":
                                label = args.get("label") or args.get("agentId") or "sub-agent"
                                task = args.get("task", "")
                                events.append({
                                    "type": "subagentSpawned",
                                    "id": event.agent_id or "unknown",
                                    "toolId": tool_id,
                                    "label": label,
                                    "task": task[:200] if task else "",
                                })
                        if block_type == "toolResult" and block.get("toolCallId"):
                            tool_id = block["toolCallId"]
                            events.append({
                                "type": "agentToolDone",
                                "id": event.agent_id or "unknown",
                                "toolId": tool_id,
                            })
                    # Suppress protocol sentinels that are not real chat.
                    if chat_text and chat_text not in ("NO_REPLY", "HEARTBEAT_OK", "ANNOUNCE_SKIP", "REPLY_SKIP"):
                        events.append({
                            "type": "agentChat",
                            "id": event.agent_id or "unknown",
                            "message": chat_text[:2000] if len(chat_text) > 2000 else chat_text,
                        })
                    if has_tool_call:
                        events.append({
                            "type": "agentStatus",
                            "id": event.agent_id or "unknown",
                            "status": "active",
                        })
                elif role == "toolResult" and isinstance(content, dict):
                    # Tool completed
                    tool_id = content.get("toolCallId")
                    if tool_id:
                        events.append({
                            "type": "agentToolDone",
                            "id": event.agent_id or "unknown",
                            "toolId": tool_id,
                        })
                elif role == "user":
                    # New user message - new turn starting
                    task_text = extract_task_from_message(content) if content else ""
                    if task_text:
                        events.append({
                            "type": "agentTask",
                            "id": event.agent_id or "unknown",
                            "task": task_text,
                        })
                    # A new turn always clears tools and marks the agent active,
                    # even when no task text could be extracted.
                    events.append({
                        "type": "agentToolsClear",
                        "id": event.agent_id or "unknown",
                    })
                    events.append({
                        "type": "agentStatus",
                        "id": event.agent_id or "unknown",
                        "status": "active",
                    })
        elif record.get("type") == "session":
            # Session start record
            events.append({
                "type": "sessionStart",
                "id": event.agent_id or "unknown",
            })
            events.append({
                "type": "agentStatus",
                "id": event.agent_id or "unknown",
                "status": "active",
            })
        elif record.get("type") == "custom":
            # Handle custom events if needed
            pass
    except Exception:
        # Ignore malformed events
        pass
    return events