diff --git a/services/hackgpt-api/app/main.py b/services/hackgpt-api/app/main.py index 6591918..4be4730 100644 --- a/services/hackgpt-api/app/main.py +++ b/services/hackgpt-api/app/main.py @@ -1021,6 +1021,304 @@ Only suggest commands for legitimate security testing purposes.""" raise HTTPException(status_code=503, detail="LLM Router service not available") +# ============== Nmap Parser Endpoints ============== + +@app.post("/api/nmap/parse") +async def parse_nmap(format: str = "xml", content: str = ""): + """Parse Nmap output (XML or JSON)""" + try: + from . import nmap_parser + + if format == "xml": + hosts = nmap_parser.parse_nmap_xml(content) + elif format == "json": + hosts = nmap_parser.parse_nmap_json(content) + else: + raise HTTPException(status_code=400, detail="Format must be 'xml' or 'json'") + + return {"hosts": hosts, "count": len(hosts)} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Parse error: {str(e)}") + + +@app.get("/api/nmap/hosts") +async def get_nmap_hosts(scan_id: Optional[str] = None): + """Get parsed host data for network map""" + # This could be extended to fetch from a database based on scan_id + # For now, return from the scan_results if available + if scan_id and scan_id in scan_results: + result = scan_results[scan_id] + hosts = result.get("parsed", {}).get("hosts", []) + return {"hosts": hosts} + + return {"hosts": [], "message": "No scan data available"} + + +# ============== Voice Control Endpoints ============== + +@app.post("/api/voice/transcribe") +async def transcribe_audio(audio: bytes = None): + """Transcribe audio to text using Whisper""" + if not audio: + raise HTTPException(status_code=400, detail="No audio data provided") + + try: + from . import voice + result = voice.transcribe_audio(audio) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Transcription error: {str(e)}") + + +@app.post("/api/voice/speak") +async def text_to_speech(text: str, voice_name: str = "alloy"): + """Convert text to speech""" + try: + from . import voice as voice_module + audio_bytes = voice_module.speak_text(text, voice=voice_name) + + if audio_bytes: + from fastapi.responses import Response + return Response(content=audio_bytes, media_type="audio/mp3") + else: + return {"message": "TTS not available, use browser fallback"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"TTS error: {str(e)}") + + +@app.post("/api/voice/command") +async def process_voice_command(text: str): + """Parse and route voice command""" + try: + from . import voice as voice_module + + # Parse command + command_result = voice_module.parse_voice_command(text) + + # Route command + routing_info = voice_module.route_command(command_result) + + return { + "command": command_result, + "routing": routing_info, + "speak_response": routing_info.get("message", "") + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Command processing error: {str(e)}") + + +# ============== Explanation Endpoints ============== + +@app.post("/api/explain") +async def explain_item( + type: str, + content: str, + context: Optional[Dict[str, Any]] = None +): + """Get explanation for config, log, error, etc.""" + try: + from . import explain + + if type == "config": + result = explain.explain_config(content, content, context) + elif type == "error": + result = explain.explain_error(content, context=context) + elif type == "log": + log_level = context.get("level") if context else None + result = explain.explain_log_entry(content, log_level) + else: + result = {"error": "Unknown explanation type"} + + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Explanation error: {str(e)}") + + +@app.get("/api/wizard/help") +async def get_wizard_help(type: str, step: int): + """Get help for wizard step""" + try: + from . import explain + result = explain.get_wizard_step_help(type, step) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Help error: {str(e)}") + + +# ============== LLM Help Endpoints ============== + +@app.post("/api/llm/chat") +async def llm_chat_help( + message: str, + session_id: Optional[str] = None, + context: Optional[str] = None, + provider: str = "ollama", + model: str = "llama3.2" +): + """LLM-powered chat help""" + try: + from . import llm_help + result = await llm_help.chat_completion( + message=message, + session_id=session_id, + context=context, + provider=provider, + model=model + ) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Chat error: {str(e)}") + + +@app.get("/api/llm/autocomplete") +async def get_autocomplete( + partial_text: str, + context_type: str = "command", + max_suggestions: int = 5 +): + """Get autocomplete suggestions""" + try: + from . import llm_help + suggestions = await llm_help.get_autocomplete( + partial_text=partial_text, + context_type=context_type, + max_suggestions=max_suggestions + ) + return {"suggestions": suggestions} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Autocomplete error: {str(e)}") + + +@app.post("/api/llm/explain") +async def llm_explain( + item: str, + item_type: str = "auto", + context: Optional[Dict] = None +): + """LLM-powered explanation""" + try: + from . import llm_help + result = await llm_help.explain_anything( + item=item, + item_type=item_type, + context=context + ) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Explanation error: {str(e)}") + + +# ============== Config Validation Endpoints ============== + +@app.post("/api/config/validate") +async def validate_configuration( + config_data: Dict[str, Any], + config_type: str = "general" +): + """Validate configuration""" + try: + from . import config_validator + result = config_validator.validate_config(config_data, config_type) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Validation error: {str(e)}") + + +@app.post("/api/config/backup") +async def backup_configuration( + config_name: str, + config_data: Dict[str, Any], + description: str = "" +): + """Create configuration backup""" + try: + from . import config_validator + result = config_validator.backup_config(config_name, config_data, description) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Backup error: {str(e)}") + + +@app.post("/api/config/restore") +async def restore_configuration(backup_id: str): + """Restore configuration from backup""" + try: + from . import config_validator + result = config_validator.restore_config(backup_id) + if not result.get("success"): + raise HTTPException(status_code=404, detail=result.get("error")) + return result + except HTTPException: + raise + except Exception as e: + raise HTTPException(status_code=500, detail=f"Restore error: {str(e)}") + + +@app.get("/api/config/backups") +async def list_configuration_backups(config_name: Optional[str] = None): + """List available backups""" + try: + from . import config_validator + result = config_validator.list_backups(config_name) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"List backups error: {str(e)}") + + +@app.post("/api/config/autofix") +async def autofix_configuration( + validation_result: Dict[str, Any], + config_data: Dict[str, Any] +): + """Suggest automatic fixes for configuration""" + try: + from . import config_validator + result = config_validator.suggest_autofix(validation_result, config_data) + return result + except Exception as e: + raise HTTPException(status_code=500, detail=f"Autofix error: {str(e)}") + + +# ============== Webhook & Integration Endpoints ============== + +@app.post("/api/webhook/n8n") +async def n8n_webhook(data: Dict[str, Any]): + """Receive webhook from n8n workflow""" + try: + # Process n8n webhook data + # This could trigger scans, send notifications, etc. + return { + "status": "received", + "data": data, + "message": "Webhook processed successfully" + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Webhook error: {str(e)}") + + +@app.post("/api/alerts/push") +async def send_push_notification( + title: str, + message: str, + severity: str = "info" +): + """Send push notification for critical alerts""" + try: + # This could integrate with services like: + # - Pushover + # - Slack + # - Discord + # - Email + return { + "status": "sent", + "title": title, + "message": message, + "severity": severity + } + except Exception as e: + raise HTTPException(status_code=500, detail=f"Push notification error: {str(e)}") + + if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8001) \ No newline at end of file