Table of Contents

Code Style and Formatting

PEP 8 Compliance

# Good: Following PEP 8 standards
import os
import sys
import time
from typing import List, Dict, Optional

class DataProcessor:
    """Validate and normalize raw records.

    Attributes:
        data_source: Identifier of the source, stored as given.
        max_items: Maximum number of input records inspected by a single
            ``process_data`` call.
    """

    # Keys a record must contain to be accepted by ``_is_valid_item``.
    REQUIRED_FIELDS = ('id', 'name', 'timestamp')

    def __init__(self, data_source: str, max_items: int = 1000) -> None:
        self.data_source = data_source
        self.max_items = max_items
        self._processed_count = 0

    def process_data(self, input_data: List[Dict]) -> List[Dict]:
        """Return cleaned copies of the valid records in ``input_data``.

        Only the first ``max_items`` records are inspected; records missing
        a required field are skipped silently. The running count of
        processed records is increased by the number of records returned.
        """
        processed_results = [
            self._clean_item(item)
            for item in input_data[:self.max_items]
            if self._is_valid_item(item)
        ]
        self._processed_count += len(processed_results)
        return processed_results

    def _is_valid_item(self, item: Dict) -> bool:
        """Return True when ``item`` contains every required field."""
        return all(field in item for field in self.REQUIRED_FIELDS)

    def _clean_item(self, item: Dict) -> Dict:
        """Return a normalized copy of ``item``.

        The id is stringified and stripped, the name is title-cased, the
        timestamp is passed through, and ``processed_at`` records the
        current epoch time.
        """
        return {
            'id': str(item['id']).strip(),
            'name': item['name'].title(),
            'timestamp': item['timestamp'],
            # Relies on the module-level ``import time``.
            'processed_at': time.time(),
        }

# Bad: Poor formatting and naming
import os,sys
from typing import *
# Counter-example: kept deliberately non-compliant to contrast with the
# "Good" version. Violations shown: lowercase class name (should be
# CapWords), run-together identifiers instead of snake_case, no spaces
# after commas or around ``=``/``+=``, no blank lines between methods,
# and no docstrings or type hints.
class dataprocessor:
    def __init__(self,datasource,maxitems=1000):
        self.datasource=datasource
        self.maxitems=maxitems
        self.processedcount=0
    def processdata(self,inputdata):
        processedresults=[]
        for item in inputdata[:self.maxitems]:
            # NOTE: ``isvaliditem`` and ``cleanitem`` are not defined in
            # this snippet, so calling ``processdata`` would fail as shown.
            if self.isvaliditem(item):
                cleaneditem=self.cleanitem(item)
                processedresults.append(cleaneditem)
                self.processedcount+=1
        return processedresults

Code Formatting Tools

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/psf/black
    rev: 23.7.0
    hooks:
      - id: black
        language_version: python3.11

  - repo: https://github.com/PyCQA/isort
    rev: 5.12.0
    hooks:
      - id: isort
        args: ["--profile", "black"]

  - repo: https://github.com/PyCQA/flake8
    rev: 6.0.0
    hooks:
      - id: flake8
        additional_dependencies: [flake8-docstrings, flake8-type-checking]

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.5.1
    hooks:
      - id: mypy
        additional_dependencies: [types-requests, types-PyYAML]

Project Structure and Organization

Standard Project Layout

my_project/
├── README.md
├── pyproject.toml
├── requirements.txt
├── requirements-dev.txt
├── .env.example
├── .gitignore
├── .pre-commit-config.yaml
├── src/
│   └── my_project/
│       ├── __init__.py
│       ├── main.py
│       ├── config/
│       │   ├── __init__.py
│       │   └── settings.py
│       ├── core/
│       │   ├── __init__.py
│       │   ├── models.py
│       │   └── services.py
│       ├── utils/
│       │   ├── __init__.py
│       │   ├── helpers.py
│       │   └── validators.py
│       └── cli/
│           ├── __init__.py
│           └── commands.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── unit/
│   │   ├── __init__.py
│   │   ├── test_models.py
│   │   └── test_services.py
│   └── integration/
│       ├── __init__.py
│       └── test_api.py
├── docs/
│   ├── conf.py
│   ├── index.rst
│   └── api/
└── scripts/
    ├── setup.sh
    └── deploy.sh

Configuration Management

# config/settings.py
import os
from pathlib import Path
from typing import Dict, Any
from dataclasses import dataclass
from functools import lru_cache

@dataclass
class DatabaseConfig:
    """Connection settings for a PostgreSQL database."""
    host: str
    port: int
    name: str
    user: str
    password: str
    ssl_mode: str = "require"

    @property
    def connection_string(self) -> str:
        """Build a ``postgresql://`` connection URL from the settings.

        The user name and password are percent-encoded so that reserved
        URL characters in credentials (``@``, ``:``, ``/``, ``?`` ...)
        cannot change how the URL is parsed.
        """
        # Function-scope import: this snippet's import block has no
        # urllib names in scope.
        from urllib.parse import quote

        user = quote(self.user, safe="")
        password = quote(self.password, safe="")
        return (
            f"postgresql://{user}:{password}"
            f"@{self.host}:{self.port}/{self.name}"
            f"?sslmode={self.ssl_mode}"
        )

