Add bidirectional command capture - CLI commands now visible in dashboard

Co-authored-by: mblanke <9078342+mblanke@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-12-03 15:22:10 +00:00
parent aa64383530
commit c4eaf1718a
8 changed files with 776 additions and 4 deletions

View File

@@ -498,6 +498,88 @@ async def list_installed_tools():
return {"installed_tools": installed}
@app.get("/captured_commands")
async def get_captured_commands(limit: int = 50, since: Optional[str] = None):
"""
Get commands that were captured from interactive shell sessions in the Kali container.
These are commands run directly by users via docker exec or SSH.
"""
global kali_container
if not kali_container:
raise HTTPException(status_code=503, detail="Kali container not available")
try:
kali_container.reload()
# Read command history from the shared volume
cmd = ["bash", "-c", "cd /workspace/.command_history && ls -t *.json 2>/dev/null | head -n {}".format(limit)]
exit_code, output = kali_container.exec_run(cmd=cmd, demux=True)
if exit_code != 0 or not output[0]:
return {"commands": [], "count": 0}
# Get list of log files
log_files = output[0].decode('utf-8', errors='replace').strip().split('\n')
log_files = [f for f in log_files if f.strip()]
commands = []
for log_file in log_files:
try:
# Read each JSON log file
read_cmd = ["cat", f"/workspace/.command_history/{log_file}"]
exit_code, output = kali_container.exec_run(cmd=read_cmd, demux=True)
if exit_code == 0 and output[0]:
cmd_data = json.loads(output[0].decode('utf-8', errors='replace'))
# Filter by timestamp if requested
if since:
cmd_timestamp = cmd_data.get("timestamp", "")
if cmd_timestamp < since:
continue
commands.append(cmd_data)
except json.JSONDecodeError:
continue
except Exception:
continue
return {
"commands": commands,
"count": len(commands),
"source": "interactive_shell_capture"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error reading captured commands: {str(e)}")
@app.delete("/captured_commands/clear")
async def clear_captured_commands():
"""Clear all captured command history."""
global kali_container
if not kali_container:
raise HTTPException(status_code=503, detail="Kali container not available")
try:
kali_container.reload()
# Clear the command history directory
cmd = ["bash", "-c", "rm -f /workspace/.command_history/*.json"]
exit_code, _ = kali_container.exec_run(cmd=cmd)
if exit_code == 0:
return {"status": "cleared", "message": "All captured command history cleared"}
else:
raise HTTPException(status_code=500, detail="Failed to clear history")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/allowed-commands")
async def get_allowed_commands():
"""Get list of allowed commands for security validation."""