diff --git a/src/backend/app/api/ws.py b/src/backend/app/api/ws.py new file mode 100644 index 0000000..6555229 --- /dev/null +++ b/src/backend/app/api/ws.py @@ -0,0 +1,246 @@ +"""WebSocket API for real-time agent events. + +This module implements a WebSocket endpoint at /ws/agents that broadcasts +real-time agent activity events to connected browser clients. + +The WebSocket: +- Accepts connections from browser clients +- Sends initial state snapshot on connect +- Background task polls SessionEvent table every 2 seconds +- Broadcasts parsed events to all connected clients +- Cleans up on disconnect +""" + +from __future__ import annotations + +import asyncio +import json +from datetime import datetime, timedelta, timezone +from typing import Any + +from fastapi import APIRouter, WebSocket, WebSocketDisconnect +from sqlmodel import select + +from app.core.logging import get_logger +from app.db.session import async_session_maker +from app.models.monitoring import SessionEvent +from app.services.monitoring.event_parser import parse_session_event + +logger = get_logger(__name__) + +router = APIRouter() + + +class WebSocketConnectionManager: + """Manages WebSocket connections and event broadcasting.""" + + def __init__(self) -> None: + self.active_connections: set[WebSocket] = set() + self._broadcast_task: asyncio.Task[None] | None = None + + async def connect(self, websocket: WebSocket) -> None: + """Accept a new WebSocket connection and send initial state.""" + await websocket.accept() + self.active_connections.add(websocket) + logger.info( + "app.websocket.client_connected total=%d", + len(self.active_connections), + ) + await self._send_initial_state(websocket) + + async def disconnect(self, websocket: WebSocket) -> None: + """Remove a WebSocket connection.""" + self.active_connections.discard(websocket) + logger.info( + "app.websocket.client_disconnected total=%d", + len(self.active_connections), + ) + + async def broadcast_to_all(self, event: dict[str, Any]) -> None: + """Send an event to all connected clients.""" + data = json.dumps(event) + disconnected: set[WebSocket] = set() + for connection in list(self.active_connections): + try: + await connection.send_text(data) + except Exception: + disconnected.add(connection) + # Clean up broken connections + for conn in disconnected: + self.active_connections.discard(conn) + + async def _send_initial_state(self, websocket: WebSocket) -> None: + """Send recent session events as initial state snapshot.""" + try: + async with async_session_maker() as session: + stmt = ( + select(SessionEvent) + .order_by(SessionEvent.created_at.desc()) + .limit(50) + ) + result = await session.execute(stmt) + events = result.scalars().all() + + # Send parsed events oldest first + for event in reversed(events): + try: + dashboard_events = parse_session_event(event) + for de in dashboard_events: + await websocket.send_json(de) + except Exception as exc: + logger.warning( + "app.websocket.parse_error event_id=%s error=%s", + event.id, + exc, + ) + + await websocket.send_json({ + "type": "initialState", + "timestamp": datetime.now(timezone.utc).isoformat(), + }) + except Exception as exc: + logger.error( + "app.websocket.initial_state_error error=%s", exc, exc_info=True + ) + + async def _broadcast_loop(self) -> None: + """Background task: poll for new session events and broadcast them.""" + last_poll_time: datetime | None = None + poll_interval = 2 # seconds + + while True: + try: + await asyncio.sleep(poll_interval) + + async with async_session_maker() as session: + if last_poll_time: + stmt = ( + select(SessionEvent) + .where(SessionEvent.created_at > last_poll_time) + .order_by(SessionEvent.created_at.asc()) + ) + else: + # First poll: get events from last hour + cutoff = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(hours=1) + stmt = ( + select(SessionEvent) + .where(SessionEvent.created_at > cutoff) + .order_by(SessionEvent.created_at.asc()) + ) + + result = await session.execute(stmt) + events = result.scalars().all() + + if events: + last_poll_time = max(e.created_at for e in events) + + for event in events: + try: + dashboard_events = parse_session_event(event) + for de in dashboard_events: + await self.broadcast_to_all(de) + except Exception as exc: + logger.warning( + "app.websocket.parse_error event_id=%s error=%s", + event.id, + exc, + ) + + except asyncio.CancelledError: + raise + except Exception as exc: + logger.error( + "app.websocket.broadcast_loop_error error=%s", + exc, + exc_info=True, + ) + + def start(self) -> None: + """Start the background broadcast task.""" + loop = asyncio.get_running_loop() + if self._broadcast_task is None or self._broadcast_task.done(): + self._broadcast_task = loop.create_task(self._broadcast_loop()) + logger.info("app.websocket.broadcast_started") + + async def stop(self) -> None: + """Stop the background broadcast task.""" + if self._broadcast_task: + self._broadcast_task.cancel() + try: + await self._broadcast_task + except asyncio.CancelledError: + pass + self._broadcast_task = None + logger.info("app.websocket.broadcast_stopped") + + +# Global connection manager instance +ws_manager = WebSocketConnectionManager() + + +@router.websocket("/ws/agents") +async def websocket_endpoint(websocket: WebSocket) -> None: + """WebSocket endpoint for real-time agent events. + + Connect at: ws://localhost:8080/ws/agents + + Event types sent: + - agentStatus: agent became active/idle/stalled + - agentToolStart: agent started using a tool + - agentToolDone: agent finished using a tool + - agentChat: agent sent a text message + - agentTask: new user task arrived + - sessionStart: new session started + - subagentSpawned: sub-agent spawned + - subagentDespawned: sub-agent completed + - initialState: initial state snapshot on connect + """ + await ws_manager.connect(websocket) + try: + while True: + try: + data = await asyncio.wait_for( + websocket.receive_text(), + timeout=30, + ) + # Client can send pings or commands — currently just keep-alive + if data: + logger.debug("app.websocket.recv data=%s", data[:100]) + except asyncio.TimeoutError: + # Send ping to keep connection alive + try: + await websocket.send_text("ping") + except Exception: + break + except WebSocketDisconnect: + pass + except Exception as exc: + logger.error("app.websocket.error error=%s", exc, exc_info=True) + finally: + await ws_manager.disconnect(websocket) + + +def broadcast_event(event: dict[str, Any]) -> None: + """Queue an event for broadcasting to all connected clients. + + This is a synchronous function that can be called from any context + (including background tasks) to push events to WebSocket clients. + + Args: + event: Dashboard event dict to broadcast + """ + try: + loop = asyncio.get_running_loop() + loop.create_task(ws_manager.broadcast_to_all(event)) + except RuntimeError: + logger.debug("app.websocket.no_loop skipping broadcast") + + +def start_websocket_broadcast() -> None: + """Start the WebSocket broadcast background task.""" + ws_manager.start() + + +async def stop_websocket_broadcast() -> None: + """Stop the WebSocket broadcast background task.""" + await ws_manager.stop() \ No newline at end of file diff --git a/src/backend/app/main.py b/src/backend/app/main.py index ddb3f0e..d354ffc 100644 --- a/src/backend/app/main.py +++ b/src/backend/app/main.py @@ -33,6 +33,12 @@ from app.api.tags import router as tags_router from app.api.task_custom_fields import router as task_custom_fields_router from app.api.tasks import router as tasks_router from app.api.users import router as users_router +from app.api.ws import ( + broadcast_event, + router as ws_router, + start_websocket_broadcast, + stop_websocket_broadcast, +) from app.core.config import settings from app.core.error_handling import install_error_handling from app.core.logging import configure_logging, get_logger @@ -467,6 +473,10 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: else: logger.info("app.lifecycle.gateway_collector.no_gateways") + # Start WebSocket broadcast task + start_websocket_broadcast() + logger.info("app.lifecycle.websocket_broadcast.start") + if settings.rate_limit_backend == RateLimitBackend.REDIS: validate_rate_limit_redis(settings.rate_limit_redis_url) logger.info("app.lifecycle.rate_limit backend=redis") @@ -487,6 +497,16 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: exc, exc_info=True, ) + # Shutdown WebSocket broadcast + logger.info("app.lifecycle.websocket_broadcast.shutdown") + try: + await stop_websocket_broadcast() + except Exception as exc: + logger.error( + "app.lifecycle.websocket_broadcast.shutdown_error error=%s", + exc, + exc_info=True, + ) logger.info("app.lifecycle.stopped") @@ -600,5 +620,8 @@ api_v1.include_router(tags_router) api_v1.include_router(users_router) app.include_router(api_v1) +# Include WebSocket router at root path (no /api prefix) +app.include_router(ws_router) + add_pagination(app) logger.debug("app.routes.registered count=%s", len(app.routes)) diff --git a/src/backend/app/services/monitoring/__init__.py b/src/backend/app/services/monitoring/__init__.py index 7cbd740..b5333ad 100644 --- a/src/backend/app/services/monitoring/__init__.py +++ b/src/backend/app/services/monitoring/__init__.py @@ -5,6 +5,34 @@ OpenClaw Gateway RPC endpoints and stores the results in Mission Control's monitoring models (CostSnapshot, CronJobStatus, SessionEvent, etc.). """ +from app.services.monitoring.data_processing import ( + BuildAlerts, + BuildCostBreakdown, + BuildDailyChart, + FmtTokens, + ModelName, + TitleCase, + round2, + sum_bucket_costs, +) +from app.services.monitoring.event_parser import ( + extract_task_from_message, + format_tool_status, + parse_session_event, +) from app.services.monitoring.gateway_collector import GatewayCollectorService -__all__ = ("GatewayCollectorService",) +__all__ = ( + "GatewayCollectorService", + "BuildAlerts", + "BuildCostBreakdown", + "BuildDailyChart", + "FmtTokens", + "ModelName", + "TitleCase", + "round2", + "sum_bucket_costs", + "extract_task_from_message", + "format_tool_status", + "parse_session_event", +) diff --git a/src/backend/app/services/monitoring/data_processing.py b/src/backend/app/services/monitoring/data_processing.py new file mode 100644 index 0000000..7b07538 --- /dev/null +++ b/src/backend/app/services/monitoring/data_processing.py @@ -0,0 +1,320 @@ +"""Dashboard data processing utilities for Mission Control monitoring. + +This module provides pure utility functions to transform raw monitoring data +into dashboard-ready formats. These functions are pure (no I/O, no RPC calls) +and are called by API endpoints to transform database-stored data. +""" + +from __future__ import annotations + +import math +from collections import defaultdict + + +def ModelName(model: str) -> str: + """Map raw provider/model strings to display names. + + Ported from Go's apprefresh.ModelName(). + Example: "anthropic/claude-opus-4-6" -> "Claude Opus 4.6" + """ + ml = model.lower() + if "/" in ml: + parts = ml.split("/", 2) + ml = parts[1] + + if "opus-4-6" in ml: + return "Claude Opus 4.6" + if "opus" in ml: + return "Claude Opus 4.5" + if "sonnet" in ml: + return "Claude Sonnet" + if "haiku" in ml: + return "Claude Haiku" + if "grok-4-fast" in ml: + return "Grok 4 Fast" + if "grok-4" in ml or "grok4" in ml: + return "Grok 4" + if "gemini-2.5-pro" in ml or "gemini-pro" in ml: + return "Gemini 2.5 Pro" + if "gemini-3-flash" in ml: + return "Gemini 3 Flash" + if "gemini-2.5-flash" in ml: + return "Gemini 2.5 Flash" + if "gemini" in ml or "flash" in ml: + return "Gemini Flash" + if "minimax-m2.5" in ml: + return "MiniMax M2.5" + if "minimax-m2" in ml or "minimax" in ml: + return "MiniMax" + if "glm-5" in ml: + return "GLM-5" + if "glm-4" in ml: + return "GLM-4" + if "k2p5" in ml or "kimi" in ml: + return "Kimi K2.5" + if "gpt-5.3-codex" in ml: + return "GPT-5.3 Codex" + if "gpt-5" in ml: + return "GPT-5" + if "gpt-4o" in ml: + return "GPT-4o" + if "gpt-4" in ml: + return "GPT-4" + if "o1" in ml: + return "O1" + if "o3" in ml: + return "O3" + return model + + +def TitleCase(s: str) -> str: + """Uppercase the first character of an ASCII string. + + Ported from Go's apprefresh.TitleCase(). + """ + if not s: + return s + first = s[0] + if 'a' <= first <= 'z': + return chr(ord(first) - 32) + s[1:] + return s + + +def round2(v: float) -> float: + """Round to 2 decimal places.""" + return round(v * 100) / 100 + + +def sum_bucket_costs(buckets: dict[str, dict]) -> float: + """Sum the `cost` field from a dict of TokenBucket-like dicts. + + Args: + buckets: Dict mapping model names to TokenBucket-like dicts with 'cost' field + + Returns: + Total cost from all buckets + """ + total = 0.0 + for bucket in buckets.values(): + if isinstance(bucket, dict): + total += bucket.get("cost", 0.0) + return total + + +def BuildDailyChart( + daily_costs: dict[str, dict[str, float]], + daily_tokens: dict[str, dict[str, int]], + daily_calls: dict[str, dict[str, int]], + daily_subagent_costs: dict[str, float], + daily_subagent_count: dict[str, int], + days: int = 30 +) -> list[dict]: + """Aggregate cost/token/call data into daily buckets for chart rendering. + + Ported from Go's apprefresh.BuildDailyChart(). + + Args: + daily_costs: Dict of {date: {model: cost}} + daily_tokens: Dict of {date: {model: tokens}} + daily_calls: Dict of {date: {model: calls}} + daily_subagent_costs: Dict of {date: subagent_cost} + daily_subagent_count: Dict of {date: subagent_count} + days: Number of days to include (default 30) + + Returns: + List of dicts with keys: date, label, total, tokens, calls, + subagent_cost, subagent_runs, models + """ + from datetime import date, timedelta + + # Generate last N days + today = date.today() + chart_dates = [] + for i in range(days - 1, -1, -1): + chart_dates.append((today - timedelta(days=i)).strftime("%Y-%m-%d")) + + # Find top 6 models by cost + model_totals: dict[str, float] = defaultdict(float) + for d in chart_dates: + if d in daily_costs: + for m, c in daily_costs[d].items(): + model_totals[m] += c + + # Sort models by total cost descending + sorted_models = sorted(model_totals.items(), key=lambda x: x[1], reverse=True) + top_models = {m for m, _ in sorted_models[:6]} + + chart = [] + for d in chart_dates: + day_costs = daily_costs.get(d, {}) + day_tokens = daily_tokens.get(d, {}) + day_calls = daily_calls.get(d, {}) + + total_cost = sum(day_costs.values()) + total_tokens = sum(day_tokens.values()) + total_calls = sum(day_calls.values()) + + models = {} + other_cost = 0.0 + for m, c in day_costs.items(): + if m in top_models: + models[m] = round2(c) + else: + other_cost += c + + if other_cost > 0: + models["Other"] = round2(other_cost) + + chart.append({ + "date": d, + "label": d[5:], # MM-DD format + "total": round2(total_cost), + "tokens": total_tokens, + "calls": total_calls, + "subagent_cost": round2(daily_subagent_costs.get(d, 0.0)), + "subagent_runs": daily_subagent_count.get(d, 0), + "models": models, + }) + + return chart + + +def BuildAlerts( + total_cost_today: float, + cost_high: float, + cost_warn: float, + crons: list[dict], + sessions: list[dict], + context_threshold: float, + gateway_status: dict, + memory_threshold_kb: int +) -> list[dict]: + """Evaluate alert conditions and return list of alerts. + + Ported from Go's apprefresh.BuildAlerts(). + + Args: + total_cost_today: Current day's total cost + cost_high: High cost threshold + cost_warn: Warning cost threshold + crons: List of cron job status dicts with 'lastStatus' field + sessions: List of session dicts with 'contextPct' field + context_threshold: Context usage percentage threshold + gateway_status: Gateway health dict with 'status' and 'rss' fields + memory_threshold_kb: Memory threshold in KB + + Returns: + List of alert dicts with keys: type, icon, message, severity + """ + alerts = [] + + # Cost alerts + if total_cost_today > cost_high: + alerts.append({ + "type": "warning", + "icon": "💰", + "message": f"High daily cost: ${total_cost_today:.2f}", + "severity": "high", + }) + elif total_cost_today > cost_warn: + alerts.append({ + "type": "info", + "icon": "💵", + "message": f"Daily cost above ${cost_warn:.0f}: ${total_cost_today:.2f}", + "severity": "medium", + }) + + # Cron alerts + for cron in crons: + if cron.get("lastStatus") == "error": + name = cron.get("name", "Unknown") + alerts.append({ + "type": "error", + "icon": "❌", + "message": f"Cron failed: {name}", + "severity": "high", + }) + + # Context alerts + for session in sessions: + pct = session.get("contextPct", 0.0) + if pct > context_threshold: + name = session.get("name", "Unknown") + if len(name) > 30: + name = name[:30] + alerts.append({ + "type": "warning", + "icon": "⚠️", + "message": f"High context: {name} ({pct:.0f}%)", + "severity": "medium", + }) + + # Gateway offline alerts + if gateway_status.get("status") == "offline": + alerts.append({ + "type": "error", + "icon": "🔴", + "message": "Gateway is offline", + "severity": "critical", + }) + + # Memory alerts + rss = gateway_status.get("rss", 0) + if isinstance(rss, int) and rss > memory_threshold_kb: + memory = gateway_status.get("memory", f"{rss} KB") + alerts.append({ + "type": "warning", + "icon": "🧠", + "message": f"High memory usage: {memory}", + "severity": "medium", + }) + + return alerts + + +def BuildCostBreakdown(model_costs: dict[str, dict]) -> list[dict]: + """Rank models by cost. + + Ported from Go's apprefresh.BuildCostBreakdown(). + + Args: + model_costs: Dict mapping model names to TokenBucket-like dicts + + Returns: + Sorted list of {model, cost} dicts, descending by cost + """ + pairs = [] + for model, bucket in model_costs.items(): + if isinstance(bucket, dict): + cost = bucket.get("cost", 0.0) + if cost > 0: + pairs.append((model, cost)) + + # Sort by cost descending + pairs.sort(key=lambda x: x[1], reverse=True) + + return [ + {"model": m, "cost": round2(c)} + for m, c in pairs + ] + + +def FmtTokens(count: int) -> str: + """Format token counts for display. + + Ported from Go's apprefresh.FmtTokens(). + + Examples: + 1234567 -> "1.2M" + 500 -> "500" + 1500 -> "1.5K" + 1000000 -> "1.0M" + 1000000000 -> "1.0B" + """ + if count >= 1_000_000_000: + return f"{count / 1_000_000_000:.1f}B" + if count >= 1_000_000: + return f"{count / 1_000_000:.1f}M" + if count >= 1_000: + return f"{count / 1_000:.1f}K" + return str(count) diff --git a/src/backend/app/services/monitoring/event_parser.py b/src/backend/app/services/monitoring/event_parser.py new file mode 100644 index 0000000..9b59e98 --- /dev/null +++ b/src/backend/app/services/monitoring/event_parser.py @@ -0,0 +1,319 @@ +"""Event parsing utilities for WebSocket agent events. + +This module provides functions to parse SessionEvent records from the database +and convert them into dashboard-ready event formats for real-time broadcasting. + +Ported from TypeScript in sources/pixel-agents/server/openclawParser.ts +""" + +from __future__ import annotations + +import json +import re +import asyncio +from typing import Any + + +def format_tool_status(tool_name: str, args: dict[str, Any]) -> str: + """Format a tool call into a human-readable status string. + + Ported from TypeScript formatToolStatus(). + + Args: + tool_name: The tool name (e.g., "Read", "Edit", "exec") + args: Tool arguments dictionary + + Returns: + Human-readable status string + """ + if tool_name in ("Read", "read"): + file_path = args.get("file_path") or args.get("path") + if file_path: + return f"Reading {file_path}" + return "Reading" + + if tool_name in ("Edit", "edit"): + file_path = args.get("file_path") or args.get("path") + if file_path: + return f"Editing {file_path}" + return "Editing" + + if tool_name in ("Write", "write"): + file_path = args.get("file_path") or args.get("path") + if file_path: + return f"Writing {file_path}" + return "Writing" + + if tool_name == "exec": + return "Running command" + + if tool_name == "web_fetch": + url = args.get("url", "") + if isinstance(url, str): + url = url[:60] if len(url) > 60 else url + return f"Fetching {url}" + + if tool_name == "browser": + return "Using browser" + + if tool_name == "sessions_spawn": + return "Spawning sub-agent" + + if tool_name == "memory_search": + return "Searching memory" + + if tool_name == "memory_get": + return "Reading memory" + + if tool_name == "message": + return "Sending message" + + if tool_name == "image": + return "Analyzing image" + + if tool_name == "pdf": + return "Analyzing PDF" + + if tool_name == "tts": + return "Speaking" + + return f"Using {tool_name}" + + +def extract_task_from_message(content: Any) -> str: + """Extract a meaningful task description from a user message. + + Ported from TypeScript extractTaskFromUserMessage(). + OpenClaw wraps user messages in metadata - we strip that to find actual content. + + Args: + content: The message content (string or array of blocks) + + Returns: + Cleaned task text, or empty string if no content found + """ + text = "" + + if isinstance(content, str): + text = content + elif isinstance(content, list): + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + if isinstance(block.get("text"), str): + text = block["text"] + break + + if not text: + return "" + + # Strip OpenClaw conversation metadata wrapper + cleaned = text + + # Strip inter-session message headers + inter_session_match = re.match( + r"\[Inter-session message\].*?\n.*?\n([\s\S]*)", + cleaned, + re.MULTILINE + ) + if inter_session_match: + cleaned = inter_session_match.group(1).strip() + + # Strip "Conversation info" metadata block + conv_match = re.match( + r"Conversation info \(untrusted.*?\n```json\n[\s\S]*?```\n([\s\S]*)", + cleaned, + re.MULTILINE + ) + if conv_match: + cleaned = conv_match.group(1).strip() + + # Strip "Sender" metadata block + sender_match = re.match( + r"Sender \(untrusted.*?\n```json\n[\s\S]*?```\n([\s\S]*)", + cleaned, + re.MULTILINE + ) + if sender_match: + cleaned = sender_match.group(1).strip() + + # Strip chat history block + hist_match = re.match( + r"Chat history.*?\n```json\n[\s\S]*?```\n([\s\S]*)", + cleaned, + re.MULTILINE + ) + if hist_match: + cleaned = hist_match.group(1).strip() + + # Strip @mentions at the start + cleaned = re.sub(r"^@\S+\s*", "", cleaned).strip() + + # Strip untrusted context blocks + cleaned = re.sub(r"Untrusted context \(metadata[^)]*\):\s*", "", cleaned).strip() + cleaned = re.sub( + r"<<]*>>>", + "", + cleaned + ).strip() + + # If it's a sessions_send payload, try to get the message body + try: + parsed = json.loads(cleaned) + if isinstance(parsed, dict) and (parsed.get("messages") or parsed.get("message")): + cleaned = parsed.get("message", "") + if isinstance(cleaned, list): + cleaned = "" + elif isinstance(cleaned, dict): + cleaned = "" + except json.JSONDecodeError: + pass + + # Truncate to something reasonable + if len(cleaned) > 150: + cleaned = cleaned[:147] + "..." + + return cleaned + + +def parse_session_event(event: Any) -> list[dict[str, Any]]: + """Parse a SessionEvent model instance into dashboard events. + + Ported from TypeScript parseOpenClawLine(). + Takes a SessionEvent (SQLModel instance) and returns a list of dashboard events. + + Args: + event: SessionEvent model instance with fields: + - event_type: "message", "session", "custom", etc. + - metadata_: JSON dict containing the full record data + + Returns: + List of dashboard event dicts with type and relevant fields + """ + events = [] + + try: + # Parse metadata as the record + metadata = event.metadata_ or {} + record = metadata if isinstance(metadata, dict) else {} + + if not record: + return events + + if record.get("type") == "message": + msg = record.get("message") or {} + + if isinstance(msg, dict): + role = msg.get("role") + content = msg.get("content") + + if role == "assistant" and isinstance(content, list): + has_tool_call = False + chat_text = "" + + for block in content: + if not isinstance(block, dict): + continue + block_type = block.get("type") + + if block_type == "text" and isinstance(block.get("text"), str): + chat_text = block["text"].strip() + chat_text = re.sub(r"^\[\[reply_to[^\]]*\]\]\s*", "", chat_text).strip() + + if block_type == "toolCall" and block.get("id"): + has_tool_call = True + tool_id = block["id"] + tool_name = block.get("name", "unknown") + args = block.get("arguments", {}) or {} + status = format_tool_status(tool_name, args) + + events.append({ + "type": "agentToolStart", + "id": event.agent_id or "unknown", + "toolId": tool_id, + "status": status, + }) + + # Detect sub-agent spawns + if tool_name == "sessions_spawn": + label = args.get("label") or args.get("agentId") or "sub-agent" + task = args.get("task", "") + events.append({ + "type": "subagentSpawned", + "id": event.agent_id or "unknown", + "toolId": tool_id, + "label": label, + "task": task[:200] if task else "", + }) + + if block_type == "toolResult" and block.get("toolCallId"): + tool_id = block["toolCallId"] + events.append({ + "type": "agentToolDone", + "id": event.agent_id or "unknown", + "toolId": tool_id, + }) + + if chat_text and chat_text not in ("NO_REPLY", "HEARTBEAT_OK", "ANNOUNCE_SKIP", "REPLY_SKIP"): + events.append({ + "type": "agentChat", + "id": event.agent_id or "unknown", + "message": chat_text[:2000] if len(chat_text) > 2000 else chat_text, + }) + + if has_tool_call: + events.append({ + "type": "agentStatus", + "id": event.agent_id or "unknown", + "status": "active", + }) + + elif role == "toolResult" and isinstance(content, dict): + # Tool completed + tool_id = content.get("toolCallId") + if tool_id: + events.append({ + "type": "agentToolDone", + "id": event.agent_id or "unknown", + "toolId": tool_id, + }) + + elif role == "user": + # New user message - new turn starting + task_text = extract_task_from_message(content) if content else "" + if task_text: + events.append({ + "type": "agentTask", + "id": event.agent_id or "unknown", + "task": task_text, + }) + events.append({ + "type": "agentToolsClear", + "id": event.agent_id or "unknown", + }) + events.append({ + "type": "agentStatus", + "id": event.agent_id or "unknown", + "status": "active", + }) + + elif record.get("type") == "session": + # Session start record + events.append({ + "type": "sessionStart", + "id": event.agent_id or "unknown", + }) + events.append({ + "type": "agentStatus", + "id": event.agent_id or "unknown", + "status": "active", + }) + + elif record.get("type") == "custom": + # Handle custom events if needed + pass + + except Exception: + # Ignore malformed events + pass + + return events