feat: Add network map with OS detection and device icons

- Add Network Map tab with D3.js force-directed graph visualization
- Device icons: Windows, Linux, macOS, routers, switches, printers, servers
- Network range scan modal with ping/quick/OS/full scan options
- Parse nmap XML output to detect OS type and open ports
- Host details sidebar panel with deep scan option
- Draggable network nodes with gateway-centered layout
- Auto-refresh and polling for running scans
- Infer OS from open ports when nmap OS detection unavailable
This commit is contained in:
2025-11-28 14:06:45 -05:00
parent 871eedbcbc
commit 76bee49e67
2 changed files with 772 additions and 0 deletions

View File

@@ -74,6 +74,11 @@ class ScanRequest(BaseModel):
options: Dict[str, Any] = Field(default_factory=dict)
class NetworkScanRequest(BaseModel):
target: str
scan_type: str = "os" # ping, quick, os, full
@app.get("/health")
async def health_check():
"""Health check endpoint"""
@@ -363,6 +368,313 @@ async def get_kali_tools():
raise HTTPException(status_code=503, detail="Kali executor not available")
# ============== Network Map Endpoints ==============
# In-memory store for network scan results
network_scans = {}
network_hosts = []
@app.post("/api/network/scan")
async def start_network_scan(request: NetworkScanRequest):
"""Start a network range scan for OS detection"""
import uuid
scan_id = str(uuid.uuid4())[:8]
# Build nmap command based on scan type
scan_commands = {
"ping": f"nmap -sn {request.target} -oX -",
"quick": f"nmap -T4 -F -O --osscan-limit {request.target} -oX -",
"os": f"nmap -O -sV --version-light {request.target} -oX -",
"full": f"nmap -sS -sV -O -p- --version-all {request.target} -oX -"
}
command = scan_commands.get(request.scan_type, scan_commands["os"])
network_scans[scan_id] = {
"scan_id": scan_id,
"target": request.target,
"scan_type": request.scan_type,
"status": "running",
"hosts": [],
"command": command
}
# Execute scan asynchronously
import asyncio
asyncio.create_task(execute_network_scan(scan_id, command))
return {"scan_id": scan_id, "status": "running"}
async def execute_network_scan(scan_id: str, command: str):
"""Execute network scan and parse results"""
global network_hosts
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{HACKGPT_API_URL}/execute",
json={"command": command, "timeout": 600},
timeout=610.0
)
if response.status_code == 200:
result = response.json()
stdout = result.get("stdout", "")
# Parse nmap XML output
hosts = parse_nmap_xml(stdout)
network_scans[scan_id]["status"] = "completed"
network_scans[scan_id]["hosts"] = hosts
# Update global host list
for host in hosts:
existing = next((h for h in network_hosts if h["ip"] == host["ip"]), None)
if existing:
existing.update(host)
else:
network_hosts.append(host)
else:
network_scans[scan_id]["status"] = "failed"
network_scans[scan_id]["error"] = response.text
except Exception as e:
network_scans[scan_id]["status"] = "failed"
network_scans[scan_id]["error"] = str(e)
def parse_nmap_xml(xml_output: str) -> List[Dict[str, Any]]:
"""Parse nmap XML output to extract hosts with OS info"""
import re
hosts = []
# Try XML parsing first
try:
import xml.etree.ElementTree as ET
# Handle case where XML might have non-XML content before it
xml_start = xml_output.find('<?xml')
if xml_start == -1:
xml_start = xml_output.find('<nmaprun')
if xml_start != -1:
xml_output = xml_output[xml_start:]
root = ET.fromstring(xml_output)
for host_elem in root.findall('.//host'):
if host_elem.find("status").get("state") != "up":
continue
host = {
"ip": "",
"hostname": "",
"mac": "",
"vendor": "",
"os_type": "",
"os_details": "",
"ports": []
}
# Get IP address
addr = host_elem.find("address[@addrtype='ipv4']")
if addr is not None:
host["ip"] = addr.get("addr", "")
# Get MAC address
mac = host_elem.find("address[@addrtype='mac']")
if mac is not None:
host["mac"] = mac.get("addr", "")
host["vendor"] = mac.get("vendor", "")
# Get hostname
hostname = host_elem.find(".//hostname")
if hostname is not None:
host["hostname"] = hostname.get("name", "")
# Get OS info
os_elem = host_elem.find(".//osmatch")
if os_elem is not None:
os_name = os_elem.get("name", "")
host["os_details"] = os_name
host["os_type"] = detect_os_type(os_name)
else:
# Try osclass
osclass = host_elem.find(".//osclass")
if osclass is not None:
osfamily = osclass.get("osfamily", "")
host["os_type"] = detect_os_type(osfamily)
host["os_details"] = f"{osfamily} {osclass.get('osgen', '')}"
# Get ports
for port_elem in host_elem.findall(".//port"):
port_info = {
"port": int(port_elem.get("portid", 0)),
"protocol": port_elem.get("protocol", "tcp"),
"state": port_elem.find("state").get("state", "") if port_elem.find("state") is not None else "",
"service": ""
}
service = port_elem.find("service")
if service is not None:
port_info["service"] = service.get("name", "")
port_info["product"] = service.get("product", "")
port_info["version"] = service.get("version", "")
# Use service info to help detect OS
if not host["os_type"]:
product = service.get("product", "").lower()
if "microsoft" in product or "windows" in product:
host["os_type"] = "Windows"
elif "apache" in product or "nginx" in product:
if not host["os_type"]:
host["os_type"] = "Linux"
if port_info["state"] == "open":
host["ports"].append(port_info)
# Infer OS from ports if still unknown
if not host["os_type"]:
host["os_type"] = infer_os_from_ports(host["ports"])
if host["ip"]:
hosts.append(host)
except Exception as e:
# Fallback: parse text output
print(f"XML parsing failed: {e}, falling back to text parsing")
hosts = parse_nmap_text(xml_output)
return hosts
def detect_os_type(os_string: str) -> str:
"""Detect OS type from nmap OS string"""
if not os_string:
return ""
os_lower = os_string.lower()
if "windows" in os_lower:
return "Windows"
elif "linux" in os_lower or "ubuntu" in os_lower or "debian" in os_lower or "centos" in os_lower or "red hat" in os_lower:
return "Linux"
elif "mac os" in os_lower or "darwin" in os_lower or "apple" in os_lower or "ios" in os_lower:
return "macOS"
elif "cisco" in os_lower:
return "Cisco Router"
elif "juniper" in os_lower:
return "Juniper Router"
elif "fortinet" in os_lower or "fortigate" in os_lower:
return "Fortinet"
elif "vmware" in os_lower or "esxi" in os_lower:
return "VMware Server"
elif "freebsd" in os_lower:
return "FreeBSD"
elif "android" in os_lower:
return "Android"
elif "printer" in os_lower or "hp" in os_lower:
return "Printer"
elif "switch" in os_lower:
return "Network Switch"
elif "router" in os_lower:
return "Router"
return ""
def infer_os_from_ports(ports: List[Dict]) -> str:
"""Infer OS type from open ports"""
port_nums = [p["port"] for p in ports]
services = [p.get("service", "").lower() for p in ports]
products = [p.get("product", "").lower() for p in ports]
# Windows indicators
windows_ports = {135, 139, 445, 3389, 5985, 5986}
if windows_ports & set(port_nums):
return "Windows"
if any("microsoft" in p or "windows" in p for p in products):
return "Windows"
# Linux indicators
if 22 in port_nums and "ssh" in services:
return "Linux"
# Network device indicators
if 161 in port_nums or 162 in port_nums: # SNMP
return "Network Device"
# Printer
if 9100 in port_nums or 631 in port_nums:
return "Printer"
return ""
def parse_nmap_text(output: str) -> List[Dict[str, Any]]:
"""Parse nmap text output as fallback"""
import re
hosts = []
current_host = None
for line in output.split('\n'):
# Match host line
host_match = re.search(r'Nmap scan report for (?:(\S+) \()?(\d+\.\d+\.\d+\.\d+)', line)
if host_match:
if current_host and current_host.get("ip"):
hosts.append(current_host)
current_host = {
"ip": host_match.group(2),
"hostname": host_match.group(1) or "",
"os_type": "",
"os_details": "",
"ports": [],
"mac": "",
"vendor": ""
}
continue
if current_host:
# Match MAC
mac_match = re.search(r'MAC Address: ([0-9A-F:]+) \(([^)]+)\)', line)
if mac_match:
current_host["mac"] = mac_match.group(1)
current_host["vendor"] = mac_match.group(2)
# Match port
port_match = re.search(r'(\d+)/(tcp|udp)\s+(\w+)\s+(\S+)', line)
if port_match:
current_host["ports"].append({
"port": int(port_match.group(1)),
"protocol": port_match.group(2),
"state": port_match.group(3),
"service": port_match.group(4)
})
# Match OS
os_match = re.search(r'OS details?: (.+)', line)
if os_match:
current_host["os_details"] = os_match.group(1)
current_host["os_type"] = detect_os_type(os_match.group(1))
if current_host and current_host.get("ip"):
hosts.append(current_host)
return hosts
@app.get("/api/network/scan/{scan_id}")
async def get_network_scan(scan_id: str):
"""Get network scan status and results"""
if scan_id not in network_scans:
raise HTTPException(status_code=404, detail="Scan not found")
return network_scans[scan_id]
@app.get("/api/network/hosts")
async def get_network_hosts():
"""Get all discovered network hosts"""
return {"hosts": network_hosts}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)