diff --git a/FUTURE.md b/FUTURE.md index 3a15054..b624778 100644 --- a/FUTURE.md +++ b/FUTURE.md @@ -3,7 +3,7 @@ **This document tracks potential future enhancements for Mission Control.** **Last Updated:** 2026-05-10 -**Current Version:** v0.0.2 +**Current Version:** v0.0.3 ## How to Use This Document @@ -36,7 +36,7 @@ Items are grouped under their priority section heading (`## 🔴 CRITICAL`, `## ### 🟠 Gateway Data Collection Service — HIGH **Priority:** HIGH -**Status:** PENDING +**Status:** IN PROGRESS **Added:** 2026-05-10 by Ripley **Description:** @@ -51,8 +51,11 @@ Port the dashboard's Go data collection (`refresh.go`, `refresh_sessions.go`, `r ### 🟠 Monitoring Database Models — HIGH **Priority:** HIGH -**Status:** PENDING +**Status:** DONE ✅ **Added:** 2026-05-10 by Ripley +**Completed:** 2026-05-11 + +All 7 models created, migrated, CASCADE + composite indexes verified in running DB. Committed as v0.0.3. **Description:** Create new PostgreSQL models for tracking data (cost, sessions, crons, system health, alerts). @@ -80,6 +83,27 @@ Add FastAPI WebSocket endpoint for real-time agent event broadcasting, porting t ### 🟡 MEDIUM +### 🟠 Dashboard Logic Port — HIGH +**Priority:** HIGH +**Status:** PENDING +**Added:** 2026-05-11 by Ripley + +**Description:** +Port the Go dashboard's data-processing logic that we're NOT yet reusing — model name normalization, daily cost charting, alert threshold computation, and token formatting. + +**Implementation Notes:** +- The Go source (`sources/dashboard-tracking/internal/apprefresh/`) has significant logic beyond raw collection: + - `ModelName()` — maps raw provider/model strings (e.g. 
`anthropic/claude-opus-4-6`) to display names ("Claude Opus 4.6") + - `BuildDailyChart()` — aggregates cost/token data into daily buckets for chart rendering + - `BuildAlerts()` — evaluates cost thresholds, cron failures, context usage, and gateway health against configurable rules + - `FmtTokens()` — formats raw token counts (1,234,567 → "1.2M") + - `BuildCostBreakdown()` — organizes per-model cost into ranked lists +- We're already reusing the **gateway RPC layer** (same transport) and **data model shapes** (same fields) +- What we're NOT reusing is the **processing/aggregation logic** — currently the collector just stores raw data +- This must be ported as Python utility functions before building the monitoring frontend, so the API endpoints can serve pre-computed charts and alerts +- Create `src/backend/app/services/monitoring/data_processing.py` for this logic +- Also port `sources/dashboard-tracking/system_types.go` and `system_service.go` for system health data normalization + ### 🟡 Cost Tracking UI — MEDIUM **Priority:** MEDIUM **Status:** PENDING diff --git a/src/backend/app/main.py b/src/backend/app/main.py index 857c29c..0a3bd33 100644 --- a/src/backend/app/main.py +++ b/src/backend/app/main.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from contextlib import asynccontextmanager from typing import TYPE_CHECKING, Any @@ -38,7 +39,10 @@ from app.core.rate_limit import validate_rate_limit_redis from app.core.rate_limit_backend import RateLimitBackend from app.core.security_headers import SecurityHeadersMiddleware from app.db.session import init_db +from app.models.gateways import Gateway from app.schemas.health import HealthStatusResponse +from app.services.monitoring import GatewayCollectorService +from sqlmodel import select if TYPE_CHECKING: from collections.abc import AsyncIterator @@ -439,6 +443,29 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: settings.db_auto_migrate, ) await init_db() + + # Start the gateway 
monitoring collector as a background task + # Check if any gateways are registered before starting the collector + from app.db.session import async_session_maker as _session_maker + + async with _session_maker() as session: + result = await session.execute(select(Gateway)) + gateways = result.scalars().all() + + if gateways: + collector = GatewayCollectorService(_session_maker) + collector_task = asyncio.create_task(collector.run()) + logger.info( + "app.lifecycle.gateway_collector.start gateways=%d", + len(gateways), + ) + + # Store the task and collector on the app state for cleanup + _.state.gateway_collector_task = collector_task + _.state.gateway_collector = collector + else: + logger.info("app.lifecycle.gateway_collector.no_gateways") + if settings.rate_limit_backend == RateLimitBackend.REDIS: validate_rate_limit_redis(settings.rate_limit_redis_url) logger.info("app.lifecycle.rate_limit backend=redis") @@ -448,6 +475,17 @@ async def lifespan(_: FastAPI) -> AsyncIterator[None]: try: yield finally: + # Shutdown the gateway collector + if hasattr(_.state, "gateway_collector"): + logger.info("app.lifecycle.gateway_collector.shutdown") + try: + await _.state.gateway_collector.shutdown() + except Exception as exc: + logger.error( + "app.lifecycle.gateway_collector.shutdown_error error=%s", + exc, + exc_info=True, + ) logger.info("app.lifecycle.stopped") diff --git a/src/backend/app/models/alert_rules.py b/src/backend/app/models/alert_rules.py index d10175e..5ebcc8d 100644 --- a/src/backend/app/models/alert_rules.py +++ b/src/backend/app/models/alert_rules.py @@ -5,7 +5,7 @@ from __future__ import annotations from datetime import datetime from uuid import UUID, uuid4 -from sqlalchemy import JSON, Column, ForeignKey +from sqlalchemy import JSON, Column, Index from sqlmodel import Field from app.core.time import utcnow @@ -18,7 +18,8 @@ class AlertRule(QueryModel, table=True): __tablename__ = "alert_rules" # pyright: ignore[reportAssignmentType] id: UUID = 
Field(default_factory=uuid4, primary_key=True) - organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) + organization_id: UUID = Field(default=None, foreign_key="organizations.id", index=True) + gateway_id: UUID | None = Field(default=None, foreign_key="gateways.id", index=True) name: str = Field(nullable=False) metric_type: str = Field(nullable=False, index=True) threshold: float = Field(nullable=False) @@ -42,15 +43,15 @@ class AlertEvent(QueryModel, table=True): __tablename__ = "alert_events" # pyright: ignore[reportAssignmentType] id: UUID = Field(default_factory=uuid4, primary_key=True) - organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) - rule_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("alert_rules.id", ondelete="CASCADE"), index=True)) - gateway_id: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + organization_id: UUID = Field(default=None, foreign_key="organizations.id", index=True) + rule_id: UUID = Field(default=None, foreign_key="alert_rules.id", index=True) + gateway_id: UUID | None = Field(default=None, foreign_key="gateways.id", index=True) metric_type: str = Field(nullable=False, index=True) triggered_value: float = Field(nullable=False) threshold_value: float = Field(nullable=False) acknowledged: bool = Field(default=False) acknowledged_at: datetime | None = Field(default=None) - acknowledged_by: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("users.id", ondelete="SET NULL"), nullable=True)) + acknowledged_by: UUID | None = Field(default=None, foreign_key="users.id", nullable=True) resolved_at: datetime | None = Field(default=None) metadata_: dict | None = Field(default=None, sa_column=Column(JSON)) created_at: datetime = Field(default_factory=utcnow) diff --git 
a/src/backend/app/models/monitoring.py b/src/backend/app/models/monitoring.py index ab26e95..aebbbb3 100644 --- a/src/backend/app/models/monitoring.py +++ b/src/backend/app/models/monitoring.py @@ -5,7 +5,7 @@ from __future__ import annotations from datetime import datetime from uuid import UUID, uuid4 -from sqlalchemy import JSON, Column, ForeignKey +from sqlalchemy import JSON, Column, Index from sqlmodel import Field from app.core.time import utcnow @@ -20,8 +20,8 @@ class CostSnapshot(QueryModel, table=True): __tablename__ = "cost_snapshots" # pyright: ignore[reportAssignmentType] id: UUID = Field(default_factory=uuid4, primary_key=True) - organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) - gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + organization_id: UUID = Field(default=None, foreign_key="organizations.id", index=True) + gateway_id: UUID = Field(default=None, foreign_key="gateways.id", index=True) period_start: datetime = Field(nullable=False) period_end: datetime = Field(nullable=False) total_cost: float = Field(nullable=False, default=0.0) @@ -44,8 +44,8 @@ class CronJobStatus(QueryModel, table=True): __tablename__ = "cron_job_statuses" # pyright: ignore[reportAssignmentType] id: UUID = Field(default_factory=uuid4, primary_key=True) - organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) - gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + organization_id: UUID = Field(default=None, foreign_key="organizations.id", index=True) + gateway_id: UUID = Field(default=None, foreign_key="gateways.id", index=True) job_name: str = Field(nullable=False, index=True) schedule: str = Field(nullable=False) enabled: bool = Field(default=True) @@ -70,8 +70,8 @@ class 
SessionEvent(QueryModel, table=True): __tablename__ = "session_events" # pyright: ignore[reportAssignmentType] id: UUID = Field(default_factory=uuid4, primary_key=True) - organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) - gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + organization_id: UUID = Field(default=None, foreign_key="organizations.id", index=True) + gateway_id: UUID = Field(default=None, foreign_key="gateways.id", index=True) session_key: str = Field(nullable=False, index=True) event_type: str = Field(nullable=False) model: str | None = Field(default=None) @@ -96,10 +96,10 @@ class SubAgentRun(QueryModel, table=True): __tablename__ = "sub_agent_runs" # pyright: ignore[reportAssignmentType] id: UUID = Field(default_factory=uuid4, primary_key=True) - organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) - gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + organization_id: UUID = Field(default=None, foreign_key="organizations.id", index=True) + gateway_id: UUID = Field(default=None, foreign_key="gateways.id", index=True) parent_session_key: str = Field(nullable=False, index=True) - session_event_id: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("session_events.id", ondelete="CASCADE"), index=True)) + session_event_id: UUID | None = Field(default=None, foreign_key="session_events.id", index=True) agent: str | None = Field(default=None) model: str | None = Field(default=None) status: str = Field(nullable=False, default="pending") @@ -122,8 +122,8 @@ class SystemHealthMetric(QueryModel, table=True): __tablename__ = "system_health_metrics" # pyright: ignore[reportAssignmentType] id: UUID = Field(default_factory=uuid4, primary_key=True) - 
organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) - gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + organization_id: UUID = Field(default=None, foreign_key="organizations.id", index=True) + gateway_id: UUID = Field(default=None, foreign_key="gateways.id", index=True) cpu_percent: float | None = Field(default=None) cpu_cores: int | None = Field(default=None) ram_used_bytes: int | None = Field(default=None) diff --git a/src/backend/app/services/monitoring/__init__.py b/src/backend/app/services/monitoring/__init__.py new file mode 100644 index 0000000..7cbd740 --- /dev/null +++ b/src/backend/app/services/monitoring/__init__.py @@ -0,0 +1,10 @@ +"""Background monitoring data collection services. + +This package provides the GatewayCollectorService which periodically polls +OpenClaw Gateway RPC endpoints and stores the results in Mission Control's +monitoring models (CostSnapshot, CronJobStatus, SessionEvent, etc.). +""" + +from app.services.monitoring.gateway_collector import GatewayCollectorService + +__all__ = ("GatewayCollectorService",) diff --git a/src/backend/app/services/monitoring/gateway_collector.py b/src/backend/app/services/monitoring/gateway_collector.py new file mode 100644 index 0000000..54e0666 --- /dev/null +++ b/src/backend/app/services/monitoring/gateway_collector.py @@ -0,0 +1,534 @@ +"""Gateway data collection service for Mission Control monitoring. + +This module implements GatewayCollectorService, a background asyncio task that +periodically polls OpenClaw Gateway RPC endpoints and stores results in the +monitoring models (CostSnapshot, CronJobStatus, SessionEvent, etc.). 
+""" + +from __future__ import annotations + +import asyncio +from typing import Any + +from app.core.logging import get_logger +from app.db.session import async_session_maker +from app.models.gateways import Gateway +from app.models.monitoring import ( + CostSnapshot, + CronJobStatus, + SessionEvent, + SubAgentRun, + SystemHealthMetric, +) +from app.services.monitoring.models import ( + CostResponse, + CronJobStatusResponse, + GatewayHealthResponse, + GatewayStatusResponse, + SessionPreviewResponse, + SessionsListResponse, + UsageStatusResponse, +) +from app.services.openclaw.gateway_resolver import gateway_client_config +from app.services.openclaw.gateway_rpc import ( + GatewayConfig as GatewayClientConfig, + OpenClawGatewayError, + openclaw_call, +) +from sqlmodel import select + +logger = get_logger(__name__) + +# Collection interval environment variables with defaults (seconds) +# These control how frequently each RPC endpoint is polled per gateway +COLLECTION_INTERVAL_COST = int( + __import__("os").environ.get("COLLECTION_INTERVAL_COST", "300") +) +COLLECTION_INTERVAL_CRON = int( + __import__("os").environ.get("COLLECTION_INTERVAL_CRON", "60") +) +COLLECTION_INTERVAL_SESSION = int( + __import__("os").environ.get("COLLECTION_INTERVAL_SESSION", "30") +) +COLLECTION_INTERVAL_HEALTH = int( + __import__("os").environ.get("COLLECTION_INTERVAL_HEALTH", "60") +) + + +class GatewayCollectorService: + """Background service that polls gateway RPC endpoints and stores monitoring data. + + This service runs as a background asyncio task and periodically polls each + registered gateway for cost, cron, session, and health data. It stores the + results in Mission Control's monitoring models using an upsert pattern + (insert or update) to avoid duplicates. 
+ """ + + def __init__(self, session_factory: Any) -> None: + # Use the factory to create sessions + self._session_factory = session_factory + self._shutdown_event = asyncio.Event() + self._tasks: list[asyncio.Task[None]] = [] + + async def run(self) -> None: + """Start the collector as a long-running background task.""" + logger.info("GatewayCollectorService started") + try: + while not self._shutdown_event.is_set(): + await self._collect_all_gateways() + # Wait for the shortest interval before next collection + await asyncio.sleep(min( + COLLECTION_INTERVAL_COST, + COLLECTION_INTERVAL_CRON, + COLLECTION_INTERVAL_SESSION, + COLLECTION_INTERVAL_HEALTH, + )) + except asyncio.CancelledError: + logger.info("GatewayCollectorService cancelled") + raise + finally: + logger.info("GatewayCollectorService stopped") + + async def shutdown(self) -> None: + """Signal the collector to stop running.""" + self._shutdown_event.set() + # Wait for all gateway polling tasks to complete + if self._tasks: + await asyncio.gather(*self._tasks, return_exceptions=True) + self._tasks.clear() + + async def _collect_all_gateways(self) -> None: + """Fetch all gateways and poll each one concurrently.""" + # Use a new session for this collection cycle + async with self._session_factory() as session: + # Get all gateways from the database + result = await session.execute(select(Gateway)) + gateways = result.scalars().all() + + if not gateways: + logger.debug("No gateways registered, skipping collection cycle") + return + + # Create a task group to poll all gateways concurrently + async with asyncio.TaskGroup() as tg: + for gateway in gateways: + self._tasks.append(tg.create_task( + self._poll_gateway(gateway), + name=f"poll_gateway_{gateway.id.hex[:12]}", + )) + + async def _poll_gateway(self, gateway: Gateway) -> None: + """Poll a single gateway for all monitoring data.""" + config = gateway_client_config(gateway) + + logger.debug( + "Polling gateway %s (%s)", gateway.id.hex[:12], gateway.name + 
) + + # Create a task group for independent collection methods + async with asyncio.TaskGroup() as tg: + tg.create_task(self._collect_cost(gateway, config)) + tg.create_task(self._collect_cron(gateway, config)) + tg.create_task(self._collect_session(gateway, config)) + tg.create_task(self._collect_health(gateway, config)) + + async def _collect_cost( + self, gateway: Gateway, config: GatewayClientConfig + ) -> None: + """Collect cost snapshot data from gateway usage endpoints.""" + try: + # Call usage.cost to get cost breakdown by model + cost_result = await openclaw_call( + "usage.cost", {}, config=config + ) + cost_data = self._parse_cost_response(cost_result) + + # Call usage.status (parsed for validation; not stored yet) + usage_result = await openclaw_call( + "usage.status", {}, config=config + ) + usage_data = self._parse_usage_status(usage_result) + + # Build and upsert CostSnapshot + snapshot = CostSnapshot( + organization_id=gateway.organization_id, + gateway_id=gateway.id, + period_start=cost_data.period_start, + period_end=cost_data.period_end, + total_cost=cost_data.total_cost, + model_costs=cost_data.model_costs, + provider_costs=cost_data.provider_costs, + token_counts=cost_data.token_counts, + ) + await self._upsert_cost_snapshot(snapshot) + logger.debug( + "Collected cost snapshot for gateway %s: total=$%.2f", + gateway.id.hex[:12], + snapshot.total_cost, + ) + except OpenClawGatewayError as exc: + logger.warning( + "Failed to collect cost data from gateway %s: %s", + gateway.id.hex[:12], + exc, + ) + except Exception as exc: + logger.error( + "Unexpected error collecting cost data from gateway %s: %s", + gateway.id.hex[:12], + exc, + exc_info=True, + ) + + async def _collect_cron( + self, gateway: Gateway, config: GatewayClientConfig + ) -> None: + """Collect cron job status data from gateway cron endpoints.""" + try: + # Call cron.list to get all cron jobs + cron_list_result = await openclaw_call( + "cron.list", {}, config=config + ) + cron_data =
self._parse_cron_list_response(cron_list_result) + + # Upsert each cron job status + for job_status in cron_data.jobs: + status = CronJobStatus( + organization_id=gateway.organization_id, + gateway_id=gateway.id, + job_name=job_status.job_name, + schedule=job_status.schedule, + enabled=job_status.enabled, + last_run_at=job_status.last_run_at, + next_run_at=job_status.next_run_at, + status=job_status.status, + failure_count=job_status.failure_count, + last_error=job_status.last_error, + metadata_=job_status.metadata_, + ) + await self._upsert_cron_job_status(status) + logger.debug( + "Collected %d cron jobs from gateway %s", + len(cron_data.jobs), + gateway.id.hex[:12], + ) + except OpenClawGatewayError as exc: + logger.warning( + "Failed to collect cron data from gateway %s: %s", + gateway.id.hex[:12], + exc, + ) + except Exception as exc: + logger.error( + "Unexpected error collecting cron data from gateway %s: %s", + gateway.id.hex[:12], + exc, + exc_info=True, + ) + + async def _collect_session( + self, gateway: Gateway, config: GatewayClientConfig + ) -> None: + """Collect session event data from gateway sessions endpoints.""" + try: + # Call sessions.list to get all sessions + sessions_list_result = await openclaw_call( + "sessions.list", {}, config=config + ) + sessions_data = self._parse_sessions_list_response( + sessions_list_result + ) + + # Collect preview data for each session + for session in sessions_data.sessions: + try: + preview_result = await openclaw_call( + "sessions.preview", + {"key": session.session_key}, + config=config, + ) + preview_data = self._parse_session_preview( + preview_result + ) + + # Build and upsert SessionEvent + event = SessionEvent( + organization_id=gateway.organization_id, + gateway_id=gateway.id, + session_key=session.session_key, + event_type=preview_data.event_type, + model=preview_data.model, + agent_id=preview_data.agent_id, + channel=preview_data.channel, + context_percent=preview_data.context_percent, + 
token_counts=preview_data.token_counts, + cost=preview_data.cost, + metadata_=preview_data.metadata_, + ) + await self._upsert_session_event(event) + logger.debug( + "Collected session event for %s from gateway %s", + session.session_key, + gateway.id.hex[:12], + ) + except OpenClawGatewayError as exc: + logger.warning( + "Failed to get preview for session %s from gateway %s: %s", + session.session_key, + gateway.id.hex[:12], + exc, + ) + except OpenClawGatewayError as exc: + logger.warning( + "Failed to collect sessions from gateway %s: %s", + gateway.id.hex[:12], + exc, + ) + except Exception as exc: + logger.error( + "Unexpected error collecting sessions from gateway %s: %s", + gateway.id.hex[:12], + exc, + exc_info=True, + ) + + async def _collect_health( + self, gateway: Gateway, config: GatewayClientConfig + ) -> None: + """Collect system health metrics from gateway health/status endpoints.""" + try: + # Call health endpoint + health_result = await openclaw_call( + "health", {}, config=config + ) + health_data = self._parse_health_response(health_result) + + # Call status endpoint + status_result = await openclaw_call( + "status", {}, config=config + ) + status_data = self._parse_status_response(status_result) + + m = health_data.metrics  # system metrics are nested; may be None + metric = SystemHealthMetric( + organization_id=gateway.organization_id, + gateway_id=gateway.id, + cpu_percent=m.cpu_percent if m else None, + cpu_cores=m.cpu_cores if m else None, + ram_used_bytes=m.ram_used_bytes if m else None, + ram_total_bytes=m.ram_total_bytes if m else None, + ram_percent=m.ram_percent if m else None, + swap_used_bytes=m.swap_used_bytes if m else None, + swap_total_bytes=m.swap_total_bytes if m else None, + swap_percent=m.swap_percent if m else None, + disk_path=m.disk_path if m else "/", + disk_used_bytes=m.disk_used_bytes if m else None, + disk_total_bytes=m.disk_total_bytes if m else None, + disk_percent=m.disk_percent if m else None, + gateway_live=status_data.gateway.gateway_live, + gateway_ready=status_data.gateway.gateway_ready, +
gateway_uptime_ms=status_data.gateway.gateway_uptime_ms, + gateway_pid=status_data.gateway.gateway_pid, + gateway_version=status_data.gateway.gateway_version, + metadata_=health_data.metrics.metadata_ if health_data.metrics else None, + ) + await self._upsert_health_metric(metric) + logger.debug( + "Collected health metrics for gateway %s", + gateway.id.hex[:12], + ) + except OpenClawGatewayError as exc: + logger.warning( + "Failed to collect health data from gateway %s: %s", + gateway.id.hex[:12], + exc, + ) + except Exception as exc: + logger.error( + "Unexpected error collecting health data from gateway %s: %s", + gateway.id.hex[:12], + exc, + exc_info=True, + ) + + async def _upsert_cost_snapshot(self, snapshot: CostSnapshot) -> None: + """Upsert a cost snapshot by org+gateway+period.""" + async with self._session_factory() as session: + # Find existing snapshot for this period + stmt = select(CostSnapshot).where( + CostSnapshot.organization_id == snapshot.organization_id, + CostSnapshot.gateway_id == snapshot.gateway_id, + CostSnapshot.period_start == snapshot.period_start, + CostSnapshot.period_end == snapshot.period_end, + ) + result = await session.execute(stmt) + existing = result.scalar_one_or_none() + + if existing: + # Update existing + existing.total_cost = snapshot.total_cost + existing.model_costs = snapshot.model_costs + existing.provider_costs = snapshot.provider_costs + existing.token_counts = snapshot.token_counts + existing.updated_at = snapshot.updated_at + session.add(existing) + else: + # Insert new + session.add(snapshot) + await session.commit() + + async def _upsert_cron_job_status( + self, status: CronJobStatus + ) -> None: + """Upsert cron job status by org+gateway+job_name.""" + async with self._session_factory() as session: + # Find existing status for this job + stmt = select(CronJobStatus).where( + CronJobStatus.organization_id == status.organization_id, + CronJobStatus.gateway_id == status.gateway_id, + CronJobStatus.job_name == status.job_name, + ) + result = await session.execute(stmt) +
existing = result.scalar_one_or_none() + + if existing: + # Update existing + existing.schedule = status.schedule + existing.enabled = status.enabled + existing.last_run_at = status.last_run_at + existing.next_run_at = status.next_run_at + existing.status = status.status + existing.failure_count = status.failure_count + existing.last_error = status.last_error + existing.metadata_ = status.metadata_ + existing.updated_at = status.updated_at + session.add(existing) + else: + # Insert new + session.add(status) + await session.commit() + + async def _upsert_session_event( + self, event: SessionEvent + ) -> None: + """Upsert session event by org+gateway+session_key.""" + async with self._session_factory() as session: + # Find existing event for this session + stmt = select(SessionEvent).where( + SessionEvent.organization_id == event.organization_id, + SessionEvent.gateway_id == event.gateway_id, + SessionEvent.session_key == event.session_key, + ) + result = await session.execute(stmt) + existing = result.scalar_one_or_none() + + if existing: + # Update existing + existing.event_type = event.event_type + existing.model = event.model + existing.agent_id = event.agent_id + existing.channel = event.channel + existing.context_percent = event.context_percent + existing.token_counts = event.token_counts + existing.cost = event.cost + existing.metadata_ = event.metadata_ + existing.updated_at = event.updated_at + session.add(existing) + else: + # Insert new + session.add(event) + await session.commit() + + async def _upsert_health_metric( + self, metric: SystemHealthMetric + ) -> None: + """Upsert health metric by org+gateway+collected_at.""" + async with self._session_factory() as session: + # Find existing metric for this collection time + stmt = select(SystemHealthMetric).where( + SystemHealthMetric.organization_id == metric.organization_id, + SystemHealthMetric.gateway_id == metric.gateway_id, + SystemHealthMetric.collected_at == metric.collected_at, + ) + result = await 
session.execute(stmt) + existing = result.scalar_one_or_none() + + if existing: + # Update existing + existing.cpu_percent = metric.cpu_percent + existing.cpu_cores = metric.cpu_cores + existing.ram_used_bytes = metric.ram_used_bytes + existing.ram_total_bytes = metric.ram_total_bytes + existing.ram_percent = metric.ram_percent + existing.swap_used_bytes = metric.swap_used_bytes + existing.swap_total_bytes = metric.swap_total_bytes + existing.swap_percent = metric.swap_percent + existing.disk_path = metric.disk_path + existing.disk_used_bytes = metric.disk_used_bytes + existing.disk_total_bytes = metric.disk_total_bytes + existing.disk_percent = metric.disk_percent + existing.gateway_live = metric.gateway_live + existing.gateway_ready = metric.gateway_ready + existing.gateway_uptime_ms = metric.gateway_uptime_ms + existing.gateway_pid = metric.gateway_pid + existing.gateway_version = metric.gateway_version + existing.metadata_ = metric.metadata_ + existing.updated_at = metric.updated_at + session.add(existing) + else: + # Insert new + session.add(metric) + await session.commit() + + # --- Response Parsers --- + + def _parse_cost_response(self, raw: object) -> CostResponse: + """Parse usage.cost RPC response into typed schema.""" + if not isinstance(raw, dict): + raise ValueError("Expected dict for cost response") + return CostResponse.model_validate(raw) + + def _parse_usage_status(self, raw: object) -> UsageStatusResponse: + """Parse usage.status RPC response into typed schema.""" + if not isinstance(raw, dict): + raise ValueError("Expected dict for usage status response") + return UsageStatusResponse.model_validate(raw) + + def _parse_cron_list_response( + self, raw: object + ) -> CronJobStatusResponse: + """Parse cron.list RPC response into typed schema.""" + if not isinstance(raw, dict): + raise ValueError("Expected dict for cron list response") + return CronJobStatusResponse.model_validate(raw) + + def _parse_sessions_list_response( + self, raw: object + ) -> 
SessionsListResponse: + """Parse sessions.list RPC response into typed schema.""" + if not isinstance(raw, dict): + raise ValueError("Expected dict for sessions list response") + return SessionsListResponse.model_validate(raw) + + def _parse_session_preview(self, raw: object) -> SessionPreviewResponse: + """Parse sessions.preview RPC response into typed schema.""" + if not isinstance(raw, dict): + raise ValueError("Expected dict for session preview response") + return SessionPreviewResponse.model_validate(raw) + + def _parse_health_response(self, raw: object) -> GatewayHealthResponse: + """Parse health RPC response into typed schema.""" + if not isinstance(raw, dict): + raise ValueError("Expected dict for health response") + return GatewayHealthResponse.model_validate(raw) + + def _parse_status_response(self, raw: object) -> GatewayStatusResponse: + """Parse status RPC response into typed schema.""" + if not isinstance(raw, dict): + raise ValueError("Expected dict for status response") + return GatewayStatusResponse.model_validate(raw) + + +async def get_collector_service() -> GatewayCollectorService: + """Create and return a GatewayCollectorService instance.""" + return GatewayCollectorService(async_session_maker) diff --git a/src/backend/app/services/monitoring/models.py b/src/backend/app/services/monitoring/models.py new file mode 100644 index 0000000..67ec1e6 --- /dev/null +++ b/src/backend/app/services/monitoring/models.py @@ -0,0 +1,174 @@ +"""Pydantic schemas for OpenClaw Gateway RPC response parsing. + +These schemas map the raw JSON responses from gateway RPC methods to typed +Python objects. They are used by GatewayCollectorService to parse responses +before storing data in Mission Control's monitoring models. + +Note: These are NOT DB models (those are in app/models/monitoring.py). +These are purely for RPC response parsing. 
+""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from pydantic import BaseModel, Field + + +class CostPeriod(BaseModel): + """Cost period breakdown.""" + + start: datetime + end: datetime + total: float = Field(default=0.0) + model_costs: dict[str, float] | None = Field(default=None, alias="models") + provider_costs: dict[str, float] | None = Field(default=None) + token_counts: dict[str, int] | None = Field(default=None) + + +class CostResponse(BaseModel): + """Response from usage.cost RPC method.""" + + period_start: datetime = Field(alias="period_start") + period_end: datetime = Field(alias="period_end") + total_cost: float = Field(alias="total", default=0.0) + model_costs: dict[str, float] | None = Field(alias="models", default=None) + provider_costs: dict[str, float] | None = Field(default=None) + token_counts: dict[str, int] | None = Field(alias="tokens", default=None) + + +class UsageStatusResponse(BaseModel): + """Response from usage.status RPC method.""" + + tokens_used: int = Field(default=0) + tokens_limit: int | None = Field(default=None) + cost_used: float = Field(default=0.0) + cost_limit: float | None = Field(default=None) + model_usage: dict[str, dict[str, Any]] | None = Field(default=None) + + +class CronJobStatus(BaseModel): + """Individual cron job status.""" + + job_name: str = Field(alias="name") + schedule: str + enabled: bool = Field(default=True) + last_run_at: datetime | None = Field(default=None, alias="lastRun") + next_run_at: datetime | None = Field(default=None, alias="nextRun") + status: str = Field(default="idle") + failure_count: int = Field(default=0, alias="failureCount") + last_error: str | None = Field(default=None, alias="lastError") + metadata_: dict[str, Any] | None = Field(default=None, alias="metadata") + + model_config = {"populate_by_name": True} + + +class CronJobStatusResponse(BaseModel): + """Response from cron.list RPC method.""" + + jobs: list[CronJobStatus] = 
Field(default_factory=list) + + +class SessionPreview(BaseModel): + """Session preview data.""" + + session_key: str = Field(alias="key") + event_type: str = Field(default="unknown", alias="eventType") + model: str | None = Field(default=None) + agent_id: str | None = Field(default=None, alias="agentId") + channel: str | None = Field(default=None) + context_percent: float | None = Field(default=None, alias="contextPct") + token_counts: dict[str, int] | None = Field(default=None) + cost: float | None = Field(default=None) + metadata_: dict[str, Any] | None = Field(default=None, alias="metadata") + + model_config = {"populate_by_name": True} + + +class SessionsListResponse(BaseModel): + """Response from sessions.list RPC method.""" + + sessions: list[SessionPreview] = Field(default_factory=list) + + +class SessionPreviewResponse(BaseModel): + """Response from sessions.preview RPC method.""" + + session_key: str = Field(alias="key") + event_type: str = Field(default="preview", alias="eventType") + model: str | None = Field(default=None) + agent_id: str | None = Field(default=None, alias="agentId") + channel: str | None = Field(default=None) + context_percent: float | None = Field(default=None, alias="contextPct") + token_counts: dict[str, int] | None = Field(default=None) + cost: float | None = Field(default=None) + metadata_: dict[str, Any] | None = Field(default=None, alias="metadata") + + model_config = {"populate_by_name": True} + + +class GatewayHealthMetrics(BaseModel): + """System health metrics from gateway health response.""" + + cpu_percent: float | None = Field(default=None, alias="cpu") + cpu_cores: int | None = Field(default=None, alias="cpuCores") + ram_used_bytes: int | None = Field(default=None, alias="ramUsed") + ram_total_bytes: int | None = Field(default=None, alias="ramTotal") + ram_percent: float | None = Field(default=None, alias="ramPercent") + swap_used_bytes: int | None = Field(default=None, alias="swapUsed") + swap_total_bytes: int | None = 
Field(default=None, alias="swapTotal") + swap_percent: float | None = Field(default=None, alias="swapPercent") + disk_path: str = Field(default="/", alias="diskPath") + disk_used_bytes: int | None = Field(default=None, alias="diskUsed") + disk_total_bytes: int | None = Field(default=None, alias="diskTotal") + disk_percent: float | None = Field(default=None, alias="diskPercent") + metadata_: dict[str, Any] | None = Field(default=None, alias="metadata") + + model_config = {"populate_by_name": True} + + +class GatewayHealthResponse(BaseModel): + """Response from health RPC method.""" + + status: str = Field(default="unknown") + pid: int | None = Field(default=None) + uptime_ms: int | None = Field(default=None, alias="uptimeMs") + memory_bytes: int | None = Field(default=None, alias="memory") + rss_bytes: int | None = Field(default=None, alias="rss") + timestamp: datetime | None = Field(default=None) + metrics: GatewayHealthMetrics | None = Field(default=None) + + +class GatewayStatus(BaseModel): + """Gateway runtime status.""" + + gateway_live: bool = Field(default=False, alias="live") + gateway_ready: bool = Field(default=False, alias="ready") + gateway_uptime_ms: int = Field(default=0, alias="uptimeMs") + gateway_pid: int = Field(default=0, alias="pid") + gateway_version: str = Field(default="unknown", alias="version") + agents_count: int = Field(default=0, alias="agents") + + +class GatewayStatusResponse(BaseModel): + """Response from status RPC method.""" + + status: str = Field(default="unknown") + gateway: GatewayStatus + + +class SubAgentRun(BaseModel): + """Sub-agent execution record.""" + + parent_session_key: str = Field(alias="parentSessionKey") + session_event_id: str | None = Field(default=None, alias="sessionId") + agent: str | None = Field(default=None) + model: str | None = Field(default=None) + status: str = Field(default="pending") + duration_ms: int | None = Field(default=None, alias="durationMs") + cost: float | None = Field(default=None) + 
token_counts: dict[str, int] | None = Field(default=None) + metadata_: dict[str, Any] | None = Field(default=None, alias="metadata") + + model_config = {"populate_by_name": True}