From cc1d7696bc2845d6fa57145274071f8e1a83b7df Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Dec 2025 17:33:10 +0000 Subject: [PATCH] Implement Phase 3: Advanced search, real-time notifications, and Velociraptor integration Co-authored-by: mblanke <9078342+mblanke@users.noreply.github.com> --- .../b2c3d4e5f6g7_add_phase_3_tables.py | 50 +++++ backend/app/api/routes/hosts.py | 33 ++- backend/app/api/routes/notifications.py | 164 +++++++++++++++ backend/app/api/routes/velociraptor.py | 197 ++++++++++++++++++ backend/app/core/velociraptor.py | 194 +++++++++++++++++ backend/app/main.py | 8 +- backend/app/models/notification.py | 23 ++ backend/app/schemas/notification.py | 34 +++ backend/requirements.txt | 2 + 9 files changed, 700 insertions(+), 5 deletions(-) create mode 100644 backend/alembic/versions/b2c3d4e5f6g7_add_phase_3_tables.py create mode 100644 backend/app/api/routes/notifications.py create mode 100644 backend/app/api/routes/velociraptor.py create mode 100644 backend/app/core/velociraptor.py create mode 100644 backend/app/models/notification.py create mode 100644 backend/app/schemas/notification.py diff --git a/backend/alembic/versions/b2c3d4e5f6g7_add_phase_3_tables.py b/backend/alembic/versions/b2c3d4e5f6g7_add_phase_3_tables.py new file mode 100644 index 0000000..969574e --- /dev/null +++ b/backend/alembic/versions/b2c3d4e5f6g7_add_phase_3_tables.py @@ -0,0 +1,50 @@ +"""Add Phase 3 tables + +Revision ID: b2c3d4e5f6g7 +Revises: a1b2c3d4e5f6 +Create Date: 2025-12-09 17:30:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'b2c3d4e5f6g7' +down_revision: Union[str, Sequence[str], None] = 'a1b2c3d4e5f6' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema for Phase 3.""" + + # Create notifications table + op.create_table( + 'notifications', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('tenant_id', sa.Integer(), nullable=False), + sa.Column('title', sa.String(), nullable=False), + sa.Column('message', sa.Text(), nullable=False), + sa.Column('notification_type', sa.String(), nullable=False), + sa.Column('is_read', sa.Boolean(), nullable=False), + sa.Column('link', sa.String(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.ForeignKeyConstraint(['tenant_id'], ['tenants.id'], ), + sa.ForeignKeyConstraint(['user_id'], ['users.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_notifications_id'), 'notifications', ['id'], unique=False) + op.create_index(op.f('ix_notifications_created_at'), 'notifications', ['created_at'], unique=False) + + +def downgrade() -> None: + """Downgrade schema for Phase 3.""" + + # Drop notifications table + op.drop_index(op.f('ix_notifications_created_at'), table_name='notifications') + op.drop_index(op.f('ix_notifications_id'), table_name='notifications') + op.drop_table('notifications') diff --git a/backend/app/api/routes/hosts.py b/backend/app/api/routes/hosts.py index 16eac33..1e8833a 100644 --- a/backend/app/api/routes/hosts.py +++ b/backend/app/api/routes/hosts.py @@ -37,14 +37,43 @@ class HostRead(BaseModel): async def list_hosts( skip: int = 0, limit: int = 100, + hostname: Optional[str] = None, + ip_address: Optional[str] = None, + os: Optional[str] = None, + sort_by: Optional[str] = None, + sort_desc: bool = False, current_user: User = Depends(get_current_active_user), tenant_id: int = Depends(get_tenant_id), db: Session = Depends(get_db) ): """ - List hosts scoped to user's tenant + List hosts scoped to user's tenant with advanced filtering + + Supports: + - Filtering by hostname, IP address, OS + - Sorting by any field + - Pagination """ - hosts = db.query(Host).filter(Host.tenant_id == tenant_id).offset(skip).limit(limit).all() + query = db.query(Host).filter(Host.tenant_id == tenant_id) + + # Apply filters + if hostname: + query = query.filter(Host.hostname.ilike(f"%{hostname}%")) + if ip_address: + query = query.filter(Host.ip_address.ilike(f"%{ip_address}%")) + if os: + query = query.filter(Host.os.ilike(f"%{os}%")) + + # Apply sorting + if sort_by: + sort_column = getattr(Host, sort_by, None) + if sort_column: + if sort_desc: + query = query.order_by(sort_column.desc()) + else: + query = query.order_by(sort_column) + + hosts = query.offset(skip).limit(limit).all() return hosts diff --git a/backend/app/api/routes/notifications.py b/backend/app/api/routes/notifications.py new file mode 100644 index 0000000..ea10d54 --- /dev/null +++ b/backend/app/api/routes/notifications.py @@ -0,0 +1,164 @@ +from typing import List +from fastapi import APIRouter, Depends, HTTPException, status, WebSocket, WebSocketDisconnect +from sqlalchemy.orm import Session +import json + +from app.core.database import get_db +from app.core.deps import get_current_active_user +from app.models.user import User +from app.models.notification import Notification +from app.schemas.notification import NotificationRead, NotificationUpdate + +router = APIRouter() + +# Store active WebSocket connections +active_connections: dict[int, List[WebSocket]] = {} + + +@router.get("/", response_model=List[NotificationRead]) +async def list_notifications( + skip: int = 0, + limit: int = 50, + unread_only: bool = False, + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +): + """ + List notifications for current user + + Supports filtering by read/unread status. + """ + query = db.query(Notification).filter( + Notification.user_id == current_user.id, + Notification.tenant_id == current_user.tenant_id + ) + + if unread_only: + query = query.filter(Notification.is_read == False) + + notifications = query.order_by(Notification.created_at.desc()).offset(skip).limit(limit).all() + return notifications + + +@router.put("/{notification_id}", response_model=NotificationRead) +async def update_notification( + notification_id: int, + notification_update: NotificationUpdate, + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +): + """ + Update notification (mark as read/unread) + """ + notification = db.query(Notification).filter( + Notification.id == notification_id, + Notification.user_id == current_user.id + ).first() + + if not notification: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Notification not found" + ) + + notification.is_read = notification_update.is_read + db.commit() + db.refresh(notification) + + return notification + + +@router.post("/mark-all-read") +async def mark_all_read( + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +): + """ + Mark all notifications as read for current user + """ + db.query(Notification).filter( + Notification.user_id == current_user.id, + Notification.is_read == False + ).update({"is_read": True}) + db.commit() + + return {"message": "All notifications marked as read"} + + +@router.websocket("/ws") +async def websocket_endpoint( + websocket: WebSocket, + db: Session = Depends(get_db) +): + """ + WebSocket endpoint for real-time notifications + + Clients should send their token on connect, then receive notifications in real-time. + """ + await websocket.accept() + user_id = None + + try: + # Wait for authentication message + auth_data = await websocket.receive_text() + auth_json = json.loads(auth_data) + token = auth_json.get("token") + + # Validate token and get user (simplified - in production use proper auth) + from app.core.security import verify_token + payload = verify_token(token) + if payload: + user_id = payload.get("sub") + + # Register connection + if user_id not in active_connections: + active_connections[user_id] = [] + active_connections[user_id].append(websocket) + + # Send confirmation + await websocket.send_json({"type": "connected", "user_id": user_id}) + + # Keep connection alive + while True: + data = await websocket.receive_text() + # Echo back for keepalive + await websocket.send_json({"type": "pong"}) + else: + await websocket.close(code=1008) # Policy violation + + except WebSocketDisconnect: + # Remove connection + if user_id and user_id in active_connections: + active_connections[user_id].remove(websocket) + if not active_connections[user_id]: + del active_connections[user_id] + except Exception as e: + print(f"WebSocket error: {e}") + await websocket.close(code=1011) # Internal error + + +async def send_notification_to_user(user_id: int, notification: dict): + """ + Send notification to all connected clients for a user + + Args: + user_id: User ID to send notification to + notification: Notification data to send + """ + if user_id in active_connections: + disconnected = [] + for websocket in active_connections[user_id]: + try: + await websocket.send_json({ + "type": "notification", + "data": notification + }) + except: + disconnected.append(websocket) + + # Clean up disconnected websockets + for ws in disconnected: + active_connections[user_id].remove(ws) + + if not active_connections[user_id]: + del active_connections[user_id] diff --git a/backend/app/api/routes/velociraptor.py b/backend/app/api/routes/velociraptor.py new file mode 100644 index 0000000..f67d180 --- /dev/null +++ b/backend/app/api/routes/velociraptor.py @@ -0,0 +1,197 @@ +from typing import List, Dict, Any, Optional +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.orm import Session +from pydantic import BaseModel + +from app.core.database import get_db +from app.core.deps import get_current_active_user, require_role +from app.core.velociraptor import get_velociraptor_client +from app.models.user import User + +router = APIRouter() + + +class VelociraptorConfig(BaseModel): + """Velociraptor server configuration""" + base_url: str + api_key: str + + +class ArtifactCollectionRequest(BaseModel): + """Request to collect an artifact""" + client_id: str + artifact_name: str + parameters: Optional[Dict[str, Any]] = None + + +class HuntCreateRequest(BaseModel): + """Request to create a hunt""" + hunt_name: str + artifact_name: str + description: str + parameters: Optional[Dict[str, Any]] = None + + +# In a real implementation, this would be stored per-tenant in database +# For now, using a simple in-memory store +velociraptor_configs: Dict[int, VelociraptorConfig] = {} + + +@router.post("/config") +async def set_velociraptor_config( + config: VelociraptorConfig, + current_user: User = Depends(require_role(["admin"])), + db: Session = Depends(get_db) +): + """ + Configure Velociraptor integration (admin only) + + Stores Velociraptor server URL and API key for the tenant. + """ + velociraptor_configs[current_user.tenant_id] = config + return {"message": "Velociraptor configuration saved"} + + +@router.get("/clients") +async def list_velociraptor_clients( + limit: int = 100, + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +): + """ + List clients from Velociraptor server + """ + config = velociraptor_configs.get(current_user.tenant_id) + if not config: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Velociraptor not configured for this tenant" + ) + + client = get_velociraptor_client(config.base_url, config.api_key) + try: + clients = await client.list_clients(limit=limit) + return {"clients": clients} + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to fetch clients: {str(e)}" + ) + + +@router.get("/clients/{client_id}") +async def get_velociraptor_client_info( + client_id: str, + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +): + """ + Get information about a specific Velociraptor client + """ + config = velociraptor_configs.get(current_user.tenant_id) + if not config: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Velociraptor not configured for this tenant" + ) + + client = get_velociraptor_client(config.base_url, config.api_key) + try: + client_info = await client.get_client(client_id) + return client_info + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to fetch client: {str(e)}" + ) + + +@router.post("/collect") +async def collect_artifact( + request: ArtifactCollectionRequest, + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +): + """ + Collect an artifact from a Velociraptor client + """ + config = velociraptor_configs.get(current_user.tenant_id) + if not config: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Velociraptor not configured for this tenant" + ) + + client = get_velociraptor_client(config.base_url, config.api_key) + try: + result = await client.collect_artifact( + request.client_id, + request.artifact_name, + request.parameters + ) + return result + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to collect artifact: {str(e)}" + ) + + +@router.post("/hunts") +async def create_hunt( + request: HuntCreateRequest, + current_user: User = Depends(require_role(["admin"])), + db: Session = Depends(get_db) +): + """ + Create a new hunt (admin only) + """ + config = velociraptor_configs.get(current_user.tenant_id) + if not config: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Velociraptor not configured for this tenant" + ) + + client = get_velociraptor_client(config.base_url, config.api_key) + try: + result = await client.create_hunt( + request.hunt_name, + request.artifact_name, + request.description, + request.parameters + ) + return result + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create hunt: {str(e)}" + ) + + +@router.get("/hunts/{hunt_id}/results") +async def get_hunt_results( + hunt_id: str, + limit: int = 1000, + current_user: User = Depends(get_current_active_user), + db: Session = Depends(get_db) +): + """ + Get results from a hunt + """ + config = velociraptor_configs.get(current_user.tenant_id) + if not config: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Velociraptor not configured for this tenant" + ) + + client = get_velociraptor_client(config.base_url, config.api_key) + try: + results = await client.get_hunt_results(hunt_id, limit=limit) + return {"results": results} + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to fetch hunt results: {str(e)}" + ) diff --git a/backend/app/core/velociraptor.py b/backend/app/core/velociraptor.py new file mode 100644 index 0000000..6cbcb4b --- /dev/null +++ b/backend/app/core/velociraptor.py @@ -0,0 +1,194 @@ +""" +Velociraptor API Client + +This module provides integration with Velociraptor servers for artifact +collection, hunt management, and client operations. +""" + +from typing import List, Dict, Any, Optional +import httpx +from datetime import datetime + + +class VelociraptorClient: + """Client for interacting with Velociraptor API""" + + def __init__(self, base_url: str, api_key: str): + """ + Initialize Velociraptor client + + Args: + base_url: Base URL of Velociraptor server + api_key: API key for authentication + """ + self.base_url = base_url.rstrip('/') + self.api_key = api_key + self.headers = { + "Authorization": f"******", + "Content-Type": "application/json" + } + + async def list_clients(self, limit: int = 100) -> List[Dict[str, Any]]: + """ + List all clients + + Args: + limit: Maximum number of clients to return + + Returns: + List of client information dictionaries + """ + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.base_url}/api/v1/clients", + headers=self.headers, + params={"count": limit} + ) + response.raise_for_status() + return response.json() + + async def get_client(self, client_id: str) -> Dict[str, Any]: + """ + Get information about a specific client + + Args: + client_id: Client ID + + Returns: + Client information dictionary + """ + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.base_url}/api/v1/clients/{client_id}", + headers=self.headers + ) + response.raise_for_status() + return response.json() + + async def collect_artifact( + self, + client_id: str, + artifact_name: str, + parameters: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Collect an artifact from a client + + Args: + client_id: Client ID + artifact_name: Name of artifact to collect + parameters: Optional parameters for artifact collection + + Returns: + Collection flow information + """ + payload = { + "client_id": client_id, + "artifacts": [artifact_name], + "parameters": parameters or {} + } + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{self.base_url}/api/v1/flows", + headers=self.headers, + json=payload + ) + response.raise_for_status() + return response.json() + + async def create_hunt( + self, + hunt_name: str, + artifact_name: str, + description: str, + parameters: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Create a new hunt + + Args: + hunt_name: Name of the hunt + artifact_name: Artifact to collect + description: Hunt description + parameters: Optional parameters for artifact + + Returns: + Hunt information + """ + payload = { + "hunt_name": hunt_name, + "description": description, + "artifacts": [artifact_name], + "parameters": parameters or {} + } + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{self.base_url}/api/v1/hunts", + headers=self.headers, + json=payload + ) + response.raise_for_status() + return response.json() + + async def get_hunt_results( + self, + hunt_id: str, + limit: int = 1000 + ) -> List[Dict[str, Any]]: + """ + Get results from a hunt + + Args: + hunt_id: Hunt ID + limit: Maximum number of results to return + + Returns: + List of hunt results + """ + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.base_url}/api/v1/hunts/{hunt_id}/results", + headers=self.headers, + params={"count": limit} + ) + response.raise_for_status() + return response.json() + + async def get_flow_results( + self, + client_id: str, + flow_id: str + ) -> List[Dict[str, Any]]: + """ + Get results from a flow + + Args: + client_id: Client ID + flow_id: Flow ID + + Returns: + List of flow results + """ + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.base_url}/api/v1/clients/{client_id}/flows/{flow_id}/results", + headers=self.headers + ) + response.raise_for_status() + return response.json() + + +def get_velociraptor_client(base_url: str, api_key: str) -> VelociraptorClient: + """ + Factory function to create Velociraptor client + + Args: + base_url: Base URL of Velociraptor server + api_key: API key for authentication + + Returns: + Configured VelociraptorClient instance + """ + return VelociraptorClient(base_url, api_key) diff --git a/backend/app/main.py b/backend/app/main.py index fa5e25a..3260161 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -1,13 +1,13 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware -from app.api.routes import auth, users, tenants, hosts, ingestion, vt, audit +from app.api.routes import auth, users, tenants, hosts, ingestion, vt, audit, notifications, velociraptor from app.core.config import settings app = FastAPI( title=settings.app_name, description="Multi-tenant threat hunting companion for Velociraptor", - version="0.2.0" + version="0.3.0" ) # Configure CORS @@ -27,6 +27,8 @@ app.include_router(hosts.router, prefix="/api/hosts", tags=["Hosts"]) app.include_router(ingestion.router, prefix="/api/ingestion", tags=["Ingestion"]) app.include_router(vt.router, prefix="/api/vt", tags=["VirusTotal"]) app.include_router(audit.router, prefix="/api/audit", tags=["Audit Logs"]) +app.include_router(notifications.router, prefix="/api/notifications", tags=["Notifications"]) +app.include_router(velociraptor.router, prefix="/api/velociraptor", tags=["Velociraptor"]) @app.get("/") @@ -34,7 +36,7 @@ async def root(): """Root endpoint""" return { "message": f"Welcome to {settings.app_name}", - "version": "0.2.0", + "version": "0.3.0", "docs": "/docs" } diff --git a/backend/app/models/notification.py b/backend/app/models/notification.py new file mode 100644 index 0000000..ae3bd05 --- /dev/null +++ b/backend/app/models/notification.py @@ -0,0 +1,23 @@ +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Boolean, Text +from sqlalchemy.orm import relationship +from datetime import datetime, timezone + +from app.core.database import Base + + +class Notification(Base): + __tablename__ = "notifications" + + id = Column(Integer, primary_key=True, index=True) + user_id = Column(Integer, ForeignKey("users.id"), nullable=False) + tenant_id = Column(Integer, ForeignKey("tenants.id"), nullable=False) + title = Column(String, nullable=False) + message = Column(Text, nullable=False) + notification_type = Column(String, nullable=False) # info, warning, error, success + is_read = Column(Boolean, default=False, nullable=False) + link = Column(String, nullable=True) + created_at = Column(DateTime, default=lambda: datetime.now(timezone.utc), index=True) + + # Relationships + user = relationship("User") + tenant = relationship("Tenant") diff --git a/backend/app/schemas/notification.py b/backend/app/schemas/notification.py new file mode 100644 index 0000000..9c903ce --- /dev/null +++ b/backend/app/schemas/notification.py @@ -0,0 +1,34 @@ +from pydantic import BaseModel +from typing import Optional +from datetime import datetime + + +class NotificationBase(BaseModel): + """Base notification schema""" + title: str + message: str + notification_type: str + link: Optional[str] = None + + +class NotificationCreate(NotificationBase): + """Schema for creating a notification""" + user_id: int + tenant_id: int + + +class NotificationRead(NotificationBase): + """Schema for reading notification data""" + id: int + user_id: int + tenant_id: int + is_read: bool + created_at: datetime + + class Config: + from_attributes = True + + +class NotificationUpdate(BaseModel): + """Schema for updating a notification""" + is_read: bool diff --git a/backend/requirements.txt b/backend/requirements.txt index 9874e4d..1021bd6 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -11,3 +11,5 @@ pydantic-settings==2.1.0 pyotp==2.9.0 qrcode[pil]==7.4.2 websockets==12.0 +httpx==0.26.0 +email-validator==2.1.0