BunShip uses BullMQ backed by Redis for asynchronous task processing. Background jobs handle email delivery, webhook dispatch, and periodic cleanup tasks in a separate process from the API server.
Architecture
```
┌──────────────┐    ┌─────────┐    ┌──────────────────┐
│  API Server  │───>│  Redis  │<───│  Worker Process  │
│   (Elysia)   │    │ (Queue) │    │     (BullMQ)     │
└──────────────┘    └─────────┘    └──────────────────┘
                                      ├── Email Worker
                                      ├── Webhook Worker
                                      └── Cleanup Worker
```
The API server enqueues jobs. A separate worker process consumes them. This separation means slow or failing background tasks never block API responses.
Starting the worker
Run the worker as a standalone process:
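The repository's exact start script isn't shown here; assuming the entry point at apps/api/src/worker.ts, one plausible invocation with Bun is:

```sh
bun run apps/api/src/worker.ts
```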
On startup, the worker:
- Verifies the Redis connection is healthy
- Starts all three worker instances
- Schedules recurring cleanup jobs
- Listens for SIGTERM and SIGINT for graceful shutdown
```ts
// apps/api/src/worker.ts
async function main() {
  const redisHealthy = await checkRedisHealth();
  if (!redisHealthy) {
    console.error("Redis is not reachable. Exiting.");
    process.exit(1);
  }

  const workers = await startWorkers();
  await setupRecurringJobs();

  // Shut down gracefully on either termination signal
  for (const signal of ["SIGTERM", "SIGINT"] as const) {
    process.on(signal, async () => {
      await stopWorkers(workers);
      process.exit(0);
    });
  }
}

main();
```
Job Queues
BunShip defines three queues, each with its own configuration:
| Queue | Purpose | Concurrency | Default Retries | Backoff |
|---|---|---|---|---|
| email | Transactional emails via Resend | 5 | 3 | Exponential (2s base) |
| webhook | Webhook delivery to external endpoints | 10 | 5 | Exponential (5s base) |
| cleanup | Periodic maintenance tasks | 1 | 1 (no retry) | - |
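As a sketch, the retry columns above map onto BullMQ's per-job `attempts` and `backoff` options; the authoritative values live in BunShip's queue definitions, so treat these literals as illustrative:

```typescript
// Illustrative mapping of the table above onto BullMQ job option objects.
// "attempts" counts total tries; "backoff" controls the delay between retries.
const emailJobDefaults = {
  attempts: 3,
  backoff: { type: "exponential", delay: 2_000 }, // 2s base delay
};

const webhookJobDefaults = {
  attempts: 5,
  backoff: { type: "exponential", delay: 5_000 }, // 5s base delay
};

const cleanupJobDefaults = {
  attempts: 1, // a single try, i.e. no retry
};
```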
All queues share a common Redis connection:
```ts
import IORedis from "ioredis";

const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
  maxRetriesPerRequest: null, // required by BullMQ workers
  enableReadyCheck: false,
});
```
Job retention
Completed and failed jobs are pruned automatically:
```ts
defaultJobOptions: {
  removeOnComplete: {
    count: 100,         // Keep last 100 completed jobs
    age: 24 * 3600,     // Keep for 24 hours
  },
  removeOnFail: {
    count: 1000,        // Keep last 1000 failed jobs
    age: 7 * 24 * 3600, // Keep for 7 days
  },
}
```
Built-in Workers
Email Worker
Sends transactional emails through Resend. Supports HTML content, plain text, templates, CC/BCC, and reply-to addresses.
```ts
import { addEmailJob } from "./jobs";

await addEmailJob({
  to: "user@example.com",
  subject: "Welcome to BunShip",
  html: "<h1>Welcome!</h1><p>Your account is ready.</p>",
  tags: { category: "onboarding" },
});
```
The worker enforces a rate limit that matches your Resend plan's tier:

```ts
export const emailWorker = new Worker<EmailJobData>("email", processEmailJob, {
  connection,
  concurrency: 5,
  limiter: {
    max: parseInt(process.env.RESEND_RATE_LIMIT || "10", 10),
    duration: 1000, // per second
  },
});
```
Email job data:
| Field | Type | Required | Description |
|---|---|---|---|
| to | string \| string[] | Yes | Recipient email(s) |
| from | string | No | Sender address (defaults to app config) |
| subject | string | Yes | Email subject line |
| html | string | No | HTML body |
| text | string | No | Plain text body |
| templateId | string | No | Resend template ID |
| templateData | object | No | Template variables |
| replyTo | string | No | Reply-to address |
| cc | string[] | No | CC recipients |
| bcc | string[] | No | BCC recipients |
| tags | object | No | Email tags for tracking |
Webhook Worker
Delivers webhook events to external endpoints with HMAC-SHA256 signing and automatic retries.
```ts
import { addWebhookJob } from "./jobs";

await addWebhookJob({
  webhookId: "wh_abc123",
  deliveryId: "del_xyz789",
  url: "https://example.com/webhooks",
  secret: "whsec_...",
  event: "member.added",
  payload: { memberId: "mem_123", role: "member" },
  attempt: 1,
});
```
Each delivery includes these headers:
| Header | Value |
|---|---|
| Content-Type | application/json |
| X-Webhook-Event | Event type |
| X-Webhook-Signature | HMAC-SHA256 signature |
| X-Webhook-Delivery | Delivery ID |
| User-Agent | BunShip-Webhooks/1.0 |
Requests time out after 30 seconds. Non-2xx responses trigger a retry with exponential backoff. After all retries are exhausted, the delivery is marked as permanently failed in the database.
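Receivers can verify a delivery by recomputing the signature. A minimal sketch, assuming the X-Webhook-Signature header carries a hex-encoded HMAC-SHA256 of the raw request body keyed by the endpoint's secret (confirm the exact signing scheme against the webhook worker's source):

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Recompute the HMAC over the raw body and compare in constant time.
// The hex encoding of the signature is an assumption here.
export function verifyWebhookSignature(
  rawBody: string,
  signatureHex: string,
  secret: string
): boolean {
  const expected = createHmac("sha256", secret).update(rawBody).digest();
  const received = Buffer.from(signatureHex, "hex");
  // Length check first: timingSafeEqual throws on mismatched lengths.
  return expected.length === received.length && timingSafeEqual(expected, received);
}
```

Verify against the raw body bytes, not a re-serialized JSON object, since key order or whitespace differences would change the digest.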
Cleanup Worker
Runs maintenance tasks one at a time (concurrency: 1). Four built-in tasks are supported:
| Task | Description |
|---|---|
| expired-tokens | Deletes sessions expired for 30+ days |
| old-audit-logs | Removes audit logs older than the configured threshold |
| failed-deliveries | Purges failed webhook deliveries older than 7 days |
| temporary-files | Cleans up expired temporary file uploads |
```ts
export const cleanupWorker = new Worker<CleanupJobData>("cleanup", processCleanupJob, {
  connection,
  concurrency: 1,
});
```
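The handler dispatches on the task name. A sketch of that shape (the real processCleanupJob lives in the BunShip source; the daysToKeep defaults here are taken from the task table above and are illustrative):

```typescript
// Hypothetical dispatcher mirroring the four built-in cleanup tasks.
type CleanupTask =
  | "expired-tokens"
  | "old-audit-logs"
  | "failed-deliveries"
  | "temporary-files";

interface CleanupJobData {
  task: CleanupTask;
  daysToKeep?: number;
}

// Returns a human-readable summary of what was cleaned up.
function processCleanupJobSketch(data: CleanupJobData): string {
  switch (data.task) {
    case "expired-tokens":
      return "deleted sessions expired for 30+ days";
    case "old-audit-logs":
      return `removed audit logs older than ${data.daysToKeep ?? 90} days`;
    case "failed-deliveries":
      return `purged failed webhook deliveries older than ${data.daysToKeep ?? 7} days`;
    case "temporary-files":
      return "cleaned up expired temporary uploads";
  }
}
```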
Creating Custom Workers
Add a new worker by following the existing pattern.
Define the job data interface and queue
```ts
// apps/api/src/jobs/queue.ts
import { Queue } from "bullmq";

export interface ReportJobData {
  organizationId: string;
  reportType: "usage" | "billing" | "audit";
  dateRange: { start: string; end: string };
}

export const reportQueue = new Queue<ReportJobData>("report", defaultQueueOptions);

export async function addReportJob(data: ReportJobData) {
  return reportQueue.add("generate-report", data);
}
```
Create the worker
```ts
// apps/api/src/jobs/workers/report.worker.ts
import { Worker, Job } from "bullmq";
import IORedis from "ioredis";
import type { ReportJobData } from "../queue";

const connection = new IORedis(
  process.env.REDIS_URL ?? "redis://localhost:6379",
  { maxRetriesPerRequest: null, enableReadyCheck: false }
);

async function processReportJob(job: Job<ReportJobData>) {
  const { organizationId, reportType, dateRange } = job.data;
  console.log(`Generating ${reportType} report for ${organizationId}`);

  // Your report generation logic here
  // ...

  return { success: true, generatedAt: new Date() };
}

export const reportWorker = new Worker<ReportJobData>("report", processReportJob, {
  connection,
  concurrency: 2,
});

reportWorker.on("completed", (job) => {
  console.log(`Report job ${job.id} completed`);
});

reportWorker.on("failed", (job, err) => {
  console.error(`Report job ${job?.id} failed:`, err.message);
});
```
Register the worker in the startup function
```ts
// apps/api/src/jobs/index.ts
export { reportWorker } from "./workers/report.worker";

export async function startWorkers() {
  const { emailWorker } = await import("./workers/email.worker");
  const { webhookWorker } = await import("./workers/webhook.worker");
  const { cleanupWorker } = await import("./workers/cleanup.worker");
  const { reportWorker } = await import("./workers/report.worker");
  return { emailWorker, webhookWorker, cleanupWorker, reportWorker };
}
```
Enqueue jobs from your API routes
```ts
import { addReportJob } from "../jobs/queue";

app.post("/reports", async ({ body, org }) => {
  const job = await addReportJob({
    organizationId: org.id,
    reportType: body.type,
    dateRange: body.dateRange,
  });

  return { jobId: job.id, status: "queued" };
});
```
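Clients typically poll for completion afterwards. One approach is to fetch the job by ID and map BullMQ's state names ("waiting", "delayed", "active", "completed", "failed") onto a coarser API-facing status; toApiStatus below is a hypothetical helper, not part of BunShip:

```typescript
// Map BullMQ job states (as returned by Job#getState()) to an API-facing
// status. States like "waiting-children" fall through to "queued".
type ApiStatus = "queued" | "processing" | "done" | "error";

export function toApiStatus(state: string): ApiStatus {
  switch (state) {
    case "active":
      return "processing";
    case "completed":
      return "done";
    case "failed":
      return "error";
    default:
      return "queued"; // waiting, delayed, etc.
  }
}
```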
Job Scheduling
BunShip schedules recurring jobs using BullMQ's cron repeat feature. These are configured in `setupRecurringJobs`:
```ts
// apps/api/src/jobs/index.ts
export async function setupRecurringJobs() {
  // Clean up expired tokens daily at 2 AM
  await addCleanupJob(
    { task: "expired-tokens" },
    {
      repeat: { cron: "0 2 * * *" },
      jobId: "cleanup-expired-tokens",
    }
  );

  // Clean up old audit logs weekly on Sunday at 3 AM
  await addCleanupJob(
    { task: "old-audit-logs", daysToKeep: 90 },
    {
      repeat: { cron: "0 3 * * 0" },
      jobId: "cleanup-old-audit-logs",
    }
  );

  // Clean up failed deliveries daily at 4 AM
  await addCleanupJob(
    { task: "failed-deliveries", daysToKeep: 7 },
    {
      repeat: { cron: "0 4 * * *" },
      jobId: "cleanup-failed-deliveries",
    }
  );

  // Clean up temporary files every 6 hours
  await addCleanupJob(
    { task: "temporary-files" },
    {
      repeat: { cron: "0 */6 * * *" },
      jobId: "cleanup-temporary-files",
    }
  );
}
```
The `jobId` field prevents duplicate recurring jobs when the worker restarts. BullMQ deduplicates by ID, so only one instance of each recurring job exists at a time.
Adding a custom recurring job
```ts
await reportQueue.add(
  "weekly-summary",
  { organizationId: "all", reportType: "usage", dateRange: { start: "auto", end: "auto" } },
  {
    repeat: { cron: "0 9 * * 1" }, // Every Monday at 9 AM
    jobId: "weekly-usage-summary",
  }
);
```
Graceful Shutdown
The worker process listens for termination signals and waits for in-progress jobs to finish before exiting:
```ts
process.on("SIGTERM", async () => {
  console.log("SIGTERM received, shutting down workers gracefully...");

  await Promise.all([
    workers.emailWorker.close(),
    workers.webhookWorker.close(),
    workers.cleanupWorker.close(),
  ]);

  await closeQueues();
  process.exit(0);
});
```
`worker.close()` stops accepting new jobs and waits for any currently running jobs to complete. `closeQueues()` closes all queue connections and the Redis client.
In Docker or Kubernetes, set `terminationGracePeriodSeconds` high enough for your longest-running job to complete. The default of 30 seconds may not be sufficient for large cleanup tasks.
Monitoring and Debugging
Redis health check
The worker verifies the Redis connection on startup:
```ts
export async function checkRedisHealth(): Promise<boolean> {
  try {
    const pong = await connection.ping();
    return pong === "PONG";
  } catch (error) {
    console.error("Redis health check failed:", error);
    return false;
  }
}
```
Worker event logging
Each worker emits events for completed, failed, and error states. These are logged to stdout/stderr by default:
```ts
emailWorker.on("completed", (job) => {
  console.log(`Email job ${job.id} completed`);
});

emailWorker.on("failed", (job, err) => {
  console.error(`Email job ${job?.id} failed:`, err.message);
});

emailWorker.on("error", (err) => {
  console.error("Email worker error:", err);
});
```
BullMQ dashboard
For a visual queue dashboard, consider adding Bull Board:
```ts
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ElysiaAdapter } from "@bull-board/elysia";

const serverAdapter = new ElysiaAdapter("/admin/queues");

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(webhookQueue),
    new BullMQAdapter(cleanupQueue),
  ],
  serverAdapter,
});
```
Protect the dashboard route with admin-only middleware in production.
Environment Variables
| Variable | Description | Default |
|---|---|---|
| REDIS_URL | Redis connection URL | redis://localhost:6379 |
| RESEND_API_KEY | Resend API key (required for the email worker) | - |
| RESEND_RATE_LIMIT | Max emails per second | 10 |
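These variables can be read once at startup into a single config object. A sketch applying the documented defaults (the variable names match the table; the config shape itself is illustrative):

```typescript
// Read worker-related environment variables, falling back to the
// documented defaults when a variable is unset.
const workerEnv = {
  redisUrl: process.env.REDIS_URL ?? "redis://localhost:6379",
  resendApiKey: process.env.RESEND_API_KEY, // undefined unless set; the email worker requires it
  resendRateLimit: parseInt(process.env.RESEND_RATE_LIMIT ?? "10", 10),
};
```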