dchan

package module
v1.0.0 Latest
Published: Feb 17, 2026 License: MIT Imports: 23 Imported by: 0

README

dchan - Distributed Go Channels

A Go library that extends native channel semantics across a cluster of nodes. Each server is a peer -- no dedicated broker or administrator is required. Inspired by learnings from Distributed Systems (CS454) and Concurrency (CS343).

Quick Start

import "github.com/m4tth3/dchan"

// Every node creates a dchan instance with the same cluster info.
ch, _ := dchan.New(myAddr, "cluster-1", peerAddrs, "./data")
defer func() { ch.Close().Wait() }()

// Receive on a namespace (any node)
recvCh, closeRecv, _ := ch.Receive("orders", 10)
defer func() { closeRecv().Wait() }()
order := <-recvCh

// Send on the same namespace (any node)
sendCh, closeSend, _ := ch.Send("orders", 10)
defer func() { closeSend().Wait() }()
sendCh <- Order{ID: 1, Item: "widget"}

// Optionally track delivery with WithMessage
msg := dchan.WithMessage(Order{ID: 2}, 30*time.Second)
sendCh <- msg
if msg.Done() {
    // guaranteed received by a peer
}

Types sent through channels must be registered with gob.Register(MyType{}) so that they can be encoded and decoded when passed as any.
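To illustrate why registration matters, here is a minimal standard-library sketch that round-trips a value through gob as an interface. The Order type, the envelope wrapper, and the roundTrip helper are hypothetical names for this example; dchan's own wire format is not shown in this README and may differ.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Order is a hypothetical payload type used only for this illustration.
type Order struct {
	ID   int
	Item string
}

// envelope carries the payload in an interface-typed field, so gob has to
// record the concrete type name next to the data.
type envelope struct {
	Payload any
}

func init() {
	// Without this call, encoding an Order held in an `any` field fails
	// because gob does not know the concrete type behind the interface.
	gob.Register(Order{})
}

// roundTrip encodes v as an interface value and decodes it back.
func roundTrip(v any) (any, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(envelope{Payload: v}); err != nil {
		return nil, err
	}
	var out envelope
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return nil, err
	}
	return out.Payload, nil
}

func main() {
	got, err := roundTrip(Order{ID: 1, Item: "widget"})
	if err != nil {
		panic(err)
	}
	// The decoded value comes back with its concrete type restored.
	order, ok := got.(Order)
	fmt.Println(order.ID, order.Item, ok) // prints: 1 widget true
}
```

On the receiving side, a type assertion such as got.(Order) recovers the concrete value from the any that arrives on the channel.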

Architecture

dchan splits coordination and data transfer into two separate planes that share the same gRPC connections:

                        Raft Consensus (Control Plane)
                 ┌──────────────────────────────────────┐
                 │  Tracks which nodes receive which     │
                 │  namespaces. Applied via FSM.         │
                 │                                       │
                 │   Leader ◄──► Follower ◄──► Follower  │
                 └──────────────────────────────────────┘

                        gRPC (Data Plane)
                 ┌──────────────────────────────────────┐
                 │  Messages sent peer-to-peer.          │
                 │  Gob-encoded, round-robin across      │
                 │  receivers. Supports backpressure.     │
                 │                                       │
                 │   Node A ────message────► Node B      │
                 │   Node A ────message────► Node C      │
                 └──────────────────────────────────────┘

Send Flow

 ch <- value
    │
    ▼
 sender goroutine
    │
    ├─ 1. waitForTarget()      ◄── reads receiver set (populated by Raft FSM)
    │     (blocks if no receivers)
    │
    ├─ 2. gobEncode(value)
    │
    ├─ 3. gRPC Receive(data)   ──► target node pushes into local channel
    │     (round-robin, retry on reject)
    │
    └─ 4. done / next message

Receiver Registration Flow

 ch.Receive("ns", buf)
    │
    ├─ 1. Create local channel
    │
    ├─ 2. RegisterReceiver RPC ──► Raft leader
    │                                  │
    │                                  ▼
    │                           raft.Apply(cmd)
    │                                  │
    │                                  ▼
    │                           FSM adds this node to
    │                           receiver set for "ns"
    │                           (replicated to all nodes)
    │
    └─ 3. Return <-chan any to caller
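
The state that the FSM maintains in the flow above can be pictured as a map from namespace to a set of node IDs. The following is a simplified sketch under that assumption; the command struct, receiverFSM type, and method names are hypothetical, and the real implementation applies commands through the HashiCorp Raft FSM rather than a direct method call.

```go
package main

import (
	"fmt"
	"sort"
)

// command is a hypothetical log entry; dchan's real command format is not
// shown in this document.
type command struct {
	Op        string // "register" or "unregister"
	Namespace string
	Node      string
}

// receiverFSM holds the replicated state: namespace -> set of receiving nodes.
type receiverFSM struct {
	receivers map[string]map[string]struct{}
}

func newReceiverFSM() *receiverFSM {
	return &receiverFSM{receivers: make(map[string]map[string]struct{})}
}

// apply mutates the state deterministically, so every node that applies the
// same sequence of commands ends up with the same receiver sets.
func (f *receiverFSM) apply(c command) {
	switch c.Op {
	case "register":
		set, ok := f.receivers[c.Namespace]
		if !ok {
			set = make(map[string]struct{})
			f.receivers[c.Namespace] = set
		}
		set[c.Node] = struct{}{}
	case "unregister":
		// Deleting from a missing (nil) set is a no-op in Go.
		delete(f.receivers[c.Namespace], c.Node)
		if len(f.receivers[c.Namespace]) == 0 {
			delete(f.receivers, c.Namespace)
		}
	}
}

// targets returns the receivers for a namespace in sorted order.
func (f *receiverFSM) targets(ns string) []string {
	out := make([]string, 0, len(f.receivers[ns]))
	for node := range f.receivers[ns] {
		out = append(out, node)
	}
	sort.Strings(out)
	return out
}

func main() {
	fsm := newReceiverFSM()
	fsm.apply(command{Op: "register", Namespace: "orders", Node: "node-b"})
	fsm.apply(command{Op: "register", Namespace: "orders", Node: "node-c"})
	fsm.apply(command{Op: "unregister", Namespace: "orders", Node: "node-b"})
	fmt.Println(fsm.targets("orders")) // prints: [node-c]
}
```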

Implementation Details

  • Raft (HashiCorp/Raft) provides sequentially consistent coordination of receiver state: which nodes are listening on which namespaces.
  • gRPC carries actual messages directly between sender and receiver nodes, bypassing the Raft log entirely.
  • Shared connections: the Raft transport and dchan message RPCs reuse the same gRPC server and client connections.
  • Gob encoding for messages. Users can send any Go type (including structs) registered with gob.Register.
  • At-most-once semantics: a message is delivered to exactly one receiver or not at all.
  • Backpressure: if a receiver's channel buffer is full, the sender blocks (or times out), naturally rate-limiting producers.
  • Smart-client round-robin: senders distribute messages across available receivers with a randomized starting offset.
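
As a rough illustration of the round-robin bullet above, the sketch below cycles through a receiver list from a randomized starting offset and moves to the next receiver when one rejects a message. The picker type, its methods, and the deliver callback are hypothetical; the library's actual selection and retry logic may differ.

```go
package main

import (
	"fmt"
	"math/rand"
)

// picker cycles through targets, starting at a random position so that
// independent senders do not all hit the same receiver first.
type picker struct {
	targets []string
	cursor  int
}

func newPicker(targets []string) *picker {
	p := &picker{targets: targets}
	if len(targets) > 0 {
		p.cursor = rand.Intn(len(targets)) // randomized starting offset
	}
	return p
}

// pick returns the next target and advances the cursor. It reports false
// when there are no targets at all.
func (p *picker) pick() (string, bool) {
	if len(p.targets) == 0 {
		return "", false
	}
	target := p.targets[p.cursor]
	p.cursor = (p.cursor + 1) % len(p.targets)
	return target, true
}

// sendWithRetry offers the message to each target at most once, moving on
// whenever deliver reports a rejection (for example, a full buffer).
func (p *picker) sendWithRetry(deliver func(target string) bool) (string, bool) {
	for i := 0; i < len(p.targets); i++ {
		target, _ := p.pick()
		if deliver(target) {
			return target, true
		}
	}
	return "", false
}

func main() {
	p := newPicker([]string{"node-a", "node-b", "node-c"})
	// In this demo node-b always rejects, so the sender moves past it.
	target, ok := p.sendWithRetry(func(t string) bool { return t != "node-b" })
	fmt.Println(target, ok)
}
```

Because the starting offset is random, the accepted target in this demo is either node-a or node-c.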

Pros and Cons

Pros | Cons
No dedicated broker -- every node is a peer | Raft leader is a bottleneck for receiver state changes (register/unregister)
Messages travel directly sender-to-receiver (low latency) | Messages are not persisted -- lost if the receiver crashes mid-flight
Native Go channel interface (chan<- any, <-chan any) | At-most-once delivery; no built-in retries at the application level
Raft ensures all nodes agree on who is receiving | Every node is a Raft voter, adding consensus overhead as the cluster grows
Connection reuse between Raft and messaging reduces overhead | Cluster bootstrap requires a known set of initial addresses
Built-in backpressure from channel semantics | Gob encoding requires type registration for custom types

Similar Projects

Project | Limitation
distchan | One connection per channel, single receiver
netchan | Point-to-point only, no cluster coordination
protoactor-go | Full actor framework, heavy boilerplate
goakt | Full actor framework, requires separate discovery

dchan focuses on the mailbox primitive at the core of actor systems -- distributed channels -- without the framework overhead.

TODO

  • Transport layer with connection reuse
  • Raft FSM for receiver coordination
  • gRPC message passing with backpressure
  • Integration tests (multi-node send/receive)
  • Epidemic broadcast (Broadcast API)
  • Chunked transfer for large messages

Documentation

Index

Constants

This section is empty.

Variables

var (
	DefaultSendTimeout    = 2 * time.Minute
	DefaultClusterTimeout = 2 * time.Minute
	DefaultSnapshotCount  = 3
	DefaultMaxJoinRetries = 5
	DefaultRetryDelay     = 3 * time.Second

	DefaultCallOptions = []grpc.CallOption{
		grpc.WaitForReady(true),
	}

	DefaultDialOptions = []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	DefaultServerOptions = []grpc.ServerOption{
		grpc.Creds(insecure.NewCredentials()),
	}
)
var (
	ErrNoLongerReceiving = errors.New("no longer receiving")
)

Functions

This section is empty.

Types

type BufferSize

type BufferSize int

type Chan

type Chan struct {
	Config
	// contains filtered or unexported fields
}

Chan is a distributed channel that can be used to send and receive messages between nodes. Note: only the exported fields of sent values are encoded (unexported fields are ignored).

Future optimization: start as a non-voter and upgrade to a voter once the node starts receiving messages. It is unclear whether this is worth the added complexity.

TODO: Support in the future. Use an epidemic broadcast approach. Broadcast(namespace Namespace, ctx context.Context) (chan<- T, context.CancelFunc, error)

func New

func New(address string, clusterId string, clusterAddresses []string, storeDir string, options ...Option) (*Chan, error)

New creates a new dChan. address is the address of this server, clusterId identifies the cluster this server belongs to, clusterAddresses lists the addresses of the initial cluster members (possibly including this server), storeDir is the base directory for the store, and options customize the dChan.

Note: a new node can join as long as its initial cluster addresses include at least one current member of the cluster.

We therefore recommend giving every server the same initial cluster addresses. Similarly, servers should, where possible, be started in the same order as they appear in the cluster addresses.

func (*Chan) Close

func (d *Chan) Close() Future

func (*Chan) Receive

func (d *Chan) Receive(namespace Namespace, bufferSize BufferSize) (<-chan any, CloseFunc, error)

Receive returns a channel that can be used to receive messages from the distributed channel. Explicitly run CloseFunc to close the channel.

If a local channel already exists, the same instance is returned (bufferSize is ignored).

Note: The CloseFunc is asynchronous and returns a Future (asynchronous channel).

The channel remains readable while items are still in its buffer; once it is drained, check whether the channel has been closed.

func (*Chan) Send

func (d *Chan) Send(namespace Namespace, bufferSize BufferSize) (chan<- any, CloseFunc, error)

Send returns a channel that can be used to send messages to the distributed channel. Explicitly run CloseFunc when the channel is no longer needed.

The effective buffering is the sum of the sender and receiver buffer sizes. To avoid blocking indefinitely on a full buffer, use a send deadline.

If a local channel already exists, the same instance is returned (bufferSize is ignored).

Note: The CloseFunc is asynchronous and returns a Future (asynchronous channel).

The channel should not be used after the CloseFunc is called.

type CloseFunc

type CloseFunc func() Future

type Config

type Config struct {
	// Server ID of this server.
	Id ServerID

	// Base directory for the store.
	StoreDir string

	// Cluster ID, unique to this cluster. Every node in the cluster must use the same value.
	//
	// Currently used for file store names, but possibly we should
	// append it to the address as an ID.
	ClusterId string

	// Cluster addresses of initial cluster members (e.g. possibly itself).
	ClusterAddresses []string

	// Timeout for sending messages.
	// Defaults to 2 minutes.
	//
	// For large messages, consider setting a larger timeout or
	// wrapping the message with WithMessage to set a per-message timeout.
	SendTimeout time.Duration

	// Timeout for cluster operations.
	// Defaults to 2 minutes.
	ClusterTimeout time.Duration

	// Dial options for the gRPC clients to connect to other servers.
	DialOptions []grpc.DialOption

	// Call options for the gRPC clients that connect to other servers; these should include WaitForReady.
	// Used for messages and cluster operations.
	CallOptions []grpc.CallOption

	// Server options for the gRPC server.
	ServerOptions []grpc.ServerOption

	// Snapshot count for the raft snapshot store.
	SnapshotCount int

	// Max number of retries to join the cluster.
	MaxJoinRetries int

	// Delay between retries.
	RetryDelay time.Duration
}

func DefaultConfig

func DefaultConfig() Config

type CustomEncodable

type CustomEncodable interface {
	gob.Encoder
	gob.Decoder
}

Implement this interface for custom encoding/decoding.
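
For reference, the standard library's hooks for custom gob encoding are the interfaces gob.GobEncoder and gob.GobDecoder. The sketch below implements them for a hypothetical Temperature type whose only field is unexported, which gob could not encode by default. Whether CustomEncodable expects exactly these methods is not stated here, so treat the connection as an assumption.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"strconv"
)

// Temperature is a hypothetical type with no exported fields. Without custom
// encoding, gob would refuse to encode it.
type Temperature struct {
	milli int64 // thousandths of a degree
}

// GobEncode implements gob.GobEncoder from the standard library.
func (t Temperature) GobEncode() ([]byte, error) {
	return []byte(strconv.FormatInt(t.milli, 10)), nil
}

// GobDecode implements gob.GobDecoder from the standard library.
func (t *Temperature) GobDecode(data []byte) error {
	v, err := strconv.ParseInt(string(data), 10, 64)
	if err != nil {
		return err
	}
	t.milli = v
	return nil
}

// roundTrip encodes a Temperature with gob and decodes it again.
func roundTrip(in Temperature) (Temperature, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return Temperature{}, err
	}
	var out Temperature
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return Temperature{}, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(Temperature{milli: 21500})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.milli) // prints: 21500
}
```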

type Future

type Future struct {
	// contains filtered or unexported fields
}

func (Future) Wait

func (f Future) Wait() error

Wait blocks until the future completes and returns its error, if any.
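
Since the fields of Future are unexported, its implementation is not visible here. The following is one plausible shape for such a type, shown only to make the Wait contract concrete: a done channel that is closed when the background work finishes, plus a stored error. The future type, the start helper, and errCloseFailed are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errCloseFailed is a hypothetical error used only in this demo.
var errCloseFailed = errors.New("close failed")

// future is a hypothetical stand-in for an asynchronous result.
type future struct {
	done chan struct{} // closed once the work has finished
	err  *error        // written before done is closed
}

// start runs work in a goroutine and returns a future for its result.
func start(work func() error) future {
	f := future{done: make(chan struct{}), err: new(error)}
	go func() {
		*f.err = work()
		close(f.done)
	}()
	return f
}

// Wait blocks until the work completes and returns its error, if any.
// It is safe to call more than once.
func (f future) Wait() error {
	<-f.done
	return *f.err
}

func main() {
	ok := start(func() error {
		time.Sleep(10 * time.Millisecond) // simulate asynchronous cleanup
		return nil
	})
	failed := start(func() error { return errCloseFailed })

	fmt.Println(ok.Wait())     // prints: <nil>
	fmt.Println(failed.Wait()) // prints: close failed
}
```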

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message wraps a value so that the sender can wait until the message is sent or its context is done.

func WithMessage

func WithMessage(obj any, timeout time.Duration) Message

WithMessage creates a new Message that lets the sender wait until the message is sent or the timeout is reached.

If timeout is non-zero, it overrides the default SendTimeout in the Config; otherwise the default timeout is used.

func (Message) Done

func (w Message) Done() bool

Done waits until the message is sent or the context is done.

It returns true if the message is guaranteed to have been sent (e.g. the target received it), and false if the message may or may not have been sent (e.g. the context is done).

Guarantees at-most-once semantics.

type Namespace

type Namespace string

type Option

type Option func(*Chan) error

func WithCallOptions

func WithCallOptions(options ...grpc.CallOption) Option

func WithClusterTimeout

func WithClusterTimeout(timeout time.Duration) Option

WithClusterTimeout configures the cluster timeout. Defaults to 2 minutes.

A very small timeout is not recommended.

func WithConfig

func WithConfig(config *Config) Option

WithConfig configures the dChan with the given config.

func WithDialOptions

func WithDialOptions(options ...grpc.DialOption) Option

func WithGrpcServer

func WithGrpcServer(addr string, server *grpc.Server) Option

WithGrpcServer optionally reuses an existing gRPC server.

func WithSendTimeout

func WithSendTimeout(timeout time.Duration) Option

WithSendTimeout configures the send timeout. Defaults to 2 minutes.

A very small timeout is not recommended.

func WithServerOptions

func WithServerOptions(options ...grpc.ServerOption) Option

func WithSnapshotCount

func WithSnapshotCount(count int) Option

type RpcClient

type RpcClient = p.DChanServiceClient

type ServerID

type ServerID string

Directories

Path Synopsis
Package transport provides a Transport for github.com/hashicorp/raft over gRPC.
