334 lines
11 KiB
Python
334 lines
11 KiB
Python
|
|
"""Monitoring API endpoints for Mission Control."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from datetime import datetime
|
||
|
|
from uuid import UUID
|
||
|
|
|
||
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
||
|
|
from sqlmodel import col, select
|
||
|
|
from sqlmodel.ext.asyncio.session import AsyncSession
|
||
|
|
|
||
|
|
from app.api.deps import OrganizationContext, require_org_member
|
||
|
|
from app.core.logging import get_logger
|
||
|
|
from app.db import pagination
|
||
|
|
from app.db.session import get_session
|
||
|
|
from app.models.monitoring import (
|
||
|
|
CostSnapshot,
|
||
|
|
CronJobStatus,
|
||
|
|
SessionEvent,
|
||
|
|
SubAgentRun,
|
||
|
|
SystemHealthMetric,
|
||
|
|
)
|
||
|
|
from app.schemas.monitoring import (
|
||
|
|
CostSnapshotRead,
|
||
|
|
CronJobStatusRead,
|
||
|
|
SessionEventRead,
|
||
|
|
SubAgentRunRead,
|
||
|
|
SystemHealthMetricRead,
|
||
|
|
)
|
||
|
|
from app.schemas.pagination import DefaultLimitOffsetPage
|
||
|
|
|
||
|
|
# Module-level logger, named after this module.
_logger = get_logger(__name__)

# Every endpoint in this module is mounted under /monitoring.
router = APIRouter(
    prefix="/monitoring",
    tags=["monitoring"],
)

# Dependency markers declared once and reused as parameter defaults by the
# endpoints in this module.
ORG_MEMBER_DEP = Depends(require_org_member)
SESSION_DEP = Depends(get_session)
|
||
|
|
|
||
|
|
# Query parameter constants
#
# Each constant is an optional query parameter (default ``None``) that the
# list endpoints in this module use as a parameter default.

# -- Identity / attribute filters --
GATEWAY_ID_QUERY = Query(
    default=None,
    description="Filter by gateway ID",
)
JOB_NAME_QUERY = Query(
    default=None,
    description="Filter by job name",
)
ENABLED_QUERY = Query(
    default=None,
    description="Filter by enabled status",
)
SESSION_KEY_QUERY = Query(
    default=None,
    description="Filter by session key",
)
MODEL_QUERY = Query(
    default=None,
    description="Filter by model name",
)
EVENT_TYPE_QUERY = Query(
    default=None,
    description="Filter by event type",
)
STATUS_QUERY = Query(
    default=None,
    description="Filter by status",
)
AGENT_QUERY = Query(
    default=None,
    description="Filter by agent name",
)

# -- Time-window filters --
START_DATE_QUERY = Query(
    default=None,
    description="Filter from this date (inclusive)",
)
END_DATE_QUERY = Query(
    default=None,
    description="Filter until this date (inclusive)",
)
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/cost-snapshots",
    response_model=DefaultLimitOffsetPage[CostSnapshotRead],
    summary="List Cost Snapshots",
    description="List cost snapshots for gateways in the caller's organization.",
)
async def list_cost_snapshots(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    start: datetime | None = START_DATE_QUERY,
    end: datetime | None = END_DATE_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[CostSnapshotRead]:
    """Return a page of the caller organization's cost snapshots.

    Rows are always restricted to ``ctx.organization.id``. Each optional
    argument that is not ``None`` narrows the result further:

    - **gateway_id**: keep only snapshots with this gateway ID
    - **start**: keep only snapshots with ``collected_at >= start``
    - **end**: keep only snapshots with ``collected_at <= end``

    Results are ordered by ``created_at`` descending. Note that the date
    window is applied to ``collected_at`` while ordering uses ``created_at``.
    """
    # The organization scope is unconditional; everything else is opt-in.
    criteria = [CostSnapshot.organization_id == ctx.organization.id]
    if gateway_id is not None:
        criteria.append(CostSnapshot.gateway_id == gateway_id)
    if start is not None:
        criteria.append(CostSnapshot.collected_at >= start)
    if end is not None:
        criteria.append(CostSnapshot.collected_at <= end)

    # Multiple criteria passed to ``where`` are AND-ed together.
    ordering = col(CostSnapshot.created_at).desc()
    query = select(CostSnapshot).where(*criteria).order_by(ordering)
    return await pagination.paginate(session, query)
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/cost-snapshots/{id}",
    response_model=CostSnapshotRead,
    summary="Get Cost Snapshot",
    description="Get a single cost snapshot by ID.",
)
async def get_cost_snapshot(
    id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> CostSnapshot:
    """Fetch one cost snapshot by its ID.

    - **id**: ID of the cost snapshot to load

    Raises ``HTTPException`` with status 404 when no snapshot has that ID,
    and with status 403 when the snapshot's ``organization_id`` differs from
    the caller's organization.
    """
    lookup = CostSnapshot.objects.by_id(id)
    record = await lookup.first(session)

    # Existence is checked before ownership, so an unknown ID yields 404.
    if record is None:
        raise HTTPException(status_code=404, detail="Cost snapshot not found")

    caller_org_id = ctx.organization.id
    if record.organization_id != caller_org_id:
        raise HTTPException(status_code=403, detail="Access denied")

    return record
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/cron-jobs",
    response_model=DefaultLimitOffsetPage[CronJobStatusRead],
    summary="List Cron Job Statuses",
    description="List cron job statuses for gateways in the caller's organization.",
)
async def list_cron_jobs(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    enabled: bool | None = ENABLED_QUERY,
    job_name: str | None = JOB_NAME_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[CronJobStatusRead]:
    """Return a page of the caller organization's cron job statuses.

    Rows are always restricted to ``ctx.organization.id``. Each optional
    argument that is not ``None`` narrows the result further:

    - **gateway_id**: keep only statuses with this gateway ID
    - **enabled**: keep only statuses whose ``enabled`` flag equals this value
    - **job_name**: keep only statuses whose ``job_name`` equals this value
      (exact match)

    Results are ordered by ``created_at`` descending.
    """
    # The organization scope is unconditional; everything else is opt-in.
    criteria = [CronJobStatus.organization_id == ctx.organization.id]
    if gateway_id is not None:
        criteria.append(CronJobStatus.gateway_id == gateway_id)
    if enabled is not None:
        # ``False`` is a meaningful filter value, hence the explicit None test.
        criteria.append(CronJobStatus.enabled == enabled)
    if job_name is not None:
        criteria.append(CronJobStatus.job_name == job_name)

    # Multiple criteria passed to ``where`` are AND-ed together.
    ordering = col(CronJobStatus.created_at).desc()
    query = select(CronJobStatus).where(*criteria).order_by(ordering)
    return await pagination.paginate(session, query)
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/cron-jobs/{id}",
    response_model=CronJobStatusRead,
    summary="Get Cron Job Status",
    description="Get a single cron job status by ID.",
)
async def get_cron_job(
    id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> CronJobStatus:
    """Fetch one cron job status by its ID.

    - **id**: ID of the cron job status to load

    Raises ``HTTPException`` with status 404 when no status has that ID, and
    with status 403 when the status's ``organization_id`` differs from the
    caller's organization.
    """
    lookup = CronJobStatus.objects.by_id(id)
    record = await lookup.first(session)

    # Existence is checked before ownership, so an unknown ID yields 404.
    if record is None:
        raise HTTPException(status_code=404, detail="Cron job status not found")

    caller_org_id = ctx.organization.id
    if record.organization_id != caller_org_id:
        raise HTTPException(status_code=403, detail="Access denied")

    return record
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/sessions",
    response_model=DefaultLimitOffsetPage[SessionEventRead],
    summary="List Session Events",
    description="List session events for gateways in the caller's organization.",
)
async def list_sessions(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    session_key: str | None = SESSION_KEY_QUERY,
    model: str | None = MODEL_QUERY,
    event_type: str | None = EVENT_TYPE_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[SessionEventRead]:
    """Return a page of the caller organization's session events.

    Rows are always restricted to ``ctx.organization.id``. Each optional
    argument that is not ``None`` narrows the result further by equality:

    - **gateway_id**: keep only events with this gateway ID
    - **session_key**: keep only events with this session key
    - **model**: keep only events with this model name
    - **event_type**: keep only events of this type

    Results are ordered by ``created_at`` descending.
    """
    # The organization scope is unconditional; everything else is opt-in.
    criteria = [SessionEvent.organization_id == ctx.organization.id]
    if gateway_id is not None:
        criteria.append(SessionEvent.gateway_id == gateway_id)
    if session_key is not None:
        criteria.append(SessionEvent.session_key == session_key)
    if model is not None:
        criteria.append(SessionEvent.model == model)
    if event_type is not None:
        criteria.append(SessionEvent.event_type == event_type)

    # Multiple criteria passed to ``where`` are AND-ed together.
    ordering = col(SessionEvent.created_at).desc()
    query = select(SessionEvent).where(*criteria).order_by(ordering)
    return await pagination.paginate(session, query)
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/sessions/{id}",
    response_model=SessionEventRead,
    summary="Get Session Event",
    description="Get a single session event by ID.",
)
async def get_session_event(
    id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SessionEvent:
    """Fetch one session event by its ID.

    - **id**: ID of the session event to load

    Raises ``HTTPException`` with status 404 when no event has that ID, and
    with status 403 when the event's ``organization_id`` differs from the
    caller's organization.
    """
    lookup = SessionEvent.objects.by_id(id)
    record = await lookup.first(session)

    # Existence is checked before ownership, so an unknown ID yields 404.
    if record is None:
        raise HTTPException(status_code=404, detail="Session event not found")

    caller_org_id = ctx.organization.id
    if record.organization_id != caller_org_id:
        raise HTTPException(status_code=403, detail="Access denied")

    return record
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/health",
    response_model=DefaultLimitOffsetPage[SystemHealthMetricRead],
    summary="List System Health Metrics",
    description="List system health metrics for gateways in the caller's organization.",
)
async def list_health_metrics(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    start: datetime | None = START_DATE_QUERY,
    end: datetime | None = END_DATE_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[SystemHealthMetricRead]:
    """Return a page of the caller organization's system health metrics.

    Rows are always restricted to ``ctx.organization.id``. Each optional
    argument that is not ``None`` narrows the result further:

    - **gateway_id**: keep only metrics with this gateway ID
    - **start**: keep only metrics with ``collected_at >= start``
    - **end**: keep only metrics with ``collected_at <= end``

    Results are ordered by ``created_at`` descending. Note that the date
    window is applied to ``collected_at`` while ordering uses ``created_at``.
    """
    # The organization scope is unconditional; everything else is opt-in.
    criteria = [SystemHealthMetric.organization_id == ctx.organization.id]
    if gateway_id is not None:
        criteria.append(SystemHealthMetric.gateway_id == gateway_id)
    if start is not None:
        criteria.append(SystemHealthMetric.collected_at >= start)
    if end is not None:
        criteria.append(SystemHealthMetric.collected_at <= end)

    # Multiple criteria passed to ``where`` are AND-ed together.
    ordering = col(SystemHealthMetric.created_at).desc()
    query = select(SystemHealthMetric).where(*criteria).order_by(ordering)
    return await pagination.paginate(session, query)
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/health/{id}",
    response_model=SystemHealthMetricRead,
    summary="Get System Health Metric",
    description="Get a single system health metric by ID.",
)
async def get_health_metric(
    id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SystemHealthMetric:
    """Fetch one system health metric by its ID.

    - **id**: ID of the health metric to load

    Raises ``HTTPException`` with status 404 when no metric has that ID, and
    with status 403 when the metric's ``organization_id`` differs from the
    caller's organization.
    """
    lookup = SystemHealthMetric.objects.by_id(id)
    record = await lookup.first(session)

    # Existence is checked before ownership, so an unknown ID yields 404.
    if record is None:
        raise HTTPException(status_code=404, detail="Health metric not found")

    caller_org_id = ctx.organization.id
    if record.organization_id != caller_org_id:
        raise HTTPException(status_code=403, detail="Access denied")

    return record
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/sub-agents",
    response_model=DefaultLimitOffsetPage[SubAgentRunRead],
    summary="List Sub-Agent Runs",
    description="List sub-agent runs for gateways in the caller's organization.",
)
async def list_sub_agents(
    gateway_id: UUID | None = GATEWAY_ID_QUERY,
    status: str | None = STATUS_QUERY,
    agent: str | None = AGENT_QUERY,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> DefaultLimitOffsetPage[SubAgentRunRead]:
    """Return a page of the caller organization's sub-agent runs.

    Rows are always restricted to ``ctx.organization.id``. Each optional
    argument that is not ``None`` narrows the result further by equality:

    - **gateway_id**: keep only runs with this gateway ID
    - **status**: keep only runs with this status (pending, running,
      succeeded, failed)
    - **agent**: keep only runs with this agent name

    Results are ordered by ``created_at`` descending.
    """
    # The organization scope is unconditional; everything else is opt-in.
    criteria = [SubAgentRun.organization_id == ctx.organization.id]
    if gateway_id is not None:
        criteria.append(SubAgentRun.gateway_id == gateway_id)
    if status is not None:
        criteria.append(SubAgentRun.status == status)
    if agent is not None:
        criteria.append(SubAgentRun.agent == agent)

    # Multiple criteria passed to ``where`` are AND-ed together.
    ordering = col(SubAgentRun.created_at).desc()
    query = select(SubAgentRun).where(*criteria).order_by(ordering)
    return await pagination.paginate(session, query)
|
||
|
|
|
||
|
|
|
||
|
|
@router.get(
    "/sub-agents/{id}",
    response_model=SubAgentRunRead,
    summary="Get Sub-Agent Run",
    description="Get a single sub-agent run by ID.",
)
async def get_sub_agent_run(
    id: UUID,
    session: AsyncSession = SESSION_DEP,
    ctx: OrganizationContext = ORG_MEMBER_DEP,
) -> SubAgentRun:
    """Fetch one sub-agent run by its ID.

    - **id**: ID of the sub-agent run to load

    Raises ``HTTPException`` with status 404 when no run has that ID, and
    with status 403 when the run's ``organization_id`` differs from the
    caller's organization.
    """
    lookup = SubAgentRun.objects.by_id(id)
    record = await lookup.first(session)

    # Existence is checked before ownership, so an unknown ID yields 404.
    if record is None:
        raise HTTPException(status_code=404, detail="Sub-agent run not found")

    caller_org_id = ctx.organization.id
    if record.organization_id != caller_org_id:
        raise HTTPException(status_code=403, detail="Access denied")

    return record
|