Postgres Stream

Reliably stream Postgres table changes to external systems (queues, webhooks, etc.) with automatic failover and zero event loss.

How Does It Work?

  1. User manages subscriptions directly in the database (pgstream.subscriptions table)
  2. Subscription changes automatically create database triggers on the target tables
  3. Application events fire the managed triggers, which insert into a partitioned events table
  4. Postgres Stream streams events via logical replication and delivers them to your sink
flowchart LR
    subgraph Postgres
        A[subscriptions<br/>table] -->|creates<br/>triggers| B[trigger<br/>on users]
        B -->|insert<br/>into| C[events<br/>partitioned<br/>20250112..]
        C -->|logical<br/>replication| D[replication<br/>slot]
    end
    D -->|streams| E[Postgres<br/>Stream]
    E -->|delivers| F[sink<br/>queue/http]

    style Postgres fill:#dbeafe,stroke:#2563eb,stroke-width:3px
    style A fill:#f3e8ff,stroke:#9333ea,stroke-width:2px,color:#581c87
    style B fill:#fed7aa,stroke:#ea580c,stroke-width:2px,color:#7c2d12
    style C fill:#bfdbfe,stroke:#2563eb,stroke-width:2px,color:#1e40af
    style D fill:#99f6e4,stroke:#14b8a6,stroke-width:2px,color:#115e59
    style E fill:#fecdd3,stroke:#e11d48,stroke-width:3px,color:#881337
    style F fill:#bbf7d0,stroke:#16a34a,stroke-width:3px,color:#14532d
Example of a generated trigger function (simplified for readability)
-- Auto-generated when you insert into subscriptions table
create or replace function pgstream._publish_after_insert_on_users()
returns trigger as $$
declare
  v_jsonb_output jsonb := '[]'::jsonb;
  v_base_payload jsonb := jsonb_build_object(
    'tg_op', tg_op,
    'tg_table_name', tg_table_name,
    'tg_table_schema', tg_table_schema,
    'timestamp', (extract(epoch from now()) * 1000)::bigint
  );
begin
  -- Check subscription "user-signup" condition
  if new.email_verified = true then
    v_jsonb_output := v_jsonb_output || (jsonb_build_object(
      'tg_name', 'user-signup',
      'new', jsonb_build_object('id', new.id, 'email', new.email),
      'old', null
    ) || v_base_payload || '{}'::jsonb); -- payload_extensions would go here
  end if;

  -- Write to events table if any subscriptions matched
  if jsonb_array_length(v_jsonb_output) > 0 then
    insert into pgstream.events (payload, stream_id)
    select elem, 1
    from jsonb_array_elements(v_jsonb_output) as t(elem);
  end if;

  return new;
end;
$$ language plpgsql
set search_path = ''
security definer;

-- Trigger that calls the function
create constraint trigger pgstream_insert
after insert on public.users
deferrable initially deferred
for each row
when ((new.email_verified = true))
execute procedure pgstream._publish_after_insert_on_users();

Note: The actual generated code handles multiple subscriptions per table, merges when clauses with OR logic, and includes all payload_extensions. This example shows the structure for a single subscription.

Benefits

  • Single binary - No complex infrastructure or high-availability destinations required
  • Postgres-native durability - Events are stored in the database, so WAL can be released immediately
  • Automatic failover - Queries the events table to replay missed events after recovery
  • Automatic recovery - Postgres Stream automatically recovers from an invalidated replication slot
  • No data loss - As long as downtime is less than partition retention (7 days by default)

Drawbacks

  • Small overhead - An additional INSERT into the events table on every subscribed operation
  • Partition management - Partition growth needs monitoring if event volume is very high (see the size query after this list)
  • Not for dynamic subscriptions - Each subscription change recreates database triggers (an expensive operation)
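
If partition growth becomes a concern, a catalog query like this one can track per-partition sizes (a sketch; it assumes the daily partitions live in the pgstream schema and are named events_YYYYMMDD, as in the diagram above):

-- Total on-disk size of each daily events partition
select c.relname,
       pg_size_pretty(pg_total_relation_size(c.oid)) as total_size
from pg_class c
join pg_namespace n on n.oid = c.relnamespace
where n.nspname = 'pgstream'
  and c.relname like 'events_%'
order by c.relname;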

Using It

1. Requirements

  • Postgres 15+ with wal_level=logical
  • A user with the replication privilege (see the checks below)
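
To verify both requirements on an existing cluster (the role name matches the example config below; changing wal_level requires a server restart):

-- Must return 'logical':
show wal_level;

-- Grant the replication privilege to the user Postgres Stream connects as:
alter role postgres with replication;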

2. Configuration

Create config.yaml:

stream:
  id: 1  # Unique stream ID
  pg_connection:
    host: localhost
    port: 5432
    name: mydb
    username: postgres
    password: postgres
    tls:
      enabled: false
  batch:
    max_size: 1000      # Events per batch
    max_fill_secs: 5    # Max time to fill batch

sink:
  type: memory  # Built-in test sink

3. Run the Binary

# Start Postgres Stream
postgres-stream

# Or with Docker
docker run -v $(pwd)/config.yaml:/config.yaml postgres-stream

4. Create Subscriptions

Subscriptions define which events to capture. Simply insert into the subscriptions table:

-- Subscribe to verified user signups
insert into pgstream.subscriptions (
  key,
  stream_id,
  operation,
  schema_name,
  table_name,
  when_clause,
  column_names,
  payload_extensions
) values (
  'user-signup',                          -- Unique identifier
  1,                                      -- Stream ID from config
  'INSERT',                               -- Operation: INSERT, UPDATE, or DELETE
  'public',                               -- Schema name
  'users',                                -- Table name
  'new.email_verified = true',            -- Optional filter (SQL expression)
  array['id', 'email', 'created_at'],     -- Columns to include in payload
  '[]'::jsonb                             -- Payload extensions (see below)
);

This automatically creates the trigger on public.users. Now when you insert a verified user, the event is captured.
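
To confirm the generated trigger from SQL, you can query the catalog (a quick check; generated trigger names carry the pgstream prefix shown in the example above):

-- List pgstream-managed triggers on the target table
select tgname, tgrelid::regclass as table_name
from pg_trigger
where tgrelid = 'public.users'::regclass
  and tgname like 'pgstream%';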

To reduce no-op trigger recreation (important in production), use MERGE to update rows only when their values actually change:

Click to show merge-based helper function
-- Helper function that only recreates triggers when subscription actually changes
create or replace function set_subscriptions(
  p_stream_id bigint,
  p_subscriptions pgstream.subscriptions[]
)
returns void
language plpgsql
security definer
set search_path to ''
as $$
begin
  create temporary table temp_subscriptions as
  select * from unnest(p_subscriptions);

  -- Only update if values actually changed (avoids trigger recreation)
  merge into pgstream.subscriptions as target
  using temp_subscriptions as source
  on (target.key = source.key and target.stream_id = p_stream_id)
  when matched and (
    target.operation is distinct from source.operation or
    target.schema_name is distinct from source.schema_name or
    target.table_name is distinct from source.table_name or
    target.when_clause is distinct from source.when_clause or
    target.column_names is distinct from source.column_names or
    target.metadata is distinct from source.metadata or
    target.payload_extensions is distinct from source.payload_extensions
  ) then update set
    operation = source.operation,
    schema_name = source.schema_name,
    table_name = source.table_name,
    when_clause = source.when_clause,
    column_names = source.column_names,
    metadata = source.metadata,
    payload_extensions = source.payload_extensions
  when not matched then insert (
    key, stream_id, operation, schema_name, table_name,
    when_clause, column_names, metadata, payload_extensions
  ) values (
    source.key, p_stream_id, source.operation, source.schema_name,
    source.table_name, source.when_clause, source.column_names,
    source.metadata, source.payload_extensions
  );

  -- Remove subscriptions not in input
  delete from pgstream.subscriptions
  where stream_id = p_stream_id
    and not exists (
      select 1 from temp_subscriptions
      where pgstream.subscriptions.key = temp_subscriptions.key
    );

  drop table temp_subscriptions;
end;
$$;
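
A hypothetical call of this helper, assuming pgstream.subscriptions has exactly the nine columns referenced above (key, stream_id, operation, schema_name, table_name, when_clause, column_names, metadata, payload_extensions); adjust the row constructor if the real table carries extra columns:

-- Declaratively sync the full subscription set for stream 1
select set_subscriptions(1, array[
  row('user-signup', 1, 'INSERT', 'public', 'users',
      'new.email_verified = true',
      array['id', 'email', 'created_at'],
      null, '[]'::jsonb)::pgstream.subscriptions
]);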

5. Testing Your Subscription

Now, when a user signs up with a verified email:

insert into users (email, email_verified) values ('user@example.com', true);

The sink receives:

{
  "tg_name": "user-signup",
  "tg_op": "INSERT",
  "tg_table_name": "users",
  "tg_table_schema": "public",
  "timestamp": 1703001234567,
  "new": {
    "id": 123,
    "email": "user@example.com",
    "created_at": "2024-12-12T10:30:00Z"
  },
  "old": null
}

6. Add Metadata with Payload Extensions

Add dynamic routing information or static metadata using payload_extensions:

insert into pgstream.subscriptions (
  key, stream_id, operation, schema_name, table_name,
  when_clause, column_names, payload_extensions
) values (
  'order-notification',
  1,
  'INSERT',
  'public',
  'orders',
  null,
  array['id', 'user_id', 'total'],
  '[
    {"json_path": "group_id", "expression": "new.user_id::text"},
    {"json_path": "queue_name", "expression": "''orders-high-priority''"}
  ]'::jsonb
);

Result:

{
  "tg_name": "order-notification",
  "new": {"id": 456, "user_id": 123, "total": 99.99},
  "group_id": "123",
  "queue_name": "orders-high-priority"
}

Common use cases:

  • Routing: Add group_id for partitioning in Kafka/queues
  • Static metadata: Add queue_name, priority, tenant_id
  • Dynamic values: Add computed fields like new.created_at::date
  • Context info: Add auth.user_id(), current_setting('app.tenant_id') (see the example below)
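
For example, stamping each event with the tenant from a session setting (the subscription key and target table here are made up; note the doubled single quotes inside the SQL string literal):

insert into pgstream.subscriptions (
  key, stream_id, operation, schema_name, table_name,
  when_clause, column_names, payload_extensions
) values (
  'invoice-created', 1, 'INSERT', 'public', 'invoices',
  null, array['id', 'total'],
  '[{"json_path": "tenant_id", "expression": "current_setting(''app.tenant_id'', true)"}]'::jsonb
);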

How Failover Works

When the sink fails (e.g., queue goes down), Postgres Stream:

  1. Saves the failed event's ID as a checkpoint
  2. Continues consuming the replication stream (events are still written to the table)
  3. Periodically retries delivering the checkpoint event
  4. On success, uses COPY to stream all events between the checkpoint and the current position (sketched after this list)
  5. Replays missed events in order, then returns to normal streaming
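
The replay in step 4 is essentially a ranged read of the events table. The statement Postgres Stream issues is internal, but its shape is roughly this (table and column names are assumptions based on the trigger example above):

copy (
  select payload
  from pgstream.events
  where id > 42   -- checkpoint event ID
    and id <= 105 -- current position
  order by id
) to stdout;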

Guarantees:

  • No events lost (as long as downtime < partition retention)
  • Events delivered at least once
  • Order preserved within partitions
  • No WAL retention required (events stored in table)

How Slot Recovery Works

When the replication slot is invalidated (retained WAL exceeded max_slot_wal_keep_size), Postgres Stream:

  1. Detects the "can no longer get changes from replication slot" error
  2. Queries confirmed_flush_lsn from the invalidated slot, which Postgres preserves (see the query after this list)
  3. Finds the first event with lsn > confirmed_flush_lsn
  4. Sets a failover checkpoint at that event
  5. Drops the invalidated slot and creates a new one
  6. Triggers failover replay from checkpoint to current position
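
Step 2 can be reproduced by hand from pg_replication_slots, which keeps confirmed_flush_lsn even after the slot is invalidated (the slot name below is illustrative):

-- wal_status shows 'lost' for an invalidated slot
select slot_name, wal_status, confirmed_flush_lsn
from pg_replication_slots
where slot_name = 'pgstream_1';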

Guarantees:

  • Automatic recovery without operator intervention
  • No events lost (events stored in table, not dependent on WAL)
  • Works as long as events are within partition retention

Automated Partition Management

Postgres Stream automatically manages daily partitions in the background:

Retention policy:

  • Creates partitions 7 days in advance
  • Drops partitions older than 7 days
  • Runs on startup and then daily (see the DDL sketch below)
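
The maintenance amounts to DDL along these lines (a sketch; it assumes daily range partitions keyed on an event timestamp and named events_YYYYMMDD, as in the diagram above):

-- Create an upcoming daily partition ahead of time:
create table if not exists pgstream.events_20250119
  partition of pgstream.events
  for values from ('2025-01-19') to ('2025-01-20');

-- Drop a partition that has aged out of the 7-day retention window:
drop table if exists pgstream.events_20250105;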

Implementing Custom Sinks

Create custom sinks to deliver events to any destination (HTTP, Kafka, RabbitMQ, etc.).

Important: Sink dependencies and implementations should be behind feature flags to avoid bloating the binary. Users should only compile the sinks they actually use.

1. Add Dependencies with Feature Flags

Update Cargo.toml:

[dependencies]
# Existing dependencies...

# Optional sink dependencies
reqwest = { version = "0.11", features = ["json"], optional = true }
rdkafka = { version = "0.36", optional = true }

[features]
# Sink feature flags
sink-http = ["dep:reqwest"]
sink-kafka = ["dep:rdkafka"]

2. Create the Sink Implementation

Create a new file src/sink/http.rs:

use etl::error::EtlResult;
use reqwest::Client;
use serde::Deserialize;
use tracing::info;

use crate::sink::Sink;
use crate::types::TriggeredEvent;

#[derive(Clone, Debug, Deserialize)]
pub struct HttpSinkConfig {
    pub url: String,
    #[serde(default)]
    pub headers: std::collections::HashMap<String, String>,
}

#[derive(Clone)]
pub struct HttpSink {
    config: HttpSinkConfig,
    client: Client,
}

impl HttpSink {
    pub fn new(config: HttpSinkConfig) -> Self {
        let client = Client::new();
        Self { config, client }
    }
}

impl Sink for HttpSink {
    fn name() -> &'static str {
        "http"
    }

    async fn publish_events(&self, events: Vec<TriggeredEvent>) -> EtlResult<()> {
        for event in events {
            let mut request = self.client
                .post(&self.config.url)
                .json(&event.payload);

            // Add custom headers
            for (key, value) in &self.config.headers {
                request = request.header(key, value);
            }

            let response = request.send().await?;

            if !response.status().is_success() {
                return Err(etl::etl_error!(
                    etl::error::ErrorKind::Network,
                    "HTTP request failed: {}",
                    response.status()
                ));
            }

            info!("published event {} to {}", event.id.id, self.config.url);
        }

        Ok(())
    }
}

3. Register in src/sink/mod.rs

mod base;
pub mod memory;

#[cfg(feature = "sink-http")]
pub mod http;

pub use base::Sink;

4. Add to SinkConfig Enum

Update src/config/sink.rs:

use serde::Deserialize;

#[cfg(feature = "sink-http")]
use crate::sink::http::HttpSinkConfig;

#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum SinkConfig {
    Memory,

    #[cfg(feature = "sink-http")]
    Http(HttpSinkConfig),
}

5. Wire Up in Core

Update src/core.rs:

#[cfg(feature = "sink-http")]
use crate::sink::http::HttpSink;

// In start_pipeline_with_config():
let sink = match &config.sink {
    SinkConfig::Memory => MemorySink::new(),

    #[cfg(feature = "sink-http")]
    SinkConfig::Http(cfg) => HttpSink::new(cfg.clone()),
};

6. Build and Use

Build with the HTTP sink feature:

cargo build --release --features sink-http

Use in config.yaml:

sink:
  type: http
  url: https://webhook.example.com/events
  headers:
    Authorization: Bearer token123
    X-Custom-Header: value
