Skip to content

A powerful Go package for schema-driven data streaming and analytics with Redis Streams and ClickHouse backend. StreamHouse provides a flexible schema system for reliable data streaming, real-time analytics, event processing, and high-performance data ingestion.

License

Notifications You must be signed in to change notification settings

parnexcodes/streamhouse

Repository files navigation

streamhouse

Go Version License Ask DeepWiki

A powerful Go package for schema-driven data streaming and analytics with Redis Streams and ClickHouse backend. StreamHouse provides a flexible schema system for reliable data streaming, real-time analytics, event processing, and high-performance data ingestion.

Features

  • Schema-Driven Architecture: Flexible schema system with field validation and type safety
  • Dual Storage Backend: Redis Streams for reliable queuing + ClickHouse for analytics
  • Data Builder API: Builder pattern for constructing events and batches
  • Background Processing: Efficient batch processing with configurable workers
  • Auto Table Management: Automatic ClickHouse table creation from schemas
  • Consumer Groups: Horizontal scaling with Redis Streams consumer groups
  • Error Handling: Comprehensive error handling with configurable retry logic
  • Health Monitoring: Built-in health checks and connection monitoring
  • Production Ready: Connection pooling, graceful shutdown, and resource management

Installation

go get github.com/parnexcodes/streamhouse

Architecture

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Application   │───▶│   StreamHouse   │───▶│  Redis Streams  │
│                 │    │     Client      │    │   (Queue)       │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │                        │
                                ▼                        ▼
                       ┌─────────────────┐    ┌─────────────────┐
                       │    Schema       │    │   Background    │
                       │   Registry      │    │   Consumer      │
                       └─────────────────┘    └─────────────────┘
                                                        │
                                                        ▼
                                              ┌─────────────────┐
                                              │   ClickHouse    │
                                              │   (Analytics)   │
                                              └─────────────────┘

Quick Start

package main

import (
    "context"
    "log"
    "time"
    
    "github.com/parnexcodes/streamhouse"
)

func main() {
    // Configure StreamHouse
    config := streamhouse.Config{
        Redis: streamhouse.RedisConfig{
            Host:     "localhost",
            Port:     6379,
            Password: "",
            DB:       0,
        },
        ClickHouse: streamhouse.ClickHouseConfig{
            Host:     "localhost",
            Port:     9000,
            Database: "analytics",
            Username: "default",
            Password: "",
        },
        StreamName:    "events",
        BatchSize:     100,
        FlushInterval: 5 * time.Second,
        Workers:       3,
    }
    
    // Create client
    client, err := streamhouse.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()
    
    // Register schema
    schema := &streamhouse.DataSchema{
        Name: "user.signup",
        Fields: map[string]streamhouse.FieldConfig{
            "user_id": {Type: "string", Required: true, Index: true},
            "email":   {Type: "string", Required: true},
            "name":    {Type: "string", Required: true},
        },
    }
    
    if err := client.RegisterSchema(schema); err != nil {
        log.Fatal(err)
    }
    
    // Start background consumer
    ctx := context.Background()
    go client.StartConsumer(ctx)
    
    // Stream data with error handling
    err = client.Stream("user.signup", map[string]interface{}{
        "user_id": "user123",
        "email":   "user@example.com",
        "name":    "John Doe",
    })
    if err != nil {
        log.Printf("Failed to stream data: %v", err)
    }
    
    // Or use async streaming (fire-and-forget)
    client.StreamAsync("user.signup", map[string]interface{}{
        "user_id": "user456",
        "email":   "jane@example.com", 
        "name":    "Jane Smith",
    })
}

Use Cases

  • Audit Logging: Track user activities and system events
  • Real-time Analytics: Page views, user interactions, business metrics
  • IoT Data Streaming: Sensor readings, device telemetry
  • E-commerce Events: Orders, payments, inventory changes
  • Application Monitoring: Performance metrics, error tracking

Documentation

Contributing

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Acknowledgments


About

A powerful Go package for schema-driven data streaming and analytics with Redis Streams and ClickHouse backend. StreamHouse provides a flexible schema system for reliable data streaming, real-time analytics, event processing, and high-performance data ingestion.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages