diff --git a/.github/PULL_REQUEST_TEMPLATE b/.github/PULL_REQUEST_TEMPLATE
index 96e3af1..f6064c4 100755
--- a/.github/PULL_REQUEST_TEMPLATE
+++ b/.github/PULL_REQUEST_TEMPLATE
@@ -8,6 +8,7 @@
 - [ ] I have thought about how this code may affect other services.
 - [ ] This PR fixes an issue.
 - [ ] This PR adds something new (e.g. new method or parameters).
+- [ ] This PR has unit tests (e.g. tests added/removed/changed)
 - [ ] This PR is a breaking change (e.g. methods or parameters removed/renamed)
 - [ ] This PR is **not** a code change (e.g. documentation, README, ...)
diff --git a/README.md b/README.md
index e5c5c0c..190dfa3 100755
--- a/README.md
+++ b/README.md
@@ -28,7 +28,6 @@ High-performance Python logging library with file rotation and optimized caching
 - [Memory Management](#memory-management)
 - [Flexible Configuration Options](#flexible-configuration-options)
 - [Migration Guide](#migration-guide)
-- [Performance Improvements](#performance-improvements)
 - [Development](#source-code)
 - [Run Tests and Get Coverage Report using Poe](#run-tests-and-get-coverage-report-using-poe)
 - [License](#license)
@@ -446,40 +445,6 @@ registered = LoggerFactory.get_registered_loggers()
 print(f"Currently registered: {list(registered.keys())}")
 ```
-## Thread-Safe Operations
-All memory management operations are thread-safe and can be used safely in multi-threaded applications:
-
-```python
-import threading
-from pythonLogs import size_rotating_logger, clear_logger_registry
-
-def worker_function(worker_id):
-    # Each thread can safely create and use loggers
-    logger = size_rotating_logger(
-        name=f"worker_{worker_id}",
-        directory="/app/logs"
-    )
-
-    with logger as log:
-        log.info(f"Worker {worker_id} started")
-        # Automatic cleanup per thread
-
-# Create multiple threads - all operations are thread-safe
-threads = []
-for i in range(10):
-    thread = threading.Thread(target=worker_function, args=(i,))
-    threads.append(thread)
-    thread.start()
-
-# Wait for completion and clean up
-for thread in threads:
-    thread.join()
-
-# Safe to clear registry from main thread
-clear_logger_registry()
-```
-
-
 # Flexible Configuration Options
 You can use either enums (for type safety) or strings (for simplicity):
@@ -552,25 +517,6 @@ timed_logger = timed_rotating_logger(level=LogLevel.WARNING, name="app", directo
 - 🔧 **Cleaner API** without manual `.init()` calls
 - 📚 **Centralized configuration** through factory pattern
-# Performance Improvements
-
-## Benchmarks
-The factory pattern with optimizations provides significant performance improvements:
-
-| Feature | Improvement | Benefit |
-|---------|-------------|---------|
-| Logger Registry | 90%+ faster | Cached logger instances |
-| Settings Caching | ~85% faster | Reused configuration objects |
-| Directory Validation | ~75% faster | Cached permission checks |
-| Timezone Operations | ~60% faster | Cached timezone functions |
-
-## Performance Test Results
-```python
-# Create 100 loggers - Performance comparison
-# Legacy method: ~0.045 seconds
-# Factory pattern: ~0.004 seconds
-# Improvement: 91% faster ⚡
-```
 # Source Code
 ### Build
@@ -579,7 +525,6 @@
 poetry build -f wheel
 ```
-
 # Run Tests and Get Coverage Report using Poe
 ```shell
 poetry update --with test
@@ -587,13 +532,10 @@
 poe test
 ```
-
 # License
 Released under the [MIT License](LICENSE)
-
-
 # Buy me a cup of coffee
 + [GitHub Sponsor](https://github.com/sponsors/ddc)
 + [ko-fi](https://ko-fi.com/ddcsta)
diff --git a/pyproject.toml b/pyproject.toml
index 10ce02d..878b47b 100755
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "pythonLogs"
-version = "4.0.3"
+version = "4.0.4"
 description = "High-performance Python logging library with file rotation and optimized caching for better performance"
 license = "MIT"
 readme = "README.md"
diff --git a/pythonLogs/basic_log.py b/pythonLogs/basic_log.py
index a871608..99904dd 100644
--- a/pythonLogs/basic_log.py
+++ b/pythonLogs/basic_log.py
@@ -1,12 +1,13 @@
 # -*- encoding: utf-8 -*-
 import logging
-import threading
 from typing import Optional
 from pythonLogs.log_utils import get_format, get_level, get_timezone_function
 from pythonLogs.memory_utils import cleanup_logger_handlers, register_logger_weakref
 from pythonLogs.settings import get_log_settings
+from pythonLogs.thread_safety import auto_thread_safe
+
+@auto_thread_safe(['init', '_cleanup_logger'])
 class BasicLog:
     """Basic logger with context manager support for automatic resource cleanup."""
@@ -27,8 +28,6 @@ def __init__(
         self.timezone = timezone or _settings.timezone
         self.showlocation = showlocation or _settings.show_location
         self.logger = None
-        # Instance-level lock for thread safety
-        self._lock = threading.Lock()
 
     def init(self):
         logger = logging.getLogger(self.appname)
@@ -54,8 +53,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
 
     def _cleanup_logger(self, logger: logging.Logger) -> None:
         """Clean up logger resources by closing all handlers with thread safety."""
-        with self._lock:
-            cleanup_logger_handlers(logger)
+        cleanup_logger_handlers(logger)
 
     @staticmethod
     def cleanup_logger(logger: logging.Logger) -> None:
diff --git a/pythonLogs/size_rotating.py b/pythonLogs/size_rotating.py
index d3f7112..26823ae 100755
--- a/pythonLogs/size_rotating.py
+++ b/pythonLogs/size_rotating.py
@@ -2,7 +2,6 @@
 import logging.handlers
 import os
 import re
-import threading
 from typing import Optional
 from pythonLogs.constants import MB_TO_BYTES
 from pythonLogs.log_utils import (
@@ -18,8 +17,10 @@
 )
 from pythonLogs.memory_utils import cleanup_logger_handlers, register_logger_weakref
 from pythonLogs.settings import get_log_settings
+from pythonLogs.thread_safety import auto_thread_safe
+
+@auto_thread_safe(['init', '_cleanup_logger'])
 class SizeRotatingLog:
     """Size-based rotating logger with context manager support for automatic resource cleanup."""
 
     def __init__(
@@ -49,8 +50,6 @@ def __init__(
         self.streamhandler = streamhandler or _settings.stream_handler
         self.showlocation = showlocation or _settings.show_location
         self.logger = None
-        # Instance-level lock for thread safety
-        self._lock = threading.Lock()
 
     def init(self):
         check_filename_instance(self.filenames)
@@ -98,8 +97,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
 
     def _cleanup_logger(self, logger: logging.Logger) -> None:
         """Clean up logger resources by closing all handlers with thread safety."""
-        with self._lock:
-            cleanup_logger_handlers(logger)
+        cleanup_logger_handlers(logger)
 
     @staticmethod
     def cleanup_logger(logger: logging.Logger) -> None:
diff --git a/pythonLogs/thread_safety.py b/pythonLogs/thread_safety.py
new file mode 100644
index 0000000..59b75a0
--- /dev/null
+++ b/pythonLogs/thread_safety.py
@@ -0,0 +1,135 @@
+# -*- encoding: utf-8 -*-
+import functools
+import threading
+from typing import Any, Callable, Dict, TypeVar, Type
+
+F = TypeVar('F', bound=Callable[..., Any])
+
+
+class ThreadSafeMeta(type):
+    """Metaclass that automatically adds thread safety to class methods."""
+
+    def __new__(mcs, name: str, bases: tuple, namespace: Dict[str, Any],
+                **kwargs):
+        # Create the class first
+        cls = super().__new__(mcs, name, bases, namespace)
+
+        # Add a class-level lock if not already present
+        if not hasattr(cls, '_lock'):
+            cls._lock = threading.RLock()
+
+        # Get methods that should be thread-safe (exclude private/dunder methods)
+        thread_safe_methods = getattr(cls, '_thread_safe_methods', None)
+        if thread_safe_methods is None:
+            # Auto-detect public methods that modify state
+            thread_safe_methods = [
+                method_name for method_name in namespace
+                if (callable(getattr(cls, method_name, None)) and
+                    not method_name.startswith('_') and
+                    method_name not in ['__enter__', '__exit__', '__init__'])
+            ]
+
+        # Wrap each method with automatic locking
+        for method_name in thread_safe_methods:
+            if hasattr(cls, method_name):
+                original_method = getattr(cls, method_name)
+                if callable(original_method):
+                    wrapped_method = thread_safe(original_method)
+                    setattr(cls, method_name, wrapped_method)
+
+        return cls
+
+
+def thread_safe(func: F) -> F:
+    """Decorator that automatically adds thread safety to methods."""
+
+    @functools.wraps(func)
+    def wrapper(self, *args, **kwargs):
+        # Use instance lock if available, otherwise class lock
+        lock = getattr(self, '_lock', None)
+        if lock is None:
+            # Check if class has lock, if not create one
+            if not hasattr(self.__class__, '_lock'):
+                self.__class__._lock = threading.RLock()
+            lock = self.__class__._lock
+
+        with lock:
+            return func(self, *args, **kwargs)
+
+    return wrapper
+
+
+def auto_thread_safe(thread_safe_methods: list = None):
+    """Class decorator that adds automatic thread safety to specified methods."""
+
+    def decorator(cls: Type) -> Type:
+        # Add lock to class if not present
+        if not hasattr(cls, '_lock'):
+            cls._lock = threading.RLock()
+
+        # Store thread-safe methods list
+        if thread_safe_methods:
+            cls._thread_safe_methods = thread_safe_methods
+
+        # Get methods to make thread-safe
+        methods_to_wrap = thread_safe_methods or [
+            method_name for method_name in dir(cls)
+            if (callable(getattr(cls, method_name, None)) and
+                not method_name.startswith('_') and
+                method_name not in ['__enter__', '__exit__', '__init__'])
+        ]
+
+        # Wrap each method
+        for method_name in methods_to_wrap:
+            if hasattr(cls, method_name):
+                original_method = getattr(cls, method_name)
+                if callable(original_method) and not hasattr(original_method, '_thread_safe_wrapped'):
+                    wrapped_method = thread_safe(original_method)
+                    wrapped_method._thread_safe_wrapped = True
+                    setattr(cls, method_name, wrapped_method)
+
+        return cls
+
+    return decorator
+
+
+class AutoThreadSafe:
+    """Base class that provides automatic thread safety for all public methods."""
+
+    def __init__(self):
+        if not hasattr(self, '_lock'):
+            self._lock = threading.RLock()
+
+    def __init_subclass__(cls, **kwargs):
+        super().__init_subclass__(**kwargs)
+
+        # Add class-level lock
+        if not hasattr(cls, '_lock'):
+            cls._lock = threading.RLock()
+
+        # Auto-wrap public methods
+        for attr_name in dir(cls):
+            if not attr_name.startswith('_'):
+                attr = getattr(cls, attr_name)
+                if callable(attr) and not hasattr(attr, '_thread_safe_wrapped'):
+                    wrapped_attr = thread_safe(attr)
+                    wrapped_attr._thread_safe_wrapped = True
+                    setattr(cls, attr_name, wrapped_attr)
+
+
+def synchronized_method(func: F) -> F:
+    """Decorator for individual methods that need thread safety."""
+    return thread_safe(func)
+
+
+class ThreadSafeContext:
+    """Context manager for thread-safe operations."""
+
+    def __init__(self, lock: threading.Lock):
+        self.lock = lock
+
+    def __enter__(self):
+        self.lock.acquire()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.lock.release()
\ No newline at end of file
diff --git a/pythonLogs/timed_rotating.py b/pythonLogs/timed_rotating.py
index 51a8947..e687015 100755
--- a/pythonLogs/timed_rotating.py
+++ b/pythonLogs/timed_rotating.py
@@ -1,7 +1,6 @@
 # -*- encoding: utf-8 -*-
 import logging.handlers
 import os
-import threading
 from typing import Optional
 from pythonLogs.log_utils import (
     check_directory_permissions,
@@ -15,8 +14,10 @@
 )
 from pythonLogs.memory_utils import cleanup_logger_handlers, register_logger_weakref
 from pythonLogs.settings import get_log_settings
+from pythonLogs.thread_safety import auto_thread_safe
+
+@auto_thread_safe(['init', '_cleanup_logger'])
 class TimedRotatingLog:
     """
     Time-based rotating logger with context manager support for automatic resource cleanup.
@@ -60,8 +61,6 @@ def __init__(
         self.showlocation = showlocation or _settings.show_location
         self.rotateatutc = rotateatutc or _settings.rotate_at_utc
         self.logger = None
-        # Instance-level lock for thread safety
-        self._lock = threading.Lock()
 
     def init(self):
         check_filename_instance(self.filenames)
@@ -107,8 +106,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
 
     def _cleanup_logger(self, logger: logging.Logger) -> None:
         """Clean up logger resources by closing all handlers with thread safety."""
-        with self._lock:
-            cleanup_logger_handlers(logger)
+        cleanup_logger_handlers(logger)
 
     @staticmethod
     def cleanup_logger(logger: logging.Logger) -> None:
diff --git a/tests/README.md b/tests/README.md
index 6018e1d..babede3 100644
--- a/tests/README.md
+++ b/tests/README.md
@@ -1,21 +1,34 @@
 # Test Suite Documentation
-This directory contains comprehensive tests for the pythonLogs library, covering all features including Factory Pattern, Context Managers, Memory Management, and Performance Optimizations.
+This directory contains comprehensive tests for the pythonLogs library, organized into logical categories for better maintainability and navigation.
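+
+Many of these suites exercise the `@auto_thread_safe` decorator from `pythonLogs.thread_safety`,
+which wraps the listed methods of a class with a shared `threading.RLock`. A minimal sketch of
+the pattern under test (the `Counter` class here is illustrative, not part of the library):
+
+```python
+from pythonLogs.thread_safety import auto_thread_safe
+
+@auto_thread_safe(['increment'])  # only the listed methods acquire the lock
+class Counter:
+    def __init__(self):
+        self.value = 0
+
+    def increment(self):
+        self.value += 1  # safe to call concurrently from multiple threads
+```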
+
+## Test Directory Structure
+
+```
+tests/
+├── core/                  # Core functionality tests
+├── context_management/    # Context managers & resource management
+├── logger_types/          # Specific logger type tests
+├── factory/               # Factory pattern tests
+├── performance/           # Performance & memory optimization tests
+├── thread_safety/         # Thread safety & concurrency tests
+└── timezone/              # Timezone functionality tests
+```
 
 ## Test Files Overview
-### Core Functionality Tests
+### Core Functionality Tests (`tests/core/`)
 - **`test_basic_log.py`** - Comprehensive BasicLog functionality testing
   - Tests BasicLog initialization, context managers, thread safety
   - Validates cleanup methods and multiple instance handling
   - **10 test cases** covering all BasicLog features
-- **`test_some_log_utils.py`** - Tests for utility functions
+- **`test_log_utils.py`** - Tests for utility functions
   - Tests helper functions in `log_utils.py`
   - Includes file operations, timezone handling, and validation
   - Multiple test cases for various utilities
-### Context Manager & Resource Management Tests
+### Context Manager & Resource Management Tests (`tests/context_management/`)
 - **`test_context_managers.py`** - Context manager functionality for all logger types
   - Tests automatic resource cleanup for BasicLog, SizeRotatingLog, TimedRotatingLog
   - Validates exception safety and proper handler cleanup
@@ -28,7 +41,7 @@ This directory contains comprehensive tests for the pythonLogs library, covering
   - Tests concurrent access safety and performance
   - **9 test cases** for robust resource management
-### Logger Type Tests
+### Logger Type Tests (`tests/logger_types/`)
 - **`test_size_rotating.py`** - Size-based rotating logger tests
   - Tests file rotation, compression, and cleanup
   - Context manager functionality and resource management
@@ -41,7 +54,7 @@ This directory contains comprehensive tests for the pythonLogs library, covering
   - Timezone handling and rotation scheduling
   - Comprehensive time rotation scenarios
-### Factory Pattern Tests
+### Factory Pattern Tests (`tests/factory/`)
 - **`test_factory.py`** - Core factory pattern functionality
   - Tests `LoggerFactory` class and all factory methods
   - Validates logger creation, registry caching, and performance
@@ -66,7 +79,7 @@ This directory contains comprehensive tests for the pythonLogs library, covering
   - Tests all logger types with string levels
   - Comprehensive string level compatibility
-### Performance & Memory Tests
+### Performance & Memory Tests (`tests/performance/`)
 - **`test_performance.py`** - Performance and optimization tests
   - Validates caching improvements and performance gains
   - Tests settings caching, registry performance, and memory usage
@@ -85,14 +98,38 @@ This directory contains comprehensive tests for the pythonLogs library, covering
   - Validates memory efficiency of timezone caching
   - Timezone performance optimization validation
-### Thread Safety & Concurrency Tests
+### Thread Safety & Concurrency Tests (`tests/thread_safety/`)
 - **`test_thread_safety.py`** - Concurrency and thread safety
   - Tests concurrent logger creation and registry access
   - Validates thread-safe operations across all components
   - Tests concurrent context manager cleanup
   - Stress testing for multithreaded environments
-### Timezone & Migration Tests
+- **`test_automatic_thread_safety.py`** - Automatic thread safety implementation
+  - Tests automatic thread-safety decorators applied to logger classes
+  - Validates @auto_thread_safe decorator functionality
+  - Tests BasicLog, SizeRotatingLog, and TimedRotatingLog with automatic locking
+  - **4 test cases** covering automatic thread safety features
+
+- **`test_thread_safety_module.py`** - Comprehensive thread safety module tests
+  - Tests all thread safety decorators (@thread_safe, @auto_thread_safe)
+  - Tests ThreadSafeMeta metaclass and AutoThreadSafe base class
+  - Tests ThreadSafeContext context manager and edge cases
+  - **19 test cases** covering all thread safety mechanisms
+
+- **`test_thread_safety_patterns.py`** - Advanced thread safety patterns
+  - Tests real-world concurrent patterns (producer-consumer, singleton, etc.)
+  - Tests resource pool, event bus, and cache patterns with thread safety
+  - Tests weak reference cleanup in multithreaded environments
+  - **8 test cases** covering complex thread safety scenarios
+
+- **`test_automatic_features.py`** - Integration of all automatic features
+  - Tests memory optimization, resource cleanup, and thread safety together
+  - Validates that all three automatic features work together seamlessly
+  - Tests stress scenarios with multiple logger types concurrently
+  - **6 test cases** ensuring all automatic features integrate properly
+
+### Timezone & Migration Tests (`tests/timezone/`)
 - **`test_timezone_migration.py`** - Timezone functionality with zoneinfo
   - Tests migration from pytz to Python's built-in zoneinfo module
   - Validates UTC, localtime, and named timezone support
@@ -116,21 +153,28 @@ poetry run poe test
 ```
 
 ### Run Specific Test Categories
 ```bash
+# Core functionality tests
+poetry run pytest tests/core/ -v
+
 # Context managers and resource management
-poetry run pytest tests/test_context_managers.py tests/test_resource_management.py -v
+poetry run pytest tests/context_management/ -v
 
-# Core logger functionality
-poetry run pytest tests/test_basic_log.py tests/test_size_rotating.py tests/test_timed_rotating.py -v
+# Logger type tests (size rotating, timed rotating)
+poetry run pytest tests/logger_types/ -v
 
 # Factory pattern tests
-poetry run pytest tests/test_factory*.py tests/test_enums.py -v
+poetry run pytest tests/factory/ -v
+
+# Performance and memory optimization tests
+poetry run pytest tests/performance/ -v
 
-# Performance and memory tests
-poetry run pytest tests/test_performance*.py tests/test_memory*.py -v
+# Thread safety and concurrency tests
+poetry run pytest tests/thread_safety/ -v
 
-# Thread safety and concurrency
-poetry run pytest tests/test_thread_safety.py -v
+# Timezone functionality tests
+poetry run pytest tests/timezone/ -v
 
-# Timezone functionality
-poetry run pytest tests/test_timezone*.py tests/test_zoneinfo*.py -v
+# Run specific directories together
+poetry run pytest tests/core/ tests/logger_types/ -v  # Core + Logger types
+poetry run pytest tests/performance/ tests/thread_safety/ -v  # Performance + Concurrency
 ```
diff --git a/tests/test_context_managers.py b/tests/context_management/test_context_managers.py
similarity index 100%
rename from tests/test_context_managers.py
rename to tests/context_management/test_context_managers.py
diff --git a/tests/test_resource_management.py b/tests/context_management/test_resource_management.py
similarity index 100%
rename from tests/test_resource_management.py
rename to tests/context_management/test_resource_management.py
diff --git a/tests/test_basic_log.py b/tests/core/test_basic_log.py
similarity index 100%
rename from tests/test_basic_log.py
rename to tests/core/test_basic_log.py
diff --git a/tests/test_log_utils.py b/tests/core/test_log_utils.py
similarity index 100%
rename from tests/test_log_utils.py
rename to tests/core/test_log_utils.py
diff --git a/tests/test_enums.py b/tests/factory/test_enums.py
similarity index 100%
rename from tests/test_enums.py
rename to tests/factory/test_enums.py
diff --git a/tests/test_factory.py b/tests/factory/test_factory.py
similarity index 100%
rename from tests/test_factory.py
rename to tests/factory/test_factory.py
diff --git a/tests/test_factory_examples.py b/tests/factory/test_factory_examples.py
similarity index 100%
rename from tests/test_factory_examples.py
rename to tests/factory/test_factory_examples.py
diff --git a/tests/test_string_levels.py b/tests/factory/test_string_levels.py
similarity index 100%
rename from tests/test_string_levels.py
rename to tests/factory/test_string_levels.py
diff --git a/tests/test_size_rotating.py b/tests/logger_types/test_size_rotating.py
similarity index 100%
rename from tests/test_size_rotating.py
rename to tests/logger_types/test_size_rotating.py
diff --git a/tests/test_timed_rotating.py b/tests/logger_types/test_timed_rotating.py
similarity index 100%
rename from tests/test_timed_rotating.py
rename to tests/logger_types/test_timed_rotating.py
diff --git a/tests/test_memory_optimization.py b/tests/performance/test_memory_optimization.py
similarity index 100%
rename from tests/test_memory_optimization.py
rename to tests/performance/test_memory_optimization.py
diff --git a/tests/test_performance.py b/tests/performance/test_performance.py
similarity index 100%
rename from tests/test_performance.py
rename to tests/performance/test_performance.py
diff --git a/tests/test_performance_zoneinfo.py b/tests/performance/test_performance_zoneinfo.py
similarity index 100%
rename from tests/test_performance_zoneinfo.py
rename to tests/performance/test_performance_zoneinfo.py
diff --git a/tests/thread_safety/test_automatic_features.py b/tests/thread_safety/test_automatic_features.py
new file mode 100644
index 0000000..eb73679
--- /dev/null
+++ b/tests/thread_safety/test_automatic_features.py
@@ -0,0 +1,188 @@
+# -*- coding: utf-8 -*-
+"""Test that all automatic features work together in logger classes."""
+import gc
+import tempfile
+import threading
+import time
+from pythonLogs.basic_log import BasicLog
+from pythonLogs.constants import RotateWhen
+from pythonLogs.memory_utils import get_active_logger_count
+from pythonLogs.size_rotating import SizeRotatingLog
+from pythonLogs.timed_rotating import TimedRotatingLog
+
+
+class TestAutomaticFeatures:
+    """Test that all three automatic features work together."""
+
+    def test_basic_log_all_automatic_features(self):
+        """Test BasicLog with all automatic features working together."""
+        initial_logger_count = get_active_logger_count()
+
+        # Test memory optimization, resource cleanup, and thread safety together
+        def test_logger_operations():
+            with BasicLog(name="test_all_features_basic") as logger:
+                # Memory optimization: logger is registered automatically
+                assert logger is not None
+                logger.info("Testing all automatic features")
+                # Resource cleanup and thread safety: handled automatically by context manager
+
+        # Run in multiple threads to test thread safety
+        threads = []
+        for i in range(5):
+            thread = threading.Thread(target=test_logger_operations)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+        # Force garbage collection to test memory optimization
+        gc.collect()
+        time.sleep(0.1)  # Allow cleanup to complete
+
+        # Verify memory optimization: logger count should be managed
+        final_logger_count = get_active_logger_count()
+        assert final_logger_count >= initial_logger_count  # May have some loggers still active
+
+    def test_size_rotating_log_all_automatic_features(self):
+        """Test SizeRotatingLog with all automatic features working together."""
+        with tempfile.TemporaryDirectory() as temp_dir:
+            initial_logger_count = get_active_logger_count()
+
+            def test_logger_operations():
+                with SizeRotatingLog(
+                    name="test_all_features_size",
+                    directory=temp_dir,
+                    filenames=("test.log",),
+                    maxmbytes=1
+                ) as logger:
+                    # Memory optimization: logger is registered automatically
+                    assert logger is not None
+                    logger.info("Testing size rotating with all features")
+                    # Resource cleanup and thread safety: handled automatically
+
+            # Run in multiple threads to test thread safety
+            threads = []
+            for i in range(3):
+                thread = threading.Thread(target=test_logger_operations)
+                threads.append(thread)
+                thread.start()
+
+            for thread in threads:
+                thread.join()
+
+            gc.collect()
+            time.sleep(0.1)
+
+    def test_timed_rotating_log_all_automatic_features(self):
+        """Test TimedRotatingLog with all automatic features working together."""
+        with tempfile.TemporaryDirectory() as temp_dir:
+            initial_logger_count = get_active_logger_count()
+
+            def test_logger_operations():
+                with TimedRotatingLog(
+                    name="test_all_features_timed",
+                    directory=temp_dir,
+                    filenames=("test.log",),
+                    when=RotateWhen.DAILY,
+                    daystokeep=1
+                ) as logger:
+                    # Memory optimization: logger is registered automatically
+                    assert logger is not None
+                    logger.info("Testing timed rotating with all features")
+                    # Resource cleanup and thread safety: handled automatically
+
+            # Run in multiple threads to test thread safety
+            threads = []
+            for i in range(3):
+                thread = threading.Thread(target=test_logger_operations)
+                threads.append(thread)
+                thread.start()
+
+            for thread in threads:
+                thread.join()
+
+            gc.collect()
+            time.sleep(0.1)
+
+    def test_manual_cleanup_still_works(self):
+        """Test that manual cleanup methods still work alongside automatic features."""
+        basic_log = BasicLog(name="test_manual_cleanup")
+        logger = basic_log.init()
+
+        # Manual cleanup should still work
+        basic_log._cleanup_logger(logger)
+        BasicLog.cleanup_logger(logger)  # Static method should work too
+
+        # No errors should occur
+
+    def test_automatic_features_verification(self):
+        """Verify all automatic features are properly configured."""
+        # Test BasicLog
+        basic_log = BasicLog(name="test_verification")
+
+        # 1. Memory Optimization: register_logger_weakref is called in init()
+        logger = basic_log.init()
+        assert logger is not None
+
+        # 2. Automatic Resource Cleanup: Context manager support
+        assert hasattr(basic_log, '__enter__')
+        assert hasattr(basic_log, '__exit__')
+        assert hasattr(basic_log, '_cleanup_logger')
+
+        # 3. Automatic Thread Safety: Decorator applied
+        assert hasattr(basic_log.__class__, '_lock')
+        assert hasattr(basic_log.init, '_thread_safe_wrapped')
+        assert hasattr(basic_log._cleanup_logger, '_thread_safe_wrapped')
+
+        basic_log._cleanup_logger(logger)
+
+    def test_stress_test_all_features(self):
+        """Stress test all automatic features working together."""
+        with tempfile.TemporaryDirectory() as temp_dir:
+            results = []
+            errors = []
+            results_lock = threading.Lock()
+
+            def stress_worker(worker_id):
+                try:
+                    # Mix different logger types
+                    if worker_id % 3 == 0:
+                        with BasicLog(name=f"stress_basic_{worker_id}") as logger:
+                            logger.info(f"Stress test basic {worker_id}")
+                    elif worker_id % 3 == 1:
+                        with SizeRotatingLog(
+                            name=f"stress_size_{worker_id}",
+                            directory=temp_dir,
+                            filenames=(f"stress_{worker_id}.log",),
+                            maxmbytes=1
+                        ) as logger:
+                            logger.info(f"Stress test size {worker_id}")
+                    else:
+                        with TimedRotatingLog(
+                            name=f"stress_timed_{worker_id}",
+                            directory=temp_dir,
+                            filenames=(f"stress_{worker_id}.log",),
+                            when=RotateWhen.DAILY
+                        ) as logger:
+                            logger.info(f"Stress test timed {worker_id}")
+
+                    with results_lock:
+                        results.append(f"Worker {worker_id} completed")
+
+                except Exception as e:
+                    with results_lock:
+                        errors.append(f"Worker {worker_id}: {e}")
+
+            # Create many concurrent workers
+            threads = []
+            for i in range(20):
+                thread = threading.Thread(target=stress_worker, args=(i,))
+                threads.append(thread)
+                thread.start()
+
+            for thread in threads:
+                thread.join()
+
+            assert len(errors) == 0, f"Stress test errors: {errors}"
+            assert len(results) == 20, f"Expected 20 results, got {len(results)}"
diff --git a/tests/thread_safety/test_automatic_thread_safety.py b/tests/thread_safety/test_automatic_thread_safety.py
new file mode 100644
index 0000000..b9afe2d
--- /dev/null
+++ b/tests/thread_safety/test_automatic_thread_safety.py
@@ -0,0 +1,124 @@
+# -*- coding: utf-8 -*-
+"""Test automatic thread safety implementation."""
+import threading
+from pythonLogs.basic_log import BasicLog
+from pythonLogs.constants import RotateWhen
+from pythonLogs.size_rotating import SizeRotatingLog
+from pythonLogs.timed_rotating import TimedRotatingLog
+
+
+class TestAutomaticThreadSafety:
+    """Test cases for automatic thread safety of logger classes."""
+
+    def test_basic_log_automatic_thread_safety(self):
+        """Test BasicLog with automatic thread safety decorators."""
+        basic_log = BasicLog(name="test_auto_thread_safety")
+        results = []
+        errors = []
+
+        def worker(worker_id):
+            try:
+                # These operations should be automatically thread-safe
+                logger = basic_log.init()
+                logger.info(f"Worker {worker_id} logging message")
+                basic_log._cleanup_logger(logger)
+                results.append(f"Worker {worker_id} completed")
+            except Exception as e:
+                errors.append(f"Worker {worker_id}: {e}")
+
+        threads = []
+        for i in range(10):
+            thread = threading.Thread(target=worker, args=(i,))
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+        assert len(errors) == 0, f"Automatic thread safety errors: {errors}"
+        assert len(results) == 10, f"Expected 10 results, got {len(results)}"
+
+    def test_size_rotating_log_automatic_thread_safety(self):
+        """Test SizeRotatingLog with automatic thread safety decorators."""
+        import tempfile
+
+        with tempfile.TemporaryDirectory() as temp_dir:
+            log = SizeRotatingLog(
+                name="test_auto_size_rotating",
+                directory=temp_dir,
+                filenames=("test.log",),
+                maxmbytes=1
+            )
+            results = []
+            errors = []
+
+            def worker(worker_id):
+                try:
+                    logger = log.init()
+                    logger.info(f"Size rotating worker {worker_id}")
+                    log._cleanup_logger(logger)
+                    results.append(f"Worker {worker_id} completed")
+                except Exception as e:
+                    errors.append(f"Worker {worker_id}: {e}")
+
+            threads = []
+            for i in range(5):
+                thread = threading.Thread(target=worker, args=(i,))
+                threads.append(thread)
+                thread.start()
+
+            for thread in threads:
+                thread.join()
+
+            assert len(errors) == 0, f"Size rotating automatic thread safety errors: {errors}"
+            assert len(results) == 5, f"Expected 5 results, got {len(results)}"
+
+    def test_timed_rotating_log_automatic_thread_safety(self):
+        """Test TimedRotatingLog with automatic thread safety decorators."""
+        import tempfile
+
+        with tempfile.TemporaryDirectory() as temp_dir:
+            log = TimedRotatingLog(
+                name="test_auto_timed_rotating",
+                directory=temp_dir,
+                filenames=("test.log",),
+                when=RotateWhen.DAILY,
+                daystokeep=1
+            )
+            results = []
+            errors = []
+
+            def worker(worker_id):
+                try:
+                    logger = log.init()
+                    logger.info(f"Timed rotating worker {worker_id}")
+                    log._cleanup_logger(logger)
+                    results.append(f"Worker {worker_id} completed")
+                except Exception as e:
+                    errors.append(f"Worker {worker_id}: {e}")
+
+            threads = []
+            for i in range(5):
+                thread = threading.Thread(target=worker, args=(i,))
+                threads.append(thread)
+                thread.start()
+
+            for thread in threads:
+                thread.join()
+
+            assert len(errors) == 0, f"Timed rotating automatic thread safety errors: {errors}"
+            assert len(results) == 5, f"Expected 5 results, got {len(results)}"
+
+    def test_automatic_locking_verification(self):
+        """Verify that automatic locking is actually working by checking decorator presence."""
+        basic_log = BasicLog(name="test_lock_verification")
+
+        # Verify the class has the automatic thread safety decorator applied
+        assert hasattr(basic_log.__class__, '_lock'), "Class should have automatic lock"
+        assert hasattr(basic_log.init, '_thread_safe_wrapped'), "Method should be wrapped for thread safety"
+        assert hasattr(basic_log._cleanup_logger, '_thread_safe_wrapped'), "Method should be wrapped for thread safety"
+
+        # Test that methods can still be called normally
+        logger = basic_log.init()
+        assert logger is not None, "Logger should be initialized"
+        basic_log._cleanup_logger(logger)
diff --git a/tests/test_thread_safety.py b/tests/thread_safety/test_thread_safety.py
similarity index 100%
rename from tests/test_thread_safety.py
rename to tests/thread_safety/test_thread_safety.py
diff --git a/tests/thread_safety/test_thread_safety_module.py b/tests/thread_safety/test_thread_safety_module.py
new file mode 100644
index 0000000..9ad2eef
--- /dev/null
+++ b/tests/thread_safety/test_thread_safety_module.py
@@ -0,0 +1,490 @@
+"""Comprehensive tests for thread_safety.py module."""
+import threading
+import time
+from typing import List
+import pytest
+from pythonLogs.thread_safety import (
+    ThreadSafeMeta,
+    thread_safe,
+    auto_thread_safe,
+    AutoThreadSafe,
+    synchronized_method,
+    ThreadSafeContext
+)
+
+
+class TestThreadSafeDecorator:
+    """Test the @thread_safe decorator."""
+
+    def test_thread_safe_decorator_basic(self):
+        """Test basic functionality of @thread_safe decorator."""
+
+        class TestClass:
+            def __init__(self):
+                self._lock = threading.RLock()
+                self.counter = 0
+
+            @thread_safe
+            def increment(self):
+                current = self.counter
+                time.sleep(0.001)  # Simulate some work
+                self.counter = current + 1
+
+        obj = TestClass()
+        threads = []
+
+        def worker():
+            for _ in range(10):
+                obj.increment()
+
+        # Create multiple threads
+        for _ in range(5):
+            thread = threading.Thread(target=worker)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+        # Should be exactly 50 (5 threads * 10 increments each)
+        assert obj.counter == 50, f"Expected 50, got {obj.counter}"
+
+    def test_thread_safe_decorator_without_lock(self):
+        """Test @thread_safe decorator uses class lock when no instance lock exists."""
+
+        class TestClass:
+            def __init__(self):
+                self.counter = 0
+
+            @thread_safe
+            def increment(self):
+                self.counter += 1
+
+        obj = TestClass()
+        # Should fall back to creating a class-level lock in the decorator
+        obj.increment()
+        assert obj.counter == 1
+
+        # The lock should be accessible via the method's fallback mechanism
+        lock = getattr(obj, '_lock', None) or getattr(obj.__class__, '_lock', None)
+        assert lock is not None
+
+    def test_thread_safe_decorator_preserves_metadata(self):
+        """Test that @thread_safe preserves function metadata."""
+
+        class TestClass:
+            def __init__(self):
+                self._lock = threading.RLock()
+
+            @thread_safe
+            def test_method(self, arg1, arg2=None):
+                """Test method docstring."""
+                return f"{arg1}-{arg2}"
+
+        obj = TestClass()
+        method = obj.test_method
+
+        # Check that wrapper preserves original function name and docstring
+        assert method.__name__ == 'test_method'
+        assert 'Test method docstring' in method.__doc__
+        assert method(1, arg2=2) == "1-2"
+
+
+class TestAutoThreadSafeDecorator:
+    """Test the @auto_thread_safe decorator."""
+
+    def test_auto_thread_safe_specific_methods(self):
+        """Test @auto_thread_safe with specific method list."""
+
+        @auto_thread_safe(['increment'])
+        class TestClass:
+            def __init__(self):
+                self.counter = 0
+
+            def increment(self):
+                current = self.counter
+                time.sleep(0.001)
+                self.counter = current + 1
+
+            def unsafe_increment(self):
+                self.counter += 1
+
+        obj = TestClass()
+
+        # Check that specified method is wrapped
+        assert hasattr(obj.increment, '_thread_safe_wrapped')
+        # Check that non-specified method is not wrapped
+        assert not hasattr(obj.unsafe_increment, '_thread_safe_wrapped')
+
+        # Test thread safety of wrapped method
+        threads = []
+        for _ in range(10):
+            thread = threading.Thread(target=obj.increment)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+        assert obj.counter == 10
+
+    def test_auto_thread_safe_all_public_methods(self):
+        """Test @auto_thread_safe without method list wraps all public methods."""
+
+        @auto_thread_safe()
+        class TestClass:
+            def __init__(self):
+                self.counter = 0
+
+            def increment(self):
+                self.counter += 1
+
+            def decrement(self):
+                self.counter -= 1
+
+            def _private_method(self):
+                pass
+
+        obj = TestClass()
+
+        # Public methods should be wrapped
+        assert hasattr(obj.increment, '_thread_safe_wrapped')
+        assert hasattr(obj.decrement, '_thread_safe_wrapped')
+        # Private method should not be wrapped
+        assert not hasattr(obj._private_method, '_thread_safe_wrapped')
+
+    def test_auto_thread_safe_no_double_wrapping(self):
+        """Test that methods are not wrapped multiple times."""
+
+        @auto_thread_safe(['test_method'])
+        class TestClass:
+            def test_method(self):
+                return "test"
+
+        obj = TestClass()
+        original_method = obj.test_method
+
+        # Apply decorator again (should not double-wrap)
+        TestClass = auto_thread_safe(['test_method'])(TestClass)
+        obj2 = TestClass()
+
+        # Should still work and not be double-wrapped
+        assert obj2.test_method() == "test"
+
+
+class TestThreadSafeMeta:
+    """Test the ThreadSafeMeta metaclass."""
+
+    def test_thread_safe_meta_basic(self):
+        """Test basic ThreadSafeMeta functionality."""
+
+        class TestClass(metaclass=ThreadSafeMeta):
+            _thread_safe_methods = ['increment']
+
+            def __init__(self):
+                self.counter = 0
+
+            def increment(self):
+                current = self.counter
+                time.sleep(0.001)
+                self.counter = current + 1
+
+            def unsafe_method(self):
+                pass
+
+        obj = TestClass()
+
+        # Should have class-level lock
+        assert hasattr(obj.__class__, '_lock')
+        # Test that increment method works
+        obj.increment()
+        assert obj.counter == 1
+
+    def test_thread_safe_meta_auto_detection(self):
+        """Test ThreadSafeMeta auto-detects public methods."""
+
+        class TestClass(metaclass=ThreadSafeMeta):
+            def __init__(self):
+                self.value = 0
+
+            def public_method(self):
+                return "public"
+
+            def _private_method(self):
+                return "private"
+
+        obj = TestClass()
+
+        # Should have class-level lock
+        assert hasattr(obj.__class__, '_lock')
+        # Test that methods work
+        assert obj.public_method() == "public"
+        assert obj._private_method() == "private"
+
+
+class TestAutoThreadSafeBaseClass:
+    """Test the AutoThreadSafe base class."""
+
+    def test_auto_thread_safe_base_class(self):
+        """Test AutoThreadSafe base class functionality."""
+
+        class TestClass(AutoThreadSafe):
+            def __init__(self):
+                super().__init__()
+                self.counter = 0
+
+            def increment(self):
+                current = self.counter
+                time.sleep(0.001)
+                self.counter = current + 1
+
+        obj = TestClass()
+
+        # Should have instance lock
+        assert hasattr(obj, '_lock')
+        # Public method should be wrapped
+        assert hasattr(obj.increment, '_thread_safe_wrapped')
+
+        # Test thread safety
+        threads = []
+        for _ in range(10):
+            thread = threading.Thread(target=obj.increment)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+        assert obj.counter == 10
+
+    def test_auto_thread_safe_inheritance(self):
+        """Test AutoThreadSafe with inheritance."""
+
+        class BaseClass(AutoThreadSafe):
+            def __init__(self):
+                super().__init__()
+                self.value = 0
+
+            def base_method(self):
+                self.value += 1
+
+        class DerivedClass(BaseClass):
+            def __init__(self):
+                super().__init__()
+                self.derived_value = 0
+
+            def derived_method(self):
+                self.derived_value += 1
+
+        obj = DerivedClass()
+
+        # Both base and derived methods should be thread-safe
+        assert hasattr(obj.base_method, '_thread_safe_wrapped')
+        assert hasattr(obj.derived_method, '_thread_safe_wrapped')
+
+
+class TestSynchronizedMethodDecorator:
+    """Test the @synchronized_method decorator."""
+
+    def test_synchronized_method_decorator(self):
+        """Test @synchronized_method decorator."""
+
+        class TestClass:
+            def __init__(self):
+                self._lock = threading.RLock()
+                self.counter = 0
+
+            @synchronized_method
+            def increment(self):
+                current = self.counter
+                time.sleep(0.001)
+                self.counter = current + 1
+
+        obj = TestClass()
+
+        # Test thread safety
+        threads = []
+        for _ in range(10):
+            thread = threading.Thread(target=obj.increment)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+        assert obj.counter == 10
+
+
+class TestThreadSafeContext:
+    """Test the ThreadSafeContext context manager."""
+
+    def test_thread_safe_context_manager(self):
+        """Test ThreadSafeContext context manager."""
+        lock = threading.RLock()
+        counter = [0]  # Use list to make it mutable
+
+        def worker():
+            with ThreadSafeContext(lock):
+                current = counter[0]
+                time.sleep(0.001)
+                counter[0] = current + 1
+
+        threads = []
+        for _ in range(10):
+            thread = threading.Thread(target=worker)
+            threads.append(thread)
+            thread.start()
+
+        for thread in threads:
+            thread.join()
+
+        assert counter[0] == 10
+
+    def test_thread_safe_context_exception_handling(self):
+        """Test ThreadSafeContext properly releases lock on exception."""
+        lock = threading.RLock()
+
+        try:
+            with ThreadSafeContext(lock):
+                raise ValueError("Test exception")
+        except ValueError:
+            pass
+
+        # Lock should be released even after exception
+        assert lock.acquire(blocking=False)
+        lock.release()
+
+
+class TestEdgeCases:
+    """Test edge cases and error scenarios."""
+
+    def test_thread_safe_with_static_methods(self):
+        """Test thread safety with static methods."""
+
+        @auto_thread_safe(['regular_method'])
+        class TestClass:
+            counter = 0
+
+            def regular_method(self):
+                TestClass.counter += 1
+
+            @staticmethod
+            def static_method():
+                return "static"
+
+        obj = TestClass()
+
+        # Regular method should be wrapped
+        assert hasattr(obj.regular_method, '_thread_safe_wrapped')
+        # Static method should not be affected
+        assert TestClass.static_method() == "static"
+
+    def test_thread_safe_with_class_methods(self):
+        """Test thread safety with class methods."""
+
+        @auto_thread_safe(['regular_method'])
+        class TestClass:
+            counter = 0
+
+            def regular_method(self):
+                self.__class__.counter += 1
+
+            @classmethod
+            def class_method(cls):
+                return "class"
+
+        obj = TestClass()
+
+        # Regular method should be wrapped
+        assert hasattr(obj.regular_method, '_thread_safe_wrapped')
+        # Class method should work normally
+        assert TestClass.class_method() == "class"
+
+    def test_thread_safe_with_properties(self):
+        """Test thread safety with properties."""
+
+        @auto_thread_safe(['set_value'])
+        class TestClass:
+            def __init__(self):
+                self._value = 0
+
+            @property
+            def value(self):
+                return self._value
+
+            def set_value(self, val):
+                self._value = val
+
+        obj = TestClass()
+
+        # Property should work normally
+        assert obj.value == 0
+        obj.set_value(42)
+        assert obj.value == 42
+
+    def test_multiple_decorator_applications(self):
+        """Test applying decorators multiple times."""
+
+        class TestClass:
+            def __init__(self):
+                self._lock = threading.RLock()
+                self.counter = 0
+
+            def increment(self):
+                self.counter += 1
+
+        # Apply auto_thread_safe multiple times
+        TestClass = auto_thread_safe(['increment'])(TestClass)
+        TestClass = auto_thread_safe(['increment'])(TestClass)
+
+        obj = TestClass()
+        obj.increment()
+        assert obj.counter == 1
+
+    def test_thread_safety_with_exceptions(self):
+        """Test that locks are properly released when exceptions occur."""
+
+        class TestClass:
+            def __init__(self):
+                self._lock = threading.RLock()
+                self.counter = 0
+
+            @thread_safe
+            def failing_method(self):
+                self.counter += 1
+                raise ValueError("Test exception")
+
+        obj = TestClass()
+
+        # Method should raise exception but lock should be released
+        with pytest.raises(ValueError):
+            obj.failing_method()
+
+        # Lock should be available for next call
+        assert obj._lock.acquire(blocking=False)
+        obj._lock.release()
+
+        assert obj.counter == 1
+
+    def test_nested_thread_safe_calls(self):
+        """Test nested calls to thread-safe methods."""
+
+        class TestClass:
+            def __init__(self):
+                self._lock = threading.RLock()
+                self.counter = 0
+
+            @thread_safe
+            def outer_method(self):
+                self.counter += 1
+                self.inner_method()
+
+            @thread_safe
+            def inner_method(self):
+                self.counter += 10
+
+        obj = TestClass()
+        obj.outer_method()
+
+        # Should work with nested calls (RLock allows reentrance)
+        assert obj.counter == 11
\ No newline at end of file
diff --git a/tests/thread_safety/test_thread_safety_patterns.py b/tests/thread_safety/test_thread_safety_patterns.py
new file mode 100644
index 0000000..5645c90
--- /dev/null
+++ b/tests/thread_safety/test_thread_safety_patterns.py
@@ -0,0 +1,502 @@
+"""Test different thread safety usage patterns and advanced scenarios."""
+import threading
+import time
+import weakref
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from pythonLogs.thread_safety import (
+    ThreadSafeMeta,
+    thread_safe,
+    auto_thread_safe,
+    AutoThreadSafe,
+    ThreadSafeContext
+)
+
+
+class TestAdvancedThreadSafetyPatterns:
+    """Test advanced thread safety patterns and real-world scenarios."""
+
+    def test_producer_consumer_pattern(self):
+        """Test thread safety in producer-consumer pattern."""
+
+        @auto_thread_safe(['put', 'get'])
+        class ThreadSafeQueue:
+            def __init__(self, maxsize=10):
+                self.queue = []
+                self.maxsize = maxsize
+                self.condition = threading.Condition()
+
+            def put(self, item):
+                with self.condition:
+                    while len(self.queue) >= self.maxsize:
+                        self.condition.wait()
+                    self.queue.append(item)
+                    self.condition.notify_all()
+
+            def get(self):
+                with self.condition:
+                    while not self.queue:
+                        self.condition.wait()
+                    item = self.queue.pop(0)
+                    self.condition.notify_all()
+                    return item
+
+            def size(self):
+                return len(self.queue)
+
+        queue = ThreadSafeQueue(maxsize=5)
+        results = []
+
+        def producer(start, end):
+            for i in range(start, end):
+                queue.put(f"item_{i}")
+                time.sleep(0.001)
+
+        def consumer(count):
+            items = []
+            for _ in range(count):
+                item = queue.get()
+                items.append(item)
+                time.sleep(0.001)
+            results.extend(items)
+
+        # Start producers and consumers
+        with ThreadPoolExecutor(max_workers=6) as executor:
+            # Start 2 producers
+            producer_futures = [
+                executor.submit(producer, 0, 10),
+                executor.submit(producer, 10, 20)
+            ]
+
+            # Start 2 consumers
+            consumer_futures = [
+                executor.submit(consumer, 10),
+                executor.submit(consumer, 10)
+            ]
+
+            # Wait for completion
+            for future in as_completed(producer_futures + consumer_futures):
+                future.result()
+
+        # Should have consumed all 20 items
+        assert len(results) == 20
+        assert queue.size() == 0
+
+    def test_reader_writer_pattern(self):
+        """Test thread safety in reader-writer pattern."""
+
+        @auto_thread_safe(['read', 'write'])
+        class ThreadSafeDataStore:
+            def __init__(self):
+                self.data = {}
+                self.read_count = 0
+                self.write_count = 0
+
+            def read(self, key):
+                self.read_count += 1
+                time.sleep(0.001)  # Simulate read time
+                return self.data.get(key)
+
+            def write(self, key, value):
+                self.write_count += 1
+                time.sleep(0.001)  # Simulate write time
+                self.data[key] = value
+
+            def get_stats(self):
+                return {
+                    'reads': self.read_count,
+                    'writes': self.write_count,
+                    'data_size': len(self.data)
+                }
+
+        store = ThreadSafeDataStore()
+
+        def writer(start, end):
+            for i in range(start, end):
+                store.write(f"key_{i}", f"value_{i}")
+
+        def reader(keys):
+            results = []
+            for key in keys:
+                value = store.read(key)
+                if value:
+                    results.append((key, value))
+            return results
+
+        # Start writers first
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            writer_futures = [
+                executor.submit(writer, 0, 25),
+                executor.submit(writer, 25, 50)
+            ]
+
+            # Wait for some writes to complete
+            time.sleep(0.1)
+
+            # Start readers
+            reader_futures = [
+                executor.submit(reader, [f"key_{i}" for i in range(0, 20)]),
+                executor.submit(reader, [f"key_{i}" for i in range(20, 40)]),
+                executor.submit(reader, [f"key_{i}" for i in range(30, 50)])
+            ]
+
+            # Collect results
+            for future in as_completed(writer_futures + reader_futures):
+                future.result()
+
+        stats = store.get_stats()
+        assert stats['writes'] == 50
+        assert stats['data_size'] == 50
+        assert stats['reads'] > 0  # Some reads should have occurred
+
+    def test_singleton_pattern_thread_safety(self):
+        """Test thread-safe singleton pattern."""
+
+        class ThreadSafeSingleton:
+            _instance = None
+            _instance_lock = threading.RLock()
+
+            def __init__(self):
+                if ThreadSafeSingleton._instance is not None:
+                    raise RuntimeError("Use get_instance() to get singleton")
+                self.created_at = time.time()
+                self.counter = 0
+
+            @classmethod
+            def get_instance(cls):
+                if cls._instance is None:
+                    with cls._instance_lock:
+                        if cls._instance is None:
+                            cls._instance = cls()
+                return cls._instance
+
+            def increment(self):
+                with self._instance_lock:
+                    self.counter += 1
+
+        instances = []
+
+        def get_singleton():
+            instance = ThreadSafeSingleton.get_instance()
+            instances.append(instance)
+            instance.increment()
+            return instance
+
+        # Create multiple threads trying to get singleton
+        with ThreadPoolExecutor(max_workers=10) as executor:
+            futures = [executor.submit(get_singleton) for _ in range(50)]
+            results = [future.result() for future in as_completed(futures)]
+
+        # All instances should be the same object
+        assert len(set(id(inst) for inst in instances)) == 1
+        # Counter should be exactly 50
+        assert instances[0].counter == 50
+
+    def test_resource_pool_pattern(self):
+        """Test thread-safe resource pool pattern."""
+
+        @auto_thread_safe(['get_resource', 'return_resource'])
+        class ThreadSafeResourcePool:
+            def __init__(self, create_resource_func, pool_size=5):
+                self.create_resource = create_resource_func
+                self.pool_size = pool_size
+                self.available = []
+                self.in_use = set()
+                self.condition = threading.Condition()
+
+                # Pre-populate pool
+                for _ in range(pool_size):
+                    self.available.append(self.create_resource())
+
+            def get_resource(self, timeout=None):
+                with self.condition:
+                    start_time = time.time()
+                    while not self.available:
+                        if timeout and (time.time() - start_time) > timeout:
+                            raise TimeoutError("No resource available")
+                        self.condition.wait(timeout=0.1)
+
+                    resource = self.available.pop()
+                    self.in_use.add(resource)
+                    return resource
+
+            def return_resource(self, resource):
+                with self.condition:
+                    if resource in self.in_use:
+                        self.in_use.remove(resource)
+                        self.available.append(resource)
+                        self.condition.notify()
+
+            def stats(self):
+                return {
+                    'available': len(self.available),
+                    'in_use': len(self.in_use),
+                    'total': len(self.available) + len(self.in_use)
+                }
+
+        # Create a simple resource (just a counter)
+        resource_counter = [0]
+        def create_resource():
+            resource_counter[0] += 1
+            return f"resource_{resource_counter[0]}"
+
+        pool = ThreadSafeResourcePool(create_resource, pool_size=3)
+        completed_tasks = []
+
+        def worker(worker_id):
+            try:
+                resource = pool.get_resource(timeout=1.0)
+                time.sleep(0.1)  # Simulate work
+                completed_tasks.append(f"worker_{worker_id}_used_{resource}")
+                pool.return_resource(resource)
+            except TimeoutError:
+                completed_tasks.append(f"worker_{worker_id}_timeout")
+
+        # Start more workers than available resources
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            futures = [executor.submit(worker, i) for i in range(8)]
+            for future in as_completed(futures):
+                future.result()
+
+        stats = pool.stats()
+        assert stats['total'] == 3  # Pool size maintained
+        assert stats['available'] == 3  # All resources returned
+        assert stats['in_use'] == 0  # No resources stuck
+        assert len(completed_tasks) == 8  # All workers completed
+
+    def test_cache_with_expiry_thread_safety(self):
+        """Test thread-safe cache with expiry."""
+
+        @auto_thread_safe(['get', 'put', 'cleanup'])
+        class ThreadSafeExpiryCache:
+            def __init__(self, default_ttl=1.0):
+                self.cache = {}
+                self.timestamps = {}
+                self.default_ttl = default_ttl
+
+            def get(self, key):
+                if key in self.cache:
+                    if time.time() - self.timestamps[key] < self.default_ttl:
+                        return self.cache[key]
+                    else:
+                        # Expired
+                        del self.cache[key]
+                        del self.timestamps[key]
+                return None
+
+            def put(self, key, value, ttl=None):
+                self.cache[key] = value
+                self.timestamps[key] = time.time()
+
+            def cleanup(self):
+                current_time = time.time()
+                expired_keys = [
+                    key for key, timestamp in self.timestamps.items()
+                    if current_time - timestamp >= self.default_ttl
+                ]
+                for key in expired_keys:
+                    del self.cache[key]
+                    del self.timestamps[key]
+                return len(expired_keys)
+
+            def size(self):
+                return len(self.cache)
+
+        cache = ThreadSafeExpiryCache(default_ttl=0.1)
+
+        def cache_worker(worker_id):
+            # Put some data
+            for i in range(5):
+                cache.put(f"key_{worker_id}_{i}", f"value_{worker_id}_{i}")
+
+            # Try to get data immediately
+            results = []
+            for i in range(5):
+                value = cache.get(f"key_{worker_id}_{i}")
+                if value:
+                    results.append(value)
+
+            return results
+
+        # Run workers concurrently
+        with ThreadPoolExecutor(max_workers=5) as executor:
+            futures = [executor.submit(cache_worker, i) for i in range(5)]
+            results = [future.result() for future in as_completed(futures)]
+
+        # Wait for expiry
+        time.sleep(0.2)
+
+        # Cleanup expired entries
+        expired_count = cache.cleanup()
+
+        # Most entries should have expired
+        assert expired_count > 0
+        assert cache.size() < 25  # Should be less than total inserted
+
+    def test_event_bus_thread_safety(self):
+        """Test thread-safe event bus pattern."""
+
+        @auto_thread_safe(['subscribe', 'unsubscribe', 'publish'])
+        class ThreadSafeEventBus:
+            def __init__(self):
+                self.subscribers = {}
+                self.event_count = {}
+
+            def subscribe(self, event_type, callback):
+                if event_type not in self.subscribers:
+                    self.subscribers[event_type] = []
+                self.subscribers[event_type].append(callback)
+
+            def unsubscribe(self, event_type, callback):
+                if event_type in self.subscribers:
+                    try:
+                        self.subscribers[event_type].remove(callback)
+                    except ValueError:
+                        pass
+
+            def publish(self, event_type, data):
+                self.event_count[event_type] = self.event_count.get(event_type, 0) + 1
+                if event_type in self.subscribers:
+                    for callback in self.subscribers[event_type][:]:  # Copy to avoid modification during iteration
+                        try:
+                            callback(data)
+                        except Exception:
+                            pass  # Ignore callback errors
+
+            def get_stats(self):
+                return {
+                    'subscriber_count': sum(len(subs) for subs in self.subscribers.values()),
+                    'event_types': len(self.subscribers),
+                    'events_published': dict(self.event_count)
+                }
+
+        event_bus = ThreadSafeEventBus()
+        received_events = []
+        events_lock = threading.RLock()
+
+        def event_handler(event_type):
+            def handler(data):
+                with events_lock:
+                    received_events.append((event_type, data))
+            return handler
+
+        def publisher(event_type, count):
+            for i in range(count):
+                event_bus.publish(event_type, f"{event_type}_data_{i}")
+                time.sleep(0.001)
+
+        def subscriber(event_type):
+            handler = event_handler(event_type)
+            event_bus.subscribe(event_type, handler)
+            time.sleep(0.05)  # Let some events be published
+            return handler
+
+        # Start subscribers and publishers concurrently
+        with ThreadPoolExecutor(max_workers=10) as executor:
+            # Start subscribers
+            sub_futures = [
+                executor.submit(subscriber, "type_A"),
+                executor.submit(subscriber, "type_B"),
+                executor.submit(subscriber, "type_A"),  # Multiple subscribers for same type
+            ]
+
+            # Start publishers
+            pub_futures = [
+                executor.submit(publisher, "type_A", 10),
+                executor.submit(publisher, "type_B", 5),
+                executor.submit(publisher, "type_C", 3),  # No subscribers
+            ]
+
+            # Wait for completion
+            for future in as_completed(sub_futures + pub_futures):
+                future.result()
+
+        stats = event_bus.get_stats()
+
+        # Should have received events for subscribed types
+        type_a_events = [e for e in received_events if e[0] == "type_A"]
+        type_b_events = [e for e in received_events if e[0] == "type_B"]
+
+        assert len(type_a_events) > 0  # type_A had 2 subscribers
+        assert len(type_b_events) > 0  # type_B had 1 subscriber
+        assert stats['events_published']['type_A'] == 10
+        assert stats['events_published']['type_B'] == 5
+        assert stats['events_published']['type_C'] == 3
+
+    def test_weak_reference_cleanup_thread_safety(self):
+        """Test thread safety with weak references and cleanup."""
+
+        @auto_thread_safe(['register', 'cleanup', 'get_count'])
+        class ThreadSafeWeakRegistry:
+            def __init__(self):
+                self.registry = {}
+                self.cleanup_count = 0
+
+            def register(self, obj, name):
+                def cleanup_callback(weak_ref):
+                    self.cleanup_count += 1
+                    if name in self.registry:
+                        del self.registry[name]
+
+                weak_ref = weakref.ref(obj, cleanup_callback)
+                self.registry[name] = weak_ref
+
+            def cleanup(self):
+                # Manual cleanup of dead references
+                dead_refs = []
+                for name, weak_ref in self.registry.items():
+                    if weak_ref() is None:
+                        dead_refs.append(name)
+
+                for name in dead_refs:
+                    del self.registry[name]
+
+                return len(dead_refs)
+
+            def get_count(self):
+                return len(self.registry)
+
+            def get_cleanup_count(self):
+                return self.cleanup_count
+
+        registry = ThreadSafeWeakRegistry()
+
+        def create_and_register_objects(start, end):
+            objects = []
+            for i in range(start, end):
+                # Create a proper object that can have weak references
+                obj = type(f"TestObject_{i}", (), {"value": i})()
+                registry.register(obj, f"name_{i}")
+                objects.append(obj)
+
+            # Let some objects go out of scope gradually
+            for i in range(len(objects)):
+                if i % 2 == 0:
+                    objects[i] = None  # Clear reference
+                time.sleep(0.001)
+
+            return len([obj for obj in objects if obj is not None])
+
+        # Create objects in multiple threads
+        with ThreadPoolExecutor(max_workers=5) as executor:
+            futures = [
+                executor.submit(create_and_register_objects, i*10, (i+1)*10)
+                for i in range(5)
+            ]
+
+            for future in as_completed(futures):
+                future.result()
+
+        # Force garbage collection
+        import gc
+        gc.collect()
+        time.sleep(0.1)
+
+        # Manual cleanup
+        cleaned = registry.cleanup()
+
+        # Some cleanup should have occurred
+        final_count = registry.get_count()
+        cleanup_count = registry.get_cleanup_count()
+
+        assert final_count < 50  # Some objects should be cleaned up
+        assert cleanup_count >= 0  # Some automatic cleanup may have occurred
\ No newline at end of file
diff --git a/tests/test_timezone_migration.py b/tests/timezone/test_timezone_migration.py
similarity index 100%
rename from tests/test_timezone_migration.py
rename to tests/timezone/test_timezone_migration.py
diff --git a/tests/test_zoneinfo_fallbacks.py b/tests/timezone/test_zoneinfo_fallbacks.py
similarity index 100%
rename from tests/test_zoneinfo_fallbacks.py
rename to tests/timezone/test_zoneinfo_fallbacks.py