Sunday, March 29, 2009

Enabling OPC information exchange using service bus (nServiceBus) part 1

The samples in my previous post were pretty simplistic. In this post I want to extend the functionality and enable information exchange over a service bus, using nServiceBus as the service bus implementation. A draft of the setup looks like this:

[Figure: OPC and PubSub ESB]

The previously created program with the callback listens for changes on the tags. When a change occurs, it retrieves the values and publishes them on the bus, and any client that subscribes to the message will receive it. nServiceBus has only a few prerequisites: Microsoft MSMQ needs to be enabled, and some message queues need to be created through a provided script.
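To make the publish/subscribe flow described above concrete, here is a minimal in-memory sketch. It is not the nServiceBus API: InMemoryBus, TagChanged and the tag name are hypothetical names made up for illustration, and there is no MSMQ, no queues and no transactions involved. It only shows the idea that a publisher hands a message to the bus and every subscriber for that message type receives it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a service bus; NOT the nServiceBus API.
public class InMemoryBus
{
    // Handlers registered per message type.
    private readonly Dictionary<Type, List<Action<object>>> m_handlers =
        new Dictionary<Type, List<Action<object>>>();

    // Register a handler for messages of type T.
    public void Subscribe<T>(Action<T> handler)
    {
        List<Action<object>> list;
        if (!m_handlers.TryGetValue(typeof(T), out list))
        {
            list = new List<Action<object>>();
            m_handlers[typeof(T)] = list;
        }
        list.Add(delegate(object message) { handler((T)message); });
    }

    // Deliver the message to every handler registered for type T
    // (the compile-time type of the argument).
    public void Publish<T>(T message)
    {
        List<Action<object>> list;
        if (!m_handlers.TryGetValue(typeof(T), out list))
            return; // nobody subscribed to this message type

        foreach (Action<object> handler in list)
            handler(message);
    }
}

// Hypothetical message describing a changed OPC tag.
public class TagChanged
{
    public string ItemName { get; set; }
    public object Value { get; set; }
}

public static class Program
{
    public static void Main()
    {
        InMemoryBus bus = new InMemoryBus();

        // Two independent clients subscribe to the same message type.
        bus.Subscribe<TagChanged>(m => Console.WriteLine("Client A: {0} = {1}", m.ItemName, m.Value));
        bus.Subscribe<TagChanged>(m => Console.WriteLine("Client B: {0} = {1}", m.ItemName, m.Value));

        // The publisher does not know who is listening.
        bus.Publish(new TagChanged { ItemName = "Channel1.Device1.Tag1", Value = 42 });
    }
}
```

The point of the pattern is that the publisher and the clients only share the message type. A real bus such as nServiceBus adds durable queues and delivery across processes and machines on top of this idea.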

For the exchange we need to define a message data class. For the sake of simplicity we will use the following format:

using NServiceBus;
using System;

namespace Messages
{
    [Serializable]
    public class EventMessage : IEvent
    {
        public Guid EventId { get; set; }
        public DateTime Time { get; set; }
        public TimeSpan Duration { get; set; }
        public String Description { get; set; }
    }

    public interface IEvent : IMessage
    {
        Guid EventId { get; set; }
        DateTime Time { get; set; }
        TimeSpan Duration { get; set; }
        String Description { get; set; }
    }
}



Now I’ll take the previously created Tester class and rename it to OpcEventMessageDispatcher. It needs an IBus member variable, and the Initialize method must also initialize the service bus. The publish/subscribe sample that ships with nServiceBus provides sufficient information to do so, and the application configuration file (app.config) provided with that sample can be used unchanged. The member variable and the complete Initialize method look like this:



private static IBus m_bus;



public void Initialize()
{
    // Connect to the OPC DA server.
    Opc.URL m_url = new Opc.URL("opcda://localhost/KEPware.KEPServerEx.V4");
    m_server = new Opc.Da.Server(new OpcCom.Factory(), m_url);
    try
    {
        m_server.Connect();
        // Only report success when Connect() did not throw.
        Console.WriteLine(String.Format("Connection with {0} successful", m_server.Name));
    }
    catch (Opc.ConnectFailedException connectionFailure)
    {
        Console.WriteLine("Connection failure : " + connectionFailure.Message);
        // InnerException can be null, so check before using it.
        if (connectionFailure.InnerException != null)
            Console.WriteLine("Additional info : " + connectionFailure.InnerException.Message);
    }

    // Configure and start the service bus.
    m_bus = NServiceBus.Configure.With()
        .SpringBuilder()
        .MsmqSubscriptionStorage()
        .XmlSerializer()
        .MsmqTransport()
            .IsTransactional(true)
            .PurgeOnStartup(false)
        .UnicastBus()
            .ImpersonateSender(false)
        .CreateBus()
        .Start();
}

Now that the service bus is initialized, we need to change the callback event to publish a message onto the bus.


//DataChange event
public void OnDataChange(object subscriptionHandle, object requestHandle, ItemValueResult[] values)
{
    foreach (ItemValueResult item in values)
    {
        IEvent eventMessage = new EventMessage();
        eventMessage.EventId = Guid.NewGuid();
        eventMessage.Time = item.Timestamp;
        eventMessage.Duration = TimeSpan.FromSeconds(99999D);
        eventMessage.Description = String.Format("Tag {0} changed, current value {1} ", item.ItemName, item.Value);
        m_bus.Publish(eventMessage);
        Console.WriteLine("Published event with Id {0}.", eventMessage.EventId);
    }
}


As you can see, publishing is as easy as creating an event message and passing it to the Publish method. Running this shows the following result:



[Screenshot: OpcEventMessageDispatcher]



The output shows the created messages with their Guid message ids. In my next post I’ll create some clients that subscribe to the created messages.
