Saturday, August 26, 2006 1:03 PM bart

Running parallel jobs asynchronously using WaitHandle

Introduction

For the moment, I'm working on a thin client project which includes manageability of thin client machines over the network using a so-called "network manager" tool. Technologies involved are .NET 2.0 and .NET Remoting (for various reasons, one of which is to keep the footprint on client machines low).

One of the admin tasks that one has to perform is to shut down a computer remotely (assuming the "administrator" in charge has the right permissions). However, typically such a thing will be done in groups (shut down an entire thin client pc room for instance). The idea is that one single call is made to a server-application to shut down (or reboot or whatever) a list of computers. Once the server gets such a request (and performs the necessairy authentication and authorization), the call is dispatched to all clients but in parallel (a timeout typically takes a couple of seconds, serializing the calls would lead to an enormous wait time).

In this post I'm showing you the pattern of parallel execution of (similar) jobs and how to wait till all of them complete.

The entry point of the parallel job

One of the functions in my code (a bit simplified, i.e. no security or logging code) is:

public Dictionary<string, ManagementResult> Shutdown(string[] computers, TimeSpan delay, string message, bool allowCancel)
{
   if (computers == null
|| computers.Length == 0)
      return new Dictionary<string, ManagementResult
>();

   int
n = computers.Length;

   IAsyncCall[] wos = new ComputerExiter
[n];
   WaitHandle[] whs = new WaitHandle
[n];
   for (int
i = 0; i < n; i++)
   {
      IAsyncCall res = new ComputerExiter(this,
computers[i], delay, message, allowCancel);
      wos[i] = res;
      whs[i] = res.Do();
   }

   WaitHandle.WaitAll(whs);

   Dictionary<string, ManagementResult> results = new Dictionary<string, ManagementResult>();

   foreach (IAsyncCall res in wos)
     
results.Add(res.Computer, res.Result);

   return results;
}

We'll examine this in much more detail further on, but first some groundwork.

The ManagementResult class is a simple multi-property class to report the status of one individual operation. The list of computers goes in, a dictionary mapping the computer name on the result of the desired operation (in this case shutdown) goes out.

[Serializable]
public class
ManagementResult //Implementation omitted for clarity
{
   public ManagementResult(bool success, string message, FailureReason failure, Exception ex);

   public bool Success;
   public Exception
Exception;
   public FailureReason
FailureReason;
   public string
Message;
}

A helper interface and abstract class

Because I have more than one task that has to be executed in parallel, I created a simple helper interface called IAsyncCall:

internal interface IAsyncCall
{
   string Computer { get
; }
  
ManagementResult Result { get
; }
  
WaitHandle
Do();
}

Basically the interface specifies the input (the computer), the output (the result) and the operation itself, returning a WaitHandle. Hang on for a minute to understand the role of the WaitHandle...

About AutoResetEvent and WaitHandle

Next, I have an abstract class implementing this interface, looking as follows:

internal abstract class AsyncCaller : IAsyncCall
{
   protected Management
mgmt;
   protected string
computer;
   protected ManagementResult
result;

   protected AutoResetEvent evt = new AutoResetEvent(false
);

   protected
AsyncCaller(Management mgmt, string computer)
   {
      this
.mgmt = mgmt;
      this
.computer = computer;
   }

   public string
Computer
   {
      get { return
computer; }
   }

   public ManagementResult
Result
   {
      get { return
result; }
   }

   public WaitHandle
Do()
   {
      ThreadPool.QueueUserWorkItem(new WaitCallback
(Helper));
      return
evt;
   }

   public abstract void Helper(object
o);
}

Don't worry about the mgmt variable, this one just keeps a reference to a class of mine that allows to do server-to-client communication and is used to send commands such as "shut down" to an individual computer. The most important part however is the AutoResetEvent. Let's explain:

  • First some information about the class hierarchy: AutoResetEvent inherits from EventWaitHandle which on its turn inherits from WaitHandle.
  • The constructor takes an argument specifying whether the "wait handle" is signaled or not. In this case, we're creating a "wait handle" that is not set (false).
  • Now look at the method Do, which queues the work (using a helper method called "Helper" that has to be implemented in the subclass) using the .NET ThreadPool (you could also use a simple Thread and ThreadStart approach, but using the pool is a better alternative). The Do-method returns the WaitHandle to the caller.
  • The Helper method has a parameter of type object to match the WaitCallback delegate required by the QueueUserWorkItem method of ThreadPool. We are not using the parameter however.

Putting the pieces together

Now we have an abstract class that takes care of some of the plumbing. The next step is to make a concrete implementation:

internal sealed class ComputerExiter : AsyncCaller
{
   private TimeSpan delay;
   private string
message;
   private bool
allowCancel;

   public ComputerExiter(Management mgmt, string computer, TimeSpan delay, string message, bool allowCancel) : base(mgmt, computer)
   {
      this.delay = delay;
      this
.message = message;
      this
.allowCancel = allowCancel;
   }

   protected override void Helper(object
o)
   {
      result =
mgmt.Shutdown(computer, delay, message, allowCancel);
      evt.Set();
   }

}

