Friday, April 2, 2010

Using Event Broker in WSO2 Carbon to generate Events

The following code and configuration show what is required to use the event broker in Carbon to generate events. The example below generates an email notification.

1). Declarative Service Component to obtain the service URL.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.ListenerManager;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.AxisFault;
import org.apache.log4j.*;
import org.osgi.service.component.ComponentContext;
import org.osgi.framework.BundleContext;
import org.wso2.carbon.utils.ConfigurationContextService;
import org.wso2.carbon.utils.NetworkUtils;
import org.wso2.carbon.eventing.broker.services.EventBrokerService;

import java.net.SocketException;

/**
 * Declarative Services component of the Carbon Health Monitor.
 * <p>
 * It receives the {@link ConfigurationContextService}, the {@link ListenerManager} and the
 * {@link EventBrokerService} named {@code HealthMonitorEventBroker} through the bind methods
 * declared below, exposes the broker through
 * {@link #getHealthMonitorEventBrokerService()}, looks up the HTTP endpoint of
 * {@code HealthMonitorEventingService}, and attaches the log4j appenders to the root logger
 * once both the listener manager and the event broker are available.
 *
 * @scr.component name="org.wso2.carbon.health.service" immediate="true"
 * @scr.reference name="configuration.context.service"
 * interface="org.wso2.carbon.utils.ConfigurationContextService" cardinality="1..1"
 * policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
 * @scr.reference name="listener.manager.service"
 * interface="org.apache.axis2.engine.ListenerManager" cardinality="0..1" policy="dynamic"
 * bind="setListenerManager" unbind="unsetListenerManager"
 * @scr.reference name="eventbroker.service"
 * interface="org.wso2.carbon.eventing.broker.services.EventBrokerService"
 * cardinality="1..1" policy="dynamic" target="(name=HealthMonitorEventBroker)"
 * bind="setHealthMonitorEventBrokerService" unbind="unsetHealthMonitorEventBrokerService"
 */
public class HealthMonitorEventingServiceComponent {

    /** Name of the Axis2 service whose endpoint is looked up in {@link #initialize()}. */
    private static final String EVENTING_SERVICE_NAME = "HealthMonitorEventingService";

    /** Name of the Axis2 inbound transport used for the endpoint lookup. */
    private static final String HTTP_TRANSPORT = "http";

    /** Target file of the plain-text file appender added by {@link #addAppenders()}. */
    private static final String FILE_APPENDER_PATH = "/tmp/output1.txt";

    private static final Log log = LogFactory.getLog(HealthMonitorEventingServiceComponent.class);

    static Logger logger = Logger.getLogger(HealthMonitorEventingServiceComponent.class);

    /** Event broker bound by Declarative Services; shared statically so event producers can reach it. */
    private static EventBrokerService healthMonitorEventBrokerService = null;

    private static BundleContext bundleContext = null;

    Logger rootLogger = LogManager.getRootLogger();

    /** Set once the endpoint lookup in {@link #initialize()} has been attempted. */
    private boolean configurationDone = false;

    /** Set once the appenders have been attached, so a dynamic rebind does not attach them again. */
    private boolean initialized = false;

    private ConfigurationContextService configurationContextService = null;

    private ListenerManager listenerManager = null;

    /** Address of the first EPR reported for the eventing service, or {@code null} if none was found. */
    private String endpoint = null;

    /**
     * Called by Declarative Services on activation; remembers the bundle context.
     *
     * @param context the component context supplied by the SCR runtime
     */
    protected void activate(ComponentContext context) {
        bundleContext = context.getBundleContext();
    }

    /**
     * @return the bundle context captured in {@link #activate(ComponentContext)}, or
     *         {@code null} if the component has not been activated yet
     */
    public static BundleContext getBundleContext() {
        return bundleContext;
    }

    /**
     * Attaches the Jira appender and the file appender to the root logger.
     * Does nothing until both the listener manager and the event broker are bound, and runs
     * at most once per component instance.
     */
    private void initializeAppender() {
        if (initialized || listenerManager == null || healthMonitorEventBrokerService == null) {
            return;
        }
        // Mark first: the appenders stay on the root logger, so a later rebind must not add them again.
        initialized = true;
        System.out.println("Carbon Health Monitor started!");

        rootLogger.addAppender(new JiraAppender());
        log.info("Jira appender added to the root logger");

        addAppenders();
        rootLogger.error("This log message is used to trigger the JiraAppender");
    }

    /**
     * Adds a {@link FileAppender} with a {@link SimpleLayout} to the root logger.
     * If the appender cannot be created the failure is logged and nothing is attached.
     */
    public void addAppenders() {
        FileAppender appender;
        try {
            appender = new FileAppender(new SimpleLayout(), FILE_APPENDER_PATH, false);
        } catch (Exception e) {
            // Do not hand a null appender to the root logger; report the cause with its stack trace.
            logger.error("Unable to create the file appender for " + FILE_APPENDER_PATH, e);
            return;
        }

        rootLogger.addAppender(appender);
        logger.debug("DEBUG message 2");
        logger.debug("DEBUG message 3");
        log.info("Appender added to the logger");
    }

    /**
     * Called by Declarative Services on deactivation; nothing to release here.
     *
     * @param context the component context supplied by the SCR runtime
     */
    protected void deactivate(ComponentContext context) {
    }

    /**
     * Bind method for the {@code configuration.context.service} reference.
     *
     * @param configurationContextService the service being bound
     */
    protected void setConfigurationContextService(ConfigurationContextService configurationContextService) {
        this.configurationContextService = configurationContextService;
        // Retry the endpoint lookup in case the listener manager was bound first.
        initialize();
    }

    /**
     * Unbind method for the {@code configuration.context.service} reference.
     *
     * @param configurationContextService the service being unbound
     */
    protected void unsetConfigurationContextService(ConfigurationContextService configurationContextService) {
        this.configurationContextService = null;
    }

    /**
     * Bind method for the {@code eventbroker.service} reference.
     *
     * @param healthMonitorEventBrokerService the broker being bound
     */
    protected void setHealthMonitorEventBrokerService(EventBrokerService healthMonitorEventBrokerService) {
        HealthMonitorEventingServiceComponent.healthMonitorEventBrokerService = healthMonitorEventBrokerService;
        initializeAppender();
    }

    /**
     * Unbind method for the {@code eventbroker.service} reference.
     *
     * @param healthMonitorEventBrokerService the broker being unbound
     */
    protected void unsetHealthMonitorEventBrokerService(EventBrokerService healthMonitorEventBrokerService) {
        HealthMonitorEventingServiceComponent.healthMonitorEventBrokerService = null;
    }

    /**
     * Bind method for the {@code listener.manager.service} reference.
     *
     * @param listenerManager the listener manager being bound
     */
    protected void setListenerManager(ListenerManager listenerManager) {
        this.listenerManager = listenerManager;
        initialize();
        initializeAppender();
    }

    /**
     * Unbind method for the {@code listener.manager.service} reference.
     *
     * @param listenerManager the listener manager being unbound
     */
    protected void unsetListenerManager(ListenerManager listenerManager) {
        this.listenerManager = null;
    }

    /**
     * Looks up the HTTP endpoint of the eventing service once both the listener manager and
     * the configuration context service are bound. The lookup is attempted a single time;
     * failures are logged and leave {@link #endpoint} unchanged.
     */
    private void initialize() {
        if (configurationDone || listenerManager == null || configurationContextService == null) {
            return;
        }
        ConfigurationContext serverConfigurationContext =
                configurationContextService.getServerConfigContext();

        String host = null;
        try {
            host = NetworkUtils.getLocalHostname();
        } catch (SocketException e) {
            log.warn("Unable to determine the local host name; continuing with a null host", e);
        }

        String resolved = resolveEndpoint(serverConfigurationContext, host);
        if (resolved != null) {
            endpoint = resolved;
        }
        configurationDone = true;
    }

