Tuesday, August 19, 2014

Synchronization Constructs with Node + Redis

Summary

In this article I describe how to build the following basic synchronization objects with a combination of Node and Redis: semaphores, mutex locks, condition variables and, by combining locks and conditions, monitors.  I also apply these objects to two classic concurrency problems from computer science.

Implementation of the Synchronization Objects

Given that Node has a single-threaded architecture, the question arises: why would I ever need to worry about concurrent programming problems such as critical sections, mutual exclusion, and synchronization? Even if your code involves only one process, the asynchronous nature of Node can lead to situations where a shared object is accessed in a non-coherent manner, i.e. race conditions.
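
To make that concrete, here is a small deterministic sketch of a lost update in single-threaded code. It is not taken from the project: the `pending` array is a hypothetical stand-in for the event loop's callback queue, and `readCounter`/`increment` are illustrative names.

```javascript
// Deterministic sketch of a lost update in single-threaded, asynchronous code.
// `pending` is a hypothetical stand-in for the event loop's callback queue.
const pending = [];
const shared = { counter: 0 };

// Simulated asynchronous read: the callback runs later, not immediately.
function readCounter(callback) {
  const snapshot = shared.counter;
  pending.push(function () { callback(snapshot); });
}

// Read-modify-write with an asynchronous gap between the read and the write.
function increment() {
  readCounter(function (value) {
    shared.counter = value + 1; // writes back a possibly stale value
  });
}

increment(); // task 1 reads 0
increment(); // task 2 also reads 0, before task 1 has written

// Drain the queue the way the event loop would run the queued callbacks.
while (pending.length > 0) {
  pending.shift()();
}

console.log(shared.counter); // 1, although increment() was called twice
```

Both tasks read the counter before either writes it, so one increment is lost even though only one thread ever runs.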

Node processes do not share variables or memory with one another.  That includes apps that run multiple processes via Node's 'cluster' feature.  As a result, objects external to Node are needed to implement mutual exclusion and synchronization.  For this exercise, I used Redis.

Figure 1 depicts the overall approach I used.


Figure 1

Figure 2 depicts an overview of the JavaScript object structure.

Figure 2

Synchronization Object Code Snippets

SyncConstruct - constructor
The constructor for this object creates Redis clients for command execution and subscription.  A callback map is also allocated to save callbacks for a waiting process.  A waiting process is 'awakened' when its Redis subscriber receives a message indicating it can proceed.  Its callback is fetched from the map and executed.

function syncConstruct(key, func)
{
    logger.debug('Entering - File: syncConstruct.js, Method: syncConstruct, key:%s', key);
    if (key && func)
    {
        var self = this;
        self.key = key;
        self.cbMap = {};  //object map containing the callback functions of processes waiting to execute
        self.client = redis.createClient(properties.redisServer.port, properties.redisServer.host);  //redis command client
        self.subscriber = redis.createClient(properties.redisServer.port, properties.redisServer.host);  //redis subscription client
        var channel = self.key + ':' + os.hostname() + ':' + process.pid;
        
        self.subscriber.subscribe(channel);
        
        
        /*
         * A process that was queued because of a synchronization delay is signaled to resume via a Redis publication.  The
         * 'on message' event triggers execution of the delayed process's callback function.
         */
        self.subscriber.on('message', function(channel, msg){
            logger.debug('Entering - File: syncConstruct.js, Method: on message, channel: %s, pid: %d', channel, process.pid);
            var cbFunc = self.cbMap.channel;  //each object listens on a single channel, so one map slot named 'channel' is used
            delete self.cbMap.channel;
            if (cbFunc)  //guard against a message arriving when no callback is waiting
                cbFunc();
            logger.debug('Exiting - File: syncConstruct.js, Method: on message channel: %s, pid: %d', channel, process.pid);
        });
        
        /*
         * Constructor callback is invoked upon receipt of the 'subscribe' event.  
         */
        self.subscriber.on('subscribe', function(channel, count){
            logger.debug('Entering - File: syncConstruct.js, Method: on subscribe, channel: %s, pid: %d', channel, process.pid);
            func();
            logger.debug('Exiting - File: syncConstruct.js, Method: on subscribe, channel: %s, pid: %d', channel, process.pid);
        });
        
    }
    logger.debug('Exiting - File: syncConstruct.js, Method: syncConstruct');
};
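
As a rough illustration of the callback-map idea, the sketch below parks a callback under a channel name and runs it when a message arrives on that channel. It is an assumption-laden stand-in, not the project's code: Node's built-in EventEmitter plays the role of Redis pub/sub, and `publish`, `waitOn`, and the channel names are hypothetical.

```javascript
const EventEmitter = require('events');

// Hypothetical in-memory stand-in for Redis pub/sub; not the redis client API.
const bus = new EventEmitter();
function publish(channel, msg) {
  // Like Redis PUBLISH, report how many subscribers received the message.
  const receivers = bus.listenerCount(channel);
  bus.emit(channel, msg);
  return receivers;
}

const cbMap = {};   // channel name -> callback of the waiting process
const resumed = []; // record of the messages that awakened a waiter

function waitOn(channel, callback) {
  cbMap[channel] = callback;          // park the callback
  bus.once(channel, function (msg) {  // listen for a single wake-up message
    const cb = cbMap[channel];
    delete cbMap[channel];            // each parked callback runs only once
    if (cb) cb(msg);
  });
}

waitOn('fork1:hostA:4242', function (msg) { resumed.push(msg); });

const delivered = publish('fork1:hostA:4242', 'V'); // wakes the waiter
const missed = publish('fork1:hostB:9999', 'V');    // nobody is listening

console.log(delivered, missed, resumed); // 1 0 [ 'V' ]
```

The receiver count returned by `publish` matters later: a count of 0 is how the exit method detects that a wake-up message reached no one.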


SyncConstruct - enter
This is the base method for critical-section entry used by all the derived objects (semaphore, etc.).  It executes the Redis Lua script for the particular derived object.  The process is either allowed to continue or is delayed, with its callback saved in the callback map and its channel name placed on the wait queue (implemented as a Redis list).

syncConstruct.prototype.enter = function(luaScript, callback1, callback2)
{
    
    var self = this;
    var channel = self.key + ':' + os.hostname() + ':' + process.pid;
    logger.debug('Entering - File: syncConstruct.js, Method: enter, channel: %s', channel);
    self.cbMap.channel = callback1;
   
    self.client.eval(luaScript, 2, self.key, self.key + ':WaitQueue', channel, function (err, res){
        if (err)
        {
            throw err;
        }
            
        if (res > 0)  //will never be true for a condition variable wait method call
        {
            logger.debug('File: syncConstruct.js, Method: enter, passed eval script, res: %s, channel: %s, pid: %d', 
                    res, channel, process.pid);
            var func = self.cbMap.channel;
            
            if (func) 
            {
                delete self.cbMap.channel;
                func();
            }
        }
        
        if (callback2)  //covers the case of releasing a monitor lock after a process has been put in the wait queue
            callback2();
    });
    logger.debug('Exiting - File: syncConstruct.js, Method: enter, channel: %s', channel);
};


SyncConstruct - exit
Base method for critical-section exit.  It evaluates a Redis Lua script passed as a parameter.  For all derived objects, that script ensures 'fairness' by handing the critical section to an already-waiting process (the oldest one in the queue) before any newcomer.  If there are no processes in the wait queue, the synchronization object (Redis key) is updated to reflect that the critical section is available.

syncConstruct.prototype.exit = function(luaScript, callback)
{
    var self = this;
    logger.debug('Entering - File: syncConstruct.js, Method: exit, self.key: %s, queue: %s', self.key, self.key+':WaitQueue');
    self.client.eval(luaScript, 2, self.key, self.key + ':WaitQueue', function (err, res){
        if (err)
        {
            throw err;
        }
      
        if (res > 0)  
        {
            if (callback)
                callback();
        }
        else  //covers the case that a message is published but no process received it (process died for example)
        {
            logger.error('***File: syncConstruct.js, Method: exit, no process received published message to %s', self.key+':WaitQueue');
            self.exit(luaScript, callback);
        }
    });
    logger.debug('Exiting - File: syncConstruct.js, Method: exit');
};
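
The retry branch can be pictured with a small in-memory sketch. Everything here is an illustrative assumption rather than the real Redis calls: `publish` returns 0 for a waiter whose process has gone away, and `exitScript` stands in for the Lua script.

```javascript
// In-memory sketch of the retry in exit(); all names here are illustrative.
const liveChannels = { 'lock:hostA:200': true }; // processes still subscribed
const state = { value: 0, waitQueue: ['lock:hostA:200', 'lock:hostA:100'] };
const woken = [];

// Stand-in for PUBLISH: returns the number of subscribers that got the message.
function publish(channel) {
  if (liveChannels[channel]) { woken.push(channel); return 1; }
  return 0;
}

// Stand-in for the exit script: wake the oldest waiter or free the resource.
function exitScript() {
  const channel = state.waitQueue.pop(); // the rightmost element is the oldest
  return channel ? publish(channel) : ++state.value;
}

function exit(callback) {
  const res = exitScript();
  if (res > 0) { callback(); return; }
  exit(callback); // the popped waiter was gone; hand off to the next one
}

let done = false;
exit(function () { done = true; });
console.log(woken, state.value, done); // [ 'lock:hostA:200' ] 0 true
```

The dead waiter is popped and skipped, and the next waiter in line receives the hand-off, so the resource is never left assigned to a process that no longer exists.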


Semaphore - script to implement the "P" function
This Lua script is called by a Redis eval method.  The script checks the value of a Redis key.  If that value is greater than 0, it decrements the key and returns the key's previous value (something > 0).  If the value is 0, the process's channel name is pushed onto a Redis list (the wait queue) and 0 is returned.

The decision to allow the process to proceed is based on the return value.  If it is greater than 0, the process proceeds.  Otherwise, the process is delayed.

var pScript =  'local s = redis.call(\'get\', KEYS[1])\
                if (s and s + 0 > 0) then \
                    redis.call(\'decr\', KEYS[1]) \
                    return s \
                else \
                    redis.call(\'lpush\',KEYS[2], ARGV[1]) \
                    return 0 \
                end';



Semaphore - script to implement the "V" function
This Lua script checks the Redis list representing the wait queue by popping off the rightmost (oldest) member.  If a process was on that queue, a message is published to its specific channel (which is identified by a concatenation of the key name, machine name, and process id).  If nothing was on the wait queue, the Redis key is simply incremented.

var vScript =  'local pId = redis.call(\'rpop\', KEYS[2]) \
                if (pId) then \
                    return redis.call(\'publish\', pId, \'V\') \
                else \
                    return redis.call(\'incr\', KEYS[1]) \
                end';
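
To see how the two scripts interact, here is an in-memory model of their logic. It is only a sketch: a plain array stands in for the Redis list, the `published` array stands in for PUBLISH, and the function names `p`, `v`, and `makeSemaphore` are mine.

```javascript
// In-memory model of the P and V scripts; an array stands in for the Redis list.
function makeSemaphore(initial) {
  return { value: initial, waitQueue: [], published: [] };
}

// P: decrement and return the previous value if positive, else queue the caller.
function p(sem, channel) {
  if (sem.value > 0) {
    const previous = sem.value;
    sem.value -= 1;
    return previous;
  }
  sem.waitQueue.unshift(channel); // LPUSH adds at the head (left)
  return 0;
}

// V: wake the oldest waiter if there is one, otherwise increment the value.
function v(sem) {
  const channel = sem.waitQueue.pop(); // RPOP removes from the tail (right)
  if (channel !== undefined) {
    sem.published.push(channel); // stands in for PUBLISH to that channel
    return 1;                    // assumes exactly one subscriber received it
  }
  sem.value += 1;
  return sem.value;
}

const fork = makeSemaphore(1);
console.log(p(fork, 'A')); // 1: A proceeds
console.log(p(fork, 'B')); // 0: B is queued
console.log(p(fork, 'C')); // 0: C is queued behind B
v(fork);                   // hands the fork directly to B
v(fork);                   // then to C
v(fork);                   // nobody is waiting, so the value returns to 1
console.log(fork.published, fork.value); // [ 'B', 'C' ] 1
```

Note that V never increments the value while someone is waiting. The permit passes straight to the oldest waiter, which is what keeps a newcomer from jumping the queue.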


Lock - script to implement the "acquire" function
This script fetches the Redis key associated with the lock and checks if it is greater than 0.  If so, the key is decremented and the previous value returned (value greater than 0 which allows the process to proceed).  If not, the process is pushed onto the FIFO wait queue.

var acquireScript =  'local s = redis.call(\'get\', KEYS[1]) \
                      if (s and s + 0 > 0) then \
                         redis.call(\'decr\', KEYS[1]) \
                         return s \
                      else \
                         redis.call(\'lpush\',KEYS[2], ARGV[1]) \
                         return 0 \
                      end';

Lock - script to implement the "release" function
This script checks the wait queue for a waiting process.  If one exists, that process is awakened via a published message.  If not, the Redis key associated with the lock is incremented to at most a value of 1.

var releaseScript =  'local pId = redis.call(\'rpop\', KEYS[2]) \
                     if (pId) then \
                        return redis.call(\'publish\', pId, \'release\') \
                     else \
                        local s = tonumber(redis.call(\'get\', KEYS[1])) or 0 \
                        if (s < 1) then \
                            return redis.call(\'incr\', KEYS[1]) \
                        else \
                            return 1 \
                        end \
                     end';
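
The cap at 1 is the one place where the lock differs from the semaphore. The sketch below models the release logic in memory to show it; the `lock` object and its fields are illustrative stand-ins for the Redis key and list.

```javascript
// In-memory model of the release script; an array stands in for the Redis list.
function release(lock) {
  const channel = lock.waitQueue.pop(); // oldest waiter, if any
  if (channel !== undefined) {
    lock.published.push(channel);       // stands in for PUBLISH
    return 1;                           // assumes one subscriber received it
  }
  if (lock.value < 1) {
    lock.value += 1;                    // mark the lock as free
    return lock.value;
  }
  return 1;                             // already free: leave the value at 1
}

const lock = { value: 0, waitQueue: ['B'], published: [] };
release(lock); // B is waiting, so ownership passes to B and the value stays 0
release(lock); // nobody is waiting, so the value becomes 1
release(lock); // a stray extra release does not push the value to 2
console.log(lock.published, lock.value); // [ 'B' ] 1
```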


Condition - script to implement the "wait" function
For condition variables, the entry procedure is very simple:  the process is put immediately onto the wait queue.

var waitScript = 'redis.call(\'lpush\',KEYS[2], ARGV[1]) \
                  return 0';


Condition - script to implement the "signal" function
The script that implements the signal function is equally simple for condition variables.  The wait queue is 'popped'.  If a process was waiting, a message to proceed is published to it.


var signalScript = 'local pId = redis.call(\'rpop\', KEYS[2]) \
                    if (pId) then \
                       return redis.call(\'publish\', pId, \'signal\') \
                    else \
                       return 1 \
                    end';


Condition - script to implement the "signalAll" function
Nearly identical to the above script.  In this case, the entire wait queue is popped and every waiting process is sent a message to proceed. 



var signalAllScript = 'local pId = redis.call(\'rpop\', KEYS[2]) \
                        while (pId) do \
                            redis.call(\'publish\', pId, \'signalAll\')\
                            pId = redis.call(\'rpop\', KEYS[2]) \
                        end \
                        return 1';
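
The three condition-variable scripts can be modeled together in a few lines. As with the earlier sketches, this is an in-memory approximation: the array stands in for the Redis list, `published` stands in for PUBLISH, and the function names are mine.

```javascript
// In-memory model of the wait, signal and signalAll scripts.
const cond = { waitQueue: [], published: [] };

// wait: always queue the caller; it never proceeds immediately.
function wait(channel) {
  cond.waitQueue.unshift(channel);
  return 0;
}

// signal: wake at most one waiter, the oldest.
function signal() {
  const channel = cond.waitQueue.pop();
  if (channel !== undefined) cond.published.push(channel);
  return 1; // assumes the published message reached one subscriber
}

// signalAll: drain the queue, waking every waiter in arrival order.
function signalAll() {
  let channel = cond.waitQueue.pop();
  while (channel !== undefined) {
    cond.published.push(channel);
    channel = cond.waitQueue.pop();
  }
  return 1;
}

wait('R1'); wait('R2'); wait('R3');
signal();    // wakes only R1, the oldest waiter
signalAll(); // wakes R2 and R3, in arrival order
signal();    // no waiters: nothing is published
console.log(cond.published); // [ 'R1', 'R2', 'R3' ]
```

A signal sent when nobody is waiting is simply lost, which is why callers re-check the shared state in a loop instead of relying on the signal alone.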


Synchronization Implementation #1 - The Dining Philosophers Problem

Dining Philosophers is a classic synchronization problem in computer science.  The gist of the problem is that multiple processes (Philosophers) compete for insufficient resources (Forks).  If all processes were to acquire and hold one resource at the same time, none would be able to proceed (a Philosopher needs two forks to eat).  Deadlock occurs or, more precisely for this case, all the Philosophers starve to death.

I implemented the solution to this problem using semaphores.  Each of the 5 forks is represented by a semaphore, and each of the 5 philosopher processes performs P and V operations on its two forks to synchronize access.
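
The snippet further down does not show how a philosopher picks up its forks, so the following is only a sketch of one common way to avoid the deadlock: always take the lower-numbered fork first. That ordering rule, the in-memory `Semaphore`, the `dine` function, and the `pending` queue (a stand-in for the event loop) are all my assumptions, not the project's code.

```javascript
// Callback-based in-memory semaphore; a hypothetical stand-in for the Redis-backed one.
function Semaphore(initial) {
  this.value = initial;
  this.waiters = [];
}
Semaphore.prototype.P = function (callback) {
  if (this.value > 0) { this.value -= 1; callback(); }
  else { this.waiters.unshift(callback); }
};
Semaphore.prototype.V = function () {
  const next = this.waiters.pop();
  if (next) { next(); } else { this.value += 1; }
};

const N = 5;
const forks = [];
for (let i = 0; i < N; i++) forks.push(new Semaphore(1));
const meals = [0, 0, 0, 0, 0];
const pending = []; // stand-in for the event loop: deferred "put forks down" steps

function dine(id) {
  const a = id, b = (id + 1) % N;
  const first = forks[Math.min(a, b)];  // always take the lower-numbered fork first
  const second = forks[Math.max(a, b)];
  first.P(function () {
    second.P(function () {
      meals[id] += 1;                   // eating with both forks held
      pending.push(function () {        // put the forks down later
        second.V();
        first.V();
      });
    });
  });
}

for (let id = 0; id < N; id++) dine(id);
while (pending.length > 0) pending.shift()();

console.log(meals); // [ 1, 1, 1, 1, 1 ]
```

Because the last philosopher reaches for fork 0 before fork 4, the circular wait that causes deadlock cannot form, and every philosopher eventually eats.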

Figure 3 depicts the organization of the main process flow of the solution.


Figure 3


Below is a code snippet of that flow.  Async was used to minimize callback nesting.

    async.series([ function (callback1)
                   {    
                        rightFork = new semaphore(rightForkNum, callback1);
                   },
                   function (callback2)
                   {
                       leftFork = new semaphore(leftForkNum, callback2);
                   },
                   function (callback3)
                   {
                       /*
                        * after semaphores have been created, do 10 iterations of getting forks, eating, and thinking
                        */
                       var iteration = 1;
                       d.run(function() {
                           async.whilst(
                                   function() 
                                   { 
                                       return iteration <= 10;
                                   },
                                   function(callback) 
                                   {
                                       logger.info('Iteration %d, Philosopher %d', iteration, cluster.worker.id);
                                       iteration++;
                                       live(rightFork, leftFork, callback);
                                   },
                                   function (err) 
                                   {
                                       if (err)
                                           throw err;
                                       logger.info('Philosopher %d signing off', cluster.worker.id);
                                       rightFork.quit();
                                       leftFork.quit();
                                       logger.debug('Exiting - File: dining.js, Method: initDomain, Philosopher id:%d', cluster.worker.id);
                                       callback3();
                                   }
                           );
                       });
                   }
                  ],
                  function(err)
                  {
                    if (err)
                        throw err;
                    cluster.worker.disconnect();
                  }
    );


Synchronization Implementation #2 - Readers/Writers Problem

Readers/Writers is yet another classic computer science problem.  For this synchronization problem, we have multiple read and write processes competing for the same resource (file, database, etc).  Multiple readers can access the resource concurrently (if there are no writers accessing), but a writer must have exclusive access.

I implemented this solution utilizing monitors.  I used a lock to ensure mutual exclusion to the monitor state, condition variables for process synchronization, and shared variables (Redis keys) for monitor state.
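
Before the real code, here is a stripped-down, in-memory sketch of how the lock, a condition variable, and the shared state cooperate. The `Lock` and `Condition` constructors below are simplified stand-ins for the Redis-backed objects, and the scenario (one active writer, one arriving reader) is made up for illustration.

```javascript
// In-memory sketch of a monitor: a lock plus a condition variable.
function Lock() { this.free = true; this.queue = []; }
Lock.prototype.acquire = function (callback) {
  if (this.free) { this.free = false; callback(); }
  else { this.queue.unshift(callback); }
};
Lock.prototype.release = function () {
  const next = this.queue.pop();
  if (next) { next(); }       // hand the lock straight to the oldest waiter
  else { this.free = true; }
};

function Condition() { this.queue = []; }
// wait: queue the caller first, then give up the monitor lock.
Condition.prototype.wait = function (lock, callback) {
  this.queue.unshift(callback);
  lock.release();
};
Condition.prototype.signalAll = function () {
  let callback;
  while ((callback = this.queue.pop())) callback();
};

const mutex = new Lock();
const okToRead = new Condition();
const state = { numWriters: 1, numReaders: 0 }; // one writer is active
const events = [];

function tryRead() {
  mutex.acquire(function () {
    if (state.numWriters > 0) {
      events.push('reader waits');
      okToRead.wait(mutex, tryRead); // re-check the state after waking
    } else {
      state.numReaders += 1;
      events.push('reader reads');
      mutex.release();
    }
  });
}

function finishWrite() {
  mutex.acquire(function () {
    state.numWriters -= 1;
    events.push('writer done');
    okToRead.signalAll();
    mutex.release();
  });
}

tryRead();
finishWrite();
console.log(events); // [ 'reader waits', 'writer done', 'reader reads' ]
```

The awakened reader does not assume it may proceed. It re-acquires the lock and re-checks `numWriters`, which is the same loop structure the reader and writer code below uses.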

Figure 4 depicts the main process flow for the reader process.

Figure 4

Figure 5 depicts the main process flow for the writer process.

Figure 5

Below is a code snippet of the reader process.  Again, async is used to keep the callback nesting manageable.

    async.series(
                 [function(callback1)
                  {
                      logger.info('Reader %d attempting to gain read access', process.pid); 
                      var writers = 1;
                      async.whilst(  //loop until read access is available
                              function()
                              {
                                  return writers > 0;
                              },
                              function(cb1)
                              {
                                  mutex.acquire(function() {  //acquire monitor lock
                                      numWriters.get(function(res1) {  //fetch numWriters shared variable
                                          writers = res1;
                                          if (res1 == 0)  // if no writers are active, increment the numReaders shared var and release monitor lock
                                          {
                                              numReaders.incr(function(res2) {
                                                  mutex.release(function(){
                                                      cb1();
                                                  });
                                              }); 
                                          }
                                          else  //writers are active, wait on the okToRead cond var and release monitor lock
                                          {
                                              okToRead.wait(mutex, cb1);
                                          }
                                      });
                                  });
                              },
                              function(err)
                              {  
                                  if (err)
                                      throw err;
                                  callback1();  //writers was 0, reader gained access, exiting loop
                              }
                      );
                  },
                  function(callback2) 
                  {
                      logger.info('Reader %d gained read access', process.pid);
                      var readTime = utilities.randomPause(1,3); //1 to 3 seconds of simulated reading
                      setTimeout(function(){ logger.info('Reader %d finished reading', process.pid); callback2(); }, readTime);
                  },
                  function(callback3) 
                  {
                      logger.info('Reader %d releasing read access', process.pid);
                      mutex.acquire(function(){  //acquire monitor lock for the purpose of releasing a reader
                          numReaders.decr(function(res){  //decrement number of readers
                              if (res == 0)  // if num of readers is 0, signal a waiting writer
                              {
                                  okToWrite.signal(function(){
                                      mutex.release(function(){  //release monitor lock
                                         callback3(); //return
                                      });
                                  });
                              }
                              else  //num readers > 0, so just release the monitor lock and return
                              {  
                                  mutex.release(function(){ callback3();});
                              }
                          });
                      });
                  },
                  function(callback4)
                  {
                      logger.info('Reader %d processing data', process.pid);
                      var processingTime = utilities.randomPause(3,8);  //3 to 8 seconds of simulated data processing time
                      setTimeout(function(){ callback4(); }, processingTime);
                  }
                  ], 
                  function(err)
                  {
                     if (err)
                         throw err;
                     logger.debug('Exiting - File: readersWriters.js, Method: read, Process id:%d', process.pid);
                     readCB();
                  }
             );


Finally, here is a code snippet of the writer process.

    async.series(
            [function(callback1)
             {
                 logger.info('Writer %d attempting to gain write access', process.pid); 
                 var writers = 1;
                 var readers = 1;
                 async.whilst(  //loop until write access is available, meaning - no readers or writers active
                         function()
                         {
                             return readers > 0 || writers > 0;
                         },
                         function(cb1)
                         {
                             mutex.acquire(function() {  //acquire monitor lock
                                 numReaders.get(function(res1){ //fetch numReaders shared variable
                                     readers = res1;
                                     if (res1 == 0)  //if no readers active, check number of writers
                                     {
                                         numWriters.get(function(res2){
                                             writers = res2;
                                             if (res2 == 0)  //if no writers active as well, increment numWriters and release monitor lock
                                             {
                                                 numWriters.incr(function(res3){
                                                     mutex.release(function(){
                                                         cb1();
                                                     });
                                                 });
                                             }
                                             else  //active writers are present, release monitor lock and wait
                                                 okToWrite.wait(mutex, cb1);
                                         });
                                     }
                                     else  //active readers are present, release monitor lock and wait
                                         okToWrite.wait(mutex, cb1);
                                 });
                             });
                         },
                         function(err)
                         {  
                             if (err)
                                 throw err;
                             callback1();  //number of readers and writers was 0, writer gained access, exiting loop
                         }
                 );
             },
             function(callback2) 
             {
                 logger.info('Writer %d gained write access', process.pid);
                 var writeTime = utilities.randomPause(1,3); //1 to 3 seconds of simulated writing
                 setTimeout(function(){ logger.info('Writer %d finished writing', process.pid); callback2(); }, writeTime);
             },
             function(callback3) 
             {
                 logger.info('Writer %d releasing write access', process.pid);
                 mutex.acquire(function(){  //acquire monitor lock for the purpose of releasing a writer
                     numWriters.decr(function(res){  //decrement number of writers
                         okToWrite.signal(function(){  //signal 1 waiting writer
                             okToRead.signalAll(function(){  //signal all waiting readers
                                 mutex.release(function(){
                                     callback3();
                                 });
                             });
                         });
                     });    
                 });
             },
             function(callback4)
             {
                 logger.info('Writer %d processing data', process.pid);
                 var processingTime = utilities.randomPause(3,8);  //3 to 8 seconds of simulated data processing time
                 setTimeout(function(){ callback4(); }, processingTime);
             }], 
             function(err)
             {
                if (err)
                    throw err;
                logger.debug('Exiting - File: readersWriters.js, Method: write, Process id:%d', process.pid);
                writeCB();
             }
        );

Lessons Learned
  1. Developing process synchronization/mutex objects by hand is non-trivial.
  2. Process synchronization in an asynchronous environment (Node) becomes complex quickly.
  3. Using Redis publish/subscribe in conjunction with standard Redis key/value objects is prone to race conditions unless the interaction is carefully thought through.

Full Source Code here.
Copyright ©1993-2024 Joey E Whelan, All rights reserved.