Sunday, March 29, 2009

Enabling OPC information exchange using service bus (nServiceBus) part 2

In the previous post I created a simple message publisher that publishes OPC tag information onto the nServiceBus service bus. In this post I’ll create a client that subscribes to the message and retrieves it from the bus. The application is close to a copy of the publish/subscribe sample provided with the nServiceBus documentation. Again, the app.config can be used unchanged.

The first task is to amend the EventMessageHandler to use the event message class defined in the previous post.

using Messages;
using NServiceBus;
using System;

namespace Subscriber1
{
public class EventMessageHandler : IMessageHandler<EventMessage>
{
public void Handle(EventMessage message)
{
Console.WriteLine("Subscriber 1 received EventMessage with Id {0}.", message.EventId);
Console.WriteLine("Message time: {0}.", message.Time);
Console.WriteLine("Message duration: {0}.", message.Duration);
Console.WriteLine("Message description: {0}.", message.Description);
}
}
}


As you can see, this is very straightforward.



The rest of Subscriber1 can be used unchanged.



using System;
using Common.Logging;
using Messages;
using NServiceBus;

namespace Subscriber1
{
class Program
{
static void Main()
{
LogManager.GetLogger("hello").Debug("Started.");

var bus = NServiceBus.Configure.With()
.SpringBuilder()
.XmlSerializer()
.MsmqTransport()
.IsTransactional(false)
.PurgeOnStartup(false)
.UnicastBus()
.ImpersonateSender(false)
.SetMessageHandlersFromAssembliesInOrder(
typeof(EventMessageHandler).Assembly
)
.CreateBus()
.Start();

Console.WriteLine("Listening for events. To exit, press 'q' and then 'Enter'.");
while (Console.ReadLine().ToLower() != "q")
{
}
}
}
}



The implementation of the second subscriber is similar. Running the program shows the following result screen.



OPC and PubSub ESB output



As you can see, the OpcEventMessageDispatcher publishes a message onto the bus, which is then read by both subscribing clients.

Enabling OPC information exchange using service bus (nServiceBus) part 1

The samples in my previous post were pretty simplistic. In this post I want to extend the functionality and enable information exchange using a service bus. I’m using nServiceBus as the service bus implementation. A draft would look like:

OPC and PubSub ESB

The previously created program with the callback will listen for any changes on the tags. When a change occurs, it will retrieve the values and publish them on the bus. Any client that subscribes to the message will receive it. The nServiceBus ESB has only a few prerequisites: Microsoft MSMQ needs to be enabled, and some message queues need to be created through a provided script.
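To make the publish/subscribe idea concrete before bringing in nServiceBus, here is a minimal in-memory sketch. The InMemoryBus, TagChanged and PubSubDemo types are made up for this illustration and are not part of nServiceBus; there are no queues, no MSMQ and no persistence here, only the dispatch pattern in which one published message reaches every subscriber.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for a bus. This is NOT the nServiceBus API.
public class InMemoryBus
{
    // Handlers registered per message type
    private readonly Dictionary<Type, List<Action<object>>> m_handlers =
        new Dictionary<Type, List<Action<object>>>();

    // Register a handler for messages of type T
    public void Subscribe<T>(Action<T> handler)
    {
        List<Action<object>> list;
        if (!m_handlers.TryGetValue(typeof(T), out list))
        {
            list = new List<Action<object>>();
            m_handlers[typeof(T)] = list;
        }
        list.Add(delegate(object message) { handler((T)message); });
    }

    // Deliver the message to every handler registered for T.
    // Returns the number of handlers that were called.
    public int Publish<T>(T message)
    {
        List<Action<object>> list;
        if (!m_handlers.TryGetValue(typeof(T), out list))
            return 0;
        foreach (Action<object> handler in list)
            handler(message);
        return list.Count;
    }
}

// Hypothetical message carrying one tag change
public class TagChanged
{
    public string ItemName;
    public object Value;
}

public static class PubSubDemo
{
    // Two subscribers, one publish: both handlers run
    public static int Run()
    {
        var bus = new InMemoryBus();
        bus.Subscribe<TagChanged>(m => Console.WriteLine("Subscriber 1: {0} = {1}", m.ItemName, m.Value));
        bus.Subscribe<TagChanged>(m => Console.WriteLine("Subscriber 2: {0} = {1}", m.ItemName, m.Value));
        return bus.Publish(new TagChanged { ItemName = "Channel_1.Device_1.Tag_1", Value = 42 });
    }
}
```

Calling PubSubDemo.Run() prints one line per subscriber and returns 2. The publisher never references the subscribers directly, which is the decoupling I am after; a real service bus adds transport, durability and subscription storage on top of this.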

For the exchange we need to define a message data class. For the sake of simplicity we will use the following format:

using NServiceBus;
using System;

namespace Messages
{
[Serializable]
public class EventMessage : IEvent
{
public Guid EventId { get; set; }
public DateTime Time { get; set; }
public TimeSpan Duration { get; set; }
public String Description { get; set; }
}

public interface IEvent : IMessage
{
Guid EventId { get; set; }
DateTime Time { get; set; }
TimeSpan Duration { get; set; }
String Description { get; set; }
}
}



Now I’ll take the previously created Tester class and rename it to OpcEventMessageDispatcher. I need to add an IBus member variable and change the Initialize method so that it also initializes the service bus. The publish/subscribe sample of nServiceBus provides sufficient information to do so. The application configuration file (app.config) provided with the sample can be used unchanged. The complete Initialize method would look like:



private static IBus m_bus;



public void Initialize()
{
Opc.URL m_url = new Opc.URL("opcda://localhost/KEPware.KEPServerEx.V4");
m_server = new Opc.Da.Server(new OpcCom.Factory(), m_url);
try
{
if (m_server != null)
{
m_server.Connect();
// Report success only when Connect() did not throw
Console.WriteLine(String.Format("Connection with {0} successful", m_server.Name));
}
}
catch (Opc.ConnectFailedException connectionFailure)
{
Console.WriteLine("Connection failure : " + connectionFailure.Message);
// InnerException can be null, so check before reading its message
if (connectionFailure.InnerException != null)
Console.WriteLine("Additional info : " + connectionFailure.InnerException.Message);
}
m_bus = NServiceBus.Configure.With()
.SpringBuilder()
.MsmqSubscriptionStorage()
.XmlSerializer()
.MsmqTransport()
.IsTransactional(true)
.PurgeOnStartup(false)
.UnicastBus()
.ImpersonateSender(false)
.CreateBus()
.Start();
}
Now that we have initialized the service bus, we need to change the callback event handler so that it publishes a message onto the bus.


//DataChange event
public void OnDataChange(object subscriptionHandle, object requestHandle, ItemValueResult[] values)
{

foreach (ItemValueResult item in values)
{
IEvent eventMessage = new EventMessage();
eventMessage.EventId = Guid.NewGuid();
eventMessage.Time = item.Timestamp;
eventMessage.Duration = TimeSpan.FromSeconds(99999D);
eventMessage.Description = String.Format("Tag {0} changed, current value {1} ", item.ItemName, item.Value);
m_bus.Publish(eventMessage);
Console.WriteLine("Published event with Id {0}.", eventMessage.EventId);
}
}


As you can see, publishing is as easy as creating an event message and passing it to the Publish method. Running this shows the following result:



OpcEventMessageDispatcher



The output shows the created messages with their Guid message IDs. In my next post I’ll create some clients that subscribe to the created messages.

Saturday, March 28, 2009

Retrieving tag information with Kepware & C# using asynchronous callbacks

 

In my previous post I managed to retrieve information from production machines using the Kepware EX OPC server. In this post I’ll make the retrieval of the data slightly more dynamic. I will again retrieve two tags, but this time the application should be “warned” whenever a tag value changes.

 

using System;
using Opc.Da;

class Tester
{
private Opc.Da.Server m_server = null;
private Opc.Da.Subscription m_subscription = null;
private Opc.Da.SubscriptionState m_state = null;
private Opc.Da.Item[] m_items = null;

public void Initialize()
{
Opc.URL m_url = new Opc.URL("opcda://localhost/KEPware.KEPServerEx.V4");
m_server = new Opc.Da.Server(new OpcCom.Factory(), m_url) ;
try
{
if (m_server != null)
{
m_server.Connect();
// Report success only when Connect() did not throw
Console.WriteLine(String.Format("Connection with {0} successful", m_server.Name));
}
}
catch (Opc.ConnectFailedException connectionFailure)
{
Console.WriteLine("Connection failure : " + connectionFailure.Message);
}
}

public void QueryUsingDataChangedCallback()
{
m_state = new Opc.Da.SubscriptionState();
m_state.Name = "Data collector";
m_state.ServerHandle = null;
m_state.ClientHandle = Guid.NewGuid().ToString();
m_state.Active = true;
m_state.UpdateRate = 250; // Query every 250 ms
m_state.Deadband = 0;
m_state.Locale = null;

m_subscription = (Opc.Da.Subscription)m_server.CreateSubscription(m_state);
// Create room to observe 2 tags ...
m_items = new Item[2];
Opc.Da.Item m_item = new Item();
m_item.ClientHandle = Guid.NewGuid().ToString();
m_item.Active = true;
m_item.ItemName = "Channel_0_User_Defined.Ramp.Ramp4";
m_item.ItemPath = "";
m_item.ServerHandle = m_state.ClientHandle;
m_items[0] = m_item;
m_item = new Item();
m_item.ClientHandle = Guid.NewGuid().ToString();
m_item.Active = true;
m_item.ItemName = "Channel_1.Device_1.Tag_1";
m_item.ItemPath = "";
m_item.ServerHandle = m_state.ClientHandle;
m_items[1] = m_item; //Insert item
m_items = m_subscription.AddItems(m_items);
// Set the async callback event listener
m_subscription.DataChanged += new Opc.Da.DataChangedEventHandler(OnDataChange);
}


//DataChange event
public void OnDataChange(object subscriptionHandle, object requestHandle, ItemValueResult[] values)
{
foreach (ItemValueResult item in values)
{
Console.WriteLine("The observed data has changed ...");
Console.WriteLine("Item : {0} \nValue : {1}", item.ItemName, item.Value);
Console.WriteLine("Quality: {0}\nTime (HH:MM:SS mmmm) : {1}", item.Quality, item.Timestamp.ToString("HH:mm:ss ffff"));
}
}

static void Main(string[] args)
{
Tester tst = new Tester();
tst.Initialize();
tst.QueryUsingDataChangedCallback();
Console.ReadLine();
}
}


The QueryUsingDataChangedCallback method creates the tag group (subscription) and then registers the event handler that is called whenever values have changed. Whenever the event occurs, a message is printed.



KepwareCallbackOutput
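A note on the time column in this output: the handler formats the timestamp with "HH:mm:ss ffff". In .NET custom date and time format strings, fff stands for milliseconds and ffff for ten-thousandths of a second, so the last group has four digits instead of three. A minimal sketch of the difference (the TimeFormatDemo class and the sample timestamp are made up for illustration):

```csharp
using System;
using System.Globalization;

// Hypothetical helper showing the two fraction specifiers side by side
public static class TimeFormatDemo
{
    // Same format string as in OnDataChange: four fraction digits
    public static string WithTenThousandths(DateTime t)
    {
        return t.ToString("HH:mm:ss ffff", CultureInfo.InvariantCulture);
    }

    // Three fraction digits, which is milliseconds
    public static string WithMilliseconds(DateTime t)
    {
        return t.ToString("HH:mm:ss fff", CultureInfo.InvariantCulture);
    }
}
```

For a timestamp of 14:05:09 and 123 milliseconds, WithMilliseconds returns "14:05:09 123" and WithTenThousandths returns "14:05:09 1230".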



Apart from the millisecond value, which looks a bit odd, the output shows that the tag values are observed and that changes are reported. If we imagine some event control bound to these changes, we could create a Windows WPF or Silverlight application that shows warning messages on a display. Although the code could do with some major refactoring, I would first like to dig into decoupling this application from any other listening application or module. A tool that I expect can help with the decoupling is the open source ESB nServiceBus. A very schematic view is shown in the next picture:



PublishSubscribeESB



Our previously built application would be refactored into a messaging endpoint that publishes any tag value changes onto the service bus. All other clients that are interested in the information subscribe to the message and retrieve the information from the service bus.

Gadget wish list

I signed up for the 150k tour version of the Amstel Gold Race classic. Although the course will be properly marked, I’m planning to buy a Garmin Edge s705 to track this ride and any new ones. The new Tacx training software, for which a stable version should be available shortly, will support downloading the GPS data into my Fortius VR trainer. There are already sites on the web offering cycling GPS routes for download (GPS routes Netherlands and GPS routes Belgium).

Retrieving information from a “tag” using C# and Kepware OPC server

As promised in my previous post, this post shows how to connect to and retrieve some data from a PLC connected through the Kepware EX server. The picture shows the main screen of the Kepware KEPServerEX server. The structure is “channel”, “device”, “tag”; on the screen, for example, “Channel_0_User_Defined”, “Ramp”, “Ramp4”. For each tag the name, address, data type, scan rate, scaling and description are shown.
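The item names used in the code are simply these three levels joined with dots, for example "Channel_0_User_Defined.Ramp.Ramp4". A minimal sketch that splits such a name back into its parts, assuming exactly three dot-separated segments as in the examples on this page (the TagAddress class is made up for illustration; names with more levels would need a more general parser):

```csharp
using System;

// Hypothetical helper: splits "channel.device.tag" into its three parts
public class TagAddress
{
    public string Channel;
    public string Device;
    public string Tag;

    public static TagAddress Parse(string itemName)
    {
        if (itemName == null)
            throw new ArgumentNullException("itemName");

        string[] parts = itemName.Split('.');
        // Assumption: exactly three levels, as in the examples in this post
        if (parts.Length != 3)
            throw new FormatException("Expected channel.device.tag, got: " + itemName);

        return new TagAddress { Channel = parts[0], Device = parts[1], Tag = parts[2] };
    }
}
```

TagAddress.Parse("Channel_0_User_Defined.Ramp.Ramp4") yields the channel "Channel_0_User_Defined", the device "Ramp" and the tag "Ramp4".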

My first attempt will be to query just 2 tags on different channels and devices.

using System;
using Opc.Da;

class Tester
{
private Opc.Da.Server m_server = null;

public void Initialize()
{
Opc.URL m_url = new Opc.URL("opcda://localhost/KEPware.KEPServerEx.V4");
m_server = new Opc.Da.Server(new OpcCom.Factory(), m_url) ;
try
{
if (m_server != null)
{
m_server.Connect();
// Report success only when Connect() did not throw
Console.WriteLine(String.Format("Connection with {0} successful", m_server.Name));
}
}
catch (Opc.ConnectFailedException connectionFailure)
{
Console.WriteLine("Connection failure : " + connectionFailure.Message);
}
}

public void QueryOPCTags()
{
Opc.Da.Item item = new Item();
Opc.Da.Item[] m_items = new Item[2]; // Define two elements
item.ItemName = "Channel_0_User_Defined.Ramp.Ramp4";
m_items[0] = item;
item = new Item();
item.ItemName = "Channel_1.Device_1.Tag_2";
m_items[1] = item;
ItemValueResult[] itemValues = null;
try
{
itemValues = m_server.Read(m_items);
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
// itemValues is still null when the read above failed
if (itemValues != null && itemValues.Length > 0)
{
foreach (ItemValueResult value in itemValues)
{
Console.WriteLine("Item name:{0},value:{1},quality:{2}", value.ItemName, value.Value, value.Quality);
}
}
}


static void Main(string[] args)
{
Tester tst = new Tester();
tst.Initialize();
tst.QueryOPCTags();
Console.ReadLine();
}
}





The QueryOPCTags routine creates two tags and adds them to the tag array (m_items). It then asks the server to read the defined tags and stores the retrieved information in the ItemValueResult array.



The result of running this piece of code looks like this:



OPCTestOutputScreen



Although the program is not very useful because of its static nature, it at least shows that we can get some information out of the “machine” using some C# code. In my next post I’ll extend the sample to retrieve information whenever an observed tag changes.

Connecting to a Kepware OPC server

I’m investigating how to retrieve status information from my company’s production machines by querying the PLCs using OPC. This week I downloaded a promising OPC server from Kepware. The product came with samples, but none in a .NET C# version. Browsing the internet didn’t help much, so I decided to just give it a go based on the other language samples and the opcnet API. I installed the Kepware EX trial version. The install went very smoothly, without any problems. For coding I used the Visual Studio Express IDE with SP1 installed.

My first piece of code to connect to the server looked like:

using System;

class Tester
{
private Opc.Da.Server m_server = null;
public void Initialize()
{
Opc.URL m_url = new Opc.URL("opcda://localhost/KEPware.KEPServerEx.V4");
m_server = new Opc.Da.Server(new OpcCom.Factory(), m_url) ;
try
{
if (m_server != null)
{
m_server.Connect();
// Report success only when Connect() did not throw
Console.WriteLine(String.Format("Connection with {0} successful", m_server.Name));
}
}
catch (Opc.ConnectFailedException connectionFailure)
{
Console.WriteLine("Connection failure : " + connectionFailure.Message);
}
}
 
static void Main(string[] args)
{
Tester tst = new Tester();
tst.Initialize();
Console.ReadLine();
}
}

It was all fairly simple. I discovered that it’s also possible to query for the available OPC servers using the ServerEnumerator class. Below is the code that queries the available servers and, if any are available, connects to the first one.

using System;
using Opc;

class Tester
{
private Opc.Da.Server m_server = null;
private Opc.IDiscovery m_discovery = new OpcCom.ServerEnumerator();

public void Initialize()
{
Opc.Server[] m_servers = m_discovery.GetAvailableServers(Specification.COM_DA_20, "localhost", null);
// Nothing to connect to when no server was found
if (m_servers == null || m_servers.Length == 0)
{
Console.WriteLine("No OPC servers found");
return;
}
foreach (Opc.Server server in m_servers)
{
Console.WriteLine(String.Format("server : {0}", server.Name));
}
try
{
// Connect to the first server in the list
m_server = (Opc.Da.Server)m_servers[0];
m_server.Connect();
Console.WriteLine(String.Format("Connection with {0} successful", m_server.Name));
}
catch (Opc.ConnectFailedException connectionFailure)
{
Console.WriteLine("Connection failure : " + connectionFailure.Message);
}
}
}

In my next post I’ll try to connect to the server and retrieve some “tag” data from the PLCs.