Skip to content

Examples

This page showcases real-world examples and patterns using pyramid-capstone.

Complete Blog API Example

The repository includes a comprehensive Blog API example that demonstrates:

  • CRUD operations for users, posts, comments, and categories
  • Complex parameter handling with filtering and pagination
  • Nested relationships between entities
  • Error handling with proper HTTP status codes
  • OpenAPI documentation with Swagger UI

Running the Blog Example

# Clone the repository
git clone https://github.com/tomas_correa/pyramid-capstone.git
cd pyramid-capstone

# Install dependencies
poetry install --with dev

# Run the blog API
cd examples/blog_api
pserve development.ini

# Visit http://localhost:6543 for API documentation
# Visit http://localhost:6543/swagger-ui/ for interactive docs

Common Patterns

1. Simple CRUD Operations

from pyramid_capstone import th_api
from typing import List, Optional

# In-memory storage for demo
books = {}
next_id = 1

@th_api.get('/books')
def list_books(request, 
               author: Optional[str] = None,
               genre: Optional[str] = None,
               page: int = 1,
               per_page: int = 10) -> dict:
    """List books with optional filtering and pagination."""
    filtered_books = list(books.values())

    # Apply filters
    if author:
        filtered_books = [b for b in filtered_books if author.lower() in b['author'].lower()]
    if genre:
        filtered_books = [b for b in filtered_books if b['genre'].lower() == genre.lower()]

    # Pagination
    total = len(filtered_books)
    start = (page - 1) * per_page
    end = start + per_page
    page_books = filtered_books[start:end]

    return {
        "books": page_books,
        "pagination": {
            "page": page,
            "per_page": per_page,
            "total": total,
            "has_next": end < total,
            "has_prev": page > 1
        }
    }

@th_api.get('/books/{book_id}')
def get_book(request, book_id: int) -> dict:
    """Get a specific book by ID."""
    book = books.get(book_id)
    if not book:
        request.response.status = 404
        return {"error": "Book not found"}
    return book

@th_api.post('/books')
def create_book(request, 
                title: str, 
                author: str, 
                genre: str,
                isbn: Optional[str] = None,
                pages: Optional[int] = None) -> dict:
    """Create a new book."""
    global next_id

    # Validation
    if len(title.strip()) < 1:
        request.response.status = 400
        return {"error": "Title is required"}

    if len(author.strip()) < 1:
        request.response.status = 400
        return {"error": "Author is required"}

    book = {
        "id": next_id,
        "title": title.strip(),
        "author": author.strip(),
        "genre": genre.strip(),
        "isbn": isbn,
        "pages": pages,
        "created_at": datetime.now().isoformat()
    }

    books[next_id] = book
    next_id += 1

    request.response.status = 201
    return book

@th_api.put('/books/{book_id}')
def update_book(request, 
                book_id: int,
                title: Optional[str] = None,
                author: Optional[str] = None,
                genre: Optional[str] = None,
                isbn: Optional[str] = None,
                pages: Optional[int] = None) -> dict:
    """Update an existing book."""
    book = books.get(book_id)
    if not book:
        request.response.status = 404
        return {"error": "Book not found"}

    # Update provided fields
    if title is not None:
        book["title"] = title.strip()
    if author is not None:
        book["author"] = author.strip()
    if genre is not None:
        book["genre"] = genre.strip()
    if isbn is not None:
        book["isbn"] = isbn
    if pages is not None:
        book["pages"] = pages

    book["updated_at"] = datetime.now().isoformat()
    return book

@th_api.delete('/books/{book_id}')
def delete_book(request, book_id: int) -> dict:
    """Delete a book."""
    if book_id not in books:
        request.response.status = 404
        return {"error": "Book not found"}

    del books[book_id]
    return {"message": "Book deleted successfully"}

2. File Upload Handling

import os
import uuid
from pyramid_capstone import th_api

UPLOAD_DIR = "/tmp/uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

