Using the SDK

go get github.com/bete7512/pulse

One *pulse.Client plays both roles — producer and worker — over a single connection.

Connect

p, err := pulse.New("localhost:50051",
	pulse.WithConcurrency(20),               // parallel handlers (default 10)
	pulse.WithUserPass("worker", "hunter2"), // when the server sets PULSE_AUTH_USERS
	pulse.WithTLS(nil),                      // system roots; pass *tls.Config to customize
)
if err != nil {
	log.Fatal(err) // check before deferring Close: p is not usable if New failed
}
defer p.Close()
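
When the system roots are not enough (a private CA, for example), a custom *tls.Config can be built with the standard library and passed in place of nil. The following is a minimal sketch: the helper name tlsConfigFromPEM is hypothetical and not part of pulse, and the minimum TLS version is an assumption to adapt to your own policy.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// tlsConfigFromPEM is a hypothetical helper (not part of pulse). It builds a
// *tls.Config that trusts only the CA certificates found in caPEM.
func tlsConfigFromPEM(caPEM []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caPEM) {
		return nil, errors.New("no valid CA certificates found in PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12, // assumption: adjust to your policy
	}, nil
}

func main() {
	// Invalid input is rejected instead of silently yielding an empty pool.
	_, err := tlsConfigFromPEM([]byte("not a certificate"))
	fmt.Println("error:", err)
}
```

The resulting config would then be passed as pulse.WithTLS(cfg) instead of pulse.WithTLS(nil).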

Handle jobs

Handlers are plain typed functions; the SDK decodes the JSON payload:

type EmailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}

pulse.Register(p, "send-email", func(ctx context.Context, a EmailArgs) error {
	return sendEmail(a.To, a.Subject) // error → retry with backoff, then dead-letter
})

p.Run(ctx) // blocks; processes jobs until ctx is cancelled

Delivery is at-least-once: a handler can run more than once for the same job (for example after crash recovery or a network partition). Make side effects idempotent by keying them on the job's identity:

pulse.Register(p, "charge", func(ctx context.Context, a ChargeArgs) error {
	job := pulse.FromContext(ctx) // ID and Attempt — the idempotency key
	return charge(a, fmt.Sprintf("%s-%d", job.ID, job.Attempt))
})
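
What the receiving side does with such a key can be sketched as follows. This is an illustration under assumptions, not part of pulse: seenKeys and chargeOnce are hypothetical names, and the in-memory map stands in for a durable store or a payment provider's own idempotency-key support.

```go
package main

import (
	"fmt"
	"sync"
)

// seenKeys is a hypothetical in-memory idempotency store.
type seenKeys struct {
	mu   sync.Mutex
	keys map[string]bool
}

// firstTime records key and reports whether it had not been seen before.
func (s *seenKeys) firstTime(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.keys[key] {
		return false
	}
	s.keys[key] = true
	return true
}

// chargeOnce runs do only for the first delivery of a given key.
// It returns true if do was actually executed.
func chargeOnce(s *seenKeys, jobID string, attempt int, do func()) bool {
	key := fmt.Sprintf("%s-%d", jobID, attempt) // same key format as the handler above
	if !s.firstTime(key) {
		return false // duplicate delivery: skip the side effect
	}
	do()
	return true
}

func main() {
	store := &seenKeys{keys: map[string]bool{}}
	charges := 0
	chargeOnce(store, "job-1", 1, func() { charges++ })
	chargeOnce(store, "job-1", 1, func() { charges++ }) // redelivery, skipped
	fmt.Println("charges executed:", charges)
}
```

Whether the attempt number belongs in the key is a design choice: with it, a retry after a returned error gets a fresh key and repeats the effect; keyed on the ID alone, the effect would run at most once per job.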

While a handler runs, the SDK heartbeats every ~10s so long jobs aren't reclaimed; if the process dies, the server's watchdog re-dispatches the job to another worker.

Enqueue

id, err := pulse.Enqueue(ctx, p, "send-email", EmailArgs{To: "a@b.com", Subject: "Welcome"})

// urgent work jumps the queue (default priority 0, FIFO within a priority):
pulse.Enqueue(ctx, p, "send-email", EmailArgs{To: "vip@b.com"}, pulse.WithPriority(10))

// raw escape hatch for dynamic payloads (must be valid JSON or empty):
p.Submit(ctx, "send-email", []byte(`{"to":"a@b.com"}`))
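
The ordering rule (higher priority first, FIFO within a priority) can be modelled with container/heap and a sequence number as tie-breaker. This is a sketch of the rule only, not of how the pulse server stores jobs; item, jobHeap, and queue are hypothetical names.

```go
package main

import (
	"container/heap"
	"fmt"
)

type item struct {
	name     string
	priority int
	seq      uint64 // enqueue order, used as the FIFO tie-breaker
}

type jobHeap []item

func (h jobHeap) Len() int { return len(h) }
func (h jobHeap) Less(i, j int) bool {
	if h[i].priority != h[j].priority {
		return h[i].priority > h[j].priority // higher priority first
	}
	return h[i].seq < h[j].seq // earlier enqueue first
}
func (h jobHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *jobHeap) Push(x any)   { *h = append(*h, x.(item)) }
func (h *jobHeap) Pop() any {
	old := *h
	it := old[len(old)-1]
	*h = old[:len(old)-1]
	return it
}

type queue struct {
	h    jobHeap
	next uint64
}

func (q *queue) enqueue(name string, priority int) {
	heap.Push(&q.h, item{name: name, priority: priority, seq: q.next})
	q.next++
}

// dequeue returns the next job name, or false when the queue is empty.
func (q *queue) dequeue() (string, bool) {
	if q.h.Len() == 0 {
		return "", false
	}
	return heap.Pop(&q.h).(item).name, true
}

func main() {
	q := &queue{}
	q.enqueue("a", 0)
	q.enqueue("b", 0)
	q.enqueue("vip", 10)
	for {
		name, ok := q.dequeue()
		if !ok {
			break
		}
		fmt.Println(name)
	}
}
```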

Failed jobs retry with quadratic backoff (attempts² seconds: 1s after the first failure, 4s after the second) and are dead-lettered after 3 attempts, with the last error preserved. Inspect them with pulse jobs list --status dead_lettered.

Schedule

p.Schedule(ctx, "reminder",  payload, pulse.At(tomorrow9am))       // once, at a time
p.Schedule(ctx, "reminder",  payload, pulse.After(10*time.Minute)) // once, after a delay
p.Schedule(ctx, "reconcile", payload, pulse.Every(5*time.Minute))  // fixed interval
p.Schedule(ctx, "rollup",    payload, pulse.Cron("0 * * * *"))     // 5-field cron

// typed mirror of Enqueue:
pulse.ScheduleJob(ctx, p, "report", ReportArgs{Name: "daily"}, pulse.Cron("0 6 * * *"))

p.PauseSchedule(ctx, id)
p.ResumeSchedule(ctx, id)
p.DeleteSchedule(ctx, id)
p.ListScheduleJobs(ctx, id)  // the jobs a schedule spawned, with status
p.ListScheduleFires(ctx, id) // when it fired → which job

Fires are exactly-once per occurrence, safe across server restarts and replicas. A scheduled job arrives at the same Registered handler as any other job.

Operate

p.PauseDispatch(ctx, "db maintenance") // workers stop receiving NEW jobs; running ones finish
st, _ := p.DispatchStatus(ctx)         // st.Paused, st.Reason, st.PausedAt
p.ResumeDispatch(ctx)                  // backlog drains, priority-ordered

jobs, _ := p.ListJobs(ctx, pulse.JobQuery{Status: "DEAD_LETTERED", Topic: "report", Limit: 50})
job, _ := p.GetJob(ctx, id) // full state: status, attempts, payload, timestamps, last error
p.CancelJob(ctx, id)        // only PENDING/RETRYING jobs
p.RequeueJob(ctx, id)       // DEAD_LETTERED/CANCELED → PENDING with a fresh retry budget
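
The status rules in the comments above can be checked client-side before issuing a call, for instance to grey out buttons in an admin tool. A sketch, assuming the four status strings named above; the Status type and helper names are hypothetical, and "RUNNING" is an assumed name used only to show a rejected case.

```go
package main

import "fmt"

type Status string

const (
	Pending      Status = "PENDING"
	Retrying     Status = "RETRYING"
	DeadLettered Status = "DEAD_LETTERED"
	Canceled     Status = "CANCELED"
)

// canCancel mirrors the rule: only PENDING/RETRYING jobs can be canceled.
func canCancel(s Status) bool {
	return s == Pending || s == Retrying
}

// canRequeue mirrors the rule: only DEAD_LETTERED/CANCELED jobs can be requeued.
func canRequeue(s Status) bool {
	return s == DeadLettered || s == Canceled
}

func main() {
	// "RUNNING" is an assumed status name, not confirmed by the text.
	for _, s := range []Status{Pending, Retrying, DeadLettered, Canceled, "RUNNING"} {
		fmt.Printf("%-14s cancel=%-5v requeue=%v\n", s, canCancel(s), canRequeue(s))
	}
}
```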

Observe

stats, _ := p.QueueStats(ctx) // one count per (topic, status) + oldest waiting job
points, _ := p.Throughput(ctx, time.Hour, time.Minute) // finished work per bucket

QueueStats is a single aggregate query — poll it freely for dashboards. Throughput splits each bucket into completed vs dead-lettered counts; zero durations use the server defaults (1h window, 1m buckets). The bundled pulseui dashboard is built entirely on these two calls plus the admin methods above.
