isledb

Published: Feb 21, 2026 License: Apache-2.0 Imports: 38 Imported by: 0

README

IsleDB


IsleDB is an embedded key-value engine built for object storage. It borrows ideas from LSM-trees but rethinks their layout for an object store rather than a local disk.

Writes go to an in-memory memtable first, then get flushed to SST files periodically. This batching matters—instead of hitting object storage on every put(), you amortize costs across many writes. Large values get stored separately as blobs so the SSTs stay small.

The SST files themselves live in object storage (S3, GCS, Azure, MinIO, etc). Your capacity and durability scale with the bucket, not your local disk.

Readers attach to the same bucket/prefix, stream SSTs and blobs on demand, and use local caches to minimize re-downloads—so read capacity scales horizontally without replicas.

Features

  1. Data lives on object storage (S3, GCS, Azure Blob, MinIO).
  2. Bottomless capacity.
  3. Object-store durability.
  4. Readers scale horizontally: no replicas, no connection limits.
  5. Three compaction modes (merge, FIFO, time-window).
  6. Separate writer and compaction processes.
  7. Pluggable manifest store.
[isledb architecture diagram]

Use Cases

One library, many workloads. The same storage and replay model can power event ingestion, state materialization, key-value APIs, and object-storage-first data pipelines.

  • Event Hub — Ingest app events and fan out with tailing readers. Common alternative: managed brokers + sinks.
  • Event Store — Append ordered events and build projections from replay. Common alternative: dedicated event databases.
  • KV API Backing Store — Serve Get/Scan workloads with object-store durability. Common alternative: managed key-value services.
  • CDC Pipeline Buffer — Stage changes in object storage before indexing and analytics.

When not to use IsleDB

Pick the right tool for the workload. IsleDB is strongest in object-storage-first, append-heavy systems.

Strong fit for IsleDB
  • Append-heavy workloads (logs, events, CDC)
  • Large datasets where 1-10 second read latency is acceptable
  • Multi-reader / fan-out architectures
  • Cost-sensitive storage at scale
  • Serverless / ephemeral compute
Better choices elsewhere
  • Sub-10ms latency SLAs → Use low-latency serving data stores
  • High-frequency point updates to same keys → Use update-optimized transactional stores
  • Complex queries / joins / transactions → Use relational transactional databases
  • Small hot datasets (<1GB) → Use in-memory stores

Full API Reference

API-Reference

Getting Started

Create a blob store connection (S3, Azure, GCP) and a DB instance
ctx := context.Background()

dir, err := filepath.Abs("./data")
if err != nil {
	log.Fatal(err)
}
store, err := blobstore.Open(ctx, "file://"+dir, "db1")
if err != nil {
	log.Fatal(err)
}
defer store.Close()

db, err := isledb.OpenDB(ctx, store, isledb.DBOptions{})
if err != nil {
	log.Fatal(err)
}
defer db.Close()

Cloud buckets (S3, GCS, Azure) are addressed with Go Cloud Development Kit (gocloud.dev) bucket URLs:

ctx := context.Background()

// S3
store, err := blobstore.Open(ctx, "s3://my-bucket?region=us-east-1", "db1")
if err != nil {
	log.Fatal(err)
}

// GCS
store, err = blobstore.Open(ctx, "gs://my-bucket", "db1")
if err != nil {
	log.Fatal(err)
}

// Azure Blob
store, err = blobstore.Open(ctx, "azblob://my-container", "db1")
if err != nil {
	log.Fatal(err)
}
defer store.Close()

db, err := isledb.OpenDB(ctx, store, isledb.DBOptions{})
if err != nil {
	log.Fatal(err)
}
defer db.Close()
Writer (single writer per bucket/prefix)

IsleDB uses a write-ahead memtable architecture where writes are first buffered in memory before being flushed to persistent SST files. Large values are stored separately in blob storage to keep SSTs compact.

opts := isledb.DefaultWriterOptions()
writer, err := db.OpenWriter(ctx, opts)
if err != nil {
	log.Fatal(err)
}
defer writer.Close()

batch := []struct {
	key   string
	value string
}{
	{key: "hello", value: "world"},
	{key: "foo", value: "bar"},
}
for _, kv := range batch {
	if err := writer.Put([]byte(kv.key), []byte(kv.value)); err != nil {
		log.Fatal(err)
	}
}
if err := writer.Flush(ctx); err != nil {
	log.Fatal(err)
}
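The writer also supports deletes and expiring keys via the Writer API shown in the reference below. A minimal sketch continuing the snippet above; the key names and TTL value are illustrative:

```go
// Expire a key automatically after 15 minutes.
if err := writer.PutWithTTL([]byte("session:abc"), []byte("token"), 15*time.Minute); err != nil {
	log.Fatal(err)
}
// Write a tombstone for an existing key.
if err := writer.Delete([]byte("foo")); err != nil {
	log.Fatal(err)
}
if err := writer.Flush(ctx); err != nil {
	log.Fatal(err)
}
```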
Reader

Readers open against the same bucket/prefix and fetch SSTs and blobs on demand. Configure local caches to reduce repeated downloads, and scale readers horizontally as needed.

reader, err := isledb.OpenReader(ctx, store, isledb.ReaderOpenOptions{
	CacheDir: "./cache",
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

value, ok, err := reader.Get(ctx, []byte("hello"))
if err != nil {
	log.Fatal(err)
}
if ok {
	log.Printf("value=%s", value)
}
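For larger ranges, the reader's iterator streams entries instead of materializing a slice the way Scan does. A sketch using the Iterator API from the reference below; the key bounds are illustrative:

```go
it, err := reader.NewIterator(ctx, isledb.IteratorOptions{
	MinKey: []byte("a"),
	MaxKey: []byte("z"),
})
if err != nil {
	log.Fatal(err)
}
defer it.Close()
for it.Next() {
	log.Printf("%s=%s", it.Key(), it.Value())
}
if err := it.Err(); err != nil {
	log.Fatal(err)
}
```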
Tailing Reader

Continuously streams new KV writes by polling for new SSTs and emitting entries in order. Good for event/log style consumption when you want a live feed over object storage.

tr, err := isledb.OpenTailingReader(ctx, store, isledb.TailingReaderOpenOptions{
	RefreshInterval: time.Second,
	ReaderOptions: isledb.ReaderOpenOptions{
		CacheDir: "./cache",
	},
})
if err != nil {
	log.Fatal(err)
}
defer tr.Close()

if err := tr.Start(); err != nil {
	log.Fatal(err)
}
err = tr.Tail(ctx, isledb.TailOptions{
	PollInterval: time.Second,
}, func(kv isledb.KV) error {
	log.Printf("%s=%s", kv.Key, kv.Value)
	return nil
})
if err != nil {
	log.Fatal(err)
}
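When a callback doesn't fit your control flow, TailChannel exposes the same stream as a pair of channels. A sketch; the exact close semantics of the channels on shutdown are an assumption here, so treat the exit paths as illustrative:

```go
kvCh, errCh := tr.TailChannel(ctx, isledb.TailOptions{
	PollInterval: time.Second,
})
for {
	select {
	case kv := <-kvCh:
		log.Printf("%s=%s", kv.Key, kv.Value)
	case err := <-errCh:
		log.Fatal(err)
	case <-ctx.Done():
		return
	}
}
```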
Compaction

IsleDB ships multiple compaction paths so you can pick what matches your workload.

SSTable Compactor (merge)

Merges L0 SSTs into sorted runs and compacts consecutive sorted runs as needed to reduce read amplification. Use this for normal LSM maintenance.

compactor, err := db.OpenCompactor(ctx, isledb.DefaultCompactorOptions())
if err != nil {
	log.Fatal(err)
}
defer compactor.Close()

compactor.Start()
Retention Compactor (by age / FIFO)

Deletes the oldest SSTs once they are older than RetentionPeriod, while always keeping at least the RetentionCount newest SSTs.

retention, err := db.OpenRetentionCompactor(ctx, isledb.RetentionCompactorOptions{
	Mode:            isledb.CompactByAge,
	RetentionPeriod: 7 * 24 * time.Hour,
	RetentionCount:  10,
})
if err != nil {
	log.Fatal(err)
}
defer retention.Close()

retention.Start()
Retention Compactor (by time window)

Groups SSTs into time buckets (SegmentDuration) and deletes whole segments that end before RetentionPeriod. It keeps at least RetentionCount / 10 segments (minimum 1).

RetentionPeriod controls how far back data is eligible for deletion. SegmentDuration controls the size of each bucket (for example, 1-day buckets with a 7-day retention keep only the most recent 7 daily segments).

retention, err := db.OpenRetentionCompactor(ctx, isledb.RetentionCompactorOptions{
	Mode:            isledb.CompactByTimeWindow,
	RetentionPeriod: 7 * 24 * time.Hour,
	SegmentDuration: 24 * time.Hour,
	RetentionCount:  10,
})
if err != nil {
	log.Fatal(err)
}
defer retention.Close()

retention.Start()

Examples

  • examples/kvfile: local file-backed KV usage with writer/reader/tailer.
  • examples/wal-azblob: WAL-style event stream on Azurite/Azure Blob (tailing events).
  • examples/eventhub-minio: separate producer/consumer event hub demo on MinIO (S3 API).

Acknowledgments

IsleDB uses the SSTable implementation from Pebble. Thanks to the CockroachDB team for building and open-sourcing such a well-engineered SSTable library.

License

IsleDB is licensed under the Apache License 2.0.

Documentation

Index

Constants

const CompactionMaxIterations = 100

Variables

var ErrBackpressure = errors.New("writer backpressure")
var ErrEmptyIterator = errors.New("iterator produced no entries")
var ErrOutOfOrder = errors.New("iterator out of order")
var ErrTailingReaderStopped = errors.New("tailing reader stopped")

Functions

This section is empty.

Types

type BloomMeta

type BloomMeta = manifest.BloomMeta

type CleanupStats

type CleanupStats struct {
	SSTsDeleted    int
	BytesReclaimed int64
	Duration       time.Duration
}

type CompactionConfig

type CompactionConfig = manifest.CompactionConfig

type CompactionJob

type CompactionJob struct {
	Type      CompactionJobType
	InputSSTs []string
	InputRuns []uint32
	OutputRun *SortedRun
}

type CompactionJobType

type CompactionJobType int
const (
	CompactionL0Flush CompactionJobType = iota
	CompactionConsecutiveMerge
)

type CompactionLogPayload

type CompactionLogPayload = manifest.CompactionLogPayload

type Compactor

type Compactor struct {
	// contains filtered or unexported fields
}

Compactor merges SSTs into sorted runs in the background.

func (*Compactor) Close

func (c *Compactor) Close() error

func (*Compactor) FenceToken

func (c *Compactor) FenceToken() *manifest.FenceToken

func (*Compactor) IsFenced

func (c *Compactor) IsFenced() bool

func (*Compactor) Refresh

func (c *Compactor) Refresh(ctx context.Context) error

func (*Compactor) RunCompaction

func (c *Compactor) RunCompaction(ctx context.Context) error

RunCompaction performs a compaction cycle and returns when no work remains.

func (*Compactor) Start

func (c *Compactor) Start()

func (*Compactor) Stop

func (c *Compactor) Stop()

type CompactorOptions

type CompactorOptions struct {
	L0CompactionThreshold int

	MinSources    int
	MaxSources    int
	SizeThreshold int

	BloomBitsPerKey int
	BlockSize       int
	Compression     string
	TargetSSTSize   int64

	// ValidateSSTChecksum verifies SST checksums before compaction.
	ValidateSSTChecksum bool
	// SSTHashVerifier verifies SST signatures when present.
	SSTHashVerifier SSTHashVerifier

	CheckInterval     time.Duration
	OnCompactionStart func(CompactionJob)
	OnCompactionEnd   func(CompactionJob, error)
	OwnerID           string
	// GCMarkStorage allows using a custom storage backend for GC mark state.
	GCMarkStorage manifest.GCMarkStorage
}

func DefaultCompactorOptions

func DefaultCompactorOptions() CompactorOptions

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB encapsulates manifest state for writers and compactors operating on a single bucket/prefix. Use OpenDB once, then call db.OpenWriter and/or db.OpenCompactor.

func OpenDB

func OpenDB(ctx context.Context, store *blobstore.Store, opts DBOptions) (*DB, error)

OpenDB opens a database and initializes it.

func (*DB) Close

func (db *DB) Close() error

Close closes the DB and any writers/compactors opened from it.

func (*DB) OpenCompactor

func (db *DB) OpenCompactor(ctx context.Context, opts CompactorOptions) (*Compactor, error)

OpenCompactor opens a compactor instance from the DB.

func (*DB) OpenRetentionCompactor

func (db *DB) OpenRetentionCompactor(ctx context.Context, opts RetentionCompactorOptions) (*RetentionCompactor, error)

OpenRetentionCompactor opens a retention compactor for this DB.

func (*DB) OpenWriter

func (db *DB) OpenWriter(ctx context.Context, opts WriterOptions) (*Writer, error)

OpenWriter opens a writer instance from the DB.

type DBOptions

type DBOptions struct {
	// ManifestStorage allows using a custom manifest storage backend.
	// If nil, the blob store is used.
	ManifestStorage manifest.Storage
	// GCMarkStorage allows using a custom storage backend for GC mark state.
	// If nil, the blob store is used.
	GCMarkStorage manifest.GCMarkStorage
}

DBOptions configures a DB instance.

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

func (*Iterator) Close

func (it *Iterator) Close() error

func (*Iterator) Err

func (it *Iterator) Err() error

func (*Iterator) Key

func (it *Iterator) Key() []byte

func (*Iterator) Next

func (it *Iterator) Next() bool

func (*Iterator) SeekGE

func (it *Iterator) SeekGE(target []byte) bool

func (*Iterator) Valid

func (it *Iterator) Valid() bool

func (*Iterator) Value

func (it *Iterator) Value() []byte

type IteratorOptions

type IteratorOptions struct {
	MinKey []byte
	MaxKey []byte
}

type KV

type KV struct {
	Key   []byte
	Value []byte
}

type Manifest

type Manifest = manifest.Manifest

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

func OpenReader

func OpenReader(ctx context.Context, store *blobstore.Store, opts ReaderOpenOptions) (*Reader, error)

OpenReader opens a read-only handle.

func (*Reader) BlobCacheStats

func (r *Reader) BlobCacheStats() internal.BlobCacheStats

func (*Reader) Close

func (r *Reader) Close() error

func (*Reader) Get

func (r *Reader) Get(ctx context.Context, key []byte) (value []byte, found bool, err error)

Get returns the value for key if present and not deleted/expired.

func (*Reader) Manifest

func (r *Reader) Manifest() *Manifest

func (*Reader) ManifestLogCacheStats

func (r *Reader) ManifestLogCacheStats() cachestore.ManifestLogCacheStats

func (*Reader) ManifestUnsafe

func (r *Reader) ManifestUnsafe() *Manifest

func (*Reader) NewIterator

func (r *Reader) NewIterator(ctx context.Context, opts IteratorOptions) (*Iterator, error)

func (*Reader) Refresh

func (r *Reader) Refresh(ctx context.Context) (err error)

Refresh reloads the manifest and invalidates caches for removed SSTs.

func (*Reader) RefreshAndPrefetchSSTs

func (r *Reader) RefreshAndPrefetchSSTs(ctx context.Context) error

RefreshAndPrefetchSSTs refreshes the manifest and prefetches any new SSTs that were added since the last refresh. This is useful for keeping the cache warm with new data. Call this periodically if you want to proactively cache new SSTs as they are written.

WARNING: If you use this method, avoid calling Refresh() separately. Mixing RefreshAndPrefetchSSTs with manual Refresh calls may cause some SSTs to be missed during prefetch, as the function only prefetches SSTs that are new relative to the manifest state before the refresh. Choose one approach and use it consistently.
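One way to follow that advice is to route every refresh through a single background loop, so nothing else calls Refresh. A sketch; the interval and logging are illustrative:

```go
go func() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := r.RefreshAndPrefetchSSTs(ctx); err != nil {
				log.Printf("prefetch refresh failed: %v", err)
			}
		}
	}
}()
```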

func (*Reader) SSTCacheStats

func (r *Reader) SSTCacheStats() SSTCacheStats

func (*Reader) Scan

func (r *Reader) Scan(ctx context.Context, minKey, maxKey []byte) (out []KV, err error)

Scan returns all key-value pairs in the given key range.

func (*Reader) ScanLimit

func (r *Reader) ScanLimit(ctx context.Context, minKey, maxKey []byte, limit int) (out []KV, err error)

type ReaderMetrics added in v0.0.2

type ReaderMetrics struct {
	RefreshTotal   prometheus.Counter
	RefreshErrors  prometheus.Counter
	RefreshLatency prometheus.Histogram

	GetTotal   prometheus.Counter
	GetErrors  prometheus.Counter
	GetHits    prometheus.Counter
	GetMisses  prometheus.Counter
	GetLatency prometheus.Histogram

	ScanTotal   prometheus.Counter
	ScanErrors  prometheus.Counter
	ScanLatency prometheus.Histogram
	ScanResults prometheus.Counter

	ScanLimitTotal   prometheus.Counter
	ScanLimitErrors  prometheus.Counter
	ScanLimitLatency prometheus.Histogram
	ScanLimitResults prometheus.Counter

	BlobFetchTotal   prometheus.Counter
	BlobFetchErrors  prometheus.Counter
	BlobFetchLatency prometheus.Histogram
	BlobCacheHits    prometheus.Counter
	BlobCacheMisses  prometheus.Counter
	BlobBytesTotal   prometheus.Counter

	SSTCacheHits       prometheus.Counter
	SSTCacheMisses     prometheus.Counter
	SSTDownloadTotal   prometheus.Counter
	SSTDownloadErrors  prometheus.Counter
	SSTDownloadLatency prometheus.Histogram
	SSTDownloadBytes   prometheus.Counter

	SSTRangeBlockCacheHits   prometheus.Counter
	SSTRangeBlockCacheMisses prometheus.Counter
	SSTRangeReadTotal        prometheus.Counter
	SSTRangeReadErrors       prometheus.Counter
	SSTRangeReadLatency      prometheus.Histogram
	SSTRangeReadBytes        prometheus.Counter
}

func DefaultReaderMetrics added in v0.0.2

func DefaultReaderMetrics(constLabels prometheus.Labels) *ReaderMetrics

func (*ReaderMetrics) ObserveBlobFetch added in v0.0.2

func (m *ReaderMetrics) ObserveBlobFetch(d time.Duration, sizeBytes int, cacheHit bool, err error)

func (*ReaderMetrics) ObserveGet added in v0.0.2

func (m *ReaderMetrics) ObserveGet(d time.Duration, found bool, err error)

func (*ReaderMetrics) ObserveRefresh added in v0.0.2

func (m *ReaderMetrics) ObserveRefresh(d time.Duration, err error)

func (*ReaderMetrics) ObserveSSTCacheLookup added in v0.0.2

func (m *ReaderMetrics) ObserveSSTCacheLookup(hit bool)

func (*ReaderMetrics) ObserveSSTDownload added in v0.0.2

func (m *ReaderMetrics) ObserveSSTDownload(d time.Duration, sizeBytes int64, err error)

func (*ReaderMetrics) ObserveSSTRangeBlockCacheLookup added in v0.0.2

func (m *ReaderMetrics) ObserveSSTRangeBlockCacheLookup(hit bool)

func (*ReaderMetrics) ObserveSSTRangeRead added in v0.0.2

func (m *ReaderMetrics) ObserveSSTRangeRead(d time.Duration, sizeBytes int64, err error)

func (*ReaderMetrics) ObserveScan added in v0.0.2

func (m *ReaderMetrics) ObserveScan(d time.Duration, resultCount int, err error)

func (*ReaderMetrics) ObserveScanLimit added in v0.0.2

func (m *ReaderMetrics) ObserveScanLimit(d time.Duration, resultCount int, err error)

type ReaderOpenOptions

type ReaderOpenOptions struct {
	// CacheDir is the directory for disk caches (required).
	CacheDir string

	// SSTCacheSize is the maximum bytes for SST cache (default 1GB).
	SSTCacheSize int64

	// BlobCacheSize is the maximum bytes for blob cache (default 1GB).
	BlobCacheSize int64

	// BlobCacheMaxItemSize is the maximum size per item in the blob cache.
	BlobCacheMaxItemSize int64

	// BlockCacheSize is the maximum bytes for the in-memory block cache used
	// when range-reading SSTs. Default 0 disables the block cache.
	BlockCacheSize int64

	// AllowUnverifiedRangeRead permits range-reading SSTs without verifying
	// full-file checksums or signatures.
	AllowUnverifiedRangeRead bool

	// RangeReadMinSSTSize is the minimum SST size (bytes) required to use
	// range-read + block cache. Default 0 means no size threshold.
	RangeReadMinSSTSize int64

	// ValidateSSTChecksum verifies SST checksums on first download.
	// If enabled and checksum is missing or mismatched, reads fail.
	ValidateSSTChecksum bool

	// SSTHashVerifier verifies SST signatures when present.
	// If provided and the SST has a signature, verification is enforced.
	SSTHashVerifier SSTHashVerifier

	Metrics *ReaderMetrics

	BlobReadOptions config.BlobReadOptions
	ManifestStorage manifest.Storage
}

ReaderOpenOptions configures a read-only handle.

func DefaultReaderOpenOptions

func DefaultReaderOpenOptions() ReaderOpenOptions

DefaultReaderOpenOptions returns sane defaults for ReaderOpenOptions.

type ReaderOptions

type ReaderOptions struct {
	// CacheDir is the directory for disk caches.
	CacheDir string

	// SSTCache is an optional pre-created SST cache.
	SSTCache diskcache.RefCountedCache

	// SSTCacheSize is the maximum bytes for SST cache (default 1GB).
	SSTCacheSize int64

	// BlobCache is an optional pre-created blob cache.
	BlobCache internal.BlobCache

	// BlobCacheSize is the maximum bytes for blob cache (default 1GB).
	BlobCacheSize int64

	// BlobCacheMaxItemSize is the maximum size per item in the blob cache.
	// Items larger than this will not be cached. Default 0 means no limit.
	BlobCacheMaxItemSize int64

	// BlockCacheSize is the maximum bytes for the in-memory block cache used
	// when range-reading SSTs. Default 0 disables the block cache.
	BlockCacheSize int64

	// AllowUnverifiedRangeRead permits range-reading SSTs without verifying
	// full-file checksums or signatures.
	AllowUnverifiedRangeRead bool

	// RangeReadMinSSTSize is the minimum SST size (bytes) required to use
	// range-read + block cache. Default 0 means no size threshold.
	RangeReadMinSSTSize int64

	ValueStorageConfig config.ValueStorageConfig
	ManifestStorage    manifest.Storage

	ManifestLogCache     cachestore.ManifestLogCache
	ManifestLogCacheSize int
	DisableManifestCache bool

	// ValidateSSTChecksum verifies SST checksums on first download.
	// If enabled and checksum is missing or mismatched, reads fail.
	ValidateSSTChecksum bool

	// SSTHashVerifier verifies SST signatures when present.
	// If provided and the SST has a signature, verification is enforced.
	SSTHashVerifier SSTHashVerifier

	Metrics *ReaderMetrics
}

func DefaultReaderOptions

func DefaultReaderOptions() ReaderOptions

type RetentionCompactor

type RetentionCompactor struct {
	// contains filtered or unexported fields
}

func (*RetentionCompactor) Close

func (c *RetentionCompactor) Close() error

func (*RetentionCompactor) IsFenced

func (c *RetentionCompactor) IsFenced() bool

func (*RetentionCompactor) Refresh

func (c *RetentionCompactor) Refresh(ctx context.Context) error

func (*RetentionCompactor) RunCleanup

func (c *RetentionCompactor) RunCleanup(ctx context.Context) error

func (*RetentionCompactor) Start

func (c *RetentionCompactor) Start()

func (*RetentionCompactor) Stats

func (*RetentionCompactor) Stop

func (c *RetentionCompactor) Stop()

type RetentionCompactorMode

type RetentionCompactorMode int
const (
	CompactByAge RetentionCompactorMode = iota

	CompactByTimeWindow
)

type RetentionCompactorOptions

type RetentionCompactorOptions struct {
	Mode RetentionCompactorMode

	RetentionPeriod time.Duration

	RetentionCount int

	CheckInterval time.Duration

	SegmentDuration time.Duration

	OnCleanup func(CleanupStats)

	OnCleanupError func(error)
	// GCMarkStorage allows using a custom storage backend for GC mark state.
	// If nil, the blob store is used.
	GCMarkStorage manifest.GCMarkStorage
}

func DefaultRetentionCompactorOptions

func DefaultRetentionCompactorOptions() RetentionCompactorOptions

type RetentionCompactorStats

type RetentionCompactorStats struct {
	Mode            RetentionCompactorMode
	RetentionPeriod time.Duration
	L0SSTCount      int
	SortedRunCount  int
	TotalSize       int64
	OldestSST       time.Time
}

type SSTCache

type SSTCache = diskcache.RefCountedCache

SSTCache is an alias to diskcache.RefCountedCache for caching SST file data.

type SSTCacheStats

type SSTCacheStats = diskcache.Stats

SSTCacheStats is an alias to diskcache.Stats for SST cache statistics.

type SSTHashSigner

type SSTHashSigner interface {
	Algorithm() string
	KeyID() string
	SignHash(hash []byte) ([]byte, error)
}

SSTHashSigner signs SST hash bytes for integrity verification.

type SSTHashVerifier

type SSTHashVerifier interface {
	VerifyHash(hash []byte, sig SSTSignature) error
}

SSTHashVerifier verifies SST hash signatures for integrity checks.

type SSTIterator

type SSTIterator interface {
	Next() bool
	Entry() internal.MemEntry
	Err() error
	Close() error
}

type SSTMeta

type SSTMeta = manifest.SSTMeta

type SSTSignature

type SSTSignature = manifest.SSTSignature

type SSTWriterOptions

type SSTWriterOptions struct {
	BloomBitsPerKey int
	BlockSize       int
	Compression     string
	Signer          SSTHashSigner
}

type SortedRun

type SortedRun = manifest.SortedRun

type TailOptions

type TailOptions struct {
	// MinKey and MaxKey constrain the tailing range (inclusive bounds).
	MinKey []byte
	MaxKey []byte

	// StartAfterKey resumes tailing from the next key after this value.
	// If set, it overrides MinKey as the lower bound.
	StartAfterKey []byte
	// PollInterval controls how often to check for new keys.
	PollInterval time.Duration
}

TailOptions controls tailing behavior for a TailingReader.

type TailingReader

type TailingReader struct {
	// contains filtered or unexported fields
}

TailingReader is a read-only handle that refreshes manifests periodically and can tail new keys as they appear in object storage.

func OpenTailingReader

func OpenTailingReader(ctx context.Context, store *blobstore.Store, opts TailingReaderOpenOptions) (*TailingReader, error)

OpenTailingReader opens a tailing reader handle.

func (*TailingReader) Close

func (tr *TailingReader) Close() error

Close stops background refresh and closes the underlying reader.

func (*TailingReader) Get

func (tr *TailingReader) Get(ctx context.Context, key []byte) ([]byte, bool, error)

func (*TailingReader) LastRefresh

func (tr *TailingReader) LastRefresh() time.Time

func (*TailingReader) Manifest

func (tr *TailingReader) Manifest() *Manifest

Manifest returns a snapshot of the current manifest.

func (*TailingReader) NewIterator

func (tr *TailingReader) NewIterator(ctx context.Context, opts IteratorOptions) (*Iterator, error)

NewIterator returns an iterator over the requested key range.

func (*TailingReader) Reader

func (tr *TailingReader) Reader() *Reader

Reader exposes the underlying Reader.

func (*TailingReader) Refresh

func (tr *TailingReader) Refresh(ctx context.Context) error

func (*TailingReader) Scan

func (tr *TailingReader) Scan(ctx context.Context, minKey, maxKey []byte) ([]KV, error)

Scan returns all key-value pairs in the given range.

func (*TailingReader) ScanLimit

func (tr *TailingReader) ScanLimit(ctx context.Context, minKey, maxKey []byte, limit int) ([]KV, error)

ScanLimit returns up to limit key-value pairs in the given range.

func (*TailingReader) Start

func (tr *TailingReader) Start() error

func (*TailingReader) Stop

func (tr *TailingReader) Stop()

Stop terminates the background refresh loop.

func (*TailingReader) Tail

func (tr *TailingReader) Tail(ctx context.Context, opts TailOptions, handler func(KV) error) error

Tail continuously scans for new keys and calls handler for each result.

func (*TailingReader) TailChannel

func (tr *TailingReader) TailChannel(ctx context.Context, opts TailOptions) (<-chan KV, <-chan error)

TailChannel returns a channel of KV updates and an error channel.

type TailingReaderOpenOptions

type TailingReaderOpenOptions struct {
	RefreshInterval time.Duration
	OnRefresh       func()
	OnRefreshError  func(error)

	ReaderOptions ReaderOpenOptions
}

TailingReaderOpenOptions configures a tailing reader.

func DefaultTailingReaderOpenOptions

func DefaultTailingReaderOpenOptions() TailingReaderOpenOptions

DefaultTailingReaderOpenOptions returns sane defaults for TailingReaderOpenOptions.

type TailingReaderOptions

type TailingReaderOptions struct {
	RefreshInterval time.Duration

	OnRefresh      func()
	OnRefreshError func(error)

	ReaderOptions ReaderOptions
}

func DefaultTailingReaderOptions

func DefaultTailingReaderOptions() TailingReaderOptions

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer provides write access to the database.

func (*Writer) Close

func (w *Writer) Close() error

Close closes the writer, flushing any pending writes.

func (*Writer) Delete

func (w *Writer) Delete(key []byte) error

Delete marks a key as deleted.

func (*Writer) DeleteWithTTL

func (w *Writer) DeleteWithTTL(key []byte, ttl time.Duration) error

DeleteWithTTL marks a key as deleted with a time-to-live duration.

func (*Writer) Flush

func (w *Writer) Flush(ctx context.Context) error

Flush forces a flush of the current memtable to a new SST file.

func (*Writer) Put

func (w *Writer) Put(key, value []byte) error

Put writes a key-value pair to the database.

func (*Writer) PutWithTTL

func (w *Writer) PutWithTTL(key, value []byte, ttl time.Duration) error

PutWithTTL writes a key-value pair with a time-to-live duration.

type WriterMetrics added in v0.0.2

type WriterMetrics struct {
	FlushTotal        prometheus.Counter
	FlushErrors       prometheus.Counter
	FlushLatency      prometheus.Histogram
	FlushBytes        prometheus.Counter
	PutTotal          prometheus.Counter
	PutErrors         prometheus.Counter
	BackPressureTotal prometheus.Counter
	PutBlobTotal      prometheus.Counter
	PutBlobErrors     prometheus.Counter
	PutBlobLatency    prometheus.Histogram
	BlobBytesTotal    prometheus.Counter
	DeleteTotal       prometheus.Counter
}

func DefaultWriterMetrics added in v0.0.2

func DefaultWriterMetrics(constLabels prometheus.Labels) *WriterMetrics

func (*WriterMetrics) ObserveBackpressure added in v0.0.2

func (m *WriterMetrics) ObserveBackpressure()

func (*WriterMetrics) ObserveDelete added in v0.0.2

func (m *WriterMetrics) ObserveDelete()

func (*WriterMetrics) ObserveFlush added in v0.0.2

func (m *WriterMetrics) ObserveFlush(d time.Duration, err error)

func (*WriterMetrics) ObserveFlushBytes added in v0.0.2

func (m *WriterMetrics) ObserveFlushBytes(sizeBytes int64)

func (*WriterMetrics) ObservePut added in v0.0.2

func (m *WriterMetrics) ObservePut(err error)

func (*WriterMetrics) ObservePutBlob added in v0.0.2

func (m *WriterMetrics) ObservePutBlob(sizeBytes int, d time.Duration, err error)

type WriterOptions

type WriterOptions struct {
	MemtableSize    int64
	FlushInterval   time.Duration
	BloomBitsPerKey int
	BlockSize       int
	Compression     string
	// MaxImmutableMemtables limits pending memtables waiting to be flushed.
	// When the limit is reached, writers return ErrBackpressure.
	MaxImmutableMemtables int

	OnFlushError func(error)
	ValueStorage config.ValueStorageConfig
	OwnerID      string

	Metrics *WriterMetrics
}

func DefaultWriterOptions

func DefaultWriterOptions() WriterOptions

Directories

Path Synopsis
examples
kvfile command
wal-azblob command
