from fastapi import FastAPI, Request, Depends, Form, Response, HTTPException, status from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from contextlib import asynccontextmanager import json from datetime import datetime from typing import Optional from app.database import init_db, get_db from app.models import Book, ListeningSession, Recommendation, User, AppSettings from app.abs_client import get_abs_client from app.recommender import BookRecommender from app.config import get_settings from app.auth import ( get_current_user, get_current_user_optional, get_current_admin, authenticate_user, create_user, set_session_cookie, clear_session_cookie ) from app.services.stats import ReadingStatsService @asynccontextmanager async def lifespan(app: FastAPI): """Initialize database on startup.""" await init_db() yield # Initialize FastAPI app app = FastAPI( title="Audiobookshelf Recommendations", description="AI-powered book recommendations based on your listening history", lifespan=lifespan ) # Setup templates and static files templates = Jinja2Templates(directory="app/templates") app.mount("/static", StaticFiles(directory="app/static"), name="static") # Initialize recommender (shared across users) recommender = BookRecommender() @app.get("/", response_class=HTMLResponse) async def home( request: Request, db: AsyncSession = Depends(get_db), user: Optional[User] = Depends(get_current_user_optional) ): """Home page showing dashboard or landing page.""" # If user not logged in, show landing page if not user: return templates.TemplateResponse( "index.html", { "request": request, "user": None, "books": [], "recommendations": [] } ) # Get user's recent books and recommendations recent_sessions = await db.execute( select(ListeningSession) .where(ListeningSession.user_id == user.id) .order_by(ListeningSession.last_update.desc()) .limit(10) ) sessions = recent_sessions.scalars().all() # Get book details for sessions books = [] for session in sessions: book_result = await db.execute( select(Book).where(Book.id == session.book_id) ) book = book_result.scalar_one_or_none() if book: books.append({ "book": book, "session": session }) # Get user's recent recommendations recs_result = await db.execute( select(Recommendation) .where( Recommendation.user_id == user.id, Recommendation.dismissed == False ) .order_by(Recommendation.created_at.desc()) .limit(5) ) recommendations_raw = recs_result.scalars().all() # Parse JSON fields for template recommendations = [] for rec in recommendations_raw: rec_dict = { "id": rec.id, "title": rec.title, "author": rec.author, "description": rec.description, "reason": rec.reason, "genres": json.loads(rec.genres) if rec.genres else [] } recommendations.append(rec_dict) return templates.TemplateResponse( "index.html", { "request": request, "user": user, "books": books, "recommendations": recommendations } ) # ==================== Authentication Routes ==================== @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): """Login page.""" return templates.TemplateResponse("login.html", {"request": request}) @app.post("/api/auth/login") async def login( username: str = Form(...), password: str = Form(...), db: AsyncSession = Depends(get_db) ): """Authenticate user and create session.""" user = await authenticate_user(db, username, password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password" ) # Create redirect response and set session cookie redirect = RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) set_session_cookie(redirect, user.id) return redirect @app.get("/register", response_class=HTMLResponse) async def register_page(request: Request): """Registration page.""" return templates.TemplateResponse("register.html", {"request": request}) @app.post("/api/auth/register") async def register( username: str = Form(...), email: str = Form(...), password: str = Form(...), abs_url: str = Form(...), abs_api_token: str = Form(...), display_name: Optional[str] = Form(None), db: AsyncSession = Depends(get_db) ): """Register a new user.""" try: # Check if registration is allowed result = await db.execute( select(AppSettings).where(AppSettings.key == "allow_registration") ) allow_reg_setting = result.scalar_one_or_none() # Check if there are any existing users (first user is always allowed) result = await db.execute(select(func.count(User.id))) user_count = result.scalar() if user_count > 0 and allow_reg_setting and allow_reg_setting.value.lower() != 'true': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Registration is currently disabled" ) user = await create_user( db=db, username=username, email=email, password=password, abs_url=abs_url, abs_api_token=abs_api_token, display_name=display_name ) # Create redirect response and set session cookie redirect = RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) set_session_cookie(redirect, user.id) return redirect except HTTPException as e: raise e except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.post("/api/auth/logout") async def logout(response: Response): """Logout user and clear session.""" clear_session_cookie(response) return JSONResponse({ "status": "success", "message": "Logged out successfully" }) # ==================== API Routes ==================== @app.get("/api/sync") async def sync_with_audiobookshelf( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Sync library and progress from Audiobookshelf.""" try: # Create user-specific ABS client abs_client = get_abs_client(user) # Get user info which includes all media progress user_info = await abs_client.get_user_info() media_progress = user_info.get("mediaProgress", []) synced_count = 0 for progress_item in media_progress: # Skip podcast episodes, only process books if progress_item.get("mediaItemType") != "book": continue library_item_id = progress_item.get("libraryItemId") if not library_item_id: continue # Fetch full library item details try: item = await abs_client.get_item_details(library_item_id) except: # Skip if item not found continue # Extract book info media = item.get("media", {}) metadata = media.get("metadata", {}) book_id = item.get("id") if not book_id: continue # Check if book exists in DB result = await db.execute(select(Book).where(Book.id == book_id)) book = result.scalar_one_or_none() # Create or update book if not book: book = Book( id=book_id, title=metadata.get("title", "Unknown"), author=metadata.get("authorName", "Unknown"), narrator=metadata.get("narratorName"), description=metadata.get("description"), genres=json.dumps(metadata.get("genres", [])), tags=json.dumps(media.get("tags", [])), duration=media.get("duration", 0), cover_url=media.get("coverPath") # Store relative path ) db.add(book) else: # Update existing book book.title = metadata.get("title", book.title) book.author = metadata.get("authorName", book.author) book.cover_url = media.get("coverPath") # Store relative path book.updated_at = datetime.now() # Update or create listening session progress_data = progress_item.get("progress", 0) current_time = progress_item.get("currentTime", 0) is_finished = progress_item.get("isFinished", False) started_at_ts = progress_item.get("startedAt") finished_at_ts = progress_item.get("finishedAt") session_result = await db.execute( select(ListeningSession) .where( ListeningSession.user_id == user.id, ListeningSession.book_id == book_id ) .order_by(ListeningSession.last_update.desc()) .limit(1) ) session = session_result.scalar_one_or_none() if not session: session = ListeningSession( user_id=user.id, book_id=book_id, progress=progress_data, current_time=current_time, is_finished=is_finished, started_at=datetime.fromtimestamp(started_at_ts / 1000) if started_at_ts else datetime.now(), finished_at=datetime.fromtimestamp(finished_at_ts / 1000) if finished_at_ts else None ) db.add(session) else: # Update existing session session.progress = progress_data session.current_time = current_time session.is_finished = is_finished if finished_at_ts and not session.finished_at: session.finished_at = datetime.fromtimestamp(finished_at_ts / 1000) synced_count += 1 await db.commit() return JSONResponse({ "status": "success", "synced": synced_count, "message": f"Synced {synced_count} books from Audiobookshelf" }) except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) @app.get("/api/recommendations/generate") async def generate_recommendations( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Generate new AI recommendations based on reading history.""" try: # Get finished books for context finished_result = await db.execute( select(ListeningSession, Book) .join(Book, ListeningSession.book_id == Book.id) .where( ListeningSession.user_id == user.id, ListeningSession.is_finished == True ) .order_by(ListeningSession.finished_at.desc()) .limit(20) ) finished_items = finished_result.all() if not finished_items: return JSONResponse({ "status": "error", "message": "No reading history found. Please sync with Audiobookshelf first." }) # Format reading history reading_history = [] for session, book in finished_items: reading_history.append({ "title": book.title, "author": book.author, "genres": json.loads(book.genres) if book.genres else [], "progress": session.progress, "is_finished": session.is_finished }) # Generate recommendations new_recs = await recommender.generate_recommendations( reading_history, num_recommendations=5 ) # Save to database for rec in new_recs: recommendation = Recommendation( user_id=user.id, title=rec.get("title"), author=rec.get("author"), description=rec.get("description"), reason=rec.get("reason"), genres=json.dumps(rec.get("genres", [])) ) db.add(recommendation) await db.commit() return JSONResponse({ "status": "success", "recommendations": new_recs, "count": len(new_recs) }) except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) @app.get("/api/recommendations") async def get_recommendations( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Get saved recommendations.""" result = await db.execute( select(Recommendation) .where( Recommendation.user_id == user.id, Recommendation.dismissed == False ) .order_by(Recommendation.created_at.desc()) ) recommendations = result.scalars().all() return JSONResponse({ "recommendations": [ { "id": rec.id, "title": rec.title, "author": rec.author, "description": rec.description, "reason": rec.reason, "genres": json.loads(rec.genres) if rec.genres else [], "created_at": rec.created_at.isoformat() } for rec in recommendations ] }) @app.get("/api/history") async def get_listening_history( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Get listening history.""" result = await db.execute( select(ListeningSession, Book) .join(Book, ListeningSession.book_id == Book.id) .where(ListeningSession.user_id == user.id) .order_by(ListeningSession.last_update.desc()) ) items = result.all() return JSONResponse({ "history": [ { "book": { "id": book.id, "title": book.title, "author": book.author, "cover_url": book.cover_url, }, "session": { "progress": session.progress, "is_finished": session.is_finished, "started_at": session.started_at.isoformat() if session.started_at else None, "finished_at": session.finished_at.isoformat() if session.finished_at else None, } } for session, book in items ] }) # ==================== Reading Log Routes ==================== @app.get("/reading-log", response_class=HTMLResponse) async def reading_log_page( request: Request, user: User = Depends(get_current_user) ): """Reading log page with stats and filters.""" return templates.TemplateResponse( "reading_log.html", { "request": request, "user": user } ) @app.get("/api/reading-log/stats") async def get_reading_stats( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), start_date: Optional[str] = None, end_date: Optional[str] = None ): """Get reading statistics for the user.""" try: # Parse dates if provided start_dt = datetime.fromisoformat(start_date) if start_date else None end_dt = datetime.fromisoformat(end_date) if end_date else None # Calculate stats stats_service = ReadingStatsService(db, user.id, user.abs_url) stats = await stats_service.calculate_stats(start_dt, end_dt) return JSONResponse(stats) except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) @app.put("/api/sessions/{session_id}/rating") async def update_session_rating( session_id: int, rating: int = Form(...), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Update the rating for a listening session.""" try: # Validate rating if rating < 1 or rating > 5: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Rating must be between 1 and 5" ) # Get session and verify ownership result = await db.execute( select(ListeningSession).where( ListeningSession.id == session_id, ListeningSession.user_id == user.id ) ) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) # Update rating session.rating = rating await db.commit() return JSONResponse({ "status": "success", "message": "Rating updated successfully", "rating": rating }) except HTTPException as e: raise e except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) # ==================== Admin Routes ==================== @app.get("/admin", response_class=HTMLResponse) async def admin_page( request: Request, user: User = Depends(get_current_admin) ): """Admin panel page.""" return templates.TemplateResponse( "admin.html", { "request": request, "user": user } ) @app.get("/api/admin/settings") async def get_admin_settings( db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Get application settings.""" result = await db.execute(select(AppSettings)) settings = result.scalars().all() settings_dict = {s.key: s.value for s in settings} return JSONResponse(settings_dict) @app.put("/api/admin/settings/{key}") async def update_setting( key: str, value: str = Form(...), db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Update an application setting.""" result = await db.execute( select(AppSettings).where(AppSettings.key == key) ) setting = result.scalar_one_or_none() if not setting: # Create new setting setting = AppSettings(key=key, value=value) db.add(setting) else: # Update existing setting.value = value await db.commit() return JSONResponse({ "status": "success", "message": f"Setting {key} updated" }) @app.get("/api/admin/users") async def get_users( db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Get all users.""" result = await db.execute(select(User).order_by(User.created_at.desc())) users = result.scalars().all() users_list = [ { "id": user.id, "username": user.username, "email": user.email, "display_name": user.display_name, "is_admin": user.is_admin, "is_active": user.is_active, "created_at": user.created_at.isoformat() if user.created_at else None, "last_login": user.last_login.isoformat() if user.last_login else None, "is_current": user.id == admin.id } for user in users ] return JSONResponse({"users": users_list}) @app.put("/api/admin/users/{user_id}/admin") async def toggle_user_admin( user_id: int, is_admin: str = Form(...), db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Toggle admin status for a user.""" # Prevent admin from removing their own admin status if user_id == admin.id: return JSONResponse( {"status": "error", "message": "Cannot modify your own admin status"}, status_code=400 ) result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user.is_admin = is_admin.lower() == 'true' await db.commit() return JSONResponse({ "status": "success", "message": "User admin status updated" }) @app.delete("/api/admin/users/{user_id}") async def delete_user( user_id: int, db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Delete a user.""" # Prevent admin from deleting themselves if user_id == admin.id: return JSONResponse( {"status": "error", "message": "Cannot delete your own account"}, status_code=400 ) result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) await db.delete(user) await db.commit() return JSONResponse({ "status": "success", "message": "User deleted successfully" }) @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy"} @app.get("/api/cover/{book_id}") async def get_book_cover( book_id: str, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Proxy book cover images from Audiobookshelf with authentication.""" import httpx # Get book from database result = await db.execute(select(Book).where(Book.id == book_id)) book = result.scalar_one_or_none() if not book or not book.cover_url: raise HTTPException(status_code=404, detail="Cover not found") # Get user's ABS client abs_client = get_abs_client(user) # Fetch cover from Audiobookshelf with auth cover_url = f"{abs_client.base_url}{book.cover_url}" async with httpx.AsyncClient() as client: try: response = await client.get(cover_url, headers=abs_client.headers) response.raise_for_status() # Return image with appropriate content type return Response( content=response.content, media_type=response.headers.get("content-type", "image/jpeg") ) except httpx.HTTPError: raise HTTPException(status_code=404, detail="Cover image not found")