How to protect your flask APIs from hackers

Practical guide on implementing security measures in Flask APIs.

Flask API Security Best Practices

Summary

Practical guide on implementing security measures in Flask APIs. Includes code examples for rate limiting, input validation, JWT authentication, SQL injection prevention, CORS, security logging, and error handling.


Rate Limiting

Rate limiting prevents abuse by restricting the number of requests a client can make within a specific time window. This protects your API from brute force attacks, DoS attempts, and resource exhaustion.

Basic implementation with Flask-Limiter

Flask-Limiter is the most straightforward way to implement rate limiting. It provides decorators to easily set limits per endpoint with minimal configuration.

from flask import Flask
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

app = Flask(__name__)
limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["100 per hour"]
)

@app.route('/api/data')
@limiter.limit("10 per minute")
def get_data():
    return {"data": "sensitive_info"}

Custom rate limiting with Redis

Redis-based rate limiting offers more control and scalability. This approach is ideal for distributed systems where you need shared rate limit counters across multiple application instances.

import redis
import time
from functools import wraps

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def rate_limit(max_requests=60, window=60):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            key = f"rate_limit:{request.remote_addr}:{f.__name__}"
            current = redis_client.get(key)
            
            if current is None:
                redis_client.setex(key, window, 1)
                return f(*args, **kwargs)
            
            if int(current) >= max_requests:
                return {"error": "Rate limit exceeded"}, 429
                
            redis_client.incr(key)
            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.route('/api/upload')
@rate_limit(max_requests=5, window=300)  # 5 requests per 5 minutes
def upload_file():
    return {"status": "uploaded"}

SQL Injection Prevention

SQL injection occurs when user input is directly embedded into SQL queries. These attacks can lead to data theft, unauthorized access, and database corruption. Always use parameterized queries or ORM to prevent these vulnerabilities.

SQLAlchemy ORM automatically handles parameterization and escaping, making it the safest approach for database interactions in Flask applications.

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)

# CORRECT - Using ORM
@app.route('/api/user/<username>')
def get_user(username):
    user = User.query.filter_by(username=username).first()
    if user:
        return {"id": user.id, "username": user.username}
    return {"error": "User not found"}, 404

Parameterized SQL queries

When ORM is not available, always use parameterized queries. Never concatenate user input directly into SQL strings as this creates injection vulnerabilities.

import sqlite3

# INCORRECT - Vulnerable to SQL injection
def get_user_vulnerable(username):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    query = f"SELECT * FROM users WHERE username = '{username}'"
    cursor.execute(query)  # NEVER do this
    
# CORRECT - Parameterized query
def get_user_safe(username):
    conn = sqlite3.connect('app.db')
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
    result = cursor.fetchone()
    conn.close()
    return result

Input Validation

Input validation ensures that data received by your API conforms to expected formats and constraints. This prevents injection attacks, data corruption, and application errors caused by malformed input.

Using Marshmallow

Marshmallow provides robust schema-based validation with automatic error handling. It validates data types, formats, and custom business rules while providing clear error messages.

from marshmallow import Schema, fields, validate, ValidationError

class UserSchema(Schema):
    username = fields.Str(required=True, validate=validate.Length(min=3, max=20))
    email = fields.Email(required=True)
    age = fields.Int(validate=validate.Range(min=18, max=120))
    password = fields.Str(required=True, validate=validate.Length(min=8))

@app.route('/api/users', methods=['POST'])
def create_user():
    schema = UserSchema()
    try:
        data = schema.load(request.json)
    except ValidationError as err:
        return {"errors": err.messages}, 400
    
    # Process validated data
    return {"message": "User created", "user": data}, 201

Manual validation with sanitization

For simple cases or when you need custom validation logic, manual sanitization removes potentially dangerous characters and escapes HTML content.

import re
from html import escape

def sanitize_input(data):
    if isinstance(data, str):
        # Escape HTML
        data = escape(data)
        # Remove dangerous characters
        data = re.sub(r'[<>"\'/\\]', '', data)
    return data

@app.route('/api/search')
def search():
    query = request.args.get('q', '')
    query = sanitize_input(query)
    
    if len(query) < 3:
        return {"error": "Query too short"}, 400
        
    # Process safe search
    return {"results": f"Search results for: {query}"}

JWT Authentication

JSON Web Tokens (JWT) provide stateless authentication for APIs. They contain encoded user information and expiration times, eliminating the need for server-side session storage while maintaining security.

Complete implementation

This implementation covers token generation, validation, and protection of endpoints. It includes proper error handling for expired and invalid tokens.

import jwt
from datetime import datetime, timedelta
from functools import wraps
from werkzeug.security import check_password_hash

app.config['JWT_SECRET_KEY'] = 'your-secret-key-change-this'

