This commit is contained in:
2026-02-03 21:58:25 +01:00
parent 3019bcaf1a
commit 959dad3fec
14 changed files with 1477 additions and 1 deletions

View File

@@ -0,0 +1,10 @@
"""
Services Package
This package contains business logic and external service integrations.
Keeping business logic separate from routes makes the code more testable.
"""
from . import ai_service, database
__all__ = ["ai_service", "database"]

View File

@@ -0,0 +1,250 @@
"""
=============================================================================
AI SERVICE
=============================================================================
This module handles all AI provider integrations.
Supported Providers:
- OpenAI (GPT-4, GPT-3.5)
- Anthropic (Claude)
Architecture:
- Provider-agnostic interface (same API regardless of provider)
- Easy to add new providers
- Streaming support for real-time responses
The actual API keys and configuration come from config.py
"""
from typing import AsyncGenerator, Optional
import openai
from config import settings
# =============================================================================
# INITIALIZE CLIENTS
# =============================================================================
# Initialize OpenAI client
openai_client = openai.AsyncOpenAI(api_key=settings.OPENAI_API_KEY)
# For Anthropic, you would:
# import anthropic
# anthropic_client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
# =============================================================================
# TEXT GENERATION
# =============================================================================
async def generate_text(
prompt: str,
max_tokens: int = 500,
temperature: float = 0.7,
model: Optional[str] = None,
) -> dict:
"""
Generate text from a prompt using the configured AI provider.
Args:
prompt: The input text to generate from
max_tokens: Maximum tokens in the response
temperature: Creativity level (0 = deterministic, 2 = very creative)
model: Override the default model
Returns:
dict with keys: text, model, usage
Example:
result = await generate_text("Write a haiku about Python")
print(result["text"])
"""
model = model or settings.AI_MODEL
if settings.AI_PROVIDER == "openai":
return await _generate_openai(prompt, max_tokens, temperature, model)
elif settings.AI_PROVIDER == "anthropic":
return await _generate_anthropic(prompt, max_tokens, temperature, model)
else:
raise ValueError(f"Unknown AI provider: {settings.AI_PROVIDER}")
async def _generate_openai(
prompt: str,
max_tokens: int,
temperature: float,
model: str,
) -> dict:
"""Generate text using OpenAI."""
response = await openai_client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens,
temperature=temperature,
)
return {
"text": response.choices[0].message.content,
"model": model,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
},
}
async def _generate_anthropic(
prompt: str,
max_tokens: int,
temperature: float,
model: str,
) -> dict:
"""
Generate text using Anthropic Claude.
NOTE: Uncomment and install anthropic package to use.
"""
# import anthropic
# client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
#
# response = await client.messages.create(
# model=model,
# max_tokens=max_tokens,
# messages=[{"role": "user", "content": prompt}],
# )
#
# return {
# "text": response.content[0].text,
# "model": model,
# "usage": {
# "prompt_tokens": response.usage.input_tokens,
# "completion_tokens": response.usage.output_tokens,
# },
# }
raise NotImplementedError("Anthropic provider not configured")
# =============================================================================
# CHAT COMPLETION
# =============================================================================
async def chat_completion(
messages: list[dict],
max_tokens: int = 500,
temperature: float = 0.7,
model: Optional[str] = None,
) -> dict:
"""
Generate a chat response from message history.
Args:
messages: List of message dicts with 'role' and 'content'
max_tokens: Maximum tokens in the response
temperature: Creativity level
model: Override the default model
Returns:
dict with keys: message, model
Example:
result = await chat_completion([
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"},
])
print(result["message"]["content"])
"""
model = model or settings.AI_MODEL
response = await openai_client.chat.completions.create(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
)
return {
"message": {
"role": "assistant",
"content": response.choices[0].message.content,
},
"model": model,
}
# =============================================================================
# STREAMING
# =============================================================================
async def stream_text(
prompt: str,
max_tokens: int = 500,
temperature: float = 0.7,
model: Optional[str] = None,
) -> AsyncGenerator[str, None]:
"""
Stream generated text token by token.
This is an async generator that yields text chunks as they're generated.
Use this for real-time chat interfaces.
Example usage in FastAPI:
@app.post("/stream")
async def stream_endpoint(request: Request):
return StreamingResponse(
stream_text(request.prompt),
media_type="text/event-stream",
)
"""
model = model or settings.AI_MODEL
response = await openai_client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=max_tokens,
temperature=temperature,
stream=True, # Enable streaming
)
# Yield each chunk as it arrives
async for chunk in response:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
async def count_tokens(text: str, model: str = "gpt-3.5-turbo") -> int:
"""
Count the number of tokens in a text string.
Useful for:
- Estimating costs before making API calls
- Ensuring prompts don't exceed model limits
Note: This is an approximation. For exact counts, use tiktoken library.
"""
# Rough approximation: ~4 characters per token for English
return len(text) // 4
def get_model_context_limit(model: str) -> int:
"""
Get the context window size for a model.
Useful for knowing how much text you can send/receive.
"""
limits = {
# OpenAI
"gpt-4-turbo": 128000,
"gpt-4": 8192,
"gpt-3.5-turbo": 16385,
# Anthropic
"claude-3-opus-20240229": 200000,
"claude-3-sonnet-20240229": 200000,
"claude-3-haiku-20240307": 200000,
}
return limits.get(model, 4096)

View File

@@ -0,0 +1,172 @@
"""
=============================================================================
DATABASE SERVICE (SUPABASE)
=============================================================================
This module provides Supabase database integration.
Supabase Features Used:
- Database: PostgreSQL with auto-generated REST API
- Auth: User authentication (handled in frontend mostly)
- Real-time: Websocket subscriptions (used in frontend)
- Storage: File uploads (not shown here, but easy to add)
Why Supabase?
- Free tier is generous (great for hackathons!)
- Built-in auth that works with frontend
- Real-time subscriptions out of the box
- Full PostgreSQL (can run raw SQL)
- Great documentation
"""
from supabase import create_client, Client
from config import settings
from functools import lru_cache
@lru_cache
def get_supabase_client() -> Client:
"""
Get a cached Supabase client instance.
Using lru_cache ensures we reuse the same client,
avoiding connection overhead.
Note: We use the SERVICE KEY here (not anon key) because
the backend needs admin access to the database.
The frontend uses the anon key with Row Level Security.
"""
if not settings.SUPABASE_URL or not settings.SUPABASE_SERVICE_KEY:
raise ValueError(
"Supabase not configured. "
"Set SUPABASE_URL and SUPABASE_SERVICE_KEY in .env"
)
return create_client(
settings.SUPABASE_URL,
settings.SUPABASE_SERVICE_KEY,
)
# =============================================================================
# DATABASE HELPERS
# =============================================================================
async def get_user_by_id(user_id: str) -> dict | None:
"""
Get a user by their ID.
Example:
user = await get_user_by_id("123-456-789")
print(user["email"])
"""
supabase = get_supabase_client()
response = supabase.auth.admin.get_user_by_id(user_id)
return response.user if response else None
async def verify_jwt_token(token: str) -> dict | None:
"""
Verify a JWT token and get the user.
Use this to authenticate API requests that include
the Supabase access token.
Example:
@app.get("/protected")
async def protected_route(authorization: str = Header()):
token = authorization.replace("Bearer ", "")
user = await verify_jwt_token(token)
if not user:
raise HTTPException(status_code=401)
"""
supabase = get_supabase_client()
try:
response = supabase.auth.get_user(token)
return response.user if response else None
except Exception:
return None
# =============================================================================
# GENERIC CRUD HELPERS
# =============================================================================
class DatabaseTable:
"""
Helper class for common database operations.
Example:
projects = DatabaseTable("projects")
all_projects = await projects.get_all()
project = await projects.get_by_id("123")
new_project = await projects.create({"name": "My Project"})
"""
def __init__(self, table_name: str):
self.table_name = table_name
self.client = get_supabase_client()
async def get_all(self, filters: dict = None) -> list[dict]:
"""Get all records, optionally filtered."""
query = self.client.table(self.table_name).select("*")
if filters:
for key, value in filters.items():
query = query.eq(key, value)
response = query.execute()
return response.data
async def get_by_id(self, record_id: str) -> dict | None:
"""Get a single record by ID."""
response = (
self.client.table(self.table_name)
.select("*")
.eq("id", record_id)
.single()
.execute()
)
return response.data
async def create(self, data: dict) -> dict:
"""Create a new record."""
response = (
self.client.table(self.table_name)
.insert(data)
.execute()
)
return response.data[0]
async def update(self, record_id: str, data: dict) -> dict:
"""Update an existing record."""
response = (
self.client.table(self.table_name)
.update(data)
.eq("id", record_id)
.execute()
)
return response.data[0] if response.data else None
async def delete(self, record_id: str) -> bool:
"""Delete a record."""
self.client.table(self.table_name).delete().eq("id", record_id).execute()
return True
# =============================================================================
# EXAMPLE: Projects Table Helper
# =============================================================================
# Create a helper instance for the projects table
projects_db = DatabaseTable("projects")
# You can now use:
# await projects_db.get_all()
# await projects_db.get_by_id("123")
# await projects_db.create({"name": "Test", "user_id": "abc"})
# await projects_db.update("123", {"name": "Updated"})
# await projects_db.delete("123")