feat: add monitoring API endpoints (10 read-only endpoints)

- GET /monitoring/cost-snapshots + /{id} — list/detail with gateway_id, date range filters
- GET /monitoring/cron-jobs + /{id} — list/detail with gateway_id, enabled, job_name filters
- GET /monitoring/sessions + /{id} — list/detail with gateway_id, session_key, model, event_type filters
- GET /monitoring/health + /{id} — list/detail with gateway_id, date range filters
- GET /monitoring/sub-agents + /{id} — list/detail with gateway_id, status, agent filters
- All endpoints org-scoped (require_org_member), paginated (DefaultLimitOffsetPage)
- Pydantic Read schemas for all 5 monitoring models
- Router registered in main.py
- Removed unused imports (OkResponse, utcnow)
This commit is contained in:
Ripley 2026-05-10 20:44:44 -05:00
parent 85e805c388
commit 638bcd2d91
4 changed files with 460 additions and 0 deletions

View File

@ -0,0 +1,333 @@
"""Monitoring API endpoints for Mission Control."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlmodel import col, select
from sqlmodel.ext.asyncio.session import AsyncSession
from app.api.deps import OrganizationContext, require_org_member
from app.core.logging import get_logger
from app.db import pagination
from app.db.session import get_session
from app.models.monitoring import (
CostSnapshot,
CronJobStatus,
SessionEvent,
SubAgentRun,
SystemHealthMetric,
)
from app.schemas.monitoring import (
CostSnapshotRead,
CronJobStatusRead,
SessionEventRead,
SubAgentRunRead,
SystemHealthMetricRead,
)
from app.schemas.pagination import DefaultLimitOffsetPage
router = APIRouter(prefix="/monitoring", tags=["monitoring"])
SESSION_DEP = Depends(get_session)
ORG_MEMBER_DEP = Depends(require_org_member)
_logger = get_logger(__name__)
# Query parameter constants
GATEWAY_ID_QUERY = Query(default=None, description="Filter by gateway ID")
JOB_NAME_QUERY = Query(default=None, description="Filter by job name")
ENABLED_QUERY = Query(default=None, description="Filter by enabled status")
SESSION_KEY_QUERY = Query(default=None, description="Filter by session key")
MODEL_QUERY = Query(default=None, description="Filter by model name")
EVENT_TYPE_QUERY = Query(default=None, description="Filter by event type")
STATUS_QUERY = Query(default=None, description="Filter by status")
AGENT_QUERY = Query(default=None, description="Filter by agent name")
START_DATE_QUERY = Query(default=None, description="Filter from this date (inclusive)")
END_DATE_QUERY = Query(default=None, description="Filter until this date (inclusive)")
@router.get(
"/cost-snapshots",
response_model=DefaultLimitOffsetPage[CostSnapshotRead],
summary="List Cost Snapshots",
description="List cost snapshots for gateways in the caller's organization.",
)
async def list_cost_snapshots(
gateway_id: UUID | None = GATEWAY_ID_QUERY,
start: datetime | None = START_DATE_QUERY,
end: datetime | None = END_DATE_QUERY,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[CostSnapshotRead]:
"""List cost snapshots with optional filters.
- **gateway_id**: Filter by specific gateway
- **start**: Filter snapshots from this date (inclusive)
- **end**: Filter snapshots until this date (inclusive)
"""
statement = select(CostSnapshot).where(
CostSnapshot.organization_id == ctx.organization.id
)
if gateway_id is not None:
statement = statement.where(CostSnapshot.gateway_id == gateway_id)
if start is not None:
statement = statement.where(CostSnapshot.collected_at >= start)
if end is not None:
statement = statement.where(CostSnapshot.collected_at <= end)
statement = statement.order_by(col(CostSnapshot.created_at).desc())
return await pagination.paginate(session, statement)
@router.get(
"/cost-snapshots/{id}",
response_model=CostSnapshotRead,
summary="Get Cost Snapshot",
description="Get a single cost snapshot by ID.",
)
async def get_cost_snapshot(
id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> CostSnapshot:
"""Get a cost snapshot by ID."""
snapshot = await CostSnapshot.objects.by_id(id).first(session)
if snapshot is None:
raise HTTPException(status_code=404, detail="Cost snapshot not found")
if snapshot.organization_id != ctx.organization.id:
raise HTTPException(status_code=403, detail="Access denied")
return snapshot
@router.get(
"/cron-jobs",
response_model=DefaultLimitOffsetPage[CronJobStatusRead],
summary="List Cron Job Statuses",
description="List cron job statuses for gateways in the caller's organization.",
)
async def list_cron_jobs(
gateway_id: UUID | None = GATEWAY_ID_QUERY,
enabled: bool | None = ENABLED_QUERY,
job_name: str | None = JOB_NAME_QUERY,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[CronJobStatusRead]:
"""List cron job statuses with optional filters.
- **gateway_id**: Filter by specific gateway
- **enabled**: Filter by enabled status
- **job_name**: Filter by job name (exact match)
"""
statement = select(CronJobStatus).where(
CronJobStatus.organization_id == ctx.organization.id
)
if gateway_id is not None:
statement = statement.where(CronJobStatus.gateway_id == gateway_id)
if enabled is not None:
statement = statement.where(CronJobStatus.enabled == enabled)
if job_name is not None:
statement = statement.where(CronJobStatus.job_name == job_name)
statement = statement.order_by(col(CronJobStatus.created_at).desc())
return await pagination.paginate(session, statement)
@router.get(
"/cron-jobs/{id}",
response_model=CronJobStatusRead,
summary="Get Cron Job Status",
description="Get a single cron job status by ID.",
)
async def get_cron_job(
id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> CronJobStatus:
"""Get a cron job status by ID."""
job = await CronJobStatus.objects.by_id(id).first(session)
if job is None:
raise HTTPException(status_code=404, detail="Cron job status not found")
if job.organization_id != ctx.organization.id:
raise HTTPException(status_code=403, detail="Access denied")
return job
@router.get(
"/sessions",
response_model=DefaultLimitOffsetPage[SessionEventRead],
summary="List Session Events",
description="List session events for gateways in the caller's organization.",
)
async def list_sessions(
gateway_id: UUID | None = GATEWAY_ID_QUERY,
session_key: str | None = SESSION_KEY_QUERY,
model: str | None = MODEL_QUERY,
event_type: str | None = EVENT_TYPE_QUERY,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[SessionEventRead]:
"""List session events with optional filters.
- **gateway_id**: Filter by specific gateway
- **session_key**: Filter by session key
- **model**: Filter by model name
- **event_type**: Filter by event type
"""
statement = select(SessionEvent).where(
SessionEvent.organization_id == ctx.organization.id
)
if gateway_id is not None:
statement = statement.where(SessionEvent.gateway_id == gateway_id)
if session_key is not None:
statement = statement.where(SessionEvent.session_key == session_key)
if model is not None:
statement = statement.where(SessionEvent.model == model)
if event_type is not None:
statement = statement.where(SessionEvent.event_type == event_type)
statement = statement.order_by(col(SessionEvent.created_at).desc())
return await pagination.paginate(session, statement)
@router.get(
"/sessions/{id}",
response_model=SessionEventRead,
summary="Get Session Event",
description="Get a single session event by ID.",
)
async def get_session_event(
id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SessionEvent:
"""Get a session event by ID."""
event = await SessionEvent.objects.by_id(id).first(session)
if event is None:
raise HTTPException(status_code=404, detail="Session event not found")
if event.organization_id != ctx.organization.id:
raise HTTPException(status_code=403, detail="Access denied")
return event
@router.get(
"/health",
response_model=DefaultLimitOffsetPage[SystemHealthMetricRead],
summary="List System Health Metrics",
description="List system health metrics for gateways in the caller's organization.",
)
async def list_health_metrics(
gateway_id: UUID | None = GATEWAY_ID_QUERY,
start: datetime | None = START_DATE_QUERY,
end: datetime | None = END_DATE_QUERY,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[SystemHealthMetricRead]:
"""List system health metrics with optional filters.
- **gateway_id**: Filter by specific gateway
- **start**: Filter metrics from this date (inclusive)
- **end**: Filter metrics until this date (inclusive)
"""
statement = select(SystemHealthMetric).where(
SystemHealthMetric.organization_id == ctx.organization.id
)
if gateway_id is not None:
statement = statement.where(SystemHealthMetric.gateway_id == gateway_id)
if start is not None:
statement = statement.where(SystemHealthMetric.collected_at >= start)
if end is not None:
statement = statement.where(SystemHealthMetric.collected_at <= end)
statement = statement.order_by(col(SystemHealthMetric.created_at).desc())
return await pagination.paginate(session, statement)
@router.get(
"/health/{id}",
response_model=SystemHealthMetricRead,
summary="Get System Health Metric",
description="Get a single system health metric by ID.",
)
async def get_health_metric(
id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SystemHealthMetric:
"""Get a system health metric by ID."""
metric = await SystemHealthMetric.objects.by_id(id).first(session)
if metric is None:
raise HTTPException(status_code=404, detail="Health metric not found")
if metric.organization_id != ctx.organization.id:
raise HTTPException(status_code=403, detail="Access denied")
return metric
@router.get(
"/sub-agents",
response_model=DefaultLimitOffsetPage[SubAgentRunRead],
summary="List Sub-Agent Runs",
description="List sub-agent runs for gateways in the caller's organization.",
)
async def list_sub_agents(
gateway_id: UUID | None = GATEWAY_ID_QUERY,
status: str | None = STATUS_QUERY,
agent: str | None = AGENT_QUERY,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[SubAgentRunRead]:
"""List sub-agent runs with optional filters.
- **gateway_id**: Filter by specific gateway
- **status**: Filter by run status (pending, running, succeeded, failed)
- **agent**: Filter by agent name
"""
statement = select(SubAgentRun).where(
SubAgentRun.organization_id == ctx.organization.id
)
if gateway_id is not None:
statement = statement.where(SubAgentRun.gateway_id == gateway_id)
if status is not None:
statement = statement.where(SubAgentRun.status == status)
if agent is not None:
statement = statement.where(SubAgentRun.agent == agent)
statement = statement.order_by(col(SubAgentRun.created_at).desc())
return await pagination.paginate(session, statement)
@router.get(
"/sub-agents/{id}",
response_model=SubAgentRunRead,
summary="Get Sub-Agent Run",
description="Get a single sub-agent run by ID.",
)
async def get_sub_agent_run(
id: UUID,
session: AsyncSession = SESSION_DEP,
ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SubAgentRun:
"""Get a sub-agent run by ID."""
run = await SubAgentRun.objects.by_id(id).first(session)
if run is None:
raise HTTPException(status_code=404, detail="Sub-agent run not found")
if run.organization_id != ctx.organization.id:
raise HTTPException(status_code=403, detail="Access denied")
return run

View File

@ -25,6 +25,7 @@ from app.api.boards import router as boards_router
from app.api.gateway import router as gateway_router
from app.api.gateways import router as gateways_router
from app.api.metrics import router as metrics_router
from app.api.monitoring import router as monitoring_router
from app.api.organizations import router as organizations_router
from app.api.skills_marketplace import router as skills_marketplace_router
from app.api.souls_directory import router as souls_directory_router
@ -582,6 +583,7 @@ api_v1.include_router(activity_router)
api_v1.include_router(gateway_router)
api_v1.include_router(gateways_router)
api_v1.include_router(metrics_router)
api_v1.include_router(monitoring_router)
api_v1.include_router(organizations_router)
api_v1.include_router(souls_directory_router)
api_v1.include_router(skills_marketplace_router)

View File

@ -21,6 +21,13 @@ from app.schemas.board_webhooks import (
from app.schemas.boards import BoardCreate, BoardRead, BoardUpdate
from app.schemas.gateways import GatewayCreate, GatewayRead, GatewayUpdate
from app.schemas.metrics import DashboardMetrics
from app.schemas.monitoring import (
CostSnapshotRead,
CronJobStatusRead,
SessionEventRead,
SubAgentRunRead,
SystemHealthMetricRead,
)
from app.schemas.organizations import (
OrganizationActiveUpdate,
OrganizationCreate,
@ -79,6 +86,11 @@ __all__ = [
"GatewayRead",
"GatewayUpdate",
"DashboardMetrics",
"CostSnapshotRead",
"CronJobStatusRead",
"SessionEventRead",
"SubAgentRunRead",
"SystemHealthMetricRead",
"OrganizationActiveUpdate",
"OrganizationCreate",
"OrganizationInviteAccept",

View File

@ -0,0 +1,113 @@
"""Monitoring schemas for Mission Control API."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID
from sqlmodel import SQLModel
from app.schemas.pagination import DefaultLimitOffsetPage
class CostSnapshotRead(SQLModel):
"""Cost snapshot read payload."""
id: UUID
organization_id: UUID
gateway_id: UUID
period_start: datetime
period_end: datetime
total_cost: float
model_costs: dict | None = None
provider_costs: dict | None = None
token_counts: dict | None = None
collected_at: datetime
created_at: datetime
updated_at: datetime
class CronJobStatusRead(SQLModel):
"""Cron job status read payload."""
id: UUID
organization_id: UUID
gateway_id: UUID
job_name: str
schedule: str
enabled: bool
last_run_at: datetime | None = None
next_run_at: datetime | None = None
status: str
failure_count: int
last_error: str | None = None
metadata_: dict | None = None
created_at: datetime
updated_at: datetime
class SessionEventRead(SQLModel):
"""Session event read payload."""
id: UUID
organization_id: UUID
gateway_id: UUID
session_key: str
event_type: str
model: str | None = None
agent_id: str | None = None
channel: str | None = None
context_percent: float | None = None
token_counts: dict | None = None
cost: float | None = None
metadata_: dict | None = None
created_at: datetime
updated_at: datetime
class SubAgentRunRead(SQLModel):
"""Sub-agent run read payload."""
id: UUID
organization_id: UUID
gateway_id: UUID
parent_session_key: str
session_event_id: UUID | None = None
agent: str | None = None
model: str | None = None
status: str
duration_ms: int | None = None
cost: float | None = None
token_counts: dict | None = None
metadata_: dict | None = None
created_at: datetime
updated_at: datetime
class SystemHealthMetricRead(SQLModel):
"""System health metric read payload."""
id: UUID
organization_id: UUID
gateway_id: UUID
cpu_percent: float | None = None
cpu_cores: int | None = None
ram_used_bytes: int | None = None
ram_total_bytes: int | None = None
ram_percent: float | None = None
swap_used_bytes: int | None = None
swap_total_bytes: int | None = None
swap_percent: float | None = None
disk_path: str
disk_used_bytes: int | None = None
disk_total_bytes: int | None = None
disk_percent: float | None = None
gateway_live: bool | None = None
gateway_ready: bool | None = None
gateway_uptime_ms: int | None = None
gateway_pid: int | None = None
gateway_version: str | None = None
metadata_: dict | None = None
collected_at: datetime
created_at: datetime
updated_at: datetime