kronosq

package module v0.2.0

Published: Mar 27, 2026 License: MIT Imports: 18 Imported by: 0

README
kronosq

Redis-backed task queue library for Go


kronosq is a reliable, Redis-backed distributed task queue for Go. It provides a simple, familiar API for enqueueing background jobs, processing them with a scalable worker pool, and observing them through an embedded web dashboard — all without running a separate process.




Features

  • Simple, familiar API — ergonomics inspired by asynq; easy to adopt
  • Redis-backed — persistent, reliable task storage using Redis lists and sorted sets
  • At-least-once delivery — tasks survive worker crashes and are re-queued on restart; handlers should be idempotent
  • Automatic retries with exponential backoff — failed tasks are retried up to a configurable limit
  • Scheduled tasks — delay individual tasks or run them on a cron schedule
  • Dynamic worker auto-scaling — worker pool scales up under load and idles back down automatically
  • Task lifecycle event hooks — Slack, Discord, and Telegram notifiers built-in; bring your own with a simple function signature
  • Embedded dashboard — React SPA served directly from your binary; no separate process needed
  • Priority queues with weighted processing — route tasks to named queues with relative weights
  • Strict priority scheduling — StrictPriority mode guarantees higher-weight queues are always drained before lower-weight ones
  • Task processing deadlines — hard-cancel a task's context at an absolute wall-clock time with DeadlineAt
  • Pause & resume queues — temporarily halt processing for any queue without stopping workers
  • Middleware support — attach logging, recovery, or custom interceptors to ServeMux
  • Redis Sentinel & Cluster — connect to highly-available Redis topologies with no code changes
  • CLI tool (kronq) — inspect queues, browse tasks, re-run failures, pause/resume queues from the terminal

Architecture


Requirements

  • Go 1.21+
  • Redis 6.2+

Installation

go get github.com/Azzurriii/kronosq

Quick Start

The following example shows the two sides of kronosq: a producer that enqueues a task, and a worker server that processes it.

package main

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/Azzurriii/kronosq"
)

// --- Producer ---

func main() {
    redisOpt := kronosq.RedisClientOpt{Addr: "localhost:6379"}

    client := kronosq.NewClient(redisOpt)
    defer client.Close()

    payload, _ := json.Marshal(map[string]any{"user_id": 42})
    task := kronosq.NewTask("email:welcome", payload)

    if err := client.Enqueue(task, kronosq.Queue("default"), kronosq.MaxRetry(3)); err != nil {
        log.Fatal(err)
    }
    fmt.Println("task enqueued")
}
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/Azzurriii/kronosq"
)

// --- Worker server ---

func main() {
    redisOpt := kronosq.RedisClientOpt{Addr: "localhost:6379"}

    srv := kronosq.NewServer(kronosq.ServerConfig{
        Redis:  redisOpt,
        Queues: map[string]int{"default": 1},
    })

    mux := kronosq.NewServeMux()
    mux.HandleFunc("email:welcome", func(ctx context.Context, t *kronosq.Task) error {
        fmt.Printf("sending welcome email — payload: %s\n", t.Payload)
        return nil
    })

    if err := srv.Run(mux); err != nil {
        log.Fatal(err)
    }
}

Start Redis, then run both programs. The server blocks until SIGINT or SIGTERM and shuts down gracefully.

make up          # start Redis via Docker Compose
go run ./example # run the bundled example app

Task Options

Options are passed as variadic arguments to client.Enqueue.

| Option | Type | Default | Description |
|---|---|---|---|
| `Queue(name string)` | `string` | `"default"` | Route the task to a named queue |
| `MaxRetry(n int)` | `int` | `25` | Maximum retry attempts before archiving |
| `Delay(d time.Duration)` | `time.Duration` | `0` | Schedule the task to run after a delay |
| `Timeout(d time.Duration)` | `time.Duration` | `30m` | Per-task processing timeout |
| `DeadlineAt(t time.Time)` | `time.Time` | zero | Hard cancel at an absolute wall-clock time; takes precedence over `Timeout` if sooner |
| `Unique(ttl time.Duration)` | `time.Duration` | `0` | Deduplicate tasks by type within a TTL window |

client.Enqueue(task,
    kronosq.Queue("critical"),
    kronosq.MaxRetry(5),
    kronosq.Delay(10*time.Minute),
    kronosq.Timeout(2*time.Minute),
    kronosq.DeadlineAt(time.Now().Add(1*time.Hour)),
    kronosq.Unique(1*time.Hour),
)

Priority Queues

Assign integer weights to queues. The processor polls higher-weight queues more frequently in proportion to their weight — with the weights below, a task in the critical queue is processed roughly six times as often as one in low.

srv := kronosq.NewServer(kronosq.ServerConfig{
    Redis: redisOpt,
    Queues: map[string]int{
        "critical": 6,
        "default":  3,
        "low":      1,
    },
})
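
One simple way to realise that proportional polling — a sketch of weighted round-robin, not necessarily kronosq's internal implementation — is to expand each queue into a polling cycle according to its weight:

```go
package main

import (
	"fmt"
	"sort"
)

// pollOrder expands queue weights into one polling cycle: a queue with
// weight w appears w times, so it is checked proportionally more often.
func pollOrder(queues map[string]int) []string {
	names := make([]string, 0, len(queues))
	for name := range queues {
		names = append(names, name)
	}
	// Sort by descending weight (name as tie-break) for a stable cycle.
	sort.Slice(names, func(i, j int) bool {
		if queues[names[i]] != queues[names[j]] {
			return queues[names[i]] > queues[names[j]]
		}
		return names[i] < names[j]
	})
	var order []string
	for _, name := range names {
		for k := 0; k < queues[name]; k++ {
			order = append(order, name)
		}
	}
	return order
}

func main() {
	fmt.Println(pollOrder(map[string]int{"critical": 6, "default": 3, "low": 1}))
}
```

Every queue appears at least once per cycle, so even weight-1 queues make steady progress.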

Route a task to a specific queue at enqueue time:

client.Enqueue(task, kronosq.Queue("critical"))

Strict priority

Enable StrictPriority to guarantee that queues are always checked in weight order. Workers will not poll default until critical is empty.

srv := kronosq.NewServer(kronosq.ServerConfig{
    Redis:          redisOpt,
    StrictPriority: true,
    Queues: map[string]int{
        "critical": 3,
        "default":  2,
        "low":      1,
    },
})
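
Under StrictPriority the selection rule is simpler: always poll the highest-weight queue that has pending work. An illustrative sketch of that rule (assumed behaviour, not the library's source):

```go
package main

import "fmt"

// nextQueue picks the queue to poll under strict priority: the non-empty
// queue with the highest weight. ok is false when every queue is empty.
func nextQueue(weights, pending map[string]int) (best string, ok bool) {
	for name, w := range weights {
		if pending[name] == 0 {
			continue // skip empty queues
		}
		if !ok || w > weights[best] {
			best, ok = name, true
		}
	}
	return best, ok
}

func main() {
	weights := map[string]int{"critical": 3, "default": 2, "low": 1}
	// critical is empty, so the next-highest non-empty queue is chosen.
	fmt.Println(nextQueue(weights, map[string]int{"default": 4, "low": 2}))
}
```

The trade-off is starvation: low is never polled while any higher-weight queue has work.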

Scheduled Tasks (Cron)

Static schedule with Scheduler

Register tasks at startup time using standard cron expressions:

scheduler := kronosq.NewScheduler(redisOpt)

task := kronosq.NewTask("report:generate", nil)
scheduler.Add("*/5 * * * *", task, kronosq.Queue("default"))

if err := scheduler.Run(); err != nil {
    log.Fatal(err)
}

Dynamic schedule with PeriodicTaskManager

For cron schedules that change at runtime (e.g., stored in a database), implement the PeriodicTaskConfigProvider interface:

type DBProvider struct{ db *sql.DB }

func (p *DBProvider) GetConfigs() ([]*kronosq.PeriodicTaskConfig, error) {
    // Load cron configs from your database or any external source.
    return []*kronosq.PeriodicTaskConfig{
        {
            Cronspec: "0 9 * * 1-5",
            Task:     kronosq.NewTask("report:daily", nil),
            Opts:     []kronosq.Option{kronosq.Queue("default")},
        },
    }, nil
}

mgr := kronosq.NewPeriodicTaskManager(redisOpt, &DBProvider{db: db})
if err := mgr.Run(); err != nil {
    log.Fatal(err)
}

The manager re-reads the provider on each tick, so schedule changes take effect without a restart.


Middleware

Attach middleware to ServeMux to wrap every handler with cross-cutting concerns such as logging, recovery, or tracing. Middleware is applied in registration order — first added is outermost.

mux := kronosq.NewServeMux()

// Built-in middleware
mux.Use(kronosq.LoggingMiddleware)
mux.Use(kronosq.RecoveryMiddleware)

// Custom middleware
mux.Use(func(next kronosq.Handler) kronosq.Handler {
    return kronosq.HandlerFunc(func(ctx context.Context, t *kronosq.Task) error {
        start := time.Now()
        err := next.ProcessTask(ctx, t)
        log.Printf("task %s took %s", t.Type, time.Since(start))
        return err
    })
})

mux.HandleFunc("email:welcome", handleWelcome)

Pause & Resume Queues

Pause a queue to temporarily stop workers from picking up new tasks, without shutting down the server. Tasks already in-flight complete normally.

inspector := kronosq.NewInspector(redisOpt)

inspector.PauseQueue("low")   // workers skip this queue
inspector.ResumeQueue("low")  // processing resumes

Pause and resume are also available in the dashboard UI and via kronq:

kronq queues pause low
kronq queues resume low

Event Hooks

kronosq fires events at each stage of a task's lifecycle. Register handlers on the server to react to these events — for example, to send an alert when a task fails.

import (
    "github.com/Azzurriii/kronosq"
    "github.com/Azzurriii/kronosq/pkg/notify"
)

srv := kronosq.NewServer(cfg)

// Notify Slack on failure.
srv.OnEvent(kronosq.EventFailed, notify.Slack("https://hooks.slack.com/services/..."))

// Notify Discord on success.
srv.OnEvent(kronosq.EventSucceeded, notify.Discord("https://discord.com/api/webhooks/..."))

// Notify Telegram on retry.
srv.OnEvent(kronosq.EventRetried, notify.Telegram("<bot-token>", "<chat-id>"))

Event types

| Constant | Fires when |
|---|---|
| `kronosq.EventStarted` | A worker picks up the task and begins processing |
| `kronosq.EventSucceeded` | The handler returns `nil` |
| `kronosq.EventFailed` | The handler returns an error and retries are exhausted |
| `kronosq.EventRetried` | The handler returns an error but retries remain |
| `kronosq.EventArchived` | The task is moved to the archive (permanent failure) |

Custom notifiers

Any function matching func(ctx context.Context, event kronosq.TaskEvent) error is a valid EventHandler:

srv.OnEvent(kronosq.EventFailed, func(ctx context.Context, e kronosq.TaskEvent) error {
    log.Printf("task %s failed after %s: %v", e.Task.ID, e.Latency, e.Error)
    return nil
})

Events fire asynchronously and never block task processing.


Auto-Scaling

kronosq can automatically grow and shrink the worker pool in response to queue depth. Configure it via AutoScaleConfig in ServerConfig:

srv := kronosq.NewServer(kronosq.ServerConfig{
    Redis:  redisOpt,
    Queues: map[string]int{"default": 3, "critical": 6},
    AutoScale: &kronosq.AutoScaleConfig{
        Min:              2,
        Max:              20,
        ScaleUpThreshold: 2.0,
        IdleTimeout:      30 * time.Second,
        CheckInterval:    5 * time.Second,
    },
})

Scale-up: when pendingTasks > activeWorkers × ScaleUpThreshold, workers are added up to Max.

Scale-down: when a worker has been idle longer than IdleTimeout, it is removed down to Min.

The auto-scaler is a pure goroutine/semaphore mechanism with no external dependencies. When AutoScale is omitted, concurrency defaults to the sum of all queue weights.


Redis High Availability

kronosq supports Redis Sentinel and Redis Cluster through the RedisConnOpt interface. Switch topologies without changing any other code.

Redis Sentinel
srv := kronosq.NewServer(kronosq.ServerConfig{
    Redis: kronosq.RedisSentinelOpt{
        MasterName:    "mymaster",
        SentinelAddrs: []string{"sentinel1:26379", "sentinel2:26379"},
    },
    Queues: map[string]int{"default": 1},
})

Redis Cluster
srv := kronosq.NewServer(kronosq.ServerConfig{
    Redis: kronosq.RedisClusterOpt{
        Addrs: []string{"node1:7000", "node2:7001", "node3:7002"},
    },
    Queues: map[string]int{"default": 1},
})

NewClient, NewInspector, NewScheduler, and NewPeriodicTaskManager all accept the same RedisConnOpt interface.


Dashboard

Mount the embedded dashboard on your existing HTTP server. It serves a React SPA and a REST API, both compiled into your binary — no separate deployment needed.

import (
    "net/http"

    "github.com/Azzurriii/kronosq"
    "github.com/Azzurriii/kronosq/pkg/dashboard"
)

inspector := kronosq.NewInspector(redisOpt)
http.Handle("/dashboard/", dashboard.Handler(inspector))

http.ListenAndServe(":8080", nil)
// Visit: http://localhost:8080/dashboard/

Dashboard REST API

| Method | Route | Description |
|---|---|---|
| GET | `/dashboard/api/queues` | List all queues with stats and pause state |
| GET | `/dashboard/api/queues/:name/tasks` | Browse tasks with status filter and pagination |
| POST | `/dashboard/api/queues/:name/pause` | Pause a queue |
| POST | `/dashboard/api/queues/:name/resume` | Resume a paused queue |
| GET | `/dashboard/api/servers` | List active worker server instances |
| POST | `/dashboard/api/tasks/:id/run` | Re-enqueue an archived or failed task |
| DELETE | `/dashboard/api/tasks/:id` | Remove a task from Redis |

The SPA is built with Vite + React. To rebuild it from source:

make ui   # outputs to pkg/dashboard/ui/dist/, embedded on next go build

CLI (kronq)

kronq is the command-line interface for managing queues and tasks against a live Redis instance.

Installation
# With Go
go install github.com/Azzurriii/kronosq/cmd/kronq@latest

# Or with curl
curl -fsSL https://raw.githubusercontent.com/Azzurriii/kronosq/main/scripts/install.sh | sh

Commands

| Command | Description |
|---|---|
| `kronq queues` | List all queues and their stats (pending, active, failed counts) |
| `kronq queues pause <name>` | Pause a queue |
| `kronq queues resume <name>` | Resume a paused queue |
| `kronq tasks <queue>` | List tasks in a queue; filter by `--status pending\|active\|failed` |
| `kronq task inspect <id> -q <queue>` | Show full task details including payload |
| `kronq task run <id> -q <queue>` | Re-enqueue a failed task |
| `kronq task delete <id> -q <queue>` | Remove a task from Redis |
| `kronq servers` | List active worker server instances |
| `kronq version` | Print the CLI version |
| `kronq help [command]` | Show help for a command |

Global flags
--redis   Redis address (default: localhost:6379, env: KRONQ_REDIS)
-o        Output format: table, json (default: table)

Examples
# List all queues
kronq queues

# Pause the low-priority queue temporarily
kronq queues pause low

# Show pending tasks in the critical queue
kronq tasks critical --status pending

# Inspect a specific task
kronq task inspect 3f2a1b4c -q critical

# Re-run a failed task
kronq task run 3f2a1b4c -q critical

# Retry all failed tasks in a queue
kronq tasks default --status failed --retry-all

# Check queue health — exits 1 if any queue has failures (useful in CI)
kronq queues --check-healthy

# Output as JSON
kronq queues -o json

Configuration Reference

ServerConfig

| Field | Type | Default | Description |
|---|---|---|---|
| `Redis` | `RedisConnOpt` | required | Redis connection (`RedisClientOpt`, `RedisSentinelOpt`, or `RedisClusterOpt`) |
| `Queues` | `map[string]int` | `{"default": 1}` | Queue names and their relative processing weights |
| `StrictPriority` | `bool` | `false` | When true, higher-weight queues are always drained before lower-weight ones |
| `AutoScale` | `*AutoScaleConfig` | `nil` (disabled) | Worker pool auto-scaling; if nil, concurrency equals the sum of queue weights |

AutoScaleConfig

| Field | Type | Default | Description |
|---|---|---|---|
| `Min` | `int` | 2 | Minimum number of active workers |
| `Max` | `int` | required | Maximum number of active workers |
| `ScaleUpThreshold` | `float64` | 2.0 | Ratio of pending tasks to active workers that triggers a scale-up |
| `IdleTimeout` | `time.Duration` | 30s | Duration a worker must be idle before it is removed |
| `CheckInterval` | `time.Duration` | 5s | How often the auto-scaler evaluates queue depth |

RedisClientOpt

| Field | Type | Default | Description |
|---|---|---|---|
| `Addr` | `string` | `"localhost:6379"` | Redis server address |
| `Password` | `string` | `""` | Redis password (AUTH) |
| `DB` | `int` | 0 | Redis database index |

RedisSentinelOpt

| Field | Type | Description |
|---|---|---|
| `MasterName` | `string` | Sentinel master name |
| `SentinelAddrs` | `[]string` | Addresses of sentinel nodes |
| `Password` | `string` | Redis password |
| `DB` | `int` | Redis database index |

RedisClusterOpt

| Field | Type | Description |
|---|---|---|
| `Addrs` | `[]string` | Cluster node addresses |
| `Password` | `string` | Redis password |

How it works

Client.Enqueue writes tasks to Redis lists and sorted sets. The Processor pool pulls tasks via BLMOVE, skipping any queue in the paused set. ServeMux routes each task by type through the middleware chain to the registered handler. On completion, EventBus fires registered notifiers asynchronously so they never block processing. The AutoScaler polls queue depth on an independent ticker and adjusts the goroutine pool size between Min and Max. The dashboard and kronq CLI read and write Redis state through the Inspector.


License

MIT — see LICENSE.

Documentation

Overview

Package kronosq provides a Redis-backed distributed task queue for Go.

kronosq lets you enqueue background jobs from one process and process them in another, with Redis as the durable store. It is inspired by asynq and adds automatic worker auto-scaling and built-in task lifecycle notifications (Slack, Discord, Telegram) out of the box.

Quick start

Enqueue a task from a producer:

client := kronosq.NewClient(kronosq.RedisClientOpt{Addr: "localhost:6379"})
defer client.Close()

payload, _ := json.Marshal(map[string]any{"user_id": 42})
task := kronosq.NewTask("email:welcome", payload)
if err := client.Enqueue(task, kronosq.Queue("default"), kronosq.MaxRetry(3)); err != nil {
    log.Fatal(err)
}

Process tasks in a worker server:

srv := kronosq.NewServer(kronosq.ServerConfig{
    Redis:  kronosq.RedisClientOpt{Addr: "localhost:6379"},
    Queues: map[string]int{"default": 1},
})
mux := kronosq.NewServeMux()
mux.HandleFunc("email:welcome", func(ctx context.Context, t *kronosq.Task) error {
    // handle the task
    return nil
})
if err := srv.Run(mux); err != nil {
    log.Fatal(err)
}

Redis connection

All constructors accept a RedisConnOpt. Three implementations are provided: RedisClientOpt (single node), RedisSentinelOpt (Sentinel failover), and RedisClusterOpt (Redis Cluster).

Enqueue options

Pass functional Option values to Client.Enqueue to customise behaviour:

  • Queue — route the task to a named queue (default: "default")
  • MaxRetry — cap retry attempts on failure (default: 25)
  • Delay — schedule the task to run after a duration
  • Timeout — per-task processing timeout (default: 30m)
  • DeadlineAt — hard wall-clock deadline; takes precedence over Timeout if sooner
  • Unique — deduplicate tasks of the same type within a TTL window

Priority queues

Queues carry integer weights. With the default weighted round-robin a queue with weight 6 is polled roughly 6× as often as one with weight 1. Set [ServerConfig.StrictPriority] to true to always drain higher-weight queues entirely before polling lower-weight ones.

Scheduling

Scheduler registers static cron tasks at startup using standard cron expressions. PeriodicTaskManager drives a dynamic schedule sourced from a PeriodicTaskConfigProvider — useful when cron specs live in a database and must change without a restart.

Event hooks

Register an EventHandler on the server to react to task lifecycle events:

srv.OnEvent(kronosq.EventFailed, notify.Slack("https://hooks.slack.com/..."))

Available event constants: EventStarted, EventSucceeded, EventFailed, EventRetried, EventArchived. Events fire asynchronously and never block task processing.

Delivery semantics

kronosq provides at-least-once delivery. If a server crashes while a task is being processed, the task is moved back to pending on the next startup and will be re-processed. Handlers must be idempotent.
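
Because a crash can cause the same task to be delivered twice, a handler can guard its side effect with a processed-task record keyed by task ID. A minimal in-memory sketch of the idea — a real handler would keep this record in Redis or the application database so it survives restarts:

```go
package main

import (
	"fmt"
	"sync"
)

// onceByID runs a side effect at most once per task ID — the core of an
// idempotent handler under at-least-once delivery.
type onceByID struct {
	mu   sync.Mutex
	done map[string]bool
}

func (o *onceByID) run(taskID string, effect func()) {
	o.mu.Lock()
	already := o.done[taskID]
	o.done[taskID] = true
	o.mu.Unlock()
	if !already {
		effect()
	}
}

func main() {
	o := &onceByID{done: map[string]bool{}}
	sent := 0
	o.run("task-42", func() { sent++ })
	o.run("task-42", func() { sent++ }) // redelivery: no second send
	fmt.Println(sent)
}
```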

Observability

Inspector provides a read/write API over queues and tasks without requiring a running server. It is used internally by the embedded dashboard and the kronq CLI, and can also be used directly in your own tooling.

Sub-packages

Additional functionality lives in sub-packages that can be imported independently:

  • dashboard — the embedded web dashboard
  • event — task lifecycle event types
  • notify — pre-built Slack, Discord, and Telegram notifiers

Index

Constants

const (
	// EventStarted fires when a worker picks up the task and begins processing.
	EventStarted = event.Started
	// EventSucceeded fires when the handler returns nil.
	EventSucceeded = event.Succeeded
	// EventFailed fires when the handler returns an error and all retries are
	// exhausted. The task is archived after this event.
	EventFailed = event.Failed
	// EventRetried fires when the handler returns an error but retries remain.
	EventRetried = event.Retried
	// EventArchived fires when the task is moved to the permanent failure archive.
	EventArchived = event.Archived
)

Event type constants for use with Server.OnEvent.

Variables

This section is empty.

Functions

func WithSyncInterval added in v0.2.0

func WithSyncInterval(d time.Duration) func(*PeriodicTaskManager)

WithSyncInterval sets how often the manager re-polls the provider (default: 3 minutes).

Types

type AutoScaleConfig

type AutoScaleConfig struct {
	// Min is the minimum number of concurrent workers. Defaults to 2 if zero.
	Min int
	// Max is the maximum number of concurrent workers.
	Max int
	// ScaleUpThreshold is the ratio of pending tasks to active workers that
	// triggers adding a new worker. Defaults to 2.0 if zero.
	ScaleUpThreshold float64
	// IdleTimeout is how long a worker must be idle before it is removed down
	// to Min. Defaults to 30s if zero.
	IdleTimeout time.Duration
	// CheckInterval is how often the auto-scaler evaluates queue depth.
	// Defaults to 5s if zero.
	CheckInterval time.Duration
}

AutoScaleConfig configures the auto-scaling behaviour of the worker pool. When set on [ServerConfig.AutoScale], kronosq automatically grows and shrinks the goroutine pool based on queue depth.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client enqueues tasks onto Redis-backed queues.

func NewClient

func NewClient(opt RedisConnOpt) *Client

NewClient creates a new Client connected to Redis. opt can be a RedisClientOpt, RedisSentinelOpt, or RedisClusterOpt.

func (*Client) Close

func (c *Client) Close() error

Close closes the underlying Redis connection.

func (*Client) Enqueue

func (c *Client) Enqueue(task *Task, opts ...Option) error

Enqueue pushes a task onto a queue with the provided options.

type EventHandler

type EventHandler = event.EventHandler

EventHandler is a function that handles a task lifecycle event.

type Handler

type Handler interface {
	ProcessTask(ctx context.Context, task *Task) error
}

Handler processes a task.

func LoggingMiddleware added in v0.2.0

func LoggingMiddleware(next Handler) Handler

LoggingMiddleware logs task type, queue, duration, and outcome.

func RecoveryMiddleware added in v0.2.0

func RecoveryMiddleware(next Handler) Handler

RecoveryMiddleware recovers from panics in handlers and returns them as errors.

type HandlerFunc

type HandlerFunc func(ctx context.Context, task *Task) error

HandlerFunc is a function that implements Handler.

func (HandlerFunc) ProcessTask

func (f HandlerFunc) ProcessTask(ctx context.Context, task *Task) error

type Inspector

type Inspector struct {
	// contains filtered or unexported fields
}

Inspector provides a read/write API over queues and tasks (used by the dashboard and CLI).

func NewInspector

func NewInspector(opt RedisConnOpt) *Inspector

NewInspector creates a new Inspector connected to Redis. opt can be a RedisClientOpt, RedisSentinelOpt, or RedisClusterOpt.

func (*Inspector) Close

func (i *Inspector) Close() error

Close releases the underlying Redis connection.

func (*Inspector) DeleteTask

func (i *Inspector) DeleteTask(queue, id string) error

DeleteTask removes a task from Redis entirely.

func (*Inspector) GetTask

func (i *Inspector) GetTask(id string) (*TaskInfo, error)

GetTask retrieves a single task by ID.

func (*Inspector) PauseQueue added in v0.2.0

func (i *Inspector) PauseQueue(queue string) error

PauseQueue pauses task processing for the named queue.

func (*Inspector) Queues

func (i *Inspector) Queues() ([]*QueueInfo, error)

Queues returns stats for all known queues.

func (*Inspector) ResumeQueue added in v0.2.0

func (i *Inspector) ResumeQueue(queue string) error

ResumeQueue resumes task processing for the named queue.

func (*Inspector) RunTask

func (i *Inspector) RunTask(queue, id string) error

RunTask re-enqueues an archived/failed task.

func (*Inspector) Servers

func (i *Inspector) Servers() ([]*ServerInfo, error)

Servers returns all currently active server instances.

func (*Inspector) Tasks

func (i *Inspector) Tasks(queue, status string, page, pageSize int) ([]*TaskInfo, error)

Tasks returns a page of tasks for the given queue and status. status is one of "pending", "active", or "failed"; page is 1-based.

type MiddlewareFunc added in v0.2.0

type MiddlewareFunc func(Handler) Handler

MiddlewareFunc wraps a Handler — use to add logging, recovery, auth, etc.

type Option

type Option func(*enqueueOptions)

Option is a functional option for enqueue operations.

func DeadlineAt added in v0.2.0

func DeadlineAt(t time.Time) Option

DeadlineAt sets an absolute wall-clock deadline for task processing. The handler's context is cancelled at time t if the task has not finished. Takes precedence over Timeout when both are set and this deadline is sooner.

func Delay

func Delay(d time.Duration) Option

Delay schedules the task to be processed after the given duration. The task is placed in the scheduled set and moved to pending once the delay has elapsed.

func MaxRetry

func MaxRetry(n int) Option

MaxRetry sets the maximum number of retry attempts before the task is archived as a permanent failure. Defaults to 25 when not set.

func Queue

func Queue(name string) Option

Queue specifies which queue the task is enqueued into. Defaults to "default" when not set.

func Timeout

func Timeout(d time.Duration) Option

Timeout sets the per-task processing timeout. The handler's context is cancelled after this duration. Defaults to 30m when not set. Use DeadlineAt to set an absolute wall-clock deadline instead.

func Unique

func Unique(ttl time.Duration) Option

Unique deduplicates tasks of the same type within the given TTL window. If a task with the same queue and type already exists, Client.Enqueue returns rdb.ErrDuplicate and the second task is dropped.

type PeriodicTaskConfig

type PeriodicTaskConfig struct {
	Cronspec string
	Task     *Task
	Opts     []Option
}

PeriodicTaskConfig describes one dynamically-provided periodic task.

type PeriodicTaskConfigProvider

type PeriodicTaskConfigProvider interface {
	GetConfigs() ([]*PeriodicTaskConfig, error)
}

PeriodicTaskConfigProvider is implemented by callers that supply dynamic cron tasks (e.g. from a database).

type PeriodicTaskManager

type PeriodicTaskManager struct {
	// contains filtered or unexported fields
}

PeriodicTaskManager drives a dynamic cron schedule via a provider.

func NewPeriodicTaskManager

func NewPeriodicTaskManager(opt RedisConnOpt, p PeriodicTaskConfigProvider, fns ...func(*PeriodicTaskManager)) *PeriodicTaskManager

NewPeriodicTaskManager creates a new PeriodicTaskManager. opt can be a RedisClientOpt, RedisSentinelOpt, or RedisClusterOpt.

func (*PeriodicTaskManager) Run

func (m *PeriodicTaskManager) Run() error

Run starts the manager: syncs immediately, then re-syncs on ticker. Blocks until shutdown.

func (*PeriodicTaskManager) Shutdown

func (m *PeriodicTaskManager) Shutdown()

Shutdown gracefully stops the manager.

type QueueInfo

type QueueInfo struct {
	Name      string `json:"name"`
	Pending   int    `json:"pending"`
	Active    int    `json:"active"`
	Scheduled int    `json:"scheduled"`
	Failed    int    `json:"failed"`
	Size      int    `json:"size"`
	Paused    bool   `json:"paused"`
}

QueueInfo holds statistics for a single queue.

type RedisClientOpt

type RedisClientOpt struct {
	// Addr is the Redis server address in "host:port" form.
	// Defaults to "localhost:6379" when empty.
	Addr string
	// Password is the Redis AUTH password. Leave empty if not set.
	Password string
	// DB is the Redis database index. Defaults to 0.
	DB int
}

RedisClientOpt holds options for connecting to a single Redis node. It implements RedisConnOpt.

type RedisClusterOpt added in v0.2.0

type RedisClusterOpt struct {
	// Addrs is the list of cluster node addresses (host:port).
	Addrs []string
	// Password is the Redis AUTH password.
	Password string
}

RedisClusterOpt holds options for connecting to a Redis Cluster. It implements RedisConnOpt.

type RedisConnOpt added in v0.2.0

type RedisConnOpt interface {
	// contains filtered or unexported methods
}

RedisConnOpt is implemented by all Redis connection option types. Pass any implementation to NewClient, NewServer, NewInspector, etc.

type RedisSentinelOpt added in v0.2.0

type RedisSentinelOpt struct {
	// MasterName is the Sentinel master group name.
	MasterName string
	// SentinelAddrs is the list of Sentinel node addresses (host:port).
	SentinelAddrs []string
	// Password is the Redis AUTH password for the master/replicas.
	Password string
	// SentinelPassword is the AUTH password for the Sentinel nodes themselves.
	SentinelPassword string
	// DB is the Redis database index.
	DB int
}

RedisSentinelOpt holds options for connecting to Redis via Sentinel for automatic failover. It implements RedisConnOpt.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler registers static cron tasks that are enqueued on a schedule.

func NewScheduler

func NewScheduler(opt RedisConnOpt) *Scheduler

NewScheduler creates a new Scheduler. opt can be a RedisClientOpt, RedisSentinelOpt, or RedisClusterOpt.

func (*Scheduler) Add

func (s *Scheduler) Add(cronExpr string, task *Task, opts ...Option) error

Add registers a cron job. Returns error if cronExpr is invalid.

func (*Scheduler) Run

func (s *Scheduler) Run() error

Run starts the cron scheduler and blocks until SIGINT/SIGTERM or Shutdown().

func (*Scheduler) Shutdown

func (s *Scheduler) Shutdown()

Shutdown gracefully stops the scheduler.

type ServeMux

type ServeMux struct {
	// contains filtered or unexported fields
}

ServeMux routes task types to their handlers.

func NewServeMux

func NewServeMux() *ServeMux

NewServeMux creates a new ServeMux.

func (*ServeMux) Handle

func (mux *ServeMux) Handle(taskType string, h Handler)

Handle registers a handler for the given task type.

func (*ServeMux) HandleFunc

func (mux *ServeMux) HandleFunc(taskType string, fn func(ctx context.Context, task *Task) error)

HandleFunc registers a handler function for the given task type.

func (*ServeMux) ProcessTask

func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error

ProcessTask dispatches the task to the registered handler.

func (*ServeMux) Use added in v0.2.0

func (mux *ServeMux) Use(mw ...MiddlewareFunc)

Use appends middleware to the chain. Applied in order: first added = outermost.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the worker entrypoint — it starts the processor pool and wires all internal components together.

func NewServer

func NewServer(cfg ServerConfig) *Server

NewServer creates a new Server from the given config.

func (*Server) OnEvent

func (s *Server) OnEvent(eventType string, h EventHandler)

OnEvent registers an EventHandler for the given event type.

func (*Server) Run

func (s *Server) Run(mux *ServeMux) error

Run starts the server and blocks until SIGINT/SIGTERM is received.

func (*Server) Shutdown

func (s *Server) Shutdown() error

Shutdown gracefully stops the server.

type ServerConfig

type ServerConfig struct {
	// Redis is the Redis connection option. Required.
	// Accepts [RedisClientOpt], [RedisSentinelOpt], or [RedisClusterOpt].
	Redis RedisConnOpt
	// Queues maps queue names to their relative processing weights.
	// Workers poll higher-weight queues proportionally more often.
	// Defaults to {"default": 1} if empty.
	Queues map[string]int
	// AutoScale configures automatic worker pool scaling based on queue depth.
	// When nil (default), concurrency is fixed at the sum of all queue weights.
	AutoScale *AutoScaleConfig
	// StrictPriority, when true, always drains higher-weight queues entirely
	// before polling lower-weight ones. When false (default), uses weighted
	// round-robin so all queues make steady progress.
	StrictPriority bool
}

ServerConfig holds all configuration for a Server.

type ServerInfo

type ServerInfo struct {
	ID          string         `json:"id"`
	Host        string         `json:"host"`
	Queues      map[string]int `json:"queues"`
	Concurrency int            `json:"concurrency"`
	StartedAt   int64          `json:"started_at"`
}

ServerInfo holds metadata about an active server instance.

type Task

type Task struct {
	Type    string
	Payload []byte
}

Task holds the type and payload of a unit of work.

func NewTask

func NewTask(typeName string, payload []byte) *Task

NewTask creates a new task with the given type and payload.

type TaskEvent

type TaskEvent = event.TaskEvent

TaskEvent is an alias for pkg/event.TaskEvent.

type TaskInfo

type TaskInfo struct {
	ID      string `json:"id"`
	Type    string `json:"type"`
	Payload []byte `json:"payload"`
	Queue   string `json:"queue"`
	Status  string `json:"status"`
	Retry   int    `json:"retry"`
	Retried int    `json:"retried"`
}

TaskInfo holds the inspectable fields of a task.

Directories

Path Synopsis
cmd
kronq command
internal
rdb
pkg
dashboard
Package dashboard serves an embedded web dashboard for kronosq.
event
Package event defines the task lifecycle event types used by kronosq.
notify
Package notify provides pre-built event.EventHandler implementations that send task lifecycle notifications to external services.