A few important things to notice:

  • The constructor is straight-forward, just calling the base constructor and setting some fields.
  • The only thing the class does, is implementing the Helper method. Remember that the "object o" parameter is just there to match the WaitCallback delegate. We are not using the parameter however.
  • The Helper method does the dirty work (in this case calling some Management object to perform the shutdown of the one computer that's being processed) but does an even more important thing when it finishes the job: signaling the WaitHandle using evt.Set(). This indicates that work has finished and the system should not longer wait for this thread.

Time to go back to the place we started: our entry-point level method.

Step 1 - Starting the work

Let's start with the first part:

   IAsyncCall[] wos = new ComputerExiter[n];
   WaitHandle[] whs = new WaitHandle
[n];
   for (int
i = 0; i < n; i++)
   {
      IAsyncCall res = new ComputerExiter(this,
computers[i], delay, message, allowCancel);
      wos[i] = res;
      whs[i] = res.Do();
   }

To start with, the code creates a ComputerExiter instance for each computer that has to be shut down. Remember that the ComputerExiter derives from AsyncCaller, which implements IAsyncCall:

      IAsyncCall res = new ComputerExiter(this, computers[i], delay, message, allowCancel);
      wos[i] = res;

Nothing special has happened so far, we've just created an object that holds some info about the job to be done: the computer's name, the shutdown delay, a message to be displayed to the end-user, and a flag indicating whether the end-user can cancel the shutdown operation or not. Finally, we store the object in an array because we'll need it later on (when the job is completed) to gather information about the result using the IAsyncCall.Result property.

Next, it's time to start the ComputerExiter by calling the Do method. This method returns a WaitHandle which we keep in an array (see further):

      whs[i] = res.Do();

Recall the Do method is defined on the abstract AsyncCaller class and does this:

public WaitHandle Do()
{
   ThreadPool.QueueUserWorkItem(new WaitCallback
(Helper));
   return
evt;
}

So, we start running the Helper method on another thread and return the evt object (the AutoResetEvent) to the caller.

Step 2 - Waiting till all work is done

On to the next step: waiting for work to be completed. This is one of the most crucial parts of the code, but also one of the easiest:

   WaitHandle.WaitAll(whs);

Using the WaitAll method call we ask the system to wait till all the WaitHandles have been signaled. Recall that the WaitHandles are initally not signaled (new AutoResetEvent(false)) and that the helper method does the signaling when work has been done (evt.Set()).

 Step 3 - Gathering results

And finally, we can retrieve the results from all individual workers using the Result property defined on the IAsyncCall interface:

   Dictionary<string, ManagementResult> results = new Dictionary<string, ManagementResult>();

   foreach (IAsyncCall res in
wos)
     
results.Add(res.Computer, res.Result);

   return
results;

Wrap up

At the heart of this mechanism are two classes: the AutoResetEvent and the WaitHandle (actually each AutoResetEvent is a WaitHandle). The basic idea is signaling. Using WaitHandle.WaitAll (passing in an array of WaitHandles) you let a thread wait till every WaitHandle has been signaled, which can be done on another thread. Each worker object creates an AutoResetEvent (initally unsignaled) and returns it to the caller. You can see this as the first phase of gathering WaitHandles. Next, the workers can start their mission (each on a separate thread). When a worker completes, it signals the associated WaitHandle. Once all workers have done their jobs, the WaitAll call returns.

Reference sample

Below you'll find a simple Notepad-written reference sample that illustrates the concepts in a clear fashion:

using System;
using
System.Threading;

class
WaitHandleDemo
{
   static int
min = 1;
   static int
max = 9;

   public static void
Main()
   {
      Random rand = new Random
();

      int n = 10;
      WaitHandle[] whs = new WaitHandle
[n];

      for (int
i = 0; i < n; i++)
         whs[i] =
new
Worker(i, rand.Next(min, max) * 1000).Do();

     
WaitHandle
.WaitAll(whs);
   }

   class
Worker
   {
     
private AutoResetEvent evt = new AutoResetEvent(false
);
     
private int
ms;
      private int
i;

      public Worker(int i, int ms
)
      {
         this
.ms = ms;
         this
.i = i;
      }

      public WaitHandle
Do()
      {
         ThreadPool.QueueUserWorkItem(new WaitCallback
(Helper));
         return
evt;
      }

      void Helper(object
o)
      {
         Console.WriteLine("Worker {0} started ({1} ms)."
, i, ms);
         Thread
.Sleep(ms);
         evt.Set();
         Console.WriteLine("Worker {0} stopped."
, i);
      }
   }
}

Output should look like:

Worker 0 started (2000 ms).
Worker 1 started (3000 ms).
Worker 2 started (6000 ms).
Worker 3 started (8000 ms).
Worker 0 stopped.
Worker 4 started (8000 ms).
Worker 5 started (1000 ms).
Worker 6 started (3000 ms).
Worker 1 stopped.
Worker 7 started (5000 ms).
Worker 8 started (1000 ms).
Worker 5 stopped.
Worker 9 started (5000 ms).
Worker 8 stopped.
Worker 6 stopped.
Worker 2 stopped.
Worker 9 stopped.
Worker 7 stopped.
Worker 3 stopped.
Worker 4 stopped.

Enjoy!

Del.icio.us | Digg It | Technorati | Blinklist | Furl | reddit | DotNetKicks

Filed under:

Comments

# re: Running parallel jobs asynchronously using WaitHandle

Friday, September 01, 2006 12:58 AM by Alex Simkin

All I got from the provided Notepad example was: WaitAll for multiple handles on a STA thread is not supported.

Regards.

# re: Running parallel jobs asynchronously using WaitHandle

Friday, September 01, 2006 1:35 AM by bart

Hi Alex,

That's exactly why there is no [STAThread] attribute in my code to decorate the Main method. The reason for the STAThread and WaitAll to be incompatible is message pumping. Because WaitAll is a blocking call this yields to an invalid situation and therefore you'll end up with a NotSupportedException.

The main question however is why do you have an STA thread after all? Are you still using COM objects that get called from your code and require a single thread apartment? If not, leave the STAThread attribute alone as I did in my code sample.

-Bart