import httpx from typing import List, Dict, Any from app.config import get_settings class AudiobookshelfClient: """Client for interacting with Audiobookshelf API.""" def __init__(self, abs_url: str, api_token: str): """ Initialize Audiobookshelf client with credentials. Args: abs_url: Base URL of Audiobookshelf server api_token: API token for authentication (unencrypted) """ self.base_url = abs_url.rstrip("/") self.api_token = api_token self.headers = {"Authorization": f"Bearer {self.api_token}"} async def get_libraries(self) -> List[Dict[str, Any]]: """Get all libraries from Audiobookshelf.""" async with httpx.AsyncClient() as client: response = await client.get( f"{self.base_url}/api/libraries", headers=self.headers ) response.raise_for_status() return response.json().get("libraries", []) async def get_library_items(self, library_id: str) -> List[Dict[str, Any]]: """Get all items in a library.""" async with httpx.AsyncClient() as client: response = await client.get( f"{self.base_url}/api/libraries/{library_id}/items", headers=self.headers, ) response.raise_for_status() return response.json().get("results", []) async def get_user_listening_sessions(self) -> List[Dict[str, Any]]: """Get user's listening sessions.""" async with httpx.AsyncClient() as client: response = await client.get( f"{self.base_url}/api/me/listening-sessions", headers=self.headers ) response.raise_for_status() return response.json().get("sessions", []) async def get_user_progress(self) -> List[Dict[str, Any]]: """Get user's items in progress.""" async with httpx.AsyncClient() as client: response = await client.get( f"{self.base_url}/api/me/items-in-progress", headers=self.headers ) response.raise_for_status() # The response is an array of library items with progress result = response.json() return result.get("libraryItems", result if isinstance(result, list) else []) async def get_item_details(self, item_id: str) -> Dict[str, Any]: """Get detailed information about a specific library item.""" async with httpx.AsyncClient() as client: response = await client.get( f"{self.base_url}/api/items/{item_id}", headers=self.headers ) response.raise_for_status() return response.json() async def get_user_info(self) -> Dict[str, Any]: """Get current user information.""" async with httpx.AsyncClient() as client: response = await client.get( f"{self.base_url}/api/me", headers=self.headers ) response.raise_for_status() return response.json() def get_abs_client(user) -> AudiobookshelfClient: """ Create an Audiobookshelf client for a specific user. Args: user: User model instance with abs_url and abs_api_token Returns: Configured AudiobookshelfClient instance """ from app.auth import decrypt_token # Decrypt the user's API token decrypted_token = decrypt_token(user.abs_api_token) return AudiobookshelfClient( abs_url=user.abs_url, api_token=decrypted_token )