feat: add cost-summary and cost-breakdown monitoring endpoints

- CostSummaryRead schema + GET /monitoring/cost-summary (latest snapshot per gateway)
- CostBreakdownRead schema + GET /monitoring/cost-breakdown (models ranked by cost with %)
- Both endpoints support ?gateway_id= filtering and org-scoping
- Updated FUTURE.md: dashboard logic port and WebSocket marked done, remaining 5 summary endpoints queued
Ripley 2026-05-10 22:38:41 -05:00
parent fd7d0aca42
commit 3719ab42b4
3 changed files with 252 additions and 21 deletions

FUTURE.md

@@ -60,17 +60,11 @@ Create new PostgreSQL models for tracking data (cost, sessions, crons, system health)
### 🟠 WebSocket for Agent Events — HIGH
**Priority:** HIGH
-**Status:** PENDING
+**Status:** DONE ✅
**Added:** 2026-05-10 by Ripley
**Completed:** 2026-05-11
**Description:**
Add FastAPI WebSocket endpoint for real-time agent event broadcasting, porting the JSONL session watcher from pixel-agents.
**Implementation Notes:**
- Port `sessionWatcher.ts` logic to Python
- Watch `~/.openclaw/agents/*/sessions/*.jsonl`
- Parse events and broadcast via WebSocket
- Front-end client in `agent-events.ts`
WebSocket endpoint at `/ws/agents` with initial state snapshot (last 50 events) and background polling (every 2s). Event parser ported from TS source. Committed in v0.0.4.
---
@@ -86,24 +80,94 @@ Add FastAPI WebSocket endpoint for real-time agent event broadcasting, porting t
### 🟠 Dashboard Logic Port — HIGH
**Priority:** HIGH
**Status:** DONE ✅
**Added:** 2026-05-11 by Ripley
**Completed:** 2026-05-11
Data processing functions ported to Python: `ModelName()`, `BuildDailyChart()`, `BuildAlerts()`, `FmtTokens()`, `BuildCostBreakdown()`. Event parser ported: `parse_session_event()`, `format_tool_status()`. WebSocket endpoint at `/ws/agents` with initial snapshot + polling. All committed in v0.0.4.
### 🟠 Cost Summary API Endpoints — HIGH
**Priority:** HIGH
**Status:** IN PROGRESS
**Added:** 2026-05-11 by Ripley
**Description:**
"How much have I spent today, and which model is burning the most?" — Cost Cards + Cost Breakdown API endpoints.
**Implementation Notes:**
- Today's cost, all-time cost, projected monthly cost
- Per-model cost breakdown (7d/30d/all-time tabs)
- Use existing `data_processing.BuildCostBreakdown()` and `data_processing.BuildDailyChart()`
- New API endpoints: `GET /api/v1/monitoring/cost-summary` and `GET /api/v1/monitoring/cost-breakdown?range=7d|30d|all`
- Org-scoped, paginated where applicable
- Neo is implementing this now
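One way to derive the projected monthly figure mentioned above is linear extrapolation of month-to-date spend; this helper is an assumption (the entry doesn't specify the formula), not the shipped implementation.

```python
import calendar
from datetime import date


def projected_monthly_cost(month_to_date_cost: float, today: date) -> float:
    """Extrapolate month-to-date spend linearly over the full month.

    Hypothetical helper; the actual endpoint may use a different projection.
    """
    days_in_month = calendar.monthrange(today.year, today.month)[1]
    return round(month_to_date_cost / today.day * days_in_month, 2)
```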
### 🟠 Health Summary API — HIGH
**Priority:** HIGH
**Status:** PENDING
**Added:** 2026-05-11 by Ripley
**Description:**
-Port the Go dashboard's data-processing logic that we're NOT yet reusing — model name normalization, daily cost charting, alert threshold computation, and token formatting.
+"Is my gateway actually running right now?" — System health summary endpoint.
**Implementation Notes:**
-- The Go source (`sources/dashboard-tracking/internal/apprefresh/`) has significant logic beyond raw collection:
-  - `ModelName()` — maps raw provider/model strings (e.g. `anthropic/claude-opus-4-6`) to display names ("Claude Opus 4.6")
-  - `BuildDailyChart()` — aggregates cost/token data into daily buckets for chart rendering
-  - `BuildAlerts()` — evaluates cost thresholds, cron failures, context usage, and gateway health against configurable rules
-  - `FmtTokens()` — formats raw token counts (1,234,567 → "1.2M")
-  - `BuildCostBreakdown()` — organizes per-model cost into ranked lists
-- We're already reusing the **gateway RPC layer** (same transport) and **data model shapes** (same fields)
-- What we're NOT reusing is the **processing/aggregation logic** — currently the collector just stores raw data
-- This must be ported as Python utility functions before building the monitoring frontend, so the API endpoints can serve pre-computed charts and alerts
-- Create `src/backend/app/services/monitoring/data_processing.py` for this logic
-- Also port `sources/dashboard-tracking/system_types.go` and `system_service.go` for system health data normalization
+- Gateway status (online/offline), PID, uptime, memory, compaction
+- CPU, RAM, swap, disk gauges
+- Use existing `monitoring/health` endpoint data + `data_processing` functions
+- New API endpoint: `GET /api/v1/monitoring/health-summary`
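As a rough illustration of the `FmtTokens()` and `ModelName()` porting work mentioned in these notes, a Python sketch (the real ported signatures in `data_processing` may differ):

```python
# Illustrative ports; the actual data_processing module may differ.
_MODEL_NAMES = {
    # Example mapping taken from the notes above.
    "anthropic/claude-opus-4-6": "Claude Opus 4.6",
}


def model_name(raw: str) -> str:
    """Map a raw provider/model string to a display name, falling back to the raw id."""
    return _MODEL_NAMES.get(raw, raw)


def fmt_tokens(n: int) -> str:
    """Format raw token counts, e.g. 1_234_567 -> "1.2M"."""
    if n >= 1_000_000:
        return f"{n / 1_000_000:.1f}M"
    if n >= 1_000:
        return f"{n / 1_000:.1f}K"
    return str(n)
```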
### 🟠 Cron Summary API — HIGH
**Priority:** HIGH
**Status:** PENDING
**Added:** 2026-05-11 by Ripley
**Description:**
"Which cron jobs ran, which failed, and when does the next one fire?" — Cron summary endpoint.
**Implementation Notes:**
- List all cron jobs with status, schedule, last/next run, duration, model
- Filter by enabled/disabled, gateway
- Use existing `monitoring/cron-jobs` endpoint
- New API endpoint: `GET /api/v1/monitoring/cron-summary`
### 🟠 Sessions Summary API — HIGH
**Priority:** HIGH
**Status:** PENDING
**Added:** 2026-05-11 by Ripley
**Description:**
"What sessions are active and how much context are they consuming?" — Active sessions summary.
**Implementation Notes:**
- Recent sessions with model, type badges (DM/group/cron/subagent), context %, tokens
- Use `data_processing.ModelName()` and `event_parser.parse_session_event()`
- New API endpoint: `GET /api/v1/monitoring/sessions-summary`
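The context % column above could be computed along these lines (a sketch; the field names are assumptions, not the actual session schema):

```python
def context_percent(tokens_used: int, context_window: int) -> float:
    """Share of the model's context window consumed, as a percentage.

    Clamped to 100.0; returns 0.0 when the window size is unknown.
    """
    if context_window <= 0:
        return 0.0
    return round(min(tokens_used / context_window, 1.0) * 100, 1)
```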
### 🟠 Sub-Agents Summary API — HIGH
**Priority:** HIGH
**Status:** PENDING
**Added:** 2026-05-11 by Ripley
**Description:**
"Are my sub-agents doing useful work or spinning in circles?" — Sub-agent activity summary.
**Implementation Notes:**
- Sub-agent runs with cost, duration, status + token breakdown (7d/30d tabs)
- Use existing `monitoring/sub-agents` endpoint
- New API endpoint: `GET /api/v1/monitoring/sub-agents-summary`
### 🟠 Cost Trends API — HIGH
**Priority:** HIGH
**Status:** PENDING
**Added:** 2026-05-11 by Ripley
**Description:**
"What's the cost trend over the last 7 days — am I accelerating?" — Charts & trends endpoint.
**Implementation Notes:**
- Cost trend line (7d/30d), model cost breakdown bars, per-model usage
- Use `data_processing.BuildDailyChart()`
- New API endpoint: `GET /api/v1/monitoring/trends?range=7d|30d`
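`BuildDailyChart()`-style aggregation might look like this in Python (a sketch under assumed input shapes, not the ported implementation):

```python
from collections import defaultdict
from datetime import datetime


def build_daily_chart(entries: list[tuple[datetime, float]]) -> list[dict]:
    """Aggregate (timestamp, cost) pairs into per-day buckets, oldest first."""
    buckets: dict[str, float] = defaultdict(float)
    for ts, cost in entries:
        buckets[ts.date().isoformat()] += cost
    return [
        {"date": day, "cost": round(total, 4)}
        for day, total in sorted(buckets.items())
    ]
```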
### 🟡 Cost Tracking UI — MEDIUM
**Priority:** MEDIUM


@@ -21,7 +21,9 @@ from app.models.monitoring import (
    SystemHealthMetric,
)
from app.schemas.monitoring import (
    CostBreakdownRead,
    CostSnapshotRead,
    CostSummaryRead,
    CronJobStatusRead,
    SessionEventRead,
    SubAgentRunRead,
@@ -331,3 +333,143 @@ async def get_sub_agent_run(
    if run.organization_id != ctx.organization.id:
        raise HTTPException(status_code=403, detail="Access denied")
    return run
@router.get(
    "/cost-summary",
    response_model=list[CostSummaryRead],
    summary="Cost Summary",
    description="Consolidated cost overview for all gateways in the organization.",
)
async def get_cost_summary(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[CostSummaryRead]:
    """Get cost summary for all gateways in the organization.

    Returns consolidated cost data with model breakdown for the most recent
    snapshot per gateway.

    - **gateway_id**: Filter by specific gateway
    """
    statement = select(CostSnapshot).where(
        CostSnapshot.organization_id == ctx.organization.id
    )
    if gateway_id is not None:
        statement = statement.where(CostSnapshot.gateway_id == gateway_id)
    statement = statement.order_by(col(CostSnapshot.created_at).desc())
    result = await session.execute(statement)
    snapshots = result.scalars().all()
    if not snapshots:
        return []

    # Keep only the latest snapshot per gateway
    by_gateway: dict[UUID, CostSnapshot] = {}
    for snapshot in snapshots:
        latest = by_gateway.get(snapshot.gateway_id)
        if latest is None or snapshot.collected_at > latest.collected_at:
            by_gateway[snapshot.gateway_id] = snapshot

    # Build CostSummaryRead objects
    from app.models.gateways import Gateway

    summaries = []
    for gw_id, snapshot in by_gateway.items():
        # Resolve the gateway's display name
        gateway_stmt = select(Gateway).where(Gateway.id == gw_id)
        gateway_result = await session.execute(gateway_stmt)
        gateway = gateway_result.scalar_one_or_none()
        gateway_name = gateway.name if gateway else "Unknown"
        summaries.append(CostSummaryRead(
            gateway_id=gw_id,
            gateway_name=gateway_name,
            period_start=snapshot.period_start,
            period_end=snapshot.period_end,
            total_cost=snapshot.total_cost,
            cost_by_model=snapshot.model_costs or {},
            token_counts=snapshot.token_counts,
            last_collected_at=snapshot.collected_at,
        ))
    return summaries
@router.get(
    "/cost-breakdown",
    response_model=list[CostBreakdownRead],
    summary="Cost Breakdown",
    description="Models ranked by cost with percentage breakdown for all gateways.",
)
async def get_cost_breakdown(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[CostBreakdownRead]:
    """Get cost breakdown for all gateways in the organization.

    Returns models ranked by cost with percentage breakdown for the most recent
    snapshot per gateway.

    - **gateway_id**: Filter by specific gateway
    """
    statement = select(CostSnapshot).where(
        CostSnapshot.organization_id == ctx.organization.id
    )
    if gateway_id is not None:
        statement = statement.where(CostSnapshot.gateway_id == gateway_id)
    statement = statement.order_by(col(CostSnapshot.created_at).desc())
    result = await session.execute(statement)
    snapshots = result.scalars().all()
    if not snapshots:
        return []

    # Keep only the latest snapshot per gateway
    by_gateway: dict[UUID, CostSnapshot] = {}
    for snapshot in snapshots:
        latest = by_gateway.get(snapshot.gateway_id)
        if latest is None or snapshot.collected_at > latest.collected_at:
            by_gateway[snapshot.gateway_id] = snapshot

    # Build CostBreakdownRead objects
    from app.models.gateways import Gateway

    breakdowns = []
    for gw_id, snapshot in by_gateway.items():
        # Resolve the gateway's display name
        gateway_stmt = select(Gateway).where(Gateway.id == gw_id)
        gateway_result = await session.execute(gateway_stmt)
        gateway = gateway_result.scalar_one_or_none()
        gateway_name = gateway.name if gateway else "Unknown"

        # Build the breakdown list with percentages, ranked by cost descending
        breakdown = []
        total = snapshot.total_cost or 0.0
        if total > 0 and snapshot.model_costs:
            for model, cost in snapshot.model_costs.items():
                breakdown.append({
                    "model": model,
                    "cost": cost,
                    "percent": round((cost / total) * 100, 2),
                })
        breakdown.sort(key=lambda x: x["cost"], reverse=True)
        breakdowns.append(CostBreakdownRead(
            gateway_id=gw_id,
            gateway_name=gateway_name,
            period_start=snapshot.period_start,
            period_end=snapshot.period_end,
            total_cost=snapshot.total_cost,
            breakdown=breakdown,
            last_collected_at=snapshot.collected_at,
        ))
    return breakdowns


@@ -111,3 +111,28 @@ class SystemHealthMetricRead(SQLModel):
    collected_at: datetime
    created_at: datetime
    updated_at: datetime


class CostSummaryRead(SQLModel):
    """Cost summary payload - consolidated cost overview per gateway."""

    gateway_id: UUID
    gateway_name: str
    period_start: datetime
    period_end: datetime
    total_cost: float
    cost_by_model: dict[str, float]
    token_counts: dict[str, int] | None = None
    last_collected_at: datetime


class CostBreakdownRead(SQLModel):
    """Cost breakdown payload - models ranked by cost."""

    gateway_id: UUID
    gateway_name: str
    period_start: datetime
    period_end: datetime
    total_cost: float
    # Items mix str and float values: [{"model": "claude-opus", "cost": 1.23, "percent": 45.6}]
    breakdown: list[dict[str, float | str]]
    last_collected_at: datetime
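If stricter typing is wanted later, the breakdown rows could get their own shape instead of loose dicts; a hypothetical refinement sketched with stdlib `TypedDict` (this commit ships plain dicts):

```python
from typing import TypedDict


class CostBreakdownItem(TypedDict):
    """One ranked row of the per-model breakdown (hypothetical refinement)."""
    model: str
    cost: float
    percent: float


def as_items(breakdown: list[dict]) -> list[CostBreakdownItem]:
    """Coerce raw breakdown dicts into typed items, preserving order."""
    return [
        CostBreakdownItem(
            model=str(d["model"]),
            cost=float(d["cost"]),
            percent=float(d["percent"]),
        )
        for d in breakdown
    ]
```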