Using Node, Redis, and Kue for Priority Job Processing




Code for tutorial available on Github

Get Your Priorities Straight

If you’ve worked on the server side of things for long enough, chances are you’ve needed a queue of some sort to handle messaging, processing, or some other kind of orderly task execution.

In this tutorial, we’ll be using a certain type of queue — the priority job queue — to help us solve a fairly common problem: payment processing. We’ll be using Kue, which is a “priority job queue backed by redis, built for node.js”. Priority job queues are fairly common, sometimes implemented and known as “worker” processes in some applications.
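To make the idea of a priority job queue concrete before we bring in Kue, here is a minimal in-memory sketch. The `TinyPriorityQueue` class is hypothetical and exists only for illustration (Kue keeps its jobs in Redis, not in an array). The numeric levels mirror the named priorities listed in Kue's README, where lower numbers are handled first.

```javascript
"use strict";

// Named priorities as listed in Kue's README; lower numbers run first.
const PRIORITIES = { low: 10, normal: 0, medium: -5, high: -10, critical: -15 };

// Hypothetical in-memory queue, for illustration only.
class TinyPriorityQueue {
    constructor() {
        this.jobs = [];
        this.nextId = 1;
    }

    // Add a job with a named priority; unknown names fall back to "normal".
    add(type, data, priority) {
        const known = Object.prototype.hasOwnProperty.call(PRIORITIES, priority);
        const level = known ? PRIORITIES[priority] : PRIORITIES.normal;
        const job = { id: this.nextId++, type, data, level };
        this.jobs.push(job);
        return job;
    }

    // Remove and return the most urgent job; ties go to the oldest job.
    next() {
        if (this.jobs.length === 0) return undefined;
        this.jobs.sort((a, b) => a.level - b.level || a.id - b.id);
        return this.jobs.shift();
    }
}

const q = new TinyPriorityQueue();
q.add("email", { to: "customer" }, "low");
q.add("payment", { orderID: "1a2b3c4" }, "critical");
q.add("report", {}, "normal");

// Drain the queue and record the order in which jobs come out.
const order = [];
let job;
while ((job = q.next())) order.push(job.type);
console.log(order.join(" -> ")); // payment -> report -> email
```

The payment job was added second but comes out first, which is exactly the behavior we want for payment processing.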

Kue’s robust and usable API provides, among other things:

  • job creation tools
  • interface with Redis, the persistent data store that will back our queue
  • job priority assignment
  • failure attempts and backoff
  • job events, progress, and logs
  • pausing, graceful shutdown, and error-handling
  • and more!

Prerequisites

You’ll need a few things before we can get started:

  • the latest stable version of Node.js, available here or via nvm
  • redis, which you can install via homebrew (brew install redis) or download here
  • npm (which comes bundled with node.js)
  • a text editor, like Atom

Getting Started

First, we’ll need to create a git repository to track our changes. I’ve created one on Github — you can create one wherever you prefer, but I highly recommend using Github.

$ cd whereYouKeepYourCode
$ git clone git@github.com:<your username>/<repository name>.git

Second, we’ll need to scaffold out a basic application structure. We’re going to be using Express as our web framework. To initialize our application, we’ll set up our package.json file and then install express and kue.

$ npm init
# ... you'll be walked through a series of questions to set up your project
$ npm install --save express kue

Now that we’ve installed some tools we’ll need, let’s get our basic express app set up. Covering Express and designing web APIs in depth is a topic for another time, so we’ll stick to a minimal setup — just enough for what we need. Let’s grab the express generator to speed up our setup process:

$ npm install -g express-generator

Now we can run express -h to see what options we have available to us. Because our API isn’t going to handle serving pages, we can safely skip any extra view engine or stylesheet options and just run express.

If you ls out your project, you should now be seeing a bare-bones express app structure that’s been generated for you. Feel free to remove the /views directory, the "jade": "X.X.X" entry from your package.json, and the following lines from your app.js:

var users = require('./routes/users');

app.set('views', path.join(__dirname, 'views'));

app.set('view engine', 'jade');

app.use('/users', users);

If you prefer to leave them in, that’s fine, too — we just won’t be using them in this tutorial.

Setting Up Our API

Now that we’ve created our basic Express structure, we can do some basic API setup. Our goal is efficient, reliable payment processing for customers purchasing products at our store. Since handling payments well is, like Express and API design, a large topic, we’ll deal with a simplified version for the sake of learning.

We’ve been given the following requirements:

  • receive requests from our client app(s) on mobile, web, desktop, or any connected platform
  • requests contain the following data:
    • payment-platform token (we’re not PCI certified yet!)
    • first and last name
    • email
    • product id
    • address
    • order time

Our task is to take the data, interpret it, process the payment, and then communicate a done message to our email receipt system so it can communicate with the user. So, first we need to add a route to our express app:
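Before any payment gets processed, it helps to check that a request actually carries the data listed in the requirements. Here is a minimal sketch of that check. The `validateOrder` helper is hypothetical (it is not part of Express or Kue), and the field names are assumed to match the order fixture used in the tests later in this tutorial.

```javascript
"use strict";

// Hypothetical helper: returns a list of problems found in an order payload.
function validateOrder(body) {
    const errors = [];
    const order = body || {};
    const customer = order.customer || {};

    // Top-level fields from the requirements list
    for (const field of ["paymentToken", "productID", "createdAt"]) {
        if (!order[field]) errors.push(`missing ${field}`);
    }
    // Customer details
    for (const field of ["firstName", "lastName", "email", "address"]) {
        if (!customer[field]) errors.push(`missing customer.${field}`);
    }
    // The order time arrives as a string in JSON, so check that it parses as a date
    if (order.createdAt && Number.isNaN(new Date(order.createdAt).getTime())) {
        errors.push("createdAt is not a valid date");
    }
    return errors;
}

const good = validateOrder({
    paymentToken: "4kSvjL_Qx",
    productID: "5d6e6f",
    createdAt: "2015-12-24T23:58:59Z",
    customer: {
        firstName: "A",
        lastName: "Person",
        email: "person@example.com",
        address: "1234 somewhere lane",
    },
});
const bad = validateOrder({ productID: "5d6e6f", customer: { firstName: "A" } });

console.log(good.length); // 0
console.log(bad);
```

An empty array means the payload is complete. Anything else could be sent back to the client with a 400 status rather than being queued.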

app.js

const payments = require("./routes/payments");

// I'll omit the other express middleware since it's outside the scope of our tutorial

app.use("/payments", payments);

//...

routes/payments.js

"use strict";

const router = require("express").Router();

router.post("/", (req, res, next) => {
    // our future code will go here
});

module.exports = router;

Let’s get some testing tools installed and make sure we have a failing test to gauge our progress by.

npm install --save-dev supertest tape
mkdir test && cd $_ && touch payments.js

(See footnote 1 at the end for what $_ does.)

Let’s set up some basic tests for our API using supertest and tape. We won’t dive into supertest and tape, but suffice it to say that they are, respectively, a “super-agent driven library for testing node.js HTTP servers using a fluent API” and an elegantly-simple testing library.

test/payments.js

"use strict";

const supertest = require("supertest");
const app = require("../app");
// Supertest receives an http.Server instance like our express app, spins it up, assigns an ephemeral port, and gives us some basic asserts to use.

const api = supertest(app);

const test = require("tape");

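test(
    "POST /payments responds with JSON",
    {
        timeout: 500,
    },
    (t) => {
        api.post("/payments")
            .expect("Content-Type", /json/)
            .expect(200)
            .end((err, res) => {
                // Fail on a request or assertion error, otherwise check the body
                if (err) {
                    t.fail(err.message);
                } else {
                    t.ok(res.body, "Should respond with a body");
                }
                // End exactly once, whichever branch ran
                t.end();
            });
    }
);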

Running node test/*.js or npm test (if you’ve added the node test/*.js command to your package.json) should give you a failing test because our express app doesn’t respond and therefore exceeds the timeout we’ve set.

Let’s add a few more assertions — we want to expect the API to provide the client with a structured response like the following:

{
    received: <Boolean>,
    orderID: <someID>,
    receivedAt: <Date>,
    createdAt: <Date>,
    productID: <String>,
    customer: {
        firstName: <String>,
        lastName: <String>,
        email: <String>,
        address: <String>
    }
}

We can update our tests to reflect these expectations of the response. We’ll also add an overly-simplified dummyOrder fixture to send.

"use strict";

const request = require("supertest");
const app = require("../app");
const api = request(app);

const test = require("tape");

const dummyOrder = {
    // This job property lets you make better use of the kue UI — keep reading for more
    title: "Order #4kSvjL_Qx",
    paymentToken: "4kSvjL_Qx",
    orderID: "1a2b3c4",
    received: true,
    receivedAt: new Date("December 24, 2015 23:59:59"),
    createdAt: new Date("December 24, 2015 23:58:59"),
    productID: "5d6e6f",
    customer: {
        firstName: "A",
        lastName: "Person",
        email: "a.person@example.com",
        address: "1234 somewhere lane, ? USA 12345",
    },
};

test("Receiving and processing payments", (t) => {
    api.post("/payments")
        .send(dummyOrder)
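        .end((err, res) => {
            // Bail out early if the request itself failed; res may be undefined
            if (err) {
                t.fail(err.message);
                t.end();
                return process.exit(1);
            }
            const order = res.body.order;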

            // Check for response body
            t.ok(res.body, "Should respond with a body");

            // Check for response meta properties
            t.equals(
                res.body.success,
                true,
                "The success property should be true"
            );
            t.equals(res.body.error, null, "The error property should be null");
            t.ok(res.body.message, "Should have a message property");

            // Check to see if the order is intact
            t.equals(order.received, true, "Should have been received");
            t.equals(
                order.orderID,
                dummyOrder.orderID,
                "Order ID should be the same"
            );
            t.equals(
                order.paymentToken,
                dummyOrder.paymentToken,
                "Payment token should be the same"
            );
            t.equals(
                order.productID,
                dummyOrder.productID,
                "Product ID should be the same"
            );
            t.end();

            // I ran into some unexpected behavior w/ supertest and tape where tape tests would appear to hang, even after calling t.end()
            process.exit();
        });
});

Setting Up Kue

Now we can get Kue up and running. Let’s first open a new terminal instance and get Redis running:

$ redis-server

You should see some cool output from Redis saying it’s up, running, and available on port 6379 (the default port). Your local Redis database will be enough for our purposes, but you’ll seriously want to consider production-grade hosted Redis, available from Compose.

Now that Redis is up and running, let’s get started with creating our queue.

$ mkdir queue && cd $_ && touch payments.js

In our payments.js queue file, we’ll use Kue to connect to Redis and create our first queue. If you don’t provide any connection configuration, Kue will automatically look for Redis at the default, local URL. This will break immediately in production, however, so we’ll first check which environment Node is running in. You can see all of the configuration options available here.

queue/payments.js

"use strict";

let redisConfig;
if (process.env.NODE_ENV === "production") {
    redisConfig = {
        redis: {
            port: process.env.REDIS_PORT,
            host: process.env.REDIS_HOST,
            auth: process.env.REDIS_PASS,
        },
    };
} else {
    redisConfig = {};
}

const queue = require("kue").createQueue(redisConfig);

queue.watchStuckJobs(6000);

queue.on("ready", () => {
    // If you need to do anything once the queue has connected, do it here
    console.info("Queue is ready!");
});

queue.on("error", (err) => {
    // handle connection errors here
    console.error("There was an error in the main queue!");
    console.error(err);
    console.error(err.stack);
});

Another small note is the #watchStuckJobs() call, which helps guard against stuck or stalled jobs. In the upcoming 1.0 release, fully-atomic operations will obviate this, but for now it would be wise to leave in.

Creating Jobs

queue/payments.js

/// ... previous Redis configuration

const queue = require("kue").createQueue(redisConfig);

queue.watchStuckJobs(6000);

queue.on("ready", () => {
    // If you need to do anything once the queue has connected, do it here
    console.info("Queue is ready!");
});

queue.on("error", (err) => {
    // handle connection errors here
    console.error("There was an error in the main queue!");
    console.error(err);
    console.error(err.stack);
});

function createPayment(data, done) {
    queue
        .create("payment", data)
        .priority("critical")
        .attempts(8)
        .backoff(true)
        .removeOnComplete(false)
        .save((err) => {
            if (err) {
                console.error(err);
                return done(err);
            }
            done();
        });
}

module.exports = {
    create: (data, done) => {
        createPayment(data, done);
    },
};

Because we want to allow other modules to create jobs for our queue, we can export a function property on our payments.js module and allow people to call the #create(data, done) method. Kue uses a fluent style for many parts of its API, which lets us read the #createPayment() function in English as ‘create a payment job with the highest priority, re-attempt the job up to 8 times in the case of failure, use backoff timing, and do not remove the job once complete (so we can see it in the admin user interface)’. There are many more parameters we can tune and use when creating jobs, but these will suffice for now.
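Backoff timing is one of those tunable parameters. Besides `true`, Kue's README describes passing `.backoff()` an options object such as `{ type: "exponential" }`, or a function that receives `(attempts, delay)` and returns the next delay in milliseconds. Here is a sketch of such a function. The name `cappedExponentialBackoff`, the one-second base, and the one-minute cap are all assumptions made for illustration, and the formula is not Kue's built-in one.

```javascript
"use strict";

// Hypothetical values for illustration.
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 60 * 1000;

// Uses the (attempts, delay) shape Kue documents for custom backoff functions.
// `attempts` is assumed to start at 0 for the first failure.
// `delay` (the previous delay) is accepted but ignored in this sketch.
function cappedExponentialBackoff(attempts, delay) {
    return Math.min(BASE_DELAY_MS * Math.pow(2, attempts), MAX_DELAY_MS);
}

// Delays that would follow the first seven failures of a job.
const schedule = [];
for (let attempt = 0; attempt < 7; attempt++) {
    schedule.push(cappedExponentialBackoff(attempt));
}
console.log(schedule); // [ 1000, 2000, 4000, 8000, 16000, 32000, 60000 ]
```

If your version of Kue calls custom backoff functions this way, passing `cappedExponentialBackoff` to `.backoff()` in place of `true` should space retries further and further apart instead of retrying at a fixed interval.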

Processing Jobs

The API for processing jobs is fairly straightforward — you declaratively indicate which type of job you’re processing and how you want to handle the data. You’re given a callback to work with that has two key pieces of data: a done callback to fire once the job is finished and a job object that has, among other things, the data you need as well as some helper methods.

For example: queue/payments.js

//...
queue.process("payment", (job, done) => {
    // This is the data we sent into the #create() function call earlier
    // We're setting it to a constant here to guard against accidentally reassigning it
    const data = job.data;
    //... do other stuff with the data.

    // Signal completion; call done(err) instead to mark the job as failed
    done();
});
//...

We can also add progress state to our jobs. This can be really useful if you’re running jobs that take a long time or just want to send feedback to your users. You can use job.progress(completed, total [, data]) to send progress messages to your job, which will in turn be available via the JSON API and the user interface.
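Here is a rough sketch of how progress reporting and failure handling might fit together in a processor. The `steps` array, `processPayment`, and `fakeJob` are hypothetical names, and the fake job stands in for a real Kue job so the sketch runs without Redis. It only assumes that a job exposes `data` and `progress(completed, total)`, as described above.

```javascript
"use strict";

// Hypothetical processing steps; real ones would call a payment provider, etc.
const steps = [
    (data) => {
        if (!data.paymentToken) throw new Error("missing payment token");
    },
    (data) => { /* charge the token */ },
    (data) => { /* notify the receipt system */ },
];

// Has the (job, done) shape that queue.process() hands to its callback.
function processPayment(job, done) {
    let failure = null;
    try {
        steps.forEach((step, index) => {
            step(job.data);
            // Report how many steps are finished out of the total
            job.progress(index + 1, steps.length);
        });
    } catch (err) {
        failure = err;
    }
    // Passing an error to done marks the job as failed so it can be retried
    if (failure) return done(failure);
    done();
}

// Stand-in for a Kue job that records each progress update.
function fakeJob(data) {
    const updates = [];
    return {
        data,
        updates,
        progress: (completed, total) => updates.push(`${completed}/${total}`),
    };
}

const okJob = fakeJob({ paymentToken: "4kSvjL_Qx" });
let okResult = "pending";
processPayment(okJob, (err) => { okResult = err ? "failed" : "complete"; });
console.log(okResult, okJob.updates); // complete [ '1/3', '2/3', '3/3' ]

const badJob = fakeJob({});
let badResult = "pending";
processPayment(badJob, (err) => { badResult = err ? "failed" : "complete"; });
console.log(badResult, badJob.updates); // failed []
```

With a real queue, the same function could be handed to `queue.process("payment", processPayment)`.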

So, our queue/payments.js file will look like this altogether:

"use strict";

let redisConfig;
if (process.env.NODE_ENV === "production") {
    redisConfig = {
        redis: {
            port: process.env.REDIS_PORT,
            host: process.env.REDIS_HOST,
            auth: process.env.REDIS_PASS,
            options: {
                no_ready_check: false,
            },
        },
    };
} else {
    redisConfig = {};
}

const kue = require("kue");
const queue = kue.createQueue(redisConfig);
queue.watchStuckJobs(1000 * 10);

queue.on("ready", () => {
    console.info("Queue is ready!");
});

queue.on("error", (err) => {
    console.error("There was an error in the main queue!");
    console.error(err);
    console.error(err.stack);
});

function createPayment(data, done) {
    queue
        .create("payment", data)
        .priority("critical")
        .attempts(8)
        .backoff(true)
        .removeOnComplete(false)
        .save((err) => {
            if (err) {
                console.error(err);
                return done(err);
            }
            done();
        });
}

// Process up to 20 jobs concurrently
queue.process("payment", 20, function (job, done) {
    // other processing work here
    // ...
    // ...

    // Call done when finished
    done();
});

module.exports = {
    create: (data, done) => {
        createPayment(data, done);
    },
};
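With the queue module exporting `create`, the empty route handler from earlier can be filled in. Here is one possible sketch, not the only way to do it. `makePaymentHandler` is a hypothetical factory that accepts any object with a `create(data, done)` method, which lets us try it with stand-ins instead of Express and Redis. It assumes JSON body-parsing middleware has already populated `req.body`, and the response shape follows what our tests expect (`success`, `error`, `message`, `order`).

```javascript
"use strict";

// Hypothetical factory: takes an object with create(data, done), such as the
// queue/payments.js module above, and returns an Express-style handler.
function makePaymentHandler(payments) {
    return (req, res) => {
        // Stamp the order as received before handing it to the queue
        const order = Object.assign({}, req.body, {
            received: true,
            receivedAt: new Date(),
        });
        payments.create(order, (err) => {
            if (err) {
                return res.status(500).json({
                    success: false,
                    error: err.message,
                    message: "Could not queue the payment",
                    order: null,
                });
            }
            res.json({
                success: true,
                error: null,
                message: "Payment queued for processing",
                order,
            });
        });
    };
}

// Stand-ins so the sketch runs without Express or Redis.
const queued = [];
const fakePayments = {
    create: (data, done) => {
        queued.push(data);
        done();
    },
};
const fakeRes = {
    statusCode: 200,
    body: null,
    status(code) { this.statusCode = code; return this; },
    json(payload) { this.body = payload; return this; },
};

const handler = makePaymentHandler(fakePayments);
handler({ body: { orderID: "1a2b3c4", productID: "5d6e6f" } }, fakeRes);
console.log(fakeRes.statusCode, fakeRes.body.success, queued.length); // 200 true 1
```

In routes/payments.js, this could be wired up along the lines of `router.post("/", makePaymentHandler(require("../queue/payments")))`.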

Setting Up the User Interface

Kue provides a robust API for dealing with priority job queues. You might not expect it to also provide a user interface, since libraries tend to leave that up to the developer (and you still can build your own, because there’s a JSON API, too). But it does! The Kue user interface is great for getting a basic look at the status of various jobs and the data involved.

In app.js, we’ll just require kue and use its .app property to mount the UI at an Express path of our choosing.

app.js

const kue = require("kue");
//...
app.use("/queue", kue.app);
app.use("/payments", payments);

Now if you run your app again with node bin/www, you can navigate to the /queue path on the localhost address you’re using and see the UI:

[Screenshot: the Kue user interface]

Final Tests and Wrap-Up

We have one last task to complete to ensure we’re testing our job queue well: we’ll need to test the actual queue itself. Fortunately, Kue also provides a test mode for us to work with. I’ve found this to be a huge help, as getting access to the right queue in the right environment proved much trickier than I had expected.

test/payments.js

const queue = require("kue").createQueue();
// ...
test("Creating payments and processing items with the queue", (t) => {
    // put kue into test mode
    queue.testMode.enter();

    queue.createJob("payment", dummyOrder).save();
    queue.createJob("payment", dummyOrder).save();

    t.equal(queue.testMode.jobs.length, 2, "There should be two jobs");
    t.equal(
        queue.testMode.jobs[0].type,
        "payment",
        "The jobs should be of type payment"
    );
    t.equal(
        queue.testMode.jobs[0].data,
        dummyOrder,
        "The job data should be intact"
    );

    // Clear and exit test mode
    queue.testMode.clear();
    queue.testMode.exit();
    t.end();
});

Fin

We’ve been able to look at using Kue with Redis and node.js to create a simple priority job queue, with tests! I hope you’ve already been able to see some of the benefits of using something like Kue with Redis to handle background job processing, but I wanted to highlight a few more benefits in closing:

  • statelessness: our queue can be connected to and stays ‘live’ irrespective of which or how many instances of our app we’ve spun up. Instance #10,000 is able to connect and defer the same processing as #1.
  • speed: Redis is fast and we have tunable processing concurrency.
  • dedicated resources: Resources for queue management are possessed by the queue, not borrowed from the app instance.
  • intermediate persistence: Not long-term and not in local memory, so it sits right in the sweet spot for our needs.
  • high-availability: The queue is available independent of any or all app instances.
  • prioritization: We allow for a spectrum of job types and needs.
  • graceful degradation: Error handling is deterministic and readily available to our instances.

Be sure to check out Kue on Github and hosted Redis from Compose!



  1. $_ is a bash variable that expands to the last argument of the previous command, which lets you make a directory and then immediately switch into it.
