Files
ThreatHunt/backend/app/core/playbook_engine.py

166 lines
4.8 KiB
Python

"""
Playbook Execution Engine
Executes automated response playbooks based on triggers.
"""
from typing import Dict, Any, List
from datetime import datetime, timezone
import asyncio
class PlaybookEngine:
"""Engine for executing playbooks"""
def __init__(self):
"""Initialize playbook engine"""
self.actions_registry = {
"send_notification": self._action_send_notification,
"create_case": self._action_create_case,
"isolate_host": self._action_isolate_host,
"collect_artifact": self._action_collect_artifact,
"block_ip": self._action_block_ip,
"send_email": self._action_send_email,
}
async def execute_playbook(
self,
playbook: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]:
"""
Execute a playbook
Args:
playbook: Playbook definition
context: Execution context with relevant data
Returns:
Execution result
"""
results = []
errors = []
actions = playbook.get("actions", [])
for action in actions:
action_type = action.get("type")
action_params = action.get("params", {})
try:
# Get action handler
handler = self.actions_registry.get(action_type)
if not handler:
errors.append(f"Unknown action type: {action_type}")
continue
# Execute action
result = await handler(action_params, context)
results.append({
"action": action_type,
"status": "success",
"result": result
})
except Exception as e:
errors.append(f"Error in action {action_type}: {str(e)}")
results.append({
"action": action_type,
"status": "failed",
"error": str(e)
})
return {
"status": "completed" if not errors else "completed_with_errors",
"results": results,
"errors": errors
}
async def _action_send_notification(
self,
params: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]:
"""Send a notification"""
# In production, this would create a notification in the database
# and push it via WebSocket
return {
"notification_sent": True,
"message": params.get("message", "Playbook notification")
}
async def _action_create_case(
self,
params: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]:
"""Create a new case"""
# In production, this would create a case in the database
return {
"case_created": True,
"case_title": params.get("title", "Automated Case"),
"case_id": "placeholder_id"
}
async def _action_isolate_host(
self,
params: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]:
"""Isolate a host"""
# In production, this would call Velociraptor or other tools
# to isolate the host from the network
host_id = params.get("host_id")
return {
"host_isolated": True,
"host_id": host_id
}
async def _action_collect_artifact(
self,
params: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]:
"""Collect an artifact from a host"""
# In production, this would trigger Velociraptor collection
return {
"collection_started": True,
"artifact": params.get("artifact_name"),
"client_id": params.get("client_id")
}
async def _action_block_ip(
self,
params: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]:
"""Block an IP address"""
# In production, this would update firewall rules
ip_address = params.get("ip_address")
return {
"ip_blocked": True,
"ip_address": ip_address
}
async def _action_send_email(
self,
params: Dict[str, Any],
context: Dict[str, Any]
) -> Dict[str, Any]:
"""Send an email"""
# In production, this would send an actual email
return {
"email_sent": True,
"to": params.get("to"),
"subject": params.get("subject")
}
def get_playbook_engine() -> PlaybookEngine:
"""
Factory function to create playbook engine
Returns:
Configured PlaybookEngine instance
"""
return PlaybookEngine()