durablestream

package
v0.0.0-...-9969c5e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2026 License: MIT Imports: 14 Imported by: 1

Documentation

Overview

Package durablestream implements the Durable Streams Protocol.

Example (FullDemo)
package main

import (
	"context"
	"fmt"
	"net/http/httptest"
	"time"

	"github.com/ahimsalabs/durable-streams-go/durablestream"
	"github.com/ahimsalabs/durable-streams-go/durablestream/storage/memorystorage"
)

func main() {
	// [snippet:demo]
	// Start test server
	storage := memorystorage.New()
	handler := durablestream.NewHandler(storage, nil)
	server := httptest.NewServer(handler)
	defer server.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	client := durablestream.NewClient(server.URL, nil)

	_, _ = client.Create(ctx, "/mystream", &durablestream.CreateOptions{
		ContentType: "application/json",
	})

	// Write using Writer
	writer, _ := client.Writer(ctx, "/mystream")
	_ = writer.SendJSON(map[string]string{"hello": "world"}, nil)

	// Read using Reader with Messages iterator
	reader := client.Reader("/mystream", durablestream.ZeroOffset)
	defer reader.Close()

	for msg, err := range reader.Messages(ctx) {
		if err != nil {
			break
		}
		fmt.Println(msg.String())
		break // Just read first message for demo
	}
	// [/snippet:demo]

}
Output:

{"hello":"world"}

Index

Examples

Constants

View Source
const (
	// ReadModeAuto catches up, then uses long-poll for live updates (default).
	ReadModeAuto = transport.ReadModeAuto

	// ReadModeLongPoll uses long-polling for live updates (Section 5.6).
	ReadModeLongPoll = transport.ReadModeLongPoll

	// ReadModeSSE uses Server-Sent Events for live updates (Section 5.7).
	ReadModeSSE = transport.ReadModeSSE
)

Read mode constants.

Variables

View Source
var (
	// ErrNotFound indicates the requested stream does not exist.
	ErrNotFound = errors.New("stream not found")

	// ErrGone indicates the requested offset is before the earliest
	// retained position due to retention/compaction.
	ErrGone = errors.New("offset before earliest retained position")

	// ErrConflict indicates a conflict occurred:
	// - Stream exists with different configuration (on create)
	// - Content type mismatch (on append)
	ErrConflict = errors.New("conflict")

	// ErrSequenceConflict indicates a sequence number conflict on append.
	// This occurs when appending with a sequence number <= the last appended sequence.
	ErrSequenceConflict = errors.New("sequence conflict")

	// ErrClosed indicates the stream or connection has been closed.
	ErrClosed = errors.New("stream closed")

	// ErrBadRequest indicates a malformed or invalid request.
	ErrBadRequest = errors.New("bad request")
)

Sentinel errors for common conditions. Use errors.Is() to check for these errors.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides methods to interact with durable streams. See PROTOCOL.md Section 5: HTTP Operations.

Example

[snippet:client]

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ahimsalabs/durable-streams-go/durablestream"
)

func main() {
	ctx := context.Background()

	client := durablestream.NewClient("http://localhost:4437/streams", nil)

	_, err := client.Create(ctx, "events", &durablestream.CreateOptions{
		ContentType: "application/json",
	})
	if err != nil {
		log.Fatal(err)
	}

	// Write using Writer
	writer, err := client.Writer(ctx, "events")
	if err != nil {
		log.Fatal(err)
	}

	event := map[string]any{"type": "user.created", "id": 123}
	if err := writer.SendJSON(event, nil); err != nil {
		log.Fatal(err)
	}
	fmt.Println("Appended at offset:", writer.Offset())

	// Read using Reader
	reader := client.Reader("events", durablestream.ZeroOffset)
	defer reader.Close()

	result, err := reader.Read(ctx)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("Got data:", len(result.Data) > 0)
	fmt.Println("Next offset:", result.NextOffset)
}

