stream

package module
v0.0.0-...-6ed9861 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 25, 2025 License: MIT Imports: 16 Imported by: 0

README

go-stream

go-stream is a Go library that provides an embedded NATS server with type-safe, opinionated messaging patterns. It simplifies pub/sub, request/reply, and durable messaging by embedding NATS directly into your application—no external infrastructure required.

🎯 Why go-stream?

  • Zero Infrastructure: Embedded NATS server starts automatically
  • Type Safety: Generic-based APIs with compile-time safety
  • Opinionated: Sensible defaults that "just work"
  • Production Ready: Built on battle-tested NATS technology
  • Flexible: Supports both Core NATS and JetStream modes

⚡ Quick Start

go get github.com/a2y-d5l/go-stream
Basic Pub/Sub
package main

import (
    "context"
    "fmt"
    
    stream "github.com/a2y-d5l/go-stream"
)

type User struct {
    ID   string `json:"id"`
    Name string `json:"name"`
}

func main() {
    // Create stream with embedded NATS server
    s, err := stream.New(context.Background())
    if err != nil {
        panic(err)
    }
    defer s.Close(context.Background())

    // Subscribe with type-safe JSON handling
    sub, err := stream.SubscribeJSON(s, "users", 
        func(ctx context.Context, user User) error {
            fmt.Printf("Received: %+v\n", user)
            return nil
        })
    if err != nil {
        panic(err)
    }
    defer sub.Stop()

    // Publish with automatic JSON encoding
    user := User{ID: "123", Name: "Alice"}
    err = s.PublishJSON(context.Background(), "users", user)
    if err != nil {
        panic(err)
    }
}

🏗️ Architecture

Your Application
├── Embedded NATS Server (automatic)
├── Client Connection (managed)
└── go-stream API (type-safe)

No external dependencies. No configuration files. No separate processes.

📚 Core Features

🔄 Messaging Patterns
| Pattern | Description | Use Case |
| --- | --- | --- |
| Pub/Sub | Broadcast messages to multiple subscribers | Event notifications, real-time updates |
| Request/Reply | Synchronous request-response with load balancing | Service communication, RPC calls |
| Queue Groups | Distribute work across multiple workers | Load balancing, task processing |
💾 Storage Modes
| Mode | Persistence | Performance | Use Case |
| --- | --- | --- | --- |
| Core | In-memory only | Ultra-fast | Real-time events, temporary data |
| JetStream | Disk-persistent | High-throughput | Durable messaging, event sourcing |
🛡️ Built-in Features
  • Automatic Reconnection - Resilient to network issues
  • Graceful Shutdown - Clean resource cleanup
  • Error Handling - Comprehensive error propagation
  • Concurrency Control - Configurable worker pools
  • Backpressure - Multiple overflow policies
  • Type Safety - Generic APIs catch type mismatches at compile time

🚀 Usage Patterns

Simple Pub/Sub
// Publisher
err := s.PublishJSON(ctx, "events", MyEvent{Type: "user_created"})

// Subscriber  
sub, err := stream.SubscribeJSON(s, "events", handleEvent)
Request/Reply with Timeout
// Create a requester
requester := stream.NewJSONRequester[Request, Response](s, "add")

// Send request with timeout
response, err := requester.Request(ctx, Request{A: 5, B: 3}, 5*time.Second)
Durable JetStream Messaging
// Enable JetStream mode
s, err := stream.New(ctx, 
    stream.WithDefaultTopicMode(stream.TopicModeJetStream),
    stream.WithStoreDir("./data"))

// Durable consumer survives restarts
sub, err := stream.SubscribeJSON(s, "orders", processOrder,
    stream.WithDurable("order-processor"),
    stream.WithAckPolicy(stream.AckManual))
Load Balancing with Queue Groups
// Multiple workers share the load
for range 3 { // range over an integer requires Go 1.22+
    stream.SubscribeJSON(s, "tasks", processTask,
        stream.WithQueueGroupName("workers"),
        stream.WithConcurrency(2))
}
Examples

See examples/ for runnable examples:

| Example | Description | Key Concepts |
| --- | --- | --- |
| basic/ | Simple pub/sub with JSON messages | Core concepts, error handling |
| requestreply/ | Calculator service with load balancing | Request/reply, queue groups, timeouts |
| jetstream/ | Order processing with persistence | Durable consumers, message replay |
# Run any example
cd examples/basic && go run main.go
cd examples/requestreply && go run main.go  
cd examples/jetstream && go run main.go

⚙️ Configuration

Basic Configuration
s, err := stream.New(ctx,
    stream.WithHost("127.0.0.1"),      // Server host
    stream.WithRandomPort(),           // Dynamic port
    stream.WithDefaultTopicMode(stream.TopicModeCore), // Core or JetStream
)
Production Configuration
s, err := stream.New(ctx,
    stream.WithStoreDir("/data/nats"),         // Persistent storage
    stream.WithMaxPayload(64*1024*1024),       // 64MB max message
    stream.WithConnectTimeout(10*time.Second), // Connection timeout
    stream.WithDrainTimeout(30*time.Second),   // Graceful shutdown timeout
)
TLS Configuration
tlsConfig := &tls.Config{
    Certificates: []tls.Certificate{cert},
    ClientAuth:   tls.RequireAndVerifyClientCert,
}

s, err := stream.New(ctx, stream.WithTLS(tlsConfig))

🔧 Advanced Features

Custom Codecs
// Default is JSON, but you can use custom codecs
s, err := stream.New(ctx, stream.WithDefaultCodec(MyCodec))
Health Checks
// Lightweight health check
if err := s.Healthy(ctx); err != nil {
    log.Printf("Stream unhealthy: %v", err)
}

// Deep health check, including server readiness (returns only an error)
err := s.DeepHealthCheck(ctx)
Subscription Options
sub, err := stream.SubscribeJSON(s, "topic", handler,
    stream.WithConcurrency(4),                    // 4 workers
    stream.WithBufferSize(1000),                  // Message buffer
    stream.WithBackpressure(stream.BackpressureDropOldest), // Overflow policy
    stream.WithMaxDeliver(3),                     // Retry limit
    stream.WithDurable("my-consumer"),            // Durable name
)

🏭 Production Considerations

Performance
  • Core Mode: ~1M+ msgs/sec for in-memory messaging
  • JetStream: ~100K+ msgs/sec with persistence
  • Memory: Minimal overhead with embedded server
  • Latency: Sub-millisecond for local messaging
Monitoring
// Built-in observability hooks
s, err := stream.New(ctx,
    stream.WithLogger(slog.New(handler)), // Custom logging
    // Add metrics/tracing via middleware
)
Deployment
  • Single Binary: Everything embedded, no external dependencies
  • Docker: Works seamlessly in containers
  • Kubernetes: Scales horizontally with persistent volumes
  • Cloud: Deploy anywhere Go runs

🔍 Troubleshooting

Common Issues

Port conflicts?

stream.WithRandomPort() // Uses dynamic port allocation

Messages not received?

// Ensure subscription is established before publishing
sub, err := stream.SubscribeJSON(s, "topic", handler)
time.Sleep(100 * time.Millisecond) // Allow subscription setup

JetStream storage issues?

stream.WithStoreDir("./data") // Ensure directory is writable

For more troubleshooting, see the examples README.

🤝 Contributing

Contributions welcome! Please see CONTRIBUTING.md for guidelines.

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

🙏 Acknowledgments

Built on the amazing NATS messaging system. Thanks to the NATS team for creating such robust infrastructure.

Documentation

Overview

Package stream provides an embedded NATS server with a small, ergonomic façade.

Constants

This section is empty.

Variables

var (
	// ErrCodecUnsupported indicates the codec is not supported.
	ErrCodecUnsupported = errors.New("unsupported codec")
	// ErrTimeout indicates the operation timed out.
	ErrTimeout = errors.New("operation timeout")
)
var (
	// ErrStreamClosed indicates the stream was already closed (or never started).
	ErrStreamClosed = errors.New("stream is closed")
	// ErrStreamUnhealthy indicates server or client is not ready/connected.
	ErrStreamUnhealthy = errors.New("stream is not healthy")
)
var (
	// ErrInvalidTopic indicates the topic name is invalid.
	ErrInvalidTopic = errors.New("invalid topic name")
	// ErrTopicExists indicates the topic already exists.
	ErrTopicExists = errors.New("topic already exists")
	// ErrUnknownTopic indicates the topic does not exist.
	ErrUnknownTopic = errors.New("unknown topic")
)
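
The sentinel errors above are the kind of values callers usually test with errors.Is. The following is a minimal sketch of that pattern, not the package's own code: it declares local copies of three of the documented errors so that it compiles on its own, and it assumes the library returns these sentinels directly or wrapped with %w (the documentation does not say which). The classify and wrapTimeout helpers are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of three documented sentinel errors so the sketch compiles alone.
// Real code would reference stream.ErrTimeout and friends instead.
var (
	ErrTimeout      = errors.New("operation timeout")
	ErrStreamClosed = errors.New("stream is closed")
	ErrUnknownTopic = errors.New("unknown topic")
)

// classify maps an error to a coarse action. It relies on errors.Is, so it
// matches both bare sentinels and sentinels wrapped with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrTimeout):
		return "retry"
	case errors.Is(err, ErrStreamClosed):
		return "stop"
	case errors.Is(err, ErrUnknownTopic):
		return "fix-config"
	default:
		return "unknown"
	}
}

// wrapTimeout simulates a call site that adds context around a sentinel.
func wrapTimeout() error {
	return fmt.Errorf("request on %q: %w", "add", ErrTimeout)
}

func main() {
	fmt.Println(classify(wrapTimeout()))
	fmt.Println(classify(ErrStreamClosed))
}
```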

Functions

func RequestJSON

func RequestJSON[T any](
	s *Stream,
	ctx context.Context,
	topic Topic,
	req any,
	timeout time.Duration,
	opts ...PublishOption,
) (T, error)

RequestJSON sends a typed request and decodes a typed response using JSON. (When the Codec system is fully wired, this can delegate to the resolved codec.)

Types

type AckPolicy

type AckPolicy int
const (
	AckAuto AckPolicy = iota // (JS only; ignored in Core mode)
	AckManual
)

type BackpressurePolicy

type BackpressurePolicy int

Backpressure and ack semantics.

const (
	BackpressureBlock BackpressurePolicy = iota // default
	BackpressureDropNewest
	BackpressureDropOldest
)
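
The three policy names suggest how a full subscriber buffer is handled. The sketch below illustrates one plausible reading using a bounded channel: block waits for space, drop-newest discards the incoming message, and drop-oldest evicts the head of the buffer. It is an illustration under those assumptions, not the library's implementation; enqueue, drain, and fill are hypothetical helpers.

```go
package main

import "fmt"

// BackpressurePolicy mirrors the documented constants.
type BackpressurePolicy int

const (
	BackpressureBlock BackpressurePolicy = iota
	BackpressureDropNewest
	BackpressureDropOldest
)

// enqueue offers v to a bounded buffer and reports whether v was accepted.
func enqueue(buf chan int, v int, p BackpressurePolicy) bool {
	switch p {
	case BackpressureDropNewest:
		select {
		case buf <- v:
			return true
		default:
			return false // buffer full: discard the incoming value
		}
	case BackpressureDropOldest:
		for {
			select {
			case buf <- v:
				return true
			default:
				// Buffer full: evict the oldest value, then retry the send.
				select {
				case <-buf:
				default:
				}
			}
		}
	default: // BackpressureBlock
		buf <- v // waits until a consumer frees a slot
		return true
	}
}

// drain empties the buffer in FIFO order without blocking.
func drain(buf chan int) []int {
	var out []int
	for {
		select {
		case v := <-buf:
			out = append(out, v)
		default:
			return out
		}
	}
}

// fill pushes 1..n into a fresh buffer and returns what remains. It must not
// be called with BackpressureBlock and n > capacity, because nothing consumes
// from the buffer while it runs.
func fill(capacity, n int, p BackpressurePolicy) []int {
	buf := make(chan int, capacity)
	for i := 1; i <= n; i++ {
		enqueue(buf, i, p)
	}
	return drain(buf)
}

func main() {
	fmt.Println(fill(2, 4, BackpressureDropNewest)) // prints [1 2]
	fmt.Println(fill(2, 4, BackpressureDropOldest)) // prints [3 4]
}
```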

type Codec

type Codec interface {
	Encode(v any) ([]byte, error)
	Decode(data []byte, v any) error
	ContentType() string
}

Codec is a pluggable encoder/decoder (default JSON).

var (
	JSONCodec Codec = jsonCodec{}
)
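
As an illustration of what a custom implementation of this interface might look like, here is a small sketch backed by encoding/gob. The interface is copied from the listing above so the example compiles on its own; gobCodec, its content-type string, and the roundTrip helper are assumptions made for the example and are not part of the package.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Codec is copied from the documented interface.
type Codec interface {
	Encode(v any) ([]byte, error)
	Decode(data []byte, v any) error
	ContentType() string
}

// gobCodec is a hypothetical custom codec backed by encoding/gob.
type gobCodec struct{}

func (gobCodec) Encode(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Decode expects v to be a pointer to the destination value.
func (gobCodec) Decode(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

// ContentType returns an assumed media type label for gob payloads.
func (gobCodec) ContentType() string { return "application/x-gob" }

type user struct {
	ID   string
	Name string
}

// roundTrip encodes and then decodes a value through any Codec.
func roundTrip(c Codec, in user) (user, error) {
	data, err := c.Encode(in)
	if err != nil {
		return user{}, err
	}
	var out user
	err = c.Decode(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(gobCodec{}, user{ID: "123", Name: "Alice"})
	fmt.Println(out, err)
}
```

A value like gobCodec{} could presumably be passed to WithDefaultCodec, since that option accepts any Codec.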

type DiscardPolicy

type DiscardPolicy int
const (
	DiscardOld DiscardPolicy = iota
	DiscardNew
)

type JSONPublisher

type JSONPublisher[T any] struct {
	// contains filtered or unexported fields
}

JSONPublisher wraps PublishJSON with captured stream/topic.

func NewJSONPublisher

func NewJSONPublisher[T any](s *Stream, topic Topic) JSONPublisher[T]

func (JSONPublisher[T]) Publish

func (p JSONPublisher[T]) Publish(ctx context.Context, v T, opts ...PublishOption) error

type JSONRequester

type JSONRequester[Req, Resp any] struct {
	// contains filtered or unexported fields
}

JSONRequester wraps RequestJSON with captured stream/topic.

func NewJSONRequester

func NewJSONRequester[Req, Resp any](s *Stream, topic Topic) JSONRequester[Req, Resp]

func (JSONRequester[Req, Resp]) Request

func (r JSONRequester[Req, Resp]) Request(ctx context.Context, req Req, timeout time.Duration, opts ...PublishOption) (Resp, error)

type JSONSubscriber

type JSONSubscriber[T any] struct {
	// contains filtered or unexported fields
}

func NewJSONSubscriber

func NewJSONSubscriber[T any](s *Stream, topic Topic, opts ...SubscribeOption) JSONSubscriber[T]

func (JSONSubscriber[T]) Subscribe

func (j JSONSubscriber[T]) Subscribe(handler func(ctx context.Context, m T) error) (Subscription, error)

type Message

type Message struct {
	Topic   Topic
	Data    []byte
	Headers map[string]string
	ID      string
	Time    time.Time
}

type Option

type Option func(*config)

Option configures the Stream.
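
Option follows the functional-options pattern. The sketch below shows how such options are typically applied: start from defaults, then run each option in order so that later ones override earlier ones. The config fields, the default timeout, and the lower-case helper names are hypothetical, since the package's config type is unexported; only the loopback default host comes from the documentation.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the package's unexported config type.
type config struct {
	host           string
	connectTimeout time.Duration
}

// Option mirrors the documented shape: a function that mutates the config.
type Option func(*config)

// withHost and withConnectTimeout are illustrative, not the library's code.
func withHost(h string) Option {
	return func(c *config) { c.host = h }
}

func withConnectTimeout(d time.Duration) Option {
	return func(c *config) { c.connectTimeout = d }
}

// newConfig applies options in order on top of defaults, so a later option
// overrides an earlier one that touches the same field.
func newConfig(opts ...Option) config {
	c := config{host: "127.0.0.1", connectTimeout: 2 * time.Second} // timeout default is assumed
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(withHost("0.0.0.0"), withConnectTimeout(10*time.Second))
	fmt.Println(c.host, c.connectTimeout)
}
```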

func WithBasicAuth

func WithBasicAuth(user, pass string) Option

WithBasicAuth sets user/password auth for the embedded server & client.

func WithConnectTimeout

func WithConnectTimeout(d time.Duration) Option

WithConnectTimeout sets the client connect timeout.

func WithDefaultCodec

func WithDefaultCodec(cd Codec) Option

WithDefaultCodec overrides the default codec (JSON by default).

func WithDefaultTopicMode

func WithDefaultTopicMode(m TopicMode) Option

WithDefaultTopicMode sets the default topic mode for new topics (Core or JetStream).

func WithDisableJetStream

func WithDisableJetStream() Option

WithDisableJetStream disables JetStream (useful for testing).

func WithDrainTimeout

func WithDrainTimeout(d time.Duration) Option

WithDrainTimeout sets how long Close waits for client drain before hard-close.

func WithHost

func WithHost(h string) Option

WithHost sets the listen host for the embedded server (default 127.0.0.1).

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger injects a slog logger.

func WithMaxPayload

func WithMaxPayload(bytes int) Option

WithMaxPayload sets the server max payload size (bytes).

func WithPort

func WithPort(p int) Option

WithPort sets the server port. Use WithRandomPort for a dynamically assigned port.

func WithRandomPort

func WithRandomPort() Option

WithRandomPort selects a random free port for the embedded server.

func WithReconnectWait

func WithReconnectWait(min time.Duration) Option

WithReconnectWait sets the fixed reconnect wait (min).

func WithRequestIDHeader

func WithRequestIDHeader(name string) Option

WithRequestIDHeader changes the correlation header name (default X-Request-Id).

func WithServerReadyTimeout

func WithServerReadyTimeout(d time.Duration) Option

WithServerReadyTimeout sets how long to wait for the embedded server to be ready.

func WithStoreDir

func WithStoreDir(dir string) Option

WithStoreDir sets the JetStream store directory (useful for durable topics later).

func WithTLS

func WithTLS(cfg *tls.Config) Option

WithTLS enables TLS for the embedded server and client.

func WithTokenAuth

func WithTokenAuth(token string) Option

WithTokenAuth sets token auth for the embedded server & client.

type PublishOption

type PublishOption func(*publishCfg)

func WithFlush

func WithFlush(on bool) PublishOption

WithFlush controls whether to flush after publish; pair with WithFlushTimeout.

func WithFlushTimeout

func WithFlushTimeout(d time.Duration) PublishOption

WithFlushTimeout sets an optional timeout used when flushing.

func WithHeaders

func WithHeaders(h map[string]string) PublishOption

WithHeaders merges the given headers into the outbound message.

func WithMessageID

func WithMessageID(id string) PublishOption

WithMessageID sets/overrides the X-Message-Id header.

func WithReplyTo

func WithReplyTo(t Topic) PublishOption

WithReplyTo sets the NATS Reply field for publish or request flows.

type Publisher

type Publisher interface {
	Publish(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error
}

type PublisherFunc

type PublisherFunc func(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error

func (PublisherFunc) Publish

func (f PublisherFunc) Publish(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error
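
PublisherFunc lets a plain function satisfy Publisher, in the same way http.HandlerFunc does for http.Handler. One use is middleware that wraps another Publisher. The sketch below redeclares reduced versions of the documented types so that it compiles alone; the counting middleware and the demo helper are hypothetical, and the counter map is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
)

// Reduced local copies of the documented types.
type Topic string

type Message struct {
	Topic   Topic
	Data    []byte
	Headers map[string]string
}

type publishCfg struct{}

type PublishOption func(*publishCfg)

type Publisher interface {
	Publish(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error
}

type PublisherFunc func(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error

// Publish calls the underlying function, which is what makes the adapter work.
func (f PublisherFunc) Publish(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error {
	return f(ctx, topic, msg, opts...)
}

// counting is a hypothetical middleware: it counts publishes per topic and
// then delegates to next.
func counting(next Publisher, counts map[Topic]int) Publisher {
	return PublisherFunc(func(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error {
		counts[topic]++
		return next.Publish(ctx, topic, msg, opts...)
	})
}

// demo publishes twice through the middleware into a no-op publisher.
func demo() map[Topic]int {
	counts := map[Topic]int{}
	sink := PublisherFunc(func(_ context.Context, _ Topic, _ Message, _ ...PublishOption) error {
		return nil
	})
	p := counting(sink, counts)
	_ = p.Publish(context.Background(), "events", Message{Data: []byte("a")})
	_ = p.Publish(context.Background(), "events", Message{Data: []byte("b")})
	return counts
}

func main() {
	fmt.Println(demo()["events"])
}
```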

type Retention

type Retention int
const (
	RetentionEphemeral Retention = iota // in-memory/core, or JS w/ small limits
	RetentionDurable                    // JS persisted stream
)

type StartPosition

type StartPosition int
const (
	DeliverNew StartPosition = iota
	DeliverByStartTime
	DeliverLastPerSubject
)

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream owns the embedded nats-server, client connection(s), and registries. It is safe for concurrent use.

func New

func New(ctx context.Context, opts ...Option) (*Stream, error)

New starts an embedded NATS server, waits until it’s ready, and connects a client.

Defaults “just work” for local dev: loopback host, dynamic port, Core topic mode, JSON codec.

func (*Stream) Close

func (s *Stream) Close(ctx context.Context) error

Close drains the client and shuts down the server gracefully.

func (*Stream) DeepHealthCheck

func (s *Stream) DeepHealthCheck(ctx context.Context) error

DeepHealthCheck performs a thorough health check including server readiness verification. This is more expensive than Healthy() and should be used sparingly.

func (*Stream) Healthy

func (s *Stream) Healthy(ctx context.Context) error

Healthy returns an error if the stream is not in a healthy state. This is a lightweight check that verifies the basic operational status of the stream. For a more thorough check including server readiness, use DeepHealthCheck().
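
A lightweight check like this is a natural fit for an HTTP liveness endpoint. The sketch below wires a Healthy-style method into an http.Handler through a small interface, so it can be exercised with a fake instead of a real Stream. The healthChecker interface, the fakeStream type, the /healthz path, and the one-second timeout are all assumptions made for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// healthChecker is the subset of *Stream used here; the method matches the
// documented Healthy signature.
type healthChecker interface {
	Healthy(ctx context.Context) error
}

// healthHandler reports 200 when the check passes and 503 otherwise.
func healthHandler(hc healthChecker) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), time.Second)
		defer cancel()
		if err := hc.Healthy(ctx); err != nil {
			http.Error(w, err.Error(), http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
	})
}

// fakeStream is a test double standing in for *Stream.
type fakeStream struct{ err error }

func (f fakeStream) Healthy(context.Context) error { return f.err }

// errDown is a placeholder error for the unhealthy case.
var errDown = errors.New("stream is not healthy")

// probe sends one request through the handler and returns the status code.
func probe(hc healthChecker) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
	healthHandler(hc).ServeHTTP(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(probe(fakeStream{}))             // prints 200
	fmt.Println(probe(fakeStream{err: errDown})) // prints 503
}
```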

func (*Stream) Publish

func (s *Stream) Publish(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error

func (*Stream) PublishJSON

func (s *Stream) PublishJSON(ctx context.Context, topic Topic, v any, opts ...PublishOption) error

func (*Stream) Publisher

func (s *Stream) Publisher(name string) (Publisher, bool)

func (*Stream) RegisterPublisher

func (s *Stream) RegisterPublisher(name string, p Publisher) error

func (*Stream) Request

func (s *Stream) Request(
	ctx context.Context,
	topic Topic,
	msg Message,
	timeout time.Duration,
	opts ...PublishOption,
) (Message, error)

func (*Stream) Subscribe

func (s *Stream) Subscribe(topic Topic, sub Subscriber, opts ...SubscribeOption) (Subscription, error)

func (*Stream) TestJSONResponder

func (s *Stream) TestJSONResponder(topic Topic, handler func(data []byte) (any, error)) (*nats.Subscription, error)

TestJSONResponder is a helper for testing JSON request/reply functionality.

func (*Stream) TestResponder

func (s *Stream) TestResponder(topic Topic, handler func(data []byte) ([]byte, error)) (*nats.Subscription, error)

TestResponder is a helper for testing request/reply functionality. It sets up a proper NATS responder that handles the reply-to subject automatically.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func WithAckPolicy

func WithAckPolicy(p AckPolicy) SubscribeOption

func WithBackpressure

func WithBackpressure(p BackpressurePolicy) SubscribeOption

func WithBufferSize

func WithBufferSize(n int) SubscribeOption

func WithConcurrency

func WithConcurrency(n int) SubscribeOption

func WithDLQ

func WithDLQ(t Topic) SubscribeOption

func WithDurable

func WithDurable(name string) SubscribeOption

func WithExponentialBackoff

func WithExponentialBackoff(initial time.Duration, steps int, factor float64) SubscribeOption
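
WithExponentialBackoff is undocumented, but SubscribeOptions carries a Backoff []time.Duration field, which suggests the three parameters are expanded into a list of delays. The sketch below shows one plausible expansion, where each step multiplies the previous delay by factor. This is an assumption about the semantics, not the library's code; backoffSchedule is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule returns steps delays starting at initial, each one factor
// times the previous. It returns nil when steps is not positive.
func backoffSchedule(initial time.Duration, steps int, factor float64) []time.Duration {
	if steps <= 0 {
		return nil
	}
	out := make([]time.Duration, 0, steps)
	d := initial
	for i := 0; i < steps; i++ {
		out = append(out, d)
		d = time.Duration(float64(d) * factor)
	}
	return out
}

func main() {
	fmt.Println(backoffSchedule(100*time.Millisecond, 3, 2)) // prints [100ms 200ms 400ms]
}
```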

func WithMaxDeliver

func WithMaxDeliver(n int) SubscribeOption

func WithQueueGroupName

func WithQueueGroupName(name string) SubscribeOption

Option helpers (subset wired for Core mode; others are accepted but unused)

func WithStartAt

func WithStartAt(pos StartPosition) SubscribeOption

func WithStartTime

func WithStartTime(t time.Time) SubscribeOption

type SubscribeOptions

type SubscribeOptions struct {
	// Delivery topology
	QueueGroupName string
	Concurrency    int
	BufferSize     int
	Backpressure   BackpressurePolicy

	// JetStream semantics (ignored in Core mode for now)
	AckPolicy  AckPolicy
	MaxDeliver int
	Backoff    []time.Duration
	Durable    string
	StartAt    StartPosition
	StartTime  time.Time
	DLQ        Topic

	// Codec (reserved for later)
	Codec Codec
}

type Subscriber

type Subscriber interface {
	Handle(ctx context.Context, msg Message) error
}

type SubscriberFunc

type SubscriberFunc func(ctx context.Context, msg Message) error

func (SubscriberFunc) Handle

func (f SubscriberFunc) Handle(ctx context.Context, msg Message) error

type Subscription

type Subscription interface {
	Drain(ctx context.Context) error
	Stop() error
}

func SubscribeJSON

func SubscribeJSON[T any](
	s *Stream,
	topic Topic,
	handler func(ctx context.Context, m T) error,
	opts ...SubscribeOption,
) (Subscription, error)
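
SubscribeJSON takes a typed handler, so somewhere the raw message bytes must be decoded into T before the handler runs. The sketch below shows how such an adapter can be written with generics and encoding/json. It is a guess at the general technique, not the package's implementation; jsonHandler, decodeName, and the reduced Message type are hypothetical.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Message is reduced to the one field this sketch needs.
type Message struct{ Data []byte }

// jsonHandler adapts a typed handler to a raw-message handler by decoding
// the payload as JSON into a fresh T before calling h.
func jsonHandler[T any](h func(ctx context.Context, v T) error) func(context.Context, Message) error {
	return func(ctx context.Context, msg Message) error {
		var v T
		if err := json.Unmarshal(msg.Data, &v); err != nil {
			return fmt.Errorf("decode %T: %w", v, err)
		}
		return h(ctx, v)
	}
}

type User struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// decodeName runs a payload through the adapter and returns the decoded name.
func decodeName(payload string) (string, error) {
	var name string
	h := jsonHandler(func(_ context.Context, u User) error {
		name = u.Name
		return nil
	})
	err := h(context.Background(), Message{Data: []byte(payload)})
	return name, err
}

func main() {
	name, err := decodeName(`{"id":"123","name":"Alice"}`)
	fmt.Println(name, err)
}
```

Note that T is inferred from the handler's parameter type, which is why the README can call SubscribeJSON without explicit type arguments.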

type Topic

type Topic string

type TopicMode

type TopicMode int

TopicMode controls the default topic mode at the Stream level (topics can override).

const (
	TopicModeCore TopicMode = iota
	TopicModeJetStream
)

func (TopicMode) String

func (m TopicMode) String() string

type TopicOption

type TopicOption func(*TopicOptions)

type TopicOptions

type TopicOptions struct {
	Mode            TopicMode     // default: stream default (TopicModeCore unless changed)
	Retention       Retention     // default: Ephemeral
	MaxBytes        int64         // JS
	MaxMsgs         int64         // JS
	MaxAge          time.Duration // JS
	Replicas        int           // JS; single embedded server => 1
	DiscardPolicy   DiscardPolicy // JS
	SubjectOverride string        // optional subject mapping
	Codec           Codec         // optional per-topic default
}

Directories

| Path | Synopsis |
| --- | --- |
| internal/js | |
| internal/syncx | Package syncx provides enhanced synchronization utilities for the go-stream library. |
