Files
StrikePackageGPT/services/hackgpt-api/app/main.py
mblanke 304d223a49 feat: Add Scan History tab with rescan capability
- New Scan History tab shows all past scans
- Each scan shows: tool, target, status, timestamp, findings count
- Rescan button to quickly re-run any previous scan with same settings
- Copy command button copies scan command to clipboard and terminal
- Clear All button to purge scan history
- Relative timestamps (e.g. '5 min ago', '2 hours ago')
- Status badges: running (yellow), completed (green), failed (red)
- Backend endpoints for clearing scan history
2025-11-28 14:55:29 -05:00

1026 lines
35 KiB
Python

"""
HackGPT API Service
Security-focused API that interfaces with LLM router and Kali container
for penetration testing and security analysis tasks.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import Optional, Literal, List, Dict, Any
import httpx
import asyncio
import os
import uuid
import json
from datetime import datetime
app = FastAPI(
title="HackGPT API",
description="AI-powered security analysis and penetration testing assistant",
version="0.2.0"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Configuration
LLM_ROUTER_URL = os.getenv("LLM_ROUTER_URL", "http://strikepackage-llm-router:8000")
KALI_EXECUTOR_URL = os.getenv("KALI_EXECUTOR_URL", "http://strikepackage-kali-executor:8002")
# In-memory storage (use Redis in production)
tasks: Dict[str, Any] = {}
sessions: Dict[str, Dict] = {}
scan_results: Dict[str, Any] = {}
# ============== Models ==============
class ChatMessage(BaseModel):
role: Literal["system", "user", "assistant"]
content: str
timestamp: Optional[datetime] = None
class ChatRequest(BaseModel):
message: str
session_id: Optional[str] = None
context: Optional[str] = None
provider: str = "ollama"
model: str = "llama3.2"
class PhaseChatRequest(BaseModel):
message: str
phase: str
provider: str = "ollama"
model: str = "llama3.2"
findings: List[Dict[str, Any]] = []
class AttackChainRequest(BaseModel):
findings: List[Dict[str, Any]]
provider: str = "ollama"
model: str = "llama3.2"
class CommandRequest(BaseModel):
command: str
timeout: int = Field(default=300, ge=1, le=3600)
working_dir: str = "/workspace"
parse_output: bool = True
class ScanRequest(BaseModel):
tool: str
target: str
scan_type: Optional[str] = None
options: Dict[str, Any] = Field(default_factory=dict)
class SecurityAnalysisRequest(BaseModel):
target: str
analysis_type: Literal["recon", "vulnerability", "exploit_research", "report"]
options: Optional[dict] = None
class TaskStatus(BaseModel):
task_id: str
status: Literal["pending", "running", "completed", "failed"]
result: Optional[Any] = None
error: Optional[str] = None
progress: int = 0
# ============== Security Tool Definitions ==============
SECURITY_TOOLS = {
"nmap": {
"name": "nmap",
"description": "Network scanner and security auditing tool",
"category": "reconnaissance",
"templates": {
"quick": "nmap -T4 -F {target}",
"full": "nmap -sV -sC -O -p- {target}",
"stealth": "nmap -sS -T2 -f {target}",
"vuln": "nmap --script vuln {target}",
"version": "nmap -sV -p {ports} {target}",
}
},
"nikto": {
"name": "nikto",
"description": "Web server vulnerability scanner",
"category": "vulnerability_scanning",
"templates": {
"default": "nikto -h {target}",
"ssl": "nikto -h {target} -ssl",
"full": "nikto -h {target} -C all",
}
},
"gobuster": {
"name": "gobuster",
"description": "Directory/file brute-forcing",
"category": "web_testing",
"templates": {
"dir": "gobuster dir -u {target} -w /usr/share/wordlists/dirb/common.txt -q",
"dns": "gobuster dns -d {target} -w /usr/share/wordlists/dns/subdomains-top1million-5000.txt",
}
},
"sqlmap": {
"name": "sqlmap",
"description": "SQL injection detection and exploitation",
"category": "vulnerability_scanning",
"templates": {
"test": "sqlmap -u '{target}' --batch --level=1",
"dbs": "sqlmap -u '{target}' --batch --dbs",
}
},
"whatweb": {
"name": "whatweb",
"description": "Web technology fingerprinting",
"category": "reconnaissance",
"templates": {
"default": "whatweb {target}",
"aggressive": "whatweb -a 3 {target}",
}
},
"searchsploit": {
"name": "searchsploit",
"description": "Exploit database search",
"category": "exploitation",
"templates": {
"search": "searchsploit {query}",
"json": "searchsploit -j {query}",
}
}
}
# System prompts for different security tasks
SECURITY_PROMPTS = {
"recon": """You are a penetration testing assistant specializing in reconnaissance.
Analyze the target and suggest reconnaissance techniques. Focus on:
- OSINT gathering
- DNS enumeration
- Subdomain discovery
- Port scanning strategies
- Technology fingerprinting
Always emphasize legal and ethical considerations.""",
"vulnerability": """You are a vulnerability assessment specialist.
Analyze the provided information and identify potential vulnerabilities:
- Common CVEs that may apply
- Misconfigurations
- Weak authentication mechanisms
- Input validation issues
Provide severity ratings and remediation suggestions.""",
"exploit_research": """You are a security researcher focused on exploit analysis.
Given the vulnerability information:
- Explain the technical details of the vulnerability
- Describe potential exploitation techniques
- Suggest proof-of-concept approaches
- Recommend detection and prevention methods
Always include responsible disclosure considerations.""",
"report": """You are a security report writer.
Create a professional security assessment report including:
- Executive summary
- Technical findings
- Risk ratings
- Remediation recommendations
- Timeline for fixes
Format the report in clear, professional language.""",
"command_assist": """You are a security command expert. Analyze the user's request and:
1. Suggest the most appropriate security tool and command
2. Explain what the command does
3. Describe expected output
4. Note any safety considerations
If the user wants to run a scan, extract:
- tool: The security tool to use (nmap, nikto, gobuster, etc.)
- target: The target IP, hostname, or URL
- scan_type: The type of scan (quick, full, etc.)
Respond in JSON format when suggesting a command:
{
"tool": "tool_name",
"target": "target_value",
"scan_type": "scan_type",
"explanation": "What this will do",
"command": "The full command"
}"""
}
# Phase-specific context-aware prompts (HackGpt-style)
PHASE_PROMPTS = {
"recon": """You are HackGPT operating in **Phase 1: Reconnaissance**.
Your role is to assist with passive and active information gathering:
- OSINT techniques (theHarvester, Maltego, Shodan)
- DNS enumeration (amass, subfinder, dnsenum)
- Port scanning strategies (nmap, masscan)
- Technology fingerprinting (whatweb, wappalyzer)
- Google dorking and search operators
For each response:
1. Provide actionable reconnaissance steps
2. Suggest specific tools and commands
3. Explain what information each technique reveals
4. Assign a risk relevance score (1-10) based on potential attack surface
Keep responses focused on information gathering. Do not suggest exploitation yet.""",
"scanning": """You are HackGPT operating in **Phase 2: Scanning & Enumeration**.
Your role is to assist with in-depth service enumeration:
- Service version detection (nmap -sV)
- Banner grabbing and fingerprinting
- Directory brute-forcing (gobuster, ffuf, dirb)
- SMB/NetBIOS enumeration (enum4linux, smbclient)
- SNMP enumeration
For each response:
1. Build on reconnaissance findings
2. Suggest detailed enumeration commands
3. Identify potential attack vectors from enumeration
4. Assign a risk score based on exposed services (1-10)
Focus on gathering detailed service information.""",
"vuln": """You are HackGPT operating in **Phase 3: Vulnerability Assessment**.
Your role is to identify and assess vulnerabilities:
- CVE identification and CVSS scoring
- Automated vulnerability scanning (nikto, nuclei, nessus)
- Web application testing (OWASP Top 10)
- SQL injection detection (sqlmap)
- Configuration analysis
For each finding, provide:
1. Vulnerability title and description
2. CVSS score estimate (0.0-10.0) with severity label
3. Affected components
4. Potential impact
5. Remediation recommendations
Format findings as structured data when possible.""",
"exploit": """You are HackGPT operating in **Phase 4: Exploitation**.
Your role is to safely demonstrate vulnerability impact:
- Exploit research (searchsploit, exploit-db)
- Proof-of-concept development
- Credential attacks (hydra, medusa)
- Post-exploitation enumeration
CRITICAL SAFETY RULES:
1. Only suggest exploitation with explicit authorization
2. Prefer non-destructive PoC approaches
3. Always have a rollback plan
4. Document every exploitation attempt
Provide exploitation strategies with:
- Success probability estimate
- Required prerequisites
- Detection likelihood
- Impact demonstration goals""",
"report": """You are HackGPT operating in **Phase 5: Reporting**.
Your role is to create professional security documentation:
- Executive summaries for stakeholders
- Technical reports for remediation teams
- Risk matrices and prioritization
- Compliance mapping (OWASP, NIST, PCI-DSS)
Structure reports with:
1. Executive Summary (business impact, key findings)
2. Scope and Methodology
3. Findings (sorted by severity: Critical > High > Medium > Low)
4. Risk Assessment Matrix
5. Remediation Recommendations with timelines
6. Appendices (raw data, tool outputs)
Use professional language appropriate for security assessments.""",
"retest": """You are HackGPT operating in **Phase 6: Retesting & Verification**.
Your role is to verify remediation effectiveness:
- Re-run previous vulnerability scans
- Validate patches and fixes
- Regression testing
- Delta analysis (before/after)
For each retest:
1. Reference the original finding
2. Describe the verification approach
3. Confirm fix status (Resolved/Partial/Unresolved)
4. Note any new issues introduced
5. Update risk scores accordingly
Focus on validation and verification procedures."""
}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy", "service": "hackgpt-api"}
@app.post("/chat")
async def security_chat(request: ChatRequest):
"""Chat with security-focused AI assistant"""
messages = [
{
"role": "system",
"content": """You are HackGPT, an AI assistant specialized in cybersecurity,
penetration testing, and security research. You provide educational information
about security concepts, tools, and techniques. Always emphasize ethical hacking
principles and legal considerations. You help security professionals understand
vulnerabilities and defenses."""
}
]
if request.context:
messages.append({"role": "system", "content": f"Context: {request.context}"})
messages.append({"role": "user", "content": request.message})
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{LLM_ROUTER_URL}/chat",
json={
"provider": request.provider,
"model": request.model,
"messages": messages,
"temperature": 0.7,
"max_tokens": 2048
},
timeout=120.0
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
except httpx.ConnectError:
raise HTTPException(status_code=503, detail="LLM Router service not available")
@app.post("/chat/phase")
async def phase_aware_chat(request: PhaseChatRequest):
"""Phase-aware chat with context from current pentest phase"""
phase_prompt = PHASE_PROMPTS.get(request.phase, PHASE_PROMPTS["recon"])
# Build context from findings if available
findings_context = ""
if request.findings:
findings_summary = []
for f in request.findings[-10:]: # Last 10 findings
severity = f.get("severity", "info")
title = f.get("title") or f.get("raw", "Unknown finding")
findings_summary.append(f"- [{severity.upper()}] {title}")
if findings_summary:
findings_context = f"\n\nCurrent Findings:\n" + "\n".join(findings_summary)
messages = [
{"role": "system", "content": phase_prompt + findings_context},
{"role": "user", "content": request.message}
]
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{LLM_ROUTER_URL}/chat",
json={
"provider": request.provider,
"model": request.model,
"messages": messages,
"temperature": 0.7,
"max_tokens": 2048
},
timeout=120.0
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
result = response.json()
# Try to extract findings and risk score from response
content = result.get("content", "")
risk_score = None
extracted_findings = []
# Simple extraction of risk scores mentioned in response
import re
risk_match = re.search(r'risk\s*(?:score|rating)?[:\s]*(\d+(?:\.\d+)?)\s*(?:/\s*10)?', content, re.IGNORECASE)
if risk_match:
try:
risk_score = float(risk_match.group(1))
if risk_score > 10:
risk_score = risk_score / 10
except:
pass
# Extract any severity-labeled findings
severity_patterns = [
(r'\[CRITICAL\]\s*(.+?)(?:\n|$)', 'critical'),
(r'\[HIGH\]\s*(.+?)(?:\n|$)', 'high'),
(r'\[MEDIUM\]\s*(.+?)(?:\n|$)', 'medium'),
(r'\[LOW\]\s*(.+?)(?:\n|$)', 'low'),
]
for pattern, severity in severity_patterns:
matches = re.findall(pattern, content, re.IGNORECASE)
for match in matches:
extracted_findings.append({
"id": f"ai-{len(extracted_findings)}",
"title": match.strip()[:100],
"severity": severity
})
result["risk_score"] = risk_score
result["findings"] = extracted_findings[:5] # Limit to 5
return result
except httpx.ConnectError:
raise HTTPException(status_code=503, detail="LLM Router service not available")
@app.post("/attack-chains")
async def analyze_attack_chains(request: AttackChainRequest):
"""Analyze findings to identify attack chains using AI"""
if not request.findings:
return {"attack_chains": []}
# Build findings summary for AI analysis
findings_text = []
for f in request.findings:
severity = f.get("severity", "info")
title = f.get("title") or f.get("raw", "Unknown")
tool = f.get("tool", "unknown")
target = f.get("target", "unknown")
findings_text.append(f"- [{severity.upper()}] {title} (found by {tool} on {target})")
prompt = f"""Analyze these security findings and identify potential attack chains.
An attack chain is a sequence of vulnerabilities that can be combined for greater impact.
Findings:
{chr(10).join(findings_text)}
For each attack chain identified, provide:
1. Chain name
2. Step-by-step attack path
3. Combined risk score (1-10)
4. Likelihood of success (0.0-1.0)
5. Overall impact
6. Recommendation priority
Respond in this JSON format:
{{
"attack_chains": [
{{
"name": "Chain name",
"risk_score": 8.5,
"likelihood": 0.7,
"impact": "Description of impact",
"recommendation": "Priority recommendation",
"steps": [
{{"step": 1, "action": "First step", "method": "technique used"}},
{{"step": 2, "action": "Second step", "method": "technique used"}}
]
}}
]
}}
Only return valid JSON."""
messages = [
{"role": "system", "content": "You are a security analyst specializing in attack chain analysis. Respond only with valid JSON."},
{"role": "user", "content": prompt}
]
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{LLM_ROUTER_URL}/chat",
json={
"provider": request.provider,
"model": request.model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 2048
},
timeout=120.0
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
result = response.json()
content = result.get("content", "")
# Try to parse JSON from response
try:
import re
json_match = re.search(r'\{[\s\S]*\}', content)
if json_match:
chains_data = json.loads(json_match.group())
return chains_data
except json.JSONDecodeError:
pass
# Fallback: generate basic chains from high-severity findings
high_severity = [f for f in request.findings if f.get("severity") in ["critical", "high"]]
if high_severity:
return {
"attack_chains": [{
"name": f"Attack via {high_severity[0].get('title', 'vulnerability')[:50]}",
"risk_score": 7.5,
"likelihood": 0.6,
"impact": "Potential system compromise",
"recommendation": "Address high-severity findings first",
"steps": [
{"step": 1, "action": "Exploit initial vulnerability", "method": high_severity[0].get("title", "unknown")[:50]},
{"step": 2, "action": "Establish persistence", "method": "post-exploitation"},
{"step": 3, "action": "Lateral movement", "method": "credential harvesting"}
]
}]
}
return {"attack_chains": []}
except httpx.ConnectError:
raise HTTPException(status_code=503, detail="LLM Router service not available")
# ============== Session Management ==============
def get_or_create_session(session_id: Optional[str] = None) -> str:
"""Get existing session or create a new one."""
if session_id and session_id in sessions:
sessions[session_id]["last_activity"] = datetime.utcnow()
return session_id
new_id = str(uuid.uuid4())
sessions[new_id] = {
"id": new_id,
"created_at": datetime.utcnow(),
"last_activity": datetime.utcnow(),
"messages": [],
"context": {}
}
return new_id
@app.get("/sessions/{session_id}")
async def get_session(session_id: str):
"""Get session details."""
if session_id not in sessions:
raise HTTPException(status_code=404, detail="Session not found")
return sessions[session_id]
@app.post("/sessions/{session_id}/context")
async def update_session_context(session_id: str, context: Dict[str, Any]):
"""Update session context with scan results or other data."""
if session_id not in sessions:
raise HTTPException(status_code=404, detail="Session not found")
sessions[session_id]["context"].update(context)
return {"status": "updated"}
# ============== Command Execution ==============
@app.post("/execute")
async def execute_command(request: CommandRequest):
"""Execute a command in the Kali container."""
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{KALI_EXECUTOR_URL}/execute",
json={
"command": request.command,
"timeout": request.timeout,
"working_dir": request.working_dir
},
timeout=float(request.timeout + 30)
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
result = response.json()
# Parse output if requested and tool is recognized
if request.parse_output:
tool = request.command.split()[0]
parsed = parse_tool_output(tool, result.get("stdout", ""))
result["parsed"] = parsed
return result
except httpx.ConnectError:
raise HTTPException(status_code=503, detail="Kali executor service not available")
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="Command execution timed out")
# ============== Scan Management ==============
@app.post("/scan")
async def start_scan(request: ScanRequest, background_tasks: BackgroundTasks):
"""Start a security scan."""
tool_config = SECURITY_TOOLS.get(request.tool)
if not tool_config:
raise HTTPException(status_code=400, detail=f"Unknown tool: {request.tool}")
# Build command from template
scan_type = request.scan_type or list(tool_config["templates"].keys())[0]
template = tool_config["templates"].get(scan_type)
if not template:
raise HTTPException(status_code=400, detail=f"Unknown scan type: {scan_type}")
# Format command with target and options
try:
command = template.format(target=request.target, **request.options)
except KeyError as e:
raise HTTPException(status_code=400, detail=f"Missing required option: {e}")
# Create scan task
scan_id = str(uuid.uuid4())
scan_results[scan_id] = {
"scan_id": scan_id,
"tool": request.tool,
"target": request.target,
"scan_type": scan_type,
"command": command,
"status": "pending",
"started_at": datetime.utcnow().isoformat(),
"result": None,
"parsed": None
}
background_tasks.add_task(run_scan, scan_id, command, request.tool)
return {"scan_id": scan_id, "status": "pending", "command": command}
async def run_scan(scan_id: str, command: str, tool: str):
"""Run scan in background."""
scan_results[scan_id]["status"] = "running"
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{KALI_EXECUTOR_URL}/execute",
json={"command": command, "timeout": 600, "working_dir": "/workspace"},
timeout=660.0
)
if response.status_code == 200:
result = response.json()
scan_results[scan_id]["status"] = "completed"
scan_results[scan_id]["result"] = result
scan_results[scan_id]["completed_at"] = datetime.utcnow().isoformat()
# Parse output
parsed = parse_tool_output(tool, result.get("stdout", ""))
scan_results[scan_id]["parsed"] = parsed
else:
scan_results[scan_id]["status"] = "failed"
scan_results[scan_id]["error"] = response.text
except Exception as e:
scan_results[scan_id]["status"] = "failed"
scan_results[scan_id]["error"] = str(e)
@app.get("/scan/{scan_id}")
async def get_scan_result(scan_id: str):
"""Get scan results."""
if scan_id not in scan_results:
raise HTTPException(status_code=404, detail="Scan not found")
return scan_results[scan_id]
@app.get("/scans")
async def list_scans():
"""List all scans."""
return list(scan_results.values())
@app.delete("/scans/clear")
async def clear_scans():
"""Clear all scan history."""
global scan_results
scan_results = {}
return {"status": "cleared", "message": "All scan history cleared"}
# ============== Output Parsing ==============
def parse_tool_output(tool: str, output: str) -> Dict[str, Any]:
"""Parse output from security tools."""
tool = tool.lower()
if tool == "nmap":
return parse_nmap_output(output)
elif tool == "nikto":
return parse_nikto_output(output)
elif tool == "gobuster":
return parse_gobuster_output(output)
return {"raw": output}
def parse_nmap_output(output: str) -> Dict[str, Any]:
"""Parse nmap output."""
import re
results = {"hosts": [], "raw": output}
current_host = None
for line in output.split('\n'):
line = line.strip()
if 'Nmap scan report for' in line:
if current_host:
results["hosts"].append(current_host)
match = re.search(r'for (\S+)(?: \((\d+\.\d+\.\d+\.\d+)\))?', line)
if match:
current_host = {
"hostname": match.group(1),
"ip": match.group(2) or match.group(1),
"ports": [],
"os": None
}
elif current_host and re.match(r'^\d+/(tcp|udp)', line):
parts = line.split()
if len(parts) >= 3:
port_proto = parts[0].split('/')
current_host["ports"].append({
"port": int(port_proto[0]),
"protocol": port_proto[1],
"state": parts[1],
"service": parts[2] if len(parts) > 2 else "unknown",
"version": ' '.join(parts[3:]) if len(parts) > 3 else None
})
if current_host:
results["hosts"].append(current_host)
return results
def parse_nikto_output(output: str) -> Dict[str, Any]:
"""Parse nikto output."""
results = {"findings": [], "server_info": {}, "raw": output}
for line in output.split('\n'):
line = line.strip()
if '+ Target IP:' in line:
results["server_info"]["ip"] = line.split(':')[-1].strip()
elif '+ Server:' in line:
results["server_info"]["server"] = line.split(':', 1)[-1].strip()
elif line.startswith('+') and ':' in line:
if not any(skip in line for skip in ['Target IP', 'Server:', 'Start Time']):
severity = "info"
if any(w in line.lower() for w in ['vulnerable', 'exploit']):
severity = "high"
elif any(w in line.lower() for w in ['outdated', 'insecure']):
severity = "medium"
results["findings"].append({
"raw": line[1:].strip(),
"severity": severity
})
return results
def parse_gobuster_output(output: str) -> Dict[str, Any]:
"""Parse gobuster output."""
import re
results = {"findings": [], "directories": [], "files": [], "raw": output}
for line in output.split('\n'):
match = re.search(r'^(/\S*)\s+\(Status:\s*(\d+)\)', line.strip())
if match:
finding = {
"path": match.group(1),
"status": int(match.group(2))
}
results["findings"].append(finding)
if finding["path"].endswith('/'):
results["directories"].append(finding["path"])
else:
results["files"].append(finding["path"])
return results
# ============== AI-Assisted Scanning ==============
@app.post("/ai-scan")
async def ai_assisted_scan(request: ChatRequest, background_tasks: BackgroundTasks):
"""Use AI to determine and run appropriate scan."""
# Get AI suggestion
messages = [
{"role": "system", "content": SECURITY_PROMPTS["command_assist"]},
{"role": "user", "content": request.message}
]
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{LLM_ROUTER_URL}/chat",
json={
"provider": request.provider,
"model": request.model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 1024
},
timeout=60.0
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
ai_response = response.json()
content = ai_response.get("content", "")
# Try to parse JSON from response
try:
import re
json_match = re.search(r'\{[^{}]*\}', content, re.DOTALL)
if json_match:
suggestion = json.loads(json_match.group())
# If we have a valid tool and target, start the scan
if suggestion.get("tool") and suggestion.get("target"):
scan_request = ScanRequest(
tool=suggestion["tool"],
target=suggestion["target"],
scan_type=suggestion.get("scan_type")
)
return await start_scan(scan_request, background_tasks)
return {"suggestion": suggestion, "ai_response": content}
except json.JSONDecodeError:
pass
return {"ai_response": content}
except httpx.ConnectError:
raise HTTPException(status_code=503, detail="LLM Router service not available")
@app.post("/analyze")
async def analyze_security(request: SecurityAnalysisRequest, background_tasks: BackgroundTasks):
"""Start a security analysis task"""
task_id = str(uuid.uuid4())
tasks[task_id] = TaskStatus(task_id=task_id, status="pending")
background_tasks.add_task(run_analysis, task_id, request)
return {"task_id": task_id, "status": "pending"}
async def run_analysis(task_id: str, request: SecurityAnalysisRequest):
"""Run security analysis in background"""
tasks[task_id].status = "running"
try:
prompt = SECURITY_PROMPTS.get(request.analysis_type, SECURITY_PROMPTS["recon"])
async with httpx.AsyncClient() as client:
response = await client.post(
f"{LLM_ROUTER_URL}/chat",
json={
"provider": "ollama",
"model": "llama3.2",
"messages": [
{"role": "system", "content": prompt},
{"role": "user", "content": f"Analyze target: {request.target}\nOptions: {request.options}"}
],
"temperature": 0.5,
"max_tokens": 4096
},
timeout=300.0
)
if response.status_code == 200:
data = response.json()
tasks[task_id].status = "completed"
tasks[task_id].result = data.get("content", "")
else:
tasks[task_id].status = "failed"
tasks[task_id].error = response.text
except Exception as e:
tasks[task_id].status = "failed"
tasks[task_id].error = str(e)
@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
"""Get status of a running task"""
if task_id not in tasks:
raise HTTPException(status_code=404, detail="Task not found")
return tasks[task_id]
@app.get("/tools")
async def list_tools():
"""List available security tools and their descriptions"""
return {
"reconnaissance": [
{"name": "nmap", "description": "Network scanner and security auditing tool"},
{"name": "masscan", "description": "Fast TCP port scanner"},
{"name": "amass", "description": "Subdomain enumeration tool"},
{"name": "theHarvester", "description": "OSINT tool for gathering emails, names, subdomains"},
{"name": "whatweb", "description": "Web technology fingerprinting"},
],
"vulnerability_scanning": [
{"name": "nikto", "description": "Web server vulnerability scanner"},
{"name": "nuclei", "description": "Template-based vulnerability scanner"},
{"name": "sqlmap", "description": "SQL injection detection and exploitation"},
{"name": "wpscan", "description": "WordPress vulnerability scanner"},
],
"exploitation": [
{"name": "metasploit", "description": "Penetration testing framework"},
{"name": "searchsploit", "description": "Exploit database search tool"},
{"name": "hydra", "description": "Network login cracker"},
],
"web_testing": [
{"name": "burpsuite", "description": "Web application security testing"},
{"name": "gobuster", "description": "Directory/file brute-forcing"},
{"name": "ffuf", "description": "Fast web fuzzer"},
]
}
@app.post("/suggest-command")
async def suggest_command(request: ChatRequest):
"""Get AI-suggested security commands based on context"""
messages = [
{
"role": "system",
"content": """You are a security command expert. Given the user's request,
suggest appropriate security tool commands. Provide:
1. The exact command to run
2. Explanation of what it does
3. Expected output
4. Safety considerations
Only suggest commands for legitimate security testing purposes."""
},
{"role": "user", "content": request.message}
]
if request.context:
messages.insert(1, {"role": "system", "content": f"Context: {request.context}"})
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{LLM_ROUTER_URL}/chat",
json={
"provider": request.provider,
"model": request.model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 1024
},
timeout=60.0
)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail=response.text)
return response.json()
except httpx.ConnectError:
raise HTTPException(status_code=503, detail="LLM Router service not available")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)