In this part, we describe how notifications can be used with SOAP client and services. As an example, just think that all external actors are SOAP ones in the architecture figure above. This really simple sample is composed of several external actors : Some WS notification subscribers which will receive notifications they are interested in; A notification producer which will be in charge of sending notifications to a topic and the DSB which will route notification messages to the right notification subscribers.
{info}It is important to note that in the DSB, the notification engine is a core service a so is automatically exposed as Web service by the SOAP connector. But, even if we only use Web services in this sample, we can also have DSB service clients which directly send subscribe and notify messages to the DSB WSN endpoint itself.{info}
A code sample is available at [https://svn.petalslink.com/svnroot/trunk/research/commons/dsb/samples/dsb-wsn-sample/src/main/java/org/petalslink/dsb/sample/wsn/Main.java]
It uses Web service clients to send subscribe and notify messages; service exposed with the help of CXF to receive notifications; and the DSB as notification broker.
{code:language=java}package org.petalslink.dsb.sample.wsn;
import java.util.concurrent.TimeUnit;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerException;
import org.petalslink.dsb.commons.service.api.Service;
import org.petalslink.dsb.notification.client.http.HTTPNotificationConsumerClient;
import org.petalslink.dsb.notification.client.http.HTTPNotificationProducerClient;
import org.petalslink.dsb.notification.client.http.HTTPNotificationProducerRPClient;
import org.petalslink.dsb.notification.service.NotificationConsumerService;
import org.petalslink.dsb.soap.CXFExposer;
import org.petalslink.dsb.soap.api.Exposer;
import org.w3c.dom.Document;
import com.ebmwebsourcing.easycommons.xml.XMLHelper;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.api.abstraction.Notify;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.api.abstraction.Subscribe;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.api.abstraction.SubscribeResponse;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.api.utils.WsnbException;
import com.ebmwebsourcing.wsstar.topics.datatypes.api.WstopConstants;
import com.ebmwebsourcing.wsstar.wsnb.services.INotificationConsumer;
import com.ebmwebsourcing.wsstar.wsnb.services.INotificationProducer;
import com.ebmwebsourcing.wsstar.wsnb.services.INotificationProducerRP;
import com.ebmwebsourcing.wsstar.wsnb.services.impl.util.Wsnb4ServUtils;
import com.ebmwebsourcing.wsstar.wsrfbf.services.faults.AbsWSStarFault;
/**
* The notification engine is hosted on some DSB node. Let's subscribe to events
* and send events. We will check if all is working...
*
* @author chamerling
*
*/
public class Main {
/**
* @param args
*/
public static void main(String[] args) {
System.out.println("****** CREATING LOCAL SERVER ******");
// local address which will receive notifications
String address = "http://localhost:8878/petals/services/NotificationConsumerPortService";
// DSB adress to subscribe to
String dsbSubscribe = "http://localhost:8084/petals/services/NotificationConsumerPortService";
// DSB address to send notifications to
String dsbNotify = "http://localhost:8084/petals/services/NotificationProducerPortService";
// the one which will receive notifications
System.out
.println("Creating service which will receive notification messages from the DSB...");
Service server = null;
QName interfaceName = new QName("http://docs.oasis-open.org/wsn/bw-2",
"NotificationConsumer");
QName serviceName = new QName("http://docs.oasis-open.org/wsn/bw-2",
"NotificationConsumerService");
QName endpointName = new QName("http://docs.oasis-open.org/wsn/bw-2",
"NotificationConsumerPort");
// expose the service
INotificationConsumer consumer = new INotificationConsumer() {
public void notify(Notify notify) throws WsnbException {
System.out
.println("Got a notify on HTTP service, this notification comes from the DSB itself...");
Document dom = Wsnb4ServUtils.getWsnbWriter().writeNotifyAsDOM(notify);
System.out.println("==============================");
try {
XMLHelper.writeDocument(dom, System.out);
} catch (TransformerException e) {
}
System.out.println("==============================");
}
};
NotificationConsumerService service = new NotificationConsumerService(interfaceName,
serviceName, endpointName, "NotificationConsumerService.wsdl", address, consumer);
Exposer exposer = new CXFExposer();
try {
server = exposer.expose(service);
server.start();
System.out.println("Local server is started and is ready to receive notifications");
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
System.out.println("****** SUBSCRIBE TO NOTIFICATION ******");
// Create the subscription, ie as a client let's send a subscribe
// message with reference to the previously registered endpoint
System.out.println("Subscribing to receive DSB notifications...");
INotificationProducer producerClient = new HTTPNotificationProducerClient(dsbSubscribe);
Subscribe subscribe = loadSubscribe();
try {
SubscribeResponse response = producerClient.subscribe(subscribe);
System.out.println("Got a response from the DSB");
Document dom = Wsnb4ServUtils.getWsnbWriter().writeSubscribeResponseAsDOM(response);
XMLHelper.writeDocument(dom, System.out);
} catch (WsnbException e) {
e.printStackTrace();
} catch (AbsWSStarFault e) {
e.printStackTrace();
} catch (TransformerException e) {
e.printStackTrace();
}
System.out.println("****** SEND NOTIFICATION TO THE DSB ******");
// send a notification to the DSB, since we just subscribed, we should
// receive it back...
System.out.println("Sending a notification to the DSB...");
INotificationConsumer consumerClient = new HTTPNotificationConsumerClient(dsbNotify);
Notify notify = loadNotify();
for (int i = 0; i < 2; i++) {
try {
consumerClient.notify(notify);
} catch (WsnbException e) {
e.printStackTrace();
}
}
try {
System.out.println("Waiting...");
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
}
System.out.println("****** GET RESOURCE PROPERTIES ******");
// getting resources
INotificationProducerRP resourceClient = new HTTPNotificationProducerRPClient(dsbSubscribe);
try {
QName qname = WstopConstants.TOPIC_SET_QNAME;
com.ebmwebsourcing.wsstar.resourceproperties.datatypes.api.abstraction.GetResourcePropertyResponse response = resourceClient
.getResourceProperty(qname);
System.out.println("Get Resource response :");
Document dom = Wsnb4ServUtils.getWsrfrpWriter().writeGetResourcePropertyResponseAsDOM(
response);
XMLHelper.writeDocument(dom, System.out);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-Bye");
}
/**
* @return
*/
private static Notify loadNotify() {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
Document document;
try {
document = factory.newDocumentBuilder().parse(
Main.class.getResourceAsStream("/notify.xml"));
return Wsnb4ServUtils.getWsnbReader().readNotify(document);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* @return
*/
private static Subscribe loadSubscribe() {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
Document document;
try {
document = factory.newDocumentBuilder().parse(
Main.class.getResourceAsStream("/subscribe.xml"));
return Wsnb4ServUtils.getWsnbReader().readSubscribe(document);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}{code}
h2. Simple HTTP Clients
There are also some simple Java clients available which allows the developer to bypass the XML manipulations and directly access to the useful information such as subscription ID, topic, EPR...:
{code:language=java}package org.petalslink.dsb.notification.client.http;
import javax.xml.namespace.QName;
import junit.framework.TestCase;
import org.petalslink.dsb.notification.client.http.simple.HTTPConsumerClient;
import org.petalslink.dsb.notification.client.http.simple.HTTPProducerClient;
import org.petalslink.dsb.notification.client.http.simple.HTTPSubscriptionManagerClient;
import org.w3c.dom.Document;
import com.ebmwebsourcing.easycommons.xml.XMLHelper;
import com.ebmwebsourcing.wsstar.basefaults.datatypes.impl.impl.WsrfbfModelFactoryImpl;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.impl.impl.WsnbModelFactoryImpl;
import com.ebmwebsourcing.wsstar.resource.datatypes.impl.impl.WsrfrModelFactoryImpl;
import com.ebmwebsourcing.wsstar.resourcelifetime.datatypes.impl.impl.WsrfrlModelFactoryImpl;
import com.ebmwebsourcing.wsstar.resourceproperties.datatypes.impl.impl.WsrfrpModelFactoryImpl;
import com.ebmwebsourcing.wsstar.topics.datatypes.impl.impl.WstopModelFactoryImpl;
import com.ebmwebsourcing.wsstar.wsnb.services.impl.util.Wsnb4ServUtils;
/**
* @author chamerling
*
*/
public class DSBTest extends TestCase {
public void testSubscribeNotifyUnsubscribe() throws Exception {
Wsnb4ServUtils.initModelFactories(new WsrfbfModelFactoryImpl(),
new WsrfrModelFactoryImpl(), new WsrfrlModelFactoryImpl(),
new WsrfrpModelFactoryImpl(), new WstopModelFactoryImpl(),
new WsnbModelFactoryImpl());
String dsb = "http://localhost:8084/petals/services/NotificationProducerPortService";
String subscriber = "http://localhost:9000/services/RawService";
// subscribe. Notifications will be sent to subscriber endpoint.
System.out.println("Subscribe");
HTTPProducerClient producerClient = new HTTPProducerClient(dsb);
String namespaceURI = "http://streams.event-processing.org/ids/";
String localPart = "NiceTempStream";
String prefix = "s";
QName topic = new QName(namespaceURI, localPart, prefix);
String UUID = producerClient.subscribe(topic, subscriber);
System.out.println("UUID : " + UUID);
// notify directly to the DSB. Up to the DSB to route messages to the
// right subscribers
System.out.println("Notify...");
Document payload = XMLHelper.createDocumentFromString("<foo>123456789</foo>");
HTTPConsumerClient consumerClient = new HTTPConsumerClient(
"http://localhost:8084/petals/services/NotificationConsumerPortService");
consumerClient.notify(payload, topic);
// Unsubscribe. We need the original subscription UUID!
System.out.println("Unsubscribe...");
HTTPSubscriptionManagerClient subscriptionManagerClient = new HTTPSubscriptionManagerClient(
dsb);
subscriptionManagerClient.unsubscribe(UUID);
// Notify on the same topic : Notifications are no more delivered to the
// original subscriber.
System.out.println("Notify...");
payload = XMLHelper.createDocumentFromString("<foo>AFTER</foo>");
for (int i = 0; i < 10; i++) {
System.out.println("Notify #" + i);
consumerClient.notify(payload, topic);
Thread.sleep(2000);
}
}
}{code}
The HTTP clients are available in the Maven dependency org.petalslink.dsb:dsb-notification-httpclient.
{info}In order to unsubscribe, you need to give the UUID returned by the original subscribe request. There is no way for now to retrieve these UUIDs from any API, so be sure to manage them on the client side.{info}
h1. Configuration
The notification configuration files are located under _$DSB/conf/_ folder. You can configure kernel and business services level from these configuration files.
h2. conf/topics/*-topicset.xml
Describes the topicsets supported by the notification engine. For example:
{code:language=xml}<?xml version="1.0" encoding="utf-8"?>
<wstop:TopicSet xmlns:wstop="http://docs.oasis-open.org/wsn/t-1">
<!-- DSB topics -->
<tns:DSBTopic xmlns:tns="http://www.petalslink.org/resources/event/1.0"
wstop:topic="true" />
<tns:MonitoringTopic xmlns:tns="http://www.petalslink.org/resources/event/1.0"
wstop:topic="true" />
<tns:CreationResourcesTopic
xmlns:tns="http://www.petalslink.org/resources/event/1.0" wstop:topic="true" />
</wstop:TopicSet>{code}
You can configure topic sets for :
* kernel service in the kernel-topicset.xml file
* business services in the business-topicset.xml file
h2. conf/notification.cfg
The default configuration file for the notification engine. Provides a way to define the engine internal endpoint/interface/service values. For example:
{code}#Configuration file for the notification engine
endpoint=Endpoint
interface={http://dsb.petalslink.com/notification}NotificationInterface
service={http://dsb.petalslink.com/notification}NotificationService
#supported-topics=topicA,topicB,topicC,DSBTopic{code}
{info}You do not have to change this file{info}
h2. conf/subscribers.cfg
This file allows to create subscribers at DSB startup. All the subscribers defined in this file will be automatically added as subscribers in the notification engine and so will receive notifications when some notifications are published in the associated topics.
For example:
{code}subscriber0.consumerReference=http://localhost:9000/services/RawService
subscriber0.topicName=DSBTopic
subscriber0.topicURI=http://www.petalslink.org/dsb/topicsns/
subscriber0.topicPrefix=dsb{code}
In the previous snippet, the subscriber0 will receive notifications on _[http://localhost:9000/services/RawService_ when] a message is published in the topic DSBTopic (\+ topic URI and Prefix).
{info}Note that you can add several subscribers using the same format, just change the prefix (subscriber0) in order to be unique in the file.{info}
{info}You can use CSV values in the consumerReference value if these subscribers need to subscribe to the same topic. {info}
h2. conf/consumer.cfg
This file allows the DSB to automatically subscribes to notifications at startup and so to receive notifications when they are published in the associated topics. Note that it is also possible to subscribe on behalf of others just by setting a consumerReference which is not the DSB.
{code}subscriber0.consumerReference=http://localhost:9000/services/RawService111
# Subscribe to who? This is where the subscribe is sent...
subscriber0.producerReference=http://localhost:9000/services/RawService
# Topic values definition : Which topic to subscribe to...
subscriber0.topicName=DSBTopic
subscriber0.topicURI=http://www.petalslink.org/dsb/topicsns/
subscriber0.topicPrefix=dsb{code}
With such configuration, a subscribe message will be sent to [http://localhost:9000/services/RawService] and the consumer reference (the one which will receive notifications) will be [http://localhost:9000/services/RawService111].