From 76bee49e67a6144d1de4b7c758ee3ef7c8411c2c Mon Sep 17 00:00:00 2001 From: mblanke Date: Fri, 28 Nov 2025 14:06:45 -0500 Subject: [PATCH] feat: Add network map with OS detection and device icons - Add Network Map tab with D3.js force-directed graph visualization - Device icons: Windows, Linux, macOS, routers, switches, printers, servers - Network range scan modal with ping/quick/OS/full scan options - Parse nmap XML output to detect OS type and open ports - Host details sidebar panel with deep scan option - Draggable network nodes with gateway-centered layout - Auto-refresh and polling for running scans - Infer OS from open ports when nmap OS detection unavailable --- services/dashboard/app/main.py | 312 ++++++++++++++++ services/dashboard/templates/index.html | 460 ++++++++++++++++++++++++ 2 files changed, 772 insertions(+) diff --git a/services/dashboard/app/main.py b/services/dashboard/app/main.py index 10a88d5..48c391c 100644 --- a/services/dashboard/app/main.py +++ b/services/dashboard/app/main.py @@ -74,6 +74,11 @@ class ScanRequest(BaseModel): options: Dict[str, Any] = Field(default_factory=dict) +class NetworkScanRequest(BaseModel): + target: str + scan_type: str = "os" # ping, quick, os, full + + @app.get("/health") async def health_check(): """Health check endpoint""" @@ -363,6 +368,313 @@ async def get_kali_tools(): raise HTTPException(status_code=503, detail="Kali executor not available") +# ============== Network Map Endpoints ============== + +# In-memory store for network scan results +network_scans = {} +network_hosts = [] + +@app.post("/api/network/scan") +async def start_network_scan(request: NetworkScanRequest): + """Start a network range scan for OS detection""" + import uuid + scan_id = str(uuid.uuid4())[:8] + + # Build nmap command based on scan type + scan_commands = { + "ping": f"nmap -sn {request.target} -oX -", + "quick": f"nmap -T4 -F -O --osscan-limit {request.target} -oX -", + "os": f"nmap -O -sV --version-light {request.target} -oX -", + "full": f"nmap -sS -sV -O -p- --version-all {request.target} -oX -" + } + + command = scan_commands.get(request.scan_type, scan_commands["os"]) + + network_scans[scan_id] = { + "scan_id": scan_id, + "target": request.target, + "scan_type": request.scan_type, + "status": "running", + "hosts": [], + "command": command + } + + # Execute scan asynchronously + import asyncio + asyncio.create_task(execute_network_scan(scan_id, command)) + + return {"scan_id": scan_id, "status": "running"} + + +async def execute_network_scan(scan_id: str, command: str): + """Execute network scan and parse results""" + global network_hosts + + try: + async with httpx.AsyncClient() as client: + response = await client.post( + f"{HACKGPT_API_URL}/execute", + json={"command": command, "timeout": 600}, + timeout=610.0 + ) + + if response.status_code == 200: + result = response.json() + stdout = result.get("stdout", "") + + # Parse nmap XML output + hosts = parse_nmap_xml(stdout) + + network_scans[scan_id]["status"] = "completed" + network_scans[scan_id]["hosts"] = hosts + + # Update global host list + for host in hosts: + existing = next((h for h in network_hosts if h["ip"] == host["ip"]), None) + if existing: + existing.update(host) + else: + network_hosts.append(host) + else: + network_scans[scan_id]["status"] = "failed" + network_scans[scan_id]["error"] = response.text + + except Exception as e: + network_scans[scan_id]["status"] = "failed" + network_scans[scan_id]["error"] = str(e) + + +def parse_nmap_xml(xml_output: str) -> List[Dict[str, Any]]: + """Parse nmap XML output to extract hosts with OS info""" + import re + hosts = [] + + # Try XML parsing first + try: + import xml.etree.ElementTree as ET + + # Handle case where XML might have non-XML content before it + xml_start = xml_output.find(' str: + """Detect OS type from nmap OS string""" + if not os_string: + return "" + os_lower = os_string.lower() + + if "windows" in os_lower: + return "Windows" + elif "linux" in os_lower or "ubuntu" in os_lower or "debian" in os_lower or "centos" in os_lower or "red hat" in os_lower: + return "Linux" + elif "mac os" in os_lower or "darwin" in os_lower or "apple" in os_lower or "ios" in os_lower: + return "macOS" + elif "cisco" in os_lower: + return "Cisco Router" + elif "juniper" in os_lower: + return "Juniper Router" + elif "fortinet" in os_lower or "fortigate" in os_lower: + return "Fortinet" + elif "vmware" in os_lower or "esxi" in os_lower: + return "VMware Server" + elif "freebsd" in os_lower: + return "FreeBSD" + elif "android" in os_lower: + return "Android" + elif "printer" in os_lower or "hp" in os_lower: + return "Printer" + elif "switch" in os_lower: + return "Network Switch" + elif "router" in os_lower: + return "Router" + + return "" + + +def infer_os_from_ports(ports: List[Dict]) -> str: + """Infer OS type from open ports""" + port_nums = [p["port"] for p in ports] + services = [p.get("service", "").lower() for p in ports] + products = [p.get("product", "").lower() for p in ports] + + # Windows indicators + windows_ports = {135, 139, 445, 3389, 5985, 5986} + if windows_ports & set(port_nums): + return "Windows" + if any("microsoft" in p or "windows" in p for p in products): + return "Windows" + + # Linux indicators + if 22 in port_nums and "ssh" in services: + return "Linux" + + # Network device indicators + if 161 in port_nums or 162 in port_nums: # SNMP + return "Network Device" + + # Printer + if 9100 in port_nums or 631 in port_nums: + return "Printer" + + return "" + + +def parse_nmap_text(output: str) -> List[Dict[str, Any]]: + """Parse nmap text output as fallback""" + import re + hosts = [] + current_host = None + + for line in output.split('\n'): + # Match host line + host_match = re.search(r'Nmap scan report for (?:(\S+) \()?(\d+\.\d+\.\d+\.\d+)', line) + if host_match: + if current_host and current_host.get("ip"): + hosts.append(current_host) + current_host = { + "ip": host_match.group(2), + "hostname": host_match.group(1) or "", + "os_type": "", + "os_details": "", + "ports": [], + "mac": "", + "vendor": "" + } + continue + + if current_host: + # Match MAC + mac_match = re.search(r'MAC Address: ([0-9A-F:]+) \(([^)]+)\)', line) + if mac_match: + current_host["mac"] = mac_match.group(1) + current_host["vendor"] = mac_match.group(2) + + # Match port + port_match = re.search(r'(\d+)/(tcp|udp)\s+(\w+)\s+(\S+)', line) + if port_match: + current_host["ports"].append({ + "port": int(port_match.group(1)), + "protocol": port_match.group(2), + "state": port_match.group(3), + "service": port_match.group(4) + }) + + # Match OS + os_match = re.search(r'OS details?: (.+)', line) + if os_match: + current_host["os_details"] = os_match.group(1) + current_host["os_type"] = detect_os_type(os_match.group(1)) + + if current_host and current_host.get("ip"): + hosts.append(current_host) + + return hosts + + +@app.get("/api/network/scan/{scan_id}") +async def get_network_scan(scan_id: str): + """Get network scan status and results""" + if scan_id not in network_scans: + raise HTTPException(status_code=404, detail="Scan not found") + return network_scans[scan_id] + + +@app.get("/api/network/hosts") +async def get_network_hosts(): + """Get all discovered network hosts""" + return {"hosts": network_hosts} + + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8080) \ No newline at end of file diff --git a/services/dashboard/templates/index.html b/services/dashboard/templates/index.html index 20c8192..f5324a9 100644 --- a/services/dashboard/templates/index.html +++ b/services/dashboard/templates/index.html @@ -7,6 +7,7 @@ +