Saturday, July 23, 2005 3:34 AM bart

Reliable messaging using MSMQ simplified

Introduction

The upcoming Indigo technology release will simplify a lot around MSMQ and other "distributed software" technologies. Thanks to attribute-based programming, the underlying implementation of application aspects such as reliable messaging, service policies and service contracts is abstracted away and hidden from the developer, while relying on the recently developed WS-* standards. In a similar way, the use of transport channels such as HTTP and MSMQ becomes easier and more transparent. So much for the Indigo evangelism for now (I'll post extensively on Indigo later this year, once beta 1 of "Windows Vista" hits the road).

Today, however, developing a SOA-based application with reliable messaging support is not as straightforward as it could be. WSE currently lacks support for reliable messaging through the WS-* series of standards. There is, however, a sample by architect George Copeland on how to implement a reliable messaging solution using WSE 3.0, which can be found on MSDN at http://msdn.microsoft.com/webservices/default.aspx?pull=/library/en-us/dnwse/html/wseandws-rm.asp. In the first half of this year (this article is dated June of this year) I faced the problem of implementing such a reliable messaging solution that could be combined with a service-oriented approach, as part of my final-year dissertation (which can be downloaded from http://www.bartdesmet.net/download/thesis/, in Dutch only). In this post I'll show you some of that work.

 

Basics of MSMQ

If you ask a Windows developer for a messaging solution on the Windows OS, MSMQ should be the number one answer. I won't explain MSMQ itself here; more information can be found at www.microsoft.com/msmq (look for MSMQ 3.0, the XP/2003 version of the technology). In .NET you can use MSMQ through the System.Messaging namespace, which is pretty straightforward to use. The basic steps look like this:

  1. Create a message queue using the MSMQ management tools, through an installer class or using code (MessageQueue.Create). You can choose to create a public queue or a private queue, with or without transactional support.
  2. Put messages on the queue in application A, using the Send method on the MessageQueue class. The object you send must be serializable by the formatter in use: the default XmlMessageFormatter needs a public type with a public parameterless constructor, while the BinaryMessageFormatter requires the type to be marked [Serializable]. To send more complex messages with support for encryption, priority-based delivery, timeout values, etc., you have to use the Message class instead.
  3. Receive the message at the other side (application B) using the Receive method on the MessageQueue class, pointing to the same queue. This method returns a Message object, from which you can get the message itself by using the Body property. In reality, you'll have to attach a formatter to the message queue instance to make deserialization possible, based on the original type of the message object.

In code, this looks as follows:

//sender (both sides need: using System.Messaging;)
MessageQueue mq = new MessageQueue(path);
Message msg = new Message(body_object_of_message); //assume the type of this object to be MyMessageType
mq.Send(msg, somelabel); //the label can be a GUID or a textual message

//receiver
MessageQueue mq = new MessageQueue(path);
mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(MyMessageType) }); //alternatively use a binary formatter
Message msg = mq.Receive(); //blocks until a message arrives
MyMessageType original_message = (MyMessageType) msg.Body;

As you can see, this approach does not have a service-oriented look and feel. First of all, the messaging code is far too heavy to write using the System.Messaging namespace; it's too low-level to be really handy to program against. A second drawback is that you need to wrap the parameters of a call in a (serializable) message object. In contrast, a web service is far easier to use:

//sender
object res = proxy.DoSomething(par1, par2, par3);

//receiver
[WebMethod]
public object DoSomething(object par1, object par2, object par3)
{
   //perform work and return the result
   return null; //placeholder
}

When doing the same with MSMQ, you have to wrap the service operation parameters in a data carrier object (the message). Another disadvantage of using MSMQ to develop a service-oriented application is hosting: you are responsible for writing a server application that listens on the queue to take in requests, processes them (maybe on a background thread) and sends the result back to the client (using a response queue, because messaging in MSMQ is one-way by design).

So, the drawbacks of MSMQ and System.Messaging are:

  • Object-based, not method-based (read: no "service operation" concept)
  • Low-level API, little abstraction for SOA
  • No hosting functionality (server app), no abstraction on the client (proxy client)
  • Uni-directional communication by default

Writing the server application really isn't straightforward at all. When bidirectional communication or message acknowledgements (as part of a reliable messaging solution) are needed, a significant amount of additional code ("plumbing") is required. Furthermore, threading is needed for better scalability, and a Windows Service is likely to be your application host of choice. Combined, that's quite a lot of work to get the whole thing up and running.

 

Services built on top of MSMQ

Defining an MSMQ service

Somewhere in March/April I decided to write an abstraction layer for MSMQ that lets developers create a service that uses MSMQ for (reliable) message delivery while keeping the advantages of attribute-based development. That way, developers don't have to worry about creating a service application or a proxy manually, or about creating the wrapper types for service operation calls. As a result, developers can write code like this:

[MsmqService] //compare with WebService
public class MyService
{
   [MsmqMethod] //compare with WebMethod
   public int Sum(int a, int b)
   {
      return a + b;
   }
}

Once this code is written and compiled, it's time to make it available over MSMQ. To do this, we need to generate a service application that hosts the code needed for multi-threading, message-to-operation translation, etc. As a matter of fact, this little piece of code results in:

  • 2 message queues, one for the input parameters (a, b) and one for the answer (the sum)
  • 2 carrier types, one for the input parameters (a, b) and one for the answer (the sum); both need to be serializable

Generating a service contract

To make things easier, a tool called mqdl.exe converts the class definition (compiled to a .dll assembly) to a so-called MQDL service contract. This is called "export" mode. For a service like the sample above (here named SomeService in the SomeTestService namespace), this looks as follows:

<?xml version="1.0" standalone="yes"?>
<MqdlDocument>
  <Service svc="SomeService" component="SomeTestService.SomeService">
    <Operation id="Sum">
      <Input>
        <Parameter name="a" type="System.Int32" />
        <Parameter name="b" type="System.Int32" />
      </Input>
      <Output type="System.Int32" />
    </Operation>
  </Service>
</MqdlDocument>
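
To give an idea of what export mode does, here is a minimal sketch of how such a contract could be produced with reflection. It is not the actual mqdl.exe source: the attribute classes are hypothetical stand-ins for the real MsmqService and MsmqMethod attributes, and the MqdlExportSketch class and its Export method are names I made up for this illustration.

```csharp
using System;
using System.Reflection;
using System.Xml;

// Hypothetical stand-ins for the attributes used in this post; the real
// definitions in the library may look different.
[AttributeUsage(AttributeTargets.Class)]
public sealed class MsmqServiceAttribute : Attribute { }

[AttributeUsage(AttributeTargets.Method)]
public sealed class MsmqMethodAttribute : Attribute { }

namespace SomeTestService
{
    [MsmqService]
    public class SomeService
    {
        [MsmqMethod]
        public int Sum(int a, int b) { return a + b; }
    }
}

public static class MqdlExportSketch
{
    // Builds an MQDL-like document for one service type using reflection.
    public static XmlDocument Export(Type serviceType)
    {
        if (!serviceType.IsDefined(typeof(MsmqServiceAttribute), false))
            throw new ArgumentException("Type is not marked with [MsmqService].");

        XmlDocument doc = new XmlDocument();
        XmlElement root = doc.CreateElement("MqdlDocument");
        doc.AppendChild(root);

        XmlElement service = doc.CreateElement("Service");
        service.SetAttribute("svc", serviceType.Name);
        service.SetAttribute("component", serviceType.FullName);
        root.AppendChild(service);

        // Only public instance methods declared on the service class itself.
        BindingFlags flags = BindingFlags.Public | BindingFlags.Instance
                             | BindingFlags.DeclaredOnly;
        foreach (MethodInfo method in serviceType.GetMethods(flags))
        {
            if (!method.IsDefined(typeof(MsmqMethodAttribute), false))
                continue; // not exposed as a service operation

            XmlElement operation = doc.CreateElement("Operation");
            operation.SetAttribute("id", method.Name);
            service.AppendChild(operation);

            XmlElement input = doc.CreateElement("Input");
            operation.AppendChild(input);
            foreach (ParameterInfo p in method.GetParameters())
            {
                XmlElement parameter = doc.CreateElement("Parameter");
                parameter.SetAttribute("name", p.Name);
                parameter.SetAttribute("type", p.ParameterType.FullName);
                input.AppendChild(parameter);
            }

            XmlElement output = doc.CreateElement("Output");
            output.SetAttribute("type", method.ReturnType.FullName);
            operation.AppendChild(output);
        }
        return doc;
    }
}
```

Calling MqdlExportSketch.Export(typeof(SomeTestService.SomeService)) and writing out the OuterXml of the result should give a document with the same shape as the contract shown above. Methods without the attribute are skipped, which is how the class can keep helper methods private to the service.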

Creating a service application

Next, the mqdl.exe tool can be used to generate a service application and a proxy class. Let's start with the service application, generated in "svcman" mode. For each operation, a monitor is created that watches a message queue. When a message is received, it's deserialized and passed to the service class (the one we created earlier) for processing. The answer that comes back is wrapped in a response message object, which is sent back on the response queue associated with the incoming message. The generated code for the Sum operation looks like this:

public class SumMonitor
{
        delegate System.Int32 ProcessDelegate(System.Int32 in1,
                                              System.Int32 in2);
       
        private class ProcessCallbackData
        {
            public ProcessDelegate pdel;
            public Message msg;
        }
   
        private MessageQueue queue;
        // volatile: written by the thread calling Stop, read by the monitor thread
        private volatile bool stop = false;
        private volatile bool stopped = false;
       
        public SumMonitor(string prefix)
        {
            string path = prefix + "SumRequests";
            if (!MessageQueue.Exists(path))
                MessageQueue.Create(path);
            queue = new MessageQueue(path);
            queue.Formatter = new XmlMessageFormatter(
                new Type [] { typeof(InputSum) });
        }
       
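        public void Stop()
        {
            stop = true;

            // wait for the monitor loop to finish without burning a CPU core
            while (!stopped)
                System.Threading.Thread.Sleep(100);
        }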
       
        public void Start()
        {
            while(!stop)
            {
                try
                {
                    Message msg = queue.Receive(TimeSpan.FromSeconds(5));

                    InputSum input = (InputSum) msg.Body;
                   
                    SomeTestService.SomeService svc =
                        new SomeTestService.SomeService();
                    ProcessDelegate d = new ProcessDelegate(svc.Sum);
                    ProcessCallbackData pcd = new ProcessCallbackData();
                    pcd.pdel = d;
                    pcd.msg = msg;
                    d.BeginInvoke(input.in1, input.in2,
                                  new AsyncCallback(Done), pcd);
                }
                catch {} // e.g. Receive timed out; loop again to re-check the stop flag
            }
           
            stopped = true;
        }
       
        private void Done(IAsyncResult res)
        {
            ProcessCallbackData pcd = (ProcessCallbackData) res.AsyncState;
           
            OutputSum output = new OutputSum();
            output.output = pcd.pdel.EndInvoke(res);
           
            Message resp = new Message(output);
            resp.CorrelationId = pcd.msg.Id;
            pcd.msg.ResponseQueue.Send(resp);
        }
}

Note that callbacks are used to call the underlying service class in an asynchronous fashion. Furthermore, transport types are generated based on the MQDL contract:

[Serializable]
public class InputSum
{           
    public System.Int32 in1;
    public System.Int32 in2;
}
       
[Serializable]
public class OutputSum
{           
    public System.Int32 output;
}

Next, a service manager class is generated to host all of the monitors and start these on different threads:

public class ServiceManager
{
    private SumMonitor m1;
   
    public ServiceManager(string path)
    {
        m1 = new SumMonitor(path);
    }
   
    public void Start()
    {
        new Thread(new ThreadStart(m1.Start)).Start();
    }
   
    public void Stop()
    {
        m1.Stop();
    }
}

In the end, you just need to write a simple application (e.g. a Windows Service) that calls the Start and Stop methods on the ServiceManager, like this:

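public static void Main()
{
 ServiceManager mgr = new ServiceManager(".\\private$\\");
 mgr.Start();

 // Keep serving until Enter is pressed, then shut the monitors down.
 Console.ReadLine();
 mgr.Stop();
}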

Consuming the service

On the other side, we need a proxy that hides all the details of translating a service call to a transport type, sending it to a request queue, polling the response queue and translating the answer back to a method call return value. This is done by the "proxy" mode of mqdl.exe, which generates code like this:

public System.Int32 Sum(System.Int32 a, System.Int32 b)
{
 MessageQueue __queueReq = new MessageQueue(__path + "SumRequests");
 __queueReq.Formatter = new XmlMessageFormatter(
  new Type [] { typeof(InputSum) });

 string __pathResp = ".\\private$\\" + Guid.NewGuid().ToString();
 MessageQueue __queueResp = MessageQueue.Create(__pathResp);
 __queueResp.Formatter = new XmlMessageFormatter(
  new Type [] { typeof(OutputSum) });
   
 InputSum __input = new InputSum();
   
 __input.in1 = a;
 __input.in2 = b;
   
 Message __req = new Message(__input);
 __req.ResponseQueue = __queueResp;
 __queueReq.Send(__req);

 OutputSum __output = new OutputSum();
 try
 {
  string __id = __req.Id;
  Message __msg = __queueResp.ReceiveByCorrelationId(__id,
      TimeSpan.FromSeconds(__timeout));
  __output = (OutputSum) __msg.Body;
 }
 finally
 {
  MessageQueue.Delete(__pathResp);
 }

 return __output.output;
}

Readers who understand Dutch can find the details in chapter 10 of my thesis (see http://www.bartdesmet.net/download/thesis/).

Schematic overview

The following picture illustrates the overall mechanism of mqdl.exe:

As explained, the mqdl.exe tool has three modes:

  • export mode, takes an assembly (DLL) and generates the MQDL contract (XML) using reflection
  • svcman mode, takes the MQDL contract (XML) and generates monitors and a service manager, both in C# using pretty large XSLT transforms
  • proxy mode, takes the MQDL contract (XML) and generates a proxy class in C# using another XSLT transform
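
To make the XSLT-based code generation a bit more concrete, here is a small sketch of the idea. It is an illustration under assumptions, not the real transform: the stylesheet is a deliberately tiny, hypothetical one that only emits the input carrier type for each operation (the real transforms are much larger and also generate monitors, the service manager and the proxy), the MqdlTransformSketch class is a made-up name, and it uses XslCompiledTransform from System.Xml.Xsl (.NET 2.0 and later), which may not be what the tool itself uses.

```csharp
using System;
using System.IO;
using System.Xml;
using System.Xml.Xsl;

public static class MqdlTransformSketch
{
    // Hypothetical mini-stylesheet: one [Serializable] input carrier class
    // per <Operation>, with fields named in1, in2, ... like the generated
    // InputSum type shown earlier in this post.
    const string Stylesheet = @"<?xml version=""1.0""?>
<xsl:stylesheet version=""1.0"" xmlns:xsl=""http://www.w3.org/1999/XSL/Transform"">
  <xsl:output method=""text"" />
  <xsl:template match=""/"">
    <xsl:for-each select=""MqdlDocument/Service/Operation"">
      <xsl:text>[Serializable]&#10;public class Input</xsl:text>
      <xsl:value-of select=""@id"" />
      <xsl:text>&#10;{&#10;</xsl:text>
      <xsl:for-each select=""Input/Parameter"">
        <xsl:text>    public </xsl:text>
        <xsl:value-of select=""@type"" />
        <xsl:text> in</xsl:text>
        <xsl:value-of select=""position()"" />
        <xsl:text>;&#10;</xsl:text>
      </xsl:for-each>
      <xsl:text>}&#10;</xsl:text>
    </xsl:for-each>
  </xsl:template>
</xsl:stylesheet>";

    // Applies the stylesheet to an MQDL contract and returns the C# source.
    public static string Generate(string mqdlXml)
    {
        XslCompiledTransform transform = new XslCompiledTransform();
        using (XmlReader sheet = XmlReader.Create(new StringReader(Stylesheet)))
            transform.Load(sheet);

        StringWriter output = new StringWriter();
        using (XmlReader input = XmlReader.Create(new StringReader(mqdlXml)))
            transform.Transform(input, null, output);
        return output.ToString();
    }
}
```

Feeding the MQDL contract of the Sum sample to MqdlTransformSketch.Generate should produce source text for an InputSum class with two System.Int32 fields. Keeping the templates in XSLT rather than in code means the generated output can be changed without recompiling the tool.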

 

Give me some code

You can take a look at this (experimental) implementation over here. Feel free to use it if it looks useful to you; suggestions and additions are welcome (but keep in mind the Indigo story that's due to be released soon).