func NewClient

func NewClient(baseURL string, cfg *ClientConfig) *Client

NewClient creates a new stream client for the given base URL. Pass nil for cfg to use defaults.

The client automatically retries transient failures (5xx errors, rate limits) with exponential backoff. For custom retry behavior or to disable retry, use NewClientWithTransport.

For custom transports (testing, middleware composition), use NewClientWithTransport.

func NewClientWithTransport

func NewClientWithTransport(t transport.Transport, cfg *TransportClientConfig) *Client

NewClientWithTransport creates a client with a custom transport.

Use this for:

  • Testing with mock transports
  • Middleware composition (logging, retry)
  • Custom transport implementations

Example with middleware:

t := transport.NewHTTPTransport(url, &transport.HTTPConfig{Headers: myHeaders})
t = transport.WithRetry(transport.DefaultRetryOptions())(t)
client := durablestream.NewClientWithTransport(t, nil)

func (*Client) Create

func (c *Client) Create(ctx context.Context, path string, opts *CreateOptions) (*StreamInfo, error)

Create creates a new stream with the given options (Section 5.1: Create Stream). Pass nil for opts to use defaults.

func (*Client) Delete

func (c *Client) Delete(ctx context.Context, path string) error

Delete removes a stream (Section 5.3: Delete Stream).

func (*Client) Head

func (c *Client) Head(ctx context.Context, path string) (*StreamInfo, error)

Head queries stream metadata without transferring data (Section 5.4: Stream Metadata).

func (*Client) Reader

func (c *Client) Reader(path string, offset Offset) *Reader

Reader creates a new Reader for continuous reading from a stream. The Reader inherits the client's ReadMode for live tailing behavior.

When offset is "now" and the read mode is LongPoll or Auto, the reader skips catch-up and goes directly to long-poll mode. Per PROTOCOL.md Section 6: "Servers MUST immediately begin waiting for new data (no initial empty response)"

func (*Client) Writer

func (c *Client) Writer(ctx context.Context, path string) (*StreamWriter, error)

Writer creates a StreamWriter for append operations. The writer caches stream metadata (content-type) to avoid per-append overhead.

type ClientConfig

type ClientConfig struct {
	// HTTPClient is the underlying HTTP client.
	// Default: http.DefaultClient.
	HTTPClient *http.Client

	// Headers provides headers to include in all requests.
	// Called per-request to allow dynamic values (e.g., auth tokens).
	// This is the primary customization point for authentication.
	Headers HeaderProvider

	// Timeout is the default timeout for all operations (Create, Head, Delete, etc).
	// Zero or negative values default to 30s.
	Timeout time.Duration

	// ReadMode specifies how live reads are handled after catch-up (Section 5.6-5.7).
	// Zero value defaults to ReadModeAuto (catch-up then long-poll).
	ReadMode ReadMode
}

ClientConfig configures a Client created via NewClient.

For custom transports (testing, middleware), use NewClientWithTransport instead.

Zero Values

Zero values are replaced with defaults:

  • Timeout: 30s (if zero or negative)
  • ReadMode: ReadModeAuto (if zero)
  • HTTPClient: http.DefaultClient (if nil)
  • Headers: none (if nil)

type CreateOptions

type CreateOptions struct {
	// ContentType sets the content type for the stream.
	// Default: "application/octet-stream"
	ContentType string

	// TTL sets a relative time-to-live for the stream.
	// Zero means no TTL. Mutually exclusive with ExpiresAt.
	TTL time.Duration

	// ExpiresAt sets an absolute expiry time for the stream.
	// Zero means no expiry. Mutually exclusive with TTL.
	ExpiresAt time.Time

	// InitialData sets the initial stream data.
	InitialData []byte
}

CreateOptions specifies options for creating a stream (Section 5.1).

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler implements http.Handler for serving durable streams. Per spec Section 5: routes requests based on HTTP method.

Example

[snippet:server]

