consumer

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// Initial offset values, defined as aliases of the sarama offset constants.
// NOTE(review): presumably these are the values accepted by
// Options.InitialOffset — confirm against the option constructors.
const (
	// InitialOffsetLatest equals sarama.OffsetNewest.
	InitialOffsetLatest   = sarama.OffsetNewest
	// InitialOffsetEarliest equals sarama.OffsetOldest.
	InitialOffsetEarliest = sarama.OffsetOldest
)

Variables

View Source
// Sentinel errors exposed by the consumer package.
var (
	// ErrNoTopics indicates that the consumer has no topics and cannot start.
	ErrNoTopics = errors.New("no topics to consume")
	// ErrEmptyTopic indicates that the message wrapped by the handler has no
	// topic (i.e. it is not an event).
	ErrEmptyTopic = errors.New("empty topic")
	// ErrClosed indicates that the consumer was closed.
	ErrClosed = errors.New("closed")
	// ErrRunned indicates that the consumer is running.
	ErrRunned = errors.New("runned")

	// ErrMessageUnmarshal indicates a failure while unmarshaling a message.
	ErrMessageUnmarshal = errors.New("message unmarshal error")
	// ErrMessageHandle indicates a failure while handling a message.
	ErrMessageHandle = errors.New("message handle error")
)

Functions

func PermanentError

func PermanentError(err error) error

Types

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group is the kafka consumer group

func NewGroup

func NewGroup(brokers []string, group string, opts ...Option) (*Group, error)

NewGroup constructs Group

func (*Group) AddHandler

func (c *Group) AddHandler(h Handler) error

AddHandler adds message handler

func (*Group) Close

func (c *Group) Close() error

Close closes the kafka consumer

func (*Group) Run

func (c *Group) Run(ctx context.Context) (err error)

Run starts consuming the messages and handling them

type Handler

// Handler is the message handler interface.
type Handler interface {
	// Meta is embedded from the codec package; its method set is declared there.
	// NOTE(review): presumably it supplies the Topic and Type getters that the
	// concrete handlers expose — confirm against codec.Meta.
	codec.Meta
	// Handle handles a received Message.
	Handle(context.Context, Message)
	// Offset returns an int64 offset value for the given partition.
	Offset(partition int32) int64
}

Handler message handler interface

type HandlerBatch

type HandlerBatch[T any] struct {
	// contains filtered or unexported fields
}

HandlerBatch handles messages in batches

func NewHandlerBatch

func NewHandlerBatch[T any](decoder codec.Decoder[T], handlerFunc HandlerBatchFunc[T], opts ...HandlerOption) *HandlerBatch[T]

NewHandlerBatch constructs HandlerBatch

func (*HandlerBatch[T]) Handle

func (p *HandlerBatch[T]) Handle(ctx context.Context, raw Message)

Handle received Message handler

func (*HandlerBatch) Offset

func (h *HandlerBatch) Offset(part int32) (off int64)

Offset returns the handler's offset for the given partition

func (*HandlerBatch) Topic

func (p *HandlerBatch) Topic() string

Topic getter

func (*HandlerBatch) Type

func (p *HandlerBatch) Type() string

Type getter

type HandlerBatchFunc

type HandlerBatchFunc[T any] func(ctx context.Context, msg []T, raw []Message) error

type HandlerOption

type HandlerOption func(*HandlerOptions)

HandlerOption handler option

func WithHandlerBatchFlushTimeout

func WithHandlerBatchFlushTimeout(timeout time.Duration) HandlerOption

WithHandlerBatchFlushTimeout sets BatchFlushTimeout option

func WithHandlerBatchSize

func WithHandlerBatchSize(size int) HandlerOption

WithHandlerBatchSize sets BatchSize option

func WithHandlerLogger

func WithHandlerLogger(l *slog.Logger) HandlerOption

WithHandlerLogger sets Logger option

func WithHandlerRetryInitialInterval

func WithHandlerRetryInitialInterval(d time.Duration) HandlerOption

WithHandlerRetryInitialInterval sets RetryInitialInterval option

func WithHandlerRetryMaxInterval

func WithHandlerRetryMaxInterval(d time.Duration) HandlerOption

WithHandlerRetryMaxInterval sets RetryMaxInterval option

func WithHandlerRetryMaxTime

func WithHandlerRetryMaxTime(d time.Duration) HandlerOption

WithHandlerRetryMaxTime sets RetryMaxTime option

func WithHandlerTimeout

func WithHandlerTimeout(timeout time.Duration) HandlerOption

WithHandlerTimeout sets Timeout option

func WithoutHandlerRetry

func WithoutHandlerRetry() HandlerOption

WithoutHandlerRetry sets Retry option to false

type HandlerOptions

