from fastapi import FastAPI, Request, Depends, Form, Response, HTTPException, status from fastapi.templating import Jinja2Templates from fastapi.staticfiles import StaticFiles from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func from contextlib import asynccontextmanager import json from datetime import datetime from typing import Optional import httpx import os from pathlib import Path from app.database import init_db, get_db from app.models import Book, ListeningSession, Recommendation, User, AppSettings from app.abs_client import get_abs_client from app.recommender import BookRecommender from app.config import get_settings from app.auth import ( get_current_user, get_current_user_optional, get_current_admin, authenticate_user, create_user, set_session_cookie, clear_session_cookie ) from app.services.stats import ReadingStatsService def extract_author_name(metadata: dict) -> str: """ Extract author name from Audiobookshelf metadata. Tries multiple fields in order: 1. authorName (string field) 2. authors array (extract first author's name) 3. Falls back to "Unknown" """ # Try authorName field first author_name = metadata.get("authorName") if author_name: return author_name # Try authors array authors = metadata.get("authors", []) if authors and len(authors) > 0: # Authors can be objects with 'name' field or just strings first_author = authors[0] if isinstance(first_author, dict): author_name = first_author.get("name") if author_name: return author_name elif isinstance(first_author, str): return first_author return "Unknown" @asynccontextmanager async def lifespan(app: FastAPI): """Initialize database on startup.""" await init_db() yield # Initialize FastAPI app app = FastAPI( title="Dewy Oracle", description="AI-powered book recommendations based on your listening history", lifespan=lifespan ) # Setup templates and static files templates = Jinja2Templates(directory="app/templates") app.mount("/static", StaticFiles(directory="app/static"), name="static") # Initialize recommender (shared across users) recommender = BookRecommender() @app.get("/", response_class=HTMLResponse) async def home( request: Request, db: AsyncSession = Depends(get_db), user: Optional[User] = Depends(get_current_user_optional) ): """Home page showing dashboard or landing page.""" # If user not logged in, show landing page if not user: return templates.TemplateResponse( "index.html", { "request": request, "user": None, "books": [], "recommendations": [] } ) # Get user's recent books and recommendations recent_sessions = await db.execute( select(ListeningSession) .where(ListeningSession.user_id == user.id) .order_by(ListeningSession.last_update.desc()) .limit(10) ) sessions = recent_sessions.scalars().all() # Get book details for sessions books = [] for session in sessions: book_result = await db.execute( select(Book).where(Book.id == session.book_id) ) book = book_result.scalar_one_or_none() if book: books.append({ "book": book, "session": session }) # Get user's recent recommendations recs_result = await db.execute( select(Recommendation) .where( Recommendation.user_id == user.id, Recommendation.dismissed == False ) .order_by(Recommendation.created_at.desc()) .limit(5) ) recommendations_raw = recs_result.scalars().all() # Parse JSON fields for template recommendations = [] for rec in recommendations_raw: rec_dict = { "id": rec.id, "title": rec.title, "author": rec.author, "description": rec.description, "reason": rec.reason, "genres": json.loads(rec.genres) if rec.genres else [] } recommendations.append(rec_dict) return templates.TemplateResponse( "index.html", { "request": request, "user": user, "books": books, "recommendations": recommendations } ) # ==================== Authentication Routes ==================== @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request): """Login page.""" return templates.TemplateResponse("login.html", {"request": request}) @app.post("/api/auth/login") async def login( username: str = Form(...), password: str = Form(...), db: AsyncSession = Depends(get_db) ): """Authenticate user and create session.""" user = await authenticate_user(db, username, password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password" ) # Create redirect response and set session cookie redirect = RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) set_session_cookie(redirect, user.id) return redirect @app.get("/register", response_class=HTMLResponse) async def register_page(request: Request): """Registration page.""" return templates.TemplateResponse("register.html", {"request": request}) @app.post("/api/auth/register") async def register( username: str = Form(...), email: str = Form(...), password: str = Form(...), abs_url: str = Form(...), abs_api_token: str = Form(...), display_name: Optional[str] = Form(None), db: AsyncSession = Depends(get_db) ): """Register a new user.""" try: # Check if registration is allowed result = await db.execute( select(AppSettings).where(AppSettings.key == "allow_registration") ) allow_reg_setting = result.scalar_one_or_none() # Check if there are any existing users (first user is always allowed) result = await db.execute(select(func.count(User.id))) user_count = result.scalar() if user_count > 0 and allow_reg_setting and allow_reg_setting.value.lower() != 'true': raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Registration is currently disabled" ) user = await create_user( db=db, username=username, email=email, password=password, abs_url=abs_url, abs_api_token=abs_api_token, display_name=display_name ) # Create redirect response and set session cookie redirect = RedirectResponse(url="/", status_code=status.HTTP_303_SEE_OTHER) set_session_cookie(redirect, user.id) return redirect except HTTPException as e: raise e except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e) ) @app.post("/api/auth/logout") async def logout(response: Response): """Logout user and clear session.""" clear_session_cookie(response) return JSONResponse({ "status": "success", "message": "Logged out successfully" }) # ==================== API Routes ==================== async def download_cover_image(abs_client, book_id: str) -> Optional[str]: """ Download cover image from Audiobookshelf and save locally. Args: abs_client: Audiobookshelf client with authentication book_id: Library item ID Returns: Local path to saved cover (e.g., /static/covers/book_id.jpg) or None """ if not book_id: return None # Create covers directory if it doesn't exist covers_dir = Path("app/static/covers") covers_dir.mkdir(parents=True, exist_ok=True) # Use Audiobookshelf API cover endpoint cover_url = f"{abs_client.base_url}/api/items/{book_id}/cover" async with httpx.AsyncClient() as client: try: response = await client.get(cover_url, headers=abs_client.headers, follow_redirects=True) response.raise_for_status() # Determine extension from content-type content_type = response.headers.get("content-type", "image/jpeg") ext = ".webp" if "webp" in content_type else ".jpg" local_filename = f"{book_id}{ext}" local_path = covers_dir / local_filename # Save to local file with open(local_path, "wb") as f: f.write(response.content) # Return path relative to static directory return f"/static/covers/{local_filename}" except Exception as e: print(f"Failed to download cover for {book_id}: {e}") return None @app.get("/api/sync") async def sync_with_audiobookshelf( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Sync library and progress from Audiobookshelf.""" try: # Create user-specific ABS client abs_client = get_abs_client(user) # Get user info which includes all media progress user_info = await abs_client.get_user_info() media_progress = user_info.get("mediaProgress", []) synced_count = 0 for progress_item in media_progress: # Skip podcast episodes, only process books if progress_item.get("mediaItemType") != "book": continue library_item_id = progress_item.get("libraryItemId") if not library_item_id: continue # Fetch full library item details try: item = await abs_client.get_item_details(library_item_id) except: # Skip if item not found continue # Extract book info media = item.get("media", {}) metadata = media.get("metadata", {}) book_id = item.get("id") if not book_id: continue # Check if book exists in DB result = await db.execute(select(Book).where(Book.id == book_id)) book = result.scalar_one_or_none() # Download cover image and get local path local_cover_url = await download_cover_image(abs_client, book_id) # Create or update book if not book: book = Book( id=book_id, title=metadata.get("title", "Unknown"), author=extract_author_name(metadata), narrator=metadata.get("narratorName"), description=metadata.get("description"), genres=json.dumps(metadata.get("genres", [])), tags=json.dumps(media.get("tags", [])), duration=media.get("duration", 0), cover_url=local_cover_url # Store local path ) db.add(book) else: # Update existing book book.title = metadata.get("title", book.title) book.author = extract_author_name(metadata) if local_cover_url: # Only update if download succeeded book.cover_url = local_cover_url book.updated_at = datetime.now() # Update or create listening session progress_data = progress_item.get("progress", 0) current_time = progress_item.get("currentTime", 0) is_finished = progress_item.get("isFinished", False) started_at_ts = progress_item.get("startedAt") finished_at_ts = progress_item.get("finishedAt") session_result = await db.execute( select(ListeningSession) .where( ListeningSession.user_id == user.id, ListeningSession.book_id == book_id ) .order_by(ListeningSession.last_update.desc()) .limit(1) ) session = session_result.scalar_one_or_none() if not session: session = ListeningSession( user_id=user.id, book_id=book_id, progress=progress_data, current_time=current_time, is_finished=is_finished, started_at=datetime.fromtimestamp(started_at_ts / 1000) if started_at_ts else datetime.now(), finished_at=datetime.fromtimestamp(finished_at_ts / 1000) if finished_at_ts else None ) db.add(session) else: # Update existing session session.progress = progress_data session.current_time = current_time session.is_finished = is_finished if finished_at_ts and not session.finished_at: session.finished_at = datetime.fromtimestamp(finished_at_ts / 1000) synced_count += 1 await db.commit() return JSONResponse({ "status": "success", "synced": synced_count, "message": f"Synced {synced_count} books from Audiobookshelf" }) except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) @app.get("/api/recommendations/generate") async def generate_recommendations( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Generate new AI recommendations based on reading history.""" try: # Get finished books for context finished_result = await db.execute( select(ListeningSession, Book) .join(Book, ListeningSession.book_id == Book.id) .where( ListeningSession.user_id == user.id, ListeningSession.is_finished == True ) .order_by(ListeningSession.finished_at.desc()) .limit(20) ) finished_items = finished_result.all() if not finished_items: return JSONResponse({ "status": "error", "message": "No reading history found. Please sync with Audiobookshelf first." }) # Format reading history reading_history = [] for session, book in finished_items: reading_history.append({ "title": book.title, "author": book.author, "genres": json.loads(book.genres) if book.genres else [], "progress": session.progress, "is_finished": session.is_finished }) # Generate recommendations new_recs = await recommender.generate_recommendations( reading_history, num_recommendations=5 ) # Save to database for rec in new_recs: recommendation = Recommendation( user_id=user.id, title=rec.get("title"), author=rec.get("author"), description=rec.get("description"), reason=rec.get("reason"), genres=json.dumps(rec.get("genres", [])) ) db.add(recommendation) await db.commit() return JSONResponse({ "status": "success", "recommendations": new_recs, "count": len(new_recs) }) except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) @app.get("/api/recommendations") async def get_recommendations( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Get saved recommendations.""" result = await db.execute( select(Recommendation) .where( Recommendation.user_id == user.id, Recommendation.dismissed == False ) .order_by(Recommendation.created_at.desc()) ) recommendations = result.scalars().all() return JSONResponse({ "recommendations": [ { "id": rec.id, "title": rec.title, "author": rec.author, "description": rec.description, "reason": rec.reason, "genres": json.loads(rec.genres) if rec.genres else [], "created_at": rec.created_at.isoformat() } for rec in recommendations ] }) @app.get("/api/history") async def get_listening_history( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Get listening history.""" result = await db.execute( select(ListeningSession, Book) .join(Book, ListeningSession.book_id == Book.id) .where(ListeningSession.user_id == user.id) .order_by(ListeningSession.last_update.desc()) ) items = result.all() return JSONResponse({ "history": [ { "book": { "id": book.id, "title": book.title, "author": book.author, "cover_url": book.cover_url, }, "session": { "progress": session.progress, "is_finished": session.is_finished, "started_at": session.started_at.isoformat() if session.started_at else None, "finished_at": session.finished_at.isoformat() if session.finished_at else None, } } for session, book in items ] }) # ==================== Reading Log Routes ==================== @app.get("/reading-log", response_class=HTMLResponse) async def reading_log_page( request: Request, user: User = Depends(get_current_user) ): """Reading log page with stats and filters.""" return templates.TemplateResponse( "reading_log.html", { "request": request, "user": user } ) @app.get("/api/reading-log/stats") async def get_reading_stats( db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user), start_date: Optional[str] = None, end_date: Optional[str] = None ): """Get reading statistics for the user.""" try: # Parse dates if provided start_dt = datetime.fromisoformat(start_date) if start_date else None end_dt = datetime.fromisoformat(end_date) if end_date else None # Calculate stats stats_service = ReadingStatsService(db, user.id, user.abs_url) stats = await stats_service.calculate_stats(start_dt, end_dt) return JSONResponse(stats) except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) @app.put("/api/sessions/{session_id}/rating") async def update_session_rating( session_id: int, rating: int = Form(...), db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user) ): """Update the rating for a listening session.""" try: # Validate rating if rating < 1 or rating > 5: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Rating must be between 1 and 5" ) # Get session and verify ownership result = await db.execute( select(ListeningSession).where( ListeningSession.id == session_id, ListeningSession.user_id == user.id ) ) session = result.scalar_one_or_none() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Session not found" ) # Update rating session.rating = rating await db.commit() return JSONResponse({ "status": "success", "message": "Rating updated successfully", "rating": rating }) except HTTPException as e: raise e except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) # ==================== Admin Routes ==================== @app.get("/admin", response_class=HTMLResponse) async def admin_page( request: Request, user: User = Depends(get_current_admin) ): """Admin panel page.""" return templates.TemplateResponse( "admin.html", { "request": request, "user": user } ) @app.get("/api/admin/settings") async def get_admin_settings( db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Get application settings.""" result = await db.execute(select(AppSettings)) settings = result.scalars().all() settings_dict = {s.key: s.value for s in settings} return JSONResponse(settings_dict) @app.put("/api/admin/settings/{key}") async def update_setting( key: str, value: str = Form(...), db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Update an application setting.""" result = await db.execute( select(AppSettings).where(AppSettings.key == key) ) setting = result.scalar_one_or_none() if not setting: # Create new setting setting = AppSettings(key=key, value=value) db.add(setting) else: # Update existing setting.value = value await db.commit() return JSONResponse({ "status": "success", "message": f"Setting {key} updated" }) @app.get("/api/admin/users") async def get_users( db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Get all users.""" result = await db.execute(select(User).order_by(User.created_at.desc())) users = result.scalars().all() users_list = [ { "id": user.id, "username": user.username, "email": user.email, "display_name": user.display_name, "is_admin": user.is_admin, "is_active": user.is_active, "created_at": user.created_at.isoformat() if user.created_at else None, "last_login": user.last_login.isoformat() if user.last_login else None, "is_current": user.id == admin.id } for user in users ] return JSONResponse({"users": users_list}) @app.put("/api/admin/users/{user_id}/admin") async def toggle_user_admin( user_id: int, is_admin: str = Form(...), db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Toggle admin status for a user.""" # Prevent admin from removing their own admin status if user_id == admin.id: return JSONResponse( {"status": "error", "message": "Cannot modify your own admin status"}, status_code=400 ) result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) user.is_admin = is_admin.lower() == 'true' await db.commit() return JSONResponse({ "status": "success", "message": "User admin status updated" }) @app.delete("/api/admin/users/{user_id}") async def delete_user( user_id: int, db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Delete a user.""" # Prevent admin from deleting themselves if user_id == admin.id: return JSONResponse( {"status": "error", "message": "Cannot delete your own account"}, status_code=400 ) result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) await db.delete(user) await db.commit() return JSONResponse({ "status": "success", "message": "User deleted successfully" }) @app.post("/api/admin/users") async def create_user_by_admin( username: str = Form(...), email: str = Form(...), password: str = Form(...), display_name: str = Form(None), abs_url: str = Form(...), abs_api_token: str = Form(...), is_admin: str = Form("false"), db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Create a new user (admin only).""" try: # Check if username exists result = await db.execute(select(User).where(User.username == username)) if result.scalar_one_or_none(): return JSONResponse( {"status": "error", "message": "Username already exists"}, status_code=400 ) # Check if email exists result = await db.execute(select(User).where(User.email == email)) if result.scalar_one_or_none(): return JSONResponse( {"status": "error", "message": "Email already exists"}, status_code=400 ) # Create new user new_user = await create_user( db=db, username=username, email=email, password=password, display_name=display_name, abs_url=abs_url, abs_api_token=abs_api_token ) # Set admin status if requested if is_admin.lower() == "true": new_user.is_admin = True await db.commit() return JSONResponse({ "status": "success", "message": f"User '{username}' created successfully" }) except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) @app.put("/api/admin/users/{user_id}/password") async def change_user_password( user_id: int, new_password: str = Form(...), db: AsyncSession = Depends(get_db), admin: User = Depends(get_current_admin) ): """Change a user's password (admin only).""" try: # Validate password length if len(new_password) < 6: return JSONResponse( {"status": "error", "message": "Password must be at least 6 characters"}, status_code=400 ) # Get user result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Update password from app.auth import hash_password user.hashed_password = hash_password(new_password) await db.commit() return JSONResponse({ "status": "success", "message": f"Password changed for user '{user.username}'" }) except HTTPException: raise except Exception as e: return JSONResponse( {"status": "error", "message": str(e)}, status_code=500 ) @app.get("/health") async def health_check(): """Health check endpoint.""" return {"status": "healthy"}