# runbook.py — Automated Incident Response
import asyncio, httpx, logging
from datetime import datetime
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Ordered runbook steps; run_runbook walks this list from first to last.
STEPS = [
    "Detect unhealthy state",
    "Drain connection pool",
    "Restart payment processor",
    "Verify health endpoint",
    "Send recovery notification",
]
async def run_runbook(service_url: str = "", *, auto: bool = False) -> dict:
    """Walk through every entry of STEPS in order and report what was done.

    Each step is logged, followed by a simulated delay of ``2 + 0.5 * index``
    seconds, and then recorded as done with a UTC timestamp.

    Args:
        service_url: Base URL of the affected service. The simulated steps do
            not use it; it defaults to an empty string so that callers which
            have no URL at hand can still start the runbook.
        auto: True when the run was started automatically rather than by an
            operator. Only recorded in the start-up log line.

    Returns:
        A dict with two keys: ``"steps"``, a list of
        ``{"step", "status", "ts"}`` dicts in execution order, and
        ``"elapsed_seconds"``, the total run time rounded to one decimal.
    """
    import time
    from datetime import timezone

    total = len(STEPS)
    results = []
    # A monotonic clock keeps the elapsed time correct even if the wall clock
    # is adjusted while the runbook is running.
    started = time.monotonic()
    logger.info("Runbook started (auto=%s)", auto)
    for index, step in enumerate(STEPS):
        # The denominator follows len(STEPS) instead of a hard-coded 5.
        logger.info("[%d/%d] %s", index + 1, total, step)
        await asyncio.sleep(2 + (index * 0.5))  # simulate work
        # Naive-UTC timestamp: same ISO format (no offset suffix) as
        # datetime.utcnow().isoformat(), without the deprecated call.
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        results.append({"step": step, "status": "done", "ts": stamp})
    elapsed = time.monotonic() - started
    return {"steps": results, "elapsed_seconds": round(elapsed, 1)}
# trigger.py — Chaos trigger & runbook dispatcher
from fastapi import APIRouter
from pydantic import BaseModel
import asyncio, random
# Router that collects this module's endpoints (the /admin/chaos route below).
router = APIRouter()
# Request body accepted by POST /admin/chaos.
# Documented with comments rather than a class docstring so that the
# generated schema description stays exactly as it was.
class ChaosRequest(BaseModel):
    # Failure mode handed to inject_failure(); defaults to "connection_pool".
    mode: str = "connection_pool"
    # Seconds to wait before the runbook is started automatically.
    # The endpoint only schedules the runbook when this value is > 0.
    auto_runbook_delay: int = 8
# Strong references to in-flight background tasks. The event loop itself only
# holds weak references, so a task nobody else references can be
# garbage-collected before it has finished.
_background_tasks: set = set()


def _on_background_task_done(task) -> None:
    """Forget a finished background task and log the failure if it raised."""
    import logging

    _background_tasks.discard(task)
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        # Without this, a failing runbook would go unnoticed: nobody awaits
        # the task, so its exception would never be retrieved.
        logging.getLogger(__name__).error(
            "Scheduled runbook failed", exc_info=exc
        )


# The docstring below is kept verbatim: FastAPI publishes a route handler's
# docstring as the operation description, so it is runtime-visible text.
#
# Behaviour: injects the requested failure mode, and when
# req.auto_runbook_delay is positive schedules delayed_runbook() as a
# background task without waiting for it. The response is returned right
# after the failure has been injected.
@router.post("/admin/chaos")
async def trigger_chaos(req: ChaosRequest):
    """Inject failure into payment service"""
    await inject_failure(req.mode)
    if req.auto_runbook_delay > 0:
        task = asyncio.create_task(delayed_runbook(req.auto_runbook_delay))
        # Keep the task alive until it completes; the callback drops it again.
        _background_tasks.add(task)
        task.add_done_callback(_on_background_task_done)
    return {"status": "chaos_injected", "mode": req.mode}
async def delayed_runbook(delay: int):
    """Wait ``delay`` seconds, then await ``run_runbook(auto=True)``.

    Returns nothing; the runbook's result dict is discarded.

    NOTE(review): the call relies on run_runbook accepting an ``auto``
    keyword and not requiring a service URL - confirm this against
    run_runbook's signature.
    """
    # Grace period before the automated response kicks in.
    await asyncio.sleep(delay)
    await run_runbook(auto=True)
# health_check.py — Continuous health monitoring
import asyncio, httpx
from datetime import datetime
# Path appended verbatim to the service base URL when polling for health.
HEALTH_ENDPOINT = "/health"
# Pause between two consecutive health checks in monitor_loop.
CHECK_INTERVAL = 10 # seconds
# Presumably the target time-to-recovery; not referenced anywhere in the
# visible part of this file - TODO confirm where it is consumed.
MTTR_TARGET = 30 # seconds
async def monitor_loop(service_url: str):
    """Poll the health endpoint forever and record time-to-recovery (MTTR).

    Every CHECK_INTERVAL seconds a GET is sent to
    ``service_url + HEALTH_ENDPOINT`` with a 5 second timeout. Any outcome
    other than an HTTP 200 - including a timeout or any other exception
    raised by the request - counts as unhealthy.

    The first unhealthy check opens an incident and awaits
    ``alert_oncall(incident_start)``. The first healthy check after that
    awaits ``record_mttr(seconds)`` with the incident's duration and closes
    the incident. Detection time (MTTD) is not measured here.

    Args:
        service_url: Base URL of the monitored service.

    The loop has no exit condition: it runs until its task is cancelled or
    until alert_oncall / record_mttr raises.
    """
    import time
    from datetime import timezone

    incident_start = None  # naive-UTC datetime of the first failed check
    incident_clock = 0.0   # time.monotonic() reading taken at that moment

    while True:
        try:
            # A new client per poll: no connection is carried over from one
            # check to the next.
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{service_url}{HEALTH_ENDPOINT}",
                    timeout=5.0,
                )
            healthy = response.status_code == 200
        except Exception:
            # Deliberately broad: whatever prevented a 200 response, the
            # service is treated as unhealthy for this check.
            healthy = False

        if not healthy and incident_start is None:
            incident_clock = time.monotonic()
            # Same naive-UTC value datetime.utcnow() produced, so
            # alert_oncall receives what it did before, minus the
            # deprecated call.
            incident_start = datetime.now(timezone.utc).replace(tzinfo=None)
            await alert_oncall(incident_start)
        elif healthy and incident_start is not None:
            # Duration comes from the monotonic clock so that wall-clock
            # adjustments during the incident cannot distort the MTTR.
            mttr = time.monotonic() - incident_clock
            await record_mttr(mttr)
            incident_start = None

        await asyncio.sleep(CHECK_INTERVAL)