diff --git a/src/backend/app/api/monitoring.py b/src/backend/app/api/monitoring.py
index c19d3e0..3cf48a7 100644
--- a/src/backend/app/api/monitoring.py
+++ b/src/backend/app/api/monitoring.py
@@ -25,11 +25,18 @@ from app.schemas.monitoring import (
     CostSnapshotRead,
     CostSummaryRead,
     CronJobStatusRead,
+    CronSummaryRead,
+    HealthSummaryRead,
     SessionEventRead,
+    SessionSummaryRead,
     SubAgentRunRead,
+    SubAgentSummaryRead,
     SystemHealthMetricRead,
+    TrendDay,
+    TrendRead,
 )
 from app.schemas.pagination import DefaultLimitOffsetPage
+from app.models.gateways import Gateway
 
 router = APIRouter(prefix="/monitoring", tags=["monitoring"])
 SESSION_DEP = Depends(get_session)
@@ -473,3 +480,465 @@ async def get_cost_breakdown(
         ))
 
     return breakdowns
+
+
+@router.get(
+    "/health-summary",
+    response_model=list[HealthSummaryRead],
+    summary="Health Summary",
+    description="Latest health status and metrics for all gateways in the organization.",
+)
+async def get_health_summary(
+    gateway_id: UUID | None = GATEWAY_ID_QUERY,
+    session: AsyncSession = SESSION_DEP,
+    ctx: OrganizationContext = ORG_MEMBER_DEP,
+) -> list[HealthSummaryRead]:
+    """Get health summary for all gateways in the organization.
+
+    Returns the latest health snapshot per gateway with system metrics.
+
+    - **gateway_id**: Filter by specific gateway
+    """
+    # Get latest health metric per gateway
+    subquery = (
+        select(
+            SystemHealthMetric.gateway_id,
+            col(SystemHealthMetric.collected_at).label("latest_collected"),
+            col(SystemHealthMetric.created_at).label("latest_created"),
+        )
+        .where(SystemHealthMetric.organization_id == ctx.organization.id)
+        .order_by(col(SystemHealthMetric.collected_at).desc())
+        .subquery()
+    )
+
+    # Get distinct gateway_ids with their latest metric
+    statement = select(SystemHealthMetric).where(
+        SystemHealthMetric.organization_id == ctx.organization.id
+    )
+
+    if gateway_id is not None:
+        statement = statement.where(SystemHealthMetric.gateway_id == gateway_id)
+
+    # Get all metrics, then group by gateway to get latest
+    result = await session.execute(statement)
+    metrics = result.scalars().all()
+
+    if not metrics:
+        return []
+
+    # Group by gateway and get the latest metric per gateway
+    # (newest = greatest collected_at per gateway_id)
+    by_gateway: dict[UUID, SystemHealthMetric] = {}
+    for metric in metrics:
+        if metric.gateway_id not in by_gateway or metric.collected_at > by_gateway[metric.gateway_id].collected_at:
+            by_gateway[metric.gateway_id] = metric
+
+    # Build HealthSummaryRead objects
+    summaries = []
+    for gw_id, metric in by_gateway.items():
+        # Get gateway name
+        gateway_stmt = select(Gateway).where(Gateway.id == gw_id)
+        gateway_result = await session.execute(gateway_stmt)
+        gateway = gateway_result.scalar_one_or_none()
+        gateway_name = gateway.name if gateway else "Unknown"
+
+        # Determine status
+        status = "online" if metric.gateway_live else "offline"
+
+        # Calculate uptime in seconds
+        uptime_seconds = None
+        if metric.gateway_uptime_ms is not None:
+            uptime_seconds = round(metric.gateway_uptime_ms / 1000)
+
+        # Calculate memory in MB
+        memory_mb = None
+        if metric.ram_used_bytes is not None:
+            memory_mb = round(metric.ram_used_bytes / (1024 * 1024), 2)
+
+        summaries.append(HealthSummaryRead(
+            gateway_id=gw_id,
+            gateway_name=gateway_name,
+            status=status,
+            pid=metric.gateway_pid,
+            uptime_seconds=uptime_seconds,
+            memory_mb=memory_mb,
+            cpu_percent=metric.cpu_percent,
+            ram_percent=metric.ram_percent,
+            swap_percent=metric.swap_percent,
+            disk_percent=metric.disk_percent,
+            compaction_mode=None,  # Not tracked in current schema
+            last_collected_at=metric.collected_at,
+        ))
+
+    return summaries
+
+
+@router.get(
+    "/cron-summary",
+    response_model=list[CronSummaryRead],
+    summary="Cron Summary",
+    description="Status and scheduling info for all cron jobs in the organization.",
+)
+async def get_cron_summary(
+    gateway_id: UUID | None = GATEWAY_ID_QUERY,
+    enabled: bool | None = ENABLED_QUERY,
+    job_name: str | None = JOB_NAME_QUERY,
+    session: AsyncSession = SESSION_DEP,
+    ctx: OrganizationContext = ORG_MEMBER_DEP,
+) -> list[CronSummaryRead]:
+    """Get cron summary for all cron jobs in the organization.
+
+    Returns cron job status with scheduling info.
+
+    - **gateway_id**: Filter by specific gateway
+    - **enabled**: Filter by enabled status
+    - **job_name**: Filter by job name (exact match)
+    """
+    statement = select(CronJobStatus).where(
+        CronJobStatus.organization_id == ctx.organization.id
+    )
+
+    if gateway_id is not None:
+        statement = statement.where(CronJobStatus.gateway_id == gateway_id)
+
+    if enabled is not None:
+        statement = statement.where(CronJobStatus.enabled == enabled)
+
+    if job_name is not None:
+        statement = statement.where(CronJobStatus.job_name == job_name)
+
+    statement = statement.order_by(col(CronJobStatus.created_at).desc())
+    result = await session.execute(statement)
+    jobs = result.scalars().all()
+
+    if not jobs:
+        return []
+
+    # Build CronSummaryRead objects
+    summaries = []
+    for job in jobs:
+        # Get gateway name
+        gateway_stmt = select(Gateway).where(Gateway.id == job.gateway_id)
+        gateway_result = await session.execute(gateway_stmt)
+        gateway = gateway_result.scalar_one_or_none()
+        gateway_name = gateway.name if gateway else "Unknown"
+
+        # Extract model from metadata if present
+        model = None
+        if job.metadata_ and isinstance(job.metadata_, dict):
+            model = job.metadata_.get("model")
+
+        summaries.append(CronSummaryRead(
+            gateway_id=job.gateway_id,
+            gateway_name=gateway_name,
+            job_name=job.job_name,
+            schedule=job.schedule,
+            enabled=job.enabled,
+            status=job.status,
+            last_run_at=job.last_run_at,
+            next_run_at=job.next_run_at,
+            last_error=job.last_error,
+            failure_count=job.failure_count,
+            model=model,
+            metadata_=job.metadata_,
+        ))
+
+    return summaries
+
+
+@router.get(
+    "/sessions-summary",
+    response_model=list[SessionSummaryRead],
+    summary="Sessions Summary",
+    description="Recent session events with context and token breakdown.",
+)
+async def get_sessions_summary(
+    gateway_id: UUID | None = GATEWAY_ID_QUERY,
+    session_key: str | None = SESSION_KEY_QUERY,
+    session: AsyncSession = SESSION_DEP,
+    ctx: OrganizationContext = ORG_MEMBER_DEP,
+) -> list[SessionSummaryRead]:
+    """Get sessions summary for the organization.
+
+    Returns recent session events with model display names and token breakdown.
+
+    - **gateway_id**: Filter by specific gateway
+    - **session_key**: Filter by session key
+    """
+    statement = select(SessionEvent).where(
+        SessionEvent.organization_id == ctx.organization.id
+    )
+
+    if gateway_id is not None:
+        statement = statement.where(SessionEvent.gateway_id == gateway_id)
+
+    if session_key is not None:
+        statement = statement.where(SessionEvent.session_key == session_key)
+
+    statement = statement.order_by(col(SessionEvent.created_at).desc())
+    result = await session.execute(statement)
+    events = result.scalars().all()
+
+    if not events:
+        return []
+
+    # Build SessionSummaryRead objects
+    from app.services.monitoring.data_processing import ModelName
+    summaries = []
+    for event in events:
+        # Get gateway name
+        gateway_stmt = select(Gateway).where(Gateway.id == event.gateway_id)
+        gateway_result = await session.execute(gateway_stmt)
+        gateway = gateway_result.scalar_one_or_none()
+        gateway_name = gateway.name if gateway else "Unknown"
+
+        # Get model display name
+        model_display_name = None
+        if event.model:
+            model_display_name = ModelName(event.model)
+
+        # Extract token counts
+        prompt_tokens = None
+        completion_tokens = None
+        total_tokens = None
+        if event.token_counts and isinstance(event.token_counts, dict):
+            prompt_tokens = event.token_counts.get("prompt_tokens")
+            completion_tokens = event.token_counts.get("completion_tokens")
+            total_tokens = event.token_counts.get("total_tokens")
+            if total_tokens is None and prompt_tokens is not None and completion_tokens is not None:
+                total_tokens = prompt_tokens + completion_tokens
+
+        summaries.append(SessionSummaryRead(
+            gateway_id=event.gateway_id,
+            gateway_name=gateway_name,
+            session_key=event.session_key,
+            event_type=event.event_type,
+            model=event.model,
+            model_display_name=model_display_name,
+            agent_id=event.agent_id,
+            channel=event.channel,
+            context_percent=event.context_percent,
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+            total_tokens=total_tokens,
+            last_collected_at=event.collected_at,
+        ))
+
+    return summaries
+
+
+@router.get(
+    "/sub-agents-summary",
+    response_model=list[SubAgentSummaryRead],
+    summary="Sub-Agent Summary",
+    description="Sub-agent runs with cost, duration, and token breakdown.",
+)
+async def get_sub_agents_summary(
+    gateway_id: UUID | None = GATEWAY_ID_QUERY,
+    status: str | None = STATUS_QUERY,
+    agent: str | None = AGENT_QUERY,
+    session: AsyncSession = SESSION_DEP,
+    ctx: OrganizationContext = ORG_MEMBER_DEP,
+) -> list[SubAgentSummaryRead]:
+    """Get sub-agents summary for the organization.
+
+    Returns sub-agent runs with cost and token breakdown.
+
+    - **gateway_id**: Filter by specific gateway
+    - **status**: Filter by run status (pending, running, succeeded, failed)
+    - **agent**: Filter by agent name
+    """
+    statement = select(SubAgentRun).where(
+        SubAgentRun.organization_id == ctx.organization.id
+    )
+
+    if gateway_id is not None:
+        statement = statement.where(SubAgentRun.gateway_id == gateway_id)
+
+    if status is not None:
+        statement = statement.where(SubAgentRun.status == status)
+
+    if agent is not None:
+        statement = statement.where(SubAgentRun.agent == agent)
+
+    statement = statement.order_by(col(SubAgentRun.created_at).desc())
+    result = await session.execute(statement)
+    runs = result.scalars().all()
+
+    if not runs:
+        return []
+
+    # Build SubAgentSummaryRead objects
+    from app.services.monitoring.data_processing import ModelName
+    summaries = []
+    for run in runs:
+        # Get gateway name
+        gateway_stmt = select(Gateway).where(Gateway.id == run.gateway_id)
+        gateway_result = await session.execute(gateway_stmt)
+        gateway = gateway_result.scalar_one_or_none()
+        gateway_name = gateway.name if gateway else "Unknown"
+
+        # Get model display name
+        model_display_name = None
+        if run.model:
+            model_display_name = ModelName(run.model)
+
+        # Calculate duration in seconds
+        duration_seconds = None
+        if run.duration_ms is not None:
+            duration_seconds = round(run.duration_ms / 1000, 2)
+
+        # Extract token counts
+        prompt_tokens = None
+        completion_tokens = None
+        total_tokens = None
+        if run.token_counts and isinstance(run.token_counts, dict):
+            prompt_tokens = run.token_counts.get("prompt_tokens")
+            completion_tokens = run.token_counts.get("completion_tokens")
+            total_tokens = run.token_counts.get("total_tokens")
+            if total_tokens is None and prompt_tokens is not None and completion_tokens is not None:
+                total_tokens = prompt_tokens + completion_tokens
+
+        summaries.append(SubAgentSummaryRead(
+            gateway_id=run.gateway_id,
+            gateway_name=gateway_name,
+            agent_id=run.agent,
+            session_key=run.parent_session_key,
+            model=run.model,
+            model_display_name=model_display_name,
+            status=run.status,
+            cost=run.cost,
+            duration_seconds=duration_seconds,
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+            total_tokens=total_tokens,
+            last_collected_at=run.collected_at,
+        ))
+
+    return summaries
+
+
+@router.get(
+    "/trends",
+    response_model=list[TrendRead],
+    summary="Trends Summary",
+    description="Cost and token trends over time (7d or 30d).",
+)
+async def get_trends(
+    gateway_id: UUID | None = GATEWAY_ID_QUERY,
+    range_param: str = Query(default="7d", description="Time range: 7d or 30d"),
+    session: AsyncSession = SESSION_DEP,
+    ctx: OrganizationContext = ORG_MEMBER_DEP,
+) -> list[TrendRead]:
+    """Get cost and token trends for the organization.
+
+    Returns daily aggregated cost and token data for the specified time range.
+
+    - **gateway_id**: Filter by specific gateway
+    - **range**: Time range - "7d" (default) or "30d"
+    """
+    import re
+    from datetime import date, timedelta
+    from app.services.monitoring.data_processing import ModelName
+
+    # Parse range
+    range_days = 7
+    if range_param == "30d":
+        range_days = 30
+    elif re.match(r"^(\d+)d$", range_param):
+        range_days = max(1, int(range_param[:-1]))
+
+    # Get all cost snapshots
+    statement = select(CostSnapshot).where(
+        CostSnapshot.organization_id == ctx.organization.id
+    )
+
+    if gateway_id is not None:
+        statement = statement.where(CostSnapshot.gateway_id == gateway_id)
+
+    statement = statement.order_by(col(CostSnapshot.created_at).desc())
+    result = await session.execute(statement)
+    snapshots = result.scalars().all()
+
+    if not snapshots:
+        return []
+
+    # Group by gateway and get latest snapshot per gateway
+    from collections import defaultdict
+    by_gateway: dict[UUID, CostSnapshot] = {}
+    for snapshot in snapshots:
+        if snapshot.gateway_id not in by_gateway or snapshot.collected_at > by_gateway[snapshot.gateway_id].collected_at:
+            by_gateway[snapshot.gateway_id] = snapshot
+
+    # Build daily buckets
+    today = date.today()
+    chart_dates = []
+    for i in range(range_days - 1, -1, -1):
+        chart_dates.append((today - timedelta(days=i)).strftime("%Y-%m-%d"))
+
+    # Aggregate daily costs from snapshots
+    daily_costs: dict[str, dict[str, float]] = defaultdict(dict)
+    daily_tokens: dict[str, dict[str, int]] = defaultdict(dict)
+    # NOTE: per-model call counts are not tracked in CostSnapshot.
+
+    for gw_id, snapshot in by_gateway.items():
+        # Get gateway name
+        gateway_stmt = select(Gateway).where(Gateway.id == gw_id)
+        gateway_result = await session.execute(gateway_stmt)
+        gateway = gateway_result.scalar_one_or_none()
+        gateway_name = gateway.name if gateway else "Unknown"
+
+        # Only include if we have model costs
+        if snapshot.model_costs:
+            for model, cost in snapshot.model_costs.items():
+                # Use today's date for simplicity (would need date tracking in schema for real daily breakdown)
+                date_str = snapshot.collected_at.strftime("%Y-%m-%d")
+                if date_str not in daily_costs:
+                    daily_costs[date_str] = {}
+                if date_str not in daily_tokens:
+                    daily_tokens[date_str] = {}
+                daily_costs[date_str][model] = daily_costs[date_str].get(model, 0.0) + cost
+                if snapshot.token_counts and model in snapshot.token_counts:
+                    daily_tokens[date_str][model] = daily_tokens[date_str].get(model, 0) + snapshot.token_counts[model]
+
+    # Build trend data per gateway
+    trends = []
+    for gw_id, snapshot in by_gateway.items():
+        # Get gateway name
+        gateway_stmt = select(Gateway).where(Gateway.id == gw_id)
+        gateway_result = await session.execute(gateway_stmt)
+        gateway = gateway_result.scalar_one_or_none()
+        gateway_name = gateway.name if gateway else "Unknown"
+
+        # Build days list
+        days_data = []
+        for d in chart_dates:
+            day_costs = daily_costs.get(d, {})
+            day_tokens = daily_tokens.get(d, {})
+
+            total_cost = sum(day_costs.values())
+            total_tokens = sum(day_tokens.values())
+
+            # Build model_costs dict
+            model_costs = {}
+            for m, c in day_costs.items():
+                # Use ModelName for display
+                display_name = ModelName(m)
+                model_costs[display_name] = round(c, 2)
+
+            days_data.append(TrendDay(
+                date=d,
+                total_cost=round(total_cost, 2),
+                total_tokens=total_tokens,
+                model_costs=model_costs,
+            ))
+
+        trends.append(TrendRead(
+            gateway_id=gw_id,
+            gateway_name=gateway_name,
+            range=range_param,
+            days=days_data,
+            last_collected_at=snapshot.collected_at,
+        ))
+
+    return trends
diff --git a/src/backend/app/schemas/monitoring.py b/src/backend/app/schemas/monitoring.py
index 57af5e8..a8f9dc6 100644
--- a/src/backend/app/schemas/monitoring.py
+++ b/src/backend/app/schemas/monitoring.py
@@ -136,3 +136,92 @@ class CostBreakdownRead(SQLModel):
     total_cost: float
     breakdown: list[dict[str, float]]  # [{"model": "claude-opus", "cost": 1.23, "percent": 45.6}]
     last_collected_at: datetime
