mirror of
https://github.com/mblanke/GooseStrike.git
synced 2026-03-01 14:00:21 -05:00
1087 lines
35 KiB
Python
1087 lines
35 KiB
Python
"""FastAPI backend for GooseStrike."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import uuid
|
|
from contextlib import contextmanager
|
|
from pathlib import Path
|
|
from typing import Dict, Generator, Iterable, List, Optional
|
|
|
|
from fastapi import FastAPI, HTTPException
|
|
from fastapi.responses import HTMLResponse
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from pydantic import BaseModel, Field
|
|
from starlette.requests import Request
|
|
|
|
from mitre_mapping import MitreSuggestion, generate_attack_suggestions
|
|
import task_queue
|
|
|
|
# Static marketing copy rendered by the dashboard "core snapshot" card.
CORE_SNAPSHOT_DATA = {
    "title": "GooseStrike Core (Docker-ready)",
    "bullets": [
        "🔧 Nmap, Metasploit, SQLMap, Hydra, ZAP",
        "🧠 AI exploit assistant (Claude, HackGPT-ready)",
        "📚 Offline CVE mirroring with update_cve.sh",
        "🗂 ASCII banner, logo, branding kit (PDF)",
        "📜 CVE scan + JSON match script",
    ],
    "downloads": [
        "📦 goosestrike-cve-enabled.zip (download link)",
        "🧠 hackgpt-ai-stack.zip with README + architecture",
    ],
}

# Hard-coded roadmap entries served by /roadmap and the mock dashboard.
ROADMAP_DATA = [
    {"task": "🐳 Build `docker-compose.goosestrike-full.yml`", "status": "⏳ In progress"},
    {"task": "🧠 HackGPT API container (linked to n8n)", "status": "⏳ Next up"},
    {"task": "🌐 Local CVE API server", "status": "Pending"},
    {"task": "🧬 Claude + HackGPT fallback system", "status": "Pending"},
    {"task": "🔄 n8n workflow `.json` import", "status": "Pending"},
    {"task": "🎯 Target \"prioritizer\" AI agent", "status": "Pending"},
    {"task": "🧭 SVG architecture diagram", "status": "Pending"},
    {"task": "🖥 Dashboard frontend (Armitage-style)", "status": "Optional"},
    {"task": "🔐 C2 bridging to Mythic/Sliver", "status": "Optional"},
]

# Filesystem layout: main asset/scan DB, offline CVE mirror, static assets,
# and the directory where user-uploaded logos land.
DB_PATH = Path("db/goosestrike.db")
EXPLOIT_DB_PATH = Path("db/exploits.db")
STATIC_DIR = Path("web/static")
UPLOAD_DIR = STATIC_DIR / "uploads"
# Created eagerly at import time so the first request never hits a missing dir.
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def _resolve_logo_url() -> str:
    """Return the logo URL supplied by the user or fall back to the crest.

    Resolution order: ``GOOSESTRIKE_LOGO`` env var, newest uploaded image,
    the bundled official logo, and finally the built-in goose flag.
    """
    env_logo = os.getenv("GOOSESTRIKE_LOGO")
    if env_logo:
        return env_logo

    valid_exts = {".svg", ".png", ".jpg", ".jpeg", ".webp"}
    uploads: List[Path] = []
    if UPLOAD_DIR.exists():
        uploads = [
            entry
            for entry in UPLOAD_DIR.iterdir()
            if entry.is_file() and entry.suffix.lower() in valid_exts
        ]

    if uploads:
        # Newest upload wins so fresh logos override earlier defaults.
        newest = max(uploads, key=lambda p: p.stat().st_mtime)
        return f"/static/{newest.relative_to(STATIC_DIR).as_posix()}"

    default_logo = STATIC_DIR / "uploads" / "official_goosestrike_logo.svg"
    if default_logo.exists():
        return f"/static/{default_logo.relative_to(STATIC_DIR).as_posix()}"

    return "/static/goose_flag_logo.svg"
|
|
|
|
# FastAPI application, static file mount, and Jinja templates for the UI.
app = FastAPI(title="GooseStrike API", version="0.1.0")
app.mount("/static", StaticFiles(directory="web/static"), name="static")
templates = Jinja2Templates(directory="web/templates")
# Resolved once at import time; request handlers read it via app.state.
app.state.logo_url = _resolve_logo_url()
|
|
|
|
|
|
def dict_factory(cursor, row):
    """sqlite3 row factory mapping column names onto the row's values."""
    column_names = (descr[0] for descr in cursor.description)
    return dict(zip(column_names, row))
|
|
|
|
|
|
@contextmanager
def get_db(db_path: Optional[Path] = None) -> Generator[sqlite3.Connection, None, None]:
    """Yield a sqlite3 connection (dict rows) to *db_path* or the main DB.

    The connection is always closed on exit; committing is the caller's job.
    """
    path = db_path or DB_PATH
    conn = sqlite3.connect(path)
    conn.row_factory = dict_factory
    try:
        yield conn
    finally:
        conn.close()
|
|
|
|
|
|
def initialize_db() -> None:
    """Create all GooseStrike tables (idempotent) and apply light migrations."""
    with get_db() as conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS assets (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ip TEXT UNIQUE NOT NULL,
                hostname TEXT,
                mac_address TEXT,
                mac_vendor TEXT
            );
            CREATE TABLE IF NOT EXISTS services (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                asset_id INTEGER NOT NULL,
                port INTEGER NOT NULL,
                proto TEXT NOT NULL,
                product TEXT,
                version TEXT,
                extra_json TEXT,
                UNIQUE(asset_id, port, proto),
                FOREIGN KEY(asset_id) REFERENCES assets(id)
            );
            CREATE TABLE IF NOT EXISTS service_cves (
                service_id INTEGER NOT NULL,
                cve_id TEXT NOT NULL,
                PRIMARY KEY(service_id, cve_id),
                FOREIGN KEY(service_id) REFERENCES services(id)
            );
            CREATE TABLE IF NOT EXISTS alerts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT NOT NULL,
                payload_json TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE TABLE IF NOT EXISTS scan_runs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                asset_id INTEGER NOT NULL,
                scan_uuid TEXT,
                scanner TEXT,
                mode TEXT,
                started_at TEXT,
                completed_at TEXT,
                notes TEXT,
                raw_payload TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(asset_id) REFERENCES assets(id)
            );
            CREATE TABLE IF NOT EXISTS scan_services (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                scan_run_id INTEGER NOT NULL,
                port INTEGER,
                proto TEXT,
                product TEXT,
                version TEXT,
                extra_json TEXT,
                cves_json TEXT,
                FOREIGN KEY(scan_run_id) REFERENCES scan_runs(id)
            );
            CREATE TABLE IF NOT EXISTS attack_suggestions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                asset_id INTEGER NOT NULL,
                scan_run_id INTEGER,
                technique_id TEXT NOT NULL,
                tactic TEXT NOT NULL,
                name TEXT NOT NULL,
                description TEXT NOT NULL,
                related_cve TEXT,
                severity TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(asset_id) REFERENCES assets(id),
                FOREIGN KEY(scan_run_id) REFERENCES scan_runs(id)
            );
            """
        )
        # Migration for databases created before MAC tracking existed.
        _ensure_column(conn, "assets", "mac_address", "TEXT")
        _ensure_column(conn, "assets", "mac_vendor", "TEXT")
        conn.commit()
|
|
|
|
|
|
def _ensure_column(conn: sqlite3.Connection, table: str, column: str, ddl: str) -> None:
|
|
columns = {row["name"] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()}
|
|
if column not in columns:
|
|
conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl}")
|
|
|
|
|
|
class ServiceIn(BaseModel):
    """One discovered network service as submitted by a scanner."""

    port: int
    proto: str = Field(..., regex=r"^[a-z0-9]+$")  # e.g. "tcp" / "udp"
    product: Optional[str] = None
    version: Optional[str] = None
    extra: dict = Field(default_factory=dict)  # scanner-specific metadata
    cves: List[str] = Field(default_factory=list)  # CVE ids tied to this service
|
|
|
|
|
|
class ScanMetaIn(BaseModel):
    """Optional metadata about the scan run itself (all fields optional)."""

    scan_id: Optional[str] = None  # scanner-supplied UUID; generated if absent
    scanner: Optional[str] = None
    mode: Optional[str] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    notes: Optional[str] = None
|
|
|
|
|
|
class ScanIn(BaseModel):
    """Full scan ingest payload: one host plus its discovered services."""

    ip: str
    hostname: Optional[str] = None
    mac_address: Optional[str] = Field(default=None, regex=r"^[0-9A-Fa-f:]{12,17}$")
    mac_vendor: Optional[str] = None
    scan: ScanMetaIn = Field(default_factory=ScanMetaIn)
    services: List[ServiceIn]
|
|
|
|
|
|
class VulnerabilityOut(BaseModel):
    """CVE enriched with details from the local exploit DB (None if unknown)."""

    cve_id: str
    description: Optional[str]
    severity: Optional[str]
    score: Optional[float]
|
|
|
|
|
|
class ServiceOut(BaseModel):
    """A stored service, including its CVE ids and enriched vulnerabilities."""

    id: int
    port: int
    proto: str
    product: Optional[str]
    version: Optional[str]
    extra: dict
    cves: List[str]
    vulnerabilities: List[VulnerabilityOut]
|
|
|
|
|
|
class AssetOut(BaseModel):
    """A host with its identifying details and full service list."""

    id: int
    ip: str
    hostname: Optional[str]
    mac_address: Optional[str]
    mac_vendor: Optional[str]
    services: List[ServiceOut]
|
|
|
|
|
|
class ScanServiceOut(BaseModel):
    """Snapshot of one service as it looked during a specific scan run."""

    port: int
    proto: str
    product: Optional[str]
    version: Optional[str]
    extra: dict
    cves: List[str]
|
|
|
|
|
|
class ScanRunOut(BaseModel):
    """A historical scan run with its per-service snapshot."""

    id: int
    asset_id: int
    asset_ip: str  # denormalized from the assets table for display
    scan_id: Optional[str]
    scanner: Optional[str]
    mode: Optional[str]
    started_at: Optional[str]
    completed_at: Optional[str]
    created_at: str
    notes: Optional[str]
    services: List[ScanServiceOut]
|
|
|
|
|
|
class CoreSnapshotOut(BaseModel):
    """Static "core snapshot" card content (see CORE_SNAPSHOT_DATA)."""

    title: str
    bullets: List[str]
    downloads: List[str]
|
|
|
|
|
|
class RoadmapItemOut(BaseModel):
    """One roadmap entry (see ROADMAP_DATA)."""

    task: str
    status: str
|
|
|
|
|
|
class TaskCreateIn(BaseModel):
    """Request body for enqueuing a new tool task."""

    tool: str = Field(..., description="Runner to execute (sqlmap, hashcat, etc.)")
    target: Optional[str] = Field(default=None, description="Human-friendly target reference")
    params: dict = Field(default_factory=dict, description="Tool-specific parameters")
|
|
|
|
|
|
class TaskOut(BaseModel):
    """A queued/running/finished tool task as stored by task_queue."""

    id: int
    tool: str
    target: Optional[str]
    params: dict
    status: str
    created_at: str
    started_at: Optional[str]
    finished_at: Optional[str]
    result: Optional[dict]  # runner output; None until the task finishes
|
|
|
|
|
|
class TaskStatusUpdateIn(BaseModel):
    """Request body for a runner reporting task progress/results."""

    status: str
    result: Optional[dict] = None
|
|
|
|
|
|
class AttackSuggestionOut(BaseModel):
    """A MITRE ATT&CK technique suggested for an asset/scan combination."""

    id: int
    asset_id: int
    asset_ip: str  # denormalized for display
    scan_run_id: Optional[int]
    technique_id: str  # e.g. "T1190"
    tactic: str
    name: str
    description: str
    related_cve: Optional[str]
    severity: Optional[str]
    created_at: str
|
|
|
|
|
|
class AlertOut(BaseModel):
    """A stored webhook alert (source tag plus decoded JSON payload)."""

    source: str
    payload: dict
    created_at: str
|
|
|
|
|
|
class MockDashboardOut(BaseModel):
    """Aggregate payload backing the /mockup demo dashboard."""

    core_snapshot: CoreSnapshotOut
    roadmap: List[RoadmapItemOut]
    assets: List[AssetOut]
    scans: List[ScanRunOut]
    attack_suggestions: List[AttackSuggestionOut]
    tasks: List[TaskOut]
    alerts: List[AlertOut]
|
|
|
|
|
|
@app.on_event("startup")
async def startup_event() -> None:
    """Create the main DB schema and the task-queue tables on startup."""
    initialize_db()
    task_queue.ensure_tables()
|
|
|
|
|
|
def upsert_asset(conn: sqlite3.Connection, scan: ScanIn) -> int:
|
|
conn.execute(
|
|
"INSERT INTO assets(ip, hostname) VALUES(?, ?) ON CONFLICT(ip) DO UPDATE SET hostname=excluded.hostname",
|
|
(scan.ip, scan.hostname),
|
|
)
|
|
conn.execute(
|
|
"UPDATE assets SET mac_address=COALESCE(?, mac_address), mac_vendor=COALESCE(?, mac_vendor) WHERE ip=?",
|
|
(scan.mac_address, scan.mac_vendor, scan.ip),
|
|
)
|
|
row = conn.execute("SELECT id FROM assets WHERE ip=?", (scan.ip,)).fetchone()
|
|
return row["id"]
|
|
|
|
|
|
def upsert_service(conn: sqlite3.Connection, asset_id: int, service: ServiceIn) -> int:
|
|
conn.execute(
|
|
"""
|
|
INSERT INTO services(asset_id, port, proto, product, version, extra_json)
|
|
VALUES(?,?,?,?,?,?)
|
|
ON CONFLICT(asset_id, port, proto) DO UPDATE SET
|
|
product=excluded.product,
|
|
version=excluded.version,
|
|
extra_json=excluded.extra_json
|
|
""",
|
|
(
|
|
asset_id,
|
|
service.port,
|
|
service.proto,
|
|
service.product,
|
|
service.version,
|
|
json.dumps(service.extra),
|
|
),
|
|
)
|
|
row = conn.execute(
|
|
"SELECT id FROM services WHERE asset_id=? AND port=? AND proto=?",
|
|
(asset_id, service.port, service.proto),
|
|
).fetchone()
|
|
service_id = row["id"]
|
|
conn.execute("DELETE FROM service_cves WHERE service_id=?", (service_id,))
|
|
for cve_id in service.cves:
|
|
conn.execute(
|
|
"INSERT INTO service_cves(service_id, cve_id) VALUES(?, ?)",
|
|
(service_id, cve_id),
|
|
)
|
|
return service_id
|
|
|
|
|
|
def record_scan_run(conn: sqlite3.Connection, asset_id: int, payload: ScanIn) -> int:
    """Persist one scan run (metadata + per-service snapshot); returns run id.

    A scan UUID is generated when the scanner did not supply one, and the
    full inbound payload is kept as JSON in ``raw_payload`` for auditing.
    """
    scan_meta = payload.scan or ScanMetaIn()
    scan_uuid = scan_meta.scan_id or str(uuid.uuid4())
    cursor = conn.execute(
        """
        INSERT INTO scan_runs(asset_id, scan_uuid, scanner, mode, started_at, completed_at, notes, raw_payload)
        VALUES(?,?,?,?,?,?,?,?)
        """,
        (
            asset_id,
            scan_uuid,
            scan_meta.scanner,
            scan_meta.mode,
            scan_meta.started_at,
            scan_meta.completed_at,
            scan_meta.notes,
            json.dumps(payload.dict()),
        ),
    )
    scan_run_id = cursor.lastrowid
    # Snapshot each service as reported; these rows are immutable history.
    for service in payload.services:
        conn.execute(
            """
            INSERT INTO scan_services(scan_run_id, port, proto, product, version, extra_json, cves_json)
            VALUES(?,?,?,?,?,?,?)
            """,
            (
                scan_run_id,
                service.port,
                service.proto,
                service.product,
                service.version,
                json.dumps(service.extra),
                json.dumps(service.cves),
            ),
        )
    return scan_run_id
|
|
|
|
|
|
def store_attack_suggestions(conn: sqlite3.Connection, asset_id: int, scan_run_id: int, payload: ScanIn) -> None:
    """Regenerate and store MITRE suggestions for this asset + scan run.

    Existing suggestions for the same (asset, run) pair are deleted first so
    re-ingesting a scan does not duplicate rows.
    """
    conn.execute(
        "DELETE FROM attack_suggestions WHERE asset_id=? AND scan_run_id=?",
        (asset_id, scan_run_id),
    )
    suggestions: List[MitreSuggestion] = generate_attack_suggestions(payload.ip, payload.services)
    for suggestion in suggestions:
        conn.execute(
            """
            INSERT INTO attack_suggestions(asset_id, scan_run_id, technique_id, tactic, name, description, related_cve, severity)
            VALUES(?,?,?,?,?,?,?,?)
            """,
            (
                asset_id,
                scan_run_id,
                suggestion.technique_id,
                suggestion.tactic,
                suggestion.name,
                suggestion.description,
                suggestion.related_cve,
                suggestion.severity,
            ),
        )
|
|
|
|
|
|
@app.post("/ingest/scan", response_model=AssetOut)
async def ingest_scan(payload: ScanIn) -> AssetOut:
    """Ingest a scan: upsert the asset + services, record the run, and
    regenerate attack suggestions. Returns the asset with enriched services.
    """
    with get_db() as conn:
        asset_id = upsert_asset(conn, payload)
        services_out: List[ServiceOut] = []
        for service in payload.services:
            service_id = upsert_service(conn, asset_id, service)
            services_out.append(
                ServiceOut(
                    id=service_id,
                    port=service.port,
                    proto=service.proto,
                    product=service.product,
                    version=service.version,
                    extra=service.extra,
                    cves=service.cves,
                    vulnerabilities=_build_vulnerabilities(service.cves),
                )
            )
        scan_run_id = record_scan_run(conn, asset_id, payload)
        store_attack_suggestions(conn, asset_id, scan_run_id, payload)
        # Single commit so the asset, run, and suggestions land atomically.
        conn.commit()
    return AssetOut(
        id=asset_id,
        ip=payload.ip,
        hostname=payload.hostname,
        mac_address=payload.mac_address,
        mac_vendor=payload.mac_vendor,
        services=services_out,
    )
|
|
|
|
|
|
def _build_vulnerabilities(cve_ids: Iterable[str]) -> List[VulnerabilityOut]:
    """Expand CVE ids into VulnerabilityOut models via the local exploit DB."""
    enriched = fetch_cve_details(list(cve_ids))
    vulns: List[VulnerabilityOut] = []
    for cve_id, info in enriched.items():
        vulns.append(
            VulnerabilityOut(
                cve_id=cve_id,
                description=info.get("description"),
                severity=info.get("severity"),
                score=info.get("score"),
            )
        )
    return vulns
|
|
|
|
|
|
def fetch_cve_details(cve_ids: List[str]) -> Dict[str, Dict[str, Optional[str]]]:
    """Look up description/severity/score for each CVE id in the exploit DB.

    Every requested id appears in the result; ids missing from the DB (or
    when the mirror has not been initialised) map to all-None placeholders.
    """
    if not cve_ids or not EXPLOIT_DB_PATH.exists():
        return {cve_id: {"description": None, "severity": None, "score": None} for cve_id in cve_ids}

    placeholders = ",".join("?" for _ in cve_ids)
    conn = sqlite3.connect(EXPLOIT_DB_PATH)
    conn.row_factory = dict_factory
    try:
        rows = conn.execute(
            f"SELECT cve_id, description, severity, score FROM cves WHERE cve_id IN ({placeholders})",
            cve_ids,
        ).fetchall()
    finally:
        conn.close()

    found = {row["cve_id"]: row for row in rows}
    result: Dict[str, Dict[str, Optional[str]]] = {}
    for cve_id in cve_ids:
        result[cve_id] = found.get(cve_id, {"description": None, "severity": None, "score": None})
    return result
|
|
|
|
|
|
def build_asset(conn: sqlite3.Connection, asset_row: dict) -> AssetOut:
    """Assemble a full AssetOut (services + CVE enrichment) for one asset row.

    Fix: the previous implementation ran the ``service_cves`` query twice per
    service (one loop to collect ids for enrichment, a second identical loop
    to build the models). The CVE lists are now fetched once and reused.
    """
    services = conn.execute(
        "SELECT * FROM services WHERE asset_id=? ORDER BY port",
        (asset_row["id"],),
    ).fetchall()

    # Single pass: remember each service's CVE ids and the union for lookup.
    cves_by_service: Dict[int, List[str]] = {}
    all_cves: List[str] = []
    for service in services:
        cves = [row["cve_id"] for row in conn.execute(
            "SELECT cve_id FROM service_cves WHERE service_id=?",
            (service["id"],),
        ).fetchall()]
        cves_by_service[service["id"]] = cves
        all_cves.extend(cves)

    cve_details = fetch_cve_details(all_cves)

    service_models: List[ServiceOut] = []
    for service in services:
        cves = cves_by_service[service["id"]]
        service_models.append(
            ServiceOut(
                id=service["id"],
                port=service["port"],
                proto=service["proto"],
                product=service["product"],
                version=service["version"],
                extra=json.loads(service["extra_json"] or "{}"),
                cves=cves,
                vulnerabilities=[
                    VulnerabilityOut(
                        cve_id=cve,
                        description=cve_details[cve]["description"],
                        severity=cve_details[cve]["severity"],
                        score=cve_details[cve]["score"],
                    )
                    for cve in cves
                ],
            )
        )
    return AssetOut(
        id=asset_row["id"],
        ip=asset_row["ip"],
        hostname=asset_row["hostname"],
        mac_address=asset_row.get("mac_address"),
        mac_vendor=asset_row.get("mac_vendor"),
        services=service_models,
    )
|
|
|
|
|
|
@app.get("/assets", response_model=List[AssetOut])
async def list_assets() -> List[AssetOut]:
    """Return every known asset with services and CVE enrichment."""
    with get_db() as conn:
        assets = conn.execute("SELECT * FROM assets ORDER BY ip").fetchall()
        return [build_asset(conn, asset) for asset in assets]
|
|
|
|
|
|
@app.get("/assets/{asset_id}", response_model=AssetOut)
async def get_asset(asset_id: int) -> AssetOut:
    """Return a single asset by id; 404 when it does not exist."""
    with get_db() as conn:
        asset = conn.execute("SELECT * FROM assets WHERE id=?", (asset_id,)).fetchone()
        if not asset:
            raise HTTPException(status_code=404, detail="Asset not found")
        return build_asset(conn, asset)
|
|
|
|
|
|
class CVEOut(BaseModel):
    """CVE details plus any known public exploits from the mirror DB."""

    cve_id: str
    description: Optional[str]
    severity: Optional[str]
    score: Optional[float]
    exploits: List[dict]  # rows: source, title, reference, path
|
|
|
|
|
|
@app.get("/cve/{cve_id}", response_model=CVEOut)
async def get_cve(cve_id: str) -> CVEOut:
    """Look up one CVE (and its exploits) in the offline mirror DB.

    404 when the mirror has not been initialised or the id is unknown.
    """
    if not EXPLOIT_DB_PATH.exists():
        raise HTTPException(status_code=404, detail="CVE database not initialized")
    conn = sqlite3.connect(EXPLOIT_DB_PATH)
    conn.row_factory = dict_factory
    try:
        cve = conn.execute("SELECT * FROM cves WHERE cve_id=?", (cve_id,)).fetchone()
        if not cve:
            raise HTTPException(status_code=404, detail="CVE not found")
        exploits = conn.execute(
            "SELECT source, title, reference, path FROM exploits WHERE cve_id=?",
            (cve_id,),
        ).fetchall()
        return CVEOut(
            cve_id=cve_id,
            description=cve.get("description"),
            severity=cve.get("severity"),
            score=cve.get("score"),
            exploits=exploits,
        )
    finally:
        conn.close()
|
|
|
|
|
|
@app.get("/bycve/{cve_id}/hosts", response_model=List[AssetOut])
async def hosts_by_cve(cve_id: str) -> List[AssetOut]:
    """Return all assets that have at least one service linked to *cve_id*."""
    with get_db() as conn:
        rows = conn.execute(
            """
            SELECT DISTINCT assets.* FROM assets
            JOIN services ON services.asset_id = assets.id
            JOIN service_cves ON service_cves.service_id = services.id
            WHERE service_cves.cve_id = ?
            """,
            (cve_id,),
        ).fetchall()
        return [build_asset(conn, row) for row in rows]
|
|
|
|
|
|
class AlertIn(BaseModel):
    """Generic inbound webhook alert: source tag + arbitrary JSON payload."""

    source: str
    payload: dict
|
|
|
|
|
|
@app.post("/webhook/n8n/scan_complete")
async def webhook_scan_complete(alert: AlertIn) -> dict:
    """Store an n8n scan-complete notification in the alerts table."""
    with get_db() as conn:
        conn.execute(
            "INSERT INTO alerts(source, payload_json) VALUES(?, ?)",
            (alert.source, json.dumps(alert.payload)),
        )
        conn.commit()
    return {"ok": True}
|
|
|
|
|
|
class CVEAlertIn(BaseModel):
    """Inbound new-CVE notification from the n8n workflow."""

    cve_id: str
    critical: bool = False
|
|
|
|
|
|
@app.post("/webhook/n8n/new_cve")
async def webhook_new_cve(alert: CVEAlertIn) -> dict:
    """Store an n8n new-CVE notification under the fixed source tag."""
    with get_db() as conn:
        conn.execute(
            "INSERT INTO alerts(source, payload_json) VALUES(?, ?)",
            (
                "n8n:new_cve",
                json.dumps({"cve_id": alert.cve_id, "critical": alert.critical}),
            ),
        )
        conn.commit()
    return {"ok": True}
|
|
|
|
|
|
def _row_to_task(row: sqlite3.Row) -> TaskOut:
    """Convert a raw tasks-table row into a TaskOut model.

    Fix: the old ``json.loads(... or "null")`` fallback was dead code — the
    conditional already guards against a missing/empty ``result_json``.
    """
    data = dict(row)
    result_json = data.get("result_json")
    return TaskOut(
        id=data["id"],
        tool=data["tool"],
        target=data.get("target"),
        params=json.loads(data.get("params_json") or "{}"),
        status=data["status"],
        created_at=data["created_at"],
        started_at=data.get("started_at"),
        finished_at=data.get("finished_at"),
        result=json.loads(result_json) if result_json else None,
    )
|
|
|
|
|
|
@app.get("/tasks", response_model=List[TaskOut])
async def list_tasks_endpoint() -> List[TaskOut]:
    """Return the 200 most recent tool tasks, newest first."""
    task_queue.ensure_tables()
    with task_queue.get_conn() as conn:
        rows = conn.execute(
            "SELECT * FROM tasks ORDER BY created_at DESC, id DESC LIMIT 200"
        ).fetchall()
        return [_row_to_task(row) for row in rows]
|
|
|
|
|
|
@app.get("/tasks/{task_id}", response_model=TaskOut)
async def get_task(task_id: int) -> TaskOut:
    """Return one task by id; 404 when it does not exist."""
    task_queue.ensure_tables()
    with task_queue.get_conn() as conn:
        row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Task not found")
        return _row_to_task(row)
|
|
|
|
|
|
@app.post("/tasks", response_model=TaskOut, status_code=201)
async def create_task(task: TaskCreateIn) -> TaskOut:
    """Enqueue a new tool task and return its stored representation."""
    task_queue.ensure_tables()
    task_id = task_queue.enqueue_task(task.tool, task.target, task.params)
    with task_queue.get_conn() as conn:
        row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
        return _row_to_task(row)
|
|
|
|
|
|
@app.post("/tasks/{task_id}/status", response_model=TaskOut)
async def update_task_status_endpoint(task_id: int, payload: TaskStatusUpdateIn) -> TaskOut:
    """Update a task's status/result (called by runners); returns the task."""
    task_queue.ensure_tables()
    task_queue.update_task_status(task_id, payload.status, payload.result)
    with task_queue.get_conn() as conn:
        row = conn.execute("SELECT * FROM tasks WHERE id=?", (task_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Task not found")
        return _row_to_task(row)
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def ui(request: Request):
    """Render the main dashboard page."""
    # Fall back to the bundled crest if startup never resolved a logo.
    resolved_logo = getattr(app.state, "logo_url", "/static/goose_flag_logo.svg")
    return templates.TemplateResponse(
        "index.html",
        {
            "request": request,
            "logo_url": resolved_logo,
            "spotlight_logo_url": resolved_logo,
        },
    )
|
|
|
|
|
|
@app.get("/core_snapshot", response_model=CoreSnapshotOut)
async def core_snapshot() -> CoreSnapshotOut:
    """Return the static core-snapshot card content."""
    return CoreSnapshotOut(**CORE_SNAPSHOT_DATA)
|
|
|
|
|
|
@app.get("/roadmap", response_model=List[RoadmapItemOut])
async def roadmap() -> List[RoadmapItemOut]:
    """Return the static roadmap entries."""
    return [RoadmapItemOut(**item) for item in ROADMAP_DATA]
|
|
|
|
|
|
@app.get("/mock/dashboard-data", response_model=MockDashboardOut)
async def mock_dashboard_data() -> MockDashboardOut:
    """Return the canned demo payload as JSON (no DB access)."""
    return build_mock_dashboard_data()
|
|
|
|
|
|
@app.get("/mockup", response_class=HTMLResponse)
async def mock_dashboard(request: Request):
    """Render the demo dashboard backed entirely by canned data."""
    resolved_logo = getattr(app.state, "logo_url", "/static/goose_flag_logo.svg")
    mock_payload = build_mock_dashboard_data()
    return templates.TemplateResponse(
        "mock_dashboard.html",
        {
            "request": request,
            "logo_url": resolved_logo,
            "spotlight_logo_url": resolved_logo,
            "mock": mock_payload.dict(),
        },
    )
|
|
|
|
|
|
def build_scan_run(conn: sqlite3.Connection, row: dict) -> ScanRunOut:
    """Build a ScanRunOut from a scan_runs row (joined with assets.ip).

    *row* must carry an ``asset_ip`` column, i.e. come from the queries that
    join against the assets table.
    """
    services = conn.execute(
        "SELECT * FROM scan_services WHERE scan_run_id=? ORDER BY port",
        (row["id"],),
    ).fetchall()
    return ScanRunOut(
        id=row["id"],
        asset_id=row["asset_id"],
        asset_ip=row["asset_ip"],
        scan_id=row.get("scan_uuid"),
        scanner=row.get("scanner"),
        mode=row.get("mode"),
        started_at=row.get("started_at"),
        completed_at=row.get("completed_at"),
        created_at=row.get("created_at"),
        notes=row.get("notes"),
        services=[
            ScanServiceOut(
                port=svc.get("port"),
                proto=svc.get("proto"),
                product=svc.get("product"),
                version=svc.get("version"),
                extra=json.loads(svc.get("extra_json") or "{}"),
                cves=json.loads(svc.get("cves_json") or "[]"),
            )
            for svc in services
        ],
    )
|
|
|
|
|
|
@app.get("/scans", response_model=List[ScanRunOut])
async def list_scans() -> List[ScanRunOut]:
    """Return the 200 most recent scan runs, newest first."""
    with get_db() as conn:
        rows = conn.execute(
            """
            SELECT scan_runs.*, assets.ip AS asset_ip FROM scan_runs
            JOIN assets ON assets.id = scan_runs.asset_id
            ORDER BY scan_runs.created_at DESC
            LIMIT 200
            """
        ).fetchall()
        return [build_scan_run(conn, row) for row in rows]
|
|
|
|
|
|
@app.get("/scans/{scan_run_id}", response_model=ScanRunOut)
async def get_scan(scan_run_id: int) -> ScanRunOut:
    """Return one scan run by id; 404 when it does not exist."""
    with get_db() as conn:
        row = conn.execute(
            """
            SELECT scan_runs.*, assets.ip AS asset_ip FROM scan_runs
            JOIN assets ON assets.id = scan_runs.asset_id
            WHERE scan_runs.id = ?
            """,
            (scan_run_id,),
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Scan not found")
        return build_scan_run(conn, row)
|
|
|
|
|
|
def _build_attack_response(rows: List[dict]) -> List[AttackSuggestionOut]:
    """Map attack_suggestions rows (joined with assets.ip) onto models."""
    suggestions: List[AttackSuggestionOut] = []
    for row in rows:
        suggestions.append(
            AttackSuggestionOut(
                id=row["id"],
                asset_id=row["asset_id"],
                asset_ip=row["asset_ip"],
                scan_run_id=row.get("scan_run_id"),
                technique_id=row["technique_id"],
                tactic=row["tactic"],
                name=row["name"],
                description=row["description"],
                related_cve=row.get("related_cve"),
                severity=row.get("severity"),
                created_at=row.get("created_at"),
            )
        )
    return suggestions
|
|
|
|
|
|
@app.get("/attack_suggestions", response_model=List[AttackSuggestionOut])
async def attack_suggestions() -> List[AttackSuggestionOut]:
    """Return the 200 most recent attack suggestions, newest first."""
    with get_db() as conn:
        rows = conn.execute(
            """
            SELECT attack_suggestions.*, assets.ip AS asset_ip FROM attack_suggestions
            JOIN assets ON assets.id = attack_suggestions.asset_id
            ORDER BY attack_suggestions.created_at DESC
            LIMIT 200
            """
        ).fetchall()
        return _build_attack_response(rows)
|
|
|
|
|
|
def build_mock_dashboard_data() -> MockDashboardOut:
    """Return a fully canned dashboard payload for the /mockup demo.

    Everything below is fixture data — no database access. Keep ids and CVE
    strings consistent across the assets, scans, and suggestions sections so
    the demo dashboard cross-links correctly.
    """
    core_snapshot = CoreSnapshotOut(**CORE_SNAPSHOT_DATA)
    roadmap = [RoadmapItemOut(**item) for item in ROADMAP_DATA]
    # Two demo hosts with pre-enriched vulnerabilities.
    assets = [
        AssetOut(
            id=1,
            ip="10.0.0.21",
            hostname="web-canary",
            mac_address="00:25:90:ab:cd:ef",
            mac_vendor="Northern Goose Labs",
            services=[
                ServiceOut(
                    id=1,
                    port=80,
                    proto="tcp",
                    product="nginx",
                    version="1.23.4",
                    extra={"transport": "tcp", "state": "open"},
                    cves=["CVE-2023-3124", "CVE-2022-3554"],
                    vulnerabilities=[
                        VulnerabilityOut(
                            cve_id="CVE-2023-3124",
                            description="Sample HTTP/2 request smuggling bug",
                            severity="High",
                            score=8.1,
                        ),
                        VulnerabilityOut(
                            cve_id="CVE-2022-3554",
                            description="Sample OpenSSL leak",
                            severity="Medium",
                            score=6.5,
                        ),
                    ],
                ),
                ServiceOut(
                    id=2,
                    port=443,
                    proto="tcp",
                    product="Envoy",
                    version="1.26",
                    extra={"alpn": "h2"},
                    cves=["CVE-2024-1234"],
                    vulnerabilities=[
                        VulnerabilityOut(
                            cve_id="CVE-2024-1234",
                            description="Sample TLS parsing issue",
                            severity="Critical",
                            score=9.4,
                        )
                    ],
                ),
            ],
        ),
        AssetOut(
            id=2,
            ip="10.0.0.45",
            hostname="db-honker",
            mac_address="58:8a:5a:01:23:45",
            mac_vendor="Canada Goose Compute",
            services=[
                ServiceOut(
                    id=3,
                    port=5432,
                    proto="tcp",
                    product="PostgreSQL",
                    version="14.9",
                    extra={"cluster": "atlas"},
                    cves=["CVE-2023-2454"],
                    vulnerabilities=[
                        VulnerabilityOut(
                            cve_id="CVE-2023-2454",
                            description="Sample privilege escalation",
                            severity="High",
                            score=8.5,
                        )
                    ],
                )
            ],
        ),
    ]

    # Two demo scan runs matching the assets above.
    scans = [
        ScanRunOut(
            id=501,
            asset_id=1,
            asset_ip="10.0.0.21",
            scan_id="mock-fast-001",
            scanner="GooseStrike",
            mode="fast",
            started_at="2024-02-07T12:00:00Z",
            completed_at="2024-02-07T12:05:00Z",
            created_at="2024-02-07T12:05:01Z",
            notes="Sample lab scan",
            services=[
                ScanServiceOut(
                    port=80,
                    proto="tcp",
                    product="nginx",
                    version="1.23.4",
                    extra={"transport": "tcp"},
                    cves=["CVE-2023-3124", "CVE-2022-3554"],
                ),
                ScanServiceOut(
                    port=443,
                    proto="tcp",
                    product="Envoy",
                    version="1.26",
                    extra={"alpn": "h2"},
                    cves=["CVE-2024-1234"],
                ),
            ],
        ),
        ScanRunOut(
            id=502,
            asset_id=2,
            asset_ip="10.0.0.45",
            scan_id="mock-full-001",
            scanner="GooseStrike",
            mode="full",
            started_at="2024-02-05T09:00:00Z",
            completed_at="2024-02-05T10:00:00Z",
            created_at="2024-02-05T10:00:02Z",
            notes="Baseline DB review",
            services=[
                ScanServiceOut(
                    port=5432,
                    proto="tcp",
                    product="PostgreSQL",
                    version="14.9",
                    extra={"ssl": "enabled"},
                    cves=["CVE-2023-2454"],
                ),
                ScanServiceOut(
                    port=22,
                    proto="tcp",
                    product="OpenSSH",
                    version="8.9",
                    extra={"auth": "password"},
                    cves=["CVE-2023-38408"],
                ),
            ],
        ),
    ]

    # Demo MITRE suggestions keyed to the scan runs above.
    attack_suggestions = [
        AttackSuggestionOut(
            id=1,
            asset_id=1,
            asset_ip="10.0.0.21",
            scan_run_id=501,
            technique_id="T1190",
            tactic="Initial Access",
            name="Exploit Public-Facing Application",
            description="Use SQLMap or Burp to validate CVE-2023-3124",
            related_cve="CVE-2023-3124",
            severity="High",
            created_at="2024-02-07T12:05:03Z",
        ),
        AttackSuggestionOut(
            id=2,
            asset_id=2,
            asset_ip="10.0.0.45",
            scan_run_id=502,
            technique_id="T1059",
            tactic="Execution",
            name="Command Shell",
            description="Use Metasploit postgres payload to leverage CVE-2023-2454",
            related_cve="CVE-2023-2454",
            severity="Medium",
            created_at="2024-02-05T10:00:03Z",
        ),
    ]

    # Demo task queue entries (one pending, one running).
    tasks = [
        TaskOut(
            id=88,
            tool="sqlmap",
            target="https://web-canary/login",
            params={"risk": 1, "level": 3},
            status="pending",
            created_at="2024-02-07T12:06:00Z",
            started_at=None,
            finished_at=None,
            result=None,
        ),
        TaskOut(
            id=89,
            tool="hashcat",
            target="lab-hashes",
            params={"hash_file": "hashes.txt", "wordlist": "rockyou.txt", "mode": 0},
            status="running",
            created_at="2024-02-07T12:06:15Z",
            started_at="2024-02-07T12:06:20Z",
            finished_at=None,
            result=None,
        ),
    ]

    # Demo webhook alerts.
    alerts = [
        AlertOut(
            source="n8n:new_cve",
            payload={"cve_id": "CVE-2024-1234", "critical": True},
            created_at="2024-02-07T12:07:00Z",
        ),
        AlertOut(
            source="n8n:scan_complete",
            payload={"scan_id": "mock-fast-001", "status": "ok"},
            created_at="2024-02-07T12:05:05Z",
        ),
    ]

    return MockDashboardOut(
        core_snapshot=core_snapshot,
        roadmap=roadmap,
        assets=assets,
        scans=scans,
        attack_suggestions=attack_suggestions,
        tasks=tasks,
        alerts=alerts,
    )
|
|
|
|
|
|
@app.get("/assets/{asset_id}/attack_suggestions", response_model=List[AttackSuggestionOut])
async def asset_attack_suggestions(asset_id: int) -> List[AttackSuggestionOut]:
    """Return all attack suggestions for one asset, newest first.

    An unknown asset id yields an empty list rather than a 404.
    """
    with get_db() as conn:
        rows = conn.execute(
            """
            SELECT attack_suggestions.*, assets.ip AS asset_ip FROM attack_suggestions
            JOIN assets ON assets.id = attack_suggestions.asset_id
            WHERE assets.id = ?
            ORDER BY attack_suggestions.created_at DESC
            """,
            (asset_id,),
        ).fetchall()
        return _build_attack_response(rows)
|