"""BurnServ AI Ops Agent.

A small FastAPI application that scales and restarts Docker services through
the Docker SDK, talking to the daemon over a mounted unix socket.

Endpoints:
    GET  /health       liveness probe.
    GET  /diagnostics  socket / SDK / daemon connectivity report.
    POST /command      structured (``action`` + ``params``) or free-text command.
    POST /alert        webhook that maps incoming alerts to actions via rules.yaml.

Guardrails (allowed service-name regex, maximum replica count) and the
alert-to-action rules are read once, at import time, from ``rules.yaml`` in
the current working directory when that file exists.
"""
import os
import re

import yaml
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel

try:
    import docker
except Exception:
    # Deliberate best-effort: the app still starts (and /health, /diagnostics
    # still answer) when the SDK cannot be imported; get_client() reports it.
    docker = None

PORT = int(os.getenv("PORT", "8080"))
GUARD_DEFAULT_MAX = 25
RULES_PATH = "rules.yaml"

if os.path.exists(RULES_PATH):
    with open(RULES_PATH, "r", encoding="utf-8") as f:
        RULES = yaml.safe_load(f) or {}
else:
    RULES = {}

if not isinstance(RULES, dict):
    # Fail loudly rather than silently running without guardrails.
    raise RuntimeError(f"{RULES_PATH} must contain a YAML mapping at the top level")

# A `guardrails:` key with no value loads as None; treat it like a missing section.
_GUARDRAILS = RULES.get("guardrails") or {}

# NOTE(review): the pattern is applied with re.match(), which anchors only at
# the start of the name. Add a trailing `$` in rules.yaml if a full-name match
# is intended.
ALLOWED_REGEX = re.compile(_GUARDRAILS.get("allowed_services_regex", ".*"))
MAX_SCALE = int(_GUARDRAILS.get("max_scale_replicas", GUARD_DEFAULT_MAX))

DOCKER_SOCK = "/var/run/docker.sock"

# Command-format hints returned to the caller on malformed free-text commands.
_SCALE_FORMAT = "Format: 'scale <service> to <replicas>'"
_RESTART_FORMAT = "Format: 'restart <service>'"

# Placeholder that rules.yaml may use inside an action's `service` value.
_SERVICE_PLACEHOLDER = "{{ $labels.service_name }}"

_client = None


def get_client():
    """Return a cached Docker client connected to ``DOCKER_SOCK``.

    The client is created on first use and cached only after a successful
    ``ping()``, so a failed connection attempt is retried on the next call
    instead of handing out an unverified client.

    Raises:
        HTTPException: 500 when the SDK is missing, the socket path does not
            exist, or the daemon cannot be reached.
    """
    global _client
    if _client is not None:
        return _client
    if docker is None:
        raise HTTPException(status_code=500, detail="docker SDK not available in image.")
    if not os.path.exists(DOCKER_SOCK):
        raise HTTPException(
            status_code=500,
            detail=f"Docker socket not found at {DOCKER_SOCK}. Did you mount it?",
        )
    try:
        client = docker.DockerClient(base_url=f"unix://{DOCKER_SOCK}")
        client.ping()
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Cannot connect to Docker at {DOCKER_SOCK}: {e}",
        ) from e
    _client = client
    return _client


app = FastAPI(title="BurnServ AI Ops Agent")


def _guard_service(service_name: str):
    """Reject service names the guardrail regex does not allow.

    Raises:
        HTTPException: 403 when ``service_name`` is not a string or does not
            match ``ALLOWED_REGEX``.
    """
    if not isinstance(service_name, str) or not ALLOWED_REGEX.match(service_name):
        raise HTTPException(
            status_code=403,
            detail=f"Service '{service_name}' not allowed by guardrails.",
        )


def _get_service(service_name: str):
    """Look up a service by name after applying the guardrail check.

    The guardrail runs first so a disallowed name is answered with 403 even
    when the Docker daemon is unreachable.

    Raises:
        HTTPException: 403 (guardrail), 404 (no such service), or 500 (any
            other failure talking to Docker).
    """
    _guard_service(service_name)
    cli = get_client()
    try:
        return cli.services.get(service_name)
    except docker.errors.NotFound:
        raise HTTPException(
            status_code=404, detail=f"Service '{service_name}' not found"
        ) from None
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Docker error while looking up service '{service_name}': {e}",
        ) from e


def _scale(service_name: str, replicas: int = None, step: int = None,
           min_replicas: int = None, max_replicas: int = None):
    """Set the replica count of a replicated service.

    Args:
        service_name: Name of the service to scale.
        replicas: Absolute target. When ``None`` the target is computed as
            the current replica count plus ``step``.
        step: Increment used when ``replicas`` is ``None``; a missing or
            zero value is treated as 1.
        min_replicas: Lower bound applied to the stepped target.
        max_replicas: Upper bound applied to the stepped target.

    Returns:
        ``{"service": <name>, "replicas": <applied count>}``. The applied
        count is always clamped to the range ``1..MAX_SCALE``.

    Raises:
        HTTPException: 403/404/500 from the lookup, 400 for a global-mode
            service, 500 when Docker rejects the update.
    """
    svc = _get_service(service_name)
    mode = (svc.attrs.get("Spec") or {}).get("Mode") or {}
    if "Global" in mode:
        raise HTTPException(
            status_code=400,
            detail=f"Service '{service_name}' runs in global mode and cannot be scaled.",
        )

    if replicas is None:
        current = (mode.get("Replicated") or {}).get("Replicas", 1)
        target = int(current) + int(step or 1)
        if min_replicas is not None:
            target = max(target, int(min_replicas))
        if max_replicas is not None:
            target = min(target, int(max_replicas))
        replicas = target

    replicas = max(1, min(int(replicas), MAX_SCALE))

    try:
        # Service.update() does not accept raw Spec keys ("Name", "Mode", ...);
        # Service.scale() changes the replica count based on the current spec.
        svc.scale(replicas)
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to scale service '{service_name}': {e}",
        ) from e
    return {"service": service_name, "replicas": replicas}


def _restart_service(service_name: str):
    """Trigger a rolling restart of a service by forcing an update.

    Returns:
        ``{"service": <name>, "status": "rolling-restart-issued"}``.

    Raises:
        HTTPException: 403/404/500 from the lookup, 500 when Docker rejects
            the update.
    """
    svc = _get_service(service_name)
    try:
        # force_update() re-applies the current spec with a bumped ForceUpdate
        # counter, which makes the daemon replace the running tasks.
        svc.force_update()
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Failed to restart service '{service_name}': {e}",
        ) from e
    return {"service": service_name, "status": "rolling-restart-issued"}


class Command(BaseModel):
    """Request body of ``POST /command``."""

    # Free-text command; only consulted when `action` is empty.
    text: str = ""
    # Structured action name: "scale" or "restart_service".
    action: str | None = None
    # Parameters for the structured action, e.g. {"service": ..., "replicas": ...}.
    params: dict | None = None


@app.get("/health")
def health():
    """Liveness probe; always reports ``{"ok": True}``."""
    return {"ok": True}


@app.get("/diagnostics")
def diagnostics():
    """Report socket presence, process ids, config, and daemon reachability.

    Connection problems are reported in the ``error`` field of the response
    rather than raised.
    """
    info = {
        "docker_sock_exists": os.path.exists(DOCKER_SOCK),
        "docker_sock_path": DOCKER_SOCK,
        "uid": os.getuid() if hasattr(os, "getuid") else "n/a",
        "gid": os.getgid() if hasattr(os, "getgid") else "n/a",
        "env_PORT": PORT,
        "rules_loaded": bool(RULES),
    }
    try:
        cli = get_client()
        info["docker_ping"] = True
        info["server_version"] = cli.version()
    except HTTPException as he:
        info["docker_ping"] = False
        info["error"] = he.detail
    except Exception as e:
        info["docker_ping"] = False
        info["error"] = str(e)
    return info


