Versions in this module
Expand all Collapse all
v0
v0.0.1 Dec 3, 2024
Changes in this version
+ var EmitLatest = func(o *mcastOpts)
+ func AlwaysRetry(err error) bool
+ func Discard[T any](ctx context.Context, src Observable[T])
+ func First[T any](ctx context.Context, src Observable[T]) (item T, err error)
+ func Last[T any](ctx context.Context, src Observable[T]) (item T, err error)
+ func ObserveWithWaitGroup[T any](ctx context.Context, wg *sync.WaitGroup, src Observable[T], next func(T), ...)
+ func ToChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
+ func ToSlice[T any](ctx context.Context, src Observable[T]) (items []T, err error)
+ func ToTruncatingChannel[T any](ctx context.Context, src Observable[T], opts ...ToChannelOpt) <-chan T
+ type FuncObservable[T any] func(context.Context, func(T), func(error))
+ func (f FuncObservable[T]) Observe(ctx context.Context, next func(T), complete func(error))
+ type MulticastOpt func(o *mcastOpts)
+ type Observable[T any] interface
+ Observe func(ctx context.Context, next func(T), complete func(error))
+ func Buffer[Buf any, T any](src Observable[T], bufferSize int, waitTime time.Duration, ...) Observable[Buf]
+ func Concat[T any](srcs ...Observable[T]) Observable[T]
+ func Debounce[T any](src Observable[T], duration time.Duration) Observable[T]
+ func Distinct[T comparable](src Observable[T]) Observable[T]
+ func Empty[T any]() Observable[T]
+ func Error[T any](err error) Observable[T]
+ func Filter[T any](src Observable[T], pred func(T) bool) Observable[T]
+ func FlatMap[A, B any](src Observable[A], apply func(A) Observable[B]) Observable[B]
+ func FromChannel[T any](in <-chan T) Observable[T]
+ func FromSlice[T any](items []T) Observable[T]
+ func Just[T any](item T) Observable[T]
+ func Map[A, B any](src Observable[A], apply func(A) B) Observable[B]
+ func Multicast[T any](opts ...MulticastOpt) (mcast Observable[T], next func(T), complete func(error))
+ func Range(from, to int) Observable[int]
+ func Reduce[Item, Result any](src Observable[Item], init Result, reduce func(Result, Item) Result) Observable[Result]
+ func Retry[T any](src Observable[T], shouldRetry RetryFunc) Observable[T]
+ func Stuck[T any]() Observable[T]
+ func Throttle[T any](src Observable[T], ratePerSecond float64, burst int) Observable[T]
+ func ToMulticast[T any](src Observable[T], opts ...MulticastOpt) (mcast Observable[T], connect func(context.Context))
+ type RetryFunc func(err error) bool
+ func BackoffRetry(shouldRetry RetryFunc, minBackoff, maxBackoff time.Duration) RetryFunc
+ func LimitRetries(shouldRetry RetryFunc, numRetries int) RetryFunc
+ type ToChannelOpt func(*toChannelOpts)
+ func WithBufferSize(n int) ToChannelOpt
+ func WithErrorChan(errCh chan error) ToChannelOpt