@dataclass
class AppConfig:
    """Top-level application settings, including the nested database settings."""
    debug: bool
    secret_key: str
    database: DatabaseConfig
    redis_url: str
    log_level: str = "INFO"
    max_upload_size: int = 10 * 1024 * 1024  # 10MB

    @classmethod
    def from_env(cls) -> 'AppConfig':
        """Build an ``AppConfig`` by reading environment variables.

        Every variable has a fallback, so this succeeds with an empty
        environment. ``DEBUG`` is true only for the (case-insensitive)
        string ``"true"``; ``DB_PORT`` and ``MAX_UPLOAD_SIZE`` are parsed
        with ``int`` and raise ``ValueError`` if they are not numeric.
        """
        env = os.getenv

        db_settings = DatabaseConfig(
            host=env('DB_HOST', 'localhost'),
            port=int(env('DB_PORT', '5432')),
            name=env('DB_NAME', 'myapp'),
            user=env('DB_USER', 'postgres'),
            password=env('DB_PASSWORD', ''),
            ssl_mode=env('DB_SSL_MODE', 'require')
        )

        debug_enabled = env('DEBUG', 'False').lower() == 'true'
        upload_limit = int(env('MAX_UPLOAD_SIZE', '10485760'))

        return cls(
            debug=debug_enabled,
            secret_key=env('SECRET_KEY', ''),
            database=db_settings,
            redis_url=env('REDIS_URL', 'redis://localhost:6379'),
            log_level=env('LOG_LEVEL', 'INFO'),
            max_upload_size=upload_limit
        )

@lru_cache(maxsize=128)
def get_config() -> AppConfig:
    """Return the application configuration, built from the environment once.

    The result is memoized, so later environment changes are not picked
    up until ``get_config.cache_clear()`` is called.
    """
    config = AppConfig.from_env()
    return config

Error Handling and Logging

Robust Error Handling

import logging
import traceback
from typing import Optional, Type, Union
from contextlib import contextmanager
from functools import wraps

# Custom exceptions with clear hierarchy
class AppError(Exception):
    """Root of the application's exception hierarchy.

    Stores the human-readable ``message`` and an optional
    ``error_code`` alongside the standard exception arguments.
    """
    def __init__(self, message: str, error_code: Optional[str] = None):
        self.message = message
        self.error_code = error_code
        super().__init__(message)

class ValidationError(AppError):
    """Signals that supplied data did not pass validation."""

class DatabaseError(AppError):
    """Signals that a database operation failed."""

class ExternalServiceError(AppError):
    """Signals that a call to an external service failed."""

# Error handling decorator
def handle_errors(
    error_types: Union[Type[Exception], tuple] = Exception,
    default_return=None,
    log_errors: bool = True
):
    """Decorator factory that logs selected errors and normalizes them.

    Exceptions matching ``error_types`` are logged (when ``log_errors`` is
    true) on the logger named after the wrapped function's module.
    ``AppError`` instances are then re-raised unchanged; anything else is
    wrapped in a new ``AppError`` chained to the original exception.
    Exceptions that do not match ``error_types`` propagate untouched.

    Args:
        error_types: Exception class, or tuple of classes, to intercept.
        default_return: Accepted for backward compatibility; currently
            unused because the wrapper always re-raises.
        log_errors: Whether intercepted errors are logged.

    Raises:
        AppError: For every intercepted exception.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except error_types as e:
                if log_errors:
                    logger = logging.getLogger(func.__module__)
                    logger.error(
                        f"Error in {func.__name__}: {str(e)}",
                        exc_info=True,
                        # ``extra`` keys must not collide with LogRecord
                        # attributes: a key named 'args' makes logging
                        # raise KeyError, hence the ``call_`` prefix.
                        extra={
                            'function': func.__name__,
                            'call_args': str(args),
                            'call_kwargs': str(kwargs)
                        }
                    )

                if isinstance(e, AppError):
                    raise  # Re-raise custom exceptions

                # Convert to custom exception, keeping the original as cause
                raise AppError(
                    f"Unexpected error in {func.__name__}: {str(e)}"
                ) from e

        return wrapper
    return decorator

# Context manager for error handling
@contextmanager
def error_handler(operation_name: str):
    """Log the start, outcome and cleanup of the wrapped operation.

    Emits an INFO record on entry, an INFO record on success or an ERROR
    record with traceback on failure, and always a DEBUG cleanup record.
    Exceptions raised inside the ``with`` body are re-raised unchanged.
    """
    log = logging.getLogger(__name__)
    log.info(f"Starting operation: {operation_name}")

    try:
        yield
    except Exception as e:
        # ``exception`` logs at ERROR level with the traceback attached.
        log.exception(f"Operation failed: {operation_name} - {e!s}")
        raise
    else:
        log.info(f"Operation completed successfully: {operation_name}")
    finally:
        log.debug(f"Cleaning up operation: {operation_name}")

# Example usage
class DataService:
    """Example service showing the error-handling helpers working together."""

    def __init__(self, database_url: str):
        self.logger = logging.getLogger(__name__)
        self.database_url = database_url

    @handle_errors(error_types=(DatabaseError, ValidationError))
    def save_user(self, user_data: dict) -> dict:
        """Validate ``user_data`` and return the (simulated) saved record.

        Raises:
            ValidationError: When ``user_data`` has no truthy ``email``.
            DatabaseError: When the save step raises any exception.
        """
        with error_handler("save_user"):
            # Guard clause: an email is mandatory
            email = user_data.get('email')
            if not email:
                raise ValidationError("Email is required")

            # Simulated persistence step
            try:
                # Database save logic here
                saved = {'id': 123}
                saved.update(user_data)
                saved_id = saved['id']
                self.logger.info(f"User saved successfully: {saved_id}")
                return saved
            except Exception as e:
                raise DatabaseError(f"Failed to save user: {str(e)}")

Structured Logging

import logging
import json
import sys
from datetime import datetime
from typing import Dict, Any

class StructuredFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object.

    The object always contains ``timestamp``, ``level``, ``logger``,
    ``message``, ``module``, ``function`` and ``line``. Any additional
    attributes found on the record (typically supplied via ``extra=``)
    are copied in as top-level keys, and ``exception`` holds the formatted
    traceback when the record carries exception info.
    """

    # Attributes the logging machinery itself puts on a record; everything
    # else on the record is treated as caller-supplied context.
    _RESERVED_ATTRS = frozenset({
        'name', 'msg', 'args', 'levelname', 'levelno',
        'pathname', 'filename', 'module', 'lineno',
        'funcName', 'created', 'msecs', 'relativeCreated',
        'thread', 'threadName', 'processName', 'process',
        'getMessage', 'exc_info', 'exc_text', 'stack_info',
        'taskName', 'message', 'asctime',
    })

    def format(self, record: logging.LogRecord) -> str:
        """Return ``record`` serialized as a JSON string.

        The timestamp is the record's creation time expressed as a
        timezone-aware UTC ISO-8601 string (ending in ``+00:00``). Values
        that JSON cannot encode natively are stringified rather than
        letting a logging call raise ``TypeError``.
        """
        # Function-scope import: only ``datetime`` is imported at module
        # level in this snippet.
        from datetime import timezone

        event_time = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_entry = {
            'timestamp': event_time.isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }

        # Add extra fields
        for key, value in record.__dict__.items():
            if key not in self._RESERVED_ATTRS:
                log_entry[key] = value

        # Add exception info
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)

        # default=str is deliberate: a log formatter must never raise
        # because a caller attached an arbitrary object as context.
        return json.dumps(log_entry, ensure_ascii=False, default=str)

