Documentation
¶
Overview ¶
Package durablestream implements the Durable Streams Protocol.
Example (FullDemo) ¶
package main
import (
"context"
"fmt"
"net/http/httptest"
"time"
"github.com/ahimsalabs/durable-streams-go/durablestream"
"github.com/ahimsalabs/durable-streams-go/durablestream/storage/memorystorage"
)
func main() {
// [snippet:demo]
// Start test server
storage := memorystorage.New()
handler := durablestream.NewHandler(storage, nil)
server := httptest.NewServer(handler)
defer server.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
client := durablestream.NewClient(server.URL, nil)
_, _ = client.Create(ctx, "/mystream", &durablestream.CreateOptions{
ContentType: "application/json",
})
// Write using Writer
writer, _ := client.Writer(ctx, "/mystream")
_ = writer.SendJSON(map[string]string{"hello": "world"}, nil)
// Read using Reader with Messages iterator
reader := client.Reader("/mystream", durablestream.ZeroOffset)
defer reader.Close()
for msg, err := range reader.Messages(ctx) {
if err != nil {
break
}
fmt.Println(msg.String())
break // Just read first message for demo
}
// [/snippet:demo]
}
Output: {"hello":"world"}
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Create(ctx context.Context, path string, opts *CreateOptions) (*StreamInfo, error)
- func (c *Client) Delete(ctx context.Context, path string) error
- func (c *Client) Head(ctx context.Context, path string) (*StreamInfo, error)
- func (c *Client) Reader(path string, offset Offset) *Reader
- func (c *Client) Writer(ctx context.Context, path string) (*StreamWriter, error)
- type ClientConfig
- type CreateOptions
- type Handler
- type HandlerConfig
- type HeaderProvider
- type Message
- type Offset
- type ReadMode
- type ReadResult
- type Reader
- type SendOptions
- type Storage
- type StoredMessage
- type StreamConfig
- type StreamData
- type StreamInfo
- type StreamWriter
- type TransportClientConfig
Examples ¶
Constants ¶
const ( // ReadModeAuto catches up, then uses long-poll for live updates (default). ReadModeAuto = transport.ReadModeAuto // ReadModeLongPoll uses long-polling for live updates (Section 5.6). ReadModeLongPoll = transport.ReadModeLongPoll // ReadModeSSE uses Server-Sent Events for live updates (Section 5.7). ReadModeSSE = transport.ReadModeSSE )
Read mode constants.
Variables ¶
var ( // ErrNotFound indicates the requested stream does not exist. ErrNotFound = errors.New("stream not found") // ErrGone indicates the requested offset is before the earliest // retained position due to retention/compaction. ErrGone = errors.New("offset before earliest retained position") // ErrConflict indicates a conflict occurred: // - Stream exists with different configuration (on create) // - Content type mismatch (on append) ErrConflict = errors.New("conflict") // ErrSequenceConflict indicates a sequence number conflict on append. // This occurs when appending with a sequence number <= the last appended sequence. ErrSequenceConflict = errors.New("sequence conflict") // ErrClosed indicates the stream or connection has been closed. ErrClosed = errors.New("stream closed") // ErrBadRequest indicates a malformed or invalid request. ErrBadRequest = errors.New("bad request") )
Sentinel errors for common conditions. Use errors.Is() to check for these errors.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client provides methods to interact with durable streams. See PROTOCOL.md Section 5: HTTP Operations.
Example ¶
[snippet:client]
package main
import (
"context"
"fmt"
"log"
"github.com/ahimsalabs/durable-streams-go/durablestream"
)
func main() {
ctx := context.Background()
client := durablestream.NewClient("http://localhost:4437/streams", nil)
_, err := client.Create(ctx, "events", &durablestream.CreateOptions{
ContentType: "application/json",
})
if err != nil {
log.Fatal(err)
}
// Write using Writer
writer, err := client.Writer(ctx, "events")
if err != nil {
log.Fatal(err)
}
event := map[string]any{"type": "user.created", "id": 123}
if err := writer.SendJSON(event, nil); err != nil {
log.Fatal(err)
}
fmt.Println("Appended at offset:", writer.Offset())
// Read using Reader
reader := client.Reader("events", durablestream.ZeroOffset)
defer reader.Close()
result, err := reader.Read(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println("Got data:", len(result.Data) > 0)
fmt.Println("Next offset:", result.NextOffset)
}
func NewClient ¶
func NewClient(baseURL string, cfg *ClientConfig) *Client
NewClient creates a new stream client for the given base URL. Pass nil for cfg to use defaults.
The client automatically retries transient failures (5xx errors, rate limits) with exponential backoff. For custom retry behavior or to disable retry, use NewClientWithTransport.
For custom transports (testing, middleware composition), use NewClientWithTransport.
func NewClientWithTransport ¶
func NewClientWithTransport(t transport.Transport, cfg *TransportClientConfig) *Client
NewClientWithTransport creates a client with a custom transport.
Use this for:
- Testing with mock transports
- Middleware composition (logging, retry)
- Custom transport implementations
Example with middleware:
t := transport.NewHTTPTransport(url, &transport.HTTPConfig{Headers: myHeaders})
t = transport.WithRetry(transport.DefaultRetryOptions())(t)
client := durablestream.NewClientWithTransport(t, nil)
func (*Client) Create ¶
func (c *Client) Create(ctx context.Context, path string, opts *CreateOptions) (*StreamInfo, error)
Create creates a new stream with the given options (Section 5.1: Create Stream). Pass nil for opts to use defaults.
func (*Client) Head ¶
Head queries stream metadata without transferring data (Section 5.4: Stream Metadata).
func (*Client) Reader ¶
Reader creates a new Reader for continuous reading from a stream. The Reader inherits the client's ReadMode for live tailing behavior.
When offset is "now" and the read mode is LongPoll or Auto, the reader skips catch-up and goes directly to long-poll mode. Per PROTOCOL.md Section 6: "Servers MUST immediately begin waiting for new data (no initial empty response)"
type ClientConfig ¶
type ClientConfig struct {
// HTTPClient is the underlying HTTP client.
// Default: http.DefaultClient.
HTTPClient *http.Client
// Headers provides headers to include in all requests.
// Called per-request to allow dynamic values (e.g., auth tokens).
// This is the primary customization point for authentication.
Headers HeaderProvider
// Timeout is the default timeout for all operations (Create, Head, Delete, etc).
// Zero or negative values default to 30s.
Timeout time.Duration
// ReadMode specifies how live reads are handled after catch-up (Section 5.6-5.7).
// Zero value defaults to ReadModeAuto (catch-up then long-poll).
ReadMode ReadMode
}
ClientConfig configures a Client created via NewClient.
For custom transports (testing, middleware), use NewClientWithTransport instead.
Zero Values ¶
Zero values are replaced with defaults:
- Timeout: 30s (if zero or negative)
- ReadMode: ReadModeAuto (if zero)
- HTTPClient: http.DefaultClient (if nil)
- Headers: none (if nil)
type CreateOptions ¶
type CreateOptions struct {
// ContentType sets the content type for the stream.
// Default: "application/octet-stream"
ContentType string
// TTL sets a relative time-to-live for the stream.
// Zero means no TTL. Mutually exclusive with ExpiresAt.
TTL time.Duration
// ExpiresAt sets an absolute expiry time for the stream.
// Zero means no expiry. Mutually exclusive with TTL.
ExpiresAt time.Time
// InitialData sets the initial stream data.
InitialData []byte
}
CreateOptions specifies options for creating a stream (Section 5.1).
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler implements http.Handler for serving durable streams. Per spec Section 5: routes requests based on HTTP method.
Example ¶
[snippet:server]
package main
import (
"log"
"net/http"
"github.com/ahimsalabs/durable-streams-go/durablestream"
"github.com/ahimsalabs/durable-streams-go/durablestream/storage/memorystorage"
)
func main() {
storage := memorystorage.New()
handler := durablestream.NewHandler(storage, nil)
mux := http.NewServeMux()
mux.Handle("/v1/stream/", http.StripPrefix("/v1/stream/", handler))
log.Println("Listening on :4437")
log.Fatal(http.ListenAndServe(":4437", mux))
}
func NewHandler ¶
func NewHandler(storage Storage, cfg *HandlerConfig) *Handler
NewHandler creates a new stream handler with the given storage. Pass nil for cfg to use defaults.
type HandlerConfig ¶
type HandlerConfig struct {
// PathExtractor extracts the stream ID from the request.
// Default: uses r.URL.Path.
PathExtractor func(*http.Request) string
// LongPollTimeout is the maximum wait time for long-poll requests. Default: 30s.
LongPollTimeout time.Duration
// SSECloseAfter is the duration after which SSE connections are closed. Default: 60s.
SSECloseAfter time.Duration
// MaxAppendSize is the maximum allowed size for append operations. Default: 10MB.
MaxAppendSize int64
// ChunkSize is the maximum response size (in bytes) for read operations.
// When a read would return more data than this limit, results are paginated.
// Default: 1MB.
ChunkSize int
}
HandlerConfig configures a Handler.
type HeaderProvider ¶
type HeaderProvider = transport.HeaderProvider
HeaderProvider is a function that provides HTTP headers per-request. Re-exported from transport package.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message represents a single message from a stream. Use Decode() to unmarshal JSON, Bytes() for raw access, or String() for text.
type Offset ¶
type Offset string
Offset represents an opaque position within a stream. Per spec Section 6: Offsets are opaque tokens that are lexicographically sortable.
From PROTOCOL.md Section 6:
- Opaque: Clients MUST NOT interpret offset structure or meaning
- Lexicographically Sortable: For any two valid offsets for the same stream, a lexicographic comparison determines their relative position in the stream.
- Persistent: Offsets remain valid for the lifetime of the stream (until deletion or expiration)
const ZeroOffset Offset = ""
ZeroOffset represents the zero value for an offset, equivalent to the stream start.
func (Offset) Compare ¶
Compare performs a lexicographic comparison of two offsets. Returns:
-1 if o is before other 0 if o equals other 1 if o is after other
Note: This uses lexicographic (byte-wise) ordering, not numeric ordering. For example, "9" > "10" lexicographically because '9' (0x39) > '1' (0x31).
type ReadMode ¶
ReadMode specifies how live reads are handled after catch-up. See transport.ReadMode for detailed documentation.
type ReadResult ¶
type ReadResult struct {
Messages []StoredMessage // Individual messages in offset order
NextOffset Offset // Offset to use for next read
TailOffset Offset // Current tail (for up-to-date detection)
}
ReadResult contains messages from a storage read.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader provides continuous stream reading with automatic mode transitions. It manages offset tracking and switches from catch-up to live mode based on the client's configured ReadMode.
See PROTOCOL.md Section 5.5-5.7 for read operation details.
Example ¶
[snippet:reader]
package main
import (
"context"
"fmt"
"log"
"github.com/ahimsalabs/durable-streams-go/durablestream"
)
func main() {
ctx := context.Background()
client := durablestream.NewClient("http://localhost:4437/streams", nil)
// Create a reader starting from offset 0
reader := client.Reader("events", durablestream.ZeroOffset)
defer reader.Close()
for msg, err := range reader.Messages(ctx) {
if err != nil {
log.Fatal(err)
}
// Use msg.String() for text, msg.Bytes() for raw bytes,
// or msg.Decode(&v) for JSON
fmt.Println("Received:", msg.String())
}
}
func (*Reader) Messages ¶
Messages returns an iterator for reading messages from the stream. For JSON streams, it parses the JSON array and yields individual messages. Each Message can be decoded via msg.Decode(&v) or accessed as raw bytes via msg.Bytes().
This enables use with Go 1.22+ range-over-func:
for msg, err := range reader.Messages(ctx) {
if err != nil {
log.Fatal(err)
}
var event MyEvent
if err := msg.Decode(&event); err != nil {
log.Fatal(err)
}
}
func (*Reader) Read ¶
func (r *Reader) Read(ctx context.Context) (*StreamData, error)
Read performs a single read operation based on the current state. Returns the read result or an error.
During catch-up phase, uses basic GET requests (Section 5.5). After UpToDate, switches to live mode based on ReadMode:
- ReadModeAuto/ReadModeLongPoll: long-poll requests (Section 5.6)
- ReadModeSSE: Server-Sent Events stream (Section 5.7)
type SendOptions ¶
type SendOptions struct {
// Seq is an optional monotonic sequence number for writer coordination.
// If provided and less than or equal to the last sequence, returns ErrConflict.
Seq string
}
SendOptions specifies options for Send and SendJSON operations.
type Storage ¶
type Storage interface {
// Create creates a new stream. Returns (true, nil) if newly created.
// Returns (false, nil) if stream exists with matching config (idempotent).
// Returns (false, error) if stream exists with different config.
Create(ctx context.Context, streamID string, cfg StreamConfig) (created bool, err error)
// Append writes data to a stream. Returns the new tail offset.
// seq is optional sequence number for coordination (Section 5.2).
//
// The data slice is only valid for the duration of the call; the caller
// may reuse or modify it after Append returns. Implementations that need
// to retain the data (e.g., in-memory storage) must copy it.
Append(ctx context.Context, streamID string, data []byte, seq string) (Offset, error)
// Read returns messages from offset. limit is max total bytes to return.
// Returns messages and the next offset to read from (Section 5.5).
Read(ctx context.Context, streamID string, offset Offset, limit int) (*ReadResult, error)
// Head returns stream metadata without reading data (Section 5.4).
// Returns ErrNotFound if stream doesn't exist (use for existence checks).
Head(ctx context.Context, streamID string) (*StreamInfo, error)
// Delete removes a stream (Section 5.3).
Delete(ctx context.Context, streamID string) error
// WaitForData blocks until data is available at offset, then returns it.
// Returns immediately if data already exists at offset.
// Returns ctx.Err() on timeout/cancellation.
// Returns ErrNotFound if stream doesn't exist or is deleted while waiting.
WaitForData(ctx context.Context, streamID string, offset Offset, limit int) (*ReadResult, error)
// Close releases resources. Safe to call multiple times.
Close() error
}
Storage defines the interface for stream persistence. Implementations must be goroutine-safe.
type StoredMessage ¶
type StoredMessage struct {
Data []byte // Raw bytes of this message
Offset Offset // Offset after this message
}
StoredMessage represents a single message in a stream. Each append operation creates one StoredMessage (or multiple if JSON array is flattened).
type StreamConfig ¶
type StreamConfig struct {
ContentType string
TTL time.Duration // Zero means no TTL
ExpiresAt time.Time // Zero means no expiry
IsPrivate bool // If true, use Cache-Control: private (Section 8.1)
}
StreamConfig contains creation-time configuration.
func (StreamConfig) IsExpired ¶
func (c StreamConfig) IsExpired() bool
IsExpired checks if the config has expired based on its ExpiresAt field. Returns false if ExpiresAt is zero (no expiry).
func (StreamConfig) Matches ¶
func (c StreamConfig) Matches(other StreamConfig) bool
Matches checks if two StreamConfigs are equivalent for idempotent create. Content-Type is compared case-insensitively (per RFC 2045). ExpiresAt is only compared when TTL is zero (i.e., explicit Stream-Expires-At header).
type StreamData ¶
type StreamData struct {
Data []byte // Raw response bytes (empty on 204 timeout)
NextOffset Offset // Next offset to read from
Cursor string // Opaque cursor for long-poll
UpToDate bool // True if caught up to tail
}
StreamData contains the result of a stream read operation.
type StreamInfo ¶
type StreamInfo struct {
ContentType string
NextOffset Offset
TTL time.Duration // Zero means no TTL
ExpiresAt time.Time // Zero means no expiry
IsPrivate bool // If true, use Cache-Control: private (Section 8.1)
}
StreamInfo contains metadata about a stream.
type StreamWriter ¶
type StreamWriter struct {
// contains filtered or unexported fields
}
StreamWriter provides efficient append operations by caching stream metadata. Create via Client.Writer(). The writer holds no resources requiring cleanup. See PROTOCOL.md Section 5.2: Append to Stream.
func (*StreamWriter) Offset ¶
func (w *StreamWriter) Offset() Offset
Offset returns the current tail offset after the last successful append.
func (*StreamWriter) Send ¶
func (w *StreamWriter) Send(data []byte, opts *SendOptions) error
Send appends raw bytes to the stream (Section 5.2: Append to Stream).
func (*StreamWriter) SendJSON ¶
func (w *StreamWriter) SendJSON(v any, opts *SendOptions) error
SendJSON marshals v as JSON and appends to the stream.
type TransportClientConfig ¶
type TransportClientConfig struct {
// Timeout is the default timeout for all operations.
// Zero or negative values default to 30s.
Timeout time.Duration
// ReadMode specifies how live reads are handled after catch-up.
// Zero value defaults to ReadModeAuto.
ReadMode ReadMode
}
TransportClientConfig configures a Client created via NewClientWithTransport.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
internal
|
|
|
protocol
Package protocol contains internal HTTP protocol constants and utilities.
|
Package protocol contains internal HTTP protocol constants and utilities. |
|
Package storage provides offset formatting helpers for durable stream storage implementations.
|
Package storage provides offset formatting helpers for durable stream storage implementations. |
|
memorystorage
Package memorystore provides an in-memory implementation of durablestream.Storage.
|
Package memorystore provides an in-memory implementation of durablestream.Storage. |
|
Package testing provides test helpers for durablestream.
|
Package testing provides test helpers for durablestream. |
|
Package transport defines the interface for durable stream network operations.
|
Package transport defines the interface for durable stream network operations. |