pipeline

package module
v4.4.0
Warning

This package is not in the latest version of its module.

Published: Mar 22, 2026 License: MIT Imports: 10 Imported by: 0

README

pipeline

A Go package for building concurrent data processing pipelines using channels.

Overview

pipeline provides a set of composable stages for processing data streams concurrently. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.

Stages are defined as structs in the stages package. Each stage has a Create method that integrates it into a pipeline by connecting input and output channels.

The package uses Go's runtime/trace to create trace regions for each stage, allowing for detailed performance analysis.

Installation

go get github.com/schraf/pipeline/v4

Usage

Basic Example

To create a pipeline, you define a Config and use NewPipeline. You then connect stages by calling their Create methods inside the Composer function.

package main

import (
	"context"
	"fmt"

	"github.com/schraf/pipeline/v4"
	"github.com/schraf/pipeline/v4/stages"
)

func main() {
	ctx := context.Background()

	// Transform stage: multiply by 2
	transformStage := stages.TransformStage[int, int]{
		Name:   "multiply",
		Buffer: 10,
		Transformer: func(ctx context.Context, x int) (*int, error) {
			result := x * 2
			return &result, nil
		},
	}

	// Configure the pipeline
	cfg := pipeline.Config[int, int]{
		Name:             "example",
		InputBufferSize:  10,
		OutputBufferSize: 10,
		Composer: func(composer pipeline.Composer[int, int]) error {
			ctx := composer.Context()
			out := transformStage.Create(ctx, composer.Inputs().At(0))
			return composer.Outputs().Link(ctx, 0, out)
		},
	}

	pipe, _, err := pipeline.NewPipeline(ctx, cfg)
	if err != nil {
		panic(err)
	}

	// Feed data into the pipeline
	go func() {
		defer pipe.CloseAllInputs()
		for i := 1; i <= 5; i++ {
			pipe.Inputs().Send(ctx, 0, i)
		}
	}()

	// Consume results from the last stage's output channel
	for v := range pipe.Outputs().SinkAtIter(ctx, 0) {
		fmt.Println(v)
	}

	// Wait for completion and check for errors
	if err := pipe.Wait(); err != nil {
		panic(err)
	}
}

Pipeline Configuration

The Config struct specifies the pipeline's basic parameters:

  • Name: Used for tracing and identification.
  • InputChannels: Number of input channels to create (defaults to 1).
  • InputBufferSize: Buffer size for each input channel.
  • OutputChannels: Number of output channels to create (defaults to 1).
  • OutputBufferSize: Buffer size for each output channel.
  • Composer: Function used to connect the pipeline inputs to outputs.

Pipeline Stages

Stages are located in the github.com/schraf/pipeline/v4/stages package.

Transform

Applies a transformation function to each value:

stage := stages.TransformStage[int, int]{
    Name:   "transform",
    Buffer: 10,
    Transformer: func(ctx context.Context, x int) (*int, error) {
        result := x * 2
        return &result, nil
    },
}
out := stage.Create(ctx, in)

Filter

Filters values based on a predicate:

stage := stages.FilterStage[int]{
    Name:   "filter",
    Buffer: 10,
    Filter: func(ctx context.Context, x int) (bool, error) {
        return x%2 == 0, nil
    },
}
out := stage.Create(ctx, in)

ParallelTransform

Applies transformation with multiple concurrent workers:

stage := stages.ParallelTransformStage[int, int]{
    Name:    "parallel",
    Buffer:  10,
    Workers: 5,
    Transformer: func(ctx context.Context, x int) (*int, error) {
        return &x, nil
    },
}
out := stage.Create(ctx, in)

Batch

Groups values into fixed-size batches:

stage := stages.BatchStage[int, []int]{
    Name:      "batch",
    Buffer:    10,
    BatchSize: 5,
}
out := stage.Create(ctx, in)

FanIn

Merges multiple input channels into one:

stage := stages.FanInStage[int]{
    Name:   "fan-in",
    Buffer: 10,
}
out := stage.Create(ctx, multiIn) // multiIn is a MultiChannelReceiver

FanOut

Distributes values to multiple output channels (broadcast):

stage := stages.FanOutStage[int]{
    Name:        "fan-out",
    OutputCount: 3,
    Buffer:      10,
}
outputs := stage.Create(ctx, in) // Returns MultiChannelReceiver

