from __future__ import annotations import json import sqlite3 from pathlib import Path from typing import Any import pytest from fastapi.testclient import TestClient import api import scanner NMAP_XML = """
""" def test_parse_nmap_xml(): hosts, started, finished = scanner.parse_nmap_xml(NMAP_XML) assert len(hosts) == 1 assert hosts[0].ip == "192.168.1.10" assert hosts[0].mac_address == "00:11:22:33:44:55" assert hosts[0].services[0].port == 80 assert started.startswith("2024") assert finished.endswith("UTC") def test_scanner_main_monkeypatched(monkeypatch): class DummyProcess: def __init__(self, stdout: str): self.stdout = stdout self.stderr = "" self.returncode = 0 captured: dict[str, Any] = {} def fake_run(cmd: Any, check: bool, stdout, stderr, text): # pylint: disable=unused-argument captured["cmd"] = cmd return DummyProcess(NMAP_XML) monkeypatch.setattr(scanner, "subprocess", type("S", (), {"run": staticmethod(fake_run)})) exit_code = scanner.main( ["192.168.1.0/24", "--no-upload", "--nmap-args", "-Pn --script vuln"] ) assert exit_code == 0 assert "-Pn" in captured["cmd"] assert "--script" in captured["cmd"] def test_build_nmap_command_accepts_custom_args(): cmd = scanner.build_nmap_command( "10.0.0.0/24", "standard", None, ["-Pn", "--script", "vuln"] ) assert cmd[0] == "nmap" assert cmd[-1] == "10.0.0.0/24" assert cmd[1:4] == ["-Pn", "--script", "vuln"] @pytest.fixture() def temp_db(tmp_path: Path, monkeypatch): db_path = tmp_path / "api.db" exploit_db = tmp_path / "exploits.db" tasks_db = tmp_path / "tasks.db" monkeypatch.setattr(api, "DB_PATH", db_path) monkeypatch.setattr(api, "EXPLOIT_DB_PATH", exploit_db) monkeypatch.setattr(api.task_queue, "DB_PATH", tasks_db) api.initialize_db() api.task_queue.ensure_tables() conn = sqlite3.connect(exploit_db) conn.execute( "CREATE TABLE IF NOT EXISTS cves (cve_id TEXT PRIMARY KEY, description TEXT, severity TEXT, score REAL)" ) conn.execute( "INSERT OR REPLACE INTO cves(cve_id, description, severity, score) VALUES(?,?,?,?)", ("CVE-2023-12345", "Test vuln", "HIGH", 8.8), ) conn.close() return db_path def test_ingest_and_list_assets(temp_db): client = TestClient(api.app) payload = { "ip": "192.168.1.10", "hostname": "web", "mac_address": "00:11:22:33:44:55", "mac_vendor": "TestVendor", "scan": {"scan_id": "unit-test-scan", "scanner": "pytest", "mode": "fast"}, "services": [ { "port": 80, "proto": "tcp", "product": "Apache", "version": "2.4.57", "extra": {"name": "http"}, "cves": ["CVE-2023-12345"], } ], } response = client.post("/ingest/scan", json=payload) assert response.status_code == 200 asset = response.json() assert asset["mac_address"] == "00:11:22:33:44:55" list_response = client.get("/assets") assert list_response.status_code == 200 assets = list_response.json() assert assets[0]["services"][0]["cves"] == ["CVE-2023-12345"] vuln = assets[0]["services"][0]["vulnerabilities"][0] assert vuln["severity"] == "HIGH" conn = sqlite3.connect(temp_db) rows = conn.execute("SELECT COUNT(*) FROM services").fetchone()[0] assert rows == 1 scan_runs = conn.execute("SELECT COUNT(*) FROM scan_runs").fetchone()[0] assert scan_runs == 1 conn.close() scans = client.get("/scans").json() assert scans[0]["scan_id"] == "unit-test-scan" suggestions = client.get("/attack_suggestions").json() assert any(item["technique_id"] == "T1068" for item in suggestions) def test_task_queue_endpoints(temp_db): client = TestClient(api.app) response = client.post( "/tasks", json={ "tool": "password_cracker", "target": "lab-hashes", "params": {"hash_file": "hashes.txt", "wordlist": "rockyou.txt"}, }, ) assert response.status_code == 201 task = response.json() assert task["tool"] == "password_cracker" list_response = client.get("/tasks") assert list_response.status_code == 200 tasks = list_response.json() assert len(tasks) == 1 assert tasks[0]["target"] == "lab-hashes" update_response = client.post( f"/tasks/{task['id']}/status", json={"status": "completed", "result": {"exit_code": 0}}, ) assert update_response.status_code == 200 assert update_response.json()["status"] == "completed"