
Tuesday, July 11, 2017

A Massively Parallel Short Order Cook

Ingredients



I have a small compute cluster set up in my house. Among other things (such as Hortonworks HDP on VirtualBox VMs, a discussion for another day), it runs a massively parallel distributed compute system based on the Alchemi grid computing project. Since branching away from the Alchemi base code several years ago, I have made some significant changes to its core functionality, for instance, basing computation on queues instead of instantiating classes for each record as the original version does. I am also moving away from a centralized master-node paradigm to a more “school of piranhas” approach to at-scale data processing, but more on that in a future post. The system still retains a lot of the underlying Alchemi framework, but I am gradually replacing some of its more restrictive technologies with more modern solutions (such as Web API calls instead of the extant .NET Remoting solution).

Straight out of the box, Alchemi bases its computations on a scatter/gather scenario, much like MPI does. You write a C# class that represents a unit of computation, and that class gets distributed to the cluster. For each part of the problem, the class is instantiated on some node of the cluster, does a piece of work, and returns the data, at which point the class terminates. This is fine for small problems, but I find it somewhat lacking at scale, because of the time it takes to move the code to a node, instantiate the class, rehydrate the data, and so forth. It would be much better if we did not have to pay this instantiation and rehydration overhead for every record.

Instead, what I have done is use RabbitMQ. RabbitMQ is a message broker written in Erlang, capable of moving large amounts of data in a short amount of time. RabbitMQ allows me to base my computational model on queues. Instead of instantiating a class for every single piece of the problem, I instantiate a class on an individual node that then listens for messages. The data then streams to a node that was instantiated only once, instead of being instantiated and torn down for every piece of data (i.e., for every record). This approach can scale up or down as needed by instantiating the class on more nodes. It also allows for building data pipelines, in that one node can feed its output into the input queue of one or more other nodes. Note that this opens up the discussion to the study of graph theory and queueing systems, both of which become very important as systems such as this scale up.
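To make the contrast concrete, here is a minimal, language-agnostic sketch of the idea in Python, using in-process queues and threads to stand in for RabbitMQ and cluster nodes (the stage functions are invented for illustration; this is not the Alchemi or RabbitMQ API):

```python
import queue
import threading

def worker(in_q, out_q, transform):
    """A long-lived stage: instantiated once, then streams messages.

    Contrast with scatter/gather, where a class would be created and
    torn down for every record."""
    while True:
        msg = in_q.get()
        if msg is None:          # sentinel: propagate shutdown downstream
            out_q.put(None)
            break
        out_q.put(transform(msg))

# Two stages wired into a pipeline: stage 1's output queue feeds stage 2.
q_in, q_mid, q_out = queue.Queue(), queue.Queue(), queue.Queue()
t1 = threading.Thread(target=worker, args=(q_in, q_mid, str.upper))
t2 = threading.Thread(target=worker, args=(q_mid, q_out, lambda s: s + "!"))
t1.start(); t2.start()

for record in ["hamburger", "hot dog"]:
    q_in.put(record)
q_in.put(None)                   # signal end of data
t1.join(); t2.join()

results = []
while True:
    item = q_out.get()
    if item is None:
        break
    results.append(item)
print(results)                   # ['HAMBURGER!', 'HOT DOG!']
```

Each worker is created once and processes an arbitrary number of records; to scale a stage up, you would simply start the same worker on more nodes reading from the same queue.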



Preheat the Oven


I built the base class for RabbitMQ queuing so that it can operate in two ways. The first is that it reads data from a queue until it exhausts all available data, and then shuts down. The second is that it starts up and listens for messages to process indefinitely, until it receives a signal to shut down. The first way lends itself well to a single static dataset that requires batch processing. The second lends itself well to real-time processing, where data will be coming in sporadically over a long period of time, or in chunks of varying sizes. As such, it lends itself well to real-time robotics sensor processing, which I will discuss in another post.
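A rough sketch of the two stop modes, again in Python with in-process queues standing in for RabbitMQ (the function and queue names are invented for illustration, not taken from the base class):

```python
import queue

def run_stage(work_q, control_q, stop_on_signal):
    """Drain mode: exit as soon as the data queue is empty.
    Signal mode: keep listening until a 'stop' control message arrives."""
    processed = []
    while True:
        try:
            processed.append(work_q.get_nowait())
            continue
        except queue.Empty:
            pass
        if not stop_on_signal:
            break                # batch mode: data exhausted, shut down
        try:
            if control_q.get(timeout=0.01) == "stop":
                break            # online mode: explicit shutdown signal
        except queue.Empty:
            continue             # no data yet -- keep waiting for more
    return processed

# Batch mode: process a fixed dataset, then exit.
batch_q = queue.Queue()
for n in range(3):
    batch_q.put(n)
batch_result = run_stage(batch_q, queue.Queue(), stop_on_signal=False)

# Online mode: data trickles in, and a control message ends the run.
live_q, ctrl_q = queue.Queue(), queue.Queue()
live_q.put("reading")
ctrl_q.put("stop")
online_result = run_stage(live_q, ctrl_q, stop_on_signal=True)
print(batch_result, online_result)
```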



Cook at 50,000 Records Per Second


As a somewhat trivial example of how the system operates, I created a program that acts as a massively parallel short-order cook. It can cook up one digital hamburger or hot dog, or it can create thousands of them; because it is massively parallel, it scales easily to either demand. The program takes in a message consisting of the word “hamburger” or the phrase “hot dog”. Then, depending on the input, it decorates the message with words like “sauerkraut”, “pickle”, “ketchup”, and “mustard”. Hamburgers and hot dogs flow through different streams and get decorated with different items. Finally, the two streams converge, and the entire message is surrounded with a “bun” string. Each stage of the process runs on a separate node of the cluster. For instance, there is a starting node, which splits the input stream into two separate streams, one for hot dogs and one for hamburgers. Similarly, sauerkraut, pickle, ketchup, and mustard are all separate instances of specialized classes, each waiting for specific input messages to decorate. These specialized classes all inherit from the RabbitMQ base class and know which queues they are supposed to listen to and post to.

I found the throughput for this trivial scenario to be quite good and, in fact, have seen peak speeds of about 100,000 messages per second, and sustained speeds of about 50,000 records per second.

I do hope you're hungry.



namespace Robotics.Alchemi.TubeTest.Grid
{
    using global::Alchemi.Core.GridThreads;
    using System;

    public static class AddSauerkraut
    {
        [Serializable]
        public class GridThread : RabbitMqGridThread
        {
            protected override string Level2 { get { return "Add sauerkraut"; } }

            protected override string ProcessMessage(string source, string message)
            {
                return message + ", sauerkraut";
            }
        }

        public class Step : global::Alchemi.Core.BatchJobs.Step
        {
            public override global::Alchemi.Core.Owner.GridThread CreateGridThread()
            {
                return new GridThread();
            }
        }
    }
}




Here’s an example class for adding sauerkraut to a hot dog (or anything else that is posted to its queue!). As you can see, it does not do much: it simply waits for a message to come in and then decorates it with a specific string. This, of course, could be expanded to do more heavy lifting, such as determining similarities in sensor telemetry over time intervals, or round-robining different types of sauerkraut onto incoming hot dogs as they arrive.



namespace Robotics.Alchemi.TubeTest.Grid
{
    using global::Alchemi.Core.GridThreads;
    using System;

    public static class Start
    {
        public static string InMessageQueueAlias = "StartQueue";

        [Serializable]
        public class GridThread : RabbitMqGridThread
        {
            protected override string Level2 { get { return "Start"; } }

            protected override string ProcessMessage(string source, string message)
            {
                message = message.Trim().ToLower();

                if (message == "hot dog")
                    SendToAlternateQueue(typeof(AddSauerkraut.GridThread), message);
                else
                    SendToAlternateQueue(typeof(AddPickles.GridThread), message);

                return null;
            }
        }

        public class Step : global::Alchemi.Core.BatchJobs.Step
        {
            public override global::Alchemi.Core.Owner.GridThread CreateGridThread()
            {
                return new GridThread();
            }
        }
    }
}



Here’s the code for the “Start” class, which, for this program, monitors a specific queue for input messages (from a user or another program) and then kicks off processing by posting to a queue depending on the message received. Its job is simply to examine each message and determine which queue it should go in. In general, the philosophy I have followed is that each node should do a simple, uncomplicated job; collectively, what all the nodes do adds up to more complex logic.


namespace Robotics.Alchemi.TubeTest.Grid
{
    using global::Alchemi.Core.BatchJobs;
    using global::Alchemi.Core.GridThreads;
    using global::Alchemi.Core.Helpers;
    using global::Alchemi.Core.Owner;
    using global::Alchemi.Core.Storage.Enums;

    public static class BatchInfo
    {
        public static string ControlMailboxName = "TubeTest_ControlMailbox";
        public static string EntryPointMessageQueueAlias = "TubeTestin";
        public static string ExitPointMessageQueueAlias = "TubeTestout";

        public class Batch : global::Alchemi.Core.BatchJobs.Batch
        {
            public Batch() : base()
            {
                BatchName = "Tube Test";
                Dependencies.Add(new ModuleDependency(typeof(AddBun).Module));

                MailboxHelper.SendMessage(ControlMailboxName, "run");

                BatchSteps.AddRange(
                    new Step[]
                    {
                          new Start.Step()
                         ,new AddPickles.Step()
                         ,new AddBun.Step()
                         ,new AddKetchup.Step()
                         ,new AddMustard.Step()
                         ,new AddSauerkraut.Step()
                    });

                SetPipelineMappings(
                         EntryPointMessageQueueAlias

                        ,typeof(Start.GridThread)     // splits to add pickles and add sauerkraut
                        ,typeof(AddPickles.GridThread)
                        ,typeof(AddKetchup.GridThread)
                        ,typeof(AddMustard.GridThread)
                        ,typeof(AddBun.GridThread)

                        ,ExitPointMessageQueueAlias
                    );

                SetPipelineMappings(typeof(AddSauerkraut.GridThread), typeof(AddBun.GridThread));
            }

            protected override void HandleThreadStarting(GridThread thread)
            {
                ((RabbitMqGridThread)thread).StopType = MessageQueueGridThreadStopType.StopOnSignal;
                ((RabbitMqGridThread)thread).ControlMailboxName = BatchInfo.ControlMailboxName;
            }
        }
    }
}


The “Batch” class describes the program: it holds global variables, defines which queues feed into which nodes, and so forth. In the case of a program where each node will exhaust the data in its queue and then exit, the Batch class describes which steps will run, and in what order. In the case of online mode, in which the nodes wait for data indefinitely until given the signal to quit, the Batch class does not define step order, because all steps (nodes) run simultaneously. In either case, the Batch class describes an execution graph for the flow of data between nodes.
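One way to picture the two SetPipelineMappings calls above is as a small directed graph: a main path from the entry queue through to the exit queue, plus a sauerkraut branch that rejoins at AddBun. A sketch of that wiring in Python (the dictionary representation is invented for illustration, not how Alchemi stores it):

```python
# Each entry maps a stage (or queue) to whatever its output feeds.
pipeline = {
    "TubeTestin":    "Start",
    "Start":         ["AddPickles", "AddSauerkraut"],  # Start splits the stream
    "AddPickles":    "AddKetchup",
    "AddKetchup":    "AddMustard",
    "AddMustard":    "AddBun",
    "AddSauerkraut": "AddBun",                         # branch rejoins at the bun
    "AddBun":        "TubeTestout",
}

def downstream(stage):
    """All stages and queues reachable from a given stage (simple DFS)."""
    nxt = pipeline.get(stage, [])
    targets = nxt if isinstance(nxt, list) else [nxt]
    reached = set(targets)
    for t in targets:
        reached |= downstream(t)
    return reached

print(sorted(downstream("Start")))
```

Thinking of the batch definition as a graph like this is what makes the graph-theory connection below concrete: questions such as “which nodes does this message visit?” become simple reachability queries.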


Serves One Million


The theory behind creating a massively parallel system based on queues is very interesting. To get started, take a look at the following resources.

The flow of data through a system with multiple nodes (or servers) falls under the study of graph theory (which could also be called “network theory”) and queueing theory. There are several good books that can get you started on these subjects.
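As a taste of why queueing theory matters here, Little’s Law (L = λW) relates the average number of messages in a stage to the arrival rate and the time each message spends there. A quick back-of-envelope in Python, reusing the throughput figures from above (treating a stage as an M/M/1 queue, and the per-stage service rate, is a simplifying assumption for illustration):

```python
# Little's Law: L = lambda * W.
arrival_rate = 50_000            # sustained messages per second (from above)
service_rate = 100_000           # assumed peak rate a single stage can absorb

utilization = arrival_rate / service_rate           # rho = 0.5
# For an M/M/1 queue, the average number in the system is rho / (1 - rho).
avg_in_system = utilization / (1 - utilization)     # 1.0 message on average
# Little's Law then gives the average time a message spends in the stage.
avg_time = avg_in_system / arrival_rate             # 2e-05 s = 20 microseconds

print(utilization, avg_in_system, avg_time)
```

The takeaway: as utilization approaches 1, the average queue length (and therefore latency) blows up, which is exactly the behavior to watch for as one stage of a pipeline becomes a bottleneck.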

And, of course, the Wikipedia articles on graph theory and queueing theory are well worth a read.

Apache Spark


Another, much more popular system for processing data in real time on massively parallel clusters is Apache Spark, an Apache project that is constantly evolving.

To use Spark with C# and .NET, take a look at the Mobius project from Microsoft.

