Documentation
Overview ¶
Package pipeline provides composable stages for building concurrent data processing pipelines using channels. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.
Index ¶
- Variables
- func Drain(err error) error
- func DrainChannel[T any](in <-chan T)
- func ErrorInStage(name string, err error) error
- func IsDrainError(err error) bool
- func IsSkipError(err error) bool
- func RegisterChannel[T any](t *Telemetry, name string, ch chan T)
- func RegisterChannelWithCounters[T any](t *Telemetry, name string, ch chan T) (sends *atomic.Int64, recvs *atomic.Int64)
- func RegisterCounter(t *Telemetry, name string) (sends *atomic.Int64, recvs *atomic.Int64)
- func Skip(err error) error
- type ChannelSnapshot
- type Composer
- type Config
- type Context
- type DrainError
- type InputChannel
- type LogCollector
- type MetricsCollector
- type MultiChannelReceiver
- func (m MultiChannelReceiver[T]) At(index uint) <-chan T
- func (m MultiChannelReceiver[T]) Iter() iter.Seq[<-chan T]
- func (m MultiChannelReceiver[T]) Len() int
- func (m MultiChannelReceiver[T]) SinkAt(ctx context.Context, index uint) []T
- func (m MultiChannelReceiver[T]) SinkAtIter(ctx context.Context, index uint) iter.Seq[T]
- type MultiChannelSender
- func (m MultiChannelSender[T]) At(index uint) chan<- T
- func (m MultiChannelSender[T]) Iter() iter.Seq[chan<- T]
- func (m MultiChannelSender[T]) Len() int
- func (m MultiChannelSender[T]) Link(ctx Context, index uint, in <-chan T) error
- func (m MultiChannelSender[T]) LinkAll(ctx Context, in MultiChannelReceiver[T]) error
- func (m MultiChannelSender[T]) Send(ctx context.Context, index uint, values ...T) error
- func (m MultiChannelSender[T]) SendRoundRobin(ctx context.Context, values ...T) error
- func (m MultiChannelSender[T]) SendToAll(ctx context.Context, values ...T) error
- type NoopCollector
- type OutputChannel
- type Pipeline
- type PipelineSnapshot
- type SkipError
- type StageError
- type Telemetry
Constants ¶
This section is empty.
Variables ¶
var (
	ErrInvalidChannel = errors.New("channel link size mismatch")
)
Functions ¶
func Drain ¶ added in v4.1.0
func Drain(err error) error
Drain wraps an error as a DrainError. When returned from a Transformer or Expander, the current item is dropped and the stage drains all remaining input without processing it, allowing upstream stages to complete and downstream stages to flush what they already have.
func DrainChannel ¶ added in v4.1.0
func DrainChannel[T any](in <-chan T)
DrainChannel drains all values from the upstream channel, discarding them.
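Draining matters because a send on an unbuffered channel blocks until a receiver takes the value, so a consumer that stops reading early would strand the producer goroutine. The sketch below uses a hypothetical local helper, drainChannel, written to behave as the description suggests (it is not the package's source), to let a producer finish after the consumer has taken the one value it wanted.

```go
package main

import "fmt"

// drainChannel is a hypothetical stand-in for DrainChannel: it receives
// and discards values until the channel is closed.
func drainChannel[T any](in <-chan T) {
	for range in {
	}
}

// produce sends n values on out, then closes out and finally done.
// Deferred calls run in reverse order, so out is closed before done.
func produce(n int, out chan<- int, done chan<- struct{}) {
	defer close(done)
	defer close(out)
	for i := 0; i < n; i++ {
		out <- i // blocks until a receiver takes the value
	}
}

// consumeFirst reads only the first value, then drains the rest so the
// producer can return. It reports the first value received.
func consumeFirst(n int) int {
	ch := make(chan int) // unbuffered
	done := make(chan struct{})
	go produce(n, ch, done)

	first := <-ch
	drainChannel(ch) // without this, produce would block on its next send
	<-done           // the producer goroutine has finished
	return first
}

func main() {
	fmt.Println(consumeFirst(5)) // 0
}
```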
func ErrorInStage ¶ added in v4.3.0
func ErrorInStage(name string, err error) error
ErrorInStage wraps an error as a StageError.
func IsDrainError ¶ added in v4.1.0
func IsDrainError(err error) bool
func IsSkipError ¶ added in v4.1.0
func IsSkipError(err error) bool
func RegisterChannel ¶ added in v4.4.0
func RegisterChannel[T any](t *Telemetry, name string, ch chan T)
RegisterChannel registers a channel for periodic len/cap sampling. If t is nil, the call is a no-op.
func RegisterChannelWithCounters ¶ added in v4.4.0
func RegisterChannelWithCounters[T any](t *Telemetry, name string, ch chan T) (sends *atomic.Int64, recvs *atomic.Int64)
RegisterChannelWithCounters registers a channel for periodic len/cap sampling and returns atomic counters that the caller should increment on each send and receive. If t is nil the call returns (nil, nil) and the caller should skip incrementing.
func RegisterCounter ¶ added in v4.4.0
func RegisterCounter(t *Telemetry, name string) (sends *atomic.Int64, recvs *atomic.Int64)
RegisterCounter registers a named throughput counter pair without an associated channel. This is useful for tracking items flowing through a link or other pathway where a bidirectional channel reference is not available. If t is nil, the call returns (nil, nil).
func Skip ¶
func Skip(err error) error
Types ¶
type ChannelSnapshot ¶ added in v4.4.0
type ChannelSnapshot struct {
	// Name identifies the channel (e.g. "transform", "input[0]").
	Name string

	// Len is the number of items currently buffered in the channel.
	// Always 0 for unbuffered channels.
	Len int

	// Cap is the total buffer capacity of the channel.
	// 0 indicates an unbuffered channel.
	Cap int

	// Utilization is Len/Cap as a value between 0.0 and 1.0.
	// It is 0.0 when the channel is unbuffered (Cap == 0).
	// Not meaningful for unbuffered channels; use throughput counters instead.
	Utilization float64

	// TotalSent is the cumulative number of items sent into the channel.
	// It is -1 when throughput counting is not enabled for this channel.
	TotalSent int64

	// TotalRecv is the cumulative number of items received from the channel.
	// It is -1 when throughput counting is not enabled for this channel.
	TotalRecv int64
}
ChannelSnapshot is a point-in-time view of a single registered channel's state.
When a channel is unbuffered (Cap == 0), Len and Utilization are always 0. Buffer utilization is not meaningful for unbuffered channels because items pass directly from sender to receiver with no intermediate buffer. For unbuffered channels, compare processing rates between stages using the TotalSent/TotalRecv throughput counters (available on Link paths and metered channels) to identify bottlenecks.
type Composer ¶
func (Composer[In, Out]) Inputs ¶
func (c Composer[In, Out]) Inputs() MultiChannelReceiver[In]
func (Composer[In, Out]) Outputs ¶
func (c Composer[In, Out]) Outputs() MultiChannelSender[Out]
type Config ¶
type Config[In any, Out any] struct {
	Name             string
	InputChannels    uint
	InputBufferSize  uint
	OutputChannels   uint
	OutputBufferSize uint
	Composer         func(Composer[In, Out]) error

	// MetricsCollector, when non-nil, enables telemetry for the pipeline.
	// All pipeline boundary channels (inputs and outputs) are automatically
	// registered. Built-in stages register their internal channels as well.
	// The collector receives periodic snapshots of channel buffer utilization
	// and throughput data suitable for bottleneck detection.
	MetricsCollector MetricsCollector

	// MetricsInterval controls how often telemetry snapshots are collected
	// and delivered to the MetricsCollector. It is ignored when
	// MetricsCollector is nil. The default is 1 second.
	MetricsInterval time.Duration
}
Config defines the makeup of a pipeline and is required to construct one with NewPipeline.
type DrainError ¶ added in v4.1.0
type DrainError struct {
	Err error
}
DrainError signals that the current item should be skipped AND the stage should stop processing new items. The stage drains its input channel to unblock upstream goroutines, then shuts down gracefully. The wrapped error is silently discarded and does not appear in the pipeline's returned error.
func (*DrainError) Error ¶ added in v4.1.0
func (e *DrainError) Error() string
func (*DrainError) Unwrap ¶ added in v4.1.0
func (e *DrainError) Unwrap() error
type InputChannel ¶
type InputChannel[T any] interface {
	~chan T | ~<-chan T
}
type LogCollector ¶ added in v4.4.0
type LogCollector struct {
	// Logger is the slog.Logger used to emit telemetry messages.
	// When nil, slog.Default() is used.
	Logger *slog.Logger

	// Level is the log level for telemetry messages. The zero value
	// corresponds to slog.LevelInfo.
	Level slog.Level
}
LogCollector writes structured telemetry snapshots using slog. Each snapshot produces one log message per registered channel, making it straightforward to filter and alert on individual channels in log aggregation systems.
func (*LogCollector) OnSnapshot ¶ added in v4.4.0
func (c *LogCollector) OnSnapshot(snapshot PipelineSnapshot)
OnSnapshot logs one structured message per channel in the snapshot.
type MetricsCollector ¶ added in v4.4.0
type MetricsCollector interface {
	// OnSnapshot is called periodically with a snapshot of all registered channels.
	OnSnapshot(PipelineSnapshot)
}
MetricsCollector receives periodic telemetry snapshots from a pipeline. Implementations must be safe for concurrent use.
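Because snapshots may be delivered concurrently, a custom collector that keeps state needs its own synchronization. The sketch below defines a hypothetical peakCollector, which records the highest utilization seen per channel, against a local mirror of the single-method interface and guards its map with a sync.Mutex.

```go
package main

import (
	"fmt"
	"sync"
)

type channelSnapshot struct {
	Name        string
	Utilization float64
}

type pipelineSnapshot struct {
	PipelineName string
	Channels     []channelSnapshot
}

// metricsCollector mirrors the single-method MetricsCollector interface.
type metricsCollector interface {
	OnSnapshot(pipelineSnapshot)
}

// peakCollector records the highest utilization seen per channel.
// The mutex makes it safe for concurrent use.
type peakCollector struct {
	mu   sync.Mutex
	peak map[string]float64
}

func (c *peakCollector) OnSnapshot(s pipelineSnapshot) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.peak == nil {
		c.peak = make(map[string]float64)
	}
	for _, ch := range s.Channels {
		if ch.Utilization > c.peak[ch.Name] {
			c.peak[ch.Name] = ch.Utilization
		}
	}
}

// Peak returns the highest utilization recorded for name (0 if unseen).
func (c *peakCollector) Peak(name string) float64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.peak[name]
}

var _ metricsCollector = (*peakCollector)(nil) // compile-time interface check

func main() {
	c := &peakCollector{}
	var wg sync.WaitGroup
	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(u float64) { // concurrent deliveries are serialized by mu
			defer wg.Done()
			c.OnSnapshot(pipelineSnapshot{Channels: []channelSnapshot{{Name: "transform", Utilization: u}}})
		}(float64(i) / 4)
	}
	wg.Wait()
	fmt.Println(c.Peak("transform")) // 1
}
```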
type MultiChannelReceiver ¶
type MultiChannelReceiver[T any] []<-chan T
func NewMultiChannelReceiver ¶
func NewMultiChannelReceiver[T any, C InputChannel[T]](in ...C) MultiChannelReceiver[T]
func (MultiChannelReceiver[T]) At ¶
func (m MultiChannelReceiver[T]) At(index uint) <-chan T
func (MultiChannelReceiver[T]) Iter ¶
func (m MultiChannelReceiver[T]) Iter() iter.Seq[<-chan T]
func (MultiChannelReceiver[T]) Len ¶
func (m MultiChannelReceiver[T]) Len() int
func (MultiChannelReceiver[T]) SinkAt ¶
func (m MultiChannelReceiver[T]) SinkAt(ctx context.Context, index uint) []T
func (MultiChannelReceiver[T]) SinkAtIter ¶
func (m MultiChannelReceiver[T]) SinkAtIter(ctx context.Context, index uint) iter.Seq[T]
type MultiChannelSender ¶
type MultiChannelSender[T any] []chan<- T
func NewMultiChannelSender ¶
func NewMultiChannelSender[T any, C OutputChannel[T]](out ...C) MultiChannelSender[T]
func (MultiChannelSender[T]) At ¶
func (m MultiChannelSender[T]) At(index uint) chan<- T
func (MultiChannelSender[T]) Iter ¶
func (m MultiChannelSender[T]) Iter() iter.Seq[chan<- T]
func (MultiChannelSender[T]) Len ¶
func (m MultiChannelSender[T]) Len() int
func (MultiChannelSender[T]) Link ¶
func (m MultiChannelSender[T]) Link(ctx Context, index uint, in <-chan T) error
func (MultiChannelSender[T]) LinkAll ¶
func (m MultiChannelSender[T]) LinkAll(ctx Context, in MultiChannelReceiver[T]) error
func (MultiChannelSender[T]) Send ¶
func (m MultiChannelSender[T]) Send(ctx context.Context, index uint, values ...T) error
func (MultiChannelSender[T]) SendRoundRobin ¶
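func (m MultiChannelSender[T]) SendRoundRobin(ctx context.Context, values ...T) error
func (MultiChannelSender[T]) SendToAll ¶
func (m MultiChannelSender[T]) SendToAll(ctx context.Context, values ...T) error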
type NoopCollector ¶ added in v4.4.0
type NoopCollector struct{}
NoopCollector silently discards all telemetry snapshots. It is useful as a placeholder or for benchmarking the overhead of the telemetry registry without any I/O.
func (NoopCollector) OnSnapshot ¶ added in v4.4.0
func (NoopCollector) OnSnapshot(PipelineSnapshot)
OnSnapshot discards the snapshot.
type OutputChannel ¶
type OutputChannel[T any] interface {
	~chan T | ~chan<- T
}
type Pipeline ¶
Pipeline coordinates concurrent processing stages, managing their lifecycle and propagating errors and cancellation signals across all stages.
func NewPipeline ¶
func NewPipeline[In any, Out any](ctx context.Context, cfg Config[In, Out]) (*Pipeline[In, Out], context.Context, error)
NewPipeline creates a new Pipeline and a derived context for coordinating pipeline stages. The returned context is cancelled when any stage encounters an error. Use the returned Pipeline to register stages and wait for completion.
func (*Pipeline[In, Out]) CloseAllInputs ¶
func (p *Pipeline[In, Out]) CloseAllInputs()
CloseAllInputs closes all input channels. It is safe to call multiple times; only the first call closes the channels.
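One common way to make a close operation idempotent is sync.Once. The sketch below assumes that approach with a hypothetical inputs type; the mechanism CloseAllInputs actually uses is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// inputs is a hypothetical holder for a pipeline's input channels.
type inputs[T any] struct {
	chans     []chan T
	closeOnce sync.Once
}

// CloseAll closes every channel exactly once. Later calls, including calls
// from other goroutines, are no-ops, so they never trigger the
// "close of closed channel" panic.
func (in *inputs[T]) CloseAll() {
	in.closeOnce.Do(func() {
		for _, ch := range in.chans {
			close(ch)
		}
	})
}

func main() {
	in := &inputs[int]{chans: []chan int{make(chan int), make(chan int)}}
	in.CloseAll()
	in.CloseAll() // safe: the channels are not closed a second time
	_, ok := <-in.chans[0]
	fmt.Println(ok) // false: the channel is closed
}
```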
func (*Pipeline[In, Out]) Inputs ¶
func (p *Pipeline[In, Out]) Inputs() MultiChannelSender[In]
func (*Pipeline[In, Out]) Outputs ¶
func (p *Pipeline[In, Out]) Outputs() MultiChannelReceiver[Out]
type PipelineSnapshot ¶ added in v4.4.0
type PipelineSnapshot struct {
	// PipelineName is the name of the pipeline as provided in Config.Name.
	PipelineName string

	// Timestamp is the time at which the snapshot was taken.
	Timestamp time.Time

	// Channels contains one entry per registered channel.
	Channels []ChannelSnapshot
}
PipelineSnapshot is a point-in-time view of all registered channels in a pipeline.
type SkipError ¶ added in v4.1.0
type SkipError struct {
	Err error
}
SkipError signals that the current item should be skipped without killing the pipeline. The stage continues processing subsequent items normally. The wrapped error is silently discarded and does not appear in the pipeline's returned error.
type StageError ¶ added in v4.3.0
StageError signals that the error originated in a pipeline stage. The error includes the name of the stage in which it occurred.
func (*StageError) Error ¶ added in v4.3.0
func (e *StageError) Error() string
func (*StageError) Unwrap ¶ added in v4.3.0
func (e *StageError) Unwrap() error
type Telemetry ¶ added in v4.4.0
type Telemetry struct {
	// contains filtered or unexported fields
}
Telemetry is a registry of channels and a periodic sampler that reports snapshots through a MetricsCollector. A nil *Telemetry is safe to use; all methods and the package-level registration functions treat nil as a no-op.
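Nil-safety of this kind usually comes from pointer-receiver methods that check for a nil receiver before touching any state; Go permits calling such methods on a nil pointer. The hypothetical registry type below illustrates the pattern and is not the package's Telemetry implementation.

```go
package main

import "fmt"

// registry illustrates a nil-safe pointer type: every method checks for a
// nil receiver before touching state.
type registry struct {
	names []string
}

// Register records name; on a nil receiver it does nothing.
func (r *registry) Register(name string) {
	if r == nil {
		return
	}
	r.names = append(r.names, name)
}

// Count reports the number of registered names; a nil registry has none.
func (r *registry) Count() int {
	if r == nil {
		return 0
	}
	return len(r.names)
}

func main() {
	var disabled *registry // telemetry turned off
	disabled.Register("input[0]")
	fmt.Println(disabled.Count()) // 0

	enabled := &registry{}
	enabled.Register("input[0]")
	fmt.Println(enabled.Count()) // 1
}
```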
func NewTelemetry ¶ added in v4.4.0
func NewTelemetry(pipelineName string, collector MetricsCollector, interval time.Duration) *Telemetry
NewTelemetry creates a new telemetry registry. The sampler is not started until Start is called.
func (*Telemetry) Snapshot ¶ added in v4.4.0
func (t *Telemetry) Snapshot() PipelineSnapshot
Snapshot takes an immediate point-in-time snapshot of all registered channels.