@th_api.post('/upload')
def upload_file(request, description: Optional[str] = None) -> dict:
    """Upload a file with optional description."""

    # Get uploaded file from request
    if 'file' not in request.POST:
        request.response.status = 400
        return {"error": "No file provided"}

    upload = request.POST['file']

    # Validate file
    if not hasattr(upload, 'filename') or not upload.filename:
        request.response.status = 400
        return {"error": "Invalid file"}

    # Check file size (limit to 10MB)
    if hasattr(upload, 'file'):
        upload.file.seek(0, 2)  # Seek to end
        size = upload.file.tell()
        upload.file.seek(0)  # Reset to beginning

        if size > 10 * 1024 * 1024:  # 10MB
            request.response.status = 400
            return {"error": "File too large (max 10MB)"}

    # Generate unique filename
    file_id = str(uuid.uuid4())
    file_ext = os.path.splitext(upload.filename)[1]
    filename = f"{file_id}{file_ext}"
    filepath = os.path.join(UPLOAD_DIR, filename)

    # Save file
    with open(filepath, 'wb') as f:
        f.write(upload.file.read())

    # Store file metadata
    file_info = {
        "id": file_id,
        "original_name": upload.filename,
        "filename": filename,
        "size": os.path.getsize(filepath),
        "description": description,
        "uploaded_at": datetime.now().isoformat()
    }

    request.response.status = 201
    return file_info

@th_api.get('/files/{file_id}')
def download_file(request, file_id: str) -> dict:
    """Download a file by ID."""
    # In a real app, you'd look up file metadata from database
    filename = f"{file_id}.jpg"  # Simplified for example
    filepath = os.path.join(UPLOAD_DIR, filename)

    if not os.path.exists(filepath):
        request.response.status = 404
        return {"error": "File not found"}

    # Set response headers for file download
    request.response.content_type = 'application/octet-stream'
    request.response.headers['Content-Disposition'] = f'attachment; filename="{filename}"'

    # Return file content
    with open(filepath, 'rb') as f:
        request.response.body = f.read()

    return request.response

3. Complex Data Validation

from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime, date
import re

@dataclass
class Address:
    street: str
    city: str
    state: str
    zip_code: str
    country: str = "US"

@dataclass
class ContactInfo:
    email: str
    phone: Optional[str] = None
    address: Optional[Address] = None

@dataclass
class Person:
    first_name: str
    last_name: str
    birth_date: date
    contact: ContactInfo

# Validation functions
def validate_email(email: str) -> str:
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    if not re.match(pattern, email):
        raise ValueError("Invalid email format")
    return email.lower()

def validate_phone(phone: str) -> str:
    # Remove all non-digits
    digits = re.sub(r'\D', '', phone)
    if len(digits) != 10:
        raise ValueError("Phone number must be 10 digits")
    return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"

def validate_zip_code(zip_code: str) -> str:
    if not re.match(r'^\d{5}(-\d{4})?$', zip_code):
        raise ValueError("Invalid ZIP code format")
    return zip_code

@th_api.post('/people')
def create_person(request,
                  first_name: str,
                  last_name: str,
                  birth_date: str,
                  email: str,
                  phone: Optional[str] = None,
                  street: Optional[str] = None,
                  city: Optional[str] = None,
                  state: Optional[str] = None,
                  zip_code: Optional[str] = None) -> dict:
    """Create a person with complex validation."""

    errors = {}

    # Validate names
    if len(first_name.strip()) < 2:
        errors['first_name'] = 'First name must be at least 2 characters'

    if len(last_name.strip()) < 2:
        errors['last_name'] = 'Last name must be at least 2 characters'

    # Validate birth date
    try:
        birth_date_obj = datetime.fromisoformat(birth_date).date()
        if birth_date_obj > date.today():
            errors['birth_date'] = 'Birth date cannot be in the future'
        if birth_date_obj < date(1900, 1, 1):
            errors['birth_date'] = 'Birth date cannot be before 1900'
    except ValueError:
        errors['birth_date'] = 'Invalid date format (use YYYY-MM-DD)'
        birth_date_obj = None

    # Validate email
    try:
        email = validate_email(email)
    except ValueError as e:
        errors['email'] = str(e)

    # Validate phone if provided
    if phone:
        try:
            phone = validate_phone(phone)
        except ValueError as e:
            errors['phone'] = str(e)

    # Validate address if provided
    address = None
    if any([street, city, state, zip_code]):
        # If any address field is provided, all required fields must be present
        if not all([street, city, state, zip_code]):
            errors['address'] = 'All address fields (street, city, state, zip_code) are required'
        else:
            try:
                zip_code = validate_zip_code(zip_code)
                address = Address(
                    street=street.strip(),
                    city=city.strip(),
                    state=state.strip().upper(),
                    zip_code=zip_code
                )
            except ValueError as e:
                errors['zip_code'] = str(e)

    # Return validation errors
    if errors:
        request.response.status = 400
        return {"error": "Validation failed", "details": errors}

    # Create person object
    contact = ContactInfo(
        email=email,
        phone=phone,
        address=address
    )

    person = Person(
        first_name=first_name.strip(),
        last_name=last_name.strip(),
        birth_date=birth_date_obj,
        contact=contact
    )

    # In a real app, save to database here
    person_dict = {
        "id": str(uuid.uuid4()),
        "first_name": person.first_name,
        "last_name": person.last_name,
        "birth_date": person.birth_date.isoformat(),
        "contact": {
            "email": person.contact.email,
            "phone": person.contact.phone,
            "address": {
                "street": person.contact.address.street,
                "city": person.contact.address.city,
                "state": person.contact.address.state,
                "zip_code": person.contact.address.zip_code,
                "country": person.contact.address.country
            } if person.contact.address else None
        },
        "created_at": datetime.now().isoformat()
    }

    request.response.status = 201
    return person_dict

