Redis-backed workflow rules engine with pubsub triggers

September 5, 2023

repo-review

by Florian Narr

redis-workflow turns Redis pubsub into a configurable trigger-condition-action rules engine. You define workflows — a trigger name, a set of rules, a set of actions — attach them to a channel, and whenever a matching event arrives the engine evaluates the rules and emits the qualifying actions.

Why I starred it

Most apps hardcode their business rules. "When a new order arrives and stock > 0, ship the product." That logic lives in a handler function, deployed with the app. Changing a threshold means a code change, a PR, a deploy. redis-workflow separates the rule definitions from the handler code. The workflows are stored in Redis and loaded at startup. The handler just reacts to events.

The approach maps directly to the classic BPM model of trigger, condition, and action, but it needs no infrastructure beyond the Redis instance you already run.

How it works

The main class is RedisWorkflowManager in src/index.ts, which extends EventEmitter. It maintains a workflows hashmap keyed by channel name. Each channel holds an array of IWorkflow objects.

When you call manager.start("myChannel"), the manager subscribes a Redis subscriber client to that channel and attaches a message listener. Incoming messages are parsed as JSON with the shape {event, context}. The engine then looks up the matching workflow in the dictionary returned by getTriggersAsDictForChannel(), a flat map from trigger names to workflows that is built once at start time.
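The parse-then-lookup step can be sketched roughly as follows. This is not the library's code: WorkflowLike, WorkflowMessage, buildTriggerDict and dispatch are hypothetical names chosen for illustration, and the real interfaces may differ.

```typescript
// Hypothetical shapes for illustration; the library's real interfaces may differ.
interface WorkflowLike {
  name: string;
  trigger: string;
}

interface WorkflowMessage {
  event: string;
  context?: Record<string, unknown>;
}

// Build the trigger-name -> workflow lookup once, when the channel starts.
function buildTriggerDict(workflows: WorkflowLike[]): Map<string, WorkflowLike> {
  const dict = new Map<string, WorkflowLike>();
  workflows.forEach((wf) => {
    // In this sketch a later workflow with the same trigger replaces an earlier one.
    dict.set(wf.trigger, wf);
  });
  return dict;
}

// Parse a raw pubsub payload and find the workflow it triggers, if any.
function dispatch(
  raw: string,
  dict: Map<string, WorkflowLike>,
): { workflow: WorkflowLike; context: Record<string, unknown> } | null {
  let msg: WorkflowMessage | null;
  try {
    msg = JSON.parse(raw) as WorkflowMessage | null;
  } catch (err) {
    return null; // malformed JSON: nothing to dispatch
  }
  if (!msg || typeof msg.event !== "string") {
    return null;
  }
  const workflow = dict.get(msg.event);
  if (!workflow) {
    return null; // no workflow registered for this trigger name
  }
  return { workflow, context: msg.context || {} };
}
```

Building the lookup once keeps the per-message cost to a JSON parse plus one map access, which is presumably why the library builds its dictionary at start time.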

The interesting piece is rule evaluation. Each Workflow holds an instance of mozjexl.Jexl — a JavaScript expression language evaluator. getActionsForContext() in Workflow.ts runs every rule expression against the incoming context using Promise.all:

// src/lib/Workflow.ts
this.rules.map((rule) => {
    if (rule && rule.getExpression()) {
        rulesJobs.push(this.evaluator.eval(rule.getExpression(), context));
    }
});

Promise.all(rulesJobs).then((values) => {
    values.map((check: boolean) => {
        if (check !== true) { isValid = false; }
    });

    if (isValid && this.actions && this.actions.length > 0) {
        actionsToFire = this.actions;
    }
    resolve(actionsToFire);
});

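Rules are all-or-nothing. The check is check !== true, so any expression that evaluates to something other than the boolean true (false, undefined, or even a truthy string or number) invalidates the whole workflow: no actions fire and the manager emits a WorkflowEvents.Invalid event instead. There is no partial match.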
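To make the strictness concrete, here is a minimal sketch of the same gate with the evaluator stubbed out. allRulesPass, actionsToFire and evaluateRules are hypothetical names, not library functions. Note one assumption: an empty rule list passes vacuously here, and whether the library does the same depends on how isValid is initialised, which the excerpt does not show.

```typescript
// Mirrors the `check !== true` test: only the boolean `true` counts as a pass.
function allRulesPass(results: unknown[]): boolean {
  return results.every((r) => r === true);
}

// Either every action qualifies or none does.
function actionsToFire<A>(results: unknown[], actions: A[]): A[] {
  return allRulesPass(results) ? actions : [];
}

// Async wrapper in the spirit of the excerpt: run every check, then gate once.
// Each check is a stand-in for one evaluator call on a rule expression.
async function evaluateRules(checks: Array<() => Promise<unknown>>): Promise<boolean> {
  const results = await Promise.all(checks.map((check) => check()));
  return allRulesPass(results);
}
```

A practical consequence: an expression such as "inStock" on its own would yield a number and fail the gate, so rules should be written as explicit comparisons like "inStock > 0".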

Actions come in two flavors: ImmediateAction and DelayedAction. The delayed variant stores the interval in milliseconds and exposes a fluent builder API in src/lib/DelayedAction.ts:

const action = new DelayedAction("shipProduct").delay(5, "days").repeat(3);

The delay() method supports string aliases for time units ("s", "second", "seconds", "d", "day", "month", etc.) and converts them to milliseconds internally. repeat(0) means repeat indefinitely. The library doesn't schedule the delay for you — it emits the action on WorkflowEvents.Schedule and expects your application to wire it to whatever scheduler you use (cron, Bull, etc.).
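The unit handling presumably boils down to an alias table plus a multiplication. The sketch below assumes that shape; the alias set beyond the ones named above, the 30-day month, and the names UNIT_MS and toMillis are all assumptions rather than the library's actual values.

```typescript
const SECOND = 1000;
const MINUTE = 60 * SECOND;
const HOUR = 60 * MINUTE;
const DAY = 24 * HOUR;
const MONTH = 30 * DAY; // assumption: a "month" is treated as 30 days

// Alias table mapping unit strings to milliseconds (hypothetical alias set).
const UNIT_MS = new Map<string, number>([
  ["s", SECOND], ["second", SECOND], ["seconds", SECOND],
  ["minute", MINUTE], ["minutes", MINUTE],
  ["h", HOUR], ["hour", HOUR], ["hours", HOUR],
  ["d", DAY], ["day", DAY], ["days", DAY],
  ["month", MONTH], ["months", MONTH],
]);

// Convert an amount in the given unit to milliseconds.
function toMillis(amount: number, unit: string): number {
  const factor = UNIT_MS.get(unit.toLowerCase());
  if (factor === undefined) {
    throw new Error("Unknown time unit: " + unit);
  }
  return amount * factor;
}
```

Under these assumptions, delay(5, "days") would store 432000000 ms. Your scheduler receives that number on WorkflowEvents.Schedule and decides what to do with it.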

Workflow persistence uses Redis sets + string keys. Each workflow is serialized to JSON via toDict() and stored under a key of channel:hash(workflowName). A Redis set at channel:workflows tracks all the keys for that channel, making SMEMBERS the index for loading all workflows on restart.
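The key layout is easier to see with the Redis calls replaced by an in-memory stand-in. In this sketch FakeStore, saveWorkflow and loadWorkflows are hypothetical, the hash is passed in as a parameter, and the comments name the Redis command each step would correspond to.

```typescript
// In-memory stand-in for the two Redis structures involved: string keys and sets.
class FakeStore {
  strings = new Map<string, string>();
  sets = new Map<string, Set<string>>();
}

// Store one serialized workflow and register its key in the channel's index set.
function saveWorkflow(
  store: FakeStore,
  channel: string,
  workflowName: string,
  json: string,
  hashName: (name: string) => number,
): string {
  const key = channel + ":" + hashName(workflowName);
  store.strings.set(key, json); // SET key json

  const indexKey = channel + ":workflows";
  const index = store.sets.get(indexKey) || new Set<string>();
  index.add(key); // SADD channel:workflows key
  store.sets.set(indexKey, index);
  return key;
}

// Load every serialized workflow for a channel by walking the index set.
function loadWorkflows(store: FakeStore, channel: string): string[] {
  const index = store.sets.get(channel + ":workflows") || new Set<string>(); // SMEMBERS
  const loaded: string[] = [];
  index.forEach((key) => {
    const json = store.strings.get(key); // GET key
    if (json !== undefined) {
      loaded.push(json);
    }
  });
  return loaded;
}
```

Because the index is a set, saving the same workflow twice overwrites the string value without duplicating the index entry.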

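The key hash in src/lib/Util.ts is a simple multiply-by-31 string hash, the same recurrence as Java's String.hashCode and a close relative of djb2 (which multiplies by 33):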

hash = ((hash << 5) - hash) + code;
hash &= hash; // Int32
// ...
return (hash >>> 0); // uInt32

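The final >>> 0 reinterprets the signed int32 as a uint32, so the key suffix is always a non-negative integer. Nothing fancy, but it keeps keys short and free of whatever characters a workflow name contains. Like any 32-bit hash it can collide, and two workflow names with the same hash would share a key.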
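For reference, a complete function built around the quoted lines might look like this. The loop is a guess at the standard shape and does not reproduce the elided part of the source. hashString is a hypothetical name.

```typescript
// String hash using the recurrence hash = hash * 31 + charCode, kept in 32 bits.
function hashString(input: string): number {
  let hash = 0;
  for (let i = 0; i < input.length; i++) {
    const code = input.charCodeAt(i);
    hash = ((hash << 5) - hash) + code; // (hash * 32) - hash = hash * 31
    hash |= 0; // truncate to a signed 32-bit integer
  }
  return hash >>> 0; // reinterpret as unsigned 32-bit
}
```

Working through a short input by hand: "a" has char code 97, so hashString("a") is 97, and "ab" gives 97 * 31 + 98 = 3105.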

Stopping a channel is also interesting: stop() publishes a literal "WFKILL" string to the channel. The subscriber catches it, unsubscribes, and emits WorkflowEvents.Kill. Self-termination via the same pubsub channel — no out-of-band signaling needed.
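The in-band kill pattern can be illustrated without Redis. FakeChannel below is a hypothetical in-memory stand-in for a single pubsub channel, and startWorker records event names in an array instead of emitting WorkflowEvents; only the "WFKILL" literal comes from the library.

```typescript
const KILL_MESSAGE = "WFKILL";

type Listener = (message: string) => void;

// Minimal in-memory stand-in for one pubsub channel (not a Redis client).
class FakeChannel {
  private listeners: Listener[] = [];

  subscribe(listener: Listener): void {
    this.listeners.push(listener);
  }

  unsubscribe(listener: Listener): void {
    this.listeners = this.listeners.filter((l) => l !== listener);
  }

  publish(message: string): void {
    // Iterate over a copy so a listener can unsubscribe itself mid-delivery.
    this.listeners.slice().forEach((l) => l(message));
  }
}

// Subscribe a worker that stops itself when it sees the kill message.
function startWorker(channel: FakeChannel, events: string[]): void {
  const onMessage: Listener = (message) => {
    if (message === KILL_MESSAGE) {
      channel.unsubscribe(onMessage);
      events.push("kill");
      return;
    }
    events.push("message:" + message);
  };
  channel.subscribe(onMessage);
}
```

The flip side of in-band signaling is that any client able to publish to the channel can also stop the worker, so the channel itself becomes part of your trust boundary.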

Using it

import * as flow from "redis-workflow";

const config = new flow.RedisConfig("localhost", 6379, null, null);
const manager = new flow.RedisWorkflowManager(config);

const workflow = new flow.Workflow(
    "orderWorkflow",
    new flow.Trigger("order.placed"),
    [new flow.Rule("In stock", "inStock > 0")],
    [
        new flow.ImmediateAction("adjustInventory"),
        new flow.DelayedAction("shipProduct").delay(1, "days"),
    ]
);

manager.setWorkflows({ orders: [workflow] });

manager.on(flow.WorkflowEvents.Immediate, (action) => {
    // handle inventory update
});

manager.on(flow.WorkflowEvents.Schedule, (action) => {
    // pass to your job queue
});

manager.on(flow.WorkflowEvents.Invalid, (msg) => {
    // rules didn't pass — handle the miss
});

await manager.start("orders");

The WorkflowEvents enum covers the full lifecycle — Error, Add, Remove, Load, Save, Start, Stop, Kill, Schedule, Immediate, Invalid, Audit. You can subscribe to any of them. The Audit event fires for every action regardless of type, which makes it straightforward to stream all workflow activity to a log or analytics pipeline.

Rough edges

The reset() method in RedisWorkflowManager has a // TODO: comment where the database cleanup should be. Calling it clears in-memory workflows but doesn't remove anything from Redis. On the next reload those workflows come back.

The expression evaluator (mozjexl) is instantiated per-workflow in the Workflow constructor. If you load 500 workflows, you get 500 Jexl instances. The README acknowledges this as a // TODO: consider injecting this dependency as shared.
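One way the shared-dependency idea from that TODO could look is plain constructor injection with a default. Everything in this sketch is hypothetical (Evaluator, CountingEvaluator, SketchWorkflow); it stands in for the Jexl instance with a stub that counts how many evaluators get created.

```typescript
// Hypothetical minimal evaluator interface, standing in for a Jexl instance.
interface Evaluator {
  eval(expression: string, context: object): Promise<unknown>;
}

let evaluatorsCreated = 0;

// Stub evaluator that only counts instantiations; it does not parse expressions.
class CountingEvaluator implements Evaluator {
  constructor() {
    evaluatorsCreated++;
  }

  eval(_expression: string, _context: object): Promise<unknown> {
    return Promise.resolve(true);
  }
}

// Workflow that accepts a shared evaluator but still works without one.
class SketchWorkflow {
  constructor(
    public readonly name: string,
    private readonly evaluator: Evaluator = new CountingEvaluator(),
  ) {}

  check(expression: string, context: object): Promise<unknown> {
    return this.evaluator.eval(expression, context);
  }
}

// Create many workflows that all reuse one evaluator instance.
function buildWorkflows(count: number, shared: Evaluator): SketchWorkflow[] {
  const workflows: SketchWorkflow[] = [];
  for (let i = 0; i < count; i++) {
    workflows.push(new SketchWorkflow("wf-" + i, shared));
  }
  return workflows;
}
```

The default parameter keeps the existing one-argument call style working, so a change like this would not have to break current users.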

There are tests in src/__tests__/index.spec.ts, but they require a live Redis instance with no mocking. That makes CI setup a bit more involved. The test file also calls jest.setTimeout(5000) globally. That value equals Jest's default timeout, so it reads more like a guard around pubsub timing than proof that the tests are flaky.

The library targets redis v2, which uses the old callback API. The current redis package is v4+ with Promises and a completely different client interface. This is a breaking compatibility issue if you're starting a new project.

Last commit: "Version and build update" — no recent activity. It's been idle for years.

Bottom line

If you're on an older Node stack already using redis v2 and want to extract conditional business logic into Redis without adding a heavy workflow engine, this is a clean, well-typed foundation. For a greenfield project, the outdated Redis client dependency is a blocker you'd have to fork around.

mikesparr/redis-workflow on GitHub