#!/usr/bin/env python3
"""
Local Agent API Server
Enables Copilot CLI integration via REST API + MCP Server
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
import subprocess
import json
import logging
import asyncio
from datetime import datetime
from pathlib import Path
import hashlib
import os
# Setup logging
LOG_DIR = Path("/opt/local-agent/logs")
LOG_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / "api.log"),
logging.StreamHandler()
]
)
log = logging.getLogger(__name__)
app = FastAPI(
title="Local AI Agent API",
version="1.0.0",
description="Copilot CLI Integration Bridge"
)
# Models
class CommandRequest(BaseModel):
command: str
timeout: int = 300
safe_mode: bool = True
class DeepSeekRequest(BaseModel):
query: str
model: str = "deepseek-chat"
reasoning: bool = False
json_output: bool = False
class AnsibleRequest(BaseModel):
playbook_path: str
extra_vars: Optional[Dict[str, Any]] = None
tags: Optional[str] = None
check: bool = False
class ExecutionResult(BaseModel):
status: str
stdout: str
stderr: str
code: int
duration: float
timestamp: str
# Dangerous patterns
DANGEROUS_PATTERNS = [
"rm -rf /",
"mkfs",
"dd if=/dev/zero",
":(){:|:&};",
"fork",
]
def check_safety(cmd: str) -> bool:
"""Validate command safety"""
for pattern in DANGEROUS_PATTERNS:
if pattern in cmd.lower():
log.warning(f"BLOCKED: {cmd} - dangerous pattern: {pattern}")
return False
return True
@app.get("/health")
async def health():
"""Health check"""
return {
"status": "healthy",
"version": "1.0.0",
"timestamp": datetime.now().isoformat()
}
@app.post("/execute")
async def execute_command(req: CommandRequest):
"""Execute shell command with safety checks"""
if req.safe_mode and not check_safety(req.command):
raise HTTPException(status_code=403, detail="Command blocked for safety")
log.info(f"EXEC: {req.command}")
try:
start = datetime.now()
result = subprocess.run(
req.command,
shell=True,
capture_output=True,
text=True,
timeout=req.timeout
)
duration = (datetime.now() - start).total_seconds()
output = ExecutionResult(
status="completed",
stdout=result.stdout,
stderr=result.stderr,
code=result.returncode,
duration=duration,
timestamp=datetime.now().isoformat()
)
log.info(f"COMPLETED: {req.command} (code: {result.returncode}, {duration:.2f}s)")
return output
except subprocess.TimeoutExpired:
log.error(f"TIMEOUT: {req.command}")
raise HTTPException(status_code=504, detail="Command timeout")
except Exception as e:
log.error(f"ERROR: {req.command} - {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/deepseek")
async def deepseek_query(req: DeepSeekRequest):
"""Query DeepSeek model"""
deepseek_api_wrapper = "/opt/local-agent/bin/deepseek-api.py"
log.info(f"DEEPSEEK: {req.query[:100]}...")
try:
cmd = ["python3", deepseek_api_wrapper, req.query]
# Ensure API key is in environment
env = os.environ.copy()
if "DEEPSEEK_API_KEY" not in env:
log.warning("DEEPSEEK_API_KEY not set in environment")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120,
env=env
)
if result.returncode == 0:
try:
output = json.loads(result.stdout)
except json.JSONDecodeError:
output = {"response": result.stdout}
log.info(f"DEEPSEEK SUCCESS: query completed")
return {
"status": "success",
"model": req.model,
"output": output,
"timestamp": datetime.now().isoformat()
}
else:
log.error(f"DEEPSEEK ERROR: {result.stderr}")
raise HTTPException(status_code=500, detail=result.stderr)
except subprocess.TimeoutExpired:
log.error("DEEPSEEK TIMEOUT")
raise HTTPException(status_code=504, detail="DeepSeek timeout")
except Exception as e:
log.error(f"DEEPSEEK ERROR: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/ansible")
async def run_ansible(req: AnsibleRequest):
"""Run Ansible playbook"""
playbook = Path(req.playbook_path)
if not playbook.exists():
raise HTTPException(status_code=404, detail=f"Playbook not found: {playbook}")
log.info(f"ANSIBLE: {playbook}")
try:
cmd = ["ansible-playbook", str(playbook)]
if req.extra_vars:
for key, value in req.extra_vars.items():
cmd.extend(["-e", f"{key}={value}"])
if req.tags:
cmd.extend(["--tags", req.tags])
if req.check:
cmd.append("--check")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=600
)
log.info(f"ANSIBLE COMPLETED: code {result.returncode}")
return {
"status": "completed" if result.returncode == 0 else "failed",
"playbook": str(playbook),
"code": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"timestamp": datetime.now().isoformat()
}
except subprocess.TimeoutExpired:
log.error("ANSIBLE TIMEOUT")
raise HTTPException(status_code=504, detail="Ansible timeout")
except Exception as e:
log.error(f"ANSIBLE ERROR: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/services")
async def get_services_status():
"""Get status of managed services"""
services = ["ollama", "local-agent", "ssh"]
status = {}
for svc in services:
try:
result = subprocess.run(
["systemctl", "is-active", svc],
capture_output=True,
text=True
)
status[svc] = {
"active": result.returncode == 0,
"state": result.stdout.strip()
}
except Exception as e:
status[svc] = {"error": str(e)}
return status
@app.get("/logs")
async def get_logs(lines: int = 50):
"""Get recent agent logs"""
try:
result = subprocess.run(
["tail", "-n", str(lines), str(LOG_DIR / "agent.log")],
capture_output=True,
text=True
)
return {
"logs": result.stdout.split("\n"),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/plan")
async def copilot_plan(plan_data: Dict[str, Any]):
"""Accept plan from Copilot CLI"""
log.info(f"PLAN RECEIVED: {plan_data.get('title', 'untitled')}")
# Store plan
plan_hash = hashlib.md5(
json.dumps(plan_data).encode()
).hexdigest()[:8]
plan_file = LOG_DIR / f"plan_{plan_hash}.json"
with open(plan_file, "w") as f:
json.dump({
"received_at": datetime.now().isoformat(),
**plan_data
}, f, indent=2)
log.info(f"PLAN STORED: {plan_file}")
return {
"status": "accepted",
"plan_id": plan_hash,
"stored_at": str(plan_file),
"timestamp": datetime.now().isoformat()
}
@app.post("/feedback")
async def execution_feedback(feedback: Dict[str, Any]):
"""Log execution feedback from local agent"""
log.info(f"FEEDBACK: {json.dumps(feedback)}")
return {
"status": "received",
"timestamp": datetime.now().isoformat()
}
@app.get("/")
async def root():
"""API documentation"""
return {
"name": "Local AI Agent API",
"version": "1.0.0",
"endpoints": {
"GET /health": "Health check",
"POST /execute": "Execute shell command",
"POST /deepseek": "Query DeepSeek",
"POST /ansible": "Run Ansible playbook",
"GET /services": "Service status",
"GET /logs": "Agent logs",
"POST /plan": "Receive Copilot plan",
"POST /feedback": "Log feedback"
},
"base_url": "http://localhost:8888",
"docs": "http://localhost:8888/docs"
}
if __name__ == "__main__":
import uvicorn
log.info("Starting Local Agent API Server...")
uvicorn.run(app, host="0.0.0.0", port=8888, log_level="info")