// HandlerOptions holds the handler settings that HandlerOption functions modify.
type HandlerOptions struct {
	// Logger is set by WithHandlerLogger.
	Logger *slog.Logger

	// Timeout is set by WithHandlerTimeout.
	Timeout time.Duration

	// BatchSize is set by WithHandlerBatchSize.
	BatchSize         int
	// BatchFlushTimeout is set by WithHandlerBatchFlushTimeout.
	BatchFlushTimeout time.Duration

	// Retry is set to false by WithoutHandlerRetry.
	Retry                bool
	// RetryInitialInterval is set by WithHandlerRetryInitialInterval.
	RetryInitialInterval time.Duration
	// RetryMaxInterval is set by WithHandlerRetryMaxInterval.
	RetryMaxInterval     time.Duration
	// RetryMaxTime is set by WithHandlerRetryMaxTime.
	RetryMaxTime         time.Duration
}

type HandlerProto

type HandlerProto[T any] struct {
	// contains filtered or unexported fields
}

HandlerProto handles messages one at a time

func NewHandler

func NewHandler[T any](decoder codec.Decoder[T], handlerFunc HandlerProtoFunc[T], opts ...HandlerOption) *HandlerProto[T]

NewHandler constructs HandlerProto

func (*HandlerProto[T]) Handle

func (p *HandlerProto[T]) Handle(ctx context.Context, raw Message)

Handle received Message handler

func (*HandlerProto) Offset

func (h *HandlerProto) Offset(part int32) (off int64)

Offset returns the handler's offset for the given partition

func (*HandlerProto) Topic

func (p *HandlerProto) Topic() string

Topic getter

func (*HandlerProto) Type

func (p *HandlerProto) Type() string

Type getter

type HandlerProtoFunc

type HandlerProtoFunc[T any] func(ctx context.Context, msg T, raw Message) error

type Message

// Message is a raw kafka message as passed to handlers.
//
// NOTE(review): the individual field semantics are not documented on this
// page; Topic, Partition and Offset presumably identify the record's position
// in kafka, and Group the consumer group that received it — confirm.
type Message struct {
	Key       []byte
	Value     []byte
	ID        string
	Group     string
	Topic     string
	TraceID   string
	Partition int32
	Offset    int64
	Timestamp time.Time
	Headers   []codec.Header
}

Message raw kafka message

type Offset

type Offset map[int32]int64

Offset kafka offset by partition

type OffsetSeq added in v1.0.1

type OffsetSeq = iter.Seq2[int32, int64]

OffsetSeq is an iterator sequence of partition/offset pairs

type Option

type Option func(*Options)

Option consumer option

func WithCommitCount

func WithCommitCount(value int) Option

WithCommitCount sets CommitCount option

func WithCommitDuration

func WithCommitDuration(value time.Duration) Option

WithCommitDuration sets CommitDuration option

func WithDialTimeout

func WithDialTimeout(d time.Duration) Option

WithDialTimeout sets how long to wait for the initial connection

func WithEarliestOffsetReset

func WithEarliestOffsetReset() Option

WithEarliestOffsetReset sets InitialOffset to "earliest" mode

func WithFetchMaxWait

func WithFetchMaxWait(d time.Duration) Option

WithFetchMaxWait sets FetchMaxWait option

func WithFetchMinBytes

func WithFetchMinBytes(value int32) Option

WithFetchMinBytes sets FetchMinBytes option

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger sets Logger option

func WithReadTimeout

func WithReadTimeout(d time.Duration) Option

WithReadTimeout sets how long to wait for a response

func WithSASL

func WithSASL(mechanism sasl.Mechanism, username, password string) Option

WithSASL sets SASL based authentication with broker

func WithTraceIDCtxFunc

func WithTraceIDCtxFunc(f TraceIDCtxFunc) Option

WithTraceIDCtxFunc sets func which inserts trace id to the context

func WithWriteTimeout

func WithWriteTimeout(d time.Duration) Option

WithWriteTimeout sets how long to wait for a transmit

type Options

// Options holds the consumer options that Option functions modify.
type Options struct {
	InitialOffset int64 // Determines where the group should start consuming (default: "latest")

	FetchMaxWait  time.Duration // Waiting time before returning messages to the client (relevant if FetchMinBytes > 1) (default 500; unit not stated here, presumably milliseconds, TODO confirm)
	FetchMinBytes int32         // Determines how many bytes the kafka library has to receive before returning to the client (latency << FetchMinBytes << throughput) (default 1)

	CommitCount    int           // After how many messages the client has to call the commit (default 20)
	CommitDuration time.Duration // After how much time the client has to call the commit (default 1 second)

	DialTimeout  time.Duration // How long to wait for the initial connection
	ReadTimeout  time.Duration // How long to wait for a response
	WriteTimeout time.Duration // How long to wait for a transmit

	Logger     *slog.Logger   // Logger (default: slog.Default)
	TraceIDCtx TraceIDCtxFunc // Func which inserts trace id into the context
	SASL       *sasl.SASL     // SASL based authentication with broker
}

Options consumer options

type TraceIDCtxFunc

type TraceIDCtxFunc func(context.Context, string) context.Context

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL