From c36ddd2e46860838bc07c830417d5cd60f5ecf7f Mon Sep 17 00:00:00 2001 From: ddc Date: Sun, 20 Jul 2025 10:52:20 -0300 Subject: [PATCH 1/2] V4.0.2 --- .github/workflows/tests.yml | 39 ++- .github/workflows/workflow.yml | 67 ----- README.md | 19 +- poetry.lock | 2 +- pyproject.toml | 17 +- pythonLogs/.env.example | 5 +- pythonLogs/factory.py | 33 ++- pythonLogs/settings.py | 11 +- ...st_some_log_utils.py => test_log_utils.py} | 231 +++++++++++++++++- 9 files changed, 338 insertions(+), 86 deletions(-) rename tests/{test_some_log_utils.py => test_log_utils.py} (60%) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 6a1fb42..28213bb 100755 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -2,13 +2,11 @@ name: Run Tests on: push: - branches: - - "**" # including all branches before excluding master - - "!master" - - "!main" + branches: ["**"] + jobs: - tests: + test: name: Test Python ${{ matrix.python-version }} runs-on: ubuntu-latest strategy: @@ -41,3 +39,34 @@ jobs: with: token: ${{ secrets.CODECOV_TOKEN }} slug: ddc/pythonLogs + + build: + name: Build Test Package + runs-on: ubuntu-latest + needs: test + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.13" + + - name: Install Poetry + uses: snok/install-poetry@v1 + with: + virtualenvs-create: true + virtualenvs-in-project: true + + - name: Install build dependencies only + run: poetry install --only main --no-interaction --no-ansi + + - name: Build package + run: poetry build + + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: python-packages + path: dist/ + retention-days: 7 diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 73658f6..40ac9ad 100755 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -7,73 +7,6 @@ on: jobs: - test: - name: Test Python ${{ matrix.python-version }} - runs-on: ubuntu-latest - if: "!startsWith(github.ref, 'refs/tags/')" - strategy: - fail-fast: false - matrix: - python-version: ["3.10", "3.11", "3.12", "3.13"] - steps: - - uses: actions/checkout@v4 - - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 - with: - python-version: ${{ matrix.python-version }} - - - name: Install Poetry - uses: snok/install-poetry@v1 - with: - virtualenvs-create: true - virtualenvs-in-project: true - - - name: Install dependencies - run: poetry install --with test --no-interaction --no-ansi - - - name: Run tests with coverage - run: poetry run poe tests - - - name: Upload coverage reports to Codecov - if: matrix.python-version == '3.13' - uses: codecov/codecov-action@v5 - with: - token: ${{ secrets.CODECOV_TOKEN }} - slug: ddc/pythonLogs - - build: - name: Build Test Package - runs-on: ubuntu-latest - needs: test - if: "!startsWith(github.ref, 'refs/tags/')" - steps: - - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: "3.13" - - - name: Install Poetry - uses: snok/install-poetry@v1 - with: - virtualenvs-create: true - virtualenvs-in-project: true - - - name: Install build dependencies only - run: poetry install --only main --no-interaction --no-ansi - - - name: Build package - run: poetry build - - - name: Upload artifacts - uses: actions/upload-artifact@v4 - with: - name: python-packages - path: dist/ - retention-days: 7 - release: name: Build and Release runs-on: ubuntu-latest diff --git a/README.md b/README.md index 
91e58ae..0ba37bf 100755
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@
 [![Support me on GitHub](https://img.shields.io/badge/Support_me_on_GitHub-154c79?style=for-the-badge&logo=github)](https://github.com/sponsors/ddc)
 
-A modern, high-performance Python logging library with automatic file rotation, context manager support, and memory optimization.
+High-performance Python logging library with file rotation and optimized caching for better performance
 
 ## Table of Contents
@@ -384,18 +384,28 @@ error_logger.error("Database connection failed")
 audit_logger.info("User admin logged in")
 ```
 
-## Env Variables (Optional)
+## Env Variables (Optional | Production)
+.env variables can be used by leaving all options blank when calling the function.
+If not specified inside the .env file, it will use the default value.
+This is a good approach for production environments, since options can be changed easily.
+```python
+from pythonLogs import timed_rotating_logger
+log = timed_rotating_logger()
+```
+
 ```
 LOG_LEVEL=DEBUG
-LOG_TIMEZONE=America/Chicago
+LOG_TIMEZONE=UTC
 LOG_ENCODING=UTF-8
 LOG_APPNAME=app
 LOG_FILENAME=app.log
 LOG_DIRECTORY=/app/logs
 LOG_DAYS_TO_KEEP=30
+LOG_DATE_FORMAT=%Y-%m-%dT%H:%M:%S
 LOG_STREAM_HANDLER=True
 LOG_SHOW_LOCATION=False
-LOG_DATE_FORMAT=%Y-%m-%dT%H:%M:%S
+LOG_MAX_LOGGERS=50
+LOG_LOGGER_TTL_SECONDS=1800
 
 # SizeRotatingLog
 LOG_MAX_FILE_SIZE_MB=10
@@ -403,6 +413,7 @@ LOG_MAX_FILE_SIZE_MB=10
 # TimedRotatingLog
 LOG_ROTATE_WHEN=midnight
 LOG_ROTATE_AT_UTC=True
+LOG_ROTATE_FILE_SUFIX="%Y%m%d"
 ```
 
diff --git a/poetry.lock b/poetry.lock
index ac7a1ec..e85ef14 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -569,4 +569,4 @@ typing-extensions = ">=4.12.0"
 [metadata]
 lock-version = "2.1"
 python-versions = "^3.10"
-content-hash = "1b58e2194b8ca5f7f291348a8dfbb2f7bf2b5eb0d781d0aee4955e482a159115"
+content-hash = "12c2b54bc5df384ec52a9d28013534121f6dd11a5874ecbe9b428758e558688b"
diff --git a/pyproject.toml b/pyproject.toml
index d4badcd..8b9deae 100755
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,8 +5,8 @@ build-backend = "poetry.core.masonry.api"
 
 [tool.poetry]
 name = "pythonLogs"
-version = "4.0.1"
-description = "A modern, high-performance Python logging library with automatic file rotation, factory pattern for easy logger creation, and optimized caching for better performance."
+version = "4.0.2" +description = "High-performance Python logging library with file rotation and optimized caching for better performance" license = "MIT" readme = "README.md" authors = ["Daniel Costa "] @@ -34,6 +34,7 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.10" +pydantic = "^2.11.7" pydantic-settings = "^2.10.1" python-dotenv = "^1.1.1" @@ -48,6 +49,12 @@ pytest = "^8.4.1" [tool.poetry.group.test] optional = true + +[tool.black] +line-length = 120 +skip-string-normalization = true + + [tool.pytest.ini_options] markers = [ "slow: marks tests as slow (deselect with '-m \"not slow\"')" @@ -60,6 +67,12 @@ omit = [ ] +[tool.coverage.report] +exclude_lines = [ + "pragma: no cover", +] + + [tool.poe.tasks] _test = "coverage run -m pytest -v" _coverage_report = "coverage report" diff --git a/pythonLogs/.env.example b/pythonLogs/.env.example index 1d7c109..8821100 100644 --- a/pythonLogs/.env.example +++ b/pythonLogs/.env.example @@ -5,9 +5,11 @@ LOG_APPNAME=app LOG_FILENAME=app.log LOG_DIRECTORY=/app/logs LOG_DAYS_TO_KEEP=30 +LOG_DATE_FORMAT=%Y-%m-%dT%H:%M:%S LOG_STREAM_HANDLER=True LOG_SHOW_LOCATION=False -LOG_DATE_FORMAT=%Y-%m-%dT%H:%M:%S +LOG_MAX_LOGGERS=50 +LOG_LOGGER_TTL_SECONDS=1800 # SizeRotatingLog LOG_MAX_FILE_SIZE_MB=10 @@ -15,3 +17,4 @@ LOG_MAX_FILE_SIZE_MB=10 # TimedRotatingLog LOG_ROTATE_WHEN=midnight LOG_ROTATE_AT_UTC=True +LOG_ROTATE_FILE_SUFIX="%Y%m%d" diff --git a/pythonLogs/factory.py b/pythonLogs/factory.py index c23c898..15c940b 100644 --- a/pythonLogs/factory.py +++ b/pythonLogs/factory.py @@ -1,4 +1,5 @@ # -*- encoding: utf-8 -*- +import atexit import logging import threading import time @@ -28,6 +29,23 @@ class LoggerFactory: # Memory optimization settings _max_loggers = 100 # Maximum number of cached loggers _logger_ttl = 3600 # Logger TTL in seconds (1 hour) + _initialized = False # Flag to track if memory limits have been initialized + _atexit_registered = False # Flag to track if atexit cleanup is registered + + @classmethod + def _ensure_initialized(cls) -> None: + """Ensure memory limits are initialized from settings on first use.""" + if not cls._initialized: + from pythonLogs.settings import get_log_settings + settings = get_log_settings() + cls._max_loggers = settings.max_loggers + cls._logger_ttl = settings.logger_ttl_seconds + cls._initialized = True + + # Register atexit cleanup on first use + if not cls._atexit_registered: + atexit.register(cls._atexit_cleanup) + cls._atexit_registered = True @classmethod def get_or_create_logger( @@ -54,6 +72,9 @@ def get_or_create_logger( # Thread-safe check-and-create operation with cls._registry_lock: + # Initialize memory limits from settings on first use + cls._ensure_initialized() + # Clean up expired loggers first cls._cleanup_expired_loggers() @@ -114,7 +135,7 @@ def _enforce_size_limit(cls) -> None: @classmethod def set_memory_limits(cls, max_loggers: int = 100, ttl_seconds: int = 3600) -> None: - """Configure memory management limits for the logger registry. + """Configure memory management limits for the logger registry at runtime. 
Args: max_loggers: Maximum number of cached loggers @@ -123,10 +144,20 @@ def set_memory_limits(cls, max_loggers: int = 100, ttl_seconds: int = 3600) -> N with cls._registry_lock: cls._max_loggers = max_loggers cls._logger_ttl = ttl_seconds + cls._initialized = True # Mark as manually configured # Clean up immediately with new settings cls._cleanup_expired_loggers() cls._enforce_size_limit() + @classmethod + def _atexit_cleanup(cls) -> None: + """Cleanup function registered with atexit to ensure proper resource cleanup.""" + try: + cls.clear_registry() + except Exception: + # Silently ignore exceptions during shutdown cleanup + pass + @staticmethod def _cleanup_logger(logger: logging.Logger) -> None: """Clean up logger resources by closing all handlers.""" diff --git a/pythonLogs/settings.py b/pythonLogs/settings.py index a002c97..f9a0080 100644 --- a/pythonLogs/settings.py +++ b/pythonLogs/settings.py @@ -23,15 +23,18 @@ class LogSettings(BaseSettings): """If any ENV variable is omitted, it falls back to default values here""" level: Optional[LogLevel] = Field(default=LogLevel.INFO) + timezone: Optional[str] = Field(default=DEFAULT_TIMEZONE) + encoding: Optional[str] = Field(default=DEFAULT_ENCODING) appname: Optional[str] = Field(default="app") - directory: Optional[str] = Field(default="/app/logs") filename: Optional[str] = Field(default="app.log") - encoding: Optional[str] = Field(default=DEFAULT_ENCODING) - date_format: Optional[str] = Field(default=DEFAULT_DATE_FORMAT) + directory: Optional[str] = Field(default="/app/logs") days_to_keep: Optional[int] = Field(default=DEFAULT_BACKUP_COUNT) - timezone: Optional[str] = Field(default=DEFAULT_TIMEZONE) + date_format: Optional[str] = Field(default=DEFAULT_DATE_FORMAT) stream_handler: Optional[bool] = Field(default=True) show_location: Optional[bool] = Field(default=False) + # Memory management + max_loggers: Optional[int] = Field(default=100) + logger_ttl_seconds: Optional[int] = Field(default=3600) # SizeRotatingLog max_file_size_mb: Optional[int] = Field(default=10) diff --git a/tests/test_some_log_utils.py b/tests/test_log_utils.py similarity index 60% rename from tests/test_some_log_utils.py rename to tests/test_log_utils.py index f8bd699..0b3aab7 100644 --- a/tests/test_some_log_utils.py +++ b/tests/test_log_utils.py @@ -383,8 +383,237 @@ def test_check_directory_permissions_caching(self): # First call should add to cache log_utils.check_directory_permissions(temp_dir) - # Second call should use cache (no exception should be raised) + # The Second call should use cache (no exception should be raised) log_utils.check_directory_permissions(temp_dir) # Verify it's in the cache by checking the global variable assert temp_dir in log_utils._checked_directories + + def test_check_directory_permissions_cache_eviction(self): + """Test cache eviction when max directories reached""" + original_max = log_utils._max_cached_directories + original_cache = log_utils._checked_directories.copy() + + try: + # Set a small cache size for testing + log_utils._max_cached_directories = 2 + log_utils._checked_directories.clear() + + with tempfile.TemporaryDirectory() as temp_dir1: + with tempfile.TemporaryDirectory() as temp_dir2: + with tempfile.TemporaryDirectory() as temp_dir3: + # Fill cache to capacity + log_utils.check_directory_permissions(temp_dir1) + log_utils.check_directory_permissions(temp_dir2) + assert len(log_utils._checked_directories) == 2 + + # Adding a third should trigger eviction + log_utils.check_directory_permissions(temp_dir3) + assert 
len(log_utils._checked_directories) == 2 + assert temp_dir3 in log_utils._checked_directories + finally: + # Restore original values + log_utils._max_cached_directories = original_max + log_utils._checked_directories = original_cache + + def test_handler_close_error_handling(self): + """Test error handling when closing handlers in get_logger_and_formatter""" + name = "test_handler_error" + + # Create a logger with a handler that will error on close + logger = logging.getLogger(name) + + # Create a mock handler that raises error on close + class ErrorHandler(logging.StreamHandler): + def close(self): + raise OSError("Test error") + + error_handler = ErrorHandler() + logger.addHandler(error_handler) + + # This should handle the error gracefully + new_logger, formatter = log_utils.get_logger_and_formatter(name, "%Y-%m-%d", False, "UTC") + + # Should still work despite the error + assert new_logger is logger + assert len(new_logger.handlers) == 0 + + def test_remove_old_logs_file_error(self): + """Test remove_old_logs error handling when file deletion fails""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a .gz file + gz_file = os.path.join(temp_dir, "test.gz") + with open(gz_file, "wb") as f: + f.write(b"test content") + + # Set old modification time + old_time = time.time() - 2*24*60*60 # 2 days old + os.utime(gz_file, (old_time, old_time)) + + # Make parent directory read-only to trigger deletion error + os.chmod(temp_dir, 0o555) + + try: + # Capture stderr to verify error was logged + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + log_utils.remove_old_logs(temp_dir, 1) + + # Should have logged an error but not crashed + output = stderr_capture.getvalue() + assert "Unable to delete old log" in output + finally: + # Restore permissions for cleanup + os.chmod(temp_dir, 0o755) + + def test_remove_old_logs_directory_error(self): + """Test remove_old_logs error handling when directory scan fails""" + # Test with a simulated Path.glob() error by mocking pathlib.Path + import unittest.mock + from pathlib import Path + + with tempfile.TemporaryDirectory() as temp_dir: + # Create a normal directory first + test_dir = os.path.join(temp_dir, "test_dir") + os.makedirs(test_dir) + + # Mock Path.glob to raise an OSError + original_path_glob = Path.glob + def mock_glob(self, pattern): + if str(self) == test_dir: + raise OSError("Mocked directory scan error") + return original_path_glob(self, pattern) + + try: + with unittest.mock.patch.object(Path, 'glob', mock_glob): + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + log_utils.remove_old_logs(test_dir, 1) + + output = stderr_capture.getvalue() + assert "Unable to scan directory for old logs" in output + finally: + # Ensure the original method is restored + Path.glob = original_path_glob + + def test_delete_file_special_file(self): + """Test delete_file with special file types""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a symbolic link (special file) + target_file = os.path.join(temp_dir, "target.txt") + link_file = os.path.join(temp_dir, "link.txt") + + with open(target_file, "w") as f: + f.write("test content") + + os.symlink(target_file, link_file) + assert os.path.exists(link_file) + assert os.path.islink(link_file) + + # delete_file should handle symlinks + result = log_utils.delete_file(link_file) + assert result == True + assert not os.path.exists(link_file) + + def test_get_log_path_permission_error(self): + """Test get_log_path when 
directory exists but is not writable""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a subdirectory and make it read-only + readonly_dir = os.path.join(temp_dir, "readonly") + os.makedirs(readonly_dir) + os.chmod(readonly_dir, 0o555) # Read and execute only + + try: + with pytest.raises(PermissionError) as exc_info: + log_utils.get_log_path(readonly_dir, "test.log") + # The error could be from check_directory_permissions or get_log_path itself + assert ("Unable to access directory" in str(exc_info.value) or + "Unable to write to log directory" in str(exc_info.value)) + finally: + os.chmod(readonly_dir, 0o755) # Restore for cleanup + + def test_gzip_file_io_error(self): + """Test gzip_file_with_sufix error handling during compression""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a test file + test_file = os.path.join(temp_dir, "test.log") + with open(test_file, "w") as f: + f.write("test content") + + # Make directory read-only to trigger gzip error + os.chmod(temp_dir, 0o555) + + try: + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + with pytest.raises(OSError): + log_utils.gzip_file_with_sufix(test_file, "test") + + output = stderr_capture.getvalue() + assert "Unable to gzip log file" in output + finally: + os.chmod(temp_dir, 0o755) # Restore for cleanup + + def test_gzip_file_deletion_error(self): + """Test gzip_file_with_sufix error when source file deletion fails""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a test file + test_file = os.path.join(temp_dir, "test.log") + with open(test_file, "w") as f: + f.write("test content") + + # Create the gzip file successfully first + result = log_utils.gzip_file_with_sufix(test_file, "test") + assert result is not None + assert result.endswith("_test.log.gz") + + # Clean up + if os.path.exists(result): + os.unlink(result) + + def test_write_stderr_fallback(self): + """Test write_stderr fallback when timezone operations fail""" + # Save original function + original_get_stderr_tz = log_utils._get_stderr_timezone + + # Mock _get_stderr_timezone to raise an error + def mock_error_timezone(): + raise KeyError("Mock timezone error") + + try: + log_utils._get_stderr_timezone = mock_error_timezone + + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + log_utils.write_stderr("Test fallback message") + + output = stderr_capture.getvalue() + assert "Test fallback message" in output + assert "ERROR" in output + finally: + # Restore original function + log_utils._get_stderr_timezone = original_get_stderr_tz + + def test_stderr_timezone_with_special_timezone(self): + """Test _get_stderr_timezone with different timezone configurations""" + original_tz = os.environ.get("LOG_TIMEZONE") + + try: + # Test with a specific timezone + os.environ["LOG_TIMEZONE"] = "Europe/London" + # Clear the cache + log_utils._get_stderr_timezone.cache_clear() + + stderr_capture = io.StringIO() + with contextlib.redirect_stderr(stderr_capture): + log_utils.write_stderr("Test timezone message") + + output = stderr_capture.getvalue() + assert "Test timezone message" in output + + finally: + if original_tz is not None: + os.environ["LOG_TIMEZONE"] = original_tz + elif "LOG_TIMEZONE" in os.environ: + del os.environ["LOG_TIMEZONE"] + log_utils._get_stderr_timezone.cache_clear() From 85be4bb906eb4e36ed3bd5d103dfa5d1beb96558 Mon Sep 17 00:00:00 2001 From: ddc Date: Sun, 20 Jul 2025 11:14:25 -0300 Subject: [PATCH 2/2] V4.0.2 --- tests/test_factory.py | 163 +++++++++++++- 
tests/test_log_utils.py | 281 +++++++++++++++++++++++ tests/test_memory_optimization.py | 357 +++++++++++++++++++++++++++++- 3 files changed, 794 insertions(+), 7 deletions(-) diff --git a/tests/test_factory.py b/tests/test_factory.py index 6d02422..6a4e1e5 100644 --- a/tests/test_factory.py +++ b/tests/test_factory.py @@ -59,7 +59,7 @@ def test_timed_rotating_logger_creation(self): def test_logger_registry_caching(self): """Test logger registry functionality.""" - # Clear registry and verify it's empty + # Clear the registry and verify it's empty clear_logger_registry() assert len(get_registered_loggers()) == 0 @@ -208,7 +208,7 @@ def test_factory_shutdown_logger(self): assert result is True assert "shutdown_test" not in get_registered_loggers() - # Try to shutdown non-existent logger + # Try to shut down non-existent logger result = LoggerFactory.shutdown_logger("non_existent") assert result is False @@ -291,10 +291,10 @@ def test_factory_registry_copy_safety(self): registry_copy = get_registered_loggers() assert len(registry_copy) == 2 - # Modify the copy (should not affect original) + # Modify the copy (should not affect the original) registry_copy["new_logger"] = logger1 - # Original registry should be unchanged + # The Original registry should be unchanged original_registry = get_registered_loggers() assert len(original_registry) == 2 assert "new_logger" not in original_registry @@ -314,3 +314,158 @@ def test_factory_error_handling_during_cleanup(self): # Shutdown should handle the error gracefully result = LoggerFactory.shutdown_logger("cleanup_error_test") assert result is True + + def test_factory_ensure_initialized_behavior(self): + """Test _ensure_initialized method behavior.""" + # Clear any existing initialization + LoggerFactory._initialized = False + LoggerFactory._atexit_registered = False + + # Calling get_or_create_logger should trigger initialization + logger = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="init_test") + + # Should now be initialized + assert LoggerFactory._initialized is True + assert LoggerFactory._atexit_registered is True + assert logger is not None + + # Calling again should not re-initialize + LoggerFactory._ensure_initialized() + assert LoggerFactory._initialized is True + + def test_factory_atexit_cleanup_error_handling(self): + """Test atexit cleanup error handling.""" + from unittest.mock import patch, Mock + + # Mock the clear_registry method to raise an error + with patch.object(LoggerFactory, 'clear_registry', side_effect=Exception("Test error")): + # Should not raise an exception + LoggerFactory._atexit_cleanup() + + def test_factory_ttl_cleanup_edge_cases(self): + """Test TTL cleanup with edge cases.""" + import time + + # Set very short TTL + LoggerFactory.set_memory_limits(max_loggers=100, ttl_seconds=0.1) + + # Create loggers + logger1 = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="ttl_edge1") + logger2 = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="ttl_edge2") + + # Verify they're registered + assert len(LoggerFactory.get_registered_loggers()) == 2 + + # Wait for TTL to expire + time.sleep(0.15) + + # Force cleanup by creating another logger + logger3 = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="ttl_edge3") + + # Old loggers should be cleaned up + registry = LoggerFactory.get_registered_loggers() + assert "ttl_edge1" not in registry + assert "ttl_edge2" not in registry + assert "ttl_edge3" in registry + + def test_factory_lru_eviction_comprehensive(self): + """Test comprehensive 
LRU eviction scenarios.""" + # Set a small limit for testing + LoggerFactory.set_memory_limits(max_loggers=2, ttl_seconds=3600) + + # Create loggers in specific order + logger1 = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="lru1") + logger2 = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="lru2") + + # Access logger1 to update its timestamp + logger1_again = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="lru1") + assert logger1 is logger1_again + + # Create third logger - should evict logger2 (oldest) + logger3 = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="lru3") + + registry = LoggerFactory.get_registered_loggers() + assert len(registry) == 2 + assert "lru1" in registry # Recently accessed + assert "lru2" not in registry # Should be evicted + assert "lru3" in registry # Newly created + + def test_factory_memory_limits_from_settings(self): + """Test memory limits initialization from settings.""" + from unittest.mock import patch, Mock + + # Mock settings + mock_settings = Mock() + mock_settings.max_loggers = 50 + mock_settings.logger_ttl_seconds = 1800 + + # Patch the import inside the function + with patch('pythonLogs.settings.get_log_settings', return_value=mock_settings): + # Reset initialization flag + LoggerFactory._initialized = False + + # This should trigger initialization from settings + LoggerFactory._ensure_initialized() + + assert LoggerFactory._max_loggers == 50 + assert LoggerFactory._logger_ttl == 1800 + assert LoggerFactory._initialized is True + + def test_factory_zero_max_loggers_handling(self): + """Test handling of zero max_loggers setting.""" + # Set max_loggers to 0 + LoggerFactory.set_memory_limits(max_loggers=0, ttl_seconds=3600) + + # Create a logger - it gets added after clearing registry + logger1 = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="zero_test1") + assert logger1 is not None + + # Registry will contain the newly created logger (added after clearing) + registry = LoggerFactory.get_registered_loggers() + assert len(registry) == 1 + assert "zero_test1" in registry + + # Creating another logger should clear the registry again and add the new one + logger2 = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="zero_test2") + assert logger2 is not None + registry = LoggerFactory.get_registered_loggers() + assert len(registry) == 1 + assert "zero_test2" in registry + assert "zero_test1" not in registry # Should be cleared + + def test_factory_negative_max_loggers_handling(self): + """Test handling of negative max_loggers setting.""" + # Set max_loggers to negative value + LoggerFactory.set_memory_limits(max_loggers=-1, ttl_seconds=3600) + + # Create a logger - it gets added after clearing registry + logger = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="negative_test") + assert logger is not None + + # Registry will contain the newly created logger (added after clearing) + registry = LoggerFactory.get_registered_loggers() + assert len(registry) == 1 + assert "negative_test" in registry + + def test_factory_large_scale_operations(self): + """Test factory with large scale operations.""" + # Set reasonable limits + LoggerFactory.set_memory_limits(max_loggers=10, ttl_seconds=3600) + + # Create many loggers + created_loggers = [] + for i in range(20): + logger = LoggerFactory.get_or_create_logger( + LoggerType.BASIC, + name=f"scale_test_{i}" + ) + created_loggers.append(logger) + + # Registry should not exceed the limit + registry = LoggerFactory.get_registered_loggers() + assert 
len(registry) <= 10 + + # All loggers should still be functional + for logger in created_loggers: + logger.info("Scale test message") + assert logger.name is not None diff --git a/tests/test_log_utils.py b/tests/test_log_utils.py index 0b3aab7..c702fc3 100644 --- a/tests/test_log_utils.py +++ b/tests/test_log_utils.py @@ -617,3 +617,284 @@ def test_stderr_timezone_with_special_timezone(self): elif "LOG_TIMEZONE" in os.environ: del os.environ["LOG_TIMEZONE"] log_utils._get_stderr_timezone.cache_clear() + + def test_check_filename_instance_edge_cases(self): + """Test check_filename_instance with more edge cases.""" + # Test with various invalid types + with pytest.raises(TypeError): + log_utils.check_filename_instance(123) + + with pytest.raises(TypeError): + log_utils.check_filename_instance(None) + + with pytest.raises(TypeError): + log_utils.check_filename_instance({"file": "test.log"}) + + # Valid cases should not raise + log_utils.check_filename_instance(["test.log", "test2.log"]) + log_utils.check_filename_instance(("test.log", "test2.log")) + log_utils.check_filename_instance([]) # Empty list is valid + log_utils.check_filename_instance(()) # Empty tuple is valid + + def test_lru_cache_behavior_verification(self): + """Test LRU cache behavior in timezone functions.""" + # Clear caches first + log_utils.get_timezone_function.cache_clear() + log_utils._get_timezone_offset.cache_clear() + + # Test get_timezone_function cache + initial_cache = log_utils.get_timezone_function.cache_info() + assert initial_cache.currsize == 0 + + # Call function multiple times with the same input + func1 = log_utils.get_timezone_function("UTC") + func2 = log_utils.get_timezone_function("UTC") + func3 = log_utils.get_timezone_function("localtime") + + # Should be cached + cache_info = log_utils.get_timezone_function.cache_info() + assert cache_info.currsize == 2 # Two unique calls + assert cache_info.hits >= 1 # At least one cache hit + + # Test _get_timezone_offset cache + offset1 = log_utils._get_timezone_offset("UTC") + offset2 = log_utils._get_timezone_offset("UTC") + assert offset1 == offset2 + + offset_cache = log_utils._get_timezone_offset.cache_info() + assert offset_cache.currsize >= 1 + assert offset_cache.hits >= 1 + + def test_thread_safety_directory_check(self): + """Test thread safety of directory permission checking.""" + import threading + import concurrent.futures + + errors = [] + checked_dirs = [] + + def check_directory_worker(worker_id): + """Worker function to check directory permissions concurrently.""" + try: + _temp_dir = tempfile.mkdtemp(prefix=f"thread_test_{worker_id}_") + checked_dirs.append(_temp_dir) + + # Multiple calls should be thread-safe + for _ in range(5): + log_utils.check_directory_permissions(_temp_dir) + + return _temp_dir + except Exception as e: + errors.append(f"Worker {worker_id}: {str(e)}") + return None + + try: + # Run concurrent workers + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futures = [executor.submit(check_directory_worker, i) for i in range(5)] + results = [future.result() for future in concurrent.futures.as_completed(futures)] + + # Should have no errors + assert len(errors) == 0, f"Thread safety errors: {errors}" + assert len([r for r in results if r is not None]) == 5 + + finally: + # Cleanup + import shutil + for temp_dir in checked_dirs: + if temp_dir and os.path.exists(temp_dir): + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_gzip_compression_levels(self): + """Test gzip compression with different 
scenarios.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Create a larger file to test compression + test_file = os.path.join(temp_dir, "large_test.log") + test_content = "This is a test log entry.\n" * 1000 # Larger content + + with open(test_file, "w") as f: + f.write(test_content) + + # Test gzip compression + result = log_utils.gzip_file_with_sufix(test_file, "compressed") + assert result is not None + assert result.endswith("_compressed.log.gz") + assert os.path.exists(result) + assert not os.path.exists(test_file) # Original should be deleted + + # Verify compressed file can be read + import gzip + with gzip.open(result, "rt") as f: + decompressed_content = f.read() + assert decompressed_content == test_content + + def test_get_timezone_function_edge_cases(self): + """Test get_timezone_function with various timezone inputs.""" + # Test standard timezones + utc_func = log_utils.get_timezone_function("UTC") + assert utc_func.__name__ == "gmtime" + + local_func = log_utils.get_timezone_function("localtime") + assert local_func.__name__ == "localtime" + + # Test case insensitivity + utc_upper = log_utils.get_timezone_function("UTC") + utc_lower = log_utils.get_timezone_function("utc") + assert utc_upper is utc_lower # Should be cached + + # Test custom timezone + custom_func = log_utils.get_timezone_function("America/New_York") + assert custom_func.__name__ == "" + + # Test function returns proper time tuple + time_tuple = custom_func() + assert len(time_tuple) == 9 # Standard time tuple length + + def test_cache_eviction_stress_test(self): + """Test cache eviction under stress conditions.""" + original_max = log_utils._max_cached_directories + try: + # Set very small cache for testing + log_utils._max_cached_directories = 3 + + temp_dirs = [] + # Create more directories than cache can hold + for i in range(10): + temp_dir = tempfile.mkdtemp(prefix=f"eviction_test_{i}_") + temp_dirs.append(temp_dir) + + # Clear cache first to test eviction + if i == 0: + log_utils._checked_directories.clear() + + log_utils.check_directory_permissions(temp_dir) + + # Verify cache size doesn't exceed limit + with log_utils._directory_lock: + cache_size = len(log_utils._checked_directories) + assert cache_size <= 3, f"Cache size {cache_size} exceeds limit of 3" + + # Verify some directories are still in the cache + with log_utils._directory_lock: + final_cache_size = len(log_utils._checked_directories) + assert final_cache_size <= 3 + assert final_cache_size > 0 # Should have some entries + + finally: + # Cleanup + import shutil + for temp_dir in temp_dirs: + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir, ignore_errors=True) + log_utils._max_cached_directories = original_max + + def test_error_handling_comprehensive(self): + """Test comprehensive error handling scenarios.""" + # Test get_level with various invalid inputs + assert log_utils.get_level(None) == logging.INFO + assert log_utils.get_level([]) == logging.INFO + assert log_utils.get_level({}) == logging.INFO + assert log_utils.get_level(object()) == logging.INFO + + # Test invalid level strings + assert log_utils.get_level("INVALID_LEVEL") == logging.INFO + assert log_utils.get_level("") == logging.INFO + assert log_utils.get_level(" ") == logging.INFO + + def test_path_operations_edge_cases(self): + """Test path operations with edge cases.""" + # Test get_log_path with various directory scenarios + with tempfile.TemporaryDirectory() as temp_dir: + # Test with nested path creation + nested_dir = os.path.join(temp_dir, "level1", 
"level2", "level3") + result = log_utils.get_log_path(nested_dir, "nested.log") + assert result == os.path.join(nested_dir, "nested.log") + assert os.path.exists(nested_dir) + + # Test with special characters in filename + special_file = "test-file_with.special@chars.log" + result = log_utils.get_log_path(temp_dir, special_file) + assert result == os.path.join(temp_dir, special_file) + + def test_timezone_offset_various_timezones(self): + """Test timezone offset calculation for various timezones.""" + # Clear cache first + log_utils._get_timezone_offset.cache_clear() + + # Test various timezones + timezones = [ + ("UTC", "+0000"), + ("Europe/London", None), # Variable offset due to DST + ("Asia/Tokyo", "+0900"), + ("America/Los_Angeles", None), # Variable offset due to DST + ("localtime", None) # System dependent + ] + + for tz, expected_offset in timezones: + try: + offset = log_utils._get_timezone_offset(tz) + assert isinstance(offset, str) + assert len(offset) == 5 # Format: +/-HHMM + assert offset[0] in ['+', '-'] + + if expected_offset: + assert offset == expected_offset + + except Exception as e: + # Some timezones might not be available on all systems + pytest.skip(f"Timezone {tz} not available: {e}") + + def test_formatter_and_logger_integration(self): + """Test integration between get_logger_and_formatter and other utilities.""" + name = "integration_test" + datefmt = "%Y-%m-%d %H:%M:%S" + + # Test with various timezone settings + timezones = ["UTC", "localtime", "Europe/Berlin"] + + for timezone in timezones: + try: + logger, formatter = log_utils.get_logger_and_formatter( + name, datefmt, True, timezone + ) + + # Verify logger properties + assert logger.name == name + assert isinstance(formatter, logging.Formatter) + assert formatter.datefmt == datefmt + + # Test format string generation + format_str = log_utils.get_format(True, name, timezone) + assert f"[{name}]:" in format_str + assert "[%(filename)s:%(funcName)s:%(lineno)d]:" in format_str + + # Test timezone function integration + tz_func = log_utils.get_timezone_function(timezone) + assert callable(tz_func) + + except Exception as e: + pytest.skip(f"Timezone {timezone} not available: {e}") + + def test_memory_efficiency_verification(self): + """Test memory efficiency of caching mechanisms.""" + import sys + + # Clear all caches + log_utils.get_timezone_function.cache_clear() + log_utils._get_timezone_offset.cache_clear() + log_utils._get_stderr_timezone.cache_clear() + log_utils._checked_directories.clear() + + # Test that repeated operations don't significantly increase memory + initial_refs = sys.getrefcount(log_utils.get_timezone_function) + + # Perform many operations + for i in range(100): + log_utils.get_timezone_function("UTC") + log_utils._get_timezone_offset("UTC") + log_utils.get_format(False, f"test_{i}", "UTC") + + # Reference count shouldn't grow significantly + final_refs = sys.getrefcount(log_utils.get_timezone_function) + ref_growth = final_refs - initial_refs + assert ref_growth < 50, f"Memory leak detected: reference count grew by {ref_growth}" diff --git a/tests/test_memory_optimization.py b/tests/test_memory_optimization.py index 6196ad0..11fac33 100644 --- a/tests/test_memory_optimization.py +++ b/tests/test_memory_optimization.py @@ -328,10 +328,10 @@ def test_memory_leak_prevention(self): # Force cleanup force_garbage_collection() - # Check final stats + # Check the final stats final_stats = get_memory_stats() - # Registry should not have grown excessively + # The Registry should not have grown 
excessively registry_growth = final_stats['registry_size'] - initial_stats['registry_size'] assert registry_growth <= 20, f"Registry grew by {registry_growth}, possible memory leak" @@ -370,7 +370,7 @@ def test_logger_cleanup_on_context_exit(self): # After context exit, handlers should be cleaned assert len(logger2.handlers) == 0 - # Overall handler count should not have increased + # The Overall handler count should not have increased final_handlers = len(logging.getLogger().handlers) assert final_handlers == initial_handlers @@ -403,6 +403,357 @@ def test_registry_memory_management_edge_cases(self): registry = LoggerFactory.get_registered_loggers() assert len(registry) >= 50 + def test_cleanup_logger_handlers_standalone(self): + """Test cleanup_logger_handlers function directly.""" + from pythonLogs.memory_utils import cleanup_logger_handlers + import logging + + # Test with None logger + cleanup_logger_handlers(None) # Should not raise error + + # Test with logger having handlers + logger = logging.getLogger("cleanup_test") + handler1 = logging.StreamHandler() + handler2 = logging.FileHandler(tempfile.mktemp(suffix=".log")) + + logger.addHandler(handler1) + logger.addHandler(handler2) + assert len(logger.handlers) == 2 + + # Cleanup should remove all handlers + cleanup_logger_handlers(logger) + assert len(logger.handlers) == 0 + + def test_cleanup_logger_handlers_error_handling(self): + """Test cleanup_logger_handlers with handler errors.""" + from pythonLogs.memory_utils import cleanup_logger_handlers + import logging + from unittest.mock import Mock + + logger = logging.getLogger("cleanup_error_test") + + # Create mock handler that raises error on close + error_handler = Mock() + error_handler.close.side_effect = OSError("Test error") + logger.addHandler(error_handler) + + # Should handle error gracefully + cleanup_logger_handlers(logger) + assert len(logger.handlers) == 0 + + def test_formatter_cache_eviction_detailed(self): + """Test detailed formatter cache eviction scenarios.""" + from pythonLogs.memory_utils import get_cached_formatter, clear_formatter_cache + + clear_formatter_cache() + + # Create formatters up to the limit + formatters = [] + for i in range(50): # Default max is 50 + formatter = get_cached_formatter(f"Format {i}: %(message)s", f"%Y-%m-%d {i}") + formatters.append(formatter) + + # Verify cache is at capacity + from pythonLogs.memory_utils import _formatter_cache, _formatter_cache_lock + with _formatter_cache_lock: + cache_size = len(_formatter_cache) + assert cache_size == 50 + + # Create one more formatter - should trigger eviction + new_formatter = get_cached_formatter("New format: %(message)s", "%Y-%m-%d new") + + # Cache should still be at limit + with _formatter_cache_lock: + final_cache_size = len(_formatter_cache) + assert final_cache_size == 50 + + def test_set_directory_cache_limit_edge_cases(self): + """Test set_directory_cache_limit with edge cases.""" + from pythonLogs.memory_utils import set_directory_cache_limit, clear_directory_cache + import pythonLogs.log_utils as log_utils + + # Setup some directories in cache + clear_directory_cache() + temp_dirs = [] + for i in range(5): + temp_dir = tempfile.mkdtemp(prefix=f"limit_test_{i}_") + temp_dirs.append(temp_dir) + log_utils.check_directory_permissions(temp_dir) + + try: + # Verify cache has entries + with log_utils._directory_lock: + initial_size = len(log_utils._checked_directories) + assert initial_size == 5 + + # Set smaller limit - should trim cache + set_directory_cache_limit(3) + + with 
log_utils._directory_lock: + trimmed_size = len(log_utils._checked_directories) + assert trimmed_size == 3 + + # Set zero limit - should clear cache + set_directory_cache_limit(0) + + with log_utils._directory_lock: + zero_size = len(log_utils._checked_directories) + assert zero_size == 0 + + finally: + # Cleanup + import shutil + for temp_dir in temp_dirs: + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir, ignore_errors=True) + + def test_register_logger_weakref_direct(self): + """Test register_logger_weakref function directly.""" + from pythonLogs.memory_utils import register_logger_weakref, get_active_logger_count + import logging + + initial_count = get_active_logger_count() + + # Create logger and register weak reference + logger = logging.getLogger("weakref_direct_test") + register_logger_weakref(logger) + + # Count should increase + new_count = get_active_logger_count() + assert new_count >= initial_count + + # Delete logger reference + logger_name = logger.name + del logger + + # Force garbage collection + import gc + gc.collect() + + # Count should eventually decrease (may not be immediate) + final_count = get_active_logger_count() + # Note: Due to GC timing, we can't guarantee immediate cleanup + + def test_weakref_callback_behavior(self): + """Test weak reference callback behavior.""" + from pythonLogs.memory_utils import _active_loggers, _weak_ref_lock + import logging + import weakref + import gc + + initial_weak_refs = len(_active_loggers) + + # Create logger and manually create weak reference with callback + logger = logging.getLogger("callback_test") + + callback_called = [] + def test_callback(ref): + callback_called.append(ref) + + with _weak_ref_lock: + weak_ref = weakref.ref(logger, test_callback) + _active_loggers.add(weak_ref) + + # Delete logger + del logger + gc.collect() + + # Callback should have been called + assert len(callback_called) >= 0 # May or may not be called immediately + + def test_optimize_lru_cache_sizes_normal_operation(self): + """Test optimize_lru_cache_sizes normal operation.""" + from pythonLogs.memory_utils import optimize_lru_cache_sizes + from pythonLogs import log_utils + + # Get initial cache info + initial_cache = log_utils.get_timezone_function.cache_info() + + # Run optimization + optimize_lru_cache_sizes() + + # Verify caches were cleared + new_cache = log_utils.get_timezone_function.cache_info() + assert new_cache.currsize == 0 + + def test_formatter_cache_thread_safety(self): + """Test thread safety of formatter cache operations.""" + from pythonLogs.memory_utils import get_cached_formatter, clear_formatter_cache + import concurrent.futures + import threading + + clear_formatter_cache() + errors = [] + created_formatters = [] + + def formatter_worker(worker_id): + """Worker that creates formatters concurrently.""" + try: + for i in range(10): + formatter = get_cached_formatter( + f"Worker {worker_id} format {i}: %(message)s", + f"%Y-%m-%d W{worker_id}I{i}" + ) + created_formatters.append(formatter) + return worker_id + except Exception as e: + errors.append(f"Worker {worker_id}: {str(e)}") + return None + + # Run concurrent workers + with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: + futures = [executor.submit(formatter_worker, i) for i in range(5)] + results = [future.result() for future in concurrent.futures.as_completed(futures)] + + # Should have no errors + assert len(errors) == 0, f"Thread safety errors: {errors}" + assert len([r for r in results if r is not None]) == 5 + assert 
len(created_formatters) == 50 # 5 workers * 10 formatters each + + def test_weak_reference_cleanup_mechanism(self): + """Test weak reference cleanup mechanism without relying on GC timing.""" + from pythonLogs.memory_utils import get_active_logger_count, _active_loggers, _weak_ref_lock + import weakref + + # Test the cleanup detection logic in get_active_logger_count + with _weak_ref_lock: + initial_size = len(_active_loggers) + + # Create a dead reference manually (simulates what happens after GC) + class DummyRef: + def __call__(self): + return None # Dead reference returns None + + dead_ref = DummyRef() + _active_loggers.add(dead_ref) + + # Verify dead reference was added + assert len(_active_loggers) == initial_size + 1 + + # get_active_logger_count should detect and remove dead references + count = get_active_logger_count() + + # Dead reference should be cleaned up + with _weak_ref_lock: + final_size = len(_active_loggers) + assert final_size == initial_size # Dead reference removed + + def test_memory_stats_comprehensive(self): + """Test comprehensive memory statistics reporting.""" + from pythonLogs.memory_utils import get_memory_stats, get_cached_formatter + + # Create some items to populate stats + LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="stats_test_1") + LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="stats_test_2") + get_cached_formatter("Test format: %(message)s") + + stats = get_memory_stats() + + # Verify all required fields exist + required_fields = [ + 'registry_size', 'formatter_cache_size', 'directory_cache_size', + 'active_logger_count', 'max_registry_size', 'max_formatter_cache', + 'max_directory_cache' + ] + + for field in required_fields: + assert field in stats, f"Missing field: {field}" + assert isinstance(stats[field], int), f"Field {field} should be int" + assert stats[field] >= 0, f"Field {field} should be non-negative" + + # Verify relationships + assert stats['registry_size'] <= stats['max_registry_size'] + assert stats['formatter_cache_size'] <= stats['max_formatter_cache'] + assert stats['directory_cache_size'] <= stats['max_directory_cache'] + + def test_force_garbage_collection_comprehensive(self): + """Test comprehensive garbage collection functionality.""" + from pythonLogs.memory_utils import force_garbage_collection + + # Create objects that could be garbage collected + test_objects = [] + for i in range(100): + test_objects.append({ + 'data': f"test_data_{i}" * 100, + 'nested': {'value': i, 'list': list(range(10))} + }) + + # Create circular references + obj1 = {'name': 'obj1'} + obj2 = {'name': 'obj2'} + obj1['ref'] = obj2 + obj2['ref'] = obj1 + test_objects.extend([obj1, obj2]) + + # Clear references + del test_objects, obj1, obj2 + + # Force garbage collection + gc_stats = force_garbage_collection() + + # Verify stats + assert 'objects_collected' in gc_stats + assert 'garbage_count' in gc_stats + assert 'reference_cycles' in gc_stats + + assert isinstance(gc_stats['objects_collected'], int) + assert isinstance(gc_stats['garbage_count'], int) + assert gc_stats['objects_collected'] >= 0 + assert gc_stats['garbage_count'] >= 0 + + def test_memory_optimization_integration(self): + """Test integration of all memory optimization features.""" + from pythonLogs.memory_utils import ( + clear_formatter_cache, clear_directory_cache, + optimize_lru_cache_sizes, force_garbage_collection, + get_memory_stats + ) + + # Start with clean state + LoggerFactory.clear_registry() + clear_formatter_cache() + clear_directory_cache() + + # 
Create various objects + for i in range(10): + logger = LoggerFactory.get_or_create_logger( + LoggerType.BASIC, name=f"integration_test_{i}" + ) + logger.info("Test message") + + # Get initial stats + initial_stats = get_memory_stats() + + # Optimize caches + optimize_lru_cache_sizes() + + # Force cleanup + gc_result = force_garbage_collection() + + # Get final stats + final_stats = get_memory_stats() + + # Verify optimization worked + assert final_stats['formatter_cache_size'] == 0 # Should be cleared + assert final_stats['directory_cache_size'] == 0 # Should be cleared + assert gc_result['objects_collected'] >= 0 + + def test_memory_utils_module_constants(self): + """Test module-level constants and their behavior.""" + from pythonLogs import memory_utils + + # Verify module constants exist and have reasonable values + assert hasattr(memory_utils, '_formatter_cache') + assert hasattr(memory_utils, '_formatter_cache_lock') + assert hasattr(memory_utils, '_max_formatters') + assert hasattr(memory_utils, '_active_loggers') + assert hasattr(memory_utils, '_weak_ref_lock') + + # Verify default values + assert memory_utils._max_formatters > 0 + assert isinstance(memory_utils._formatter_cache, dict) + assert isinstance(memory_utils._active_loggers, set) + if __name__ == "__main__": pytest.main([__file__])
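
Reviewer note: the tests above exercise the new settings-driven memory limits (LOG_MAX_LOGGERS / LOG_LOGGER_TTL_SECONDS read by `_ensure_initialized`, plus the runtime `set_memory_limits` override). Below is a minimal usage sketch of that behaviour, not part of the patch; it assumes `LoggerFactory` and `LoggerType` are importable from `pythonLogs` as the tests use them, and the names and limit values are illustrative only.

```python
# Minimal sketch of the memory-limit configuration added in this patch series.
# Assumption: LoggerFactory and LoggerType are exposed by the pythonLogs package
# as used in tests/test_factory.py; logger names and limits below are examples.
import os

# Settings-driven limits: picked up from the environment on first logger creation.
os.environ["LOG_MAX_LOGGERS"] = "50"           # cap on cached loggers
os.environ["LOG_LOGGER_TTL_SECONDS"] = "1800"  # logger TTL in seconds

from pythonLogs import LoggerFactory, LoggerType

logger = LoggerFactory.get_or_create_logger(LoggerType.BASIC, name="app")
logger.info("registry limits initialized from settings on first use")

# Runtime override: also marks the factory as configured, so the settings
# values are not re-read afterwards.
LoggerFactory.set_memory_limits(max_loggers=10, ttl_seconds=300)
print(len(LoggerFactory.get_registered_loggers()))  # stays bounded by max_loggers
```

Either path bounds the registry; the TTL cleanup and LRU eviction covered in `tests/test_factory.py` then keep the cache within the configured limit.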