feat: Phase 2 monitoring models — 7 new tables with CASCADE and composite indexes

- Add monitoring.py: CostSnapshot, CronJobStatus, SessionEvent, SubAgentRun, SystemHealthMetric
- Add alert_rules.py: AlertRule, AlertEvent
- Register all 7 models in __init__.py
- Add Alembic migration 7a8b9c0d1e2f for 7 new monitoring tables
- Add Alembic migration 8f9a0b1c2d3e for CASCADE FK rules, composite indexes, acknowledged_by FK
- Update env.py for transaction-per-migration to avoid failure chaining
- Security: ondelete CASCADE on all org/gateway FKs, SET NULL on acknowledged_by
- Performance: composite indexes on (org_id, created_at) and (org_id, gateway_id) for all monitoring tables
This commit is contained in:
Ripley 2026-05-10 19:40:25 -05:00
parent d1719ab394
commit f4b7e992ad
6 changed files with 836 additions and 2 deletions

View File

@ -2,6 +2,7 @@
from app.models.activity_events import ActivityEvent from app.models.activity_events import ActivityEvent
from app.models.agents import Agent from app.models.agents import Agent
from app.models.alert_rules import AlertEvent, AlertRule
from app.models.approval_task_links import ApprovalTaskLink from app.models.approval_task_links import ApprovalTaskLink
from app.models.approvals import Approval from app.models.approvals import Approval
from app.models.board_group_memory import BoardGroupMemory from app.models.board_group_memory import BoardGroupMemory
@ -12,6 +13,7 @@ from app.models.board_webhook_payloads import BoardWebhookPayload
from app.models.board_webhooks import BoardWebhook from app.models.board_webhooks import BoardWebhook
from app.models.boards import Board from app.models.boards import Board
from app.models.gateways import Gateway from app.models.gateways import Gateway
from app.models.monitoring import CostSnapshot, CronJobStatus, SessionEvent, SubAgentRun, SystemHealthMetric
from app.models.organization_board_access import OrganizationBoardAccess from app.models.organization_board_access import OrganizationBoardAccess
from app.models.organization_invite_board_access import OrganizationInviteBoardAccess from app.models.organization_invite_board_access import OrganizationInviteBoardAccess
from app.models.organization_invites import OrganizationInvite from app.models.organization_invites import OrganizationInvite
@ -33,6 +35,8 @@ from app.models.users import User
__all__ = [ __all__ = [
"ActivityEvent", "ActivityEvent",
"Agent", "Agent",
"AlertEvent",
"AlertRule",
"ApprovalTaskLink", "ApprovalTaskLink",
"Approval", "Approval",
"BoardGroupMemory", "BoardGroupMemory",
@ -42,6 +46,8 @@ __all__ = [
"BoardOnboardingSession", "BoardOnboardingSession",
"BoardGroup", "BoardGroup",
"Board", "Board",
"CostSnapshot",
"CronJobStatus",
"Gateway", "Gateway",
"GatewayInstalledSkill", "GatewayInstalledSkill",
"MarketplaceSkill", "MarketplaceSkill",
@ -54,6 +60,9 @@ __all__ = [
"OrganizationBoardAccess", "OrganizationBoardAccess",
"OrganizationInvite", "OrganizationInvite",
"OrganizationInviteBoardAccess", "OrganizationInviteBoardAccess",
"SessionEvent",
"SubAgentRun",
"SystemHealthMetric",
"TaskDependency", "TaskDependency",
"Task", "Task",
"TaskFingerprint", "TaskFingerprint",

View File

@ -0,0 +1,63 @@
"""Alert rules and events for Mission Control monitoring."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import JSON, Column, ForeignKey
from sqlmodel import Field
from app.core.time import utcnow
from app.models.base import QueryModel
class AlertRule(QueryModel, table=True):
"""Configurable alert rules for monitoring thresholds."""
__tablename__ = "alert_rules" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True))
name: str = Field(nullable=False)
metric_type: str = Field(nullable=False, index=True)
threshold: float = Field(nullable=False)
comparison: str = Field(default="gt", nullable=False)
enabled: bool = Field(default=True)
cooldown_minutes: int = Field(default=60)
description: str | None = Field(default=None)
metadata_: dict | None = Field(default=None, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)
__table_args__ = (
Index("ix_alert_rules_org_created", "organization_id", "created_at"),
Index("ix_alert_rules_org_gateway", "organization_id", "gateway_id"),
)
class AlertEvent(QueryModel, table=True):
"""Triggered alert instances — one row per alert fired."""
__tablename__ = "alert_events" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True))
rule_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("alert_rules.id", ondelete="CASCADE"), index=True))
gateway_id: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True))
metric_type: str = Field(nullable=False, index=True)
triggered_value: float = Field(nullable=False)
threshold_value: float = Field(nullable=False)
acknowledged: bool = Field(default=False)
acknowledged_at: datetime | None = Field(default=None)
acknowledged_by: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("users.id", ondelete="SET NULL"), nullable=True))
resolved_at: datetime | None = Field(default=None)
metadata_: dict | None = Field(default=None, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)
__table_args__ = (
Index("ix_alert_events_org_created", "organization_id", "created_at"),
Index("ix_alert_events_org_gateway", "organization_id", "gateway_id"),
Index("ix_alert_events_rule_created", "rule_id", "created_at"),
)

View File

@ -0,0 +1,152 @@
"""Monitoring and tracking models for Mission Control."""
from __future__ import annotations
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import JSON, Column, ForeignKey
from sqlmodel import Field
from app.core.time import utcnow
from app.models.base import QueryModel
RUNTIME_ANNOTATION_TYPES = (datetime,)
class CostSnapshot(QueryModel, table=True):
"""Timestamped cost readings from the gateway RPC."""
__tablename__ = "cost_snapshots" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True))
gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True))
period_start: datetime = Field(nullable=False)
period_end: datetime = Field(nullable=False)
total_cost: float = Field(nullable=False, default=0.0)
model_costs: dict | None = Field(default=None, sa_column=Column(JSON))
provider_costs: dict | None = Field(default=None, sa_column=Column(JSON))
token_counts: dict | None = Field(default=None, sa_column=Column(JSON))
collected_at: datetime = Field(default_factory=utcnow)
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)
__table_args__ = (
Index("ix_cost_snapshots_org_created", "organization_id", "created_at"),
Index("ix_cost_snapshots_org_gateway", "organization_id", "gateway_id"),
)
class CronJobStatus(QueryModel, table=True):
"""Cron job state tracked from gateway RPC cron.list/cron.status."""
__tablename__ = "cron_job_statuses" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True))
gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True))
job_name: str = Field(nullable=False, index=True)
schedule: str = Field(nullable=False)
enabled: bool = Field(default=True)
last_run_at: datetime | None = Field(default=None)
next_run_at: datetime | None = Field(default=None)
status: str = Field(nullable=False, default="idle")
failure_count: int = Field(default=0)
last_error: str | None = Field(default=None)
metadata_: dict | None = Field(default=None, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)
__table_args__ = (
Index("ix_cron_job_statuses_org_created", "organization_id", "created_at"),
Index("ix_cron_job_statuses_org_gateway", "organization_id", "gateway_id"),
)
class SessionEvent(QueryModel, table=True):
"""Session lifecycle events from gateway RPC sessions.list/preview."""
__tablename__ = "session_events" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True))
gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True))
session_key: str = Field(nullable=False, index=True)
event_type: str = Field(nullable=False)
model: str | None = Field(default=None)
agent_id: str | None = Field(default=None)
channel: str | None = Field(default=None)
context_percent: float | None = Field(default=None)
token_counts: dict | None = Field(default=None, sa_column=Column(JSON))
cost: float | None = Field(default=None)
metadata_: dict | None = Field(default=None, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)
__table_args__ = (
Index("ix_session_events_org_created", "organization_id", "created_at"),
Index("ix_session_events_org_gateway", "organization_id", "gateway_id"),
)
class SubAgentRun(QueryModel, table=True):
"""Sub-agent execution records."""
__tablename__ = "sub_agent_runs" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True))
gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True))
parent_session_key: str = Field(nullable=False, index=True)
session_event_id: UUID | None = Field(default=None, sa_column=Column(UUID, ForeignKey("session_events.id", ondelete="CASCADE"), index=True))
agent: str | None = Field(default=None)
model: str | None = Field(default=None)
status: str = Field(nullable=False, default="pending")
duration_ms: int | None = Field(default=None)
cost: float | None = Field(default=None)
token_counts: dict | None = Field(default=None, sa_column=Column(JSON))
metadata_: dict | None = Field(default=None, sa_column=Column(JSON))
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)
__table_args__ = (
Index("ix_sub_agent_runs_org_created", "organization_id", "created_at"),
Index("ix_sub_agent_runs_org_gateway", "organization_id", "gateway_id"),
)
class SystemHealthMetric(QueryModel, table=True):
"""System metrics from gateway health/status endpoints and host collection."""
__tablename__ = "system_health_metrics" # pyright: ignore[reportAssignmentType]
id: UUID = Field(default_factory=uuid4, primary_key=True)
organization_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("organizations.id", ondelete="CASCADE"), index=True))
gateway_id: UUID = Field(default=None, sa_column=Column(UUID, ForeignKey("gateways.id", ondelete="CASCADE"), index=True))
cpu_percent: float | None = Field(default=None)
cpu_cores: int | None = Field(default=None)
ram_used_bytes: int | None = Field(default=None)
ram_total_bytes: int | None = Field(default=None)
ram_percent: float | None = Field(default=None)
swap_used_bytes: int | None = Field(default=None)
swap_total_bytes: int | None = Field(default=None)
swap_percent: float | None = Field(default=None)
disk_path: str = Field(default="/")
disk_used_bytes: int | None = Field(default=None)
disk_total_bytes: int | None = Field(default=None)
disk_percent: float | None = Field(default=None)
gateway_live: bool | None = Field(default=None)
gateway_ready: bool | None = Field(default=None)
gateway_uptime_ms: int | None = Field(default=None)
gateway_pid: int | None = Field(default=None)
gateway_version: str | None = Field(default=None)
metadata_: dict | None = Field(default=None, sa_column=Column(JSON))
collected_at: datetime = Field(default_factory=utcnow)
created_at: datetime = Field(default_factory=utcnow)
updated_at: datetime = Field(default_factory=utcnow)
__table_args__ = (
Index("ix_system_health_metrics_org_created", "organization_id", "created_at"),
Index("ix_system_health_metrics_org_gateway", "organization_id", "gateway_id"),
)

View File

@ -69,13 +69,14 @@ def run_migrations_online() -> None:
) )
with connectable.connect() as connection: with connectable.connect() as connection:
# Use autocommit to handle individual statement failures
context.configure( context.configure(
connection=connection, connection=connection,
target_metadata=target_metadata, target_metadata=target_metadata,
compare_type=True, compare_type=True,
transaction_per_migration=True, # Use separate transaction per migration
) )
with context.begin_transaction():
context.run_migrations() context.run_migrations()

View File

@ -0,0 +1,449 @@
"""Add monitoring models (CostSnapshot, CronJobStatus, SessionEvent, SubAgentRun, SystemHealthMetric, AlertRule, AlertEvent).
Revision ID: 7a8b9c0d1e2f
Revises: a9b1c2d3e4f7
Create Date: 2026-05-10 00:00:00.000000
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "7a8b9c0d1e2f"
down_revision = "a9b1c2d3e4f7"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Apply this revision: create the monitoring and alerting tables."""
    # Thin entry point for Alembic; all DDL lives in the private helper.
    return _upgrade_monitoring_tables()
def _upgrade_monitoring_tables() -> None:
    """Create the seven monitoring/alerting tables and their indexes.

    Tables are created in dependency order: ``session_events`` before
    ``sub_agent_runs`` and ``alert_rules`` before ``alert_events``, because the
    later table holds a foreign key to the earlier one.

    Byte counters and ``gateway_uptime_ms`` are BIGINT: a 32-bit INTEGER tops
    out at 2,147,483,647 (about 2 GiB, or about 24.8 days of milliseconds).
    """
    # Column lists shared by the composite indexes of every table.
    org_created = ["organization_id", "created_at"]
    org_gateway = ["organization_id", "gateway_id"]

    # cost_snapshots
    op.create_table(
        "cost_snapshots",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("organization_id", sa.Uuid(), nullable=False),
        sa.Column("gateway_id", sa.Uuid(), nullable=False),
        sa.Column("period_start", sa.DateTime(), nullable=False),
        sa.Column("period_end", sa.DateTime(), nullable=False),
        sa.Column("total_cost", sa.Float(), nullable=False),
        sa.Column("model_costs", sa.JSON(), nullable=True),
        sa.Column("provider_costs", sa.JSON(), nullable=True),
        sa.Column("token_counts", sa.JSON(), nullable=True),
        sa.Column("collected_at", sa.DateTime(), nullable=False),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    _create_indexes(
        "cost_snapshots",
        {
            "organization_id": ["organization_id"],
            "gateway_id": ["gateway_id"],
            "org_created": org_created,
            "org_gateway": org_gateway,
        },
    )

    # cron_job_statuses
    op.create_table(
        "cron_job_statuses",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("organization_id", sa.Uuid(), nullable=False),
        sa.Column("gateway_id", sa.Uuid(), nullable=False),
        sa.Column("job_name", sa.String(), nullable=False),
        sa.Column("schedule", sa.String(), nullable=False),
        # NOT NULL like the other defaulted, non-optional model fields.
        sa.Column("enabled", sa.Boolean(), nullable=False),
        sa.Column("last_run_at", sa.DateTime(), nullable=True),
        sa.Column("next_run_at", sa.DateTime(), nullable=True),
        sa.Column("status", sa.String(), nullable=False),
        sa.Column("failure_count", sa.Integer(), nullable=False),
        sa.Column("last_error", sa.String(), nullable=True),
        sa.Column("metadata_", sa.JSON(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    _create_indexes(
        "cron_job_statuses",
        {
            "organization_id": ["organization_id"],
            "gateway_id": ["gateway_id"],
            "job_name": ["job_name"],
            "org_created": org_created,
            "org_gateway": org_gateway,
        },
    )

    # session_events
    op.create_table(
        "session_events",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("organization_id", sa.Uuid(), nullable=False),
        sa.Column("gateway_id", sa.Uuid(), nullable=False),
        sa.Column("session_key", sa.String(), nullable=False),
        sa.Column("event_type", sa.String(), nullable=False),
        sa.Column("model", sa.String(), nullable=True),
        sa.Column("agent_id", sa.String(), nullable=True),
        sa.Column("channel", sa.String(), nullable=True),
        sa.Column("context_percent", sa.Float(), nullable=True),
        sa.Column("token_counts", sa.JSON(), nullable=True),
        sa.Column("cost", sa.Float(), nullable=True),
        sa.Column("metadata_", sa.JSON(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    _create_indexes(
        "session_events",
        {
            "organization_id": ["organization_id"],
            "gateway_id": ["gateway_id"],
            "session_key": ["session_key"],
            "org_created": org_created,
            "org_gateway": org_gateway,
        },
    )

    # sub_agent_runs (references session_events, created above)
    op.create_table(
        "sub_agent_runs",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("organization_id", sa.Uuid(), nullable=False),
        sa.Column("gateway_id", sa.Uuid(), nullable=False),
        sa.Column("parent_session_key", sa.String(), nullable=False),
        sa.Column("session_event_id", sa.Uuid(), nullable=True),
        sa.Column("agent", sa.String(), nullable=True),
        sa.Column("model", sa.String(), nullable=True),
        sa.Column("status", sa.String(), nullable=False),
        sa.Column("duration_ms", sa.Integer(), nullable=True),
        sa.Column("cost", sa.Float(), nullable=True),
        sa.Column("token_counts", sa.JSON(), nullable=True),
        sa.Column("metadata_", sa.JSON(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["session_event_id"], ["session_events.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    _create_indexes(
        "sub_agent_runs",
        {
            "organization_id": ["organization_id"],
            "gateway_id": ["gateway_id"],
            "parent_session_key": ["parent_session_key"],
            "session_event_id": ["session_event_id"],
            "org_created": org_created,
            "org_gateway": org_gateway,
        },
    )

    # system_health_metrics
    op.create_table(
        "system_health_metrics",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("organization_id", sa.Uuid(), nullable=False),
        sa.Column("gateway_id", sa.Uuid(), nullable=False),
        sa.Column("cpu_percent", sa.Float(), nullable=True),
        sa.Column("cpu_cores", sa.Integer(), nullable=True),
        sa.Column("ram_used_bytes", sa.BigInteger(), nullable=True),
        sa.Column("ram_total_bytes", sa.BigInteger(), nullable=True),
        sa.Column("ram_percent", sa.Float(), nullable=True),
        sa.Column("swap_used_bytes", sa.BigInteger(), nullable=True),
        sa.Column("swap_total_bytes", sa.BigInteger(), nullable=True),
        sa.Column("swap_percent", sa.Float(), nullable=True),
        sa.Column("disk_path", sa.String(), nullable=False),
        sa.Column("disk_used_bytes", sa.BigInteger(), nullable=True),
        sa.Column("disk_total_bytes", sa.BigInteger(), nullable=True),
        sa.Column("disk_percent", sa.Float(), nullable=True),
        sa.Column("gateway_live", sa.Boolean(), nullable=True),
        sa.Column("gateway_ready", sa.Boolean(), nullable=True),
        sa.Column("gateway_uptime_ms", sa.BigInteger(), nullable=True),
        sa.Column("gateway_pid", sa.Integer(), nullable=True),
        sa.Column("gateway_version", sa.String(), nullable=True),
        sa.Column("metadata_", sa.JSON(), nullable=True),
        sa.Column("collected_at", sa.DateTime(), nullable=False),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    _create_indexes(
        "system_health_metrics",
        {
            "organization_id": ["organization_id"],
            "gateway_id": ["gateway_id"],
            "org_created": org_created,
            "org_gateway": org_gateway,
        },
    )

    # alert_rules
    op.create_table(
        "alert_rules",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("organization_id", sa.Uuid(), nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("metric_type", sa.String(), nullable=False),
        sa.Column("threshold", sa.Float(), nullable=False),
        sa.Column("comparison", sa.String(), nullable=False),
        sa.Column("enabled", sa.Boolean(), nullable=False),
        sa.Column("cooldown_minutes", sa.Integer(), nullable=False),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("metadata_", sa.JSON(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    # No (organization_id, gateway_id) index here: alert_rules has no
    # gateway_id column, so creating one would abort the migration.
    _create_indexes(
        "alert_rules",
        {
            "organization_id": ["organization_id"],
            "metric_type": ["metric_type"],
            "org_created": org_created,
        },
    )

    # alert_events (references alert_rules, created above)
    op.create_table(
        "alert_events",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("organization_id", sa.Uuid(), nullable=False),
        sa.Column("rule_id", sa.Uuid(), nullable=False),
        sa.Column("gateway_id", sa.Uuid(), nullable=True),
        sa.Column("metric_type", sa.String(), nullable=False),
        sa.Column("triggered_value", sa.Float(), nullable=False),
        sa.Column("threshold_value", sa.Float(), nullable=False),
        sa.Column("acknowledged", sa.Boolean(), nullable=False),
        sa.Column("acknowledged_at", sa.DateTime(), nullable=True),
        sa.Column("acknowledged_by", sa.Uuid(), nullable=True),
        sa.Column("resolved_at", sa.DateTime(), nullable=True),
        sa.Column("metadata_", sa.JSON(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.ForeignKeyConstraint(["organization_id"], ["organizations.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["rule_id"], ["alert_rules.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["gateway_id"], ["gateways.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["acknowledged_by"], ["users.id"], ondelete="SET NULL"),
        sa.PrimaryKeyConstraint("id"),
    )
    _create_indexes(
        "alert_events",
        {
            "organization_id": ["organization_id"],
            "rule_id": ["rule_id"],
            "gateway_id": ["gateway_id"],
            "metric_type": ["metric_type"],
            "org_created": org_created,
            "org_gateway": org_gateway,
            "rule_created": ["rule_id", "created_at"],
        },
    )


def _create_indexes(table_name: str, index_columns: dict[str, list[str]]) -> None:
    """Create one non-unique index on *table_name* per ``suffix -> columns`` entry.

    Each index is named ``ix_<table_name>_<suffix>`` and is created in the
    mapping's insertion order.
    """
    for suffix, columns in index_columns.items():
        op.create_index(
            op.f(f"ix_{table_name}_{suffix}"),
            table_name,
            columns,
            unique=False,
        )
def downgrade() -> None:
    """Drop monitoring tables.

    Tables are dropped in reverse dependency order (``alert_events`` before
    ``alert_rules``, ``sub_agent_runs`` before ``session_events``).

    Only the single-column indexes are dropped explicitly. The composite
    ``*_org_created`` / ``*_org_gateway`` / ``*_rule_created`` indexes are left
    to ``DROP TABLE``: the follow-up revision's downgrade already removes them
    with ``DROP INDEX IF EXISTS``, so an unconditional ``drop_index`` here
    would fail on a chained downgrade. ``alert_rules`` never had an
    org/gateway index at all, because it has no ``gateway_id`` column.
    """
    # (table, single-column index suffixes) in drop order.
    drop_plan = (
        ("alert_events", ("metric_type", "gateway_id", "rule_id", "organization_id")),
        ("alert_rules", ("metric_type", "organization_id")),
        ("system_health_metrics", ("gateway_id", "organization_id")),
        (
            "sub_agent_runs",
            ("session_event_id", "parent_session_key", "gateway_id", "organization_id"),
        ),
        ("session_events", ("session_key", "gateway_id", "organization_id")),
        ("cron_job_statuses", ("job_name", "gateway_id", "organization_id")),
        ("cost_snapshots", ("gateway_id", "organization_id")),
    )
    for table_name, suffixes in drop_plan:
        for suffix in suffixes:
            op.drop_index(op.f(f"ix_{table_name}_{suffix}"), table_name=table_name)
        # Dropping the table also removes any indexes still attached to it.
        op.drop_table(table_name)

View File

@ -0,0 +1,160 @@
"""Add missing CASCADE rules and composite indexes for monitoring tables.
Revision ID: 8f9a0b1c2d3e
Revises: 7a8b9c0d1e2f
Create Date: 2026-05-10 19:00:00.000000
"""
from __future__ import annotations
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "8f9a0b1c2d3e"
down_revision = "7a8b9c0d1e2f"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add CASCADE rules and composite indexes.

    Every foreign key is dropped (if present) and re-created with the intended
    ``ON DELETE`` action, then the composite indexes are created if missing.
    Both steps are idempotent (``IF EXISTS`` / ``IF NOT EXISTS``), so no
    exception is swallowed: a failing statement indicates a real problem and
    must abort the migration.

    The statements are raw SQL and rely on the ``<table>_<column>_fkey``
    constraint naming; NOTE(review): this looks PostgreSQL-specific — confirm
    no other backend runs these migrations.
    """
    # (table, column, referenced table, ON DELETE action)
    foreign_keys = (
        ("cost_snapshots", "organization_id", "organizations", "CASCADE"),
        ("cost_snapshots", "gateway_id", "gateways", "CASCADE"),
        ("cron_job_statuses", "organization_id", "organizations", "CASCADE"),
        ("cron_job_statuses", "gateway_id", "gateways", "CASCADE"),
        ("session_events", "organization_id", "organizations", "CASCADE"),
        ("session_events", "gateway_id", "gateways", "CASCADE"),
        ("sub_agent_runs", "organization_id", "organizations", "CASCADE"),
        ("sub_agent_runs", "gateway_id", "gateways", "CASCADE"),
        ("sub_agent_runs", "session_event_id", "session_events", "CASCADE"),
        ("system_health_metrics", "organization_id", "organizations", "CASCADE"),
        ("system_health_metrics", "gateway_id", "gateways", "CASCADE"),
        ("alert_rules", "organization_id", "organizations", "CASCADE"),
        ("alert_events", "organization_id", "organizations", "CASCADE"),
        ("alert_events", "rule_id", "alert_rules", "CASCADE"),
        ("alert_events", "gateway_id", "gateways", "CASCADE"),
        ("alert_events", "acknowledged_by", "users", "SET NULL"),
    )
    # (index name, table, column list)
    composite_indexes = (
        ("ix_cost_snapshots_org_created", "cost_snapshots", "organization_id, created_at"),
        ("ix_cost_snapshots_org_gateway", "cost_snapshots", "organization_id, gateway_id"),
        ("ix_cron_job_statuses_org_created", "cron_job_statuses", "organization_id, created_at"),
        ("ix_cron_job_statuses_org_gateway", "cron_job_statuses", "organization_id, gateway_id"),
        ("ix_session_events_org_created", "session_events", "organization_id, created_at"),
        ("ix_session_events_org_gateway", "session_events", "organization_id, gateway_id"),
        ("ix_sub_agent_runs_org_created", "sub_agent_runs", "organization_id, created_at"),
        ("ix_sub_agent_runs_org_gateway", "sub_agent_runs", "organization_id, gateway_id"),
        (
            "ix_system_health_metrics_org_created",
            "system_health_metrics",
            "organization_id, created_at",
        ),
        (
            "ix_system_health_metrics_org_gateway",
            "system_health_metrics",
            "organization_id, gateway_id",
        ),
        ("ix_alert_rules_org_created", "alert_rules", "organization_id, created_at"),
        ("ix_alert_events_org_created", "alert_events", "organization_id, created_at"),
        ("ix_alert_events_org_gateway", "alert_events", "organization_id, gateway_id"),
        ("ix_alert_events_rule_created", "alert_events", "rule_id, created_at"),
    )

    # Drop every existing constraint first so the ADD statements cannot collide.
    for table, column, _ref_table, _action in foreign_keys:
        op.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {table}_{column}_fkey")

    # Re-create each foreign key with its ON DELETE rule.
    for table, column, ref_table, action in foreign_keys:
        op.execute(
            f"ALTER TABLE {table} ADD CONSTRAINT {table}_{column}_fkey "
            f"FOREIGN KEY ({column}) REFERENCES {ref_table}(id) ON DELETE {action}"
        )

    # Add composite indexes.
    for index_name, table, columns in composite_indexes:
        op.execute(f"CREATE INDEX IF NOT EXISTS {index_name} ON {table} ({columns})")
def downgrade() -> None:
    """Remove CASCADE rules and composite indexes.

    Runs in two phases:

    1. Drop the composite indexes on the monitoring and alert tables.
    2. For each table, drop its foreign-key constraints and re-add them
       under the same names with no ``ON DELETE`` clause.

    Every ``DROP`` uses ``IF EXISTS``, so an index or constraint that is
    already missing does not abort the downgrade.
    """
    # Composite indexes, listed in the order they are dropped.
    composite_indexes = (
        "ix_cost_snapshots_org_created",
        "ix_cost_snapshots_org_gateway",
        "ix_cron_job_statuses_org_created",
        "ix_cron_job_statuses_org_gateway",
        "ix_session_events_org_created",
        "ix_session_events_org_gateway",
        "ix_sub_agent_runs_org_created",
        "ix_sub_agent_runs_org_gateway",
        "ix_system_health_metrics_org_created",
        "ix_system_health_metrics_org_gateway",
        "ix_alert_rules_org_created",
        "ix_alert_events_org_created",
        "ix_alert_events_org_gateway",
        "ix_alert_events_rule_created",
    )
    for index_name in composite_indexes:
        op.execute(f"DROP INDEX IF EXISTS {index_name}")

    # Foreign keys to rebuild, as (table, ((column, referenced_table), ...)).
    # Constraint names follow the "<table>_<column>_fkey" pattern and every
    # key references the "id" column of the referenced table.
    organization_fk = ("organization_id", "organizations")
    gateway_fk = ("gateway_id", "gateways")
    foreign_keys = (
        ("cost_snapshots", (organization_fk, gateway_fk)),
        ("cron_job_statuses", (organization_fk, gateway_fk)),
        ("session_events", (organization_fk, gateway_fk)),
        (
            "sub_agent_runs",
            (organization_fk, gateway_fk, ("session_event_id", "session_events")),
        ),
        ("system_health_metrics", (organization_fk, gateway_fk)),
        ("alert_rules", (organization_fk,)),
        (
            "alert_events",
            (
                organization_fk,
                ("rule_id", "alert_rules"),
                gateway_fk,
                # NOTE(review): this re-adds the acknowledged_by foreign key
                # (just without ON DELETE SET NULL). Confirm the previous
                # revision actually defined it; if not, the downgrade leaves
                # a constraint behind that did not exist before.
                ("acknowledged_by", "users"),
            ),
        ),
    )
    for table, references in foreign_keys:
        # Drop all of a table's constraints first, then re-add them, so the
        # statement order per table is: every DROP, followed by every ADD.
        for column, _referenced_table in references:
            op.execute(
                f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {table}_{column}_fkey"
            )
        for column, referenced_table in references:
            op.execute(
                f"ALTER TABLE {table} ADD CONSTRAINT {table}_{column}_fkey "
                f"FOREIGN KEY ({column}) REFERENCES {referenced_table}(id)"
            )