diff --git a/services/dashboard/app/main.py b/services/dashboard/app/main.py
index 10a88d5..48c391c 100644
--- a/services/dashboard/app/main.py
+++ b/services/dashboard/app/main.py
@@ -74,6 +74,11 @@ class ScanRequest(BaseModel):
options: Dict[str, Any] = Field(default_factory=dict)
+class NetworkScanRequest(BaseModel):
+ target: str
+ scan_type: str = "os" # ping, quick, os, full
+
+
@app.get("/health")
async def health_check():
"""Health check endpoint"""
@@ -363,6 +368,313 @@ async def get_kali_tools():
raise HTTPException(status_code=503, detail="Kali executor not available")
+# ============== Network Map Endpoints ==============
+
+# In-memory store for network scan results
+network_scans = {}
+network_hosts = []
+
+@app.post("/api/network/scan")
+async def start_network_scan(request: NetworkScanRequest):
+ """Start a network range scan for OS detection"""
+ import uuid
+ scan_id = str(uuid.uuid4())[:8]
+
+ # Build nmap command based on scan type
+ scan_commands = {
+ "ping": f"nmap -sn {request.target} -oX -",
+ "quick": f"nmap -T4 -F -O --osscan-limit {request.target} -oX -",
+ "os": f"nmap -O -sV --version-light {request.target} -oX -",
+ "full": f"nmap -sS -sV -O -p- --version-all {request.target} -oX -"
+ }
+
+ command = scan_commands.get(request.scan_type, scan_commands["os"])
+
+ network_scans[scan_id] = {
+ "scan_id": scan_id,
+ "target": request.target,
+ "scan_type": request.scan_type,
+ "status": "running",
+ "hosts": [],
+ "command": command
+ }
+
+ # Execute scan asynchronously
+ import asyncio
+ asyncio.create_task(execute_network_scan(scan_id, command))
+
+ return {"scan_id": scan_id, "status": "running"}
+
+
+async def execute_network_scan(scan_id: str, command: str):
+ """Execute network scan and parse results"""
+ global network_hosts
+
+ try:
+ async with httpx.AsyncClient() as client:
+ response = await client.post(
+ f"{HACKGPT_API_URL}/execute",
+ json={"command": command, "timeout": 600},
+ timeout=610.0
+ )
+
+ if response.status_code == 200:
+ result = response.json()
+ stdout = result.get("stdout", "")
+
+ # Parse nmap XML output
+ hosts = parse_nmap_xml(stdout)
+
+ network_scans[scan_id]["status"] = "completed"
+ network_scans[scan_id]["hosts"] = hosts
+
+ # Update global host list
+ for host in hosts:
+ existing = next((h for h in network_hosts if h["ip"] == host["ip"]), None)
+ if existing:
+ existing.update(host)
+ else:
+ network_hosts.append(host)
+ else:
+ network_scans[scan_id]["status"] = "failed"
+ network_scans[scan_id]["error"] = response.text
+
+ except Exception as e:
+ network_scans[scan_id]["status"] = "failed"
+ network_scans[scan_id]["error"] = str(e)
+
+
+def parse_nmap_xml(xml_output: str) -> List[Dict[str, Any]]:
+ """Parse nmap XML output to extract hosts with OS info"""
+ import re
+ hosts = []
+
+ # Try XML parsing first
+ try:
+ import xml.etree.ElementTree as ET
+
+ # Handle case where XML might have non-XML content before it
+ xml_start = xml_output.find(' str:
+ """Detect OS type from nmap OS string"""
+ if not os_string:
+ return ""
+ os_lower = os_string.lower()
+
+ if "windows" in os_lower:
+ return "Windows"
+ elif "linux" in os_lower or "ubuntu" in os_lower or "debian" in os_lower or "centos" in os_lower or "red hat" in os_lower:
+ return "Linux"
+ elif "mac os" in os_lower or "darwin" in os_lower or "apple" in os_lower or "ios" in os_lower:
+ return "macOS"
+ elif "cisco" in os_lower:
+ return "Cisco Router"
+ elif "juniper" in os_lower:
+ return "Juniper Router"
+ elif "fortinet" in os_lower or "fortigate" in os_lower:
+ return "Fortinet"
+ elif "vmware" in os_lower or "esxi" in os_lower:
+ return "VMware Server"
+ elif "freebsd" in os_lower:
+ return "FreeBSD"
+ elif "android" in os_lower:
+ return "Android"
+ elif "printer" in os_lower or "hp" in os_lower:
+ return "Printer"
+ elif "switch" in os_lower:
+ return "Network Switch"
+ elif "router" in os_lower:
+ return "Router"
+
+ return ""
+
+
+def infer_os_from_ports(ports: List[Dict]) -> str:
+ """Infer OS type from open ports"""
+ port_nums = [p["port"] for p in ports]
+ services = [p.get("service", "").lower() for p in ports]
+ products = [p.get("product", "").lower() for p in ports]
+
+ # Windows indicators
+ windows_ports = {135, 139, 445, 3389, 5985, 5986}
+ if windows_ports & set(port_nums):
+ return "Windows"
+ if any("microsoft" in p or "windows" in p for p in products):
+ return "Windows"
+
+ # Linux indicators
+ if 22 in port_nums and "ssh" in services:
+ return "Linux"
+
+ # Network device indicators
+ if 161 in port_nums or 162 in port_nums: # SNMP
+ return "Network Device"
+
+ # Printer
+ if 9100 in port_nums or 631 in port_nums:
+ return "Printer"
+
+ return ""
+
+
+def parse_nmap_text(output: str) -> List[Dict[str, Any]]:
+ """Parse nmap text output as fallback"""
+ import re
+ hosts = []
+ current_host = None
+
+ for line in output.split('\n'):
+ # Match host line
+ host_match = re.search(r'Nmap scan report for (?:(\S+) \()?(\d+\.\d+\.\d+\.\d+)', line)
+ if host_match:
+ if current_host and current_host.get("ip"):
+ hosts.append(current_host)
+ current_host = {
+ "ip": host_match.group(2),
+ "hostname": host_match.group(1) or "",
+ "os_type": "",
+ "os_details": "",
+ "ports": [],
+ "mac": "",
+ "vendor": ""
+ }
+ continue
+
+ if current_host:
+ # Match MAC
+ mac_match = re.search(r'MAC Address: ([0-9A-F:]+) \(([^)]+)\)', line)
+ if mac_match:
+ current_host["mac"] = mac_match.group(1)
+ current_host["vendor"] = mac_match.group(2)
+
+ # Match port
+ port_match = re.search(r'(\d+)/(tcp|udp)\s+(\w+)\s+(\S+)', line)
+ if port_match:
+ current_host["ports"].append({
+ "port": int(port_match.group(1)),
+ "protocol": port_match.group(2),
+ "state": port_match.group(3),
+ "service": port_match.group(4)
+ })
+
+ # Match OS
+ os_match = re.search(r'OS details?: (.+)', line)
+ if os_match:
+ current_host["os_details"] = os_match.group(1)
+ current_host["os_type"] = detect_os_type(os_match.group(1))
+
+ if current_host and current_host.get("ip"):
+ hosts.append(current_host)
+
+ return hosts
+
+
+@app.get("/api/network/scan/{scan_id}")
+async def get_network_scan(scan_id: str):
+ """Get network scan status and results"""
+ if scan_id not in network_scans:
+ raise HTTPException(status_code=404, detail="Scan not found")
+ return network_scans[scan_id]
+
+
+@app.get("/api/network/hosts")
+async def get_network_hosts():
+ """Get all discovered network hosts"""
+ return {"hosts": network_hosts}
+
+
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)
\ No newline at end of file
diff --git a/services/dashboard/templates/index.html b/services/dashboard/templates/index.html
index 20c8192..f5324a9 100644
--- a/services/dashboard/templates/index.html
+++ b/services/dashboard/templates/index.html
@@ -7,6 +7,7 @@
+