diff --git a/src/backend/app/api/monitoring.py b/src/backend/app/api/monitoring.py new file mode 100644 index 0000000..489d7ba --- /dev/null +++ b/src/backend/app/api/monitoring.py @@ -0,0 +1,333 @@ +"""Monitoring API endpoints for Mission Control.""" + +from __future__ import annotations + +from datetime import datetime +from uuid import UUID + +from fastapi import APIRouter, Depends, HTTPException, Query +from sqlmodel import col, select +from sqlmodel.ext.asyncio.session import AsyncSession + +from app.api.deps import OrganizationContext, require_org_member +from app.core.logging import get_logger +from app.db import pagination +from app.db.session import get_session +from app.models.monitoring import ( + CostSnapshot, + CronJobStatus, + SessionEvent, + SubAgentRun, + SystemHealthMetric, +) +from app.schemas.monitoring import ( + CostSnapshotRead, + CronJobStatusRead, + SessionEventRead, + SubAgentRunRead, + SystemHealthMetricRead, +) +from app.schemas.pagination import DefaultLimitOffsetPage + +router = APIRouter(prefix="/monitoring", tags=["monitoring"]) +SESSION_DEP = Depends(get_session) +ORG_MEMBER_DEP = Depends(require_org_member) + +_logger = get_logger(__name__) + +# Query parameter constants +GATEWAY_ID_QUERY = Query(default=None, description="Filter by gateway ID") +JOB_NAME_QUERY = Query(default=None, description="Filter by job name") +ENABLED_QUERY = Query(default=None, description="Filter by enabled status") +SESSION_KEY_QUERY = Query(default=None, description="Filter by session key") +MODEL_QUERY = Query(default=None, description="Filter by model name") +EVENT_TYPE_QUERY = Query(default=None, description="Filter by event type") +STATUS_QUERY = Query(default=None, description="Filter by status") +AGENT_QUERY = Query(default=None, description="Filter by agent name") +START_DATE_QUERY = Query(default=None, description="Filter from this date (inclusive)") +END_DATE_QUERY = Query(default=None, description="Filter until this date (inclusive)") + + +@router.get( + "/cost-snapshots", + response_model=DefaultLimitOffsetPage[CostSnapshotRead], + summary="List Cost Snapshots", + description="List cost snapshots for gateways in the caller's organization.", +) +async def list_cost_snapshots( + gateway_id: UUID | None = GATEWAY_ID_QUERY, + start: datetime | None = START_DATE_QUERY, + end: datetime | None = END_DATE_QUERY, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> DefaultLimitOffsetPage[CostSnapshotRead]: + """List cost snapshots with optional filters. + + - **gateway_id**: Filter by specific gateway + - **start**: Filter snapshots from this date (inclusive) + - **end**: Filter snapshots until this date (inclusive) + """ + statement = select(CostSnapshot).where( + CostSnapshot.organization_id == ctx.organization.id + ) + + if gateway_id is not None: + statement = statement.where(CostSnapshot.gateway_id == gateway_id) + + if start is not None: + statement = statement.where(CostSnapshot.collected_at >= start) + + if end is not None: + statement = statement.where(CostSnapshot.collected_at <= end) + + statement = statement.order_by(col(CostSnapshot.created_at).desc()) + return await pagination.paginate(session, statement) + + +@router.get( + "/cost-snapshots/{id}", + response_model=CostSnapshotRead, + summary="Get Cost Snapshot", + description="Get a single cost snapshot by ID.", +) +async def get_cost_snapshot( + id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> CostSnapshot: + """Get a cost snapshot by ID.""" + snapshot = await CostSnapshot.objects.by_id(id).first(session) + if snapshot is None: + raise HTTPException(status_code=404, detail="Cost snapshot not found") + if snapshot.organization_id != ctx.organization.id: + raise HTTPException(status_code=403, detail="Access denied") + return snapshot + + +@router.get( + "/cron-jobs", + response_model=DefaultLimitOffsetPage[CronJobStatusRead], + summary="List Cron Job Statuses", + description="List cron job statuses for gateways in the caller's organization.", +) +async def list_cron_jobs( + gateway_id: UUID | None = GATEWAY_ID_QUERY, + enabled: bool | None = ENABLED_QUERY, + job_name: str | None = JOB_NAME_QUERY, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> DefaultLimitOffsetPage[CronJobStatusRead]: + """List cron job statuses with optional filters. + + - **gateway_id**: Filter by specific gateway + - **enabled**: Filter by enabled status + - **job_name**: Filter by job name (exact match) + """ + statement = select(CronJobStatus).where( + CronJobStatus.organization_id == ctx.organization.id + ) + + if gateway_id is not None: + statement = statement.where(CronJobStatus.gateway_id == gateway_id) + + if enabled is not None: + statement = statement.where(CronJobStatus.enabled == enabled) + + if job_name is not None: + statement = statement.where(CronJobStatus.job_name == job_name) + + statement = statement.order_by(col(CronJobStatus.created_at).desc()) + return await pagination.paginate(session, statement) + + +@router.get( + "/cron-jobs/{id}", + response_model=CronJobStatusRead, + summary="Get Cron Job Status", + description="Get a single cron job status by ID.", +) +async def get_cron_job( + id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> CronJobStatus: + """Get a cron job status by ID.""" + job = await CronJobStatus.objects.by_id(id).first(session) + if job is None: + raise HTTPException(status_code=404, detail="Cron job status not found") + if job.organization_id != ctx.organization.id: + raise HTTPException(status_code=403, detail="Access denied") + return job + + +@router.get( + "/sessions", + response_model=DefaultLimitOffsetPage[SessionEventRead], + summary="List Session Events", + description="List session events for gateways in the caller's organization.", +) +async def list_sessions( + gateway_id: UUID | None = GATEWAY_ID_QUERY, + session_key: str | None = SESSION_KEY_QUERY, + model: str | None = MODEL_QUERY, + event_type: str | None = EVENT_TYPE_QUERY, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> DefaultLimitOffsetPage[SessionEventRead]: + """List session events with optional filters. + + - **gateway_id**: Filter by specific gateway + - **session_key**: Filter by session key + - **model**: Filter by model name + - **event_type**: Filter by event type + """ + statement = select(SessionEvent).where( + SessionEvent.organization_id == ctx.organization.id + ) + + if gateway_id is not None: + statement = statement.where(SessionEvent.gateway_id == gateway_id) + + if session_key is not None: + statement = statement.where(SessionEvent.session_key == session_key) + + if model is not None: + statement = statement.where(SessionEvent.model == model) + + if event_type is not None: + statement = statement.where(SessionEvent.event_type == event_type) + + statement = statement.order_by(col(SessionEvent.created_at).desc()) + return await pagination.paginate(session, statement) + + +@router.get( + "/sessions/{id}", + response_model=SessionEventRead, + summary="Get Session Event", + description="Get a single session event by ID.", +) +async def get_session_event( + id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> SessionEvent: + """Get a session event by ID.""" + event = await SessionEvent.objects.by_id(id).first(session) + if event is None: + raise HTTPException(status_code=404, detail="Session event not found") + if event.organization_id != ctx.organization.id: + raise HTTPException(status_code=403, detail="Access denied") + return event + + +@router.get( + "/health", + response_model=DefaultLimitOffsetPage[SystemHealthMetricRead], + summary="List System Health Metrics", + description="List system health metrics for gateways in the caller's organization.", +) +async def list_health_metrics( + gateway_id: UUID | None = GATEWAY_ID_QUERY, + start: datetime | None = START_DATE_QUERY, + end: datetime | None = END_DATE_QUERY, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> DefaultLimitOffsetPage[SystemHealthMetricRead]: + """List system health metrics with optional filters. + + - **gateway_id**: Filter by specific gateway + - **start**: Filter metrics from this date (inclusive) + - **end**: Filter metrics until this date (inclusive) + """ + statement = select(SystemHealthMetric).where( + SystemHealthMetric.organization_id == ctx.organization.id + ) + + if gateway_id is not None: + statement = statement.where(SystemHealthMetric.gateway_id == gateway_id) + + if start is not None: + statement = statement.where(SystemHealthMetric.collected_at >= start) + + if end is not None: + statement = statement.where(SystemHealthMetric.collected_at <= end) + + statement = statement.order_by(col(SystemHealthMetric.created_at).desc()) + return await pagination.paginate(session, statement) + + +@router.get( + "/health/{id}", + response_model=SystemHealthMetricRead, + summary="Get System Health Metric", + description="Get a single system health metric by ID.", +) +async def get_health_metric( + id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> SystemHealthMetric: + """Get a system health metric by ID.""" + metric = await SystemHealthMetric.objects.by_id(id).first(session) + if metric is None: + raise HTTPException(status_code=404, detail="Health metric not found") + if metric.organization_id != ctx.organization.id: + raise HTTPException(status_code=403, detail="Access denied") + return metric + + +@router.get( + "/sub-agents", + response_model=DefaultLimitOffsetPage[SubAgentRunRead], + summary="List Sub-Agent Runs", + description="List sub-agent runs for gateways in the caller's organization.", +) +async def list_sub_agents( + gateway_id: UUID | None = GATEWAY_ID_QUERY, + status: str | None = STATUS_QUERY, + agent: str | None = AGENT_QUERY, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> DefaultLimitOffsetPage[SubAgentRunRead]: + """List sub-agent runs with optional filters. + + - **gateway_id**: Filter by specific gateway + - **status**: Filter by run status (pending, running, succeeded, failed) + - **agent**: Filter by agent name + """ + statement = select(SubAgentRun).where( + SubAgentRun.organization_id == ctx.organization.id + ) + + if gateway_id is not None: + statement = statement.where(SubAgentRun.gateway_id == gateway_id) + + if status is not None: + statement = statement.where(SubAgentRun.status == status) + + if agent is not None: + statement = statement.where(SubAgentRun.agent == agent) + + statement = statement.order_by(col(SubAgentRun.created_at).desc()) + return await pagination.paginate(session, statement) + + +@router.get( + "/sub-agents/{id}", + response_model=SubAgentRunRead, + summary="Get Sub-Agent Run", + description="Get a single sub-agent run by ID.", +) +async def get_sub_agent_run( + id: UUID, + session: AsyncSession = SESSION_DEP, + ctx: OrganizationContext = ORG_MEMBER_DEP, +) -> SubAgentRun: + """Get a sub-agent run by ID.""" + run = await SubAgentRun.objects.by_id(id).first(session) + if run is None: + raise HTTPException(status_code=404, detail="Sub-agent run not found") + if run.organization_id != ctx.organization.id: + raise HTTPException(status_code=403, detail="Access denied") + return run diff --git a/src/backend/app/main.py b/src/backend/app/main.py index 0a3bd33..ddb3f0e 100644 --- a/src/backend/app/main.py +++ b/src/backend/app/main.py @@ -25,6 +25,7 @@ from app.api.boards import router as boards_router from app.api.gateway import router as gateway_router from app.api.gateways import router as gateways_router from app.api.metrics import router as metrics_router +from app.api.monitoring import router as monitoring_router from app.api.organizations import router as organizations_router from app.api.skills_marketplace import router as skills_marketplace_router from app.api.souls_directory import router as souls_directory_router @@ -582,6 +583,7 @@ api_v1.include_router(activity_router) api_v1.include_router(gateway_router) api_v1.include_router(gateways_router) api_v1.include_router(metrics_router) +api_v1.include_router(monitoring_router) api_v1.include_router(organizations_router) api_v1.include_router(souls_directory_router) api_v1.include_router(skills_marketplace_router) diff --git a/src/backend/app/schemas/__init__.py b/src/backend/app/schemas/__init__.py index 163e258..99b9129 100644 --- a/src/backend/app/schemas/__init__.py +++ b/src/backend/app/schemas/__init__.py @@ -21,6 +21,13 @@ from app.schemas.board_webhooks import ( from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate from app.schemas.gateways import GatewayCreate, GatewayRead, GatewayUpdate from app.schemas.metrics import DashboardMetrics +from app.schemas.monitoring import ( + CostSnapshotRead, + CronJobStatusRead, + SessionEventRead, + SubAgentRunRead, + SystemHealthMetricRead, +) from app.schemas.organizations import ( OrganizationActiveUpdate, OrganizationCreate, @@ -79,6 +86,11 @@ __all__ = [ "GatewayRead", "GatewayUpdate", "DashboardMetrics", + "CostSnapshotRead", + "CronJobStatusRead", + "SessionEventRead", + "SubAgentRunRead", + "SystemHealthMetricRead", "OrganizationActiveUpdate", "OrganizationCreate", "OrganizationInviteAccept", diff --git a/src/backend/app/schemas/monitoring.py b/src/backend/app/schemas/monitoring.py new file mode 100644 index 0000000..405ca65 --- /dev/null +++ b/src/backend/app/schemas/monitoring.py @@ -0,0 +1,113 @@ +"""Monitoring schemas for Mission Control API.""" + +from __future__ import annotations + +from datetime import datetime +from uuid import UUID + +from sqlmodel import SQLModel + +from app.schemas.pagination import DefaultLimitOffsetPage + + +class CostSnapshotRead(SQLModel): + """Cost snapshot read payload.""" + + id: UUID + organization_id: UUID + gateway_id: UUID + period_start: datetime + period_end: datetime + total_cost: float + model_costs: dict | None = None + provider_costs: dict | None = None + token_counts: dict | None = None + collected_at: datetime + created_at: datetime + updated_at: datetime + + +class CronJobStatusRead(SQLModel): + """Cron job status read payload.""" + + id: UUID + organization_id: UUID + gateway_id: UUID + job_name: str + schedule: str + enabled: bool + last_run_at: datetime | None = None + next_run_at: datetime | None = None + status: str + failure_count: int + last_error: str | None = None + metadata_: dict | None = None + created_at: datetime + updated_at: datetime + + +class SessionEventRead(SQLModel): + """Session event read payload.""" + + id: UUID + organization_id: UUID + gateway_id: UUID + session_key: str + event_type: str + model: str | None = None + agent_id: str | None = None + channel: str | None = None + context_percent: float | None = None + token_counts: dict | None = None + cost: float | None = None + metadata_: dict | None = None + created_at: datetime + updated_at: datetime + + +class SubAgentRunRead(SQLModel): + """Sub-agent run read payload.""" + + id: UUID + organization_id: UUID + gateway_id: UUID + parent_session_key: str + session_event_id: UUID | None = None + agent: str | None = None + model: str | None = None + status: str + duration_ms: int | None = None + cost: float | None = None + token_counts: dict | None = None + metadata_: dict | None = None + created_at: datetime + updated_at: datetime + + +class SystemHealthMetricRead(SQLModel): + """System health metric read payload.""" + + id: UUID + organization_id: UUID + gateway_id: UUID + cpu_percent: float | None = None + cpu_cores: int | None = None + ram_used_bytes: int | None = None + ram_total_bytes: int | None = None + ram_percent: float | None = None + swap_used_bytes: int | None = None + swap_total_bytes: int | None = None + swap_percent: float | None = None + disk_path: str + disk_used_bytes: int | None = None + disk_total_bytes: int | None = None + disk_percent: float | None = None + gateway_live: bool | None = None + gateway_ready: bool | None = None + gateway_uptime_ms: int | None = None + gateway_pid: int | None = None + gateway_version: str | None = None + metadata_: dict | None = None + collected_at: datetime + created_at: datetime + updated_at: datetime