def generate_token(user_id):
    payload = {
        'user_id': user_id,
        'exp': datetime.utcnow() + timedelta(hours=24),
        'iat': datetime.utcnow()
    }
    return jwt.encode(payload, app.config['JWT_SECRET_KEY'], algorithm='HS256')

def token_required(f):
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        
        if not token:
            return {'error': 'Token missing'}, 401
            
        try:
            if token.startswith('Bearer '):
                token = token[7:]
            data = jwt.decode(token, app.config['JWT_SECRET_KEY'], algorithms=['HS256'])
            current_user_id = data['user_id']
        except jwt.ExpiredSignatureError:
            return {'error': 'Token expired'}, 401
        except jwt.InvalidTokenError:
            return {'error': 'Invalid token'}, 401
            
        return f(current_user_id, *args, **kwargs)
    return decorated

@app.route('/api/login', methods=['POST'])
def login():
    data = request.get_json()
    username = data.get('username')
    password = data.get('password')
    
    user = User.query.filter_by(username=username).first()
    
    if user and check_password_hash(user.password, password):
        token = generate_token(user.id)
        return {'token': token}
    
    return {'error': 'Invalid credentials'}, 401

@app.route('/api/protected')
@token_required
def protected_route(current_user_id):
    return {'message': f'Hello user {current_user_id}', 'data': 'sensitive_data'}

Secure CORS

Cross-Origin Resource Sharing (CORS) controls which domains can access your API from browsers. Misconfigured CORS can expose your API to unauthorized domains and enable cross-site attacks.

Restrictive configuration

Always specify exact domains instead of wildcards. This prevents unauthorized websites from making requests to your API from users' browsers.

from flask_cors import CORS

# INCORRECT - Too permissive
CORS(app, origins="*")

# CORRECT - Restrictive configuration
CORS(app, 
     origins=['https://yourdomain.com', 'https://app.yourdomain.com'],
     methods=['GET', 'POST'],
     allow_headers=['Content-Type', 'Authorization'],
     expose_headers=['X-Total-Count'],
     supports_credentials=True,
     max_age=3600)

# Manual granular configuration
@app.after_request
def after_request(response):
    origin = request.headers.get('Origin')
    allowed_origins = ['https://yourdomain.com', 'https://app.yourdomain.com']
    
    if origin in allowed_origins:
        response.headers.add('Access-Control-Allow-Origin', origin)
        response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
        response.headers.add('Access-Control-Allow-Methods', 'GET,POST,PUT,DELETE')
        response.headers.add('Access-Control-Allow-Credentials', 'true')
    
    return response

Security Headers

Security headers instruct browsers how to handle your application's content. They prevent common attacks like XSS, clickjacking, and content type confusion.

Security headers implementation

These headers provide multiple layers of browser-based protection. Each header addresses specific attack vectors and should be implemented together for comprehensive security.

@app.after_request
def security_headers(response):
    # Prevent XSS attacks
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    response.headers['X-XSS-Protection'] = '1; mode=block'
    
    # HSTS for HTTPS
    response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
    
    # Basic CSP
    response.headers['Content-Security-Policy'] = "default-src 'self'"
    
    # Referrer policy
    response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
    
    return response

# Using Flask-Talisman (recommended)
from flask_talisman import Talisman

Talisman(app, 
         force_https=True,
         strict_transport_security=True,
         content_security_policy={
             'default-src': "'self'",
             'script-src': "'self' 'unsafe-inline'",
             'style-src': "'self' 'unsafe-inline'"
         })

Security Logging

Security logging tracks suspicious activities, failed authentication attempts, and potential attacks. This data is crucial for incident response, forensics, and identifying attack patterns.

Security logging system

Proper security logging captures relevant events without exposing sensitive information. It provides audit trails and helps detect ongoing attacks or reconnaissance attempts.

import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    filename='security.log',
    level=logging.WARNING,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

security_logger = logging.getLogger('security')

def log_security_event(event_type, details, user_id=None, ip_address=None):
    security_logger.warning(f"SECURITY_EVENT: {event_type} | User: {user_id} | IP: {ip_address} | Details: {details}")

# Middleware for automatic logging
@app.before_request
def log_request():
    # Log suspicious access attempts
    if request.endpoint in ['login', 'admin_panel']:
        log_security_event(
            'AUTH_ATTEMPT',
            f"Endpoint: {request.endpoint}",
            ip_address=request.remote_addr
        )

# Decorator for sensitive endpoints
def log_access(endpoint_name):
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            log_security_event(
                'SENSITIVE_ACCESS',
                f"Endpoint: {endpoint_name}",
                ip_address=request.remote_addr
            )
            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.route('/api/admin/users')
@token_required
@log_access('admin_users')
def get_all_users(current_user_id):
    return {"users": []}

Secure Error Handling

Error handling prevents information leakage that could help attackers understand your system's internals. Generic error messages protect against reconnaissance while detailed logs help developers debug issues.

Error handling without sensitive information

This approach logs detailed errors for developers while returning generic messages to clients. It prevents stack trace exposure and system information leakage.

from werkzeug.exceptions import HTTPException

@app.errorhandler(Exception)
def handle_exception(e):
    # Log complete error for development
    app.logger.error(f"Unhandled exception: {str(e)}", exc_info=True)
    
    # Security logging
    log_security_event(
        'APPLICATION_ERROR',
        f"Error type: {type(e).__name__}",
        ip_address=request.remote_addr
    )
    
    # Generic response to client
    if isinstance(e, HTTPException):
        return {"error": "Request failed", "code": e.code}, e.code
    
    # Generic 500 error for unhandled exceptions
    return {"error": "Internal server error"}, 500

@app.errorhandler(401)
def unauthorized(error):
    log_security_event(
        'UNAUTHORIZED_ACCESS',
        f"Endpoint: {request.endpoint}",
        ip_address=request.remote_addr
    )
    return {"error": "Authentication required"}, 401

@app.errorhandler(403)
def forbidden(error):
    log_security_event(
        'FORBIDDEN_ACCESS',
        f"Endpoint: {request.endpoint}",
        ip_address=request.remote_addr
    )
    return {"error": "Access forbidden"}, 403

Secure File Upload

File uploads pose significant security risks including malware upload, path traversal attacks, and server resource exhaustion. Proper validation, sanitization, and scanning are essential for safe file handling.

File validation and sanitization

This implementation validates file types, limits file sizes, scans for malicious content, and uses secure filename handling to prevent various file upload attacks.

import os
from werkzeug.utils import secure_filename

UPLOAD_FOLDER = '/secure/uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE

def allowed_file(filename):
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

def scan_file_content(file_path):
    """Basic malicious content scanning"""
    dangerous_patterns = [b'<script', b'javascript:', b'<?php', b'<%']
    
    with open(file_path, 'rb') as f:
        content = f.read(1024)  # Read first 1KB
        for pattern in dangerous_patterns:
            if pattern in content.lower():
                return False
    return True

@app.route('/api/upload', methods=['POST'])
@token_required
@rate_limit(max_requests=5, window=300)
def upload_file(current_user_id):
    if 'file' not in request.files:
        return {'error': 'No file provided'}, 400
    
    file = request.files['file']
    
    if file.filename == '':
        return {'error': 'Empty filename'}, 400
    
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        # Add timestamp to avoid collisions
        filename = f"{int(time.time())}_{filename}"
        file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        
        file.save(file_path)
        
        # Scan content
        if not scan_file_content(file_path):
            os.remove(file_path)  # Remove dangerous file
            log_security_event(
                'MALICIOUS_UPLOAD',
                f"File: {filename}",
                user_id=current_user_id,
                ip_address=request.remote_addr
            )
            return {'error': 'File contains malicious content'}, 400
        
        return {'message': 'File uploaded successfully', 'filename': filename}
    
    return {'error': 'File type not allowed'}, 400

Production Configuration

Production environments require secure configuration management, environment variable usage, and validation of critical settings. This prevents accidental exposure of sensitive data and ensures security measures are properly enabled.

Environment variables and secure configuration

This configuration separates sensitive data from code, validates required variables in production, and enables security features like secure cookies and proper session handling.

import os
from dotenv import load_dotenv

load_dotenv()

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-key-change-this'
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-key-change-this'
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///app.db'
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    
    # Rate limiting
    RATELIMIT_STORAGE_URL = os.environ.get('REDIS_URL') or 'redis://localhost:6379'
    
    # Security settings
    SESSION_COOKIE_SECURE = True
    SESSION_COOKIE_HTTPONLY = True
    SESSION_COOKIE_SAMESITE = 'Lax'
    PERMANENT_SESSION_LIFETIME = timedelta(minutes=30)

app.config.from_object(Config)

# Verify critical variables in production
if os.environ.get('FLASK_ENV') == 'production':
    required_vars = ['SECRET_KEY', 'JWT_SECRET_KEY', 'DATABASE_URL']
    missing_vars = [var for var in required_vars if not os.environ.get(var)]
    
    if missing_vars:
        raise EnvironmentError(f"Missing required environment variables: {missing_vars}")

Security Checklist

This checklist ensures all critical security measures are implemented before deploying to production. Each item addresses specific attack vectors and compliance requirements.

Pre-production verification

Verification commands

# Check vulnerable dependencies
pip-audit

# Scan code with bandit
bandit -r app.py

# Verify security headers
curl -I https://yourdomain.com/api/health

Additional Resources

Last updated