Using the SDK
go get github.com/bete7512/pulseOne

*pulse.Client plays both roles, producer and worker, over a single connection.
Connect
p, err := pulse.New("localhost:50051",
	pulse.WithConcurrency(20),               // parallel handlers (default 10)
	pulse.WithUserPass("worker", "hunter2"), // when the server sets PULSE_AUTH_USERS
	pulse.WithTLS(nil),                      // system roots; pass *tls.Config to customize
)
if err != nil {
	log.Fatal(err) // check the error before touching p
}
defer p.Close()

Handle jobs
Handlers are plain typed functions; the SDK decodes the JSON payload:
type EmailArgs struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
}
pulse.Register(p, "send-email", func(ctx context.Context, a EmailArgs) error {
	return sendEmail(a.To, a.Subject) // error → retry with backoff, then dead-letter
})
p.Run(ctx) // blocks; processes jobs until ctx is cancelled

Delivery is at-least-once: a handler can run more than once for the same job (crash recovery, network partitions). Key side effects on the job's identity:
pulse.Register(p, "charge", func(ctx context.Context, a ChargeArgs) error {
	job := pulse.FromContext(ctx) // ID and Attempt form the idempotency key
	return charge(a, fmt.Sprintf("%s-%d", job.ID, job.Attempt))
})

While a handler runs, the SDK heartbeats every ~10s so long jobs aren't reclaimed; if the process dies, the server's watchdog re-dispatches the job to another worker.
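To make the idempotency-key idea concrete, here is a minimal sketch of a guard that applies a side effect at most once per key. It is not part of the SDK: onceByKey, newOnceByKey, and the key "job-42-1" are hypothetical names, and the in-memory map stands in for whatever durable store the side effect lives in.

```go
package main

import (
	"fmt"
	"sync"
)

// onceByKey remembers which idempotency keys have already been applied.
// Hypothetical helper, in-memory only: a real system would persist the key
// in the same store (ideally the same transaction) as the side effect.
type onceByKey struct {
	mu   sync.Mutex
	done map[string]bool
}

func newOnceByKey() *onceByKey {
	return &onceByKey{done: make(map[string]bool)}
}

// Do runs fn unless key has already succeeded. It reports whether fn ran.
// A failed fn leaves the key unrecorded so a later retry can try again.
// The lock is held while fn runs, which serializes all guarded effects;
// that keeps the sketch simple but would be too coarse for production.
func (o *onceByKey) Do(key string, fn func() error) (ran bool, err error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.done[key] {
		return false, nil
	}
	if err := fn(); err != nil {
		return true, err
	}
	o.done[key] = true
	return true, nil
}

func main() {
	guard := newOnceByKey()
	charges := 0
	charge := func() error { charges++; return nil }

	// The same delivery arriving twice, e.g. after a network partition.
	for i := 0; i < 2; i++ {
		ran, err := guard.Do("job-42-1", charge)
		fmt.Println("ran:", ran, "err:", err)
	}
	fmt.Println("charges:", charges)
}
```

Inside a handler, the key would be the string built from job.ID and job.Attempt, so a duplicate delivery of the same attempt becomes a no-op.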
Enqueue
id, err := pulse.Enqueue(ctx, p, "send-email", EmailArgs{To: "a@b.com", Subject: "Welcome"})
// urgent work jumps the queue (default priority 0, FIFO within a priority):
pulse.Enqueue(ctx, p, "send-email", EmailArgs{To: "vip@b.com"}, pulse.WithPriority(10))
// raw escape hatch for dynamic payloads (must be valid JSON or empty):
p.Submit(ctx, "send-email", []byte(`{"to":"a@b.com"}`))

Failed jobs retry with attempts² backoff (1s, 4s) and dead-letter after 3 attempts with the last error preserved. Inspect them with pulse jobs list --status dead_lettered.
Schedule
p.Schedule(ctx, "reminder", payload, pulse.At(tomorrow9am)) // once, at a time
p.Schedule(ctx, "reminder", payload, pulse.After(10*time.Minute)) // once, after a delay
p.Schedule(ctx, "reconcile", payload, pulse.Every(5*time.Minute)) // fixed interval
p.Schedule(ctx, "rollup", payload, pulse.Cron("0 * * * *")) // 5-field cron
// typed mirror of Enqueue:
pulse.ScheduleJob(ctx, p, "report", ReportArgs{Name: "daily"}, pulse.Cron("0 6 * * *"))
p.PauseSchedule(ctx, id)
p.ResumeSchedule(ctx, id)
p.DeleteSchedule(ctx, id)
p.ListScheduleJobs(ctx, id) // the jobs a schedule spawned, with status
p.ListScheduleFires(ctx, id) // when it fired → which job

Fires are exactly-once per occurrence, safe across server restarts and replicas. A scheduled job arrives at the same Registered handler as any other job.
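For readers less familiar with cron syntax, the five fields in expressions such as "0 * * * *" are, in order: minute, hour, day of month, month, day of week. The sketch below only names the fields and checks their count; splitCron and cronFields are hypothetical helpers, and which ranges, steps, or extensions the Pulse server accepts is not covered here.

```go
package main

import (
	"fmt"
	"strings"
)

// cronFields names the five positions of a standard cron expression.
type cronFields struct {
	Minute, Hour, DayOfMonth, Month, DayOfWeek string
}

// splitCron splits a 5-field expression into named fields. It only checks
// the field count; it does not validate ranges, steps, or lists.
func splitCron(expr string) (cronFields, error) {
	f := strings.Fields(expr)
	if len(f) != 5 {
		return cronFields{}, fmt.Errorf("want 5 fields, got %d in %q", len(f), expr)
	}
	return cronFields{
		Minute:     f[0],
		Hour:       f[1],
		DayOfMonth: f[2],
		Month:      f[3],
		DayOfWeek:  f[4],
	}, nil
}

func main() {
	// "0 * * * *" is minute 0 of every hour; "0 6 * * *" is 06:00 every day.
	for _, expr := range []string{"0 * * * *", "0 6 * * *"} {
		c, err := splitCron(expr)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q: minute=%s hour=%s\n", expr, c.Minute, c.Hour)
	}
}
```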
Operate
p.PauseDispatch(ctx, "db maintenance") // workers stop receiving NEW jobs; running ones finish
st, _ := p.DispatchStatus(ctx) // st.Paused, st.Reason, st.PausedAt
p.ResumeDispatch(ctx) // backlog drains, priority-ordered
jobs, _ := p.ListJobs(ctx, pulse.JobQuery{Status: "DEAD_LETTERED", Topic: "report", Limit: 50})
job, _ := p.GetJob(ctx, id) // full state: status, attempts, payload, timestamps, last error
p.CancelJob(ctx, id) // only PENDING/RETRYING jobs
p.RequeueJob(ctx, id) // DEAD_LETTERED/CANCELED → PENDING with a fresh retry budget

Observe
stats, _ := p.QueueStats(ctx) // one count per (topic, status) + oldest waiting job
points, _ := p.Throughput(ctx, time.Hour, time.Minute) // finished work per bucket

QueueStats is a single aggregate query, so poll it freely for dashboards. Throughput splits each bucket into completed vs dead-lettered counts; zero durations use the server defaults (1h window, 1m buckets). The bundled pulseui dashboard is built entirely on these two calls plus the admin methods above.