Thursday, April 2, 2015

You Can Do Magic

Well, Alchemi, anyway. 

Alchemi is an open source high performance computing framework, originally developed by the GRIDS Laboratory at the University of Melbourne, Australia, and other contributors to the project. The original Alchemi project is available under GPL v2, as are derivative works such as mine, which is a fork and update of the original. The project described here can be found on SourceForge, at https://sourceforge.net/projects/alchemi2/ .

Alchemi is a grid computing framework akin to more commonly used frameworks such as MPI, or the commercially available Digipede framework. Similar to Digipede, in Alchemi you create a class in a .NET language that performs some unit of work, and that class is handed off to a worker node. The worker node executes the piece of work and then hands back the serialized class, with properties containing the results of the computation.

Alchemi is pretty neat, but it was pretty old code and needed some updating. For instance, it used no generics internally, as the original code predates generics in the .NET Framework. I also found the performance lacking, mainly due to the setup and teardown time needed for each instance of the GThread classes, which are what get handed to the worker node.

There were also some much needed improvements. The idea of an application, which is a collection of GThreads, was good as far as it went, but there also needed to be orchestrations of some sort, where one set of parallelized steps finishes and its output is handed to another set of steps to continue the work. In the new paradigm, these are Batch objects: collections of Application objects with dependencies between them, which together describe an acyclic graph, or a dataflow diagram.

Also lacking was the ability to do online processing. When a worker node finishes with its data, it generally quits and returns. While this can be great for static data, it doesn't allow for things such as a parallelized neural net that waits for data and acts on it as it arrives. For this, I used MSMQ to pipe data between processes running in parallel, and mailboxes (implemented in SQL Server) to pass control signals to worker nodes.
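To make the Batch idea a little more concrete, here is a minimal sketch of how applications with dependencies could be ordered into "waves" that run one after another, with everything inside a wave free to run in parallel. This is not the actual Alchemi code: the BatchOrdering class, the PlanWaves method, and the use of plain strings as application names are all made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch: none of these names come from the Alchemi code base.
public static class BatchOrdering
{
    // dependsOn maps each application name to the applications that must finish first.
    // Returns "waves": every application in a wave can run in parallel once
    // all earlier waves have completed.
    public static List<List<string>> PlanWaves(IDictionary<string, string[]> dependsOn)
    {
        var remaining = dependsOn.ToDictionary(
            kv => kv.Key,
            kv => new HashSet<string>(kv.Value));
        var waves = new List<List<string>>();

        while (remaining.Count > 0)
        {
            // Applications with no unfinished prerequisites are ready to run.
            var ready = remaining
                .Where(kv => kv.Value.Count == 0)
                .Select(kv => kv.Key)
                .OrderBy(name => name, StringComparer.Ordinal)
                .ToList();

            if (ready.Count == 0)
            {
                // Nothing can make progress: the graph is not acyclic,
                // or a prerequisite names an application that is not in the batch.
                throw new InvalidOperationException(
                    "Dependencies contain a cycle or reference an unknown application.");
            }

            foreach (var name in ready)
            {
                remaining.Remove(name);
            }
            foreach (var prerequisites in remaining.Values)
            {
                prerequisites.ExceptWith(ready);
            }
            waves.Add(ready);
        }

        return waves;
    }
}
```

Given a batch where Clean and Score both depend on Load, and Report depends on Clean and Score, this yields three waves: Load, then Clean and Score together, then Report. Requiring the graph to be acyclic is what guarantees that the loop always finds at least one application that is ready to run.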

A completed batch, showing its steps. Steps are individual grid applications which are orchestrated as part of a batch.

I reworked a lot of the code to accomplish this, and did some general cleanup along the way: better adherence to naming standards (GThreads are now GridThreads), no more non-generic collections, better code organization, and so on. I would estimate that about 70% of the code is rewritten or outright new at this point, while retaining (for the most part) the underlying remoting mechanisms that launch and manage worker threads.

The result is a much faster, much more fault tolerant and reliable cluster, capable of much higher throughput. With six worker nodes on my seven node, 13 core cluster (all Intel i5 machines, except for one ex-embedded machine, which has a Sempron), I am seeing throughput of 200,000 records a second for some applications. While not ready for the Top500 list, for what I need the cluster for (such as crunching through robotic sensor information), it's not bad at all, and in fact quite fast.

The compute cluster I built consists of six HP DC7800 mini desktop computers, and one very modified HP T5730W thin client machine. All machines are running Windows 7 Pro, except node 6, which runs Windows 8.1. The Drobo provides external storage, as well as being the NAS for our house in general.

Some Highlights

In order to have better throughput, there are new types of GridThreads, such as the MessageQueueGridThread, which reads from one MSMQ queue and writes to another. It can operate in one of two modes: one where it processes a queue until no more records are available and then quits, and a second where it runs continually until it receives an external stop signal, allowing for online, non-batch processing. Queues can be dynamically allocated from a pool of known queues running on several machines in the cluster, which allows for dynamic piping of data between parallel processes in an application.

namespace Robotics.Alchemi.TubeTest.Grid
{
    using global::Alchemi.Core.GridThreads;
    using global::Alchemi.Core.Helpers;
    using global::Alchemi.Core.Storage;
    using System;
    using System.Collections.Generic;
    using System.Messaging;

    public static class AddPickles
    {
        // Alias of the queue this step reads from; upstream steps write to it.
        public static string InMessageQueueAlias = "AddPicklesInqueue";

        // Batch step setup class: tells the batch which grid threads to run.
        public class AddPicklesStep : global::Alchemi.Core.BatchJobs.Step
        {
            // Generic type arguments, if any, are not shown in the original listing.
            public override IEnumerable GetThreadsToExecute(IEnumerable batchState, bool isLocalExecution)
            {
                yield return
                    new AddPicklesGridThread
                    {
                        StopSignalLabel = BatchInfo.StopSignalLabel,
                        // Run until an external stop signal arrives (online mode).
                        StopType = MessageQueueGridThreadStopType.StopOnSignal,
                        ControlMessageQueueAlias = BatchInfo.ControlMessageQueueAlias,
                        InMessageQueueAlias = AddPickles.InMessageQueueAlias,
                        // Output is piped to the next step's input queue.
                        OutMessageQueueAlias = AddMustard.InMessageQueueAlias,
                    };
            }
        }

        // The grid thread itself: called once per message read from the input queue.
        public class AddPicklesGridThread : MessageQueueGridThread
        {
            protected override object ProcessMessage(Message message, string messageAsString)
            {
                return messageAsString + ", pickles";
            }
        }
    }
}

An example of a message queue enabled grid thread, and its associated batch step setup class.
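The two stop modes are easier to see in a stripped-down loop. The following is only a sketch under stated assumptions, not how MessageQueueGridThread is actually implemented: it uses in-memory ConcurrentQueue objects in place of MSMQ queues and a CancellationToken in place of the external stop signal, and the StopMode enum and QueuePump class are hypothetical names. It also assumes that, in the online mode, any messages still in the queue are drained before the stop signal is honored.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in types; the real grid thread works against MSMQ queues.
public enum StopMode { StopWhenEmpty, StopOnSignal }

public static class QueuePump
{
    // Reads from input, transforms each message, and writes the result to output.
    // Returns the number of messages processed.
    public static int Run(
        ConcurrentQueue<string> input,
        ConcurrentQueue<string> output,
        Func<string, string> process,
        StopMode mode,
        CancellationToken stopSignal)
    {
        var processed = 0;
        while (true)
        {
            string message;
            if (input.TryDequeue(out message))
            {
                output.Enqueue(process(message));
                processed++;
                continue;
            }

            // The input queue is empty at this point.
            // Batch mode quits right away; online mode quits only once signalled.
            if (mode == StopMode.StopWhenEmpty || stopSignal.IsCancellationRequested)
            {
                return processed;
            }

            // Online mode: wait briefly for more data, waking early on the stop signal.
            stopSignal.WaitHandle.WaitOne(TimeSpan.FromMilliseconds(50));
        }
    }
}
```

In batch mode, the caller can pass CancellationToken.None, since the loop ends as soon as the input runs dry. In online mode, some other party (in the real system, a control message) has to trigger the stop, otherwise the loop keeps polling for new data indefinitely.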

Queues can be monitored within the console application. Within running grid applications, queues, which can exist on any arbitrary machine within the cluster, are dynamically reserved for a limited time by the application. The application periodically re-reserves its queues. When a queue is idle past its reservation time, it can then be used by another application.
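The reserve and re-reserve cycle described above is essentially a lease. Here is a minimal in-memory sketch of that idea, assuming each queue has a single owner and a fixed lease length. The QueueLeasePool class and its Reserve and Renew methods are hypothetical names for illustration; the real system tracks its state in SQL Server rather than in memory.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of time-limited queue reservations; not the Alchemi implementation.
public sealed class QueueLeasePool
{
    private sealed class Lease
    {
        public string Owner;
        public DateTime ExpiresUtc;
    }

    private readonly List<string> _aliases = new List<string>();
    private readonly Dictionary<string, Lease> _leases = new Dictionary<string, Lease>();
    private readonly TimeSpan _leaseLength;
    private readonly object _gate = new object();

    public QueueLeasePool(IEnumerable<string> queueAliases, TimeSpan leaseLength)
    {
        _aliases.AddRange(queueAliases);
        _leaseLength = leaseLength;
    }

    // Reserves the first queue that is free or whose lease has lapsed.
    // Returns the queue alias, or null if every queue is currently reserved.
    public string Reserve(string owner, DateTime nowUtc)
    {
        lock (_gate)
        {
            foreach (var alias in _aliases)
            {
                Lease lease;
                if (!_leases.TryGetValue(alias, out lease) || lease.ExpiresUtc <= nowUtc)
                {
                    _leases[alias] = new Lease { Owner = owner, ExpiresUtc = nowUtc + _leaseLength };
                    return alias;
                }
            }
            return null;
        }
    }

    // Extends a lease the caller still holds.
    // Returns false if the lease has lapsed or belongs to someone else.
    public bool Renew(string alias, string owner, DateTime nowUtc)
    {
        lock (_gate)
        {
            Lease lease;
            if (!_leases.TryGetValue(alias, out lease)) return false;
            if (lease.Owner != owner || lease.ExpiresUtc <= nowUtc) return false;
            lease.ExpiresUtc = nowUtc + _leaseLength;
            return true;
        }
    }
}
```

Passing the current time in as a parameter keeps the sketch easy to test. The useful property of a lease is that an application that crashes never has to release anything: its queues simply become available again once the reservation time runs out.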

In-memory tracking of executor nodes and managers has been removed in favor of SQL Server only. This allows for better tracking and logging of node activity, applications, batches, and so on. It also makes the data available through channels other than the manager, so the manager can focus on scheduling rather than servicing data requests that are better handled by a dedicated, robust database system. In the future, this should also allow for a more distributed system in which Executors are less dependent on a central control point.

Executors and Managers are more fault tolerant. Executors sense when the manager is unavailable, and aggressively attempt to re-establish contact. While not perfect, this allows for a cluster that is tolerant of node failures and more readily routes around hardware failures.

Executors can be more directly controlled, and can be issued commands directly in order to stop/start an executor, shut down the executor's host machine, etc.

The Executors control screen, showing buttons that send commands to individual executors. As with most screens in the console application, this screen has an auto-refresh feature with an adjustable refresh frequency.

Executor (worker) nodes can be controlled directly by passing messages into a mailbox that they monitor. Mailbox functionality is also available programmatically to grid applications.

The front end application is greatly improved and extended. While a lot of the existing functionality remains (for instance, the performance graph), there is a lot of new functionality written using WPF, Entity Framework, and the MVVM pattern, allowing for a responsive application with a more modern look and feel. The new front end bypasses the manager node where possible and queries SQL Server directly, making it a normal data-centric application that works whether or not the cluster is up and running.

The main form of the console application. While not to regular UX standards, the [[HOME]] menu item brings the user back to this start screen, to allow quick switching between work areas. The legacy application is available through the 'Legacy Console' tile.

Most screens, such as the Applications screen shown here, have search and auto-refresh functionality. 

All of the legacy screens, such as the performance graph shown here, still exist, with some modifications, and are readily accessible in the console application.