    /**
     * Asks the HTTP transport receiver for the EPRs of the eventing service.
     *
     * @param serverConfigurationContext the server configuration context, may be {@code null}
     * @param host the host name passed to the receiver, may be {@code null}
     * @return the address of the first EPR, or {@code null} if it cannot be determined
     */
    private String resolveEndpoint(ConfigurationContext serverConfigurationContext, String host) {
        if (serverConfigurationContext == null) {
            return null;
        }
        AxisConfiguration config = serverConfigurationContext.getAxisConfiguration();
        if (config == null || config.getTransportIn(HTTP_TRANSPORT) == null
                || config.getTransportIn(HTTP_TRANSPORT).getReceiver() == null) {
            return null;
        }
        try {
            EndpointReference[] eprArray = config.getTransportIn(HTTP_TRANSPORT)
                    .getReceiver().getEPRsForService(EVENTING_SERVICE_NAME, host);
            // Check the length as well: indexing an empty array would throw.
            if (eprArray != null && eprArray.length > 0 && eprArray[0] != null) {
                return eprArray[0].getAddress();
            }
        } catch (AxisFault e) {
            log.error("Unable to obtain the endpoint of " + EVENTING_SERVICE_NAME, e);
        }
        return null;
    }

    /**
     * @return the currently bound event broker, or {@code null} if none is bound
     */
    public static EventBrokerService getHealthMonitorEventBrokerService() {
        return healthMonitorEventBrokerService;
    }
}


2). Generating Events.

// Wrap the payload element in an event and tag it with the mail topic.
Event<OMElement> mailEvent = new Event<OMElement>(zipElement);
mailEvent.setTopic("/mail-sending/mail");

// Build the topic element for the event using the default AXIOM factory.
OMElement topicElement = EventBrokerUtils.buildTopic(OMAbstractFactory.getOMFactory(), mailEvent);

// Hand the message and its topic to the broker held by the service component.
EventBrokerUtils.generateEvent(
        mailEvent.getMessage(),
        topicElement,
        HealthMonitorEventingServiceComponent.getHealthMonitorEventBrokerService());


3). The services.xml for the service.

<!-- Axis2 service descriptor for the Health Monitor eventing service. -->
<serviceGroup>
<!-- The service name below is the one the service component passes to getEPRsForService
     when it looks up the endpoint. -->
<service name="HealthMonitorEventingService" targetNamespace="http://eventing.registry.carbon.wso2.org">
<!-- The service is exposed over the http transport only. -->
<transports>
<transport>http</transport>
</transports>
<description>
Health Monitor Eventing Service
</description>
<!-- Both the in-only and the in-out exchange patterns are handled by the same
     CarbonEventingMessageReceiver class. -->
<messageReceivers>
<messageReceiver mep="http://www.w3.org/2004/08/wsdl/in-only"
class="org.wso2.carbon.eventing.broker.receivers.CarbonEventingMessageReceiver"/>
<messageReceiver mep="http://www.w3.org/2004/08/wsdl/in-out"
class="org.wso2.carbon.eventing.broker.receivers.CarbonEventingMessageReceiver"/>
</messageReceivers>
<!-- Subscribe is enabled and mapped to its WS-Eventing action. -->
<parameter name="enableSubscribe" locked="true">true</parameter>
<operation name="Subscribe" mep="http://www.w3.org/ns/wsdl/in-out">
<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe</actionMapping>
</operation>
<!-- Renew is disabled: the parameter is false and the operation is commented out. -->
<parameter name="enableRenew" locked="true">false</parameter>
<!--operation name="Renew" mep="http://www.w3.org/ns/wsdl/in-out">
<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew</actionMapping>
</operation-->
<!-- Unsubscribe is disabled: the parameter is false and the operation is commented out. -->
<parameter name="enableUnsubscribe" locked="true">false</parameter>
<!--operation name="Unsubscribe" mep="http://www.w3.org/ns/wsdl/in-out">
<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe</actionMapping>
</operation-->
<!-- GetStatus is disabled: the parameter is false and the operation is commented out. -->
<parameter name="enableGetStatus" locked="true">false</parameter>
<!--operation name="GetStatus" mep="http://www.w3.org/ns/wsdl/in-out">
<actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus</actionMapping>
</operation-->
<!-- Publish is declared and mapped to the eventing-extended Publish action. -->
<operation name="Publish" mep="http://www.w3.org/ns/wsdl/in-out">
<actionMapping>http://ws.apache.org/ws/2007/05/eventing-extended/Publish</actionMapping>
</operation>