FanOutRoundRobin

Distributes values to multiple output channels in a round-robin fashion:

stage := stages.FanOutRoundRobinStage[int]{
    Name:        "fan-out-rr",
    OutputCount: 3,
    Buffer:      10,
}
outputs := stage.Create(ctx, in)

Split

Routes values to different channels based on a selector:

stage := stages.SplitStage[int]{
    Name:        "split",
    OutputCount: 3,
    Buffer:      10,
    Selector: func(ctx context.Context, x int) int {
        return x % 3
    },
}
outputs := stage.Create(ctx, in)

Reduce

Processes values incrementally using a reducer function:

stage := stages.ReduceStage[int, int]{
    Name:         "reduce",
    Buffer:       1,
    InitialValue: 0,
    Reducer: func(ctx context.Context, acc int, x int) (int, error) {
        return acc + x, nil
    },
}
out := stage.Create(ctx, in)

WindowedReduce

Processes values incrementally allowing intermediate results and state resets:

stage := stages.WindowedReduceStage[int, int]{
    Name:    "windowed-reduce",
    Buffer:  1,
    Initial: 0,
    Reducer: func(ctx context.Context, acc int, val int) (int, int, bool, error) {
        acc += val
        // Emit and reset when sum >= 10
        if acc >= 10 {
            return 0, acc, true, nil
        }
        return acc, 0, false, nil
    },
}
out := stage.Create(ctx, in)

Flatten

Takes an input channel of slices and emits each element individually:

stage := stages.FlattenStage[int]{
    Name:   "flatten",
    Buffer: 10,
}
out := stage.Create(ctx, inSlices)

Limit

Limits the number of values passed through the stage:

stage := stages.LimitStage[int]{
    Name:   "limit",
    Buffer: 10,
    Limit:  5,
}
out := stage.Create(ctx, in)

Expand

Lazily expands single input items into multiple items using an iterator:

stage := stages.ExpandStage[int, int]{
    Name:   "expand",
    Buffer: 10,
    Expander: func(ctx context.Context, x int) iter.Seq2[int, error] {
        return func(yield func(int, error) bool) {
            yield(x, nil)
            yield(x*10, nil)
        }
    },
}
out := stage.Create(ctx, in)

Aggregate

Collects all values into a single slice:

stage := stages.AggregateStage[int]{
    Name:   "aggregate",
    Buffer: 1,
}
out := stage.Create(ctx, in) // out is <-chan []int

Telemetry

Pipelines support opt-in telemetry for monitoring channel throughput and buffer utilization, useful for identifying bottlenecks. Enable it by setting MetricsCollector in the pipeline config:

cfg := pipeline.Config[int, int]{
    Name:            "example",
    InputBufferSize: 50,
    MetricsCollector: &pipeline.LogCollector{
        Logger: slog.New(slog.NewJSONHandler(os.Stderr, nil)),
        Level:  slog.LevelInfo,
    },
    MetricsInterval: 500 * time.Millisecond,
    Composer: func(c pipeline.Composer[int, int]) error {
        // stages automatically register their channels
        out := transformStage.Create(c.Context(), c.Inputs().At(0))
        return c.Outputs().Link(c.Context(), 0, out)
    },
}

The LogCollector emits one structured slog message per channel per tick with len, cap, utilization, and throughput counters. A consistently full channel upstream of a stage with an empty channel downstream indicates a bottleneck.

Implement the MetricsCollector interface to send telemetry to a custom backend:

type MetricsCollector interface {
    OnSnapshot(PipelineSnapshot)
}

When MetricsCollector is nil (the default), no telemetry code runs and there is zero overhead.

Error Handling

The pipeline automatically cancels all remaining stages when an error occurs. Any errors encountered are captured and returned by Wait(), joined via errors.Join:

if err := p.Wait(); err != nil {
    log.Fatal(err)
}

Requirements

  • Go 1.24.0 or later

License

See LICENSE file for details.

Documentation

Overview

Package pipeline provides composable stages for building concurrent data processing pipelines using channels. It handles context cancellation, error propagation, and goroutine lifecycle management automatically.

Index

Constants

This section is empty.

Variables

var (
	ErrInvalidChannel = errors.New("channel link size mismatch")
)

Functions

func Drain added in v4.1.0

func Drain(err error) error

