Skip to main content
BunShip uses BullMQ backed by Redis for asynchronous task processing. Background jobs handle email delivery, webhook dispatch, and periodic cleanup tasks in a separate process from the API server.

Architecture

┌──────────────┐    ┌─────────┐    ┌──────────────────┐
│   API Server │───>│  Redis   │<───│  Worker Process  │
│  (Elysia)    │    │  (Queue) │    │  (BullMQ)        │
└──────────────┘    └─────────┘    └──────────────────┘
                                     ├── Email Worker
                                     ├── Webhook Worker
                                     └── Cleanup Worker
The API server enqueues jobs. A separate worker process consumes them. This separation means slow or failing background tasks never block API responses.

Starting the worker

Run the worker as a standalone process:
bun run src/worker.ts
On startup, the worker:
  1. Verifies the Redis connection is healthy
  2. Starts all three worker instances
  3. Schedules recurring cleanup jobs
  4. Listens for SIGTERM and SIGINT for graceful shutdown
// apps/api/src/worker.ts
async function main() {
  const redisHealthy = await checkRedisHealth();
  if (!redisHealthy) {
    console.error("Redis is not reachable. Exiting.");
    process.exit(1);
  }

  const workers = await startWorkers();
  await setupRecurringJobs();

  process.on("SIGTERM", async () => {
    await stopWorkers(workers);
    process.exit(0);
  });
}

Job Queues

BunShip defines three queues, each with its own configuration:
QueuePurposeConcurrencyDefault RetriesBackoff
emailTransactional emails via Resend53Exponential (2s base)
webhookWebhook delivery to external endpoints105Exponential (5s base)
cleanupPeriodic maintenance tasks11 (no retry)-
All queues share a common Redis connection:
const connection = new IORedis(process.env.REDIS_URL ?? "redis://localhost:6379", {
  maxRetriesPerRequest: null,
  enableReadyCheck: false,
});

Job retention

Completed and failed jobs are pruned automatically:
defaultJobOptions: {
  removeOnComplete: {
    count: 100,        // Keep last 100 completed jobs
    age: 24 * 3600,    // Keep for 24 hours
  },
  removeOnFail: {
    count: 1000,       // Keep last 1000 failed jobs
    age: 7 * 24 * 3600, // Keep for 7 days
  },
}

Built-in Workers

Email Worker

Sends transactional emails through Resend. Supports HTML content, plain text, templates, CC/BCC, and reply-to addresses.
import { addEmailJob } from "./jobs";

await addEmailJob({
  to: "[email protected]",
  subject: "Welcome to BunShip",
  html: "<h1>Welcome!</h1><p>Your account is ready.</p>",
  tags: { category: "onboarding" },
});
The worker enforces rate limits matching Resend’s tier:
export const emailWorker = new Worker<EmailJobData>("email", processEmailJob, {
  connection,
  concurrency: 5,
  limiter: {
    max: parseInt(process.env.RESEND_RATE_LIMIT || "10", 10),
    duration: 1000, // per second
  },
});
Email job data:
FieldTypeRequiredDescription
tostring | string[]YesRecipient email(s)
fromstringNoSender address (defaults to app config)
subjectstringYesEmail subject line
htmlstringNoHTML body
textstringNoPlain text body
templateIdstringNoResend template ID
templateDataobjectNoTemplate variables
replyTostringNoReply-to address
ccstring[]NoCC recipients
bccstring[]NoBCC recipients
tagsobjectNoEmail tags for tracking

Webhook Worker

Delivers webhook events to external endpoints with HMAC-SHA256 signing and automatic retries.
import { addWebhookJob } from "./jobs";

await addWebhookJob({
  webhookId: "wh_abc123",
  deliveryId: "del_xyz789",
  url: "https://example.com/webhooks",
  secret: "whsec_...",
  event: "member.added",
  payload: { memberId: "mem_123", role: "member" },
  attempt: 1,
});
Each delivery includes these headers:
HeaderValue
Content-Typeapplication/json
X-Webhook-EventEvent type
X-Webhook-SignatureHMAC-SHA256 signature
X-Webhook-DeliveryDelivery ID
User-AgentBunShip-Webhooks/1.0
Requests time out after 30 seconds. Non-2xx responses trigger a retry with exponential backoff. After all retries are exhausted, the delivery is marked as permanently failed in the database.

Cleanup Worker

Runs maintenance tasks one at a time (concurrency: 1). Four built-in tasks are supported:
TaskDescription
expired-tokensDeletes sessions expired for 30+ days
old-audit-logsRemoves audit logs older than the configured threshold
failed-deliveriesPurges failed webhook deliveries older than 7 days
temporary-filesCleans up expired temporary file uploads
export const cleanupWorker = new Worker<CleanupJobData>("cleanup", processCleanupJob, {
  connection,
  concurrency: 1,
});

Creating Custom Workers

Add a new worker by following the existing pattern.
1

Define the job data interface and queue

// apps/api/src/jobs/queue.ts
export interface ReportJobData {
  organizationId: string;
  reportType: "usage" | "billing" | "audit";
  dateRange: { start: string; end: string };
}

export const reportQueue = new Queue<ReportJobData>("report", defaultQueueOptions);

export async function addReportJob(data: ReportJobData) {
  return reportQueue.add("generate-report", data);
}
2

Create the worker

