Files
StrikePackageGPT/services/hackgpt-api/app/config_validator.py
2025-12-03 12:50:53 +00:00

517 lines
18 KiB
Python

"""
Configuration Validator Module
Validates configurations before save/change with plain-English warnings.
Provides backup/restore functionality and auto-fix suggestions.
"""
import json
import os
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime
import copy
# Configuration storage (in production, use persistent storage)
config_backups: Dict[str, List[Dict[str, Any]]] = {}
BACKUP_DIR = os.getenv("CONFIG_BACKUP_DIR", "/workspace/config_backups")
def validate_config(
config_data: Dict[str, Any],
config_type: str = "general"
) -> Dict[str, Any]:
"""
Validate configuration data before applying changes.
Args:
config_data: Configuration dictionary to validate
config_type: Type of configuration (scan, system, security, network)
Returns:
Dictionary with validation results
{
"valid": bool,
"warnings": List[str],
"errors": List[str],
"suggestions": List[Dict],
"safe_to_apply": bool
}
"""
errors = []
warnings = []
suggestions = []
# Type-specific validation
if config_type == "scan":
errors, warnings, suggestions = _validate_scan_config(config_data)
elif config_type == "network":
errors, warnings, suggestions = _validate_network_config(config_data)
elif config_type == "security":
errors, warnings, suggestions = _validate_security_config(config_data)
else:
errors, warnings, suggestions = _validate_general_config(config_data)
# Check for common issues across all config types
common_errors, common_warnings = _check_common_issues(config_data)
errors.extend(common_errors)
warnings.extend(common_warnings)
return {
"valid": len(errors) == 0,
"warnings": warnings,
"errors": errors,
"suggestions": suggestions,
"safe_to_apply": len(errors) == 0 and len([w for w in warnings if "critical" in w.lower()]) == 0,
"config_type": config_type
}
def _validate_scan_config(config_data: Dict[str, Any]) -> Tuple[List[str], List[str], List[Dict]]:
"""Validate scan configuration."""
errors = []
warnings = []
suggestions = []
# Check timeout
timeout = config_data.get("timeout", 300)
if not isinstance(timeout, (int, float)):
errors.append("Timeout must be a number (seconds)")
elif timeout < 1:
errors.append("Timeout must be at least 1 second")
elif timeout < 10:
warnings.append("Very short timeout (< 10s) may cause scans to fail prematurely")
elif timeout > 3600:
warnings.append("Very long timeout (> 1 hour) may cause scans to hang indefinitely")
# Check target
target = config_data.get("target", "")
if not target or not isinstance(target, str):
errors.append("Target must be specified (IP address, hostname, or network range)")
elif not _is_valid_target(target):
warnings.append(f"Target '{target}' may not be valid - ensure it's a valid IP, hostname, or CIDR")
# Check scan intensity
intensity = config_data.get("intensity", 3)
if isinstance(intensity, (int, float)):
if intensity < 1 or intensity > 5:
warnings.append("Scan intensity should be between 1 (stealth) and 5 (aggressive)")
if intensity >= 4:
warnings.append("High intensity scans may trigger IDS/IPS systems")
suggestions.append({
"field": "intensity",
"suggestion": 3,
"reason": "Balanced intensity for stealth and speed"
})
# Check port range
ports = config_data.get("ports", "")
if ports:
if not _is_valid_port_spec(str(ports)):
errors.append(f"Invalid port specification: {ports}")
return errors, warnings, suggestions
def _validate_network_config(config_data: Dict[str, Any]) -> Tuple[List[str], List[str], List[Dict]]:
"""Validate network configuration."""
errors = []
warnings = []
suggestions = []
# Check port
port = config_data.get("port")
if port is not None:
if not isinstance(port, int):
errors.append("Port must be an integer")
elif port < 1 or port > 65535:
errors.append("Port must be between 1 and 65535")
elif port < 1024:
warnings.append("Ports below 1024 require elevated privileges")
# Check host/bind address
host = config_data.get("host", "")
if host and not _is_valid_ip_or_hostname(host):
warnings.append(f"Host '{host}' may not be a valid IP address or hostname")
# Check max connections
max_conn = config_data.get("max_connections")
if max_conn is not None:
if not isinstance(max_conn, int) or max_conn < 1:
errors.append("max_connections must be a positive integer")
elif max_conn > 1000:
warnings.append("Very high max_connections (> 1000) may exhaust system resources")
return errors, warnings, suggestions
def _validate_security_config(config_data: Dict[str, Any]) -> Tuple[List[str], List[str], List[Dict]]:
"""Validate security configuration."""
errors = []
warnings = []
suggestions = []
# Check for exposed secrets
for key, value in config_data.items():
if any(secret_word in key.lower() for secret_word in ['password', 'secret', 'token', 'key', 'credential']):
if isinstance(value, str):
if len(value) < 8:
warnings.append(f"SECURITY: {key} appears weak (< 8 characters)")
if value in ['password', '123456', 'admin', 'default']:
errors.append(f"SECURITY: {key} is using a default/weak value")
# Check SSL/TLS settings
ssl_enabled = config_data.get("ssl_enabled", False)
if not ssl_enabled:
warnings.append("SECURITY: SSL/TLS is disabled - data will be transmitted unencrypted")
# Check authentication
auth_enabled = config_data.get("authentication_enabled", True)
if not auth_enabled:
warnings.append("SECURITY: Authentication is disabled - system will be exposed")
return errors, warnings, suggestions
def _validate_general_config(config_data: Dict[str, Any]) -> Tuple[List[str], List[str], List[Dict]]:
"""Validate general configuration."""
errors = []
warnings = []
suggestions = []
# Check for valid JSON structure
if not isinstance(config_data, dict):
errors.append("Configuration must be a JSON object")
return errors, warnings, suggestions
# Check for empty config
if not config_data:
warnings.append("Configuration is empty")
return errors, warnings, suggestions
def _check_common_issues(config_data: Dict[str, Any]) -> Tuple[List[str], List[str]]:
"""Check for common configuration issues."""
errors = []
warnings = []
# Check for null/undefined values
for key, value in config_data.items():
if value is None:
warnings.append(f"Value for '{key}' is null - will use default")
# Check for suspicious paths
for key, value in config_data.items():
if isinstance(value, str):
if value.startswith('/root/') or value.startswith('C:\\Windows\\'):
warnings.append(f"SECURITY: '{key}' points to a sensitive system path")
return errors, warnings
def backup_config(config_name: str, config_data: Dict[str, Any], description: str = "") -> Dict[str, Any]:
"""
Create a backup of current configuration.
Args:
config_name: Name/ID of the configuration
config_data: Configuration data to backup
description: Optional description of the backup
Returns:
Dictionary with backup information
"""
timestamp = datetime.utcnow().isoformat()
backup_id = f"{config_name}_{timestamp}"
backup = {
"backup_id": backup_id,
"config_name": config_name,
"timestamp": timestamp,
"description": description or "Automatic backup",
"config_data": copy.deepcopy(config_data),
"size_bytes": len(json.dumps(config_data))
}
# Store in memory
if config_name not in config_backups:
config_backups[config_name] = []
config_backups[config_name].append(backup)
# Keep only last 10 backups per config
if len(config_backups[config_name]) > 10:
config_backups[config_name] = config_backups[config_name][-10:]
# Also save to disk if backup directory exists
try:
os.makedirs(BACKUP_DIR, exist_ok=True)
backup_file = os.path.join(BACKUP_DIR, f"{backup_id}.json")
with open(backup_file, 'w') as f:
json.dump(backup, f, indent=2)
except Exception as e:
print(f"Warning: Could not save backup to disk: {e}")
return {
"success": True,
"backup_id": backup_id,
"timestamp": timestamp,
"message": f"Configuration backed up successfully"
}
def restore_config(backup_id: str) -> Dict[str, Any]:
"""
Restore configuration from a backup.
Args:
backup_id: ID of the backup to restore
Returns:
Dictionary with restored configuration and metadata
"""
# Search in memory backups
for config_name, backups in config_backups.items():
for backup in backups:
if backup["backup_id"] == backup_id:
return {
"success": True,
"backup_id": backup_id,
"config_name": config_name,
"config_data": copy.deepcopy(backup["config_data"]),
"timestamp": backup["timestamp"],
"description": backup["description"],
"message": "Configuration restored successfully"
}
# Try loading from disk
try:
backup_file = os.path.join(BACKUP_DIR, f"{backup_id}.json")
if os.path.exists(backup_file):
with open(backup_file, 'r') as f:
backup = json.load(f)
return {
"success": True,
"backup_id": backup_id,
"config_name": backup["config_name"],
"config_data": backup["config_data"],
"timestamp": backup["timestamp"],
"description": backup.get("description", ""),
"message": "Configuration restored from disk backup"
}
except Exception as e:
pass
return {
"success": False,
"backup_id": backup_id,
"error": "Backup not found",
"message": f"No backup found with ID: {backup_id}"
}
def suggest_autofix(validation_result: Dict[str, Any], config_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Suggest automatic fixes for configuration issues.
Args:
validation_result: Result from validate_config()
config_data: Original configuration data
Returns:
Dictionary with auto-fix suggestions
"""
if validation_result.get("valid") and not validation_result.get("warnings"):
return {
"has_fixes": False,
"message": "Configuration is valid, no fixes needed"
}
fixed_config = copy.deepcopy(config_data)
fixes_applied = []
# Apply suggestions from validation
for suggestion in validation_result.get("suggestions", []):
field = suggestion.get("field")
suggested_value = suggestion.get("suggestion")
reason = suggestion.get("reason")
if field in fixed_config:
old_value = fixed_config[field]
fixed_config[field] = suggested_value
fixes_applied.append({
"field": field,
"old_value": old_value,
"new_value": suggested_value,
"reason": reason
})
# Apply common fixes based on errors
for error in validation_result.get("errors", []):
if "timeout must be" in error.lower():
if "timeout" in fixed_config:
fixed_config["timeout"] = 300 # Default safe timeout
fixes_applied.append({
"field": "timeout",
"old_value": config_data.get("timeout"),
"new_value": 300,
"reason": "Reset to safe default value"
})
if "port must be" in error.lower():
if "port" in fixed_config:
fixed_config["port"] = 8080 # Default safe port
fixes_applied.append({
"field": "port",
"old_value": config_data.get("port"),
"new_value": 8080,
"reason": "Reset to safe default port"
})
return {
"has_fixes": len(fixes_applied) > 0,
"fixes_applied": fixes_applied,
"fixed_config": fixed_config,
"message": f"Applied {len(fixes_applied)} automatic fixes"
}
def list_backups(config_name: Optional[str] = None) -> Dict[str, Any]:
"""
List available configuration backups.
Args:
config_name: Optional config name to filter by
Returns:
Dictionary with list of backups
"""
all_backups = []
# Get from memory
if config_name:
backups = config_backups.get(config_name, [])
for backup in backups:
all_backups.append({
"backup_id": backup["backup_id"],
"config_name": backup["config_name"],
"timestamp": backup["timestamp"],
"description": backup["description"],
"size_bytes": backup["size_bytes"]
})
else:
for cfg_name, backups in config_backups.items():
for backup in backups:
all_backups.append({
"backup_id": backup["backup_id"],
"config_name": backup["config_name"],
"timestamp": backup["timestamp"],
"description": backup["description"],
"size_bytes": backup["size_bytes"]
})
# Also check disk backups
try:
if os.path.exists(BACKUP_DIR):
for filename in os.listdir(BACKUP_DIR):
if filename.endswith('.json'):
backup_id = filename[:-5] # Remove .json
# Check if already in list (avoid duplicates)
if not any(b["backup_id"] == backup_id for b in all_backups):
try:
filepath = os.path.join(BACKUP_DIR, filename)
with open(filepath, 'r') as f:
backup = json.load(f)
if not config_name or backup["config_name"] == config_name:
all_backups.append({
"backup_id": backup["backup_id"],
"config_name": backup["config_name"],
"timestamp": backup["timestamp"],
"description": backup.get("description", ""),
"size_bytes": os.path.getsize(filepath)
})
except:
pass
except Exception as e:
print(f"Warning: Could not read disk backups: {e}")
# Sort by timestamp (newest first)
all_backups.sort(key=lambda x: x["timestamp"], reverse=True)
return {
"backups": all_backups,
"count": len(all_backups),
"config_name": config_name
}
# Validation helper functions
def _is_valid_target(target: str) -> bool:
"""Check if target is a valid IP, hostname, or CIDR."""
import re
# IP address
ip_pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
if re.match(ip_pattern, target):
parts = target.split('.')
return all(0 <= int(part) <= 255 for part in parts)
# CIDR notation
if '/' in target:
cidr_pattern = r'^(\d{1,3}\.){3}\d{1,3}/\d{1,2}$'
if re.match(cidr_pattern, target):
ip_part = target.split('/')[0]
return _is_valid_target(ip_part)
# IP range
if '-' in target:
range_pattern = r'^(\d{1,3}\.){3}\d{1,3}-\d{1,3}$'
if re.match(range_pattern, target):
return True
# Hostname/domain
hostname_pattern = r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?$'
if re.match(hostname_pattern, target):
return True
return False
def _is_valid_port_spec(ports: str) -> bool:
"""Check if port specification is valid."""
import re
# Single port
if ports.isdigit():
port_num = int(ports)
return 1 <= port_num <= 65535
# Port range
if '-' in ports:
range_pattern = r'^\d+-\d+$'
if re.match(range_pattern, ports):
start, end = map(int, ports.split('-'))
return 1 <= start <= end <= 65535
# Comma-separated ports
if ',' in ports:
port_list = ports.split(',')
return all(_is_valid_port_spec(p.strip()) for p in port_list)
return False
def _is_valid_ip_or_hostname(host: str) -> bool:
"""Check if host is a valid IP address or hostname."""
import re
# IP address
ip_pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
if re.match(ip_pattern, host):
parts = host.split('.')
return all(0 <= int(part) <= 255 for part in parts)
# Hostname
hostname_pattern = r'^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)*[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?$'
return bool(re.match(hostname_pattern, host))