core

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrStoreNotFound = errors.New("store not found")
	ErrEventExists   = errors.New("event already exists")
)

Functions

func Iterate added in v0.0.2

func Iterate(ctx context.Context, m StoreManager, fluxVersion core.Version, filter Filter) iter.Seq2[*Event, error]

Types

type Event

type Event struct {
	StoreId       StoreId
	StoreMetadata Metadata
	FluxVersion   core.Version
	core.Event
}

type EventOutOfOrderError

type EventOutOfOrderError struct {
	StoreId  StoreId
	Expected core.Version
	Actual   core.Version
}

func (*EventOutOfOrderError) Error

func (e *EventOutOfOrderError) Error() string

type Filter added in v0.0.2

type Filter struct {
	AggregateType *string
	AggregateID   *string
	Metadata      *Metadata
	StoreMetadata *Metadata
}

type MessageBus

type MessageBus interface {
	Publish(subject string, message []byte) error
	Subscribe(subject string, handler func(message []byte, metadata Metadata) error) (Unsubscriber, error)
}

type Metadata

type Metadata map[string]string

func (*Metadata) Scan

func (m *Metadata) Scan(src any) error

type StoreId

type StoreId uuid.UUID

func (StoreId) MarshalText

func (id StoreId) MarshalText() ([]byte, error)

func (*StoreId) Scan

func (id *StoreId) Scan(src any) error

func (StoreId) String

func (id StoreId) String() string

func (*StoreId) UnmarshalText

func (id *StoreId) UnmarshalText(data []byte) error

type StoreIterator

type StoreIterator interface {
	Close()
	WaitForNext() bool
	Value() Event
}

type StoreManager

type StoreManager interface {
	List(metadata Metadata) iter.Seq[SubStore]
	Create(id StoreId, metadata Metadata) (SubStore, error)
	Get(id StoreId) (SubStore, error)

	// OnCommit registers a callback that will be called after events have been committed to the store.
	// If events are committed in a transaction, the callback will be called after the transaction has been committed.
	// If a shared storage is used, it could be that the handler is called with nil values. E.g. when two processes are using a
	// shared postgres database, the handler will be called with nil values for the SubStore and events, in the process that did not
	// commit the events.
	OnCommit(handler func(SubStore, []Event)) Unsubscriber
	All(fluxVersion core.Version, filter Filter) (iter.Seq[Event], error)
}

type SubStore

type SubStore interface {
	core.EventStore
	Id() StoreId
	Metadata() Metadata
	// start is the non inclusive version to start from
	All(start core.Version) (iter.Seq[core.Event], error)
	Append(event core.Event) error

	LastVersion() core.Version

	UpdateMetadata(metadata Metadata) error
}

type UnsubscribeFunc

type UnsubscribeFunc func() error

func (UnsubscribeFunc) Unsubscribe

func (u UnsubscribeFunc) Unsubscribe() error

type Unsubscriber

type Unsubscriber interface {
	Unsubscribe() error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL