Documentation
¶
Overview ¶
Package stream provides an embedded NATS server with a small, ergonomic façade.
Index ¶
- Variables
- func RequestJSON[T any](s *Stream, ctx context.Context, topic Topic, req any, timeout time.Duration, ...) (T, error)
- type AckPolicy
- type BackpressurePolicy
- type Codec
- type DiscardPolicy
- type JSONPublisher
- type JSONRequester
- type JSONSubscriber
- type Message
- type Option
- func WithBasicAuth(user, pass string) Option
- func WithConnectTimeout(d time.Duration) Option
- func WithDefaultCodec(cd Codec) Option
- func WithDefaultTopicMode(m TopicMode) Option
- func WithDisableJetStream() Option
- func WithDrainTimeout(d time.Duration) Option
- func WithHost(h string) Option
- func WithLogger(l *slog.Logger) Option
- func WithMaxPayload(bytes int) Option
- func WithPort(p int) Option
- func WithRandomPort() Option
- func WithReconnectWait(min time.Duration) Option
- func WithRequestIDHeader(name string) Option
- func WithServerReadyTimeout(d time.Duration) Option
- func WithStoreDir(dir string) Option
- func WithTLS(cfg *tls.Config) Option
- func WithTokenAuth(token string) Option
- type PublishOption
- type Publisher
- type PublisherFunc
- type Retention
- type StartPosition
- type Stream
- func (s *Stream) Close(ctx context.Context) error
- func (s *Stream) DeepHealthCheck(ctx context.Context) error
- func (s *Stream) Healthy(ctx context.Context) error
- func (s *Stream) Publish(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error
- func (s *Stream) PublishJSON(ctx context.Context, topic Topic, v any, opts ...PublishOption) error
- func (s *Stream) Publisher(name string) (Publisher, bool)
- func (s *Stream) RegisterPublisher(name string, p Publisher) error
- func (s *Stream) Request(ctx context.Context, topic Topic, msg Message, timeout time.Duration, ...) (Message, error)
- func (s *Stream) Subscribe(topic Topic, sub Subscriber, opts ...SubscribeOption) (Subscription, error)
- func (s *Stream) TestJSONResponder(topic Topic, handler func(data []byte) (any, error)) (*nats.Subscription, error)
- func (s *Stream) TestResponder(topic Topic, handler func(data []byte) ([]byte, error)) (*nats.Subscription, error)
- type SubscribeOption
- func WithAckPolicy(p AckPolicy) SubscribeOption
- func WithBackpressure(p BackpressurePolicy) SubscribeOption
- func WithBufferSize(n int) SubscribeOption
- func WithConcurrency(n int) SubscribeOption
- func WithDLQ(t Topic) SubscribeOption
- func WithDurable(name string) SubscribeOption
- func WithExponentialBackoff(initial time.Duration, steps int, factor float64) SubscribeOption
- func WithMaxDeliver(n int) SubscribeOption
- func WithQueueGroupName(name string) SubscribeOption
- func WithStartAt(pos StartPosition) SubscribeOption
- func WithStartTime(t time.Time) SubscribeOption
- type SubscribeOptions
- type Subscriber
- type SubscriberFunc
- type Subscription
- type Topic
- type TopicMode
- type TopicOption
- type TopicOptions
Constants ¶
This section is empty.
Variables ¶
var (
    // ErrCodecUnsupported indicates the codec is not supported.
    ErrCodecUnsupported = errors.New("unsupported codec")
    // ErrTimeout indicates the operation timed out.
    ErrTimeout = errors.New("operation timeout")
)
var (
    // ErrStreamClosed indicates the stream was already closed (or never started).
    ErrStreamClosed = errors.New("stream is closed")
    // ErrStreamUnhealthy indicates server or client is not ready/connected.
    ErrStreamUnhealthy = errors.New("stream is not healthy")
)
var (
    // ErrInvalidTopic indicates the topic name is invalid.
    ErrInvalidTopic = errors.New("invalid topic name")
    // ErrTopicExists indicates the topic already exists.
    ErrTopicExists = errors.New("topic already exists")
    // ErrUnknownTopic indicates the topic does not exist.
    ErrUnknownTopic = errors.New("unknown topic")
)
Functions ¶
func RequestJSON ¶
func RequestJSON[T any]( s *Stream, ctx context.Context, topic Topic, req any, timeout time.Duration, opts ...PublishOption, ) (T, error)
RequestJSON sends a typed request and decodes a typed response using JSON. (When the Codec system is fully wired, this can delegate to the resolved codec.)
Types ¶
type BackpressurePolicy ¶
type BackpressurePolicy int
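BackpressurePolicy selects the backpressure behavior of a subscription (set via WithBackpressure). It is declared as part of the backpressure and ack semantics types.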
const (
    BackpressureBlock BackpressurePolicy = iota // default
    BackpressureDropNewest
    BackpressureDropOldest
)
type Codec ¶
type Codec interface {
Encode(v any) ([]byte, error)
Decode(data []byte, v any) error
ContentType() string
}
Codec is a pluggable encoder/decoder (default JSON).
var (
JSONCodec Codec = jsonCodec{}
)
type JSONPublisher ¶
type JSONPublisher[T any] struct { // contains filtered or unexported fields }
JSONPublisher wraps PublishJSON with captured stream/topic.
func NewJSONPublisher ¶
func NewJSONPublisher[T any](s *Stream, topic Topic) JSONPublisher[T]
func (JSONPublisher[T]) Publish ¶
func (p JSONPublisher[T]) Publish(ctx context.Context, v T, opts ...PublishOption) error
type JSONRequester ¶
type JSONRequester[Req, Resp any] struct { // contains filtered or unexported fields }
JSONRequester wraps RequestJSON with captured stream/topic.
func NewJSONRequester ¶
func NewJSONRequester[Req, Resp any](s *Stream, topic Topic) JSONRequester[Req, Resp]
func (JSONRequester[Req, Resp]) Request ¶
func (r JSONRequester[Req, Resp]) Request(ctx context.Context, req Req, timeout time.Duration, opts ...PublishOption) (Resp, error)
type JSONSubscriber ¶
type JSONSubscriber[T any] struct { // contains filtered or unexported fields }
func NewJSONSubscriber ¶
func NewJSONSubscriber[T any](s *Stream, topic Topic, opts ...SubscribeOption) JSONSubscriber[T]
func (JSONSubscriber[T]) Subscribe ¶
func (j JSONSubscriber[T]) Subscribe(handler func(ctx context.Context, m T) error) (Subscription, error)
type Option ¶
type Option func(*config)
Option configures the Stream.
func WithBasicAuth ¶
WithBasicAuth sets user/password auth for the embedded server & client.
func WithConnectTimeout ¶
WithConnectTimeout sets the client connect timeout.
func WithDefaultCodec ¶
WithDefaultCodec overrides the default codec (JSON by default).
func WithDefaultTopicMode ¶
WithDefaultTopicMode sets the default topic mode for new topics (Core or JetStream).
func WithDisableJetStream ¶
func WithDisableJetStream() Option
WithDisableJetStream disables JetStream (useful for testing).
func WithDrainTimeout ¶
WithDrainTimeout sets how long Close waits for client drain before hard-close.
func WithMaxPayload ¶
WithMaxPayload sets the server max payload size (bytes).
func WithRandomPort ¶
func WithRandomPort() Option
WithRandomPort selects a random free port for the embedded server.
func WithReconnectWait ¶
WithReconnectWait sets the fixed reconnect wait to the duration min (a time.Duration parameter name, not a number of minutes).
func WithRequestIDHeader ¶
WithRequestIDHeader changes the correlation header name (default X-Request-Id).
func WithServerReadyTimeout ¶
WithServerReadyTimeout sets how long to wait for the embedded server to be ready.
func WithStoreDir ¶
WithStoreDir sets the JetStream store directory (useful for durable topics later).
func WithTokenAuth ¶
WithTokenAuth sets token auth for the embedded server & client.
type PublishOption ¶
type PublishOption func(*publishCfg)
func WithFlush ¶
func WithFlush(on bool) PublishOption
WithFlush controls whether to flush after publish; pair with WithFlushTimeout.
func WithFlushTimeout ¶
func WithFlushTimeout(d time.Duration) PublishOption
WithFlushTimeout sets an optional timeout used when flushing.
func WithHeaders ¶
func WithHeaders(h map[string]string) PublishOption
WithHeaders merges the given headers into the outbound message.
func WithMessageID ¶
func WithMessageID(id string) PublishOption
WithMessageID sets/overrides the X-Message-Id header.
func WithReplyTo ¶
func WithReplyTo(t Topic) PublishOption
WithReplyTo sets the NATS Reply field for publish or request flows.
type PublisherFunc ¶
func (PublisherFunc) Publish ¶
func (f PublisherFunc) Publish(ctx context.Context, topic Topic, msg Message, opts ...PublishOption) error
type StartPosition ¶
type StartPosition int
const ( DeliverNew StartPosition = iota DeliverByStartTime DeliverLastPerSubject )
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream owns the embedded nats-server, client connection(s), and registries. It is safe for concurrent use.
func New ¶
New starts an embedded NATS server, waits until it’s ready, and connects a client.
Defaults “just work” for local dev: loopback host, dynamic port, Core topic mode, JSON codec.
func (*Stream) DeepHealthCheck ¶
DeepHealthCheck performs a thorough health check including server readiness verification. This is more expensive than Healthy() and should be used sparingly.
func (*Stream) Healthy ¶
Healthy returns an error if the stream is not in a healthy state. This is a lightweight check that verifies the basic operational status of the stream. For a more thorough check including server readiness, use DeepHealthCheck().
func (*Stream) PublishJSON ¶
func (*Stream) RegisterPublisher ¶
func (*Stream) Subscribe ¶
func (s *Stream) Subscribe(topic Topic, sub Subscriber, opts ...SubscribeOption) (Subscription, error)
func (*Stream) TestJSONResponder ¶
func (s *Stream) TestJSONResponder(topic Topic, handler func(data []byte) (any, error)) (*nats.Subscription, error)
TestJSONResponder is a helper for testing JSON request/reply functionality.
func (*Stream) TestResponder ¶
func (s *Stream) TestResponder(topic Topic, handler func(data []byte) ([]byte, error)) (*nats.Subscription, error)
TestResponder is a helper for testing request/reply functionality. It sets up a proper NATS responder that can handle reply-to automatically.
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func WithAckPolicy ¶
func WithAckPolicy(p AckPolicy) SubscribeOption
func WithBackpressure ¶
func WithBackpressure(p BackpressurePolicy) SubscribeOption
func WithBufferSize ¶
func WithBufferSize(n int) SubscribeOption
func WithConcurrency ¶
func WithConcurrency(n int) SubscribeOption
func WithDLQ ¶
func WithDLQ(t Topic) SubscribeOption
func WithDurable ¶
func WithDurable(name string) SubscribeOption
func WithExponentialBackoff ¶
func WithExponentialBackoff(initial time.Duration, steps int, factor float64) SubscribeOption
func WithMaxDeliver ¶
func WithMaxDeliver(n int) SubscribeOption
func WithQueueGroupName ¶
func WithQueueGroupName(name string) SubscribeOption
Option helpers. Only a subset of these options is wired for Core mode; the others are accepted but currently unused.
func WithStartAt ¶
func WithStartAt(pos StartPosition) SubscribeOption
func WithStartTime ¶
func WithStartTime(t time.Time) SubscribeOption
type SubscribeOptions ¶
type SubscribeOptions struct {
// Delivery topology
QueueGroupName string
Concurrency int
BufferSize int
Backpressure BackpressurePolicy
// JetStream semantics (ignored in Core mode for now)
AckPolicy AckPolicy
MaxDeliver int
Backoff []time.Duration
Durable string
StartAt StartPosition
StartTime time.Time
DLQ Topic
// Codec (reserved for later)
Codec Codec
}
type SubscriberFunc ¶
type Subscription ¶
func SubscribeJSON ¶
func SubscribeJSON[T any]( s *Stream, topic Topic, handler func(ctx context.Context, m T) error, opts ...SubscribeOption, ) (Subscription, error)
type TopicMode ¶
type TopicMode int
TopicMode controls the default topic mode at the Stream level (topics can override).
type TopicOption ¶
type TopicOption func(*TopicOptions)
type TopicOptions ¶
type TopicOptions struct {
Mode TopicMode // default: stream default (TopicModeCore unless changed)
Retention Retention // default: Ephemeral
MaxBytes int64 // JS
MaxMsgs int64 // JS
MaxAge time.Duration // JS
Replicas int // JS; single embedded server => 1
DiscardPolicy DiscardPolicy // JS
SubjectOverride string // optional subject mapping
Codec Codec // optional per-topic default
}