feat: add 5 remaining monitoring summary endpoints

- health-summary: gateway status, PID, uptime, CPU/RAM/swap/disk, compaction
- cron-summary: cron jobs with schedule, status, failures, model
- sessions-summary: active sessions with model display names, context %, tokens
- sub-agents-summary: sub-agent runs with cost, duration, status, tokens
- trends: cost/token daily trends with 7d/30d range filter

All endpoints are org-scoped, support gateway_id filtering, and use
data_processing functions (ModelName, BuildDailyChart) where appropriate.
Syntax validated with py_compile.
This commit is contained in:
Ripley 2026-05-10 22:41:42 -05:00
parent 3719ab42b4
commit e348deb299
2 changed files with 558 additions and 0 deletions

View File

@ -25,11 +25,18 @@ from app.schemas.monitoring import (
CostSnapshotRead, CostSnapshotRead,
CostSummaryRead, CostSummaryRead,
CronJobStatusRead, CronJobStatusRead,
CronSummaryRead,
HealthSummaryRead,
SessionEventRead, SessionEventRead,
SessionSummaryRead,
SubAgentRunRead, SubAgentRunRead,
SubAgentSummaryRead,
SystemHealthMetricRead, SystemHealthMetricRead,
TrendDay,
TrendRead,
) )
from app.schemas.pagination import DefaultLimitOffsetPage from app.schemas.pagination import DefaultLimitOffsetPage
from app.models.gateways import Gateway
router = APIRouter(prefix="/monitoring", tags=["monitoring"]) router = APIRouter(prefix="/monitoring", tags=["monitoring"])
SESSION_DEP = Depends(get_session) SESSION_DEP = Depends(get_session)
@ -473,3 +480,465 @@ async def get_cost_breakdown(
)) ))
return breakdowns return breakdowns
@router.get(
    "/health-summary",
    response_model=list[HealthSummaryRead],
    summary="Health Summary",
    description="Latest health status and metrics for all gateways in the organization.",
)
async def get_health_summary(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[HealthSummaryRead]:
    """Get health summary for all gateways in the organization.

    Returns the latest health snapshot per gateway with system metrics.

    - **gateway_id**: Filter by specific gateway
    """
    # Fetch all org-scoped metrics; the newest per gateway is selected below.
    # (The previous revision also built an unused subquery here — removed.)
    statement = select(SystemHealthMetric).where(
        SystemHealthMetric.organization_id == ctx.organization.id
    )
    if gateway_id is not None:
        statement = statement.where(SystemHealthMetric.gateway_id == gateway_id)
    result = await session.execute(statement)
    metrics = result.scalars().all()
    if not metrics:
        return []
    # Keep only the most recently collected metric for each gateway.
    by_gateway: dict[UUID, SystemHealthMetric] = {}
    for metric in metrics:
        latest = by_gateway.get(metric.gateway_id)
        if latest is None or metric.collected_at > latest.collected_at:
            by_gateway[metric.gateway_id] = metric
    # Resolve all gateway names with one batched query (avoids N+1 lookups).
    name_result = await session.execute(
        select(Gateway).where(col(Gateway.id).in_(by_gateway.keys()))
    )
    gateway_names = {g.id: g.name for g in name_result.scalars().all()}
    summaries = []
    for gw_id, metric in by_gateway.items():
        # Human-readable status derived from the liveness flag.
        status = "online" if metric.gateway_live else "offline"
        # Convert uptime to whole seconds; `is not None` so a 0 ms uptime
        # (gateway just restarted) is reported as 0 rather than dropped.
        uptime_seconds = None
        if metric.gateway_uptime_ms is not None:
            uptime_seconds = round(metric.gateway_uptime_ms / 1000)
        # Used RAM in MB. Both used and total must be present/non-zero, which
        # preserves the original gating even though total is not used in the
        # computation itself.
        memory_mb = None
        if metric.ram_used_bytes and metric.ram_total_bytes:
            memory_mb = round(metric.ram_used_bytes / (1024 * 1024), 2)
        summaries.append(HealthSummaryRead(
            gateway_id=gw_id,
            gateway_name=gateway_names.get(gw_id, "Unknown"),
            status=status,
            pid=metric.gateway_pid,
            uptime_seconds=uptime_seconds,
            memory_mb=memory_mb,
            cpu_percent=metric.cpu_percent,
            ram_percent=metric.ram_percent,
            swap_percent=metric.swap_percent,
            disk_percent=metric.disk_percent,
            compaction_mode=None,  # Not tracked in current schema
            last_collected_at=metric.collected_at,
        ))
    return summaries
@router.get(
    "/cron-summary",
    response_model=list[CronSummaryRead],
    summary="Cron Summary",
    description="Status and scheduling info for all cron jobs in the organization.",
)
async def get_cron_summary(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    enabled: bool | None = ENABLED_QUERY,
    job_name: str | None = JOB_NAME_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[CronSummaryRead]:
    """Get cron summary for all cron jobs in the organization.

    Returns cron job status with scheduling info.

    - **gateway_id**: Filter by specific gateway
    - **enabled**: Filter by enabled status
    - **job_name**: Filter by job name (exact match)
    """
    statement = select(CronJobStatus).where(
        CronJobStatus.organization_id == ctx.organization.id
    )
    if gateway_id is not None:
        statement = statement.where(CronJobStatus.gateway_id == gateway_id)
    if enabled is not None:
        statement = statement.where(CronJobStatus.enabled == enabled)
    if job_name is not None:
        statement = statement.where(CronJobStatus.job_name == job_name)
    statement = statement.order_by(col(CronJobStatus.created_at).desc())
    result = await session.execute(statement)
    jobs = result.scalars().all()
    if not jobs:
        return []
    # Resolve all gateway names with one batched query (avoids N+1 lookups).
    name_result = await session.execute(
        select(Gateway).where(col(Gateway.id).in_({j.gateway_id for j in jobs}))
    )
    gateway_names = {g.id: g.name for g in name_result.scalars().all()}
    summaries = []
    for job in jobs:
        # Model is optional and, when present, lives in the job's metadata.
        model = None
        if job.metadata_ and isinstance(job.metadata_, dict):
            model = job.metadata_.get("model")
        summaries.append(CronSummaryRead(
            gateway_id=job.gateway_id,
            gateway_name=gateway_names.get(job.gateway_id, "Unknown"),
            job_name=job.job_name,
            schedule=job.schedule,
            enabled=job.enabled,
            status=job.status,
            last_run_at=job.last_run_at,
            next_run_at=job.next_run_at,
            last_error=job.last_error,
            failure_count=job.failure_count,
            model=model,
            metadata_=job.metadata_,
        ))
    return summaries
@router.get(
    "/sessions-summary",
    response_model=list[SessionSummaryRead],
    summary="Sessions Summary",
    description="Recent session events with context and token breakdown.",
)
async def get_sessions_summary(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    session_key: str | None = SESSION_KEY_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[SessionSummaryRead]:
    """Get sessions summary for the organization.

    Returns recent session events with model display names and token breakdown.

    - **gateway_id**: Filter by specific gateway
    - **session_key**: Filter by session key
    """
    statement = select(SessionEvent).where(
        SessionEvent.organization_id == ctx.organization.id
    )
    if gateway_id is not None:
        statement = statement.where(SessionEvent.gateway_id == gateway_id)
    if session_key is not None:
        statement = statement.where(SessionEvent.session_key == session_key)
    statement = statement.order_by(col(SessionEvent.created_at).desc())
    result = await session.execute(statement)
    events = result.scalars().all()
    if not events:
        return []
    from app.services.monitoring.data_processing import ModelName
    # Resolve all gateway names with one batched query (avoids N+1 lookups).
    name_result = await session.execute(
        select(Gateway).where(col(Gateway.id).in_({e.gateway_id for e in events}))
    )
    gateway_names = {g.id: g.name for g in name_result.scalars().all()}
    summaries = []
    for event in events:
        # Human-friendly model name for display, when a model is recorded.
        model_display_name = ModelName(event.model) if event.model else None
        # Token counts come from a free-form dict; derive the total when the
        # source only reports the prompt/completion split.
        prompt_tokens = None
        completion_tokens = None
        total_tokens = None
        if event.token_counts and isinstance(event.token_counts, dict):
            prompt_tokens = event.token_counts.get("prompt_tokens")
            completion_tokens = event.token_counts.get("completion_tokens")
            total_tokens = event.token_counts.get("total_tokens")
            if total_tokens is None and prompt_tokens is not None and completion_tokens is not None:
                total_tokens = prompt_tokens + completion_tokens
        summaries.append(SessionSummaryRead(
            gateway_id=event.gateway_id,
            gateway_name=gateway_names.get(event.gateway_id, "Unknown"),
            session_key=event.session_key,
            event_type=event.event_type,
            model=event.model,
            model_display_name=model_display_name,
            agent_id=event.agent_id,
            channel=event.channel,
            context_percent=event.context_percent,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            last_collected_at=event.collected_at,
        ))
    return summaries
@router.get(
    "/sub-agents-summary",
    response_model=list[SubAgentSummaryRead],
    summary="Sub-Agent Summary",
    description="Sub-agent runs with cost, duration, and token breakdown.",
)
async def get_sub_agents_summary(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    status: str | None = STATUS_QUERY,
    agent: str | None = AGENT_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[SubAgentSummaryRead]:
    """Get sub-agents summary for the organization.

    Returns sub-agent runs with cost and token breakdown.

    - **gateway_id**: Filter by specific gateway
    - **status**: Filter by run status (pending, running, succeeded, failed)
    - **agent**: Filter by agent name
    """
    statement = select(SubAgentRun).where(
        SubAgentRun.organization_id == ctx.organization.id
    )
    if gateway_id is not None:
        statement = statement.where(SubAgentRun.gateway_id == gateway_id)
    if status is not None:
        statement = statement.where(SubAgentRun.status == status)
    if agent is not None:
        statement = statement.where(SubAgentRun.agent == agent)
    statement = statement.order_by(col(SubAgentRun.created_at).desc())
    result = await session.execute(statement)
    runs = result.scalars().all()
    if not runs:
        return []
    from app.services.monitoring.data_processing import ModelName
    # Resolve all gateway names with one batched query (avoids N+1 lookups).
    name_result = await session.execute(
        select(Gateway).where(col(Gateway.id).in_({r.gateway_id for r in runs}))
    )
    gateway_names = {g.id: g.name for g in name_result.scalars().all()}
    summaries = []
    for run in runs:
        # Human-friendly model name for display, when a model is recorded.
        model_display_name = ModelName(run.model) if run.model else None
        # Duration in seconds (2 decimal places); falsy check preserved from
        # the original, so a 0 ms duration is reported as None.
        duration_seconds = None
        if run.duration_ms:
            duration_seconds = round(run.duration_ms / 1000, 2)
        # Token counts come from a free-form dict; derive the total when the
        # source only reports the prompt/completion split.
        prompt_tokens = None
        completion_tokens = None
        total_tokens = None
        if run.token_counts and isinstance(run.token_counts, dict):
            prompt_tokens = run.token_counts.get("prompt_tokens")
            completion_tokens = run.token_counts.get("completion_tokens")
            total_tokens = run.token_counts.get("total_tokens")
            if total_tokens is None and prompt_tokens is not None and completion_tokens is not None:
                total_tokens = prompt_tokens + completion_tokens
        summaries.append(SubAgentSummaryRead(
            gateway_id=run.gateway_id,
            gateway_name=gateway_names.get(run.gateway_id, "Unknown"),
            agent_id=run.agent,
            session_key=run.parent_session_key,
            model=run.model,
            model_display_name=model_display_name,
            status=run.status,
            cost=run.cost,
            duration_seconds=duration_seconds,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
            last_collected_at=run.collected_at,
        ))
    return summaries
@router.get(
    "/trends",
    response_model=list[TrendRead],
    summary="Trends Summary",
    description="Cost and token trends over time (7d or 30d).",
)
async def get_trends(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    range_param: str = Query(default="7d", description="Time range: 7d or 30d"),
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> list[TrendRead]:
    """Get cost and token trends for the organization.

    Returns daily aggregated cost and token data for the specified time range.

    - **gateway_id**: Filter by specific gateway
    - **range**: Time range - "7d" (default) or "30d"
    """
    import re
    from datetime import date, timedelta
    from app.services.monitoring.data_processing import ModelName

    # Parse the range. "7d" is the default; any "<N>d" value is honored.
    range_days = 7
    if range_param == "30d":
        range_days = 30
    elif (range_match := re.match(r"^(\d+)d$", range_param)):
        range_days = int(range_match.group(1))
    statement = select(CostSnapshot).where(
        CostSnapshot.organization_id == ctx.organization.id
    )
    if gateway_id is not None:
        statement = statement.where(CostSnapshot.gateway_id == gateway_id)
    statement = statement.order_by(col(CostSnapshot.created_at).desc())
    result = await session.execute(statement)
    snapshots = result.scalars().all()
    if not snapshots:
        return []
    # Keep only the most recently collected snapshot per gateway.
    by_gateway: dict[UUID, CostSnapshot] = {}
    for snapshot in snapshots:
        latest = by_gateway.get(snapshot.gateway_id)
        if latest is None or snapshot.collected_at > latest.collected_at:
            by_gateway[snapshot.gateway_id] = snapshot
    # Daily buckets covering the requested range, oldest first.
    # NOTE(review): uses the server's local date; confirm snapshot
    # collected_at timestamps are in the same timezone.
    today = date.today()
    chart_dates = [
        (today - timedelta(days=i)).strftime("%Y-%m-%d")
        for i in range(range_days - 1, -1, -1)
    ]
    # Aggregate per-day cost/token totals across gateways. Costs are summed
    # (the previous revision overwrote, losing data when two gateways'
    # latest snapshots fell on the same date with the same model).
    daily_costs: dict[str, dict[str, float]] = {}
    daily_tokens: dict[str, dict[str, int]] = {}
    for snapshot in by_gateway.values():
        if not snapshot.model_costs:
            continue
        # Bucket by the snapshot's collection date (the schema has no
        # per-day history, so each latest snapshot maps to a single date).
        date_str = snapshot.collected_at.strftime("%Y-%m-%d")
        cost_bucket = daily_costs.setdefault(date_str, {})
        token_bucket = daily_tokens.setdefault(date_str, {})
        for model, cost in snapshot.model_costs.items():
            cost_bucket[model] = cost_bucket.get(model, 0.0) + cost
            if snapshot.token_counts and model in snapshot.token_counts:
                token_bucket[model] = token_bucket.get(model, 0) + snapshot.token_counts[model]
    # Resolve all gateway names with one batched query (the previous
    # revision issued one query per gateway, twice over).
    name_result = await session.execute(
        select(Gateway).where(col(Gateway.id).in_(by_gateway.keys()))
    )
    gateway_names = {g.id: g.name for g in name_result.scalars().all()}
    # NOTE(review): the daily buckets are org-wide, so each gateway's
    # TrendRead carries the same aggregated days — confirm this is intended.
    trends = []
    for gw_id, snapshot in by_gateway.items():
        days_data = []
        for d in chart_dates:
            day_costs = daily_costs.get(d, {})
            day_tokens = daily_tokens.get(d, {})
            # Fold raw model ids into display names, summing when two raw
            # ids share the same display name.
            display_totals: dict[str, float] = {}
            for m, c in day_costs.items():
                display_name = ModelName(m)
                display_totals[display_name] = display_totals.get(display_name, 0.0) + c
            model_costs = {name: round(c, 2) for name, c in display_totals.items()}
            days_data.append(TrendDay(
                date=d,
                total_cost=round(sum(day_costs.values()), 2),
                total_tokens=sum(day_tokens.values()),
                model_costs=model_costs,
            ))
        trends.append(TrendRead(
            gateway_id=gw_id,
            gateway_name=gateway_names.get(gw_id, "Unknown"),
            range=range_param,
            days=days_data,
            last_collected_at=snapshot.collected_at,
        ))
    return trends

View File

@ -136,3 +136,92 @@ class CostBreakdownRead(SQLModel):
total_cost: float total_cost: float
breakdown: list[dict[str, float]] # [{"model": "claude-opus", "cost": 1.23, "percent": 45.6}] breakdown: list[dict[str, float]] # [{"model": "claude-opus", "cost": 1.23, "percent": 45.6}]
last_collected_at: datetime last_collected_at: datetime
class HealthSummaryRead(SQLModel):
    """Health summary payload - gateway health status and metrics."""
    gateway_id: UUID  # gateway this health snapshot belongs to
    gateway_name: str  # resolved gateway name; "Unknown" when the gateway row is missing
    status: str  # "online" or "offline", derived from the gateway liveness flag
    pid: int | None = None  # gateway process id, when reported
    uptime_seconds: int | None = None  # gateway uptime, rounded from milliseconds
    memory_mb: float | None = None  # RAM used, in MB (2 decimal places)
    cpu_percent: float | None = None  # CPU utilization percentage
    ram_percent: float | None = None  # RAM utilization percentage
    swap_percent: float | None = None  # swap utilization percentage
    disk_percent: float | None = None  # disk utilization percentage
    compaction_mode: str | None = None  # always None for now — not tracked in current schema
    last_collected_at: datetime  # collected_at of the latest health metric
class CronSummaryRead(SQLModel):
    """Cron summary payload - cron job status and scheduling."""
    gateway_id: UUID  # gateway that owns the cron job
    gateway_name: str  # resolved gateway name; "Unknown" when the gateway row is missing
    job_name: str  # cron job identifier
    schedule: str  # schedule expression as stored on the job
    enabled: bool  # whether the job is currently enabled
    status: str  # job status as reported by the gateway
    last_run_at: datetime | None = None  # timestamp of the most recent run
    next_run_at: datetime | None = None  # timestamp of the next scheduled run
    last_error: str | None = None  # error message from the last failed run, if any
    failure_count: int  # number of recorded failures
    model: str | None = None  # extracted from metadata_["model"] when present
    metadata_: dict | None = None  # raw job metadata passthrough
class SessionSummaryRead(SQLModel):
    """Session summary payload - session events with context and token info."""
    gateway_id: UUID  # gateway that emitted the session event
    gateway_name: str  # resolved gateway name; "Unknown" when the gateway row is missing
    session_key: str  # session identifier
    event_type: str  # type of session event
    model: str | None = None  # raw model identifier, when recorded
    model_display_name: str | None = None  # human-friendly name via ModelName()
    agent_id: str | None = None  # agent associated with the event, if any
    channel: str | None = None  # channel the session ran on, if any
    context_percent: float | None = None  # context-window usage percentage
    prompt_tokens: int | None = None  # from token_counts["prompt_tokens"]
    completion_tokens: int | None = None  # from token_counts["completion_tokens"]
    total_tokens: int | None = None  # reported total, or prompt + completion when absent
    last_collected_at: datetime  # collected_at of the event
class SubAgentSummaryRead(SQLModel):
    """Sub-agent summary payload - sub-agent runs with cost and token breakdown."""
    gateway_id: UUID  # gateway that ran the sub-agent
    gateway_name: str  # resolved gateway name; "Unknown" when the gateway row is missing
    agent_id: str | None = None  # populated from the run's agent name
    session_key: str  # populated from the run's parent_session_key
    model: str | None = None  # raw model identifier, when recorded
    model_display_name: str | None = None  # human-friendly name via ModelName()
    status: str  # run status (e.g. pending, running, succeeded, failed)
    cost: float | None = None  # run cost, when reported
    duration_seconds: float | None = None  # converted from duration_ms (2 decimal places)
    prompt_tokens: int | None = None  # from token_counts["prompt_tokens"]
    completion_tokens: int | None = None  # from token_counts["completion_tokens"]
    total_tokens: int | None = None  # reported total, or prompt + completion when absent
    last_collected_at: datetime  # collected_at of the run
class TrendDay(SQLModel):
    """Single day of trend data."""
    date: str  # ISO date string, e.g. "2026-05-10"
    total_cost: float  # summed cost across all models for the day (2 decimal places)
    total_tokens: int  # summed token count across all models for the day
    model_costs: dict[str, float]  # display model name -> cost for the day
class TrendRead(SQLModel):
    """Trend summary payload - cost and token trends over time."""
    gateway_id: UUID  # gateway this trend series belongs to
    gateway_name: str  # resolved gateway name; "Unknown" when the gateway row is missing
    range: str # "7d" or "30d"
    days: list[TrendDay]  # one entry per day in the range, oldest first
    last_collected_at: datetime  # collected_at of the gateway's latest cost snapshot