4. Async Operations with Background Tasks

import asyncio
from concurrent.futures import ThreadPoolExecutor
from pyramid_capstone import th_api

# Thread pool for background tasks
executor = ThreadPoolExecutor(max_workers=4)

# Task storage (use Redis or database in production)
tasks = {}

def long_running_task(task_id: str, data: dict) -> dict:
    """Simulate a long-running task."""
    import time

    # Update task status
    tasks[task_id]["status"] = "processing"
    tasks[task_id]["progress"] = 0

    # Simulate work with progress updates
    for i in range(10):
        time.sleep(1)  # Simulate work
        tasks[task_id]["progress"] = (i + 1) * 10

    # Complete task
    tasks[task_id]["status"] = "completed"
    tasks[task_id]["result"] = {
        "processed_data": f"Processed: {data}",
        "completed_at": datetime.now().isoformat()
    }

    return tasks[task_id]["result"]

@th_api.post('/tasks')
def create_task(request, task_type: str, data: dict) -> dict:
    """Create a background task."""

    if task_type not in ["data_processing", "report_generation"]:
        request.response.status = 400
        return {"error": "Invalid task type"}

    task_id = str(uuid.uuid4())

    # Initialize task
    tasks[task_id] = {
        "id": task_id,
        "type": task_type,
        "status": "queued",
        "progress": 0,
        "created_at": datetime.now().isoformat(),
        "result": None
    }

    # Submit task to thread pool
    future = executor.submit(long_running_task, task_id, data)
    tasks[task_id]["future"] = future

    request.response.status = 202  # Accepted
    return {
        "task_id": task_id,
        "status": "queued",
        "status_url": f"/tasks/{task_id}"
    }

@th_api.get('/tasks/{task_id}')
def get_task_status(request, task_id: str) -> dict:
    """Get task status and result."""

    task = tasks.get(task_id)
    if not task:
        request.response.status = 404
        return {"error": "Task not found"}

    # Check if task is done
    if "future" in task and task["future"].done():
        try:
            result = task["future"].result()
            task["status"] = "completed"
            task["result"] = result
        except Exception as e:
            task["status"] = "failed"
            task["error"] = str(e)

        # Clean up future reference
        del task["future"]

    # Return task info (excluding future object)
    return {
        "id": task["id"],
        "type": task["type"],
        "status": task["status"],
        "progress": task["progress"],
        "created_at": task["created_at"],
        "result": task.get("result"),
        "error": task.get("error")
    }

@th_api.get('/tasks')
def list_tasks(request, status: Optional[str] = None) -> dict:
    """List all tasks with optional status filter."""

    filtered_tasks = []
    for task in tasks.values():
        # Skip future object in response
        task_info = {k: v for k, v in task.items() if k != "future"}

        if status is None or task_info["status"] == status:
            filtered_tasks.append(task_info)

    return {"tasks": filtered_tasks}

5. WebSocket Integration

from pyramid_capstone import th_api
import json

# WebSocket connections storage
websocket_connections = set()

@th_api.post('/notifications')
def send_notification(request, message: str, notification_type: str = "info") -> dict:
    """Send notification to all connected WebSocket clients."""

    notification = {
        "type": notification_type,
        "message": message,
        "timestamp": datetime.now().isoformat()
    }

    # Send to all connected WebSocket clients
    disconnected = set()
    for ws in websocket_connections:
        try:
            ws.send(json.dumps(notification))
        except:
            # Connection is closed, mark for removal
            disconnected.add(ws)

    # Remove disconnected clients
    websocket_connections -= disconnected

    return {
        "message": "Notification sent",
        "recipients": len(websocket_connections),
        "notification": notification
    }

