
The DSB provides Web service notification (WSN) support as defined in the [OASIS WSN specifications|http://www.oasis-open.org/committees/tc_home.php?wg_abbrev=wsn]. It focuses on Web service Base Notification, Web service Topics and Web service Resources.
The current section describes how it is implemented and how it can be used by clients.
h1. High level Architecture
The WSN library used to provide WSN support is described here. This library provides all the basic features needed for simple and efficient notification support in any runtime. The DSB provides two different WSN implementations: the first one is provided by a DSB component (a Petals JBI component) and the second one is native WSN support inside the DSB kernel.
!WSN-DSB.png|align=center,border=1!
h2. Sequence description
* *Step 1*: Interested parties subscribe to notifications by sending a WSN Subscribe message to the DSB Notification Producer interface. The information needed is the Topic name and the endpoint reference which will receive the WSN notification messages when new messages are posted to the specified topic.
* *Step 1'* : The Notification Producer forwards the subscribe request(s) to the core engine hosted somewhere in the DSB. When the notification engine receives the subscribe message, it stores all the information needed to contact the subscriber when new messages become available.
* *Step 2 and 3* : A notification consumer client sends a notification message to the Notification Consumer interface. This message is automatically sent to the notification engine.
* *Step 4* : The notification engine detects that some actors are interested in notifications posted to the given topic and selects the endpoint(s) to send the message to. This selection is possible because all the required information was stored in Step 1'. For example, if the subscriber is a Web service, the notification engine automatically detects that the notification MUST be sent using the SOAP connector. Based on that deduction, the notification engine sends the message to the right 'protocol binding' component rather than directly to the subscriber. This is a key point of the DSB integration: the message is routed to a DSB service which is able to send it to the notification subscriber in the right format and protocol, i.e. the notification engine is transport and protocol independent.
* *Step 4'* : The protocol binding forwards the notification message to the subscriber.
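The sequence above can be sketched as a tiny in-memory broker. This is only an illustration of the subscribe/notify flow, not the DSB notification engine: the names _TopicBrokerSketch_, _subscribe_ and _publish_ are hypothetical, and a plain callback stands in for the protocol binding that delivers the message to the subscriber endpoint.

{code:language=java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for the notification engine (not DSB code). */
final class TopicBrokerSketch {

    // topic name -> delivery callbacks registered by subscribers
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    /** Step 1 and 1': store what is needed to reach the subscriber later. */
    void subscribe(String topic, Consumer<String> delivery) {
        subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(delivery);
    }

    /**
     * Steps 2 to 4': hand the message to every subscriber of the topic.
     * Returns the number of subscribers the message was delivered to.
     */
    int publish(String topic, String message) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(topic, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> target : targets) {
            // In the DSB this hop goes through a protocol binding component.
            target.accept(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        TopicBrokerSketch broker = new TopicBrokerSketch();
        List<String> received = new ArrayList<>();
        broker.subscribe("DSBTopic", received::add);
        broker.publish("DSBTopic", "<foo>123456789</foo>");
        broker.publish("OtherTopic", "<foo>not delivered</foo>");
        System.out.println(received);
    }
}
{code}

The point of the sketch is that the publisher only names a topic: which subscribers are reached, and how, is decided by the broker from what was stored at subscription time.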
h1. Implementations
h2. DSB component
A specific DSB component is in charge of WSN support. It embeds the core notification engine and exposes interfaces defined in WSN such as:
* *NotificationProducer*: Subscribe and GetCurrentMessage operations support. It allows clients to subscribe to notifications, i.e. a client sends a Subscribe message with the reference of an endpoint interested in receiving notifications. The notifications the subscriber is interested in are classified under a WSN Topic, which is specified in an element of the Subscribe message.
* *NotificationConsumer*: Notify operation support. The component can receive notification messages through the Notify operation. When a client sends a notify message on a Topic, the notification engine selects all the subscribers interested in receiving messages for that topic and sends the notification message to each of them. With the DSB, you do not have to care about the transport used to send notifications to subscribers: it is up to the DSB to route the message to the right services with the right transport and protocol.
Some additional interfaces are exposed to manage resources such as WS Topics:
* *NotificationProducerResourceProperty*. Resources are used to manage topics. With this interface you can, for example, get and modify the supported topics. Supported Resources calls are:
** TopicSet : Get the list of topics
** FixedTopicSet : Check whether the topic set is fixed or not
** TopicExpression : Get the topic expression list
** TopicExpressionDialect : Get the list of topic dialects
The endpoints provided by this component are automatically exposed by the DSB at startup and are available by default at:
* [http://localhost:8084/petals/services/NotificationProducerPortService?wsdl]
* [http://localhost:8084/petals/services/NotificationConsumerPortService?wsdl]
This means that all calls to these endpoints will be forwarded to the notification engine, regardless of which node hosts it.
h2. DSB service
A WSN engine is also embedded as a core kernel service. Other kernel services are able to use this WSN engine to subscribe to topics, send notifications, ... directly with the Java APIs defined in module _dsb-kernel-pubsubservice_.
Additionally, the WSN engine is exposed by the DSB at startup as Web services and provides the same features listed in the previous section (DSB component) at the following URLs:
* [http://localhost:7600/petals/ws/NotificationProducer]
* [http://localhost:7600/petals/ws/NotificationConsumer]
h2. Java services
WSN support is not limited to external clients. You can also use the DSB kernel features to register Java classes as listeners and send notifications directly from Java code. This is explained in the extensions section.
h1. Using notifications
h2. XML Payloads
In this section we describe the payloads of the WSN messages supported by the DSB. If you do not use the Java clients listed below, be sure to manipulate the messages in the right way to retrieve required data.
h3. Subscribe
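As an illustration only, the sketch below builds a Subscribe message following the element names and namespaces of the OASIS WS-BaseNotification 1.3 and WS-Addressing 1.0 specifications. It is an assumption that the DSB accepts exactly this shape: the helper class _SubscribePayloadSketch_ is hypothetical, and the topic namespace and consumer address used in _main_ are borrowed from the configuration examples on this page.

{code:language=java}
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

/** Hypothetical helper: builds a WS-BaseNotification Subscribe message with the JDK DOM API. */
final class SubscribePayloadSketch {

    static final String WSNB = "http://docs.oasis-open.org/wsn/b-2";
    static final String WSA = "http://www.w3.org/2005/08/addressing";
    static final String XMLNS = "http://www.w3.org/2000/xmlns/";
    static final String CONCRETE_DIALECT =
            "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Concrete";

    static Document buildSubscribe(String consumerAddress, String topicNs,
            String topicPrefix, String topicName) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder().newDocument();

            Element subscribe = doc.createElementNS(WSNB, "wsnt:Subscribe");
            subscribe.setAttributeNS(XMLNS, "xmlns:wsnt", WSNB);
            subscribe.setAttributeNS(XMLNS, "xmlns:wsa", WSA);
            doc.appendChild(subscribe);

            // Endpoint reference which will receive the notifications
            Element consumerRef = doc.createElementNS(WSNB, "wsnt:ConsumerReference");
            Element address = doc.createElementNS(WSA, "wsa:Address");
            address.setTextContent(consumerAddress);
            consumerRef.appendChild(address);
            subscribe.appendChild(consumerRef);

            // Topic the subscriber is interested in, as a prefixed QName
            Element filter = doc.createElementNS(WSNB, "wsnt:Filter");
            Element topic = doc.createElementNS(WSNB, "wsnt:TopicExpression");
            topic.setAttribute("Dialect", CONCRETE_DIALECT);
            topic.setAttributeNS(XMLNS, "xmlns:" + topicPrefix, topicNs);
            topic.setTextContent(topicPrefix + ":" + topicName);
            filter.appendChild(topic);
            subscribe.appendChild(filter);
            return doc;
        } catch (Exception e) {
            throw new IllegalStateException("Cannot build Subscribe message", e);
        }
    }

    static String toXml(Document doc) {
        try {
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(doc), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Cannot serialize document", e);
        }
    }

    public static void main(String[] args) {
        Document doc = buildSubscribe("http://localhost:9000/services/RawService",
                "http://www.petalslink.org/dsb/topicsns/", "dsb", "DSBTopic");
        System.out.println(toXml(doc));
    }
}
{code}

The message carries the two pieces of information described in the sequence: the consumer endpoint reference and the topic expression.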
h3. Unsubscribe
h2. XML clients


{info}It is important to note that in the DSB, the notification engine is a core service and so is automatically exposed as a Web service by the SOAP connector. Even though this sample only uses Web services, DSB service clients can also send subscribe and notify messages directly to the DSB WSN endpoint itself.{info}
A code sample is available at [https://svn.petalslink.com/svnroot/trunk/research/commons/dsb/samples/dsb-wsn-sample/src/main/java/org/petalslink/dsb/sample/wsn/Main.java]
It uses Web service clients to send subscribe and notify messages, a service exposed with the help of CXF to receive notifications, and the DSB as the notification broker.
{code:language=java}package org.petalslink.dsb.sample.wsn;
import java.util.concurrent.TimeUnit;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerException;
import org.petalslink.dsb.commons.service.api.Service;
import org.petalslink.dsb.notification.client.http.HTTPNotificationConsumerClient;
import org.petalslink.dsb.notification.client.http.HTTPNotificationProducerClient;
import org.petalslink.dsb.notification.client.http.HTTPNotificationProducerRPClient;
import org.petalslink.dsb.notification.service.NotificationConsumerService;
import org.petalslink.dsb.soap.CXFExposer;
import org.petalslink.dsb.soap.api.Exposer;
import org.w3c.dom.Document;
import com.ebmwebsourcing.easycommons.xml.XMLHelper;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.api.abstraction.Notify;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.api.abstraction.Subscribe;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.api.abstraction.SubscribeResponse;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.api.utils.WsnbException;
import com.ebmwebsourcing.wsstar.topics.datatypes.api.WstopConstants;
import com.ebmwebsourcing.wsstar.wsnb.services.INotificationConsumer;
import com.ebmwebsourcing.wsstar.wsnb.services.INotificationProducer;
import com.ebmwebsourcing.wsstar.wsnb.services.INotificationProducerRP;
import com.ebmwebsourcing.wsstar.wsnb.services.impl.util.Wsnb4ServUtils;
import com.ebmwebsourcing.wsstar.wsrfbf.services.faults.AbsWSStarFault;
/**
* The notification engine is hosted on some DSB node. Let's subscribe to events
* and send events. We will check if all is working...
*
* @author chamerling
*
*/
public class Main {
/**
* @param args
*/
public static void main(String[] args) {
System.out.println("****** CREATING LOCAL SERVER ******");
// local address which will receive notifications
String address = "http://localhost:8878/petals/services/NotificationConsumerPortService";
// DSB address to subscribe to (Subscribe is handled by the NotificationProducer)
String dsbSubscribe = "http://localhost:8084/petals/services/NotificationProducerPortService";
// DSB address to send notifications to (Notify is handled by the NotificationConsumer)
String dsbNotify = "http://localhost:8084/petals/services/NotificationConsumerPortService";
// the one which will receive notifications
System.out
.println("Creating service which will receive notification messages from the DSB...");
Service server = null;
QName interfaceName = new QName("http://docs.oasis-open.org/wsn/bw-2",
"NotificationConsumer");
QName serviceName = new QName("http://docs.oasis-open.org/wsn/bw-2",
"NotificationConsumerService");
QName endpointName = new QName("http://docs.oasis-open.org/wsn/bw-2",
"NotificationConsumerPort");
// expose the service
INotificationConsumer consumer = new INotificationConsumer() {
public void notify(Notify notify) throws WsnbException {
System.out
.println("Got a notify on HTTP service, this notification comes from the DSB itself...");
Document dom = Wsnb4ServUtils.getWsnbWriter().writeNotifyAsDOM(notify);
System.out.println("==============================");
try {
XMLHelper.writeDocument(dom, System.out);
} catch (TransformerException e) {
// do not swallow serialization errors silently
e.printStackTrace();
}
System.out.println("==============================");
}
};
NotificationConsumerService service = new NotificationConsumerService(interfaceName,
serviceName, endpointName, "NotificationConsumerService.wsdl", address, consumer);
Exposer exposer = new CXFExposer();
try {
server = exposer.expose(service);
server.start();
System.out.println("Local server is started and is ready to receive notifications");
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
System.out.println("****** SUBSCRIBE TO NOTIFICATION ******");
// Create the subscription, ie as a client let's send a subscribe
// message with reference to the previously registered endpoint
System.out.println("Subscribing to receive DSB notifications...");
INotificationProducer producerClient = new HTTPNotificationProducerClient(dsbSubscribe);
Subscribe subscribe = loadSubscribe();
try {
SubscribeResponse response = producerClient.subscribe(subscribe);
System.out.println("Got a response from the DSB");
Document dom = Wsnb4ServUtils.getWsnbWriter().writeSubscribeResponseAsDOM(response);
XMLHelper.writeDocument(dom, System.out);
} catch (WsnbException e) {
e.printStackTrace();
} catch (AbsWSStarFault e) {
e.printStackTrace();
} catch (TransformerException e) {
e.printStackTrace();
}
System.out.println("****** SEND NOTIFICATION TO THE DSB ******");
// send a notification to the DSB, since we just subscribed, we should
// receive it back...
System.out.println("Sending a notification to the DSB...");
INotificationConsumer consumerClient = new HTTPNotificationConsumerClient(dsbNotify);
Notify notify = loadNotify();
for (int i = 0; i < 2; i++) {
try {
consumerClient.notify(notify);
} catch (WsnbException e) {
e.printStackTrace();
}
}
try {
System.out.println("Waiting...");
TimeUnit.SECONDS.sleep(10);
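} catch (InterruptedException e) {
// restore the interrupt flag instead of ignoring the interruption
Thread.currentThread().interrupt();
}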
System.out.println("****** GET RESOURCE PROPERTIES ******");
// getting resources
INotificationProducerRP resourceClient = new HTTPNotificationProducerRPClient(dsbSubscribe);
try {
QName qname = WstopConstants.TOPIC_SET_QNAME;
com.ebmwebsourcing.wsstar.resourceproperties.datatypes.api.abstraction.GetResourcePropertyResponse response = resourceClient
.getResourceProperty(qname);
System.out.println("Get Resource response :");
Document dom = Wsnb4ServUtils.getWsrfrpWriter().writeGetResourcePropertyResponseAsDOM(
response);
XMLHelper.writeDocument(dom, System.out);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-Bye");
}
/**
* @return
*/
private static Notify loadNotify() {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
Document document;
try {
document = factory.newDocumentBuilder().parse(
Main.class.getResourceAsStream("/notify.xml"));
return Wsnb4ServUtils.getWsnbReader().readNotify(document);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* @return
*/
private static Subscribe loadSubscribe() {
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
factory.setNamespaceAware(true);
Document document;
try {
document = factory.newDocumentBuilder().parse(
Main.class.getResourceAsStream("/subscribe.xml"));
return Wsnb4ServUtils.getWsnbReader().readSubscribe(document);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}{code}
h2. Simple HTTP Clients
Some simple Java clients are also available. They let the developer bypass the XML manipulation and directly access the useful information such as the subscription ID, topic and EPR:
{code:language=java}package org.petalslink.dsb.notification.client.http;
import javax.xml.namespace.QName;
import junit.framework.TestCase;
import org.petalslink.dsb.notification.client.http.simple.HTTPConsumerClient;
import org.petalslink.dsb.notification.client.http.simple.HTTPProducerClient;
import org.petalslink.dsb.notification.client.http.simple.HTTPSubscriptionManagerClient;
import org.w3c.dom.Document;
import com.ebmwebsourcing.easycommons.xml.XMLHelper;
import com.ebmwebsourcing.wsstar.basefaults.datatypes.impl.impl.WsrfbfModelFactoryImpl;
import com.ebmwebsourcing.wsstar.basenotification.datatypes.impl.impl.WsnbModelFactoryImpl;
import com.ebmwebsourcing.wsstar.resource.datatypes.impl.impl.WsrfrModelFactoryImpl;
import com.ebmwebsourcing.wsstar.resourcelifetime.datatypes.impl.impl.WsrfrlModelFactoryImpl;
import com.ebmwebsourcing.wsstar.resourceproperties.datatypes.impl.impl.WsrfrpModelFactoryImpl;
import com.ebmwebsourcing.wsstar.topics.datatypes.impl.impl.WstopModelFactoryImpl;
import com.ebmwebsourcing.wsstar.wsnb.services.impl.util.Wsnb4ServUtils;
/**
* @author chamerling
*
*/
public class DSBTest extends TestCase {
public void testSubscribeNotifyUnsubscribe() throws Exception {
Wsnb4ServUtils.initModelFactories(new WsrfbfModelFactoryImpl(),
new WsrfrModelFactoryImpl(), new WsrfrlModelFactoryImpl(),
new WsrfrpModelFactoryImpl(), new WstopModelFactoryImpl(),
new WsnbModelFactoryImpl());
String dsb = "http://localhost:8084/petals/services/NotificationProducerPortService";
String subscriber = "http://localhost:9000/services/RawService";
// subscribe. Notifications will be sent to subscriber endpoint.
System.out.println("Subscribe");
HTTPProducerClient producerClient = new HTTPProducerClient(dsb);
String namespaceURI = "http://streams.event-processing.org/ids/";
String localPart = "NiceTempStream";
String prefix = "s";
QName topic = new QName(namespaceURI, localPart, prefix);
String UUID = producerClient.subscribe(topic, subscriber);
System.out.println("UUID : " + UUID);
// notify directly to the DSB. Up to the DSB to route messages to the
// right subscribers
System.out.println("Notify...");
Document payload = XMLHelper.createDocumentFromString("<foo>123456789</foo>");
HTTPConsumerClient consumerClient = new HTTPConsumerClient(
"http://localhost:8084/petals/services/NotificationConsumerPortService");
consumerClient.notify(payload, topic);
// Unsubscribe. We need the original subscription UUID!
System.out.println("Unsubscribe...");
HTTPSubscriptionManagerClient subscriptionManagerClient = new HTTPSubscriptionManagerClient(
dsb);
subscriptionManagerClient.unsubscribe(UUID);
// Notify on the same topic: notifications are no longer delivered to the
// original subscriber.
System.out.println("Notify...");
payload = XMLHelper.createDocumentFromString("<foo>AFTER</foo>");
for (int i = 0; i < 10; i++) {
System.out.println("Notify #" + i);
consumerClient.notify(payload, topic);
Thread.sleep(2000);
}
}
}{code}
The HTTP clients are available in the Maven dependency org.petalslink.dsb:dsb-notification-httpclient.
{info}In order to unsubscribe, you need to give the UUID returned by the original subscribe request. There is no way for now to retrieve these UUIDs from any API, so be sure to manage them on the client side.{info}
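Since the UUIDs have to be managed on the client side, one option is a small registry that remembers each UUID until it is needed for unsubscribing. The sketch below is a hypothetical helper, not part of _dsb-notification-httpclient_; the class name and the key format are assumptions made for illustration.

{code:language=java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical client-side store for the UUIDs returned by subscribe calls. */
final class SubscriptionRegistrySketch {

    // key: topic + subscriber endpoint, value: UUID returned by the subscribe request
    private final Map<String, String> uuids = new ConcurrentHashMap<>();

    private static String key(String topic, String subscriber) {
        return topic + " -> " + subscriber;
    }

    /** Call right after a successful subscribe. */
    void remember(String topic, String subscriber, String uuid) {
        uuids.put(key(topic, subscriber), uuid);
    }

    /** Removes and returns the stored UUID so that it can be passed to unsubscribe. */
    Optional<String> forget(String topic, String subscriber) {
        return Optional.ofNullable(uuids.remove(key(topic, subscriber)));
    }

    public static void main(String[] args) {
        SubscriptionRegistrySketch registry = new SubscriptionRegistrySketch();
        registry.remember("{http://streams.event-processing.org/ids/}NiceTempStream",
                "http://localhost:9000/services/RawService", "some-uuid");
        System.out.println(registry.forget(
                "{http://streams.event-processing.org/ids/}NiceTempStream",
                "http://localhost:9000/services/RawService"));
    }
}
{code}

With the clients shown above, the value returned by _producerClient.subscribe(topic, subscriber)_ would be passed to _remember_, and the value returned by _forget_ would be passed to _subscriptionManagerClient.unsubscribe_. The registry lives in memory only, so the UUIDs are lost when the client stops unless they are persisted elsewhere.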
h1. Configuration


The notification configuration files are located under the _$DSB/conf/_ folder. They let you configure notifications at both the kernel service level and the business service level.
h2. conf/topics/*-topicset.xml
Describes the topicsets supported by the notification engine. For example:
{code:language=xml}<?xml version="1.0" encoding="utf-8"?>
<wstop:TopicSet xmlns:wstop="http://docs.oasis-open.org/wsn/t-1">
    <!-- DSB topics -->
    <tns:DSBTopic xmlns:tns="http://www.petalslink.org/resources/event/1.0"
        wstop:topic="true" />
    <tns:MonitoringTopic xmlns:tns="http://www.petalslink.org/resources/event/1.0"
        wstop:topic="true" />
    <tns:CreationResourcesTopic
        xmlns:tns="http://www.petalslink.org/resources/event/1.0" wstop:topic="true" />
</wstop:TopicSet>{code}
You can configure topic sets for:
* kernel services in the kernel-topicset.xml file
* business services in the business-topicset.xml file
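To check which topics a topic set file declares, the file can be read with the standard JDK DOM API. The sketch below is not the DSB loader: _TopicSetReaderSketch_ is a hypothetical helper, and it only looks at the direct children of the _TopicSet_ element that carry _wstop:topic="true"_, which is enough for flat files like the example above.

{code:language=java}
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

/** Hypothetical helper: lists the root topics declared in a WS-Topics TopicSet document. */
final class TopicSetReaderSketch {

    static final String WSTOP = "http://docs.oasis-open.org/wsn/t-1";

    static List<QName> readTopics(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));

            List<QName> topics = new ArrayList<>();
            NodeList children = doc.getDocumentElement().getChildNodes();
            for (int i = 0; i < children.getLength(); i++) {
                Node child = children.item(i);
                // keep only elements flagged as topics; comments and whitespace are skipped
                if (child instanceof Element
                        && "true".equals(((Element) child).getAttributeNS(WSTOP, "topic"))) {
                    topics.add(new QName(child.getNamespaceURI(), child.getLocalName()));
                }
            }
            return topics;
        } catch (Exception e) {
            throw new IllegalStateException("Cannot read topic set", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<wstop:TopicSet xmlns:wstop=\"" + WSTOP + "\">"
                + "<tns:DSBTopic xmlns:tns=\"http://www.petalslink.org/resources/event/1.0\""
                + " wstop:topic=\"true\"/>"
                + "</wstop:TopicSet>";
        System.out.println(readTopics(xml));
    }
}
{code}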
h2. conf/notification.cfg
The default configuration file for the notification engine. Provides a way to define the engine internal endpoint/interface/service values. For example:
{code}#Configuration file for the notification engine
endpoint=Endpoint
interface={http://dsb.petalslink.com/notification}NotificationInterface
service={http://dsb.petalslink.com/notification}NotificationService
#supported-topics=topicA,topicB,topicC,DSBTopic{code}
{info}You do not have to change this file{info}
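The _interface_ and _service_ values use the _{namespace}localPart_ notation, which the JDK can parse with _QName.valueOf_. The following sketch shows one way to read such a file; _NotificationConfigSketch_ is a hypothetical helper, and how the DSB itself loads the file is not described here.

{code:language=java}
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import javax.xml.namespace.QName;

/** Hypothetical helper: reads notification.cfg style values as QNames. */
final class NotificationConfigSketch {

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            // lines starting with '#' are treated as comments by Properties
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Cannot read configuration", e);
        }
        return props;
    }

    /** Parses a "{namespace}localPart" value into a QName. */
    static QName qname(Properties props, String key) {
        return QName.valueOf(props.getProperty(key));
    }

    public static void main(String[] args) {
        Properties props = load("#Configuration file for the notification engine\n"
                + "endpoint=Endpoint\n"
                + "interface={http://dsb.petalslink.com/notification}NotificationInterface\n"
                + "service={http://dsb.petalslink.com/notification}NotificationService\n");
        System.out.println(qname(props, "interface").getNamespaceURI());
        System.out.println(qname(props, "service").getLocalPart());
    }
}
{code}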
h2. conf/subscribers.cfg
This file lets you create subscribers at DSB startup. All the subscribers defined in this file are automatically added to the notification engine and so receive notifications when messages are published in the associated topics.
For example:
{code}subscriber0.consumerReference=http://localhost:9000/services/RawService
subscriber0.topicName=DSBTopic
subscriber0.topicURI=http://www.petalslink.org/dsb/topicsns/
subscriber0.topicPrefix=dsb{code}
In the previous snippet, subscriber0 will receive notifications on _http://localhost:9000/services/RawService_ when a message is published in the topic DSBTopic (\+ topic URI and Prefix).
{info}Note that you can add several subscribers using the same format, just change the prefix (subscriber0) in order to be unique in the file.{info}
{info}You can use comma-separated values in the consumerReference value if several subscribers need to subscribe to the same topic.{info}
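The file format can be illustrated with a short parser: entries are grouped by their prefix (the part before the first dot), and a comma-separated consumerReference yields one subscription per address. This is a sketch under those assumptions, not the code the DSB uses at startup; _SubscribersConfigSketch_ and _Subscription_ are hypothetical names.

{code:language=java}
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/** Hypothetical helper: turns subscribers.cfg style content into subscriptions. */
final class SubscribersConfigSketch {

    /** One consumer endpoint subscribed to one topic. */
    static final class Subscription {
        final String consumerReference;
        final String topicURI;
        final String topicPrefix;
        final String topicName;

        Subscription(String consumerReference, String topicURI, String topicPrefix,
                String topicName) {
            this.consumerReference = consumerReference;
            this.topicURI = topicURI;
            this.topicPrefix = topicPrefix;
            this.topicName = topicName;
        }
    }

    static List<Subscription> parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException("Cannot read configuration", e);
        }

        // group "subscriber0.topicName" style keys by their prefix ("subscriber0")
        Map<String, Map<String, String>> byPrefix = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            int dot = key.indexOf('.');
            if (dot <= 0) {
                continue; // not a prefixed key, ignore it
            }
            byPrefix.computeIfAbsent(key.substring(0, dot), k -> new TreeMap<>())
                    .put(key.substring(dot + 1), props.getProperty(key));
        }

        List<Subscription> result = new ArrayList<>();
        for (Map<String, String> entry : byPrefix.values()) {
            String references = entry.get("consumerReference");
            if (references == null) {
                continue; // nothing to deliver to
            }
            // one subscription per comma-separated consumer address
            for (String reference : references.split(",")) {
                if (!reference.trim().isEmpty()) {
                    result.add(new Subscription(reference.trim(), entry.get("topicURI"),
                            entry.get("topicPrefix"), entry.get("topicName")));
                }
            }
        }
        return result;
    }
}
{code}

Grouping by prefix mirrors the rule stated above: each subscriber block only needs a prefix that is unique in the file.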
h2. conf/consumer.cfg
This file allows the DSB to automatically subscribe to notifications at startup and so to receive notifications when they are published in the associated topics. Note that it is also possible to subscribe on behalf of others just by setting a consumerReference which is not the DSB.
{code}subscriber0.consumerReference=http://localhost:9000/services/RawService111
# Subscribe to who? This is where the subscribe is sent...
subscriber0.producerReference=http://localhost:9000/services/RawService
# Topic values definition : Which topic to subscribe to...
subscriber0.topicName=DSBTopic
subscriber0.topicURI=http://www.petalslink.org/dsb/topicsns/
subscriber0.topicPrefix=dsb{code}
With such configuration, a subscribe message will be sent to [http://localhost:9000/services/RawService] and the consumer reference (the one which will receive notifications) will be [http://localhost:9000/services/RawService111].