From 3719ab42b4a8a48442d6b6c40bccdb8e59d0b25c Mon Sep 17 00:00:00 2001 From: Ripley Date: Sun, 10 May 2026 22:38:41 -0500 Subject: [PATCH] feat: add cost-summary and cost-breakdown monitoring endpoints - CostSummaryRead schema + GET /monitoring/cost-summary (latest snapshot per gateway) - CostBreakdownRead schema + GET /monitoring/cost-breakdown (models ranked by cost with %) - Both endpoints support ?gateway_id= filtering and org-scoping - Updated FUTURE.md: dashboard logic port and WebSocket marked done, remaining 5 summary endpoints queued --- FUTURE.md | 106 +++++++++++++++---- src/backend/app/api/monitoring.py | 142 ++++++++++++++++++++++++++ src/backend/app/schemas/monitoring.py | 25 +++++ 3 files changed, 252 insertions(+), 21 deletions(-) diff --git a/FUTURE.md b/FUTURE.md index e4c6746..b1b2442 100644 --- a/FUTURE.md +++ b/FUTURE.md @@ -60,17 +60,11 @@ Create new PostgreSQL models for tracking data (cost, sessions, crons, system he ### 🟠 WebSocket for Agent Events — HIGH **Priority:** HIGH -**Status:** PENDING +**Status:** DONE ✅ **Added:** 2026-05-10 by Ripley +**Completed:** 2026-05-11 -**Description:** -Add FastAPI WebSocket endpoint for real-time agent event broadcasting, porting the JSONL session watcher from pixel-agents. - -**Implementation Notes:** -- Port `sessionWatcher.ts` logic to Python -- Watch `~/.openclaw/agents/*/sessions/*.jsonl` -- Parse events and broadcast via WebSocket -- Front-end client in `agent-events.ts` +WebSocket endpoint at `/ws/agents` with initial state snapshot (last 50 events) and background polling (every 2s). Event parser ported from TS source. Committed in v0.0.4. --- @@ -86,24 +80,94 @@ Add FastAPI WebSocket endpoint for real-time agent event broadcasting, porting t ### 🟠 Dashboard Logic Port — HIGH **Priority:** HIGH +**Status:** DONE ✅ +**Added:** 2026-05-11 by Ripley +**Completed:** 2026-05-11 + +Data processing functions ported to Python: `ModelName()`, `BuildDailyChart()`, `BuildAlerts()`, `FmtTokens()`, `BuildCostBreakdown()`. Event parser ported: `parse_session_event()`, `format_tool_status()`. WebSocket endpoint at `/ws/agents` with initial snapshot + polling. All committed in v0.0.4. + +### 🟠 Cost Summary API Endpoints — HIGH +**Priority:** HIGH +**Status:** IN PROGRESS +**Added:** 2026-05-11 by Ripley + +**Description:** +"How much have I spent today, and which model is burning the most?" — Cost Cards + Cost Breakdown API endpoints. + +**Implementation Notes:** +- Today's cost, all-time cost, projected monthly cost +- Per-model cost breakdown (7d/30d/all-time tabs) +- Use existing `data_processing.BuildCostBreakdown()` and `data_processing.BuildDailyChart()` +- New API endpoints: `GET /api/v1/monitoring/cost-summary` and `GET /api/v1/monitoring/cost-breakdown?range=7d|30d|all` +- Org-scoped, paginated where applicable +- Neo is implementing this now + +### 🟠 Health Summary API — HIGH +**Priority:** HIGH **Status:** PENDING **Added:** 2026-05-11 by Ripley **Description:** -Port the Go dashboard's data-processing logic that we're NOT yet reusing — model name normalization, daily cost charting, alert threshold computation, and token formatting. +"Is my gateway actually running right now?" — System health summary endpoint. **Implementation Notes:** -- The Go source (`sources/dashboard-tracking/internal/apprefresh/`) has significant logic beyond raw collection: - - `ModelName()` — maps raw provider/model strings (e.g. `anthropic/claude-opus-4-6`) to display names ("Claude Opus 4.6") - - `BuildDailyChart()` — aggregates cost/token data into daily buckets for chart rendering - - `BuildAlerts()` — evaluates cost thresholds, cron failures, context usage, and gateway health against configurable rules - - `FmtTokens()` — formats raw token counts (1,234,567 → "1.2M") - - `BuildCostBreakdown()` — organizes per-model cost into ranked lists -- We're already reusing the **gateway RPC layer** (same transport) and **data model shapes** (same fields) -- What we're NOT reusing is the **processing/aggregation logic** — currently the collector just stores raw data -- This must be ported as Python utility functions before building the monitoring frontend, so the API endpoints can serve pre-computed charts and alerts -- Create `src/backend/app/services/monitoring/data_processing.py` for this logic -- Also port `sources/dashboard-tracking/system_types.go` and `system_service.go` for system health data normalization +- Gateway status (online/offline), PID, uptime, memory, compaction +- CPU, RAM, swap, disk gauges +- Use existing `monitoring/health` endpoint data + `data_processing` functions +- New API endpoint: `GET /api/v1/monitoring/health-summary` + +### 🟠 Cron Summary API — HIGH +**Priority:** HIGH +**Status:** PENDING +**Added:** 2026-05-11 by Ripley + +**Description:** +"Which cron jobs ran, which failed, and when does the next one fire?" — Cron summary endpoint. + +**Implementation Notes:** +- List all cron jobs with status, schedule, last/next run, duration, model +- Filter by enabled/disabled, gateway +- Use existing `monitoring/cron-jobs` endpoint +- New API endpoint: `GET /api/v1/monitoring/cron-summary` + +### 🟠 Sessions Summary API — HIGH +**Priority:** HIGH +**Status:** PENDING +**Added:** 2026-05-11 by Ripley + +**Description:** +"What sessions are active and how much context are they consuming?" — Active sessions summary. + +**Implementation Notes:** +- Recent sessions with model, type badges (DM/group/cron/subagent), context %, tokens +- Use `data_processing.ModelName()` and `event_parser.parse_session_event()` +- New API endpoint: `GET /api/v1/monitoring/sessions-summary` + +### 🟠 Sub-Agents Summary API — HIGH +**Priority:** HIGH +**Status:** PENDING +**Added:** 2026-05-11 by Ripley + +**Description:** +"Are my sub-agents doing useful work or spinning in circles?" — Sub-agent activity summary. + +**Implementation Notes:** +- Sub-agent runs with cost, duration, status + token breakdown (7d/30d tabs) +- Use existing `monitoring/sub-agents` endpoint +- New API endpoint: `GET /api/v1/monitoring/sub-agents-summary` + +### 🟠 Cost Trends API — HIGH +**Priority:** HIGH +**Status:** PENDING +**Added:** 2026-05-11 by Ripley + +**Description:** +"What's the cost trend over the last 7 days — am I accelerating?" — Charts & trends endpoint. + +**Implementation Notes:** +- Cost trend line (7d/30d), model cost breakdown bars, per-model usage +- Use `data_processing.BuildDailyChart()` +- New API endpoint: `GET /api/v1/monitoring/trends?range=7d|30d` ### 🟡 Cost Tracking UI — MEDIUM **Priority:** MEDIUM diff --git a/src/backend/app/api/monitoring.py b/src/backend/app/api/monitoring.py index 489d7ba..c19d3e0 100644 --- a/src/backend/app/api/monitoring.py +++ b/src/backend/app/api/monitoring.py @@ -21,7 +21,9 @@ from app.models.monitoring import ( SystemHealthMetric, ) from app.schemas.monitoring import ( + CostBreakdownRead, CostSnapshotRead, + CostSummaryRead, CronJobStatusRead, SessionEventRead, SubAgentRunRead, @@ -331,3 +333,143 @@ async def get_sub_agent_run( if run.organization_id != ctx.organization.id: raise HTTPException(status_code=403, detail="Access denied") return run + + +@router.get( + "/cost-summary", + response_model=list[CostSummaryRead], + summary="Cost Summary", + description="Consolidated cost overview for all gateways in the organization.", +) +async def get_cost_summary( + gateway_id: UUID | None = GATEWAY_ID_QUERY, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> list[CostSummaryRead]: + """Get cost summary for all gateways in the organization. + + Returns consolidated cost data with model breakdown for the most recent + snapshot per gateway. + + - **gateway_id**: Filter by specific gateway + """ + statement = select(CostSnapshot).where( + CostSnapshot.organization_id == ctx.organization.id + ) + + if gateway_id is not None: + statement = statement.where(CostSnapshot.gateway_id == gateway_id) + + statement = statement.order_by(col(CostSnapshot.created_at).desc()) + result = await session.execute(statement) + snapshots = result.scalars().all() + + if not snapshots: + return [] + + # Group by gateway and get the latest snapshot per gateway + from collections import defaultdict + by_gateway: dict[UUID, CostSnapshot] = {} + for snapshot in snapshots: + if snapshot.gateway_id not in by_gateway or snapshot.collected_at > by_gateway[snapshot.gateway_id].collected_at: + by_gateway[snapshot.gateway_id] = snapshot + + # Build CostSummaryRead objects + from app.models.gateways import Gateway + summaries = [] + for gw_id, snapshot in by_gateway.items(): + # Get gateway name + gateway_stmt = select(Gateway).where(Gateway.id == gw_id) + gateway_result = await session.execute(gateway_stmt) + gateway = gateway_result.scalar_one_or_none() + gateway_name = gateway.name if gateway else "Unknown" + + summaries.append(CostSummaryRead( + gateway_id=gw_id, + gateway_name=gateway_name, + period_start=snapshot.period_start, + period_end=snapshot.period_end, + total_cost=snapshot.total_cost, + cost_by_model=snapshot.model_costs or {}, + token_counts=snapshot.token_counts, + last_collected_at=snapshot.collected_at, + )) + + return summaries + + +@router.get( + "/cost-breakdown", + response_model=list[CostBreakdownRead], + summary="Cost Breakdown", + description="Models ranked by cost with percentage breakdown for all gateways.", +) +async def get_cost_breakdown( + gateway_id: UUID | None = GATEWAY_ID_QUERY, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> list[CostBreakdownRead]: + """Get cost breakdown for all gateways in the organization. + + Returns models ranked by cost with percentage breakdown for the most recent + snapshot per gateway. + + - **gateway_id**: Filter by specific gateway + """ + statement = select(CostSnapshot).where( + CostSnapshot.organization_id == ctx.organization.id + ) + + if gateway_id is not None: + statement = statement.where(CostSnapshot.gateway_id == gateway_id) + + statement = statement.order_by(col(CostSnapshot.created_at).desc()) + result = await session.execute(statement) + snapshots = result.scalars().all() + + if not snapshots: + return [] + + # Group by gateway and get the latest snapshot per gateway + from collections import defaultdict + by_gateway: dict[UUID, CostSnapshot] = {} + for snapshot in snapshots: + if snapshot.gateway_id not in by_gateway or snapshot.collected_at > by_gateway[snapshot.gateway_id].collected_at: + by_gateway[snapshot.gateway_id] = snapshot + + # Build CostBreakdownRead objects + from app.models.gateways import Gateway + breakdowns = [] + for gw_id, snapshot in by_gateway.items(): + # Get gateway name + gateway_stmt = select(Gateway).where(Gateway.id == gw_id) + gateway_result = await session.execute(gateway_stmt) + gateway = gateway_result.scalar_one_or_none() + gateway_name = gateway.name if gateway else "Unknown" + + # Build breakdown list with percentages + breakdown = [] + total = snapshot.total_cost or 0.0 + if total > 0 and snapshot.model_costs: + for model, cost in snapshot.model_costs.items(): + pct = round((cost / total) * 100, 2) if total > 0 else 0.0 + breakdown.append({ + "model": model, + "cost": cost, + "percent": pct, + }) + + # Sort by cost descending + breakdown.sort(key=lambda x: x["cost"], reverse=True) + + breakdowns.append(CostBreakdownRead( + gateway_id=gw_id, + gateway_name=gateway_name, + period_start=snapshot.period_start, + period_end=snapshot.period_end, + total_cost=snapshot.total_cost, + breakdown=breakdown, + last_collected_at=snapshot.collected_at, + )) + + return breakdowns diff --git a/src/backend/app/schemas/monitoring.py b/src/backend/app/schemas/monitoring.py index 405ca65..57af5e8 100644 --- a/src/backend/app/schemas/monitoring.py +++ b/src/backend/app/schemas/monitoring.py @@ -111,3 +111,28 @@ class SystemHealthMetricRead(SQLModel): collected_at: datetime created_at: datetime updated_at: datetime + + +class CostSummaryRead(SQLModel): + """Cost summary payload - consolidated cost overview per gateway.""" + + gateway_id: UUID + gateway_name: str + period_start: datetime + period_end: datetime + total_cost: float + cost_by_model: dict[str, float] + token_counts: dict[str, int] | None = None + last_collected_at: datetime + + +class CostBreakdownRead(SQLModel): + """Cost breakdown payload - models ranked by cost.""" + + gateway_id: UUID + gateway_name: str + period_start: datetime + period_end: datetime + total_cost: float + breakdown: list[dict[str, float]] # [{"model": "claude-opus", "cost": 1.23, "percent": 45.6}] + last_collected_at: datetime