Documentation
¶
Overview ¶
Package kronosq provides a Redis-backed distributed task queue for Go.
kronosq lets you enqueue background jobs from one process and process them in another, with Redis as the durable store. It is inspired by asynq and adds automatic worker auto-scaling and built-in task lifecycle notifications (Slack, Discord, Telegram) out of the box.
Quick start ¶
Enqueue a task from a producer:
client := kronosq.NewClient(kronosq.RedisClientOpt{Addr: "localhost:6379"})
defer client.Close()
payload, _ := json.Marshal(map[string]any{"user_id": 42})
task := kronosq.NewTask("email:welcome", payload)
if err := client.Enqueue(task, kronosq.Queue("default"), kronosq.MaxRetry(3)); err != nil {
log.Fatal(err)
}
Process tasks in a worker server:
srv := kronosq.NewServer(kronosq.ServerConfig{
Redis: kronosq.RedisClientOpt{Addr: "localhost:6379"},
Queues: map[string]int{"default": 1},
})
mux := kronosq.NewServeMux()
mux.HandleFunc("email:welcome", func(ctx context.Context, t *kronosq.Task) error {
// handle the task
return nil
})
if err := srv.Run(mux); err != nil {
log.Fatal(err)
}
Redis connection ¶
All constructors accept a RedisConnOpt. Three implementations are provided:
- RedisClientOpt — single Redis node
- RedisSentinelOpt — Redis Sentinel for automatic failover
- RedisClusterOpt — Redis Cluster for horizontal scaling
Enqueue options ¶
Pass functional Option values to Client.Enqueue to customise behaviour:
- Queue — route the task to a named queue (default: "default")
- MaxRetry — cap retry attempts on failure (default: 25)
- Delay — schedule the task to run after a duration
- Timeout — per-task processing timeout (default: 30m)
- DeadlineAt — hard wall-clock deadline; takes precedence over Timeout if sooner
- Unique — deduplicate tasks of the same type within a TTL window
Priority queues ¶
Queues carry integer weights. With the default weighted round-robin a queue with weight 6 is polled roughly 6× as often as one with weight 1. Set [ServerConfig.StrictPriority] to true to always drain higher-weight queues entirely before polling lower-weight ones.
Scheduling ¶
Scheduler registers static cron tasks at startup using standard cron expressions. PeriodicTaskManager drives a dynamic schedule sourced from a PeriodicTaskConfigProvider — useful when cron specs live in a database and must change without a restart.
Event hooks ¶
Register an EventHandler on the server to react to task lifecycle events:
srv.OnEvent(kronosq.EventFailed, notify.Slack("https://hooks.slack.com/..."))
Available event constants: EventStarted, EventSucceeded, EventFailed, EventRetried, EventArchived. Events fire asynchronously and never block task processing.
Delivery semantics ¶
kronosq provides at-least-once delivery. If a server crashes while a task is being processed, the task is moved back to pending on the next startup and will be re-processed. Handlers must be idempotent.
Observability ¶
Inspector provides a read/write API over queues and tasks without requiring a running server. It is used internally by the embedded dashboard and the kronq CLI, and can also be used directly in your own tooling.
Sub-packages ¶
Additional functionality lives in sub-packages that can be imported independently:
- github.com/Azzurriii/kronosq/pkg/event — TaskEvent types and event constants
- github.com/Azzurriii/kronosq/pkg/notify — Slack, Discord, and Telegram notifiers
- github.com/Azzurriii/kronosq/pkg/dashboard — embedded HTTP dashboard served from your binary
Index ¶
- Constants
- func WithSyncInterval(d time.Duration) func(*PeriodicTaskManager)
- type AutoScaleConfig
- type Client
- type EventHandler
- type Handler
- type HandlerFunc
- type Inspector
- func (i *Inspector) Close() error
- func (i *Inspector) DeleteTask(queue, id string) error
- func (i *Inspector) GetTask(id string) (*TaskInfo, error)
- func (i *Inspector) PauseQueue(queue string) error
- func (i *Inspector) Queues() ([]*QueueInfo, error)
- func (i *Inspector) ResumeQueue(queue string) error
- func (i *Inspector) RunTask(queue, id string) error
- func (i *Inspector) Servers() ([]*ServerInfo, error)
- func (i *Inspector) Tasks(queue, status string, page, pageSize int) ([]*TaskInfo, error)
- type MiddlewareFunc
- type Option
- type PeriodicTaskConfig
- type PeriodicTaskConfigProvider
- type PeriodicTaskManager
- type QueueInfo
- type RedisClientOpt
- type RedisClusterOpt
- type RedisConnOpt
- type RedisSentinelOpt
- type Scheduler
- type ServeMux
- type Server
- type ServerConfig
- type ServerInfo
- type Task
- type TaskEvent
- type TaskInfo
Constants ¶
const ( // EventStarted fires when a worker picks up the task and begins processing. EventStarted = event.Started // EventSucceeded fires when the handler returns nil. EventSucceeded = event.Succeeded // EventFailed fires when the handler returns an error and all retries are // exhausted. The task is archived after this event. EventFailed = event.Failed // EventRetried fires when the handler returns an error but retries remain. EventRetried = event.Retried // EventArchived fires when the task is moved to the permanent failure archive. EventArchived = event.Archived )
Event type constants for use with Server.OnEvent.
Variables ¶
This section is empty.
Functions ¶
func WithSyncInterval ¶ added in v0.2.0
func WithSyncInterval(d time.Duration) func(*PeriodicTaskManager)
WithSyncInterval sets how often the manager re-polls the provider (default: 3 minutes).
Types ¶
type AutoScaleConfig ¶
type AutoScaleConfig struct {
// Min is the minimum number of concurrent workers. Defaults to 2 if zero.
Min int
// Max is the maximum number of concurrent workers.
Max int
// ScaleUpThreshold is the ratio of pending tasks to active workers that
// triggers adding a new worker. Defaults to 2.0 if zero.
ScaleUpThreshold float64
// IdleTimeout is how long a worker must be idle before it is removed down
// to Min. Defaults to 30s if zero.
IdleTimeout time.Duration
// CheckInterval is how often the auto-scaler evaluates queue depth.
// Defaults to 5s if zero.
CheckInterval time.Duration
}
AutoScaleConfig configures the auto-scaling behaviour of the worker pool. When set on [ServerConfig.AutoScale], kronosq automatically grows and shrinks the goroutine pool based on queue depth.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client enqueues tasks onto Redis-backed queues.
func NewClient ¶
func NewClient(opt RedisConnOpt) *Client
NewClient creates a new Client connected to Redis. opt can be a RedisClientOpt, RedisSentinelOpt, or RedisClusterOpt.
type EventHandler ¶
type EventHandler = event.EventHandler
EventHandler is a function that handles a task lifecycle event.
type Handler ¶
Handler processes a task.
func LoggingMiddleware ¶ added in v0.2.0
LoggingMiddleware logs task type, queue, duration, and outcome.
func RecoveryMiddleware ¶ added in v0.2.0
RecoveryMiddleware recovers from panics in handlers and returns them as errors.
type HandlerFunc ¶
HandlerFunc is a function that implements Handler.
func (HandlerFunc) ProcessTask ¶
func (f HandlerFunc) ProcessTask(ctx context.Context, task *Task) error
type Inspector ¶
type Inspector struct {
// contains filtered or unexported fields
}
Inspector provides a read API over queues and tasks (used by the dashboard and CLI).
func NewInspector ¶
func NewInspector(opt RedisConnOpt) *Inspector
NewInspector creates a new Inspector connected to Redis. opt can be a RedisClientOpt, RedisSentinelOpt, or RedisClusterOpt.
func (*Inspector) DeleteTask ¶
DeleteTask removes a task from Redis entirely.
func (*Inspector) PauseQueue ¶ added in v0.2.0
PauseQueue pauses task processing for the named queue.
func (*Inspector) ResumeQueue ¶ added in v0.2.0
ResumeQueue resumes task processing for the named queue.
func (*Inspector) Servers ¶
func (i *Inspector) Servers() ([]*ServerInfo, error)
Servers returns all currently active server instances.
type MiddlewareFunc ¶ added in v0.2.0
MiddlewareFunc wraps a Handler — use to add logging, recovery, auth, etc.
type Option ¶
type Option func(*enqueueOptions)
Option is a functional option for enqueue operations.
func DeadlineAt ¶ added in v0.2.0
DeadlineAt sets an absolute wall-clock deadline for task processing. The handler's context is cancelled at time t if the task has not finished. Takes precedence over Timeout when both are set and this deadline is sooner.
func Delay ¶
Delay schedules the task to be processed after the given duration. The task is placed in the scheduled set and moved to pending once the delay has elapsed.
func MaxRetry ¶
MaxRetry sets the maximum number of retry attempts before the task is archived as a permanent failure. Defaults to 25 when not set.
func Queue ¶
Queue specifies which queue the task is enqueued into. Defaults to "default" when not set.
func Timeout ¶
Timeout sets the per-task processing timeout. The handler's context is cancelled after this duration. Defaults to 30m when not set. Use DeadlineAt to set an absolute wall-clock deadline instead.
func Unique ¶
Unique deduplicates tasks of the same type within the given TTL window. If a task with the same queue and type already exists, Client.Enqueue returns rdb.ErrDuplicate and the second task is dropped.
type PeriodicTaskConfig ¶
PeriodicTaskConfig describes one dynamically-provided periodic task.
type PeriodicTaskConfigProvider ¶
type PeriodicTaskConfigProvider interface {
GetConfigs() ([]*PeriodicTaskConfig, error)
}
PeriodicTaskConfigProvider is implemented by callers that supply dynamic cron tasks (e.g. from a database).
type PeriodicTaskManager ¶
type PeriodicTaskManager struct {
// contains filtered or unexported fields
}
PeriodicTaskManager drives a dynamic cron schedule via a provider.
func NewPeriodicTaskManager ¶
func NewPeriodicTaskManager(opt RedisConnOpt, p PeriodicTaskConfigProvider, fns ...func(*PeriodicTaskManager)) *PeriodicTaskManager
NewPeriodicTaskManager creates a new PeriodicTaskManager. opt can be a RedisClientOpt, RedisSentinelOpt, or RedisClusterOpt.
func (*PeriodicTaskManager) Run ¶
func (m *PeriodicTaskManager) Run() error
Run starts the manager: syncs immediately, then re-syncs on ticker. Blocks until shutdown.
func (*PeriodicTaskManager) Shutdown ¶
func (m *PeriodicTaskManager) Shutdown()
Shutdown gracefully stops the manager.
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Pending int `json:"pending"`
Active int `json:"active"`
Scheduled int `json:"scheduled"`
Failed int `json:"failed"`
Size int `json:"size"`
Paused bool `json:"paused"`
}
QueueInfo holds statistics for a single queue.
type RedisClientOpt ¶
type RedisClientOpt struct {
// Addr is the Redis server address in "host:port" form.
// Defaults to "localhost:6379" when empty.
Addr string
// Password is the Redis AUTH password. Leave empty if not set.
Password string
// DB is the Redis database index. Defaults to 0.
DB int
}
RedisClientOpt holds options for connecting to a single Redis node. It implements RedisConnOpt.
type RedisClusterOpt ¶ added in v0.2.0
type RedisClusterOpt struct {
// Addrs is the list of cluster node addresses (host:port).
Addrs []string
// Password is the Redis AUTH password.
Password string
}
RedisClusterOpt holds options for connecting to a Redis Cluster. It implements RedisConnOpt.
type RedisConnOpt ¶ added in v0.2.0
type RedisConnOpt interface {
// contains filtered or unexported methods
}
RedisConnOpt is implemented by all Redis connection option types. Pass any implementation to NewClient, NewServer, NewInspector, etc.
type RedisSentinelOpt ¶ added in v0.2.0
type RedisSentinelOpt struct {
// MasterName is the Sentinel master group name.
MasterName string
// SentinelAddrs is the list of Sentinel node addresses (host:port).
SentinelAddrs []string
// Password is the Redis AUTH password for the master/replicas.
Password string
// SentinelPassword is the AUTH password for the Sentinel nodes themselves.
SentinelPassword string
// DB is the Redis database index.
DB int
}
RedisSentinelOpt holds options for connecting to Redis via Sentinel for automatic failover. It implements RedisConnOpt.
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler registers static cron tasks that are enqueued on a schedule.
func NewScheduler ¶
func NewScheduler(opt RedisConnOpt) *Scheduler
NewScheduler creates a new Scheduler. opt can be a RedisClientOpt, RedisSentinelOpt, or RedisClusterOpt.
type ServeMux ¶
type ServeMux struct {
// contains filtered or unexported fields
}
ServeMux routes task types to their handlers.
func (*ServeMux) HandleFunc ¶
HandleFunc registers a handler function for the given task type.
func (*ServeMux) ProcessTask ¶
ProcessTask dispatches the task to the registered handler.
func (*ServeMux) Use ¶ added in v0.2.0
func (mux *ServeMux) Use(mw ...MiddlewareFunc)
Use appends middleware to the chain. Applied in order: first added = outermost.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the worker entrypoint — it starts the processor pool and wires all internal components together.
func NewServer ¶
func NewServer(cfg ServerConfig) *Server
NewServer creates a new Server from the given config.
func (*Server) OnEvent ¶
func (s *Server) OnEvent(eventType string, h EventHandler)
OnEvent registers an EventHandler for the given event type.
type ServerConfig ¶
type ServerConfig struct {
// Redis is the Redis connection option. Required.
// Accepts [RedisClientOpt], [RedisSentinelOpt], or [RedisClusterOpt].
Redis RedisConnOpt
// Queues maps queue names to their relative processing weights.
// Workers poll higher-weight queues proportionally more often.
// Defaults to {"default": 1} if empty.
Queues map[string]int
// AutoScale configures automatic worker pool scaling based on queue depth.
// When nil (default), concurrency is fixed at the sum of all queue weights.
AutoScale *AutoScaleConfig
// StrictPriority, when true, always drains higher-weight queues entirely
// before polling lower-weight ones. When false (default), uses weighted
// round-robin so all queues make steady progress.
StrictPriority bool
}
ServerConfig holds all configuration for a Server.
type ServerInfo ¶
type ServerInfo struct {
ID string `json:"id"`
Host string `json:"host"`
Queues map[string]int `json:"queues"`
Concurrency int `json:"concurrency"`
StartedAt int64 `json:"started_at"`
}
ServerInfo holds metadata about an active server instance.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
kronq
command
|
|
|
internal
|
|
|
pkg
|
|
|
dashboard
Package dashboard serves an embedded web dashboard for kronosq.
|
Package dashboard serves an embedded web dashboard for kronosq. |
|
event
Package event defines the task lifecycle event types used by kronosq.
|
Package event defines the task lifecycle event types used by kronosq. |
|
notify
Package notify provides pre-built event.EventHandler implementations that send task lifecycle notifications to external services.
|
Package notify provides pre-built event.EventHandler implementations that send task lifecycle notifications to external services. |
