How to protect your flask APIs from hackers
Practical guide on implementing security measures in Flask APIs.
Flask API Security Best Practices
Summary
Practical guide on implementing security measures in Flask APIs. Includes code examples for rate limiting, input validation, JWT authentication, SQL injection prevention, CORS, security logging, and error handling.
Rate Limiting
Rate limiting prevents abuse by restricting the number of requests a client can make within a specific time window. This protects your API from brute force attacks, DoS attempts, and resource exhaustion.
Basic implementation with Flask-Limiter
Flask-Limiter is the most straightforward way to implement rate limiting. It provides decorators to easily set limits per endpoint with minimal configuration.
from flask import Flask
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
app = Flask(__name__)
limiter = Limiter(
app,
key_func=get_remote_address,
default_limits=["100 per hour"]
)
@app.route('/api/data')
@limiter.limit("10 per minute")
def get_data():
return {"data": "sensitive_info"}
Custom rate limiting with Redis
Redis-based rate limiting offers more control and scalability. This approach is ideal for distributed systems where you need shared rate limit counters across multiple application instances.
import redis
import time
from functools import wraps
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def rate_limit(max_requests=60, window=60):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
key = f"rate_limit:{request.remote_addr}:{f.__name__}"
current = redis_client.get(key)
if current is None:
redis_client.setex(key, window, 1)
return f(*args, **kwargs)
if int(current) >= max_requests:
return {"error": "Rate limit exceeded"}, 429
redis_client.incr(key)
return f(*args, **kwargs)
return decorated_function
return decorator
@app.route('/api/upload')
@rate_limit(max_requests=5, window=300) # 5 requests per 5 minutes
def upload_file():
return {"status": "uploaded"}
SQL Injection Prevention
SQL injection occurs when user input is directly embedded into SQL queries. These attacks can lead to data theft, unauthorized access, and database corruption. Always use parameterized queries or ORM to prevent these vulnerabilities.
Using SQLAlchemy ORM (Recommended)
SQLAlchemy ORM automatically handles parameterization and escaping, making it the safest approach for database interactions in Flask applications.
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
# CORRECT - Using ORM
@app.route('/api/user/<username>')
def get_user(username):
user = User.query.filter_by(username=username).first()
if user:
return {"id": user.id, "username": user.username}
return {"error": "User not found"}, 404
Parameterized SQL queries
When ORM is not available, always use parameterized queries. Never concatenate user input directly into SQL strings as this creates injection vulnerabilities.
import sqlite3
# INCORRECT - Vulnerable to SQL injection
def get_user_vulnerable(username):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query) # NEVER do this
# CORRECT - Parameterized query
def get_user_safe(username):
conn = sqlite3.connect('app.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE username = ?", (username,))
result = cursor.fetchone()
conn.close()
return result
Input Validation
Input validation ensures that data received by your API conforms to expected formats and constraints. This prevents injection attacks, data corruption, and application errors caused by malformed input.
Using Marshmallow
Marshmallow provides robust schema-based validation with automatic error handling. It validates data types, formats, and custom business rules while providing clear error messages.
from marshmallow import Schema, fields, validate, ValidationError
class UserSchema(Schema):
username = fields.Str(required=True, validate=validate.Length(min=3, max=20))
email = fields.Email(required=True)
age = fields.Int(validate=validate.Range(min=18, max=120))
password = fields.Str(required=True, validate=validate.Length(min=8))
@app.route('/api/users', methods=['POST'])
def create_user():
schema = UserSchema()
try:
data = schema.load(request.json)
except ValidationError as err:
return {"errors": err.messages}, 400
# Process validated data
return {"message": "User created", "user": data}, 201
Manual validation with sanitization
For simple cases or when you need custom validation logic, manual sanitization removes potentially dangerous characters and escapes HTML content.
import re
from html import escape
def sanitize_input(data):
if isinstance(data, str):
# Escape HTML
data = escape(data)
# Remove dangerous characters
data = re.sub(r'[<>"\'/\\]', '', data)
return data
@app.route('/api/search')
def search():
query = request.args.get('q', '')
query = sanitize_input(query)
if len(query) < 3:
return {"error": "Query too short"}, 400
# Process safe search
return {"results": f"Search results for: {query}"}
JWT Authentication
JSON Web Tokens (JWT) provide stateless authentication for APIs. They contain encoded user information and expiration times, eliminating the need for server-side session storage while maintaining security.
Complete implementation
This implementation covers token generation, validation, and protection of endpoints. It includes proper error handling for expired and invalid tokens.
import jwt
from datetime import datetime, timedelta
from functools import wraps
from werkzeug.security import check_password_hash
app.config['JWT_SECRET_KEY'] = 'your-secret-key-change-this'
def generate_token(user_id):
payload = {
'user_id': user_id,
'exp': datetime.utcnow() + timedelta(hours=24),
'iat': datetime.utcnow()
}
return jwt.encode(payload, app.config['JWT_SECRET_KEY'], algorithm='HS256')
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return {'error': 'Token missing'}, 401
try:
if token.startswith('Bearer '):
token = token[7:]
data = jwt.decode(token, app.config['JWT_SECRET_KEY'], algorithms=['HS256'])
current_user_id = data['user_id']
except jwt.ExpiredSignatureError:
return {'error': 'Token expired'}, 401
except jwt.InvalidTokenError:
return {'error': 'Invalid token'}, 401
return f(current_user_id, *args, **kwargs)
return decorated
@app.route('/api/login', methods=['POST'])
def login():
data = request.get_json()
username = data.get('username')
password = data.get('password')
user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password, password):
token = generate_token(user.id)
return {'token': token}
return {'error': 'Invalid credentials'}, 401
@app.route('/api/protected')
@token_required
def protected_route(current_user_id):
return {'message': f'Hello user {current_user_id}', 'data': 'sensitive_data'}
Secure CORS
Cross-Origin Resource Sharing (CORS) controls which domains can access your API from browsers. Misconfigured CORS can expose your API to unauthorized domains and enable cross-site attacks.
Restrictive configuration
Always specify exact domains instead of wildcards. This prevents unauthorized websites from making requests to your API from users' browsers.
from flask_cors import CORS
# INCORRECT - Too permissive
CORS(app, origins="*")
# CORRECT - Restrictive configuration
CORS(app,
origins=['https://yourdomain.com', 'https://app.yourdomain.com'],
methods=['GET', 'POST'],
allow_headers=['Content-Type', 'Authorization'],
expose_headers=['X-Total-Count'],
supports_credentials=True,
max_age=3600)
# Manual granular configuration
@app.after_request
def after_request(response):
origin = request.headers.get('Origin')
allowed_origins = ['https://yourdomain.com', 'https://app.yourdomain.com']
if origin in allowed_origins:
response.headers.add('Access-Control-Allow-Origin', origin)
response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization')
response.headers.add('Access-Control-Allow-Methods', 'GET,POST,PUT,DELETE')
response.headers.add('Access-Control-Allow-Credentials', 'true')
return response
Security Headers
Security headers instruct browsers how to handle your application's content. They prevent common attacks like XSS, clickjacking, and content type confusion.
Security headers implementation
These headers provide multiple layers of browser-based protection. Each header addresses specific attack vectors and should be implemented together for comprehensive security.
@app.after_request
def security_headers(response):
# Prevent XSS attacks
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
# HSTS for HTTPS
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
# Basic CSP
response.headers['Content-Security-Policy'] = "default-src 'self'"
# Referrer policy
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
return response
# Using Flask-Talisman (recommended)
from flask_talisman import Talisman
Talisman(app,
force_https=True,
strict_transport_security=True,
content_security_policy={
'default-src': "'self'",
'script-src': "'self' 'unsafe-inline'",
'style-src': "'self' 'unsafe-inline'"
})
Security Logging
Security logging tracks suspicious activities, failed authentication attempts, and potential attacks. This data is crucial for incident response, forensics, and identifying attack patterns.
Security logging system
Proper security logging captures relevant events without exposing sensitive information. It provides audit trails and helps detect ongoing attacks or reconnaissance attempts.
import logging
from datetime import datetime
# Configure logging
logging.basicConfig(
filename='security.log',
level=logging.WARNING,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
security_logger = logging.getLogger('security')
def log_security_event(event_type, details, user_id=None, ip_address=None):
security_logger.warning(f"SECURITY_EVENT: {event_type} | User: {user_id} | IP: {ip_address} | Details: {details}")
# Middleware for automatic logging
@app.before_request
def log_request():
# Log suspicious access attempts
if request.endpoint in ['login', 'admin_panel']:
log_security_event(
'AUTH_ATTEMPT',
f"Endpoint: {request.endpoint}",
ip_address=request.remote_addr
)
# Decorator for sensitive endpoints
def log_access(endpoint_name):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
log_security_event(
'SENSITIVE_ACCESS',
f"Endpoint: {endpoint_name}",
ip_address=request.remote_addr
)
return f(*args, **kwargs)
return decorated_function
return decorator
@app.route('/api/admin/users')
@token_required
@log_access('admin_users')
def get_all_users(current_user_id):
return {"users": []}
Secure Error Handling
Error handling prevents information leakage that could help attackers understand your system's internals. Generic error messages protect against reconnaissance while detailed logs help developers debug issues.
Error handling without sensitive information
This approach logs detailed errors for developers while returning generic messages to clients. It prevents stack trace exposure and system information leakage.
from werkzeug.exceptions import HTTPException
@app.errorhandler(Exception)
def handle_exception(e):
# Log complete error for development
app.logger.error(f"Unhandled exception: {str(e)}", exc_info=True)
# Security logging
log_security_event(
'APPLICATION_ERROR',
f"Error type: {type(e).__name__}",
ip_address=request.remote_addr
)
# Generic response to client
if isinstance(e, HTTPException):
return {"error": "Request failed", "code": e.code}, e.code
# Generic 500 error for unhandled exceptions
return {"error": "Internal server error"}, 500
@app.errorhandler(401)
def unauthorized(error):
log_security_event(
'UNAUTHORIZED_ACCESS',
f"Endpoint: {request.endpoint}",
ip_address=request.remote_addr
)
return {"error": "Authentication required"}, 401
@app.errorhandler(403)
def forbidden(error):
log_security_event(
'FORBIDDEN_ACCESS',
f"Endpoint: {request.endpoint}",
ip_address=request.remote_addr
)
return {"error": "Access forbidden"}, 403
Secure File Upload
File uploads pose significant security risks including malware upload, path traversal attacks, and server resource exhaustion. Proper validation, sanitization, and scanning are essential for safe file handling.
File validation and sanitization
This implementation validates file types, limits file sizes, scans for malicious content, and uses secure filename handling to prevent various file upload attacks.
import os
from werkzeug.utils import secure_filename
UPLOAD_FOLDER = '/secure/uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif'}
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def scan_file_content(file_path):
"""Basic malicious content scanning"""
dangerous_patterns = [b'<script', b'javascript:', b'<?php', b'<%']
with open(file_path, 'rb') as f:
content = f.read(1024) # Read first 1KB
for pattern in dangerous_patterns:
if pattern in content.lower():
return False
return True
@app.route('/api/upload', methods=['POST'])
@token_required
@rate_limit(max_requests=5, window=300)
def upload_file(current_user_id):
if 'file' not in request.files:
return {'error': 'No file provided'}, 400
file = request.files['file']
if file.filename == '':
return {'error': 'Empty filename'}, 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
# Add timestamp to avoid collisions
filename = f"{int(time.time())}_{filename}"
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
# Scan content
if not scan_file_content(file_path):
os.remove(file_path) # Remove dangerous file
log_security_event(
'MALICIOUS_UPLOAD',
f"File: {filename}",
user_id=current_user_id,
ip_address=request.remote_addr
)
return {'error': 'File contains malicious content'}, 400
return {'message': 'File uploaded successfully', 'filename': filename}
return {'error': 'File type not allowed'}, 400
Production Configuration
Production environments require secure configuration management, environment variable usage, and validation of critical settings. This prevents accidental exposure of sensitive data and ensures security measures are properly enabled.
Environment variables and secure configuration
This configuration separates sensitive data from code, validates required variables in production, and enables security features like secure cookies and proper session handling.
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-key-change-this'
JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY') or 'jwt-key-change-this'
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or 'sqlite:///app.db'
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Rate limiting
RATELIMIT_STORAGE_URL = os.environ.get('REDIS_URL') or 'redis://localhost:6379'
# Security settings
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = 'Lax'
PERMANENT_SESSION_LIFETIME = timedelta(minutes=30)
app.config.from_object(Config)
# Verify critical variables in production
if os.environ.get('FLASK_ENV') == 'production':
required_vars = ['SECRET_KEY', 'JWT_SECRET_KEY', 'DATABASE_URL']
missing_vars = [var for var in required_vars if not os.environ.get(var)]
if missing_vars:
raise EnvironmentError(f"Missing required environment variables: {missing_vars}")
Security Checklist
This checklist ensures all critical security measures are implemented before deploying to production. Each item addresses specific attack vectors and compliance requirements.
Pre-production verification
Verification commands
# Check vulnerable dependencies
pip-audit
# Scan code with bandit
bandit -r app.py
# Verify security headers
curl -I https://yourdomain.com/api/health
Additional Resources
Last updated