package main

import (
	"log"
	"net/http"

	"github.com/ahimsalabs/durable-streams-go/durablestream"
	"github.com/ahimsalabs/durable-streams-go/durablestream/storage/memorystorage"
)

func main() {
	storage := memorystorage.New()
	handler := durablestream.NewHandler(storage, nil)

	mux := http.NewServeMux()
	mux.Handle("/v1/stream/", http.StripPrefix("/v1/stream/", handler))

	log.Println("Listening on :4437")
	log.Fatal(http.ListenAndServe(":4437", mux))
}

func NewHandler

func NewHandler(storage Storage, cfg *HandlerConfig) *Handler

NewHandler creates a new stream handler with the given storage. Pass nil for cfg to use defaults.

func (*Handler) ServeHTTP

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP routes to appropriate handler based on method.

type HandlerConfig

type HandlerConfig struct {
	// PathExtractor extracts the stream ID from the request.
	// Default: uses r.URL.Path.
	PathExtractor func(*http.Request) string

	// LongPollTimeout is the maximum wait time for long-poll requests. Default: 30s.
	LongPollTimeout time.Duration

	// SSECloseAfter is the duration after which SSE connections are closed. Default: 60s.
	SSECloseAfter time.Duration

	// MaxAppendSize is the maximum allowed size for append operations. Default: 10MB.
	MaxAppendSize int64

	// ChunkSize is the maximum response size (in bytes) for read operations.
	// When a read would return more data than this limit, results are paginated.
	// Default: 1MB.
	ChunkSize int
}

HandlerConfig configures a Handler.

type HeaderProvider

type HeaderProvider = transport.HeaderProvider

HeaderProvider is a function that provides HTTP headers per-request. Re-exported from transport package.

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message represents a single message from a stream. Use Decode() to unmarshal JSON, Bytes() for raw access, or String() for text.

func (Message) Bytes

func (m Message) Bytes() []byte

Bytes returns the raw message bytes.

func (Message) Decode

func (m Message) Decode(v any) error

Decode unmarshals the message as JSON into v.

func (Message) String

func (m Message) String() string

String returns the message as a string.

type Offset

type Offset string

Offset represents an opaque position within a stream. Per spec Section 6: Offsets are opaque tokens that are lexicographically sortable.

From PROTOCOL.md Section 6:

  1. Opaque: Clients MUST NOT interpret offset structure or meaning
  2. Lexicographically Sortable: For any two valid offsets for the same stream, a lexicographic comparison determines their relative position in the stream.
  3. Persistent: Offsets remain valid for the lifetime of the stream (until deletion or expiration)
const ZeroOffset Offset = ""

ZeroOffset represents the zero value for an offset, equivalent to the stream start.

func (Offset) Compare

func (o Offset) Compare(other Offset) int

Compare performs a lexicographic comparison of two offsets. Returns:

-1 if o is before other
 0 if o equals other
 1 if o is after other

Note: This uses lexicographic (byte-wise) ordering, not numeric ordering. For example, "9" > "10" lexicographically because '9' (0x39) > '1' (0x31).

func (Offset) IsZero

func (o Offset) IsZero() bool

IsZero returns true if the offset is the zero value (empty string).

func (Offset) String

func (o Offset) String() string

String returns the string representation of the offset. Implements fmt.Stringer interface.

type ReadMode

type ReadMode = transport.ReadMode

ReadMode specifies how live reads are handled after catch-up. See transport.ReadMode for detailed documentation.

type ReadResult

type ReadResult struct {
	Messages   []StoredMessage // Individual messages in offset order
	NextOffset Offset          // Offset to use for next read
	TailOffset Offset          // Current tail (for up-to-date detection)
}

ReadResult contains messages from a storage read.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader provides continuous stream reading with automatic mode transitions. It manages offset tracking and switches from catch-up to live mode based on the client's configured ReadMode.