+
+
+class HealthSummaryRead(SQLModel):
+    """Health summary payload - gateway health status and metrics."""
+
+    gateway_id: UUID
+    gateway_name: str
+    status: str
+    pid: int | None = None
+    uptime_seconds: int | None = None
+    memory_mb: float | None = None
+    cpu_percent: float | None = None
+    ram_percent: float | None = None
+    swap_percent: float | None = None
+    disk_percent: float | None = None
+    compaction_mode: str | None = None
+    last_collected_at: datetime
+
+
+class CronSummaryRead(SQLModel):
+    """Cron summary payload - cron job status and scheduling."""
+
+    gateway_id: UUID
+    gateway_name: str
+    job_name: str
+    schedule: str
+    enabled: bool
+    status: str
+    last_run_at: datetime | None = None
+    next_run_at: datetime | None = None
+    last_error: str | None = None
+    failure_count: int
+    model: str | None = None
+    metadata_: dict | None = None
+
+
+class SessionSummaryRead(SQLModel):
+    """Session summary payload - session events with context and token info."""
+
+    gateway_id: UUID
+    gateway_name: str
+    session_key: str
+    event_type: str
+    model: str | None = None
+    model_display_name: str | None = None
+    agent_id: str | None = None
+    channel: str | None = None
+    context_percent: float | None = None
+    prompt_tokens: int | None = None
+    completion_tokens: int | None = None
+    total_tokens: int | None = None
+    last_collected_at: datetime
+
+
+class SubAgentSummaryRead(SQLModel):
+    """Sub-agent summary payload - sub-agent runs with cost and token breakdown."""
+
+    gateway_id: UUID
+    gateway_name: str
+    agent_id: str | None = None
+    session_key: str
+    model: str | None = None
+    model_display_name: str | None = None
+    status: str
+    cost: float | None = None
+    duration_seconds: float | None = None
+    prompt_tokens: int | None = None
+    completion_tokens: int | None = None
+    total_tokens: int | None = None
+    last_collected_at: datetime
+
+
+class TrendDay(SQLModel):
+    """Single day of trend data."""
+
+    date: str
+    total_cost: float
+    total_tokens: int
+    model_costs: dict[str, float]
+
+
+class TrendRead(SQLModel):
+    """Trend summary payload - cost and token trends over time."""
+
+    gateway_id: UUID
+    gateway_name: str
+    range: str  # "7d" or "30d"
+    days: list[TrendDay]
+    last_collected_at: datetime