// apps/api/src/jobs/workers/report.worker.ts
import { Worker, Job } from "bullmq";
import IORedis from "ioredis";
import type { ReportJobData } from "../queue";

const connection = new IORedis(
  process.env.REDIS_URL ?? "redis://localhost:6379",
  { maxRetriesPerRequest: null, enableReadyCheck: false }
);

async function processReportJob(job: Job<ReportJobData>) {
  const { organizationId, reportType, dateRange } = job.data;
  console.log(`Generating ${reportType} report for ${organizationId}`);

  // Your report generation logic here
  // ...

  return { success: true, generatedAt: new Date() };
}

export const reportWorker = new Worker<ReportJobData>(
  "report",
  processReportJob,
  {
    connection,
    concurrency: 2,
  }
);

reportWorker.on("completed", (job) => {
  console.log(`Report job ${job.id} completed`);
});

reportWorker.on("failed", (job, err) => {
  console.error(`Report job ${job?.id} failed:`, err.message);
});
3

Register the worker in the startup function

// apps/api/src/jobs/index.ts
export { reportWorker } from "./workers/report.worker";

export async function startWorkers() {
  const { emailWorker } = await import("./workers/email.worker");
  const { webhookWorker } = await import("./workers/webhook.worker");
  const { cleanupWorker } = await import("./workers/cleanup.worker");
  const { reportWorker } = await import("./workers/report.worker");

  return { emailWorker, webhookWorker, cleanupWorker, reportWorker };
}
4

Enqueue jobs from your API routes

import { addReportJob } from "../jobs/queue";

app.post("/reports", async ({ body, org }) => {
  const job = await addReportJob({
    organizationId: org.id,
    reportType: body.type,
    dateRange: body.dateRange,
  });

  return { jobId: job.id, status: "queued" };
});

Job Scheduling

BunShip schedules recurring jobs using BullMQ’s cron repeat feature. These are configured in setupRecurringJobs:
// apps/api/src/jobs/index.ts
export async function setupRecurringJobs() {
  // Clean up expired tokens daily at 2 AM
  await addCleanupJob(
    { task: "expired-tokens" },
    {
      repeat: { cron: "0 2 * * *" },
      jobId: "cleanup-expired-tokens",
    }
  );

  // Clean up old audit logs weekly on Sunday at 3 AM
  await addCleanupJob(
    { task: "old-audit-logs", daysToKeep: 90 },
    {
      repeat: { cron: "0 3 * * 0" },
      jobId: "cleanup-old-audit-logs",
    }
  );

  // Clean up failed deliveries daily at 4 AM
  await addCleanupJob(
    { task: "failed-deliveries", daysToKeep: 7 },
    {
      repeat: { cron: "0 4 * * *" },
      jobId: "cleanup-failed-deliveries",
    }
  );

  // Clean up temporary files every 6 hours
  await addCleanupJob(
    { task: "temporary-files" },
    {
      repeat: { cron: "0 */6 * * *" },
      jobId: "cleanup-temporary-files",
    }
  );
}
The jobId field prevents duplicate recurring jobs when the worker restarts. BullMQ deduplicates by ID, so only one instance of each recurring job exists at a time.

Adding a custom recurring job

await reportQueue.add(
  "weekly-summary",
  { organizationId: "all", reportType: "usage", dateRange: { start: "auto", end: "auto" } },
  {
    repeat: { cron: "0 9 * * 1" }, // Every Monday at 9 AM
    jobId: "weekly-usage-summary",
  }
);

Graceful Shutdown

The worker process listens for termination signals and waits for in-progress jobs to finish before exiting:
process.on("SIGTERM", async () => {
  console.log("SIGTERM received, shutting down workers gracefully...");
  await Promise.all([
    workers.emailWorker.close(),
    workers.webhookWorker.close(),
    workers.cleanupWorker.close(),
  ]);
  await closeQueues();
  process.exit(0);
});
worker.close() stops accepting new jobs and waits for the currently running job to complete. closeQueues() closes all queue connections and the Redis client.
In Docker or Kubernetes, set the terminationGracePeriodSeconds high enough for your longest-running job to complete. The default 30 seconds may not be sufficient for large cleanup tasks.

Monitoring and Debugging

Redis health check

The worker verifies the Redis connection on startup:
export async function checkRedisHealth(): Promise<boolean> {
  try {
    const pong = await connection.ping();
    return pong === "PONG";
  } catch (error) {
    console.error("Redis health check failed:", error);
    return false;
  }
}

Worker event logging

Each worker emits events for completed, failed, and error states. These are logged to stdout/stderr by default:
emailWorker.on("completed", (job) => {
  console.log(`Email job ${job.id} completed`);
});

emailWorker.on("failed", (job, err) => {
  console.error(`Email job ${job?.id} failed:`, err.message);
});

emailWorker.on("error", (err) => {
  console.error("Email worker error:", err);
});

BullMQ dashboard

For a visual queue dashboard, consider adding Bull Board:
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ElysiaAdapter } from "@bull-board/elysia";

const serverAdapter = new ElysiaAdapter("/admin/queues");

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(webhookQueue),
    new BullMQAdapter(cleanupQueue),
  ],
  serverAdapter,
});
Protect the dashboard route with admin-only middleware in production.

Environment Variables

VariableDescriptionDefault
REDIS_URLRedis connection URLredis://localhost:6379
RESEND_API_KEYResend API key (required for email worker)-
RESEND_RATE_LIMITMax emails per second10