Documentation
¶
Index ¶
- Constants
- Variables
- func PermanentError(err error) error
- type Group
- type Handler
- type HandlerBatch
- type HandlerBatchFunc
- type HandlerOption
- func WithHandlerBatchFlushTimeout(timeout time.Duration) HandlerOption
- func WithHandlerBatchSize(size int) HandlerOption
- func WithHandlerLogger(l *slog.Logger) HandlerOption
- func WithHandlerRetryInitialInterval(d time.Duration) HandlerOption
- func WithHandlerRetryMaxInterval(d time.Duration) HandlerOption
- func WithHandlerRetryMaxTime(d time.Duration) HandlerOption
- func WithHandlerTimeout(timeout time.Duration) HandlerOption
- func WithoutHandlerRetry() HandlerOption
- type HandlerOptions
- type HandlerProto
- type HandlerProtoFunc
- type Message
- type Offset
- type OffsetSeq
- type Option
- func WithCommitCount(value int) Option
- func WithCommitDuration(value time.Duration) Option
- func WithDialTimeout(d time.Duration) Option
- func WithEarliestOffsetReset() Option
- func WithFetchMaxWait(d time.Duration) Option
- func WithFetchMinBytes(value int32) Option
- func WithLogger(logger *slog.Logger) Option
- func WithReadTimeout(d time.Duration) Option
- func WithSASL(mechanism sasl.Mechanism, username, password string) Option
- func WithTraceIDCtxFunc(f TraceIDCtxFunc) Option
- func WithWriteTimeout(d time.Duration) Option
- type Options
- type TraceIDCtxFunc
Constants ¶
const ( InitialOffsetLatest = sarama.OffsetNewest InitialOffsetEarliest = sarama.OffsetOldest )
Variables ¶
var (
	// ErrNoTopics means the consumer has no topics and cannot start.
	ErrNoTopics = errors.New("no topics to consume")
	// ErrEmptyTopic means the message wrapped by the handler has no topic (not an event).
	ErrEmptyTopic = errors.New("empty topic")
	// ErrClosed means the consumer was closed.
	ErrClosed = errors.New("closed")
	// ErrRunned means the consumer is already running.
	ErrRunned = errors.New("runned")
	// ErrMessageUnmarshal is the error that occurs while unmarshaling a message.
	ErrMessageUnmarshal = errors.New("message unmarshal error")
	// ErrMessageHandle is the error that occurs while handling a message.
	ErrMessageHandle = errors.New("message handle error")
)
Functions ¶
func PermanentError ¶
Types ¶
type Group ¶
type Group struct {
// contains filtered or unexported fields
}
Group is the kafka consumer group
func (*Group) AddHandler ¶
AddHandler adds message handler
type Handler ¶
type Handler interface {
codec.Meta
Handle(context.Context, Message)
Offset(partition int32) int64
}
Handler is the message handler interface.
type HandlerBatch ¶
type HandlerBatch[T any] struct { // contains filtered or unexported fields }
HandlerBatch handles messages in batches.
func NewHandlerBatch ¶
func NewHandlerBatch[T any](decoder codec.Decoder[T], handlerFunc HandlerBatchFunc[T], opts ...HandlerOption) *HandlerBatch[T]
NewHandlerBatch constructs HandlerBatch
type HandlerBatchFunc ¶
type HandlerOption ¶
type HandlerOption func(*HandlerOptions)
HandlerOption handler option
func WithHandlerBatchFlushTimeout ¶
func WithHandlerBatchFlushTimeout(timeout time.Duration) HandlerOption
WithHandlerBatchFlushTimeout sets BatchFlushTimeout option
func WithHandlerBatchSize ¶
func WithHandlerBatchSize(size int) HandlerOption
WithHandlerBatchSize sets BatchSize option
func WithHandlerLogger ¶
func WithHandlerLogger(l *slog.Logger) HandlerOption
WithHandlerLogger sets Logger option
func WithHandlerRetryInitialInterval ¶
func WithHandlerRetryInitialInterval(d time.Duration) HandlerOption
WithHandlerRetryInitialInterval sets RetryInitialInterval option
func WithHandlerRetryMaxInterval ¶
func WithHandlerRetryMaxInterval(d time.Duration) HandlerOption
WithHandlerRetryMaxInterval sets RetryMaxInterval option
func WithHandlerRetryMaxTime ¶
func WithHandlerRetryMaxTime(d time.Duration) HandlerOption
WithHandlerRetryMaxTime sets RetryMaxTime option
func WithHandlerTimeout ¶
func WithHandlerTimeout(timeout time.Duration) HandlerOption
WithHandlerTimeout sets Timeout option
func WithoutHandlerRetry ¶
func WithoutHandlerRetry() HandlerOption
WithoutHandlerRetry sets Retry option to false
type HandlerOptions ¶
type HandlerProto ¶
type HandlerProto[T any] struct { // contains filtered or unexported fields }
HandlerProto handles messages one at a time.
func NewHandler ¶
func NewHandler[T any](decoder codec.Decoder[T], handlerFunc HandlerProtoFunc[T], opts ...HandlerOption) *HandlerProto[T]
NewHandler constructs HandlerProto
type HandlerProtoFunc ¶
type Message ¶
type Message struct {
Key []byte
Value []byte
ID string
Group string
Topic string
TraceID string
Partition int32
Offset int64
Timestamp time.Time
Headers []codec.Header
}
Message raw kafka message
type Option ¶
type Option func(*Options)
Option consumer option
func WithCommitCount ¶
WithCommitCount sets CommitCount option
func WithCommitDuration ¶
WithCommitDuration sets CommitDuration option
func WithDialTimeout ¶
WithDialTimeout sets how long to wait for the initial connection
func WithEarliestOffsetReset ¶
func WithEarliestOffsetReset() Option
WithEarliestOffsetReset sets AutoOffsetReset to "earliest" mode
func WithFetchMaxWait ¶
WithFetchMaxWait sets FetchMaxWait option
func WithFetchMinBytes ¶
WithFetchMinBytes sets FetchMinBytes option
func WithReadTimeout ¶
WithReadTimeout sets how long to wait for a response
func WithTraceIDCtxFunc ¶
func WithTraceIDCtxFunc(f TraceIDCtxFunc) Option
WithTraceIDCtxFunc sets the function that inserts the trace ID into the context.
func WithWriteTimeout ¶
WithWriteTimeout sets how long to wait for a transmit
type Options ¶
type Options struct {
InitialOffset int64 // Determines where the group should start consuming (default: "latest")
FetchMaxWait time.Duration // Time to wait before returning messages to the client (relevant if FetchMinBytes > 1) (default 500)
FetchMinBytes int32 // Determines how many bytes the Kafka library has to receive before returning to the client (latency << FetchMinBytes << throughput) (default 1)
CommitCount int // Number of messages after which the client has to call commit (default 20)
CommitDuration time.Duration // Amount of time after which the client has to call commit (default 1 second)
DialTimeout time.Duration // How long to wait for the initial connection
ReadTimeout time.Duration // How long to wait for a response
WriteTimeout time.Duration // How long to wait for a transmit
Logger *slog.Logger // Logger (default: slog.Default)
TraceIDCtx TraceIDCtxFunc // Func which inserts trace id into the context
SASL *sasl.SASL // SASL based authentication with broker
}
Options consumer options