Skip to content

Security Integration

pyramid-capstone integrates seamlessly with Pyramid's powerful security system, allowing you to protect your API endpoints with authentication and authorization.

Quick Start

Add the permission parameter to any endpoint decorator:

from pyramid_capstone import th_api

@th_api.get('/admin/users', permission='admin')
def list_users_admin(request) -> list:
    """Return every user; the view is registered with the 'admin' permission."""
    all_users = get_all_users()
    return all_users

@th_api.post('/posts', permission='create_post')
def create_post(request, title: str, content: str) -> dict:
    """Create a post from ``title`` and ``content``.

    The view is registered with the 'create_post' permission.
    """
    new_post = create_new_post(title, content)
    return new_post

Setting Up Authentication

1. Basic Authentication Setup

Here's a complete example with session-based authentication:

from pyramid.config import Configurator
from pyramid.authentication import SessionAuthenticationPolicy
from pyramid.authorization import ACLAuthorizationPolicy
from pyramid.security import Allow, Everyone, Authenticated
from pyramid_capstone import th_api

class RootACL:
    """Root resource whose ``__acl__`` maps principals to permissions.

    Each entry is an ``(action, principal, permission)`` tuple.
    """
    __acl__ = [
        # Everyone, including anonymous visitors, is granted 'view'.
        (Allow, Everyone, 'view'),
        # Any authenticated user is granted 'create'.
        (Allow, Authenticated, 'create'),
        # Only requests carrying the 'group:admins' principal get 'admin'.
        (Allow, 'group:admins', 'admin'),
    ]

def main(global_config, **settings):
    """Build the WSGI app with session-based authentication and ACL authorization.

    Args:
        global_config: Not used by this function.
        **settings: Deployment settings. ``session.secret`` must be present;
            it signs the session cookie that carries the logged-in user id.

    Returns:
        The WSGI application produced by ``config.make_wsgi_app()``.

    Raises:
        KeyError: If ``session.secret`` is missing from ``settings``.
    """
    # Imported here so this example does not depend on imports elsewhere.
    from pyramid.session import SignedCookieSessionFactory

    config = Configurator(
        settings=settings,
        root_factory=lambda request: RootACL()
    )

    # SessionAuthenticationPolicy keeps the user id in request.session, so a
    # session factory must be registered or remember()/login fails at runtime.
    # The secret is read from settings rather than hard-coded in source.
    config.set_session_factory(
        SignedCookieSessionFactory(settings['session.secret'])
    )

    # Set up authentication and authorization
    # NOTE(review): no ``callback`` is passed to the policy, so nothing adds
    # the 'group:admins' principal used by RootACL — confirm how groups are
    # meant to be assigned.
    config.set_authentication_policy(SessionAuthenticationPolicy())
    config.set_authorization_policy(ACLAuthorizationPolicy())

    # Include pyramid-capstone
    config.include('pyramid_capstone')
    config.scan()

    return config.make_wsgi_app()

2. JWT Authentication

For API-first applications, JWT tokens are often preferred:

from pyramid_jwt import create_jwt_authentication_policy

def main(global_config, **settings):
    """Build the WSGI app with JWT bearer-token authentication.

    Args:
        global_config: Not used by this function.
        **settings: Deployment settings. ``jwt.private_key`` must be present;
            it is the secret used to sign and verify tokens.

    Returns:
        The WSGI application produced by ``config.make_wsgi_app()``.

    Raises:
        KeyError: If ``jwt.private_key`` is missing from ``settings``.
    """
    config = Configurator(settings=settings)

    config.set_authorization_policy(ACLAuthorizationPolicy())

    # JWT Authentication
    # pyramid_jwt's policy helpers take the Configurator as their first
    # argument, so the policy is installed through the directive that
    # including the package registers. The signing secret comes from
    # settings instead of being committed to source.
    config.include('pyramid_jwt')
    config.set_jwt_authentication_policy(
        private_key=settings['jwt.private_key'],
        algorithm='HS256',
        expiration=3600,  # 1 hour
        auth_type='Bearer',
    )

    config.include('pyramid_capstone')
    config.scan()

    return config.make_wsgi_app()

Protected Endpoints

Basic Permission Checking

@th_api.get('/profile', permission='view')
def get_profile(request) -> dict:
    """Return the profile of the currently authenticated user.

    Returns:
        ``{"user_id": ..., "profile": ...}`` for a logged-in user, or an
        error dict with a 401 status when the request carries no identity.
    """
    user_id = request.authenticated_userid

    # The 'view' permission may be granted to anonymous principals, so never
    # look up a profile without a user id.
    if user_id is None:
        request.response.status = 401
        return {"error": "Authentication required"}

    return {"user_id": user_id, "profile": get_user_profile(user_id)}

@th_api.post('/admin/users', permission='admin')
def create_user_admin(request, username: str, email: str) -> dict:
    """Create a user; the view is registered with the 'admin' permission."""
    new_user = create_user(username, email)
    return new_user

@th_api.delete('/posts/{post_id}', permission='delete_post')
def delete_post(request, post_id: int) -> dict:
    """Delete the post with ``post_id`` and return a confirmation message.

    The view is registered with the 'delete_post' permission.
    """
    confirmation = {"message": "Post deleted"}
    delete_post_by_id(post_id)
    return confirmation

Dynamic Permissions

When access depends on the resource being requested, check the permission inside the view instead of in the decorator:

from pyramid.security import has_permission

@th_api.get('/posts/{post_id}')
def get_post(request, post_id: int) -> dict:
    """Return a post; private posts require the 'view_private' permission.

    Returns:
        The post as a dict, or an error dict with a 403 status when the post
        is private and the request lacks 'view_private'.
    """
    post = get_post_by_id(post_id)

    # request.has_permission() checks against request.context by default and
    # replaces the deprecated module-level pyramid.security.has_permission().
    if post.is_private and not request.has_permission('view_private'):
        request.response.status = 403
        return {"error": "Access denied"}

    return post.to_dict()

@th_api.put('/posts/{post_id}')
def update_post(request, post_id: int, title: str, content: str) -> dict:
    """Update a post if the caller is its author or holds 'admin'.

    Returns:
        The result of ``update_post_content``, or an error dict with a 403
        status when the caller is neither the author nor an admin.
    """
    post = get_post_by_id(post_id)
    user_id = request.authenticated_userid

    # Ownership is checked first; the permission lookup only runs for
    # non-authors. request.has_permission() replaces the deprecated
    # module-level pyramid.security.has_permission().
    is_author = post.author_id == user_id
    if not is_author and not request.has_permission('admin'):
        request.response.status = 403
        return {"error": "You can only edit your own posts"}

    return update_post_content(post_id, title, content)

Authentication Endpoints

Create login/logout endpoints to manage authentication:

from pyramid.security import remember, forget
import bcrypt

# In-memory user store (use a database in production)
# Maps username -> {"password": bcrypt hash (bytes), "groups": list of group names}.
# The hashes are computed with a fresh salt every time this module is loaded.
USERS = {
    "alice": {
        "password": bcrypt.hashpw(b"secret123", bcrypt.gensalt()),
        "groups": ["users"]
    },
    "admin": {
        "password": bcrypt.hashpw(b"admin123", bcrypt.gensalt()),
        "groups": ["users", "admins"]
    }
}

# Verified in place of a real hash when the username is unknown, so both
# failure paths cost one bcrypt check and timing does not reveal which
# usernames exist.
_DUMMY_PASSWORD_HASH = bcrypt.hashpw(b"not-a-real-password", bcrypt.gensalt())


@th_api.post('/auth/login')
def login(request, username: str, password: str) -> dict:
    """Authenticate a user and attach the authentication headers.

    Args:
        request: The current request; its response receives the headers
            returned by ``remember``.
        username: Login name looked up in ``USERS``.
        password: Plain-text password checked against the stored bcrypt hash.

    Returns:
        A success dict with the user name and groups, or an error dict with a
        401 status when the credentials do not match.
    """
    user = USERS.get(username)

    # Always run exactly one bcrypt verification, even for unknown users.
    stored_hash = user["password"] if user else _DUMMY_PASSWORD_HASH
    password_ok = bcrypt.checkpw(password.encode(), stored_hash)

    if not user or not password_ok:
        request.response.status = 401
        return {"error": "Invalid credentials"}

    # Create authentication headers
    headers = remember(request, username)
    request.response.headerlist.extend(headers)

    return {
        "message": "Login successful",
        "user": username,
        "groups": user["groups"]
    }

@th_api.post('/auth/logout')
def logout(request) -> dict:
    """Attach the headers returned by ``forget`` and confirm the logout."""
    request.response.headerlist.extend(forget(request))
    return {"message": "Logout successful"}

@th_api.get('/auth/me', permission='view')
def get_current_user(request) -> dict:
    """Report the requesting user's name and groups.

    Groups come from ``USERS``; a user id with no entry there gets an empty
    group list.
    """
    current_username = request.authenticated_userid
    record = USERS.get(current_username, {})
    groups = record.get("groups", [])
    return {"username": current_username, "groups": groups}

Advanced Security Patterns

Role-Based Access Control (RBAC)

from pyramid.security import Allow, Deny, Everyone, Authenticated

class BlogACL:
    """Root resource whose ACL is computed per request.

    Static grants cover viewing, creating, moderating and administering.
    When the request targets a specific post, that post's author is also
    granted 'edit_post'.
    """

    def __init__(self, request):
        self.request = request

    def _current_post_id(self):
        """Return the post id this request targets, or None if there is none."""
        # An explicitly assigned attribute still wins, as before.
        explicit = getattr(self, 'post_id', None)
        if explicit is not None:
            return explicit

        # Nothing ever assigned self.post_id, so the dynamic rule below could
        # never fire; fall back to the matched route's {post_id} placeholder.
        matchdict = getattr(self.request, 'matchdict', None) or {}
        raw_id = matchdict.get('post_id')
        if raw_id is None:
            return None
        try:
            return int(raw_id)
        except (TypeError, ValueError):
            # A malformed id simply yields no per-post grant.
            return None

    @property
    def __acl__(self):
        """Build the ACL: static grants plus an author grant for the current post."""
        # Base permissions
        acl = [
            (Allow, Everyone, 'view_public'),
            (Allow, Authenticated, 'view_private'),
            (Allow, Authenticated, 'create_post'),
            (Allow, 'group:moderators', 'moderate'),
            (Allow, 'group:admins', 'admin'),
        ]

        # Dynamic permissions based on context
        post_id = self._current_post_id()
        if post_id is not None:
            post = get_post_by_id(post_id)
            if post:
                # Post authors can edit their own posts
                # NOTE(review): this only matches if the authentication
                # policy emits a 'user:<id>' principal — confirm.
                acl.append((Allow, f'user:{post.author_id}', 'edit_post'))

        return acl

# Use in configuration
def main(global_config, **settings):
    """Create the Configurator with ``BlogACL`` as the root factory.

    The class itself is passed as ``root_factory``; its ``__init__`` takes
    the request, so each request gets its own ``BlogACL`` instance.
    The example is truncated: the remaining configuration is not shown.
    """
    config = Configurator(
        settings=settings,
        root_factory=BlogACL
    )
    # ... rest of configuration

API Key Authentication

import secrets
from pyramid.authentication import CallbackAuthenticationPolicy

# API key storage (use database in production)
# Maps API key -> {"user_id": owner of the key, "permissions": list of permission names}.
API_KEYS = {
    "sk_test_123": {"user_id": "alice", "permissions": ["read", "write"]},
    "sk_prod_456": {"user_id": "admin", "permissions": ["read", "write", "admin"]},
}

class APIKeyAuthenticationPolicy(CallbackAuthenticationPolicy):
    """Authenticate requests by an API key sent as a bearer token.

    The key itself is used as the user id; ``callback`` turns the key's
    permissions into ``permission:<name>`` principals.
    """

    def unauthenticated_userid(self, request):
        """Extract the API key from the Authorization header.

        Returns:
            The key, or None when the header is absent, uses another scheme,
            or carries an empty token.
        """
        auth_header = request.headers.get('Authorization', '')
        scheme, _, token = auth_header.partition(' ')
        # HTTP authentication scheme names are case-insensitive.
        if scheme.lower() != 'bearer':
            return None
        return token.strip() or None

    def callback(self, userid, request):
        """Validate the API key and return its principals.

        Returns:
            A list of ``permission:<name>`` strings for a known key, or None
            for an unknown key.
        """
        key_data = API_KEYS.get(userid)
        if key_data:
            return [f"permission:{perm}" for perm in key_data["permissions"]]
        return None

    def remember(self, request, userid, **kw):
        """Return no headers: the client resends its key on every request."""
        return []

    def forget(self, request):
        """Return no headers: there is no server-side state to clear."""
        return []

# Protected endpoint with API key
@th_api.get('/api/data', permission='read')
def get_api_data(request) -> dict:
    """Return data plus the owner and permissions of the calling API key.

    Call with the header ``Authorization: Bearer sk_test_123``.
    """
    presented_key = request.authenticated_userid
    record = API_KEYS.get(presented_key, {})
    owner = record.get("user_id")
    granted = record.get("permissions", [])

    return {
        "data": "sensitive information",
        "user_id": owner,
        "permissions": granted
    }

Rate Limiting

from functools import wraps
from time import time
from collections import defaultdict

# Simple in-memory rate limiter (use Redis in production)
# Maps (decorated function, client id) -> timestamps of recent requests.
rate_limit_storage = defaultdict(list)