See PROTOCOL.md Section 5.5-5.7 for read operation details.

Example

[snippet:reader]

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ahimsalabs/durable-streams-go/durablestream"
)

func main() {
	ctx := context.Background()

	client := durablestream.NewClient("http://localhost:4437/streams", nil)

	// Create a reader starting from offset 0
	reader := client.Reader("events", durablestream.ZeroOffset)
	defer reader.Close()

	for msg, err := range reader.Messages(ctx) {
		if err != nil {
			log.Fatal(err)
		}
		// Use msg.String() for text, msg.Bytes() for raw bytes,
		// or msg.Decode(&v) for JSON
		fmt.Println("Received:", msg.String())
	}
}

func (*Reader) Close

func (r *Reader) Close() error

Close closes the reader and releases any resources.

func (*Reader) Messages

func (r *Reader) Messages(ctx context.Context) iter.Seq2[Message, error]

Messages returns an iterator for reading messages from the stream. For JSON streams, it parses the JSON array and yields individual messages. Each Message can be decoded via msg.Decode(&v) or accessed as raw bytes via msg.Bytes().

This enables use with Go 1.22+ range-over-func:

for msg, err := range reader.Messages(ctx) {
    if err != nil {
        log.Fatal(err)
    }
    var event MyEvent
    if err := msg.Decode(&event); err != nil {
        log.Fatal(err)
    }
}

func (*Reader) Offset

func (r *Reader) Offset() Offset

Offset returns the current offset position of the reader.

func (*Reader) Read

func (r *Reader) Read(ctx context.Context) (*StreamData, error)

Read performs a single read operation based on the current state. Returns the read result or an error.

During catch-up phase, uses basic GET requests (Section 5.5). After UpToDate, switches to live mode based on ReadMode:

  • ReadModeAuto/ReadModeLongPoll: long-poll requests (Section 5.6)
  • ReadModeSSE: Server-Sent Events stream (Section 5.7)

func (*Reader) Seek

func (r *Reader) Seek(offset Offset) *Reader

Seek repositions the reader to the given offset. The next Read will start from this offset. Returns the reader for chaining.

func (*Reader) SeekTail

func (r *Reader) SeekTail(ctx context.Context) error

SeekTail repositions the reader to the current tail of the stream. After seeking to tail, subsequent reads will only return new data. This is useful for "live tail" scenarios where you only want new messages.

type SendOptions

type SendOptions struct {
	// Seq is an optional monotonic sequence number for writer coordination.
	// If provided and less than or equal to the last sequence, returns ErrConflict.
	Seq string
}

SendOptions specifies options for Send and SendJSON operations.

type Storage

type Storage interface {
	// Create creates a new stream. Returns (true, nil) if newly created.
	// Returns (false, nil) if stream exists with matching config (idempotent).
	// Returns (false, error) if stream exists with different config.
	Create(ctx context.Context, streamID string, cfg StreamConfig) (created bool, err error)

	// Append writes data to a stream. Returns the new tail offset.
	// seq is optional sequence number for coordination (Section 5.2).
	//
	// The data slice is only valid for the duration of the call; the caller
	// may reuse or modify it after Append returns. Implementations that need
	// to retain the data (e.g., in-memory storage) must copy it.
	Append(ctx context.Context, streamID string, data []byte, seq string) (Offset, error)

	// Read returns messages from offset. limit is max total bytes to return.
	// Returns messages and the next offset to read from (Section 5.5).
	Read(ctx context.Context, streamID string, offset Offset, limit int) (*ReadResult, error)

	// Head returns stream metadata without reading data (Section 5.4).
	// Returns ErrNotFound if stream doesn't exist (use for existence checks).
	Head(ctx context.Context, streamID string) (*StreamInfo, error)

	// Delete removes a stream (Section 5.3).
	Delete(ctx context.Context, streamID string) error

	// WaitForData blocks until data is available at offset, then returns it.
	// Returns immediately if data already exists at offset.
	// Returns ctx.Err() on timeout/cancellation.
	// Returns ErrNotFound if stream doesn't exist or is deleted while waiting.
	WaitForData(ctx context.Context, streamID string, offset Offset, limit int) (*ReadResult, error)

	// Close releases resources. Safe to call multiple times.
	Close() error
}

