From f4b7e992ad7c9350683164f6b73bff9980da71c7 Mon Sep 17 00:00:00 2001 From: Ripley Date: Sun, 10 May 2026 19:40:25 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20Phase=202=20monitoring=20models=20?= =?UTF-8?q?=E2=80=94=207=20new=20tables=20with=20CASCADE=20and=20composite?= =?UTF-8?q?=20indexes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add monitoring.py: CostSnapshot, CronJobStatus, SessionEvent, SubAgentRun, SystemHealthMetric - Add alert_rules.py: AlertRule, AlertEvent - Register all 7 models in __init__.py - Add Alembic migration 7a8b9c0d1e2f for 7 new monitoring tables - Add Alembic migration 8f9a0b1c2d3e for CASCADE FK rules, composite indexes, acknowledged_by FK - Update env.py for transaction-per-migration to avoid failure chaining - Security: ondelete CASCADE on all org/gateway FKs, SET NULL on acknowledged_by - Performance: composite indexes on (org_id, created_at) and (org_id, gateway_id) for all monitoring tables --- src/backend/app/models/__init__.py | 9 + src/backend/app/models/alert_rules.py | 63 +++ src/backend/app/models/monitoring.py | 152 ++++++ src/backend/migrations/env.py | 5 +- .../7a8b9c0d1e2f_add_monitoring_models.py | 449 ++++++++++++++++++ ...c2d3e_add_cascade_and_composite_indexes.py | 160 +++++++ 6 files changed, 836 insertions(+), 2 deletions(-) create mode 100644 src/backend/app/models/alert_rules.py create mode 100644 src/backend/app/models/monitoring.py create mode 100644 src/backend/migrations/versions/7a8b9c0d1e2f_add_monitoring_models.py create mode 100644 src/backend/migrations/versions/8f9a0b1c2d3e_add_cascade_and_composite_indexes.py diff --git a/src/backend/app/models/__init__.py b/src/backend/app/models/__init__.py index 2d1df55..646ffae 100644 --- a/src/backend/app/models/__init__.py +++ b/src/backend/app/models/__init__.py @@ -2,6 +2,7 @@ from app.models.activity_events import ActivityEvent from app.models.agents import Agent +from app.models.alert_rules import AlertEvent, AlertRule from app.models.approval_task_links import ApprovalTaskLink from app.models.approvals import Approval from app.models.board_group_memory import BoardGroupMemory @@ -12,6 +13,7 @@ from app.models.board_webhook_payloads import BoardWebhookPayload from app.models.board_webhooks import BoardWebhook from app.models.boards import Board from app.models.gateways import Gateway +from app.models.monitoring import CostSnapshot, CronJobStatus, SessionEvent, SubAgentRun, SystemHealthMetric from app.models.organization_board_access import OrganizationBoardAccess from app.models.organization_invite_board_access import OrganizationInviteBoardAccess from app.models.organization_invites import OrganizationInvite @@ -33,6 +35,8 @@ from app.models.users import User __all__ = [ "ActivityEvent", "Agent", + "AlertEvent", + "AlertRule", "ApprovalTaskLink", "Approval", "BoardGroupMemory", @@ -42,6 +46,8 @@ __all__ = [ "BoardOnboardingSession", "BoardGroup", "Board", + "CostSnapshot", + "CronJobStatus", "Gateway", "GatewayInstalledSkill", "MarketplaceSkill", @@ -54,6 +60,9 @@ __all__ = [ "OrganizationBoardAccess", "OrganizationInvite", "OrganizationInviteBoardAccess", + "SessionEvent", + "SubAgentRun", + "SystemHealthMetric", "TaskDependency", "Task", "TaskFingerprint", diff --git a/src/backend/app/models/alert_rules.py b/src/backend/app/models/alert_rules.py new file mode 100644 index 0000000..d10175e --- /dev/null +++ b/src/backend/app/models/alert_rules.py @@ -0,0 +1,63 @@ +"""Alert rules and events for Mission Control monitoring.""" + +from __future__ import annotations + +from datetime import datetime +from uuid import UUID, uuid4 + +from sqlalchemy import JSON, Column, ForeignKey +from sqlmodel import Field + +from app.core.time import utcnow +from app.models.base import QueryModel + + +class AlertRule(QueryModel, table=True): + """Configurable alert rules for monitoring thresholds.""" + + __tablename__ = "alert_rules" # pyright: ignore[reportAssignmentType] + + id: UUID = Field(default_factory=uuid4, primary_key=True) + organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) + name: str = Field(nullable=False) + metric_type: str = Field(nullable=False, index=True) + threshold: float = Field(nullable=False) + comparison: str = Field(default="gt", nullable=False) + enabled: bool = Field(default=True) + cooldown_minutes: int = Field(default=60) + description: str | None = Field(default=None) + metadata_: dict | None = Field(default=None, sa_column=Column(JSON)) + created_at: datetime = Field(default_factory=utcnow) + updated_at: datetime = Field(default_factory=utcnow) + + __table_args__ = ( + Index("ix_alert_rules_org_created", "organization_id", "created_at"), + Index("ix_alert_rules_org_gateway", "organization_id", "gateway_id"), + ) + + +class AlertEvent(QueryModel, table=True): + """Triggered alert instances — one row per alert fired.""" + + __tablename__ = "alert_events" # pyright: ignore[reportAssignmentType] + + id: UUID = Field(default_factory=uuid4, primary_key=True) + organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) + rule_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("alert_rules.id", ondelete="CASCADE"), index=True)) + gateway_id: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + metric_type: str = Field(nullable=False, index=True) + triggered_value: float = Field(nullable=False) + threshold_value: float = Field(nullable=False) + acknowledged: bool = Field(default=False) + acknowledged_at: datetime | None = Field(default=None) + acknowledged_by: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("users.id", ondelete="SET NULL"), nullable=True)) + resolved_at: datetime | None = Field(default=None) + metadata_: dict | None = Field(default=None, sa_column=Column(JSON)) + created_at: datetime = Field(default_factory=utcnow) + updated_at: datetime = Field(default_factory=utcnow) + + __table_args__ = ( + Index("ix_alert_events_org_created", "organization_id", "created_at"), + Index("ix_alert_events_org_gateway", "organization_id", "gateway_id"), + Index("ix_alert_events_rule_created", "rule_id", "created_at"), + ) diff --git a/src/backend/app/models/monitoring.py b/src/backend/app/models/monitoring.py new file mode 100644 index 0000000..ab26e95 --- /dev/null +++ b/src/backend/app/models/monitoring.py @@ -0,0 +1,152 @@ +"""Monitoring and tracking models for Mission Control.""" + +from __future__ import annotations + +from datetime import datetime +from uuid import UUID, uuid4 + +from sqlalchemy import JSON, Column, ForeignKey +from sqlmodel import Field + +from app.core.time import utcnow +from app.models.base import QueryModel + +RUNTIME_ANNOTATION_TYPES = (datetime,) + + +class CostSnapshot(QueryModel, table=True): + """Timestamped cost readings from the gateway RPC.""" + + __tablename__ = "cost_snapshots" # pyright: ignore[reportAssignmentType] + + id: UUID = Field(default_factory=uuid4, primary_key=True) + organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) + gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + period_start: datetime = Field(nullable=False) + period_end: datetime = Field(nullable=False) + total_cost: float = Field(nullable=False, default=0.0) + model_costs: dict | None = Field(default=None, sa_column=Column(JSON)) + provider_costs: dict | None = Field(default=None, sa_column=Column(JSON)) + token_counts: dict | None = Field(default=None, sa_column=Column(JSON)) + collected_at: datetime = Field(default_factory=utcnow) + created_at: datetime = Field(default_factory=utcnow) + updated_at: datetime = Field(default_factory=utcnow) + + __table_args__ = ( + Index("ix_cost_snapshots_org_created", "organization_id", "created_at"), + Index("ix_cost_snapshots_org_gateway", "organization_id", "gateway_id"), + ) + + +class CronJobStatus(QueryModel, table=True): + """Cron job state tracked from gateway RPC cron.list/cron.status.""" + + __tablename__ = "cron_job_statuses" # pyright: ignore[reportAssignmentType] + + id: UUID = Field(default_factory=uuid4, primary_key=True) + organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) + gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + job_name: str = Field(nullable=False, index=True) + schedule: str = Field(nullable=False) + enabled: bool = Field(default=True) + last_run_at: datetime | None = Field(default=None) + next_run_at: datetime | None = Field(default=None) + status: str = Field(nullable=False, default="idle") + failure_count: int = Field(default=0) + last_error: str | None = Field(default=None) + metadata_: dict | None = Field(default=None, sa_column=Column(JSON)) + created_at: datetime = Field(default_factory=utcnow) + updated_at: datetime = Field(default_factory=utcnow) + + __table_args__ = ( + Index("ix_cron_job_statuses_org_created", "organization_id", "created_at"), + Index("ix_cron_job_statuses_org_gateway", "organization_id", "gateway_id"), + ) + + +class SessionEvent(QueryModel, table=True): + """Session lifecycle events from gateway RPC sessions.list/preview.""" + + __tablename__ = "session_events" # pyright: ignore[reportAssignmentType] + + id: UUID = Field(default_factory=uuid4, primary_key=True) + organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) + gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + session_key: str = Field(nullable=False, index=True) + event_type: str = Field(nullable=False) + model: str | None = Field(default=None) + agent_id: str | None = Field(default=None) + channel: str | None = Field(default=None) + context_percent: float | None = Field(default=None) + token_counts: dict | None = Field(default=None, sa_column=Column(JSON)) + cost: float | None = Field(default=None) + metadata_: dict | None = Field(default=None, sa_column=Column(JSON)) + created_at: datetime = Field(default_factory=utcnow) + updated_at: datetime = Field(default_factory=utcnow) + + __table_args__ = ( + Index("ix_session_events_org_created", "organization_id", "created_at"), + Index("ix_session_events_org_gateway", "organization_id", "gateway_id"), + ) + + +class SubAgentRun(QueryModel, table=True): + """Sub-agent execution records.""" + + __tablename__ = "sub_agent_runs" # pyright: ignore[reportAssignmentType] + + id: UUID = Field(default_factory=uuid4, primary_key=True) + organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) + gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + parent_session_key: str = Field(nullable=False, index=True) + session_event_id: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("session_events.id", ondelete="CASCADE"), index=True)) + agent: str | None = Field(default=None) + model: str | None = Field(default=None) + status: str = Field(nullable=False, default="pending") + duration_ms: int | None = Field(default=None) + cost: float | None = Field(default=None) + token_counts: dict | None = Field(default=None, sa_column=Column(JSON)) + metadata_: dict | None = Field(default=None, sa_column=Column(JSON)) + created_at: datetime = Field(default_factory=utcnow) + updated_at: datetime = Field(default_factory=utcnow) + + __table_args__ = ( + Index("ix_sub_agent_runs_org_created", "organization_id", "created_at"), + Index("ix_sub_agent_runs_org_gateway", "organization_id", "gateway_id"), + ) + + +class SystemHealthMetric(QueryModel, table=True): + """System metrics from gateway health/status endpoints and host collection.""" + + __tablename__ = "system_health_metrics" # pyright: ignore[reportAssignmentType] + + id: UUID = Field(default_factory=uuid4, primary_key=True) + organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True)) + gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True)) + cpu_percent: float | None = Field(default=None) + cpu_cores: int | None = Field(default=None) + ram_used_bytes: int | None = Field(default=None) + ram_total_bytes: int | None = Field(default=None) + ram_percent: float | None = Field(default=None) + swap_used_bytes: int | None = Field(default=None) + swap_total_bytes: int | None = Field(default=None) + swap_percent: float | None = Field(default=None) + disk_path: str = Field(default="/") + disk_used_bytes: int | None = Field(default=None) + disk_total_bytes: int | None = Field(default=None) + disk_percent: float | None = Field(default=None) + gateway_live: bool | None = Field(default=None) + gateway_ready: bool | None = Field(default=None) + gateway_uptime_ms: int | None = Field(default=None) + gateway_pid: int | None = Field(default=None) + gateway_version: str | None = Field(default=None) + metadata_: dict | None = Field(default=None, sa_column=Column(JSON)) + collected_at: datetime = Field(default_factory=utcnow) + created_at: datetime = Field(default_factory=utcnow) + updated_at: datetime = Field(default_factory=utcnow) + + __table_args__ = ( + Index("ix_system_health_metrics_org_created", "organization_id", "created_at"), + Index("ix_system_health_metrics_org_gateway", "organization_id", "gateway_id"), + ) diff --git a/src/backend/migrations/env.py b/src/backend/migrations/env.py index c99306e..a75f808 100644 --- a/src/backend/migrations/env.py +++ b/src/backend/migrations/env.py @@ -69,14 +69,15 @@ def run_migrations_online() -> None: ) with connectable.connect() as connection: + # Use autocommit to handle individual statement failures context.configure( connection=connection, target_metadata=target_metadata, compare_type=True, + transaction_per_migration=True, # Use separate transaction per migration ) - with context.begin_transaction(): - context.run_migrations() + context.run_migrations() if context.is_offline_mode(): diff --git a/src/backend/migrations/versions/7a8b9c0d1e2f_add_monitoring_models.py b/src/backend/migrations/versions/7a8b9c0d1e2f_add_monitoring_models.py new file mode 100644 index 0000000..c2d6f4b --- /dev/null +++ b/src/backend/migrations/versions/7a8b9c0d1e2f_add_monitoring_models.py @@ -0,0 +1,449 @@ +"""Add monitoring models (CostSnapshot, CronJobStatus, SessionEvent, SubAgentRun, SystemHealthMetric, AlertRule, AlertEvent). + +Revision ID: 7a8b9c0d1e2f +Revises: f4d2b649e93a +Create Date: 2026-05-10 00:00:00.000000 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "7a8b9c0d1e2f" +down_revision = "a9b1c2d3e4f7" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Create monitoring tables.""" + _upgrade_monitoring_tables() + + +def _upgrade_monitoring_tables() -> None: + # cost_snapshots + op.create_table( + "cost_snapshots", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("organization_id", sa.Uuid(), nullable=False), + sa.Column("gateway_id", sa.Uuid(), nullable=False), + sa.Column("period_start", sa.DateTime(), nullable=False), + sa.Column("period_end", sa.DateTime(), nullable=False), + sa.Column("total_cost", sa.Float(), nullable=False), + sa.Column("model_costs", sa.JSON(), nullable=True), + sa.Column("provider_costs", sa.JSON(), nullable=True), + sa.Column("token_counts", sa.JSON(), nullable=True), + sa.Column("collected_at", sa.DateTime(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_cost_snapshots_organization_id"), + "cost_snapshots", + ["organization_id"], + unique=False, + ) + op.create_index( + op.f("ix_cost_snapshots_gateway_id"), + "cost_snapshots", + ["gateway_id"], + unique=False, + ) + op.create_index( + op.f("ix_cost_snapshots_org_created"), + "cost_snapshots", + ["organization_id", "created_at"], + unique=False, + ) + op.create_index( + op.f("ix_cost_snapshots_org_gateway"), + "cost_snapshots", + ["organization_id", "gateway_id"], + unique=False, + ) + + # cron_job_statuses + op.create_table( + "cron_job_statuses", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("organization_id", sa.Uuid(), nullable=False), + sa.Column("gateway_id", sa.Uuid(), nullable=False), + sa.Column("job_name", sa.String(), nullable=False), + sa.Column("schedule", sa.String(), nullable=False), + sa.Column("enabled", sa.Boolean(), nullable=True), + sa.Column("last_run_at", sa.DateTime(), nullable=True), + sa.Column("next_run_at", sa.DateTime(), nullable=True), + sa.Column("status", sa.String(), nullable=False), + sa.Column("failure_count", sa.Integer(), nullable=False), + sa.Column("last_error", sa.String(), nullable=True), + sa.Column("metadata_", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_cron_job_statuses_organization_id"), + "cron_job_statuses", + ["organization_id"], + unique=False, + ) + op.create_index( + op.f("ix_cron_job_statuses_gateway_id"), + "cron_job_statuses", + ["gateway_id"], + unique=False, + ) + op.create_index( + op.f("ix_cron_job_statuses_job_name"), + "cron_job_statuses", + ["job_name"], + unique=False, + ) + op.create_index( + op.f("ix_cron_job_statuses_org_created"), + "cron_job_statuses", + ["organization_id", "created_at"], + unique=False, + ) + op.create_index( + op.f("ix_cron_job_statuses_org_gateway"), + "cron_job_statuses", + ["organization_id", "gateway_id"], + unique=False, + ) + + # session_events + op.create_table( + "session_events", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("organization_id", sa.Uuid(), nullable=False), + sa.Column("gateway_id", sa.Uuid(), nullable=False), + sa.Column("session_key", sa.String(), nullable=False), + sa.Column("event_type", sa.String(), nullable=False), + sa.Column("model", sa.String(), nullable=True), + sa.Column("agent_id", sa.String(), nullable=True), + sa.Column("channel", sa.String(), nullable=True), + sa.Column("context_percent", sa.Float(), nullable=True), + sa.Column("token_counts", sa.JSON(), nullable=True), + sa.Column("cost", sa.Float(), nullable=True), + sa.Column("metadata_", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_session_events_organization_id"), + "session_events", + ["organization_id"], + unique=False, + ) + op.create_index( + op.f("ix_session_events_gateway_id"), + "session_events", + ["gateway_id"], + unique=False, + ) + op.create_index( + op.f("ix_session_events_session_key"), + "session_events", + ["session_key"], + unique=False, + ) + op.create_index( + op.f("ix_session_events_org_created"), + "session_events", + ["organization_id", "created_at"], + unique=False, + ) + op.create_index( + op.f("ix_session_events_org_gateway"), + "session_events", + ["organization_id", "gateway_id"], + unique=False, + ) + + # sub_agent_runs + op.create_table( + "sub_agent_runs", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("organization_id", sa.Uuid(), nullable=False), + sa.Column("gateway_id", sa.Uuid(), nullable=False), + sa.Column("parent_session_key", sa.String(), nullable=False), + sa.Column("session_event_id", sa.Uuid(), nullable=True), + sa.Column("agent", sa.String(), nullable=True), + sa.Column("model", sa.String(), nullable=True), + sa.Column("status", sa.String(), nullable=False), + sa.Column("duration_ms", sa.Integer(), nullable=True), + sa.Column("cost", sa.Float(), nullable=True), + sa.Column("token_counts", sa.JSON(), nullable=True), + sa.Column("metadata_", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["session_event_id"], ["session_events.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_sub_agent_runs_organization_id"), + "sub_agent_runs", + ["organization_id"], + unique=False, + ) + op.create_index( + op.f("ix_sub_agent_runs_gateway_id"), + "sub_agent_runs", + ["gateway_id"], + unique=False, + ) + op.create_index( + op.f("ix_sub_agent_runs_parent_session_key"), + "sub_agent_runs", + ["parent_session_key"], + unique=False, + ) + op.create_index( + op.f("ix_sub_agent_runs_session_event_id"), + "sub_agent_runs", + ["session_event_id"], + unique=False, + ) + op.create_index( + op.f("ix_sub_agent_runs_org_created"), + "sub_agent_runs", + ["organization_id", "created_at"], + unique=False, + ) + op.create_index( + op.f("ix_sub_agent_runs_org_gateway"), + "sub_agent_runs", + ["organization_id", "gateway_id"], + unique=False, + ) + + # system_health_metrics + op.create_table( + "system_health_metrics", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("organization_id", sa.Uuid(), nullable=False), + sa.Column("gateway_id", sa.Uuid(), nullable=False), + sa.Column("cpu_percent", sa.Float(), nullable=True), + sa.Column("cpu_cores", sa.Integer(), nullable=True), + sa.Column("ram_used_bytes", sa.Integer(), nullable=True), + sa.Column("ram_total_bytes", sa.Integer(), nullable=True), + sa.Column("ram_percent", sa.Float(), nullable=True), + sa.Column("swap_used_bytes", sa.Integer(), nullable=True), + sa.Column("swap_total_bytes", sa.Integer(), nullable=True), + sa.Column("swap_percent", sa.Float(), nullable=True), + sa.Column("disk_path", sa.String(), nullable=False), + sa.Column("disk_used_bytes", sa.Integer(), nullable=True), + sa.Column("disk_total_bytes", sa.Integer(), nullable=True), + sa.Column("disk_percent", sa.Float(), nullable=True), + sa.Column("gateway_live", sa.Boolean(), nullable=True), + sa.Column("gateway_ready", sa.Boolean(), nullable=True), + sa.Column("gateway_uptime_ms", sa.Integer(), nullable=True), + sa.Column("gateway_pid", sa.Integer(), nullable=True), + sa.Column("gateway_version", sa.String(), nullable=True), + sa.Column("metadata_", sa.JSON(), nullable=True), + sa.Column("collected_at", sa.DateTime(), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_system_health_metrics_organization_id"), + "system_health_metrics", + ["organization_id"], + unique=False, + ) + op.create_index( + op.f("ix_system_health_metrics_gateway_id"), + "system_health_metrics", + ["gateway_id"], + unique=False, + ) + op.create_index( + op.f("ix_system_health_metrics_org_created"), + "system_health_metrics", + ["organization_id", "created_at"], + unique=False, + ) + op.create_index( + op.f("ix_system_health_metrics_org_gateway"), + "system_health_metrics", + ["organization_id", "gateway_id"], + unique=False, + ) + + # alert_rules + op.create_table( + "alert_rules", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("organization_id", sa.Uuid(), nullable=False), + sa.Column("name", sa.String(), nullable=False), + sa.Column("metric_type", sa.String(), nullable=False), + sa.Column("threshold", sa.Float(), nullable=False), + sa.Column("comparison", sa.String(), nullable=False), + sa.Column("enabled", sa.Boolean(), nullable=True), + sa.Column("cooldown_minutes", sa.Integer(), nullable=False), + sa.Column("description", sa.String(), nullable=True), + sa.Column("metadata_", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_alert_rules_organization_id"), + "alert_rules", + ["organization_id"], + unique=False, + ) + op.create_index( + op.f("ix_alert_rules_metric_type"), + "alert_rules", + ["metric_type"], + unique=False, + ) + op.create_index( + op.f("ix_alert_rules_org_created"), + "alert_rules", + ["organization_id", "created_at"], + unique=False, + ) + op.create_index( + op.f("ix_alert_rules_org_gateway"), + "alert_rules", + ["organization_id", "gateway_id"], + unique=False, + ) + + # alert_events + op.create_table( + "alert_events", + sa.Column("id", sa.Uuid(), nullable=False), + sa.Column("organization_id", sa.Uuid(), nullable=False), + sa.Column("rule_id", sa.Uuid(), nullable=False), + sa.Column("gateway_id", sa.Uuid(), nullable=True), + sa.Column("metric_type", sa.String(), nullable=False), + sa.Column("triggered_value", sa.Float(), nullable=False), + sa.Column("threshold_value", sa.Float(), nullable=False), + sa.Column("acknowledged", sa.Boolean(), nullable=False), + sa.Column("acknowledged_at", sa.DateTime(), nullable=True), + sa.Column("acknowledged_by", sa.Uuid(), nullable=True), + sa.Column("resolved_at", sa.DateTime(), nullable=True), + sa.Column("metadata_", sa.JSON(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False), + sa.Column("updated_at", sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["rule_id"], ["alert_rules.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["acknowledged_by"], ["users.id"], ondelete="SET NULL"), + sa.PrimaryKeyConstraint("id"), + ) + op.create_index( + op.f("ix_alert_events_organization_id"), + "alert_events", + ["organization_id"], + unique=False, + ) + op.create_index( + op.f("ix_alert_events_rule_id"), + "alert_events", + ["rule_id"], + unique=False, + ) + op.create_index( + op.f("ix_alert_events_gateway_id"), + "alert_events", + ["gateway_id"], + unique=False, + ) + op.create_index( + op.f("ix_alert_events_metric_type"), + "alert_events", + ["metric_type"], + unique=False, + ) + op.create_index( + op.f("ix_alert_events_org_created"), + "alert_events", + ["organization_id", "created_at"], + unique=False, + ) + op.create_index( + op.f("ix_alert_events_org_gateway"), + "alert_events", + ["organization_id", "gateway_id"], + unique=False, + ) + op.create_index( + op.f("ix_alert_events_rule_created"), + "alert_events", + ["rule_id", "created_at"], + unique=False, + ) + + +def downgrade() -> None: + """Drop monitoring tables.""" + op.drop_index(op.f("ix_alert_events_rule_created"), table_name="alert_events") + op.drop_index(op.f("ix_alert_events_org_gateway"), table_name="alert_events") + op.drop_index(op.f("ix_alert_events_org_created"), table_name="alert_events") + op.drop_index(op.f("ix_alert_events_metric_type"), table_name="alert_events") + op.drop_index(op.f("ix_alert_events_gateway_id"), table_name="alert_events") + op.drop_index(op.f("ix_alert_events_rule_id"), table_name="alert_events") + op.drop_index(op.f("ix_alert_events_organization_id"), table_name="alert_events") + op.drop_table("alert_events") + + op.drop_index(op.f("ix_alert_rules_org_gateway"), table_name="alert_rules") + op.drop_index(op.f("ix_alert_rules_org_created"), table_name="alert_rules") + op.drop_index(op.f("ix_alert_rules_metric_type"), table_name="alert_rules") + op.drop_index(op.f("ix_alert_rules_organization_id"), table_name="alert_rules") + op.drop_table("alert_rules") + + op.drop_index(op.f("ix_system_health_metrics_org_gateway"), table_name="system_health_metrics") + op.drop_index(op.f("ix_system_health_metrics_org_created"), table_name="system_health_metrics") + op.drop_index(op.f("ix_system_health_metrics_gateway_id"), table_name="system_health_metrics") + op.drop_index(op.f("ix_system_health_metrics_organization_id"), table_name="system_health_metrics") + op.drop_table("system_health_metrics") + + op.drop_index(op.f("ix_sub_agent_runs_org_gateway"), table_name="sub_agent_runs") + op.drop_index(op.f("ix_sub_agent_runs_org_created"), table_name="sub_agent_runs") + op.drop_index(op.f("ix_sub_agent_runs_session_event_id"), table_name="sub_agent_runs") + op.drop_index(op.f("ix_sub_agent_runs_parent_session_key"), table_name="sub_agent_runs") + op.drop_index(op.f("ix_sub_agent_runs_gateway_id"), table_name="sub_agent_runs") + op.drop_index(op.f("ix_sub_agent_runs_organization_id"), table_name="sub_agent_runs") + op.drop_table("sub_agent_runs") + + op.drop_index(op.f("ix_session_events_org_gateway"), table_name="session_events") + op.drop_index(op.f("ix_session_events_org_created"), table_name="session_events") + op.drop_index(op.f("ix_session_events_session_key"), table_name="session_events") + op.drop_index(op.f("ix_session_events_gateway_id"), table_name="session_events") + op.drop_index(op.f("ix_session_events_organization_id"), table_name="session_events") + op.drop_table("session_events") + + op.drop_index(op.f("ix_cron_job_statuses_org_gateway"), table_name="cron_job_statuses") + op.drop_index(op.f("ix_cron_job_statuses_org_created"), table_name="cron_job_statuses") + op.drop_index(op.f("ix_cron_job_statuses_job_name"), table_name="cron_job_statuses") + op.drop_index(op.f("ix_cron_job_statuses_gateway_id"), table_name="cron_job_statuses") + op.drop_index(op.f("ix_cron_job_statuses_organization_id"), table_name="cron_job_statuses") + op.drop_table("cron_job_statuses") + + op.drop_index(op.f("ix_cost_snapshots_org_gateway"), table_name="cost_snapshots") + op.drop_index(op.f("ix_cost_snapshots_org_created"), table_name="cost_snapshots") + op.drop_index(op.f("ix_cost_snapshots_gateway_id"), table_name="cost_snapshots") + op.drop_index(op.f("ix_cost_snapshots_organization_id"), table_name="cost_snapshots") + op.drop_table("cost_snapshots") diff --git a/src/backend/migrations/versions/8f9a0b1c2d3e_add_cascade_and_composite_indexes.py b/src/backend/migrations/versions/8f9a0b1c2d3e_add_cascade_and_composite_indexes.py new file mode 100644 index 0000000..fc7f4c4 --- /dev/null +++ b/src/backend/migrations/versions/8f9a0b1c2d3e_add_cascade_and_composite_indexes.py @@ -0,0 +1,160 @@ +"""Add missing CASCADE rules and composite indexes for monitoring tables. + +Revision ID: 8f9a0b1c2d3e +Revises: 7a8b9c0d1e2f +Create Date: 2026-05-10 19:00:00.000000 + +""" + +from __future__ import annotations + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "8f9a0b1c2d3e" +down_revision = "7a8b9c0d1e2f" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Add CASCADE rules and composite indexes.""" + conn = op.get_bind() + + # Use separate transactions for each operation to avoid failure chain + # First check and drop existing constraints individually + tables_and_fks = [ + ("cost_snapshots", ["organization_id", "gateway_id"]), + ("cron_job_statuses", ["organization_id", "gateway_id"]), + ("session_events", ["organization_id", "gateway_id"]), + ("sub_agent_runs", ["organization_id", "gateway_id", "session_event_id"]), + ("system_health_metrics", ["organization_id", "gateway_id"]), + ("alert_rules", ["organization_id"]), + ("alert_events", ["organization_id", "rule_id", "gateway_id", "acknowledged_by"]), + ] + + # Drop existing constraints (ignore if they don't exist) + for table, columns in tables_and_fks: + for col in columns: + constraint_name = f"{table}_{col}_fkey" + try: + op.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {constraint_name}") + except Exception: + pass + + # Add CASCADE constraints + # cost_snapshots + op.execute("ALTER TABLE cost_snapshots ADD CONSTRAINT cost_snapshots_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE") + op.execute("ALTER TABLE cost_snapshots ADD CONSTRAINT cost_snapshots_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id) ON DELETE CASCADE") + + # cron_job_statuses + op.execute("ALTER TABLE cron_job_statuses ADD CONSTRAINT cron_job_statuses_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE") + op.execute("ALTER TABLE cron_job_statuses ADD CONSTRAINT cron_job_statuses_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id) ON DELETE CASCADE") + + # session_events + op.execute("ALTER TABLE session_events ADD CONSTRAINT session_events_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE") + op.execute("ALTER TABLE session_events ADD CONSTRAINT session_events_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id) ON DELETE CASCADE") + + # sub_agent_runs + op.execute("ALTER TABLE sub_agent_runs ADD CONSTRAINT sub_agent_runs_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE") + op.execute("ALTER TABLE sub_agent_runs ADD CONSTRAINT sub_agent_runs_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id) ON DELETE CASCADE") + op.execute("ALTER TABLE sub_agent_runs ADD CONSTRAINT sub_agent_runs_session_event_id_fkey FOREIGN KEY (session_event_id) REFERENCES session_events(id) ON DELETE CASCADE") + + # system_health_metrics + op.execute("ALTER TABLE system_health_metrics ADD CONSTRAINT system_health_metrics_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE") + op.execute("ALTER TABLE system_health_metrics ADD CONSTRAINT system_health_metrics_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id) ON DELETE CASCADE") + + # alert_rules + op.execute("ALTER TABLE alert_rules ADD CONSTRAINT alert_rules_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE") + + # alert_events + op.execute("ALTER TABLE alert_events ADD CONSTRAINT alert_events_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id) ON DELETE CASCADE") + op.execute("ALTER TABLE alert_events ADD CONSTRAINT alert_events_rule_id_fkey FOREIGN KEY (rule_id) REFERENCES alert_rules(id) ON DELETE CASCADE") + op.execute("ALTER TABLE alert_events ADD CONSTRAINT alert_events_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id) ON DELETE CASCADE") + op.execute("ALTER TABLE alert_events ADD CONSTRAINT alert_events_acknowledged_by_fkey FOREIGN KEY (acknowledged_by) REFERENCES users(id) ON DELETE SET NULL") + + # Add composite indexes + op.execute("CREATE INDEX IF NOT EXISTS ix_cost_snapshots_org_created ON cost_snapshots (organization_id, created_at)") + op.execute("CREATE INDEX IF NOT EXISTS ix_cost_snapshots_org_gateway ON cost_snapshots (organization_id, gateway_id)") + op.execute("CREATE INDEX IF NOT EXISTS ix_cron_job_statuses_org_created ON cron_job_statuses (organization_id, created_at)") + op.execute("CREATE INDEX IF NOT EXISTS ix_cron_job_statuses_org_gateway ON cron_job_statuses (organization_id, gateway_id)") + op.execute("CREATE INDEX IF NOT EXISTS ix_session_events_org_created ON session_events (organization_id, created_at)") + op.execute("CREATE INDEX IF NOT EXISTS ix_session_events_org_gateway ON session_events (organization_id, gateway_id)") + op.execute("CREATE INDEX IF NOT EXISTS ix_sub_agent_runs_org_created ON sub_agent_runs (organization_id, created_at)") + op.execute("CREATE INDEX IF NOT EXISTS ix_sub_agent_runs_org_gateway ON sub_agent_runs (organization_id, gateway_id)") + op.execute("CREATE INDEX IF NOT EXISTS ix_system_health_metrics_org_created ON system_health_metrics (organization_id, created_at)") + op.execute("CREATE INDEX IF NOT EXISTS ix_system_health_metrics_org_gateway ON system_health_metrics (organization_id, gateway_id)") + op.execute("CREATE INDEX IF NOT EXISTS ix_alert_rules_org_created ON alert_rules (organization_id, created_at)") + op.execute("CREATE INDEX IF NOT EXISTS ix_alert_events_org_created ON alert_events (organization_id, created_at)") + op.execute("CREATE INDEX IF NOT EXISTS ix_alert_events_org_gateway ON alert_events (organization_id, gateway_id)") + op.execute("CREATE INDEX IF NOT EXISTS ix_alert_events_rule_created ON alert_events (rule_id, created_at)") + + +def downgrade() -> None: + """Remove CASCADE rules and composite indexes.""" + conn = op.get_bind() + + # Drop indexes + op.execute("DROP INDEX IF EXISTS ix_cost_snapshots_org_created") + op.execute("DROP INDEX IF EXISTS ix_cost_snapshots_org_gateway") + op.execute("DROP INDEX IF EXISTS ix_cron_job_statuses_org_created") + op.execute("DROP INDEX IF EXISTS ix_cron_job_statuses_org_gateway") + op.execute("DROP INDEX IF EXISTS ix_session_events_org_created") + op.execute("DROP INDEX IF EXISTS ix_session_events_org_gateway") + op.execute("DROP INDEX IF EXISTS ix_sub_agent_runs_org_created") + op.execute("DROP INDEX IF EXISTS ix_sub_agent_runs_org_gateway") + op.execute("DROP INDEX IF EXISTS ix_system_health_metrics_org_created") + op.execute("DROP INDEX IF EXISTS ix_system_health_metrics_org_gateway") + op.execute("DROP INDEX IF EXISTS ix_alert_rules_org_created") + op.execute("DROP INDEX IF EXISTS ix_alert_events_org_created") + op.execute("DROP INDEX IF EXISTS ix_alert_events_org_gateway") + op.execute("DROP INDEX IF EXISTS ix_alert_events_rule_created") + + # Drop constraints (recreate without CASCADE) + # cost_snapshots + op.execute("ALTER TABLE cost_snapshots DROP CONSTRAINT IF EXISTS cost_snapshots_organization_id_fkey") + op.execute("ALTER TABLE cost_snapshots DROP CONSTRAINT IF EXISTS cost_snapshots_gateway_id_fkey") + op.execute("ALTER TABLE cost_snapshots ADD CONSTRAINT cost_snapshots_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id)") + op.execute("ALTER TABLE cost_snapshots ADD CONSTRAINT cost_snapshots_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id)") + + # cron_job_statuses + op.execute("ALTER TABLE cron_job_statuses DROP CONSTRAINT IF EXISTS cron_job_statuses_organization_id_fkey") + op.execute("ALTER TABLE cron_job_statuses DROP CONSTRAINT IF EXISTS cron_job_statuses_gateway_id_fkey") + op.execute("ALTER TABLE cron_job_statuses ADD CONSTRAINT cron_job_statuses_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id)") + op.execute("ALTER TABLE cron_job_statuses ADD CONSTRAINT cron_job_statuses_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id)") + + # session_events + op.execute("ALTER TABLE session_events DROP CONSTRAINT IF EXISTS session_events_organization_id_fkey") + op.execute("ALTER TABLE session_events DROP CONSTRAINT IF EXISTS session_events_gateway_id_fkey") + op.execute("ALTER TABLE session_events ADD CONSTRAINT session_events_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id)") + op.execute("ALTER TABLE session_events ADD CONSTRAINT session_events_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id)") + + # sub_agent_runs + op.execute("ALTER TABLE sub_agent_runs DROP CONSTRAINT IF EXISTS sub_agent_runs_organization_id_fkey") + op.execute("ALTER TABLE sub_agent_runs DROP CONSTRAINT IF EXISTS sub_agent_runs_gateway_id_fkey") + op.execute("ALTER TABLE sub_agent_runs DROP CONSTRAINT IF EXISTS sub_agent_runs_session_event_id_fkey") + op.execute("ALTER TABLE sub_agent_runs ADD CONSTRAINT sub_agent_runs_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id)") + op.execute("ALTER TABLE sub_agent_runs ADD CONSTRAINT sub_agent_runs_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id)") + op.execute("ALTER TABLE sub_agent_runs ADD CONSTRAINT sub_agent_runs_session_event_id_fkey FOREIGN KEY (session_event_id) REFERENCES session_events(id)") + + # system_health_metrics + op.execute("ALTER TABLE system_health_metrics DROP CONSTRAINT IF EXISTS system_health_metrics_organization_id_fkey") + op.execute("ALTER TABLE system_health_metrics DROP CONSTRAINT IF EXISTS system_health_metrics_gateway_id_fkey") + op.execute("ALTER TABLE system_health_metrics ADD CONSTRAINT system_health_metrics_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id)") + op.execute("ALTER TABLE system_health_metrics ADD CONSTRAINT system_health_metrics_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id)") + + # alert_rules + op.execute("ALTER TABLE alert_rules DROP CONSTRAINT IF EXISTS alert_rules_organization_id_fkey") + op.execute("ALTER TABLE alert_rules ADD CONSTRAINT alert_rules_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id)") + + # alert_events + op.execute("ALTER TABLE alert_events DROP CONSTRAINT IF EXISTS alert_events_organization_id_fkey") + op.execute("ALTER TABLE alert_events DROP CONSTRAINT IF EXISTS alert_events_rule_id_fkey") + op.execute("ALTER TABLE alert_events DROP CONSTRAINT IF EXISTS alert_events_gateway_id_fkey") + op.execute("ALTER TABLE alert_events DROP CONSTRAINT IF EXISTS alert_events_acknowledged_by_fkey") + op.execute("ALTER TABLE alert_events ADD CONSTRAINT alert_events_organization_id_fkey FOREIGN KEY (organization_id) REFERENCES organizations(id)") + op.execute("ALTER TABLE alert_events ADD CONSTRAINT alert_events_rule_id_fkey FOREIGN KEY (rule_id) REFERENCES alert_rules(id)") + op.execute("ALTER TABLE alert_events ADD CONSTRAINT alert_events_gateway_id_fkey FOREIGN KEY (gateway_id) REFERENCES gateways(id)") + op.execute("ALTER TABLE alert_events ADD CONSTRAINT alert_events_acknowledged_by_fkey FOREIGN KEY (acknowledged_by) REFERENCES users(id)")