def rate_limit(max_requests: int = 100, window_seconds: int = 3600):
    """Rate limiting decorator.

    Args:
        max_requests: Requests each client may make per window.
        window_seconds: Length of the sliding window in seconds.

    Returns:
        A decorator. The wrapped view returns the original result, or
        ``{"error": "Rate limit exceeded"}`` with a 429 status once the
        client has used up its allowance for that view.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            # Get client identifier (IP or user ID)
            client_id = request.authenticated_userid or request.client_addr
            # Key by function as well as client: otherwise every decorated
            # endpoint shares one budget, and an endpoint with a short
            # window prunes history that a longer-window endpoint needs.
            bucket_key = (func, client_id)
            current_time = time()

            # Clean old requests
            recent = [
                req_time for req_time in rate_limit_storage[bucket_key]
                if current_time - req_time < window_seconds
            ]
            rate_limit_storage[bucket_key] = recent

            # Check rate limit
            if len(recent) >= max_requests:
                request.response.status = 429
                return {"error": "Rate limit exceeded"}

            # Record this request
            recent.append(current_time)

            return func(request, *args, **kwargs)
        return wrapper
    return decorator

# Apply rate limiting
@th_api.post('/api/expensive-operation', permission='create')
@rate_limit(max_requests=10, window_seconds=3600)  # at most 10 calls per hour
def expensive_operation(request, data: str) -> dict:
    """Run the expensive operation on ``data`` behind the rate limiter."""
    outcome = perform_expensive_operation(data)
    return outcome

Security Best Practices

1. Input Validation

Always validate and sanitize input data:

import re
from html import escape

@th_api.post('/posts', permission='create_post')
def create_post(request, title: str, content: str) -> dict:
    """Create a post after validating and sanitizing the input.

    The permission belongs on the decorator: as a function parameter it is
    just another argument and does not protect the endpoint.

    Args:
        request: The current request; its response status is set to 400 on
            validation failure.
        title: Post title; 3 to 200 characters after trimming whitespace.
        content: Post body; at least 10 non-padding characters.

    Returns:
        The result of ``create_new_post``, or an error dict with a 400
        status when validation fails.
    """
    title = title.strip()

    # Validate title length
    if len(title) < 3:
        request.response.status = 400
        return {"error": "Title must be at least 3 characters"}

    if len(title) > 200:
        request.response.status = 400
        return {"error": "Title must be at most 200 characters"}

    # Validate content before escaping: escaping lengthens the text
    # ("<" becomes "&lt;"), which would let too-short content pass.
    if len(content.strip()) < 10:
        request.response.status = 400
        return {"error": "Content must be at least 10 characters"}

    # Sanitize HTML content
    return create_new_post(title, escape(content))

2. CORS Configuration

For browser-based clients served from a different origin, configure CORS:

def main(global_config, **settings):
    """Build the WSGI app with CORS settings applied before the includes."""
    # CORS settings
    cors_settings = {
        'cors.preflight_maxage': '3600',
        'cors.origins': 'https://yourdomain.com',
        'cors.credentials': 'true'
    }

    config = Configurator(settings=settings)
    config.add_settings(cors_settings)

    # Include order is the same as listed here.
    for package in ('pyramid_cors', 'pyramid_capstone'):
        config.include(package)
    config.scan()

    app = config.make_wsgi_app()
    return app

3. HTTPS Enforcement

from pyramid.events import NewRequest

def require_https(event):
    """Redirect HTTP requests to HTTPS.

    Does nothing when the request already uses HTTPS or when the ``debug``
    setting is truthy.

    Args:
        event: An event object exposing the current request as ``.request``.

    Raises:
        HTTPMovedPermanently: For a non-HTTPS request outside debug mode,
            pointing at the same URL with the ``https`` scheme.
    """
    request = event.request
    if request.scheme == 'https' or request.registry.settings.get('debug'):
        return

    # Imported only on the redirect path. The original raised
    # ``HTTPSRequired``, a name that is never defined or imported.
    from pyramid.httpexceptions import HTTPMovedPermanently

    # Swap only the scheme; host, path and query string stay as requested.
    https_url = 'https' + request.url[len(request.scheme):]
    raise HTTPMovedPermanently(location=https_url)

def main(global_config, **settings):
    """Register the HTTPS check unless the ``debug`` setting is truthy.

    The example is truncated: the remaining configuration is not shown.
    """
    config = Configurator(settings=settings)

    # Enforce HTTPS in production
    # NOTE(review): if settings are loaded from an .ini file the value is a
    # string, so 'false' would count as truthy here — confirm how ``debug``
    # is supplied.
    if not settings.get('debug'):
        config.add_subscriber(require_https, NewRequest)

    # ... rest of configuration

4. Security Headers

from pyramid.events import NewResponse

def add_security_headers(event):
    """Set a fixed group of security headers on the event's response.

    Existing values for the same header names are overwritten; other
    headers are left untouched.
    """
    security_headers = {
        'X-Content-Type-Options': 'nosniff',
        'X-Frame-Options': 'DENY',
        'X-XSS-Protection': '1; mode=block',
        'Strict-Transport-Security': 'max-age=31536000; includeSubDomains',
        'Content-Security-Policy': "default-src 'self'",
    }
    outgoing = event.response.headers
    outgoing.update(security_headers)

def main(global_config, **settings):
    """Subscribe ``add_security_headers`` to ``NewResponse`` events.

    The example is truncated: the remaining configuration is not shown.
    """
    config = Configurator(settings=settings)
    config.add_subscriber(add_security_headers, NewResponse)
    # ... rest of configuration

Testing Secured Endpoints

Test your secured endpoints properly:

import pytest
from pyramid.testing import DummyRequest
from pyramid.security import remember

def test_protected_endpoint_requires_auth(pyramid_config):
    """Test that protected endpoints require authentication."""
    request = DummyRequest()

    # Should fail without authentication
    # NOTE(review): this calls the view function directly. Pyramid applies a
    # view's ``permission`` during view lookup, so unless the th_api
    # decorator enforces it inside the callable, nothing here sets a 403 —
    # confirm, or exercise the route through a functional test client.
    response = get_profile(request)
    assert request.response.status_int == 403

def test_protected_endpoint_with_auth(pyramid_config):
    """Test protected endpoint with valid authentication."""
    # Simulate authenticated user
    # ``DummyRequest.authenticated_userid`` is a read-only property, so
    # assigning to it raises AttributeError; the identity has to come from a
    # testing security policy instead.
    # NOTE(review): assumes the ``pyramid_config`` fixture is the
    # Configurator returned by ``pyramid.testing.setUp()`` — confirm.
    pyramid_config.testing_securitypolicy(userid='alice', permissive=True)
    request = DummyRequest()

    response = get_profile(request)
    assert response['user_id'] == 'alice'

def test_admin_endpoint_requires_admin_permission(pyramid_config):
    """Test that admin endpoints require admin permission."""
    # A non-permissive testing policy makes every permission check fail for
    # 'regular_user'. It replaces both the assignment to the read-only
    # ``DummyRequest.authenticated_userid`` property and the ``patch`` call,
    # whose name was never imported.
    # NOTE(review): assumes the ``pyramid_config`` fixture is the
    # Configurator returned by ``pyramid.testing.setUp()`` — confirm.
    pyramid_config.testing_securitypolicy(userid='regular_user', permissive=False)
    request = DummyRequest()

    # NOTE(review): a direct call only yields 403 if the th_api decorator
    # enforces ``permission`` inside the callable — confirm, or exercise the
    # route through a functional test client.
    create_user_admin(request, 'newuser', 'new@example.com')
    assert request.response.status_int == 403

Common Security Patterns

Resource-Based Permissions

@th_api.get('/posts/{post_id}', permission='view_post')
def get_post(request, post_id: int) -> dict:
    """Permission is checked against the specific post resource.

    Placeholder: the body is intentionally empty and only illustrates the
    decorator arguments.
    """
    # Pyramid will check 'view_post' permission against the post context
    pass

@th_api.put('/posts/{post_id}', permission='edit_post')
def update_post(request, post_id: int, title: str) -> dict:
    """Only users who can edit this specific post.

    Placeholder: the body is intentionally empty and only illustrates the
    decorator arguments.
    """
    pass

Conditional Security

@th_api.get('/posts/{post_id}')
def get_post_conditional(request, post_id: int) -> dict:
    """Return a post, enforcing access checks only when it is private.

    Private posts answer 401 to anonymous requests and 403 to users that
    ``can_view_private_post`` rejects.
    """
    post = get_post_by_id(post_id)

    # Public posts need no checks at all.
    if not post.is_private:
        return post.to_dict()

    # Private post: the caller must be logged in...
    viewer_id = request.authenticated_userid
    if not viewer_id:
        request.response.status = 401
        return {"error": "Authentication required"}

    # ...and allowed to see this particular post.
    if not can_view_private_post(viewer_id, post):
        request.response.status = 403
        return {"error": "Access denied"}

    return post.to_dict()

Security is a crucial aspect of API development. The pyramid-capstone library makes it easy to integrate with Pyramid's robust security system while maintaining clean, readable code.

For more examples of security implementations, see our Examples guide.