A pipeline for automatically scraping, enriching, scoring, and reaching out to SMB leads with positive unit economics.
Anthrasite Lead-Factory is an automated pipeline designed to validate that Anthrasite can:
- Scrape & enrich SMB leads overnight
- Deduplicate, score, generate mock-ups, and personalize email
- Send high-deliverability outreach
- Hand warm replies to 1-3 pilot agencies with positive unit economics
The pipeline runs as a nightly batch, processing leads from three verticals (HVAC, Plumbers, Vets) across three metro areas (New York, NY 10002; Yakima, WA 98908; Carmel, IN 46032).
The pipeline consists of six sequential stages:
- Scraping (`leadfactory.pipeline.scrape`): Fetches business listings from Yelp Fusion and Google Places APIs.
- Enrichment (`leadfactory.pipeline.enrich`): Analyzes websites for tech stack and Core Web Vitals, with tier-based additional enrichment.
- Deduplication (`leadfactory.pipeline.dedupe`): Uses Ollama Llama-3 8B to identify and merge duplicate leads.
- Scoring (`leadfactory.pipeline.score`): Applies YAML-defined rules to score leads based on their features.
- Mock-up Generation (`leadfactory.pipeline.mockup`): Creates website improvement mock-ups using GPT-4o (with Claude fallback).
- Email Queueing (`leadfactory.pipeline.email_queue`): Sends personalized outreach via SendGrid.
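As a rough illustration only (not the project's own orchestration), the six stages can be driven in sequence through the CLI described later in this README, stopping on the first failure; the per-stage arguments below are copied from the CLI examples and are placeholders:

```python
# Minimal sketch: run each pipeline stage via the CLI documented in this README,
# aborting on the first non-zero exit code. Arguments are illustrative only.
import subprocess
import sys

STAGES = [
    ("scrape", ["--limit", "5"]),
    ("enrich", ["--limit", "10"]),
    ("dedupe", ["--limit", "100"]),
    ("score", ["--limit", "50"]),
    ("mockup", ["--id", "123"]),
    ("email", ["--limit", "5"]),
]

for stage, args in STAGES:
    cmd = [sys.executable, "-m", "leadfactory.cli.main", "pipeline", stage, *args]
    print("Running:", " ".join(cmd))
    result = subprocess.run(cmd)
    if result.returncode != 0:
        sys.exit(result.returncode)
```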
Additional components include:
- Cost Management (`leadfactory.cost.*`): Budget gating, auditing, and cost tracking.
- Utilities (`leadfactory.utils.*`): Metrics, logging, and other support functions.
The project uses environment variables for configuration. These are managed through .env files:
- `.env.example`: Template with all possible configuration options. This file is committed to the repository.
- `.env`: Main configuration file for local development. Contains real or mock API keys.
- `.env.production`: Production environment configuration. Used in production deployments.
- Copy `.env.example` to `.env`:
  cp .env.example .env
- Edit `.env` and add your API keys and configuration values. Required API keys include:
  - Yelp Fusion API (YELP_API_KEY or YELP_KEY)
  - Google Places API (GOOGLE_API_KEY or GOOGLE_KEY)
  - OpenAI API (OPENAI_API_KEY)
  - SendGrid API (SENDGRID_API_KEY or SENDGRID_KEY)
  - ScreenshotOne API (SCREENSHOT_ONE_API_KEY or SCREENSHOT_ONE_KEY) - for Tier 2+
  - Anthropic API (ANTHROPIC_API_KEY) - optional, used as fallback
- For production deployment, create a `.env.production` file with production settings:
  cp .env.example .env.production
  # Edit .env.production with production values
To validate your API configuration and ensure all integrations work correctly:
python tests/validate_real_api_integration_fixed.py

This script will check if all required API keys are available and make test API calls to verify connectivity.
Key feature flags include:
- MOCKUP_ENABLED: Set to `true` to enable mockup generation (Tier 2+)
- DEBUG_MODE: Set to `true` for additional debug logging
- TEST_MODE: Set to `true` to use mock data in development
- USE_MOCKS: Set to `true` to skip real API calls
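For reference, a minimal sketch of how these boolean flags could be read in Python; the `env_flag` helper is illustrative and not part of the package:

```python
import os

def env_flag(name: str, default: bool = False) -> bool:
    """Interpret an environment variable as a boolean feature flag."""
    value = os.getenv(name)
    if value is None:
        return default
    return value.strip().lower() in ("1", "true", "yes", "on")

MOCKUP_ENABLED = env_flag("MOCKUP_ENABLED")
DEBUG_MODE = env_flag("DEBUG_MODE")
TEST_MODE = env_flag("TEST_MODE")
USE_MOCKS = env_flag("USE_MOCKS")
```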
For more details on environment configuration, see API Integration Testing.
When using Python's dotenv library, system environment variables take precedence over values in .env files by default. If you have conflicting environment variables set in your system, they will override values in your .env file.
In scripts that need to ensure .env file values are used regardless of system environment:
from dotenv import load_dotenv
# Force .env values to override system environment variables
load_dotenv(override=True)
- Python 3.10+
- Docker (for containerized deployment)
- Supabase account (for storage and database)
- Ollama with Llama-3 8B model
- API keys for: Yelp Fusion, Google Places, ScreenshotOne, PageSpeed, SEMrush, SendGrid, OpenAI, Anthropic
- Clone the repository:
  git clone https://github.com/mirqtio/Anthrasite_LeadFactory.git
  cd Anthrasite_LeadFactory
- Create a virtual environment:
  python -m venv venv
  source venv/bin/activate  # On Windows: venv\Scripts\activate
- Install the package and dependencies:
  # For development installation with all tools
  pip install -e ".[dev]"
  # For metrics-only installation
  pip install -e ".[metrics]"
  # For basic installation
  pip install -e .
- For development only - install pre-commit hooks:
  pre-commit install
- Set up environment variables:
  cp .env.example .env
  # Edit .env with your API keys and configuration
- Initialize the database:
  sqlite3 leadfactory.db < db/migrations/2025-05-19_init.sql
- Seed initial data:
  python -c "import sqlite3; conn = sqlite3.connect('leadfactory.db'); c = conn.cursor(); c.execute('INSERT INTO zip_queue (zip, metro, done) VALUES (\"10002\", \"New York\", 0), (\"98908\", \"Yakima\", 0), (\"46032\", \"Carmel\", 0)'); conn.commit()"
The pipeline is configured through environment variables defined in the .env file. Key configuration options include:
- Tier Level: Set `TIER=1|2|3` to control the depth of enrichment:
  - Tier 1: Basic tech stack and Core Web Vitals
  - Tier 2: Adds screenshots
  - Tier 3: Adds SEMrush Site Audit
- Mock-up Generation: Set `MOCKUP_ENABLED=true|false` to toggle mock-up generation (false for the Tier-1 control group, true for Tier-2/3).
- AI Models: Configure OpenAI (GPT-4o), Anthropic (Claude), and Ollama (Llama-3 8B) settings.
- Alert Thresholds: Set thresholds for bounce rates, spam rates, and cost per lead.
See .env.example for the complete list of configuration options.
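To make the tier behavior concrete, here is a hedged sketch of how `TIER` and `MOCKUP_ENABLED` could map to the enrichment work described above; the step names are illustrative, not the package's actual function names:

```python
import os

def enrichment_steps() -> list[str]:
    """Map TIER / MOCKUP_ENABLED to the enrichment depth described above.
    Step names are placeholders for illustration only."""
    tier = int(os.getenv("TIER", "1"))
    steps = ["tech_stack", "core_web_vitals"]   # Tier 1 baseline
    if tier >= 2:
        steps.append("screenshot")              # Tier 2 adds screenshots
    if tier >= 3:
        steps.append("semrush_site_audit")      # Tier 3 adds SEMrush Site Audit
    if os.getenv("MOCKUP_ENABLED", "false").lower() == "true":
        steps.append("mockup")                  # Tier 2/3 mock-up generation
    return steps
```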
The project now includes a modern CLI interface that replaces the legacy bin/ scripts:
# Get help for all commands
python3 -m leadfactory.cli.main --help
# Pipeline operations
python3 -m leadfactory.cli.main pipeline scrape --limit 5
python3 -m leadfactory.cli.main pipeline enrich --limit 10
python3 -m leadfactory.cli.main pipeline dedupe --limit 100
python3 -m leadfactory.cli.main pipeline score --limit 50
python3 -m leadfactory.cli.main pipeline mockup --id 123
python3 -m leadfactory.cli.main pipeline email --limit 5
# Administrative operations
python3 -m leadfactory.cli.main admin setup-db
python3 -m leadfactory.cli.main admin migrate
python3 -m leadfactory.cli.main admin backup
# Development operations
python3 -m leadfactory.cli.main dev test
python3 -m leadfactory.cli.main dev lint
python3 -m leadfactory.cli.main dev format
# Global options
python3 -m leadfactory.cli.main --verbose pipeline scrape --limit 5
python3 -m leadfactory.cli.main --dry-run pipeline email --limit 10

For detailed migration information, see CLI Migration Guide.
The legacy bin/ scripts are still available but deprecated:
# Scrape leads (limit to 5 for testing)
python bin/01_scrape.py --limit 5
# Enrich leads
python bin/02_enrich.py
# Deduplicate leads
python bin/03_dedupe.py
# Score leads
python bin/04_score.py
# Generate mock-ups
python bin/05_mockup.py
# Queue emails
python bin/06_email_queue.py

To run the complete pipeline as a nightly batch:
bash bin/run_nightly.sh

This will execute all pipeline stages in sequence, aborting on the first non-zero exit code.
The nightly batch script supports several command-line options:
# Run in debug mode with verbose output
bin/run_nightly.sh --debug
# Skip specific pipeline stages
bin/run_nightly.sh --skip-stage=3 --skip-stage=5
# Limit the number of leads processed
bin/run_nightly.sh --limit=10
# Run in dry-run mode (no external API calls or emails)
bin/run_nightly.sh --dry-run
# Show help message
bin/run_nightly.sh --help

To schedule the pipeline to run automatically every night, use the provided setup script:
# Set up with default settings (runs at 1:00 AM)
bin/setup_cron.sh
# Set a custom time (e.g., 2:30 AM)
bin/setup_cron.sh --time=02:30
# Set up for a different user
bin/setup_cron.sh --user=anthrasite --time=03:15

This will configure:
- A cron job to run the pipeline at the specified time
- Log rotation to manage log files
- Proper error handling and notification
Logs for the cron job will be stored in logs/cron_nightly.log.
The pipeline exports Prometheus metrics on port 9090 (configurable via PROMETHEUS_PORT). Key metrics include:
- `leads_scraped_total`: Counter of total leads scraped
- `batch_runtime_seconds`: Gauge of batch processing time
- `leadfactory_cpu_hours_per_lead`: Gauge of CPU usage per lead
- `pipeline_failure_rate`: Counter tracking pipeline failures
These metrics can be visualized in Grafana Cloud with the provided alert rules.
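As a rough illustration, metrics with these names could be exported with `prometheus_client` along the following lines; the actual metric types and labels used by the pipeline may differ:

```python
# Illustrative exporter sketch only; not the pipeline's real metrics module.
import os
import time
from prometheus_client import Counter, Gauge, start_http_server

LEADS_SCRAPED = Counter("leads_scraped_total", "Total leads scraped")
BATCH_RUNTIME = Gauge("batch_runtime_seconds", "Batch processing time in seconds")
CPU_HOURS_PER_LEAD = Gauge("leadfactory_cpu_hours_per_lead", "CPU hours used per lead")
PIPELINE_FAILURES = Counter("pipeline_failure_rate", "Pipeline failures")

if __name__ == "__main__":
    start_http_server(int(os.getenv("PROMETHEUS_PORT", "9090")))  # expose /metrics
    LEADS_SCRAPED.inc(42)      # e.g. after a scrape batch
    BATCH_RUNTIME.set(3600.0)  # e.g. after a nightly run completes
    time.sleep(60)             # keep the exporter alive long enough to scrape
```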
The pipeline includes comprehensive large-scale validation tests that verify its ability to handle high volumes of leads efficiently. These tests are automatically run:
- Monthly (first Sunday of each month at 2am UTC)
- After significant changes to core pipeline components
- On-demand via GitHub Actions UI
The validation suite includes:
- Scale Testing: Processes up to 10,000 leads through the complete pipeline
- Performance Metrics: Tracks throughput, success rates, and processing times
- Failure Simulation: Validates graceful handling of various error conditions
- Bottleneck Detection: Identifies performance bottlenecks in the pipeline
Performance reports and visualizations are automatically generated and published as GitHub Actions artifacts. The system enforces the following performance requirements:
- Minimum throughput: 100 leads/minute
- Maximum error rate: 1%
- Maximum runtime: 180 minutes for 10,000 leads
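A simple way to think about these gates: only the three thresholds below come from this section, while the check function and report shape are hypothetical.

```python
# Thresholds from the performance requirements above.
MIN_THROUGHPUT = 100    # leads per minute
MAX_ERROR_RATE = 0.01   # 1% failed leads
MAX_RUNTIME_MIN = 180   # minutes for 10,000 leads

def meets_requirements(leads_processed: int, failed: int, runtime_minutes: float) -> bool:
    throughput = leads_processed / runtime_minutes if runtime_minutes else 0.0
    error_rate = failed / leads_processed if leads_processed else 1.0
    return (
        throughput >= MIN_THROUGHPUT
        and error_rate <= MAX_ERROR_RATE
        and runtime_minutes <= MAX_RUNTIME_MIN
    )

# Example: 10,000 leads in 90 minutes with 50 failures passes all three gates.
assert meets_requirements(10_000, 50, 90)
```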
Run the large-scale validation tests locally with:
# Run the complete validation suite
python scripts/run_large_scale_tests.py
# Run a smaller test for quick verification
python scripts/run_large_scale_tests.py --lead-count=100 --skip-10k

- Primary data is stored in Supabase Postgres
- WAL (Write-Ahead Logging) is enabled for data integrity
- Nightly backups to S3
- RSYNC mirror to a backup VPS for SPOF (Single Point of Failure) protection
The project includes a comprehensive SPOF fallback mechanism that ensures business continuity in case of primary instance failure:
The bin/rsync_backup.sh script performs nightly data mirroring to a backup VPS:
# Run with default settings
bin/rsync_backup.sh
# Perform a dry run without making changes
bin/rsync_backup.sh --dry-run
# Use a custom configuration file
bin/rsync_backup.sh --config=/path/to/config.yml

Configuration is stored in etc/backup_config.yml and includes:
- Remote server details
- Directories and files to backup
- Exclusion patterns
- Retention policies
- Notification settings
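For illustration, a wrapper could read etc/backup_config.yml with PyYAML and assemble an rsync invocation roughly as below; the field names (`remote`, `sources`, `excludes`) are assumptions, not the script's actual schema:

```python
import subprocess
import yaml  # PyYAML

with open("etc/backup_config.yml") as fh:
    cfg = yaml.safe_load(fh)

# Hypothetical fields: cfg["remote"] like "user@backup-vps:/srv/leadfactory",
# cfg["sources"] as a list of local paths, cfg["excludes"] as a list of patterns.
cmd = ["rsync", "-az", "--delete"]
for pattern in cfg.get("excludes", []):
    cmd += ["--exclude", pattern]
cmd += cfg["sources"] + [cfg["remote"]]
subprocess.run(cmd, check=True)
```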
The bin/health_check.sh script monitors the primary instance and automatically boots the backup when needed:
# Run health check with default settings
bin/health_check.sh
# Only perform health check without auto-boot
bin/health_check.sh --check-only
# Force boot on backup VPS without health checks
bin/health_check.sh --force-boot

The health check script:
- Monitors the primary instance health endpoint
- Tracks failure count with configurable threshold (set to 2 consecutive failures per Phase 0 v1.3 spec)
- Automatically boots the Docker stack on the backup VPS when threshold is reached
- Sends notifications via email and Slack
Configuration is stored in etc/health_check_config.yml.
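A rough Python sketch of the same logic: poll the primary's health endpoint, count consecutive failures, and trigger the backup boot once the two-failure threshold is reached. The endpoint URL, boot command, and notification step are placeholders:

```python
import subprocess
import time
import requests

HEALTH_URL = "https://primary.example.com/health"  # placeholder endpoint
FAILURE_THRESHOLD = 2                              # per Phase 0 v1.3 spec
failures = 0

while True:
    try:
        ok = requests.get(HEALTH_URL, timeout=10).status_code == 200
    except requests.RequestException:
        ok = False

    failures = 0 if ok else failures + 1
    if failures >= FAILURE_THRESHOLD:
        # Placeholder: boot the Docker stack on the backup VPS and send notifications.
        subprocess.run(["docker", "compose", "up", "-d"], check=False)
        break
    time.sleep(300)  # check every 5 minutes, matching the cron interval below
```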
- Copy the sample configuration files:
  cp etc/backup_config.yml.sample etc/backup_config.yml
  cp etc/health_check_config.yml.sample etc/health_check_config.yml
- Edit the configuration files with your specific settings
- Set up SSH keys for passwordless authentication between the primary and backup servers
- Schedule the scripts using cron:
  # Add to crontab
  # Nightly backup at 2:00 AM
  0 2 * * * /path/to/bin/rsync_backup.sh >> /path/to/logs/rsync_backup_cron.log 2>&1
  # Health check every 5 minutes
  */5 * * * * /path/to/bin/health_check.sh >> /path/to/logs/health_check_cron.log 2>&1
The project includes BDD (Behavior-Driven Development) tests for each pipeline stage:
# Run all tests
pytest tests/
# Run tests for a specific stage
pytest tests/test_scraper.py

The project uses pre-commit hooks to enforce code quality standards. These hooks run automatically before each commit to ensure code meets quality and security requirements.
Key pre-commit hooks include:
- Ruff: Fast Python linting
- Black: Code formatting
- Bandit: Security vulnerability scanning
- Pre-commit-hooks: File checks (trailing whitespace, YAML validation, etc.)
For setup instructions and usage guide, see Pre-commit Workflow Guide.
All feature development follows the standardized workflow:
- Development Phase: Implement features with error handling and logging
- Testing Phase: Run unit tests and BDD tests
- Quality Assurance Phase: Run static analysis tools (ruff, bandit) and code formatting (black)
- Pre-Commit Phase: Run pre-commit hooks locally and fix any issues
- Commit Phase: Create feature branch with descriptive name and commit
- CI Verification Phase: Verify CI pipeline passes before merging
The project uses GitHub Actions for comprehensive continuous integration. All CI checks are now configured in strict blocking mode to ensure code quality and reliability.
The project has two main CI workflow configurations:
- API Integration Tests (`api-integration-tests.yml`): API-specific integration testing with both mock and real APIs
- Large-Scale Validation (`large-scale-validation.yml`): Performance testing at scale (10,000 leads)
All pull requests must pass the following quality gates before merging:
| Gate | Tool | Threshold | Configuration |
|---|---|---|---|
| Formatting | Black | 0 errors | --check mode |
| Linting | Ruff | 0 errors | Standard rules |
| Linting | Flake8 | 0 errors | See setup.cfg |
| Type checking | MyPy | 0 errors | See mypy.ini |
| Security | Bandit | 0 high/medium issues | -ll flag |
| Gate | Tool | Threshold | Configuration |
|---|---|---|---|
| Unit tests | Pytest | 100% pass | All test modules |
| Test coverage | Coverage | ≥80% coverage | --cov-fail-under=80 |
| Integration tests | Pytest | 100% pass | Mock APIs by default |
| Gate | Metric | Threshold | Validation |
|---|---|---|---|
| Throughput | Leads/minute | ≥100 leads/min | Large-scale test |
| Error rate | Failed leads % | ≤1% | Large-scale test |
| Runtime | Total minutes | ≤180 minutes | Large-scale test |
The CI workflows run automatically on:
- Push to `main` and `develop` branches
- Pull requests to `main` and `develop` branches
- Manual trigger via the GitHub Actions interface
- Scheduled runs (weekly for API tests, monthly for large-scale validation)
The CI process generates several important artifacts:
- Test Coverage Reports: Uploaded to Codecov for tracking coverage trends
- Performance Metrics: Generated during large-scale validation
- API Usage Reports: Tracking API costs and usage statistics
# Trigger unified CI workflow manually
git push origin my-feature-branch

# Manual trigger with GitHub CLI
gh workflow run api-integration-tests.yml --ref my-branch --field use_real_apis=false

# Manual trigger with GitHub CLI
gh workflow run large-scale-validation.yml --ref my-branch --field lead_count=10000

- API Tests: `.github/workflows/api-integration-tests.yml`
- Large-Scale: `.github/workflows/large-scale-validation.yml`
- Coverage: `.codecov.yml`
CI workflows require special handling of environment variables to ensure tests run correctly:
The CI system uses a two-tier approach to API keys:
- Mock API Keys: Used by default in pull request builds and non-scheduled runs
- Real API Keys: Used in scheduled runs and when explicitly enabled via workflow inputs
# Example from API Integration Tests workflow
- name: Setup test environment and mock API keys
  run: |
    # Create .env file with mock keys for testing
    cp .env.example .env
    echo "LEADFACTORY_USE_MOCKS=1" >> .env

- name: Set real API keys
  if: ${{ github.event.inputs.use_real_apis == 'true' || github.event_name == 'schedule' }}
  run: |
    echo "Setting up real API keys where available"
    echo "LEADFACTORY_USE_MOCKS=0" >> .env

Secure API keys are stored in GitHub Secrets and accessed in the workflows. When setting up these secrets, use the exact environment variable names expected by the application:
- YELP_API_KEY
- GOOGLE_API_KEY
- OPENAI_API_KEY
- SENDGRID_API_KEY
- SCREENSHOT_ONE_KEY
- ANTHROPIC_API_KEY
- SLACK_WEBHOOK_URL
Important: The CI system uses `load_dotenv(override=True)` to ensure environment variables from `.env` files take precedence over system environment variables.
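To illustrate the two-tier approach, code can branch on `LEADFACTORY_USE_MOCKS` roughly as follows; the client classes are hypothetical stand-ins, not the package's real implementations:

```python
import os

class MockYelpClient:
    """Hypothetical stand-in that returns canned fixtures instead of calling Yelp."""
    def search(self, term: str, location: str) -> list[dict]:
        return [{"name": "Example HVAC Co", "location": location}]

class RealYelpClient:
    """Hypothetical stand-in for a client that would call the real Yelp Fusion API."""
    def __init__(self, api_key: str):
        self.api_key = api_key
    def search(self, term: str, location: str) -> list[dict]:
        raise NotImplementedError("real API call omitted in this sketch")

def get_yelp_client():
    # Defaults to mocks, matching the CI default of LEADFACTORY_USE_MOCKS=1 above.
    if os.getenv("LEADFACTORY_USE_MOCKS", "1") == "1":
        return MockYelpClient()
    return RealYelpClient(api_key=os.environ["YELP_API_KEY"])
```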
You may encounter "Context access might be invalid" warnings in GitHub Actions workflows. These occur when using expressions like ${{ secrets.SOME_SECRET }} in places where GitHub's context is restricted for security reasons.
Solution: Use environment variables as intermediaries:
# Instead of this (may cause warnings):
- run: echo "API_KEY=${{ secrets.API_KEY }}" >> .env
# Use this approach:
- name: Set up API keys
  env:
    API_KEY: ${{ secrets.API_KEY }}
  run: echo "API_KEY=$API_KEY" >> .env

- 403 Forbidden errors: Check API key permissions and verify correct scopes are enabled
- Rate limiting issues: Implement retry logic or reduce parallel testing
- Timeout errors: Adjust timeout settings in workflow configuration
- Update API Keys: Refresh expired or invalid API keys in GitHub Secrets
- Check CI Logs: Review detailed error messages in the CI logs
- Local Validation: Run the `validate_real_api_integration_fixed.py` script locally
- Mock API Testing: Use `LEADFACTORY_USE_MOCKS=1` to bypass real APIs for faster testing
The CI process generates code coverage reports that help identify untested code paths.
Coverage reports provide metrics in several categories:
- Line Coverage: Percentage of code lines executed during tests
- Branch Coverage: Percentage of code branches (if/else) executed
- Function Coverage: Percentage of functions called during tests
Coverage reports are uploaded to Codecov for long-term tracking and visualization:
- Coverage Trends: Track how coverage changes over time
- Coverage Gaps: Identify files and functions with low coverage
- PR Coverage: See how pull requests impact overall coverage
The project maintains the following coverage requirements:
| Component | Minimum Coverage |
|---|---|
| Core pipeline | 85% |
| Utilities | 80% |
| Scripts | 70% |
| Overall | 80% |
To improve coverage in areas identified as lacking:
- Add targeted unit tests for specific functions
- Create integration tests for complex code paths
- Use parameterized tests to cover multiple scenarios (see the sketch after this list)
- Add explicit tests for error handling paths
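As a hedged example of the parameterized-tests bullet above, `pytest.mark.parametrize` can exercise several scenarios with one test; the `score_band` helper is invented for illustration and is not the real leadfactory scoring code:

```python
import pytest

# Hypothetical scoring helper used only to illustrate pytest.mark.parametrize.
def score_band(score: int) -> str:
    return "hot" if score >= 80 else "warm" if score >= 50 else "cold"

@pytest.mark.parametrize(
    "score,expected",
    [(95, "hot"), (80, "hot"), (79, "warm"), (50, "warm"), (10, "cold")],
)
def test_score_band(score, expected):
    assert score_band(score) == expected
```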
The pipeline includes cost tracking for all API calls and operations. A budget audit is triggered after the first 1,000-lead batch to validate unit economics before scaling to 10,000 leads.
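Conceptually, budget gating in `leadfactory.cost.*` amounts to recording per-API spend and refusing further work past a cap; the class, costs, and threshold in this sketch are invented for the example and do not reflect the actual implementation:

```python
class BudgetGate:
    """Illustrative cost tracker: record per-API spend and gate work past a cap."""
    def __init__(self, budget_usd: float):
        self.budget_usd = budget_usd
        self.spent_usd = 0.0

    def record(self, api: str, cost_usd: float) -> None:
        self.spent_usd += cost_usd

    def allow(self) -> bool:
        return self.spent_usd < self.budget_usd

gate = BudgetGate(budget_usd=250.0)  # invented cap for the example
gate.record("openai", 0.12)
if not gate.allow():
    raise RuntimeError("Budget exceeded; pausing pipeline until the audit completes")
```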
Proprietary - Anthrasite, Inc. 2025