def setup_logging(level: str = "INFO", structured: bool = True,
                  log_file: str = "app.log") -> None:
    """Configure the root logger for the application.

    Existing root handlers are removed and replaced by a stdout handler.
    For every level other than DEBUG a file handler writing to
    ``log_file`` is added as well.

    Args:
        level: Name of a standard logging level, case-insensitive.
        structured: Use ``StructuredFormatter`` (JSON) when true,
            otherwise a plain text format.
        log_file: Path of the log file used outside DEBUG mode.

    Raises:
        ValueError: If ``level`` is not a valid logging level name.
    """
    # Validate before touching the root logger so a bad level leaves the
    # existing configuration intact.
    numeric_level = getattr(logging, level.upper(), None)
    if not isinstance(numeric_level, int):
        raise ValueError(f"Invalid log level: {level!r}")

    # Create logger
    logger = logging.getLogger()
    logger.setLevel(numeric_level)

    # Remove existing handlers
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)

    # Create console handler
    console_handler = logging.StreamHandler(sys.stdout)

    if structured:
        formatter = StructuredFormatter()
    else:
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # Add file handler for production
    if level.upper() != "DEBUG":
        # Explicit UTF-8: the structured formatter emits non-ASCII text
        # (ensure_ascii=False), which a locale encoding may not accept.
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

# Logger utility class
class LoggerMixin:
    """Mixin giving a class a logger named after its module and class."""

    @property
    def logger(self) -> logging.Logger:
        """Return the logger named ``<module>.<ClassName>`` for this instance."""
        owner = self.__class__
        logger_name = f"{owner.__module__}.{owner.__name__}"
        return logging.getLogger(logger_name)

    def log_method_call(self, method_name: str, **kwargs):
        """Emit a DEBUG record noting that ``method_name`` is being called.

        The method name and the keyword arguments are attached to the
        record as the ``method`` and ``parameters`` extra fields.
        """
        context = {'method': method_name, 'parameters': kwargs}
        self.logger.debug(f"Calling {method_name}", extra=context)

Testing Best Practices

Comprehensive Test Strategy

# tests/conftest.py
import pytest
import asyncio
from unittest.mock import Mock, patch
from typing import Generator, AsyncGenerator

@pytest.fixture
def mock_database():
    """Yield a ``Mock`` that stands in for the database connection.

    ``my_project.database.get_connection`` is patched for the duration of
    the test so that it returns the yielded mock.
    """
    with patch('my_project.database.get_connection') as get_connection:
        connection = Mock()
        get_connection.return_value = connection
        yield connection

@pytest.fixture
async def async_client():
    """Yield an async test client for an app created in testing mode."""
    from my_project.app import create_app

    testing_app = create_app(testing=True)
    client_context = testing_app.test_client()

    async with client_context as client:
        yield client

@pytest.fixture
def sample_user_data():
    """Return a fresh dict describing a valid example user."""
    return dict(
        name='John Doe',
        email='john.doe@example.com',
        age=30,
        role='user',
    )

# tests/unit/test_services.py
import pytest
from unittest.mock import Mock, patch, AsyncMock
from my_project.services import UserService
from my_project.models import User
from my_project.exceptions import ValidationError, DatabaseError

class TestUserService:
    """Test cases for UserService."""

    @pytest.fixture
    def user_service(self, mock_database):
        """Create UserService instance with mocked database."""
        return UserService(mock_database)

    def test_create_user_success(self, user_service, sample_user_data):
        """Test successful user creation."""
        # Arrange
        user_service.db.save.return_value = {'id': 1, **sample_user_data}

        # Act
        result = user_service.create_user(sample_user_data)

        # Assert
        assert result['id'] == 1
        assert result['name'] == sample_user_data['name']
        user_service.db.save.assert_called_once()

    def test_create_user_validation_error(self, user_service):
        """Test user creation with invalid data."""
        # Arrange
        invalid_data = {'name': ''}  # Missing email

        # Act & Assert
        with pytest.raises(ValidationError) as exc_info:
            user_service.create_user(invalid_data)

        assert 'email' in str(exc_info.value)

    @patch('my_project.services.send_email')
    def test_create_user_with_notification(self, mock_send_email, user_service, sample_user_data):
        """Test user creation with email notification."""
        # ``pytest.any`` does not exist; ``unittest.mock.ANY`` is the
        # wildcard that compares equal to any argument.
        from unittest.mock import ANY

        # Arrange
        user_service.db.save.return_value = {'id': 1, **sample_user_data}

        # Act
        result = user_service.create_user(sample_user_data, send_notification=True)

        # Assert
        assert result['id'] == 1
        mock_send_email.assert_called_once_with(
            sample_user_data['email'],
            'Welcome!',
            ANY
        )
        # ANY accepts every value, so pin the body's type explicitly.
        assert isinstance(mock_send_email.call_args.args[2], str)

    def test_get_user_not_found(self, user_service):
        """Test getting non-existent user."""
        # Arrange
        user_service.db.get.return_value = None

        # Act & Assert
        with pytest.raises(ValidationError):
            user_service.get_user(999)

# Property-based testing with Hypothesis
from hypothesis import given, strategies as st

class TestDataValidation:
    """Property-based checks of the validators using generated inputs."""

    @given(st.emails())
    def test_email_validation_with_valid_emails(self, email):
        """Every generated email address must be accepted."""
        from my_project.utils.validators import is_valid_email

        verdict = is_valid_email(email)
        assert verdict is True

    @given(st.text().filter(lambda x: '@' not in x))
    def test_email_validation_with_invalid_emails(self, invalid_email):
        """Any text without an ``@`` must be rejected."""
        from my_project.utils.validators import is_valid_email

        verdict = is_valid_email(invalid_email)
        assert verdict is False

    @given(st.integers(min_value=1, max_value=120))
    def test_age_validation_with_valid_ages(self, age):
        """Every integer age from 1 to 120 must be accepted."""
        from my_project.utils.validators import is_valid_age

        verdict = is_valid_age(age)
        assert verdict is True

# Integration tests
class TestUserIntegration:
    """End-to-end tests of the user endpoints."""

    @pytest.mark.integration
    async def test_full_user_workflow(self, async_client, sample_user_data):
        """Create, fetch, update and delete a user through the HTTP API."""
        # Create user
        created = await async_client.post('/users', json=sample_user_data)
        assert created.status_code == 201
        user_id = created.json['id']
        user_url = f'/users/{user_id}'

        # Get user
        fetched = await async_client.get(user_url)
        assert fetched.status_code == 200
        assert fetched.json['email'] == sample_user_data['email']

        # Update user
        update_data = {'name': 'Jane Doe'}
        updated = await async_client.patch(user_url, json=update_data)
        assert updated.status_code == 200
        assert updated.json['name'] == 'Jane Doe'

        # Delete user
        deleted = await async_client.delete(user_url)
        assert deleted.status_code == 204

# Performance tests
class TestPerformance:
    """Performance and load tests."""

    @pytest.fixture
    def user_service(self, mock_database):
        """Create UserService instance with mocked database.

        The ``user_service`` fixture declared inside ``TestUserService``
        is visible only to that class, so this class provides its own.
        """
        return UserService(mock_database)

    @pytest.mark.performance
    def test_bulk_user_creation_performance(self, user_service):
        """Test performance of bulk user creation."""
        import time

        users_data = [
            {'name': f'User {i}', 'email': f'user{i}@example.com'}
            for i in range(1000)
        ]

        # perf_counter is monotonic; wall-clock time can jump (NTP, DST)
        # and produce bogus durations.
        start_time = time.perf_counter()
        results = user_service.create_users_bulk(users_data)
        end_time = time.perf_counter()

        # Assert all users created
        assert len(results) == 1000

        # Assert performance threshold (< 5 seconds for 1000 users)
        execution_time = end_time - start_time
        assert execution_time < 5.0, f"Bulk creation took {execution_time:.2f}s"

Test Configuration

# pytest.ini
[pytest]
minversion = 6.0
addopts = 
    -ra
    --strict-markers
    --strict-config
    --cov=src
    --cov-report=term-missing
    --cov-report=html
    --cov-report=xml
    --cov-fail-under=80
