stages

package
v4.1.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AggregateStage

type AggregateStage[T any] struct {
	Name   string
	Buffer uint
}

func (AggregateStage[T]) Create

func (s AggregateStage[T]) Create(ctx pipeline.Context, in <-chan T) <-chan []T

type BatchStage

type BatchStage[T any] struct {
	Name      string
	Buffer    uint
	BatchSize uint
}

func (BatchStage[T]) Create

func (s BatchStage[T]) Create(ctx pipeline.Context, in <-chan T) <-chan []T

type ExpandStage

type ExpandStage[In any, Out any] struct {
	Name     string
	Buffer   uint
	Expander Expander[In, Out]
}

func (ExpandStage[In, Out]) Create

func (s ExpandStage[In, Out]) Create(ctx pipeline.Context, in <-chan In) <-chan Out

type Expander

type Expander[In any, Out any] func(context.Context, In) iter.Seq2[Out, error]

type FanInStage

type FanInStage[T any] struct {
	Name   string
	Buffer uint
}

func (FanInStage[T]) Create

func (s FanInStage[T]) Create(ctx pipeline.Context, in pipeline.MultiChannelReceiver[T]) <-chan T

type FanOutRoundRobinStage

type FanOutRoundRobinStage[T any] struct {
	Name        string
	OutputCount uint
	Buffer      uint
}

func (FanOutRoundRobinStage[T]) Create

func (s FanOutRoundRobinStage[T]) Create(ctx pipeline.Context, in <-chan T) pipeline.MultiChannelReceiver[T]

type FanOutStage

type FanOutStage[T any] struct {
	Name        string
	OutputCount uint
	Buffer      uint
}

func (FanOutStage[T]) Create

func (s FanOutStage[T]) Create(ctx pipeline.Context, in <-chan T) pipeline.MultiChannelReceiver[T]

type Filter

type Filter[T any] func(context.Context, T) (bool, error)

type FilterStage

type FilterStage[T any] struct {
	Name   string
	Buffer uint
	Filter Filter[T]
}

func (FilterStage[T]) Create

func (s FilterStage[T]) Create(ctx pipeline.Context, in <-chan T) <-chan T

type FlattenStage

type FlattenStage[T any] struct {
	Name   string
	Buffer uint
}

func (FlattenStage[T]) Create

func (s FlattenStage[T]) Create(ctx pipeline.Context, in <-chan []T) <-chan T

type LimitStage

type LimitStage[T any] struct {
	Name   string
	Buffer uint
	Limit  uint
}

func (LimitStage[T]) Create

func (s LimitStage[T]) Create(ctx pipeline.Context, in <-chan T) <-chan T

type ParallelTransformStage

type ParallelTransformStage[In any, Out any] struct {
	Name        string
	Buffer      uint
	Workers     uint
	Transformer Transformer[In, Out]
}

func (ParallelTransformStage[In, Out]) Create

func (s ParallelTransformStage[In, Out]) Create(ctx pipeline.Context, in <-chan In) <-chan Out

type ReduceStage

type ReduceStage[T any, Acc any] struct {
	Name    string
	Buffer  uint
	Initial Acc
	Reducer Reducer[T, Acc]
}

func (ReduceStage[T, Acc]) Create

func (s ReduceStage[T, Acc]) Create(ctx pipeline.Context, in <-chan T) <-chan Acc

type Reducer

type Reducer[T any, Acc any] func(context.Context, Acc, T) (Acc, error)

type Selector

type Selector[T any] func(context.Context, T) int

type SplitStage

type SplitStage[T any] struct {
	Name        string
	OutputCount uint
	Buffer      uint
	Selector    Selector[T]
}

func (SplitStage[T]) Create

func (s SplitStage[T]) Create(ctx pipeline.Context, in <-chan T) pipeline.MultiChannelReceiver[T]

type TransformStage

type TransformStage[In any, Out any] struct {
	Name        string
	Buffer      uint
	Transformer Transformer[In, Out]
}

func (TransformStage[In, Out]) Create

func (s TransformStage[In, Out]) Create(ctx pipeline.Context, in <-chan In) <-chan Out

type Transformer

type Transformer[In any, Out any] func(context.Context, In) (Out, error)

type WindowedReduceStage added in v4.0.1

type WindowedReduceStage[T any, Acc any] struct {
	Name    string
	Buffer  uint
	Initial Acc
	Reducer WindowedReducer[T, Acc]
}

func (WindowedReduceStage[T, Acc]) Create added in v4.0.1

func (s WindowedReduceStage[T, Acc]) Create(ctx pipeline.Context, in <-chan T) <-chan Acc

type WindowedReducer added in v4.0.1

type WindowedReducer[T any, Acc any] func(context.Context, Acc, T) (nextAcc Acc, output Acc, emit bool, err error)

WindowedReducer is a function that takes a context, the current accumulator, and a new input value. It returns:

- nextAcc: The updated accumulator state to be used for the next iteration.
- output: The value to be sent to the output channel (only if emit is true).
- emit: A boolean flag indicating whether 'output' should be sent downstream.
- err: An error if processing fails.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL