Complete backend infrastructure and authentication system

Co-authored-by: mblanke <9078342+mblanke@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot]
2025-12-09 14:29:06 +00:00
parent af23e610b2
commit 961946026a
47 changed files with 2337 additions and 1 deletions

View File

View File

@@ -0,0 +1,16 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""Application settings"""
app_name: str = "VelociCompanion"
database_url: str = "postgresql://postgres:postgres@db:5432/velocicompanion"
secret_key: str = "your-secret-key-change-in-production-min-32-chars-long"
access_token_expire_minutes: int = 30
algorithm: str = "HS256"
class Config:
env_file = ".env"
settings = Settings()

View File

@@ -0,0 +1,19 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.core.config import settings
engine = create_engine(settings.database_url)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
"""Dependency for getting database session"""
db = SessionLocal()
try:
yield db
finally:
db.close()

108
backend/app/core/deps.py Normal file
View File

@@ -0,0 +1,108 @@
from typing import List, Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.security import verify_token
from app.models.user import User
# OAuth2 scheme for token extraction
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> User:
"""
Extract and validate JWT from Authorization header
Args:
token: JWT token from Authorization header
db: Database session
Returns:
Current user object
Raises:
HTTPException: If token is invalid or user not found
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Verify and decode token
payload = verify_token(token)
if payload is None:
raise credentials_exception
user_id: Optional[int] = payload.get("sub")
if user_id is None:
raise credentials_exception
# Get user from database
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""
Ensure user is active
Args:
current_user: Current user from get_current_user
Returns:
Active user object
Raises:
HTTPException: If user is inactive
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return current_user
def require_role(allowed_roles: List[str]):
"""
Role-based access control dependency factory
Args:
allowed_roles: List of roles that are allowed to access the endpoint
Returns:
Dependency function that checks user role
"""
async def role_checker(current_user: User = Depends(get_current_active_user)) -> User:
if current_user.role not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions"
)
return current_user
return role_checker
async def get_tenant_id(current_user: User = Depends(get_current_active_user)) -> int:
"""
Extract tenant_id from current user for scoping queries
Args:
current_user: Current active user
Returns:
Tenant ID
"""
return current_user.tenant_id

View File

@@ -0,0 +1,58 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.core.config import settings
# Password hashing context
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a plain password against a hashed password"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Hash a password using bcrypt"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""
Create a JWT access token
Args:
data: Dictionary containing user_id, tenant_id, role
expires_delta: Optional expiration time delta
Returns:
Encoded JWT token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
return encoded_jwt
def verify_token(token: str) -> Optional[dict]:
"""
Verify and decode a JWT token
Args:
token: JWT token string
Returns:
Decoded token payload or None if invalid
"""
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
return payload
except JWTError:
return None