testpaths = tests
markers =
    unit: Unit tests
    integration: Integration tests
    performance: Performance tests
    slow: Slow running tests
    external: Tests requiring external services
filterwarnings =
    error
    ignore::UserWarning
    ignore::DeprecationWarning

Performance Optimization

Profiling and Monitoring

import cProfile
import pstats
import functools
import time
import psutil
import asyncio
from typing import Callable, Any
from contextlib import contextmanager

def profile_function(sort_by: str = 'cumulative'):
    """Build a decorator that profiles each call with ``cProfile``.

    After every call, whether it returns or raises, the 20 top entries
    of the profile, ordered by ``sort_by``, are printed to stdout.
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            prof = cProfile.Profile()
            prof.enable()

            try:
                return func(*args, **kwargs)
            finally:
                prof.disable()

                # Report the 20 most significant entries
                report = pstats.Stats(prof)
                report.sort_stats(sort_by).print_stats(20)

        return wrapper
    return decorator

def time_it(func: Callable) -> Callable:
    """Wrap ``func`` so each successful call prints its elapsed time.

    Nothing is printed when ``func`` raises; the exception propagates.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = time.perf_counter() - started

        print(f"{func.__name__} executed in {elapsed:.4f} seconds")
        return outcome
    return wrapper

@contextmanager
def memory_monitor(description: str = "Operation"):
    """Print the process's resident memory before and after the body.

    The final figure and the difference are printed even when the body
    raises.
    """
    proc = psutil.Process()

    def rss_mb() -> float:
        # Resident set size converted from bytes to MB
        return proc.memory_info().rss / 1024 / 1024

    before = rss_mb()
    print(f"{description} - Initial memory: {before:.2f} MB")

    try:
        yield
    finally:
        after = rss_mb()
        delta = after - before
        print(f"{description} - Final memory: {after:.2f} MB")
        print(f"{description} - Memory difference: {delta:+.2f} MB")

# Async performance utilities
class AsyncPerformanceMonitor:
    """Async context manager timing its body, plus a per-operation timing log."""

    def __init__(self):
        self.operation_times = {}
        self.operation_counts = {}

    async def __aenter__(self):
        self.start_time = time.perf_counter()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        finished = time.perf_counter()
        self.execution_time = finished - self.start_time

    def record_operation(self, operation_name: str, execution_time: float):
        """Append one timing sample for ``operation_name`` and bump its count."""
        try:
            samples = self.operation_times[operation_name]
        except KeyError:
            # First sample for this operation: create both entries
            samples = self.operation_times[operation_name] = []
            self.operation_counts[operation_name] = 0

        samples.append(execution_time)
        self.operation_counts[operation_name] += 1

    def get_stats(self) -> dict:
        """Return count, total, average, min and max time per recorded operation."""
        summary = {}

        for name, samples in self.operation_times.items():
            total = sum(samples)
            summary[name] = {
                'count': self.operation_counts[name],
                'total_time': total,
                'average_time': total / len(samples),
                'min_time': min(samples),
                'max_time': max(samples)
            }

        return summary

# Example usage
class OptimizedDataProcessor:
    """Example processor combining caching, profiling and async batching."""

    def __init__(self):
        self.performance_monitor = AsyncPerformanceMonitor()
        self.cache = {}

    @time_it
    @profile_function()
    def process_large_dataset(self, data: list) -> list:
        """Process every item of ``data`` and return the results as a list.

        The call is timed, profiled and wrapped in a memory monitor.
        """
        with memory_monitor("Large dataset processing"):
            # Items are produced lazily, then collected into a list
            processed = list(self._process_items_generator(data))
        return processed

    def _process_items_generator(self, data: list):
        """Yield the processed form of each item in ``data``, one at a time."""
        for record in data:
            yield self._process_single_item(record)

    def _process_single_item(self, item: dict) -> dict:
        """Return the processed form of ``item``, reusing a cached result.

        Results are cached under a key built from the item's ``id`` and
        ``type``; a cache hit returns the stored dict object itself.
        """
        cache_key = f"{item.get('id', '')}-{item.get('type', '')}"

        try:
            return self.cache[cache_key]
        except KeyError:
            pass

        # Simulate expensive operation, then remember the result
        processed = {
            'id': item.get('id'),
            'processed': True,
            'timestamp': time.time()
        }
        self.cache[cache_key] = processed
        return processed

    async def async_batch_process(self, items: list, batch_size: int = 100) -> list:
        """Process ``items`` concurrently in chunks of ``batch_size``.

        Each chunk's elapsed time is recorded on ``performance_monitor``
        under the name ``batch_process_<chunk length>``.
        """
        collected = []

        for start in range(0, len(items), batch_size):
            chunk = items[start:start + batch_size]

            async with AsyncPerformanceMonitor() as monitor:
                chunk_results = await asyncio.gather(
                    *map(self._async_process_item, chunk)
                )
                collected.extend(chunk_results)

            self.performance_monitor.record_operation(
                f"batch_process_{len(chunk)}",
                monitor.execution_time
            )

        return collected

    async def _async_process_item(self, item: dict) -> dict:
        """Process a single item after a short simulated async wait."""
        # Simulate async operation
        await asyncio.sleep(0.01)
        return self._process_single_item(item)

Security Best Practices

Input Validation and Sanitization

import re
import html
import bleach
from typing import Any, Dict, List
from urllib.parse import urlparse
from dataclasses import dataclass

@dataclass
class ValidationRule:
    """Declarative description of how one input field is validated.

    Consumed by ``SecureValidator.validate_data``, which reads every
    attribute below.
    """
    # Key looked up in the input data; also used in error messages.
    field_name: str
    # When True, a missing (None) or empty-string value is an error.
    required: bool = False
    # Expected type; values of another type are coerced by calling it.
    data_type: type = str
    # Inclusive length bounds, applied to string values only.
    min_length: int = 0
    max_length: int = 1000
    # Pattern passed to ``re.match`` for string values; None disables it.
    # NOTE(review): the None default makes this effectively Optional[str].
    regex_pattern: str = None
    # Permitted values; None (or an empty list) disables the check.
    allowed_values: List[Any] = None
    # Callable receiving the value; a falsy result or an exception is
    # reported as a validation error. None disables the check.
    custom_validator: callable = None

class SecureValidator:
    """Secure data validation and sanitization."""
    
    EMAIL_PATTERN = re.compile(
        r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    )
    
    # Safe HTML tags for content
    ALLOWED_HTML_TAGS = [
        'p', 'br', 'strong', 'em', 'u', 'ol', 'ul', 'li',
        'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'blockquote'
    ]
    
    ALLOWED_HTML_ATTRIBUTES = {
        '*': ['class'],
        'a': ['href', 'title'],
        'img': ['src', 'alt', 'width', 'height']
    }
    
    @classmethod
    def validate_email(cls, email: str) -> bool:
        """Validate email format."""
        if not email or not isinstance(email, str):
            return False
        return bool(cls.EMAIL_PATTERN.match(email.strip().lower()))
    
    @classmethod
    def validate_url(cls, url: str) -> bool:
        """Validate URL format and safety."""
        try:
            parsed = urlparse(url)
            return bool(
                parsed.scheme in ('http', 'https') and
                parsed.netloc and
                len(url) <= 2048
            )
        except Exception:
            return False
    
    @classmethod
    def sanitize_html(cls, content: str) -> str:
        """Sanitize HTML content to prevent XSS."""
        if not content:
            return ""
        
        # Clean HTML with bleach
        cleaned = bleach.clean(
            content,
            tags=cls.ALLOWED_HTML_TAGS,
            attributes=cls.ALLOWED_HTML_ATTRIBUTES,
            strip=True
        )
        
        return cleaned
    
    @classmethod
    def sanitize_string(cls, value: str, escape_html: bool = True) -> str:
        """Sanitize string input."""
        if not isinstance(value, str):
            return ""
        
        # Strip whitespace
        sanitized = value.strip()
        
        # Escape HTML if requested
        if escape_html:
            sanitized = html.escape(sanitized)
        
        return sanitized
    
    @classmethod
    def validate_data(cls, data: Dict[str, Any], rules: List[ValidationRule]) -> Dict[str, Any]:
        """Validate data against rules."""
        errors = {}
        validated_data = {}
        
        for rule in rules:
            field_value = data.get(rule.field_name)
            
            # Check required fields
            if rule.required and (field_value is None or field_value == ""):
                errors[rule.field_name] = f"{rule.field_name} is required"
                continue
            
            # Skip validation for None values on optional fields
            if field_value is None and not rule.required:
                validated_data[rule.field_name] = None
                continue
            
            # Type validation
            if not isinstance(field_value, rule.data_type):
                try:
                    field_value = rule.data_type(field_value)
                except (ValueError, TypeError):
                    errors[rule.field_name] = f"{rule.field_name} must be of type {rule.data_type.__name__}"
                    continue
            
            # String length validation
            if isinstance(field_value, str):
                if len(field_value) < rule.min_length:
                    errors[rule.field_name] = f"{rule.field_name} must be at least {rule.min_length} characters"
                    continue
                
                if len(field_value) > rule.max_length:
                    errors[rule.field_name] = f"{rule.field_name} must be no more than {rule.max_length} characters"
                    continue
                
                # Sanitize string
                field_value = cls.sanitize_string(field_value)
            
            # Regex pattern validation
            if rule.regex_pattern and isinstance(field_value, str):
                if not re.match(rule.regex_pattern, field_value):
                    errors[rule.field_name] = f"{rule.field_name} format is invalid"
                    continue
            
            # Allowed values validation
            if rule.allowed_values and field_value not in rule.allowed_values:
                errors[rule.field_name] = f"{rule.field_name} must be one of: {', '.join(map(str, rule.allowed_values))}"
                continue
            
            # Custom validation
            if rule.custom_validator:
                try:
                    if not rule.custom_validator(field_value):
                        errors[rule.field_name] = f"{rule.field_name} failed custom validation"
                        continue
                except Exception as e:
                    errors[rule.field_name] = f"{rule.field_name} validation error: {str(e)}"
                    continue
            
            validated_data[rule.field_name] = field_value
        
        if errors:
            raise ValidationError(f"Validation failed: {errors}")
        
        return validated_data

# Example secure API endpoint
from flask import Flask, request, jsonify
import hashlib
import secrets

class SecureUserAPI:
    """Secure user API with validation and rate limiting.

    Incoming user data is validated against ``user_validation_rules``,
    requests are throttled per client IP with an in-memory sliding window,
    passwords are stored as salted PBKDF2-HMAC-SHA256 hashes, and each new
    user receives a random URL-safe token.

    The policy constants below are class attributes so a subclass (or a
    single instance) can override them without changing any method.
    """

    # Rate-limit policy: at most this many requests per client IP ...
    RATE_LIMIT_MAX_REQUESTS = 100
    # ... within a sliding window of this many seconds (1 hour).
    RATE_LIMIT_WINDOW_SECONDS = 3600
    # PBKDF2 iteration count. The stored "salt:hash" format does not record
    # it, so changing this value affects how existing hashes must be checked.
    PBKDF2_ITERATIONS = 100000

    def __init__(self) -> None:
        self.rate_limit_store = {}  # In production, use Redis
        # NOTE(review): 'role' is taken from client input and 'admin' is an
        # allowed value -- confirm callers are authorized to assign roles.
        self.user_validation_rules = [
            ValidationRule('name', required=True, min_length=2, max_length=100),
            ValidationRule('email', required=True, max_length=255,
                           custom_validator=SecureValidator.validate_email),
            ValidationRule('age', required=False, data_type=int),
            ValidationRule('role', required=True,
                           allowed_values=['user', 'admin', 'moderator'])
        ]

    def create_user(self, request_data: dict) -> dict:
        """Create user with secure validation.

        Args:
            request_data: Raw user fields from the client. An optional
                'password' entry is hashed and stored as 'password_hash'.

        Returns:
            A dict with the new user's 'id', 'name', 'email', 'role' and
            'token'.

        Raises:
            APIError: status 429 when the client's rate limit is exceeded,
                status 400 when validation fails, and status 500 for any
                other failure (details are logged, not returned).
        """
        try:
            # Rate limiting check
            client_ip = request.environ.get('REMOTE_ADDR', 'unknown')
            if not self._check_rate_limit(client_ip):
                # Throttling is a client-side condition; report it as
                # 429 rather than letting the generic handler below turn
                # it into a 500 "Internal server error".
                raise APIError("Rate limit exceeded", status_code=429)

            # Validate and sanitize input
            validated_data = SecureValidator.validate_data(
                request_data,
                self.user_validation_rules
            )

            # Hash sensitive data; the plain-text password is never stored
            if 'password' in request_data:
                validated_data['password_hash'] = self._hash_password(
                    request_data['password']
                )

            # Generate secure token
            validated_data['user_token'] = self._generate_secure_token()

            # Save user (implementation depends on your database)
            user_id = self._save_user(validated_data)

            return {
                'id': user_id,
                'name': validated_data['name'],
                'email': validated_data['email'],
                'role': validated_data['role'],
                'token': validated_data['user_token']
            }

        except ValidationError as e:
            raise APIError(f"Validation failed: {str(e)}", status_code=400) from e
        except APIError:
            # Already a client-facing error (e.g. the 429 above); pass it on
            # unchanged instead of masking it as a 500.
            raise
        except Exception as e:
            # Log error (with traceback) but don't expose details
            logging.exception("User creation failed: %s", e)
            raise APIError("Internal server error", status_code=500) from e

    def _check_rate_limit(self, client_ip: str) -> bool:
        """Record a request for ``client_ip`` if it is within the limit.

        Timestamps older than ``RATE_LIMIT_WINDOW_SECONDS`` are discarded on
        every call. A rejected request is not recorded.

        Returns:
            True if the request is allowed (and has been recorded), False if
            the client already has ``RATE_LIMIT_MAX_REQUESTS`` requests in
            the current window.
        """
        current_time = time.time()
        window_start = current_time - self.RATE_LIMIT_WINDOW_SECONDS

        # Keep only the requests that still fall inside the window
        recent_requests = [
            req_time
            for req_time in self.rate_limit_store.get(client_ip, [])
            if req_time > window_start
        ]
        self.rate_limit_store[client_ip] = recent_requests

        if len(recent_requests) >= self.RATE_LIMIT_MAX_REQUESTS:
            return False

        # Record current request
        recent_requests.append(current_time)
        return True

    def _hash_password(self, password: str) -> str:
        """Hash ``password`` with a fresh random salt.

        Returns:
            ``"<salt>:<hash>"`` where salt is 32 hex characters and hash is
            the hex PBKDF2-HMAC-SHA256 digest computed with
            ``PBKDF2_ITERATIONS`` iterations.
        """
        salt = secrets.token_hex(16)
        password_hash = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),  # the hex text itself is used as the salt bytes
            self.PBKDF2_ITERATIONS
        )
        return f"{salt}:{password_hash.hex()}"

    def _generate_secure_token(self) -> str:
        """Generate cryptographically secure URL-safe token (32 random bytes)."""
        return secrets.token_urlsafe(32)

Dependency Management

Modern Dependency Management with Poetry

# pyproject.toml
# Project metadata used by Poetry to build and publish the package.
[tool.poetry]
name = "my-project"
version = "1.0.0"
description = "A modern Python project"
authors = ["Your Name <you@example.com>"]
readme = "README.md"
# src layout: the importable package `my_project` is located under src/.
packages = [{include = "my_project", from = "src"}]