<!-- Names the broker instance. The same name, HealthMonitorEventBroker, is used by the
     eventStream element of the event broker configuration and by the target filter on the
     eventbroker.service reference of the service component. -->
<parameter name="eventBrokerInstance" locked="true">HealthMonitorEventBroker</parameter>
</service>
<!-- NOTE(review): presumably keeps this service group out of service listings; confirm
     the exact effect of hiddenService against the Carbon documentation. -->
<parameter name="hiddenService" locked="true">true</parameter>
</serviceGroup>


4). Event Broker configuration

<!-- Event broker configuration for the Health Monitor. -->
<eventBroker xmlns="http://wso2.org/ns/2009/09/eventing">
<!-- The stream name must match the eventBrokerInstance parameter in services.xml and the
     target filter (name=HealthMonitorEventBroker) on the component's broker reference. -->
<eventStream name="HealthMonitorEventBroker">
<!-- Active subscription manager: the embedded registry based implementation. -->
<subscriptionManager class="org.wso2.carbon.eventing.impl.EmbeddedRegistryBasedSubscriptionManager">
<parameter name="topicHeaderName">topic</parameter>
<parameter name="topicHeaderNS">http://wso2.org/ns/2009/09/eventing/notify</parameter>
<parameter name="subscriptionStoragePath">/carbon/eventing/registry</parameter>
</subscriptionManager>
<!-- Uncomment to use RemoteRegistryBasedSubscriptionManager instead; it takes the
     additional registryURL, username and password parameters shown below. -->
<!--subscriptionManager class="org.wso2.carbon.eventing.impl.RemoteRegistryBasedSubscriptionManager">
<parameter name="topicHeaderName">topic</parameter>
<parameter name="topicHeaderNS">http://wso2.org/ns/2009/09/eventing/notify</parameter>
<parameter name="subscriptionStoragePath">/carbon/eventing/registry</parameter>
<parameter name="registryURL">http://remote-ip:port/registry/</parameter>
<parameter name="username">username</parameter>
<parameter name="password">password</parameter>
</subscriptionManager-->
<!-- Alternative dispatcher, left disabled; the CarbonEventDispatcher below is the active one. -->
<!--<eventDispatcher>org.wso2.carbon.registry.eventing.RegistryEventDispatcher</eventDispatcher>-->
<eventDispatcher>org.wso2.carbon.eventing.broker.CarbonEventDispatcher</eventDispatcher>
<!-- Notification manager and its thread and queue settings. -->
<notificationManager class="org.wso2.carbon.eventing.broker.CarbonNotificationManager">
<parameter name="minSpareThreads">25</parameter>
<parameter name="maxThreads">150</parameter>
<parameter name="maxQueuedRequests">100</parameter>
<!-- Keep-alive time, stated to be in nanoseconds.
     NOTE(review): confirm the unit against CarbonNotificationManager. -->
<parameter name="keepAliveTime">1000</parameter>
<!-- Specify the path of a security policy file to enable security. -->
<!--parameter name="securityPolicy">policypath</parameter-->

<!-- Parameters specific to the Registry Event Broker configuration -->
<!-- Set this to false to stop the registry URL from being shown in notification e-mails. -->
<parameter name="showRegistryURL" locked="true">true</parameter>
</notificationManager>
</eventStream>
</eventBroker>


I used the approach above to generate email notifications in the Carbon Health Monitor component that I developed. More information on the Health Monitor component will follow in a later post.

No comments: