"""BurnServ AI Ops Agent.

A FastAPI application that scales and restarts Docker services through the
Docker SDK. Actions are triggered either by explicit commands (``/command``)
or by alert webhooks matched against rules loaded from ``rules.yaml``
(``/alert``). Guardrails from the rules file restrict which services may be
touched and cap the replica count.
"""

import os
import re

import yaml
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel

try:
    import docker
except Exception:  # The SDK is optional at import time; get_client() reports it.
    docker = None

PORT = int(os.getenv("PORT", "8080"))
GUARD_DEFAULT_MAX = 25
RULES_PATH = "rules.yaml"


def _load_rules(path: str) -> dict:
    """Return the parsed rules file, or an empty dict if it is absent/empty.

    Raises:
        ValueError: if the file's top-level YAML value is not a mapping.
    """
    if not os.path.exists(path):
        return {}
    with open(path, "r", encoding="utf-8") as f:
        loaded = yaml.safe_load(f) or {}
    if not isinstance(loaded, dict):
        raise ValueError(f"{path} must contain a YAML mapping at the top level")
    return loaded


RULES = _load_rules(RULES_PATH)

# ``guardrails:`` with no value parses to None, so normalise before reading.
_GUARDRAILS = RULES.get("guardrails") or {}
ALLOWED_REGEX = re.compile(_GUARDRAILS.get("allowed_services_regex") or ".*")
_configured_max = _GUARDRAILS.get("max_scale_replicas")
MAX_SCALE = int(GUARD_DEFAULT_MAX if _configured_max is None else _configured_max)

DOCKER_SOCK = "/var/run/docker.sock"
SERVICE_PLACEHOLDER = "{{ $labels.service_name }}"

_client = None


def get_client():
    """Return a cached Docker client connected to the local Docker socket.

    The client is cached only after a successful ping, so a failed connection
    attempt is retried on the next call instead of handing out a dead client.

    Raises:
        HTTPException: 500 if the SDK is missing, the socket is not mounted,
            or the daemon cannot be reached.
    """
    global _client
    if _client is not None:
        return _client
    if docker is None:
        raise HTTPException(500, "docker SDK not available in image.")
    if not os.path.exists(DOCKER_SOCK):
        raise HTTPException(500, f"Docker socket not found at {DOCKER_SOCK}. Did you mount it?")
    try:
        candidate = docker.DockerClient(base_url=f"unix://{DOCKER_SOCK}")
        candidate.ping()
    except Exception as e:
        raise HTTPException(500, f"Cannot connect to Docker at {DOCKER_SOCK}: {e}") from e
    _client = candidate
    return _client


app = FastAPI(title="BurnServ AI Ops Agent")


def _guard_service(service_name: str):
    """Raise a 403 HTTPException unless the guardrail regex matches the name.

    NOTE: ``match`` anchors only at the start of the name, so the configured
    pattern must include ``$`` if a full-name match is intended.
    """
    if not ALLOWED_REGEX.match(service_name):
        raise HTTPException(403, f"Service '{service_name}' not allowed by guardrails.")


def _get_service(cli, service_name: str):
    """Look up a service by name, mapping any lookup failure to a 404."""
    try:
        return cli.services.get(service_name)
    except Exception:
        raise HTTPException(404, f"Service '{service_name}' not found")


def _scale(
    service_name: str,
    replicas: int | None = None,
    step: int | None = None,
    min_replicas: int | None = None,
    max_replicas: int | None = None,
):
    """Scale a replicated service to an absolute or relative replica count.

    Args:
        service_name: Name of the service; must pass the guardrail regex.
        replicas: Absolute target. When given, ``step``, ``min_replicas`` and
            ``max_replicas`` are ignored.
        step: Relative change applied to the current count when ``replicas``
            is None (a missing or zero step is treated as +1).
        min_replicas: Lower bound applied to the relative target.
        max_replicas: Upper bound applied to the relative target.

    Returns:
        ``{"service": <name>, "replicas": <count requested from Docker>}``.
        The count is always clamped to ``1..MAX_SCALE``.

    Raises:
        HTTPException: 403 (guardrails), 404 (unknown service), 400 (global
            mode service or non-numeric parameters), 500 (Docker failure).
    """
    cli = get_client()
    _guard_service(service_name)
    svc = _get_service(cli, service_name)

    mode = (svc.attrs.get("Spec") or {}).get("Mode") or {}
    if "Global" in mode:
        raise HTTPException(400, f"Service '{service_name}' runs in global mode and cannot be scaled")

    try:
        if replicas is None:
            current = (mode.get("Replicated") or {}).get("Replicas", 1)
            target = int(current) + int(step or 1)
            if min_replicas is not None:
                target = max(target, int(min_replicas))
            if max_replicas is not None:
                target = min(target, int(max_replicas))
        else:
            target = int(replicas)
    except (TypeError, ValueError) as exc:
        raise HTTPException(400, f"Invalid scale parameters: {exc}") from exc

    target = max(1, min(target, MAX_SCALE))

    # Service.update() accepts snake_case options, not raw Spec keys such as
    # "Name"/"Mode", so feeding the Spec dict back into it fails. Service.scale()
    # changes only the replica count and keeps the rest of the live spec.
    try:
        svc.scale(target)
    except Exception as exc:
        raise HTTPException(500, f"Scale failed: {exc}") from exc
    return {"service": service_name, "replicas": target}


def _force_update_bump() -> int:
    """Return the ForceUpdate increment from FORCE_UPDATE_BUMP (at least 1)."""
    try:
        bump = int(os.getenv("FORCE_UPDATE_BUMP", "1"))
    except ValueError:
        return 1
    # A bump below 1 would leave the counter unchanged and restart nothing.
    return max(1, bump)


def _restart_service(service_name: str):
    """Trigger a rolling restart by increasing TaskTemplate.ForceUpdate.

    Returns:
        ``{"service": <name>, "status": "rolling-restart-issued"}``.

    Raises:
        HTTPException: 403 (guardrails), 404 (unknown service), 500 if both
            update attempts fail.
    """
    cli = get_client()
    _guard_service(service_name)
    svc = _get_service(cli, service_name)

    task_template = dict((svc.attrs.get("Spec") or {}).get("TaskTemplate") or {})
    # The counter must actually change for tasks to be redeployed. docker-py
    # auto-increments only for ``force_update=True``; an integer is written
    # verbatim, so passing a constant 1 restarts the service only once.
    next_force_update = int(task_template.get("ForceUpdate") or 0) + _force_update_bump()

    try:
        svc.update(fetch_current_spec=True, force_update=next_force_update)
    except Exception:
        # Fallback: send the bumped task template through the low-level API,
        # which (unlike Service.update) accepts a ``task_template`` argument.
        task_template["ForceUpdate"] = next_force_update
        try:
            cli.api.update_service(
                svc.id,
                svc.version,
                task_template=task_template,
                fetch_current_spec=True,
            )
        except Exception as e2:
            raise HTTPException(500, f"Restart failed: {e2}") from e2
    return {"service": service_name, "status": "rolling-restart-issued"}


class Command(BaseModel):
    """Request body for ``/command``.

    Either ``action`` (with ``params``) or a free-text ``text`` command.
    """

    text: str = ""
    action: str | None = None
    params: dict | None = None


@app.get("/health")
def health():
    """Liveness probe; always returns ``{"ok": True}``."""
    return {"ok": True}


@app.get("/diagnostics")
def diagnostics():
    """Report socket presence, rule loading and Docker reachability."""
    info = {"docker_sock_exists": os.path.exists(DOCKER_SOCK), "rules_loaded": bool(RULES)}
    try:
        cli = get_client()
        info["docker_ping"] = True
        info["server_version"] = cli.version()
    except Exception as e:
        info["docker_ping"] = False
        info["error"] = str(e)
    return info


@app.post("/command")
def command(cmd: Command):
    """Run a structured action or a free-text command.

    Structured: ``action="scale"`` with params ``service`` and ``replicas``,
    or ``action="restart_service"`` with param ``service``.
    Text: ``scale <service> [to <replicas>]`` or ``restart <service>``.

    Only parsing errors are reported as 400 format errors; HTTPExceptions
    raised while executing the action (403/404/500) propagate unchanged.
    """
    if cmd.action:
        params = cmd.params or {}
        if cmd.action == "scale":
            if "service" not in params or "replicas" not in params:
                raise HTTPException(400, "Missing params for scale: service, replicas")
            try:
                requested = int(params["replicas"])
            except (TypeError, ValueError):
                raise HTTPException(400, "Param 'replicas' must be an integer") from None
            return _scale(str(params["service"]), replicas=requested)
        if cmd.action == "restart_service":
            if "service" not in params:
                raise HTTPException(400, "Missing service param for restart_service")
            return _restart_service(str(params["service"]))
        raise HTTPException(400, "Unknown action")

    # NOTE(review): the whole command, service name included, is lowercased.
    text = (cmd.text or "").strip().lower()
    if not text:
        raise HTTPException(400, "Empty command")
    words = text.split()

    if text.startswith("scale "):
        if len(words) < 2:
            raise HTTPException(400, "Format: 'scale <service> to <replicas>'")
        service = words[1]
        if "to" not in words:
            return _scale(service, step=1)
        try:
            requested = int(words[words.index("to") + 1])
        except (IndexError, ValueError):
            raise HTTPException(400, "Format: 'scale <service> to <replicas>'") from None
        return _scale(service, replicas=requested)

    if text.startswith("restart "):
        if len(words) < 2:
            raise HTTPException(400, "Format: 'restart <service>'")
        return _restart_service(words[1])

    raise HTTPException(400, "Unrecognized command")


def _resolve_service(cfg: dict, labels: dict) -> str:
    """Substitute the alert's ``service_name`` label into the rule's service."""
    template = str(cfg.get("service") or "")
    return template.replace(SERVICE_PLACEHOLDER, str(labels.get("service_name", "")))


@app.post("/alert")
async def alert(request: Request):
    """Execute the rule actions whose ``match`` labels equal an alert's labels.

    Expects a JSON object with an ``alerts`` list; each alert's ``labels`` are
    compared against every rule under ``alerts`` in the rules file.

    Returns:
        ``{"executed": [...]}`` with one entry per action that ran.

    Raises:
        HTTPException: 400 for a malformed payload; errors raised by an
            action propagate and abort the remaining actions.
    """
    # NOTE(review): the Docker SDK calls below are blocking and run inside
    # this async handler; consider moving them to a thread pool.
    try:
        payload = await request.json()
    except ValueError as exc:
        raise HTTPException(400, f"Invalid JSON payload: {exc}") from exc
    if not isinstance(payload, dict):
        raise HTTPException(400, "Alert payload must be a JSON object")

    rules = RULES.get("alerts") or []
    executed = []
    for item in payload.get("alerts") or []:
        if not isinstance(item, dict):
            continue
        labels = item.get("labels") or {}
        for rule in rules:
            match = rule.get("match") or {}
            if not all(labels.get(k) == v for k, v in match.items()):
                continue
            for act in rule.get("actions") or []:
                if "scale" in act:
                    cfg = dict(act["scale"] or {})
                    res = _scale(
                        service_name=_resolve_service(cfg, labels),
                        replicas=cfg.get("replicas"),
                        step=cfg.get("step"),
                        min_replicas=cfg.get("min_replicas"),
                        max_replicas=cfg.get("max_replicas", MAX_SCALE),
                    )
                    executed.append({"alert": labels.get("alertname"), "action": "scale", "result": res})
                elif "restart_service" in act:
                    cfg = dict(act["restart_service"] or {})
                    res = _restart_service(_resolve_service(cfg, labels))
                    executed.append(
                        {"alert": labels.get("alertname"), "action": "restart_service", "result": res}
                    )
    return {"executed": executed}