Documentation
¶
Index ¶
- Variables
- type BufferSize
- type Chan
- type CloseFunc
- type Config
- type CustomEncodable
- type Future
- type Message
- type Namespace
- type Option
- func WithCallOptions(options ...grpc.CallOption) Option
- func WithClusterTimeout(timeout time.Duration) Option
- func WithConfig(config *Config) Option
- func WithDialOptions(options ...grpc.DialOption) Option
- func WithGrpcServer(addr string, server *grpc.Server) Option
- func WithSendTimeout(timeout time.Duration) Option
- func WithServerOptions(options ...grpc.ServerOption) Option
- func WithSnapshotCount(count int) Option
- type RpcClient
- type ServerID
Constants ¶
This section is empty.
Variables ¶
var ( DefaultSendTimeout = 2 * time.Minute DefaultClusterTimeout = 2 * time.Minute DefaultSnapshotCount = 3 DefaultMaxJoinRetries = 5 DefaultRetryDelay = 3 * time.Second DefaultCallOptions = []grpc.CallOption{ grpc.WaitForReady(true), } DefaultDialOptions = []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } DefaultServerOptions = []grpc.ServerOption{ grpc.Creds(insecure.NewCredentials()), } )
var (
ErrNoLongerReceiving = errors.New("no longer receiving")
)
Functions ¶
This section is empty.
Types ¶
type BufferSize ¶
type BufferSize int
type Chan ¶
type Chan struct {
Config
// contains filtered or unexported fields
}
Chan is the interface for a distributed channel that can be used to send and receive messages between nodes. Note: it only works with exported fields (unexported fields are ignored).
Future optimization: - Start as a non-voter and upgrade to a voter if we start receiving messages. Idk if this is worth it tbh.
TODO: Support in the future. Use an epidemic broadcast approach. Broadcast(namespace Namespace, ctx context.Context) (chan<- T, context.CancelFunc, error)
func New ¶
func New(address string, clusterId string, clusterAddresses []string, storeDir string, options ...Option) (*Chan, error)
New creates a new dChan with the given address, cluster ID, cluster addresses, and options. The address is the address of this server. The cluster ID is the cluster ID of this server. The cluster addresses are the addresses of initial cluster members (e.g. possibly itself). The options are the options for the dChan.
Note: we can join as a new node as long as we have a member of the cluster e.g. in initial cluster addresses.
Thus, we recommend having the same initial cluster addresses for all servers. Similarly, initial startup order should attempted to be the same as the cluster addresses.
func (*Chan) Receive ¶
Receive returns a channel that can be used to receive messages from the distributed channel. Explicitly run CloseFunc to close the channel.
If a local channel already exists, the same instance is returned (bufferSize is ignored).
Note: The CloseFunc is asynchronous and returns a Future (asynchronous channel)
The channel can be used if items in the buffer. Otherwise check for channel closed.
func (*Chan) Send ¶
Send returns a channel that can be used to send messages to the distributed channel. Explicitly run CloseFunc when the channel is no longer needed.
The bufferSize is Sender + Receiver buffer sizes. Use a send deadline to avoid blocking if that's desired.
If a local channel already exists, the same instance is returned (bufferSize is ignored).
Note: The CloseFunc is asynchronous and returns a Future (asynchronous channel).
The channel should not be used after the CloseFunc is called.
type Config ¶
type Config struct {
// Server ID of this server.
Id ServerID
// Base directory for the store.
StoreDir string
// Every node must have this unique cluster ID.
//
// Currently used for file store names, but possibly we should
// append it to the address as an ID.
ClusterId string
// Cluster addresses of initial cluster members (e.g. possibly itself).
ClusterAddresses []string
// Timeout for sending messages
// Default to 2 minute
//
// For large messages, suggest setting a larger timeout OR
// using a WithTimeout message.
SendTimeout time.Duration
// Timeout for cluster operations
// Default to 2 minute
ClusterTimeout time.Duration
// Dial options for the gRPC clients to connect to other servers.
DialOptions []grpc.DialOption
// We should have WaitForReady option for the gRPC clients to connect to other servers.
// Used for messages and cluster operations.
CallOptions []grpc.CallOption
// Server options for the gRPC server.
ServerOptions []grpc.ServerOption
// Snapshot count for the raft snapshot store.
SnapshotCount int
// Max number of retries to join the cluster.
MaxJoinRetries int
// Delay between retries.
RetryDelay time.Duration
}
func DefaultConfig ¶
func DefaultConfig() Config
type CustomEncodable ¶
Implement this interface for custom encoding/decoding.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is a struct that can be used to send a message with a wait until the message is sent or the context is done.
func WithMessage ¶
WithMessage creates a new Message object that can be used to send a message to wait until the message is sent or the timeout is reached.
This timeout overrides the default SendTimeout in the Config.
If timeout is non-zero, it will use the provided timeout else default timeout
type Option ¶
func WithCallOptions ¶
func WithCallOptions(options ...grpc.CallOption) Option
func WithClusterTimeout ¶
WithClusterTimeout configures the cluster timeout. Default to 2 minute.
Don't recommend using a very small timeout.
func WithConfig ¶
WithConfig configures the dChan with the given config.
func WithDialOptions ¶
func WithDialOptions(options ...grpc.DialOption) Option
func WithGrpcServer ¶
Optionally reuse a gRPC server that exists.
func WithSendTimeout ¶
WithSendTimeout configures the send timeout. Default to 2 minute.
Don't recommend using a very small timeout.
func WithServerOptions ¶
func WithServerOptions(options ...grpc.ServerOption) Option
func WithSnapshotCount ¶
type RpcClient ¶
type RpcClient = p.DChanServiceClient