@th_api.get('/notifications/stats')
def get_notification_stats(request) -> dict:
    """Get notification system statistics."""
    return {
        "connected_clients": len(websocket_connections),
        "server_time": datetime.now().isoformat()
    }

# WebSocket handler (separate from th_api decorators)
def websocket_view(request):
    """Handle WebSocket connections."""
    ws = request.environ.get('wsgi.websocket')
    if not ws:
        request.response.status = 400
        return {"error": "WebSocket connection required"}

    # Add to connections
    websocket_connections.add(ws)

    try:
        # Send welcome message
        welcome = {
            "type": "welcome",
            "message": "Connected to notification service",
            "timestamp": datetime.now().isoformat()
        }
        ws.send(json.dumps(welcome))

        # Keep connection alive
        while True:
            message = ws.receive()
            if message is None:
                break

            # Echo received messages (for testing)
            echo = {
                "type": "echo",
                "message": f"Received: {message}",
                "timestamp": datetime.now().isoformat()
            }
            ws.send(json.dumps(echo))

    except Exception as e:
        print(f"WebSocket error: {e}")

    finally:
        # Remove from connections
        websocket_connections.discard(ws)

    return {}

6. API Versioning

from pyramid_capstone import th_api

# Version 1 API
@th_api.get('/v1/users/{user_id}')
def get_user_v1(request, user_id: int) -> dict:
    """Get user (v1 format)."""
    user = get_user_from_db(user_id)
    if not user:
        request.response.status = 404
        return {"error": "User not found"}

    # V1 format - simple structure
    return {
        "id": user.id,
        "name": user.username,
        "email": user.email
    }

# Version 2 API
@th_api.get('/v2/users/{user_id}')
def get_user_v2(request, user_id: int, include_profile: bool = False) -> dict:
    """Get user (v2 format with enhanced features)."""
    user = get_user_from_db(user_id)
    if not user:
        request.response.status = 404
        return {"error": "User not found"}

    # V2 format - enhanced structure
    result = {
        "id": user.id,
        "username": user.username,
        "email": user.email,
        "created_at": user.created_at.isoformat(),
        "is_active": user.is_active,
        "metadata": {
            "version": "2.0",
            "last_updated": user.updated_at.isoformat()
        }
    }

    # Optional profile inclusion
    if include_profile and user.profile:
        result["profile"] = {
            "bio": user.profile.bio,
            "avatar_url": user.profile.avatar_url,
            "location": user.profile.location
        }

    return result

# Version negotiation through headers
@th_api.get('/users/{user_id}')
def get_user_versioned(request, user_id: int) -> dict:
    """Get user with version negotiation."""

    # Check Accept header for version
    accept_header = request.headers.get('Accept', '')

    if 'application/vnd.api.v2+json' in accept_header:
        return get_user_v2(request, user_id)
    else:
        # Default to v1
        return get_user_v1(request, user_id)

Integration Examples

SQLAlchemy Integration

from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from pyramid_capstone import th_api

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    is_active = Column(Boolean, default=True)

    posts = relationship("Post", back_populates="author")

class Post(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    title = Column(String(200), nullable=False)
    content = Column(String, nullable=False)
    author_id = Column(Integer, ForeignKey('users.id'))
    created_at = Column(DateTime, default=datetime.utcnow)

    author = relationship("User", back_populates="posts")

@th_api.get('/users/{user_id}/posts')
def get_user_posts(request, user_id: int, page: int = 1, per_page: int = 10) -> dict:
    """Get user posts with SQLAlchemy."""

    # Query with eager loading to avoid N+1 queries
    posts_query = (request.dbsession.query(Post)
                   .filter_by(author_id=user_id)
                   .order_by(Post.created_at.desc()))

    # Get total count
    total = posts_query.count()

    # Apply pagination
    posts = (posts_query
             .offset((page - 1) * per_page)
             .limit(per_page)
             .all())

    return {
        "posts": [
            {
                "id": post.id,
                "title": post.title,
                "content": post.content[:200] + "..." if len(post.content) > 200 else post.content,
                "created_at": post.created_at.isoformat()
            }
            for post in posts
        ],
        "pagination": {
            "page": page,
            "per_page": per_page,
            "total": total,
            "has_next": (page * per_page) < total,
            "has_prev": page > 1
        }
    }

These examples demonstrate the flexibility and power of pyramid-capstone for building various types of APIs. Each pattern can be adapted and combined to meet your specific requirements.

For more comprehensive examples, check out the Blog API example in the repository.