Drain wraps an error as a DrainError. When returned from a Transformer or Expander, the current item is dropped and the stage drains all remaining input without processing it, allowing upstream stages to complete and downstream stages to flush what they already have.

func DrainChannel added in v4.1.0

func DrainChannel[T any](in <-chan T)

DrainChannel receives and discards all values from the upstream channel.

func ErrorInStage added in v4.3.0

func ErrorInStage(name string, err error) error

ErrorInStage wraps an error as a StageError, attaching the name of the stage the error occurred in.

func IsDrainError added in v4.1.0

func IsDrainError(err error) bool

IsDrainError reports whether err is or wraps a DrainError.

func IsSkipError added in v4.1.0

func IsSkipError(err error) bool

IsSkipError reports whether err is or wraps a SkipError.

func RegisterChannel added in v4.4.0

func RegisterChannel[T any](t *Telemetry, name string, ch chan T)

RegisterChannel registers a channel for periodic len/cap sampling. If t is nil the call is a no-op.

func RegisterChannelWithCounters added in v4.4.0

func RegisterChannelWithCounters[T any](t *Telemetry, name string, ch chan T) (sends *atomic.Int64, recvs *atomic.Int64)

RegisterChannelWithCounters registers a channel for periodic len/cap sampling and returns atomic counters that the caller should increment on each send and receive. If t is nil the call returns (nil, nil) and the caller should skip incrementing.

func RegisterCounter added in v4.4.0

func RegisterCounter(t *Telemetry, name string) (sends *atomic.Int64, recvs *atomic.Int64)

RegisterCounter registers a named throughput counter pair without an associated channel. This is useful for tracking items flowing through a link or other pathway where a bidirectional channel reference is not available. If t is nil the call returns (nil, nil).

func Skip added in v4.1.0

func Skip(err error) error

Skip wraps an error as a SkipError. When returned from a Transformer or Expander, the current item is dropped and processing continues with the next item.

Types

type ChannelSnapshot added in v4.4.0

type ChannelSnapshot struct {
	// Name identifies the channel (e.g. "transform", "input[0]").
	Name string

	// Len is the number of items currently buffered in the channel.
	// Always 0 for unbuffered channels.
	Len int

	// Cap is the total buffer capacity of the channel.
	// 0 indicates an unbuffered channel.
	Cap int

	// Utilization is Len/Cap as a value between 0.0 and 1.0.
	// It is 0.0 when the channel is unbuffered (Cap == 0).
	// Not meaningful for unbuffered channels; use throughput counters instead.
	Utilization float64

	// TotalSent is the cumulative number of items sent into the channel.
	// It is -1 when throughput counting is not enabled for this channel.
	TotalSent int64

	// TotalRecv is the cumulative number of items received from the channel.
	// It is -1 when throughput counting is not enabled for this channel.
	TotalRecv int64
}

ChannelSnapshot is a point-in-time view of a single registered channel's state.