# Runtime dependencies.
# Poetry's caret (^) allows updates that keep the left-most non-zero version
# component unchanged: ^2.5.0 means >=2.5.0,<3.0.0 and ^0.104.1 means
# >=0.104.1,<0.105.0.
[tool.poetry.dependencies]
python = "^3.11"
fastapi = "^0.104.1"
pydantic = "^2.5.0"
sqlalchemy = "^2.0.23"
alembic = "^1.13.1"
redis = "^5.0.1"
celery = "^5.3.4"
requests = "^2.31.0"
# NOTE(review): for a 0.0.x version the caret admits only that patch release
# (^0.0.6 means >=0.0.6,<0.0.7), so this is effectively an exact pin --
# confirm that is intended.
python-multipart = "^0.0.6"
# Installs uvicorn together with its optional "standard" extras.
uvicorn = {extras = ["standard"], version = "^0.24.0"}

# Dependency group "dev": testing, formatting, linting and type-checking
# tools. Group dependencies are not part of the package's runtime
# requirements.
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.21.1"
pytest-cov = "^4.1.0"
black = "^23.11.0"
isort = "^5.12.0"
flake8 = "^6.1.0"
mypy = "^1.7.1"
pre-commit = "^3.6.0"
hypothesis = "^6.92.1"

# Dependency group "docs": Sphinx, its theme and the type-hint extension
# used to build the documentation.
[tool.poetry.group.docs.dependencies]
sphinx = "^7.2.6"
sphinx-rtd-theme = "^1.3.0"
sphinx-autodoc-typehints = "^1.25.2"

# PEP 517 build configuration: build front-ends install poetry-core and call
# its backend to produce the sdist/wheel.
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

# Black formatter settings.
[tool.black]
line-length = 88
target-version = ['py311']
# Regex selecting the files Black formats: names ending in .py or .pyi.
include = '\.pyi?$'
# Regex of paths to skip in addition to Black's default excludes. The
# "# directories" line below is inside the TOML string, i.e. it is part of
# the pattern text rather than a TOML comment.
extend-exclude = '''
/(
  # directories
  \.eggs
  | \.git
  | \.hg
  | \.mypy_cache
  | \.tox
  | \.venv
  | build
  | dist
)/
'''

# isort settings, kept in line with Black (same profile and line length) so
# the two tools do not reformat each other's output.
[tool.isort]
profile = "black"
# Mode 3 is isort's "vertical hanging indent" wrapping style.
multi_line_output = 3
line_length = 88
# Treat my_project imports as first-party when grouping import sections.
known_first_party = ["my_project"]

# mypy static type-checking settings (strict-leaning).
[tool.mypy]
python_version = "3.11"
# Warn when a function with a declared return type returns an Any value.
warn_return_any = true
warn_unused_configs = true
# Reject function definitions that are missing some or all annotations.
disallow_untyped_defs = true
disallow_incomplete_defs = true
# Type-check the bodies of unannotated functions as well.
check_untyped_defs = true
disallow_untyped_decorators = true
# A default of None does not implicitly make a parameter Optional.
no_implicit_optional = true
# Report casts and "# type: ignore" comments that are no longer needed.
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
strict_equality = true

# coverage.py measurement settings: measure code under src/ and leave test
# files out of the data.
[tool.coverage.run]
source = ["src"]
omit = ["*/tests/*", "*/test_*.py"]

# coverage.py report settings. Each entry is a regular expression; source
# lines matching any of them are excluded from the coverage report.
[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "if self.debug:",
    "if settings.DEBUG",
    "raise AssertionError",
    "raise NotImplementedError",
    "if 0:",
    # The dots are regex wildcards, so either quote style around __main__
    # is matched.
    "if __name__ == .__main__.:"
]

Documentation Standards

Comprehensive Documentation with Sphinx

# docs/conf.py
import os
import sys
# Make the package sources under ../src importable for autodoc.
sys.path.insert(0, os.path.abspath(os.path.join("..", "src")))

# -- Project information ------------------------------------------------------
project = "My Project"
author = "Your Name"
copyright = "2024, Your Name"
release = "1.0.0"

# -- General configuration ----------------------------------------------------
# Extensions are listed in the order they are loaded.
extensions = [
    "sphinx.ext.autodoc",
    "sphinx.ext.autosummary",
    "sphinx.ext.viewcode",
    "sphinx.ext.napoleon",
    "sphinx_autodoc_typehints",
    "sphinx.ext.intersphinx",
]

templates_path = ["_templates"]
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]

# -- HTML output --------------------------------------------------------------
html_theme = "sphinx_rtd_theme"
html_static_path = ["_static"]

# -- Napoleon: accept both Google- and NumPy-style docstrings -----------------
napoleon_google_docstring = True
napoleon_numpy_docstring = True
napoleon_include_init_with_doc = False
napoleon_include_private_with_doc = False

# -- Autodoc defaults applied to every autodoc directive ----------------------
autodoc_default_options = {
    "members": True,
    "member-order": "bysource",
    "special-members": "__init__",
    "undoc-members": True,
    "exclude-members": "__weakref__",
}

# -- Intersphinx: external projects that cross-references may link to ---------
intersphinx_mapping = {
    "python": ("https://docs.python.org/3", None),
    "requests": ("https://requests.readthedocs.io/en/latest/", None),
}