@app.post("/command")
def command(cmd: Command):
    """Execute a structured or free-text scale/restart command.

    Structured form: ``action`` is ``"scale"`` (params ``service`` and
    ``replicas``) or ``"restart_service"`` (param ``service``).

    Free-text form (lower-cased before parsing, so the service name is
    lower-cased too): ``scale <service> to <replicas>``, ``scale <service>``
    (adds one replica), or ``restart <service>``.

    Raises:
        HTTPException: 400 for malformed input; errors raised while carrying
            out the action (403/404/500) are passed through unchanged.
    """
    if cmd.action:
        params = cmd.params or {}
        if cmd.action == "scale":
            if "service" not in params or "replicas" not in params:
                raise HTTPException(
                    status_code=400,
                    detail="Missing params for scale: service, replicas",
                )
            try:
                replicas = int(params["replicas"])
            except (TypeError, ValueError):
                raise HTTPException(
                    status_code=400, detail="Param 'replicas' must be an integer"
                ) from None
            return _scale(params["service"], replicas=replicas)
        if cmd.action == "restart_service":
            if "service" not in params:
                raise HTTPException(
                    status_code=400,
                    detail="Missing service param for restart_service",
                )
            return _restart_service(params["service"])
        raise HTTPException(status_code=400, detail="Unknown action")

    text = (cmd.text or "").strip().lower()
    if not text:
        raise HTTPException(status_code=400, detail="Empty command")

    tokens = text.split()
    verb = tokens[0]

    if verb == "scale":
        if len(tokens) < 2:
            raise HTTPException(status_code=400, detail=_SCALE_FORMAT)
        service = tokens[1]
        if "to" not in tokens:
            return _scale(service, step=1)
        # Only the parsing is guarded: errors from _scale itself (403/404/500)
        # must reach the caller unchanged rather than become a format error.
        try:
            replicas = int(tokens[tokens.index("to") + 1])
        except (IndexError, ValueError):
            raise HTTPException(status_code=400, detail=_SCALE_FORMAT) from None
        return _scale(service, replicas=replicas)

    if verb == "restart":
        if len(tokens) < 2:
            raise HTTPException(status_code=400, detail=_RESTART_FORMAT)
        return _restart_service(tokens[1])

    raise HTTPException(status_code=400, detail="Unrecognized command")


def _resolve_service(action_cfg: dict, labels: dict) -> str:
    """Return the action's service name with the label placeholder filled in."""
    template = action_cfg.get("service") or ""
    return template.replace(_SERVICE_PLACEHOLDER, labels.get("service_name", ""))


@app.post("/alert")
async def alert(request: Request):
    """Run the rule-defined actions for every alert in the webhook payload.

    The body is expected to be a JSON object with an ``alerts`` list whose
    items carry a ``labels`` mapping. A rule from ``RULES["alerts"]`` applies
    when every key/value in its ``match`` mapping equals the alert's label
    (an empty ``match`` applies to every alert). Each action is either
    ``scale`` or ``restart_service``.

    Returns:
        ``{"executed": [...]}`` with one entry per action carried out.

    Raises:
        HTTPException: 400 for a body that is not a JSON object; any error
            raised by an action aborts the request and is passed through.
    """
    # NOTE(review): the Docker calls below are blocking and run inside this
    # async handler; consider offloading them if alert volume grows.
    try:
        payload = await request.json()
    except ValueError as e:  # covers JSON decode and unicode decode errors
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from e
    if not isinstance(payload, dict):
        raise HTTPException(status_code=400, detail="Alert payload must be a JSON object")

    rules = RULES.get("alerts") or []
    executed = []
    for item in payload.get("alerts") or []:
        if not isinstance(item, dict):
            continue
        labels = item.get("labels") or {}
        for rule in rules:
            match = rule.get("match") or {}
            if not all(labels.get(k) == v for k, v in match.items()):
                continue
            for act in rule.get("actions") or []:
                if "scale" in act:
                    cfg = act["scale"] or {}
                    res = _scale(
                        service_name=_resolve_service(cfg, labels),
                        replicas=cfg.get("replicas"),
                        step=cfg.get("step"),
                        min_replicas=cfg.get("min_replicas"),
                        max_replicas=cfg.get("max_replicas", MAX_SCALE),
                    )
                    executed.append(
                        {"alert": labels.get("alertname"), "action": "scale", "result": res}
                    )
                elif "restart_service" in act:
                    cfg = act["restart_service"] or {}
                    res = _restart_service(_resolve_service(cfg, labels))
                    executed.append(
                        {
                            "alert": labels.get("alertname"),
                            "action": "restart_service",
                            "result": res,
                        }
                    )
    return {"executed": executed}