Implementing a Task Scheduler in Node Using Redis

Node.js and Redis are often used together to build scalable and high-performing applications. Although Redis has always been primarily an in-memory data store that allows for fast and efficient data access, it has gained many useful features over time, and nowadays it can be used for things like rate limiting, session management, or queuing. With its excellent support for sorted sets, task scheduling can be added to that list as well.

Node.js doesn't have support for any kind of task scheduling other than the built-in setInterval() and setTimeout() functions, which are quite simple and don't have task queuing mechanisms. There are third-party packages like node-schedule and node-cron of course. But what if you wanted to understand how this could work under the hood? This blog post will show you how to build your own scheduler from scratch.

Redis Sorted Sets

Redis offers sorted sets, a data structure that stores unique members ordered by a numeric score, which is useful in many use cases such as ranking, scoring, and sorting data. Since their introduction in 2009, sorted sets have become one of the most widely used and powerful data structures in Redis.

To add data to a sorted set, use the ZADD command, which takes the name of the sorted set followed by one or more score/member pairs (the score comes before the member). Redis keeps the members ordered by score, from lowest to highest, which is incredibly useful for implementing leaderboard-like lists. In our case, if we use a timestamp as the score, the members are ordered by date, effectively giving us a queue where the member with the earliest timestamp is at the top of the list.

If the member name is a task identifier, and the timestamp is the time at which we want the task to be executed, then implementing a scheduler means reading the sorted set and grabbing the task at the top whenever its timestamp is no longer in the future!
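
To make the ordering concrete, here is a toy in-memory model of a sorted set. It is not Redis and not part of the scheduler; the createToySortedSet name and its methods are made up for illustration, and it only mimics the score-based ordering and range lookup described above (unlike Redis, it does not order members with equal scores lexicographically).

```javascript
// Toy in-memory model of a sorted set (NOT Redis): members are unique,
// and range lookups come back ordered by ascending score.
function createToySortedSet() {
  const scores = new Map(); // member -> score

  return {
    // Mimics ZADD key score member: re-adding a member updates its score.
    add(score, member) {
      scores.set(member, score);
    },
    // Mimics ZRANGE key min max BYSCORE LIMIT offset count.
    rangeByScore(min, max, offset = 0, count = Infinity) {
      return [...scores.entries()]
        .filter(([, score]) => score >= min && score <= max)
        .sort((a, b) => a[1] - b[1])
        .slice(offset, offset + count)
        .map(([member]) => member);
    },
    // Mimics ZREM key member.
    remove(member) {
      return scores.delete(member);
    },
  };
}

const tasks = createToySortedSet();
tasks.add(3000, 'task:c');
tasks.add(1000, 'task:a');
tasks.add(2000, 'task:b');

const now = 2500; // pretend "current time" in milliseconds
const dueTasks = tasks.rangeByScore(0, now); // ['task:a', 'task:b']
const nextTask = tasks.rangeByScore(0, now, 0, 1); // ['task:a']
console.log(dueTasks, nextTask);
```

The task scored 3000 is not returned because its timestamp is still in the future relative to the pretend current time.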

The Algorithm

Now that we understand the capabilities of Redis sorted sets, we can draft out a rough algorithm that will be implemented by our Node scheduler.

Scheduling Tasks

Scheduling a task means adding the task identifier to the sorted set and storing the task data separately under a string key derived from that identifier. The steps are as follows:

  1. Generate an identifier for the submitted task using the INCR command. This command atomically increments a counter and returns the new value, so each call yields the next integer in the sequence.
  2. Use the SET command to store the task data as a string value. The SET command accepts a key and a string. The key must be unique, so it can be something like task:${taskId}, while the value can be a JSON representation of the task data.
  3. Use the ZADD command to add the task identifier and the timestamp to the sorted set. The name of the sorted set can be something simple like sortedTasks, while the set member can be the task identifier and the score is the timestamp.
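
The three steps above can be sketched as a single function. This is a minimal sketch, assuming that redis is an ioredis-style client whose incr, set, and zadd methods return promises; the scheduleTask name is made up for illustration, and the taskCounter key is simply one possible name for the counter.

```javascript
// Sketch of the three scheduling steps. `redis` is assumed to be an
// ioredis-style client whose incr/set/zadd methods return promises.
async function scheduleTask(redis, data, timestamp) {
  // 1. INCR returns the next integer in the sequence, used as the task ID.
  const taskId = await redis.incr('taskCounter');

  // 2. Store the serialized task data under a unique string key.
  await redis.set(`task:${taskId}`, JSON.stringify(data));

  // 3. Register the ID in the sorted set, scored by its execution time.
  //    Note the argument order: key, score, member.
  await redis.zadd('sortedTasks', timestamp, taskId);

  return taskId;
}
```

With a real client, a call could look like scheduleTask(new Redis(), { name: 'Test data' }, Date.now() + 10000), where Redis is the class exported by ioredis.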

Processing Tasks

The processing part is an endless loop that checks whether any tasks are due. If there are none, it waits for a predefined interval before trying again. The algorithm can be as follows:

  1. Check if we are still allowed to run. We need a way to stop the loop if we want to stop the scheduler. This can be a simple boolean flag in the code.

  2. Use the ZRANGE command to get the first task in the list. With the BYSCORE and LIMIT options (available since Redis 6.2), ZRANGE accepts a score range (the timestamp interval, in our case) and an offset/count pair. If we provide it with the following arguments, we will get the next task that is due for execution, if there is one.

    • Minimum score: 0 (the beginning of time)
    • Maximum score: current timestamp
    • Offset: 0
    • Count: 1

  3. If there is a task found:

    3.1 Get the task data by executing the GET command on the task:${taskId} key.

    3.2 Deserialize the data and call the task handler.

    3.3 Remove the task data using the DEL command on the task:${taskId} key.

    3.4 Remove the task identifier from the sorted set by calling ZREM on the sortedTasks key.

    3.5 Go back to point 2 to get the next task.

  4. If there is no task found, wait for a predefined number of seconds before trying again.
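
Steps 2 through 3.5 can be sketched as one function that drains every task that is currently due. This is only a sketch under a few assumptions: redis is an ioredis-style client with promise-returning zrange, get, del, and zrem methods, and the processDueTasks name, the handler callback, and the now parameter are made up for illustration. The waiting in step 4 and the stop flag in step 1 are left to the caller.

```javascript
// Sketch of steps 2 to 3.5: process every task that is due, one at a time.
// `redis` is assumed to be an ioredis-style client; `handler` is a
// hypothetical callback that receives the deserialized task data.
async function processDueTasks(redis, handler, now = Date.now()) {
  let processed = 0;

  for (;;) {
    // 2. Ask for the single earliest task whose score is between 0 and now.
    const [taskId] = await redis.zrange(
      'sortedTasks', 0, now, 'BYSCORE', 'LIMIT', 0, 1
    );
    if (taskId === undefined) {
      return processed; // Nothing (more) is due: the caller waits and retries.
    }

    // 3.1 and 3.2: load the task data, deserialize it, and run the handler.
    const raw = await redis.get(`task:${taskId}`);
    try {
      await handler(JSON.parse(raw));
    } catch (err) {
      console.error(`Task ${taskId} failed`, err);
    }

    // 3.3 and 3.4: clean up so the same task is not picked up again.
    await redis.del(`task:${taskId}`);
    await redis.zrem('sortedTasks', taskId);
    processed += 1;
  }
}
```

Note that in this sketch a task is removed even when its handler throws, so a failing task is logged and dropped rather than retried.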

The Code

Now to the code. We will have two objects to work with. The first one is RedisApi and this is simply a façade over the Redis client. For the Redis client, we chose to use ioredis, a popular Redis library for Node.

const Redis = require('ioredis');

function RedisApi(host, port) {
  const redis = new Redis(port, host, { maxRetriesPerRequest: 3 });

  return {
    getFirstInSortedSet: async (sortedSetKey) => {
      const results = await redis.zrange(
        sortedSetKey,
        0,
        new Date().getTime(),
        'BYSCORE',
        'LIMIT',
        0,
        1
      );

      return results?.length ? results[0] : null;
    },
    addToSortedSet: (sortedSetKey, member, score) => {
      return redis.zadd(sortedSetKey, score, member);
    },
    removeFromSortedSet: (sortedSetKey, member) => {
      return redis.zrem(sortedSetKey, member);
    },
    increaseCounter: (counterKey) => {
      return redis.incr(counterKey);
    },
    setString: (stringKey, value) => {
      return redis.set(stringKey, value);
    },
    getString: (stringKey) => {
      return redis.get(stringKey);
    },
    removeString: (stringKey) => {
      return redis.del(stringKey);
    },
    isConnected: async () => {
      try {
        // Just get some dummy key to see if we are connected
        await redis.get('dummy');
        return true;
      } catch (e) {
        return false;
      }
    },
  };
}

The RedisApi function returns an object that has all the Redis operations that we mentioned previously, with the addition of isConnected, which we will use to check if the Redis connection is working.

The other object is the Scheduler object and has three functions:

  • start() to start the task processing
  • stop() to stop the task processing
  • schedule() to submit new tasks

The start() and schedule() functions contain the bulk of the algorithm we wrote above. The schedule() function adds a new task to Redis, while the start() function creates a findNextTask() function internally, which it schedules recursively while the scheduler is running. When creating a new Scheduler object, you need to provide the Redis connection details, a polling interval, and a task handler function. The task handler function will be provided with task data.

function Scheduler(
  pollingIntervalInSec,
  taskHandler,
  redisHost,
  redisPort = 6379
) {
  const redisApi = new RedisApi(redisHost, redisPort);

  let isRunning = false;

  return {
    schedule: async (data, timestamp) => {
      const taskId = await redisApi.increaseCounter('taskCounter');
      console.log(
        `Scheduled new task with ID ${taskId} and timestamp ${timestamp}`,
        data
      );
      await redisApi.setString(`task:${taskId}`, JSON.stringify(data));
      await redisApi.addToSortedSet('sortedTasks', taskId, timestamp);
    },
    start: async () => {
      console.log('Started scheduler');
      isRunning = true;

      const findNextTask = async () => {
        const isRedisConnected = await redisApi.isConnected();
        if (isRunning && isRedisConnected) {
          console.log('Polling for new tasks');

          let taskId;
          do {
            taskId = await redisApi.getFirstInSortedSet('sortedTasks');

            if (taskId) {
              console.log(`Found task ${taskId}`);
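              const taskData = await redisApi.getString(`task:${taskId}`);
              try {
                console.log(`Passing data for task ${taskId}`, taskData);
                // Await the handler so that errors from async handlers are
                // caught here before the task is cleaned up
                await taskHandler(JSON.parse(taskData));
              } catch (err) {
                console.error(err);
              }
              // Await the cleanup so the task is gone before the next lookup
              // and failures are not silently dropped
              await redisApi.removeString(`task:${taskId}`);
              await redisApi.removeFromSortedSet('sortedTasks', taskId);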
            }
          } while (taskId);

          setTimeout(findNextTask, pollingIntervalInSec * 1000);
        }
      };

      findNextTask();
    },
    stop: () => {
      isRunning = false;
      console.log('Stopped scheduler');
    },
  };
}

That's it! To try it out, create a scheduler, start it, and submit a simple task:

const scheduler = new Scheduler(
  5,
  (taskData) => {
    console.log('Handled task', taskData);
  },
  'localhost'
);
scheduler.start();

// Submit a task to execute 10 seconds later
scheduler.schedule({ name: 'Test data' }, new Date().getTime() + 10000);

Running this should produce output similar to the following:

Started scheduler
Scheduled new task with ID 4 and timestamp 1679677571675 { name: 'Test data' }
Polling for new tasks
Polling for new tasks
Polling for new tasks
Found task 4
Passing data for task 4 {"name":"Test data"}
Handled task { name: 'Test data' }
Polling for new tasks

Conclusion

Redis sorted sets are an amazing tool, and Redis provides you with some really useful commands to query or update sorted sets with ease. Hopefully, this blog post was an inspiration for you to consider using sorted sets in your applications. Feel free to use StackBlitz to view this project online and play with it some more.