feat: Add real-time progress tracking for network scans

- Progress bar shows X/255 (or calculated range size)
- Percentage display during scan
- Currently scanning IP shown in real-time
- Live host count as hosts are discovered
- Remaining hosts counter
- Faster polling (2s instead of 5s) for smoother updates
- Backend calculates total hosts from CIDR or range notation
- Parse nmap --stats-every output for progress info
This commit is contained in:
2025-11-28 15:32:57 -05:00
parent 304d223a49
commit e51e52129f
2 changed files with 139 additions and 16 deletions

View File

@@ -395,12 +395,16 @@ async def start_network_scan(request: NetworkScanRequest):
import uuid
scan_id = str(uuid.uuid4())[:8]
# Calculate total hosts in target range
total_hosts = calculate_target_hosts(request.target)
# Build nmap command based on scan type
# Use --stats-every to get progress updates
scan_commands = {
"ping": f"nmap -sn {request.target} -oX -",
"quick": f"nmap -T4 -F -O --osscan-limit {request.target} -oX -",
"os": f"nmap -O -sV --version-light {request.target} -oX -",
"full": f"nmap -sS -sV -O -p- --version-all {request.target} -oX -"
"ping": f"nmap -sn {request.target} -oX - --stats-every 2s",
"quick": f"nmap -T4 -F -O --osscan-limit {request.target} -oX - --stats-every 2s",
"os": f"nmap -O -sV --version-light {request.target} -oX - --stats-every 2s",
"full": f"nmap -sS -sV -O -p- --version-all {request.target} -oX - --stats-every 2s"
}
command = scan_commands.get(request.scan_type, scan_commands["os"])
@@ -411,21 +415,57 @@ async def start_network_scan(request: NetworkScanRequest):
"scan_type": request.scan_type,
"status": "running",
"hosts": [],
"command": command
"command": command,
"progress": {
"total": total_hosts,
"scanned": 0,
"current_ip": "",
"hosts_found": 0,
"percent": 0
}
}
# Execute scan asynchronously
# Execute scan asynchronously with progress tracking
import asyncio
asyncio.create_task(execute_network_scan(scan_id, command))
asyncio.create_task(execute_network_scan_with_progress(scan_id, command, request.target))
return {"scan_id": scan_id, "status": "running"}
return {"scan_id": scan_id, "status": "running", "total_hosts": total_hosts}
async def execute_network_scan(scan_id: str, command: str):
"""Execute network scan and parse results"""
def calculate_target_hosts(target: str) -> int:
"""Calculate the number of hosts in a target specification"""
import ipaddress
# Handle CIDR notation
if '/' in target:
try:
network = ipaddress.ip_network(target, strict=False)
return network.num_addresses - 2 # Subtract network and broadcast
except ValueError:
pass
# Handle range notation (e.g., 192.168.1.1-50)
if '-' in target:
try:
parts = target.rsplit('.', 1)
if len(parts) == 2:
range_part = parts[1]
if '-' in range_part:
start, end = range_part.split('-')
return int(end) - int(start) + 1
except (ValueError, IndexError):
pass
# Single host
return 1
async def execute_network_scan_with_progress(scan_id: str, command: str, target: str):
"""Execute network scan with progress tracking"""
global network_hosts
try:
# Use streaming execution if available, otherwise batch
async with httpx.AsyncClient() as client:
response = await client.post(
f"{HACKGPT_API_URL}/execute",
@@ -437,11 +477,19 @@ async def execute_network_scan(scan_id: str, command: str):
result = response.json()
stdout = result.get("stdout", "")
# Parse nmap XML output
# Parse progress from nmap stats output
progress_info = parse_nmap_progress(stdout)
if progress_info:
network_scans[scan_id]["progress"].update(progress_info)
# Parse nmap XML output for hosts
hosts = parse_nmap_xml(stdout)
network_scans[scan_id]["status"] = "completed"
network_scans[scan_id]["hosts"] = hosts
network_scans[scan_id]["progress"]["scanned"] = network_scans[scan_id]["progress"]["total"]
network_scans[scan_id]["progress"]["hosts_found"] = len(hosts)
network_scans[scan_id]["progress"]["percent"] = 100
# Update global host list
for host in hosts:
@@ -459,6 +507,34 @@ async def execute_network_scan(scan_id: str, command: str):
network_scans[scan_id]["error"] = str(e)
def parse_nmap_progress(output: str) -> dict:
"""Parse nmap stats output for progress information"""
import re
progress = {}
# Look for stats lines like: "Stats: 0:00:45 elapsed; 50 hosts completed (10 up), 5 undergoing..."
stats_pattern = r'Stats:.*?(\d+)\s+hosts?\s+completed.*?(\d+)\s+up'
match = re.search(stats_pattern, output, re.IGNORECASE)
if match:
progress['scanned'] = int(match.group(1))
progress['hosts_found'] = int(match.group(2))
# Look for percentage: "About 45.00% done"
percent_pattern = r'About\s+([\d.]+)%\s+done'
match = re.search(percent_pattern, output, re.IGNORECASE)
if match:
progress['percent'] = float(match.group(1))
# Look for current scan target
current_pattern = r'Scanning\s+([^\s\[]+)'
match = re.search(current_pattern, output)
if match:
progress['current_ip'] = match.group(1)
return progress
def parse_nmap_xml(xml_output: str) -> List[Dict[str, Any]]:
"""Parse nmap XML output to extract hosts with OS info"""
import re