When a channel is unbuffered (Cap == 0), Len and Utilization are always 0. Buffer utilization is not meaningful for unbuffered channels because items are transferred directly between sender and receiver with no intermediate buffering. For unbuffered channels, use TotalSent/TotalRecv throughput counters (available on Link paths and Meter'd channels) to compare processing rates between stages and identify bottlenecks.

type Composer

type Composer[In any, Out any] struct {
	// contains filtered or unexported fields
}

func (Composer[In, Out]) Context

func (c Composer[In, Out]) Context() Context

func (Composer[In, Out]) Inputs

func (c Composer[In, Out]) Inputs() MultiChannelReceiver[In]

func (Composer[In, Out]) Outputs

func (c Composer[In, Out]) Outputs() MultiChannelSender[Out]

type Config

type Config[In any, Out any] struct {
	Name             string
	InputChannels    uint
	InputBufferSize  uint
	OutputChannels   uint
	OutputBufferSize uint
	Composer         func(Composer[In, Out]) error

	// MetricsCollector, when non-nil, enables telemetry for the pipeline.
	// All pipeline boundary channels (inputs and outputs) are automatically
	// registered. Built-in stages register their internal channels as well.
	// The collector receives periodic snapshots of channel buffer utilization
	// and throughput data suitable for bottleneck detection.
	MetricsCollector MetricsCollector

	// MetricsInterval controls how often telemetry snapshots are collected
	// and delivered to the MetricsCollector. It is ignored when
	// MetricsCollector is nil. The default is 1 second.
	MetricsInterval time.Duration
}

Config defines the makeup of a pipeline and is required to construct one.

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

func (Context) Go

func (c Context) Go(name string, fn func(ctx context.Context) error)

func (Context) Telemetry added in v4.4.0

func (c Context) Telemetry() *Telemetry

Telemetry returns the pipeline's telemetry registry, or nil when telemetry is disabled. Stage authors can use this to register their internal channels for buffer utilization monitoring.

type DrainError added in v4.1.0

type DrainError struct {
	Err error
}

DrainError signals that the current item should be skipped AND the stage should stop processing new items. The stage drains its input channel to unblock upstream goroutines, then shuts down gracefully. The wrapped error is silently discarded and does not appear in the pipeline's returned error.

func (*DrainError) Error added in v4.1.0

func (e *DrainError) Error() string

func (*DrainError) Unwrap added in v4.1.0

func (e *DrainError) Unwrap() error

type InputChannel

type InputChannel[T any] interface {
	~chan T | ~<-chan T
}

type LogCollector added in v4.4.0

type LogCollector struct {
	// Logger is the slog.Logger used to emit telemetry messages.
	// When nil, slog.Default() is used.
	Logger *slog.Logger

	// Level is the log level for telemetry messages. The zero value
	// corresponds to slog.LevelInfo.
	Level slog.Level
}

LogCollector writes structured telemetry snapshots using slog. Each snapshot produces one log message per registered channel, making it straightforward to filter and alert on individual channels in log aggregation systems.

func (*LogCollector) OnSnapshot added in v4.4.0

func (c *LogCollector) OnSnapshot(snapshot PipelineSnapshot)

OnSnapshot logs one structured message per channel in the snapshot.

type MetricsCollector added in v4.4.0

type MetricsCollector interface {
	// OnSnapshot is called periodically with a snapshot of all registered channels.
	OnSnapshot(PipelineSnapshot)
}

MetricsCollector receives periodic telemetry snapshots from a pipeline. Implementations must be safe for concurrent use.

type MultiChannelReceiver

type MultiChannelReceiver[T any] []<-chan T

func NewMultiChannelReceiver

func NewMultiChannelReceiver[T any, C InputChannel[T]](in ...C) MultiChannelReceiver[T]

func (MultiChannelReceiver[T]) At

func (m MultiChannelReceiver[T]) At(index uint) <-chan T

func (MultiChannelReceiver[T]) Iter

func (m MultiChannelReceiver[T]) Iter() iter.Seq[<-chan T]

func (MultiChannelReceiver[T]) Len

func (m MultiChannelReceiver[T]) Len() int

func (MultiChannelReceiver[T]) SinkAt

func (m MultiChannelReceiver[T]) SinkAt(ctx context.Context, index uint) []T

func (MultiChannelReceiver[T]) SinkAtIter

func (m MultiChannelReceiver[T]) SinkAtIter(ctx context.Context, index uint) iter.Seq[T]

type MultiChannelSender

type MultiChannelSender[T any] []chan<- T

func NewMultiChannelSender

func NewMultiChannelSender[T any, C OutputChannel[T]](out ...C) MultiChannelSender[T]

func (MultiChannelSender[T]) At

func (m MultiChannelSender[T]) At(index uint) chan<- T

func (MultiChannelSender[T]) Iter

func (m MultiChannelSender[T]) Iter() iter.Seq[chan<- T]

func (MultiChannelSender[T]) Len

func (m MultiChannelSender[T]) Len() int

func (MultiChannelSender[T]) Link

func (m MultiChannelSender[T]) Link(ctx Context, index uint, in <-chan T) error

func (MultiChannelSender[T]) LinkAll

func (m MultiChannelSender[T]) LinkAll(ctx Context, in MultiChannelReceiver[T]) error

func (MultiChannelSender[T]) Send

func (m MultiChannelSender[T]) Send(ctx context.Context, index uint, values ...T) error

func (MultiChannelSender[T]) SendRoundRobin

func (m MultiChannelSender[T]) SendRoundRobin(ctx context.Context, values ...T) error

func (MultiChannelSender[T]) SendToAll

func (m MultiChannelSender[T]) SendToAll(ctx context.Context, values ...T) error

type NoopCollector added in v4.4.0

type NoopCollector struct{}

NoopCollector silently discards all telemetry snapshots. It is useful as a placeholder or for benchmarking the overhead of the telemetry registry without any I/O.

func (NoopCollector) OnSnapshot added in v4.4.0

func (NoopCollector) OnSnapshot(PipelineSnapshot)

OnSnapshot discards the snapshot.

type OutputChannel

type OutputChannel[T any] interface {
	~chan T | ~chan<- T
}

type Pipeline

type Pipeline[In any, Out any] struct {
	// contains filtered or unexported fields
}

Pipeline coordinates concurrent processing stages, managing their lifecycle and propagating errors and cancellation signals across all stages.

func NewPipeline

func NewPipeline[In any, Out any](ctx context.Context, cfg Config[In, Out]) (*Pipeline[In, Out], context.Context, error)

NewPipeline creates a new Pipeline and a derived context for coordinating pipeline stages. The returned context is cancelled when any stage encounters an error. Use the returned Pipeline to register stages and wait for completion.

func (*Pipeline[In, Out]) CloseAllInputs

func (p *Pipeline[In, Out]) CloseAllInputs()

CloseAllInputs will close all of the input channels. It is safe to call multiple times; only the first call will close the channels.

func (*Pipeline[In, Out]) Inputs

func (p *Pipeline[In, Out]) Inputs() MultiChannelSender[In]

func (*Pipeline[In, Out]) Outputs

func (p *Pipeline[In, Out]) Outputs() MultiChannelReceiver[Out]

func (*Pipeline[In, Out]) Telemetry added in v4.4.0

func (p *Pipeline[In, Out]) Telemetry() *Telemetry

Telemetry returns the pipeline's telemetry registry, or nil when telemetry is disabled. This can be used to register additional channels or take on-demand snapshots.

func (*Pipeline[In, Out]) Wait

func (p *Pipeline[In, Out]) Wait() error

Wait blocks until all registered stages complete and returns all errors encountered by any stage (joined via errors.Join), or nil if all stages completed successfully.

type PipelineSnapshot added in v4.4.0

type PipelineSnapshot struct {
	// PipelineName is the name of the pipeline as provided in Config.Name.
	PipelineName string

	// Timestamp is the time at which the snapshot was taken.
	Timestamp time.Time

	// Channels contains one entry per registered channel.
	Channels []ChannelSnapshot
}

PipelineSnapshot is a point-in-time view of all registered channels in a pipeline.

type SkipError added in v4.1.0

type SkipError struct {
	Err error
}

SkipError signals that the current item should be skipped without killing the pipeline. The stage continues processing subsequent items normally. The wrapped error is silently discarded and does not appear in the pipeline's returned error.

func (*SkipError) Error added in v4.1.0

func (e *SkipError) Error() string

func (*SkipError) Unwrap added in v4.1.0

func (e *SkipError) Unwrap() error

type StageError added in v4.3.0

type StageError struct {
	StageName string
	Err       error
}

StageError signals that the error originated from a pipeline stage. The error will include the name of the stage that the error occurred in.

func (*StageError) Error added in v4.3.0

func (e *StageError) Error() string

func (*StageError) Unwrap added in v4.3.0

func (e *StageError) Unwrap() error

type Telemetry added in v4.4.0

type Telemetry struct {
	// contains filtered or unexported fields
}

Telemetry is a registry of channels and a periodic sampler that reports snapshots through a MetricsCollector. A nil *Telemetry is safe to use; all methods and the package-level registration functions treat nil as a no-op.

func NewTelemetry added in v4.4.0

func NewTelemetry(pipelineName string, collector MetricsCollector, interval time.Duration) *Telemetry

NewTelemetry creates a new telemetry registry. The sampler is not started until Start is called.

func (*Telemetry) Snapshot added in v4.4.0

func (t *Telemetry) Snapshot() PipelineSnapshot

Snapshot takes an immediate point-in-time snapshot of all registered channels.

func (*Telemetry) Start added in v4.4.0

func (t *Telemetry) Start(ctx context.Context)

Start launches the periodic sampler goroutine. It is bound to the provided context and will also stop when Stop is called.

func (*Telemetry) Stop added in v4.4.0

func (t *Telemetry) Stop()

Stop stops the periodic sampler and waits for it to exit. It is safe to call multiple times.