Storage defines the interface for stream persistence. Implementations must be goroutine-safe.

type StoredMessage

type StoredMessage struct {
	Data   []byte // Raw bytes of this message
	Offset Offset // Offset after this message
}

StoredMessage represents a single message in a stream. Each append operation creates one StoredMessage (or multiple if JSON array is flattened).

type StreamConfig

type StreamConfig struct {
	ContentType string
	TTL         time.Duration // Zero means no TTL
	ExpiresAt   time.Time     // Zero means no expiry
	IsPrivate   bool          // If true, use Cache-Control: private (Section 8.1)
}

StreamConfig contains creation-time configuration.

func (StreamConfig) IsExpired

func (c StreamConfig) IsExpired() bool

IsExpired checks if the config has expired based on its ExpiresAt field. Returns false if ExpiresAt is zero (no expiry).

func (StreamConfig) Matches

func (c StreamConfig) Matches(other StreamConfig) bool

Matches checks if two StreamConfigs are equivalent for idempotent create. Content-Type is compared case-insensitively (per RFC 2045). ExpiresAt is only compared when TTL is zero (i.e., explicit Stream-Expires-At header).

type StreamData

type StreamData struct {
	Data       []byte // Raw response bytes (empty on 204 timeout)
	NextOffset Offset // Next offset to read from
	Cursor     string // Opaque cursor for long-poll
	UpToDate   bool   // True if caught up to tail
}

StreamData contains the result of a stream read operation.

type StreamInfo

type StreamInfo struct {
	ContentType string
	NextOffset  Offset
	TTL         time.Duration // Zero means no TTL
	ExpiresAt   time.Time     // Zero means no expiry
	IsPrivate   bool          // If true, use Cache-Control: private (Section 8.1)
}

StreamInfo contains metadata about a stream.

type StreamWriter

type StreamWriter struct {
	// contains filtered or unexported fields
}

StreamWriter provides efficient append operations by caching stream metadata. Create via Client.Writer(). The writer holds no resources requiring cleanup. See PROTOCOL.md Section 5.2: Append to Stream.

func (*StreamWriter) Offset

func (w *StreamWriter) Offset() Offset

Offset returns the current tail offset after the last successful append.

func (*StreamWriter) Send

func (w *StreamWriter) Send(data []byte, opts *SendOptions) error

Send appends raw bytes to the stream (Section 5.2: Append to Stream).

func (*StreamWriter) SendJSON

func (w *StreamWriter) SendJSON(v any, opts *SendOptions) error

SendJSON marshals v as JSON and appends to the stream.

type TransportClientConfig

type TransportClientConfig struct {
	// Timeout is the default timeout for all operations.
	// Zero or negative values default to 30s.
	Timeout time.Duration

	// ReadMode specifies how live reads are handled after catch-up.
	// Zero value defaults to ReadModeAuto.
	ReadMode ReadMode
}

TransportClientConfig configures a Client created via NewClientWithTransport.

Directories

Path Synopsis
internal
protocol
Package protocol contains internal HTTP protocol constants and utilities.
Package protocol contains internal HTTP protocol constants and utilities.
Package storage provides offset formatting helpers for durable stream storage implementations.
Package storage provides offset formatting helpers for durable stream storage implementations.
memorystorage
Package memorystore provides an in-memory implementation of durablestream.Storage.
Package memorystore provides an in-memory implementation of durablestream.Storage.
Package testing provides test helpers for durablestream.
Package testing provides test helpers for durablestream.
Package transport defines the interface for durable stream network operations.
Package transport defines the interface for durable stream network operations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL