JMS - Java Message Service



JMS Message Driven Bean (MDB) for Weblogic..

An MDB echo-bean application... The client sends a message to the inqueue - where the MDB waits to take it - and the MDB puts it on the outqueue - where it is finally received by another client (to complete the circle).

Need to add to config.xml :


What you need to add to config.xml in your Weblogic installation to get rolling..
config.xml

...
<JMSQueue JNDIName="dk.topsecurity.inqueue" Name="topsin" StoreEnabled="default"/> <JMSQueue JNDIName="dk.topsecurity.outqueue" Name="topsout" StoreEnabled="default"/> ...



This time, for a change, we don't use a "jndi.properties" file in the local directory - but instead pass the parameters along to the different processes...

The stuff that make up the MDB on the server - starting out with the code:


The MDB...
ServiceBean.java


package dk.topsecurity;

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import javax.ejb.CreateException;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import weblogic.rmi.RemoteException;

/**
 * Message-driven "echo" bean.
 *
 * Every {@link TextMessage} or {@link ObjectMessage} delivered to
 * {@link #onMessage(Message)} is forwarded unchanged to the queue bound under
 * {@link #OUTQUEUE_NAME}. Any other message type is logged and discarded.
 *
 * The connection factory and the outgoing queue are looked up once, in
 * {@link #ejbCreate()}; a fresh connection/session/sender is opened and closed
 * for every forwarded message.
 */
public class ServiceBean implements MessageDrivenBean, MessageListener {

  /** JNDI name of the queue that received messages are forwarded to. */
  private static final String OUTQUEUE_NAME = "dk.topsecurity.outqueue";
  /** JNDI name of the JMS connection factory used for forwarding. */
  private static final String CONNECTION_FACTORY_NAME = "weblogic.jms.ConnectionFactory";
  private static final boolean VERBOSE = true;

  /** Context handed over by the container; stored but not otherwise used here. */
  private MessageDrivenContext m_context;
  private Destination destination;
  private ConnectionFactory factory;

  // You might also consider using WebLogic's log service
  private void log(String s) {
    if (VERBOSE) {
      System.out.println(s);
    }
  }

  /**
   * Lifecycle callbacks that only log. Of these, only ejbRemove() is declared
   * by javax.ejb.MessageDrivenBean; ejbActivate() and ejbPassivate() are kept
   * so the class keeps the same public methods as before.
   */
  public void ejbActivate() {
    log("ejbActivate called -");
  }

  public void ejbRemove() {
    log("ejbRemove called -");
  }

  public void ejbPassivate() {
    log("ejbPassivate called -");
  }

  /**
   * Stores the message-driven context supplied by the container.
   * @param ctx MessageDrivenContext for this bean instance
   */
  public void setMessageDrivenContext(MessageDrivenContext ctx) {
    log("setMessageDrivenContext called");
    m_context = ctx;
  }

  /**
   * Looks up the JMS connection factory and the outgoing queue in JNDI.
   *
   * The initial context is only needed for the two lookups and is closed
   * before this method returns.
   *
   * @throws CreateException if the initial context cannot be created or a
   *         lookup fails; the original exception is attached as the cause
   */
  public void ejbCreate() throws CreateException {
    log("ejbCreate called -");

    Context context = null;
    try {
      context = new InitialContext();
    } catch (NamingException ne) {
      throw creationFailure("Could not create the initial JNDI context", ne);
    }

    try {
      // The casts stay inside the try blocks so that a wrongly typed JNDI
      // entry is reported as a CreateException, as it was before.
      try {
        factory = (ConnectionFactory) context.lookup(CONNECTION_FACTORY_NAME);
      } catch (Exception e) {
        throw creationFailure("JNDI lookup of " + CONNECTION_FACTORY_NAME + " failed", e);
      }

      try {
        destination = (Destination) context.lookup(OUTQUEUE_NAME);
      } catch (Exception e) {
        throw creationFailure("JNDI lookup of " + OUTQUEUE_NAME + " failed", e);
      }
    } finally {
      try {
        context.close();
      } catch (NamingException ignored) {
        // The lookups are done; a failure to close is not worth failing creation.
      }
    }
  }

  /**
   * Prints the stack trace (as the bean always did) and builds a
   * CreateException that carries both a description and the original cause.
   */
  private static CreateException creationFailure(String what, Exception cause) {
    cause.printStackTrace();
    CreateException failure = new CreateException(what + ": " + cause.getMessage());
    failure.initCause(cause);
    return failure;
  }

  /**
   * Forwards the payload of a text or object message to the outgoing queue.
   *
   * Failures are printed to stderr and not rethrown, so a message that could
   * not be forwarded is not redelivered because of an exception thrown here.
   *
   * @param msg the message delivered by the container
   */
  public void onMessage(Message msg) {
    try {
      if (msg instanceof TextMessage) {
        String reply = ((TextMessage) msg).getText();
        log("MDB Text: " + reply);
        log(reply);
        sendText((QueueConnectionFactory) factory, (Queue) destination, reply);
      } else if (msg instanceof ObjectMessage) {
        Object rec = ((ObjectMessage) msg).getObject();
        log("MDB Object: " + rec);
        sendObject((QueueConnectionFactory) factory, (Queue) destination,
          (Serializable) rec);
      } else {
        log("MDB: received an unsupported message type...discarded.");
      }
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  /**
   * Sends {@code o} to {@code queue} as an ObjectMessage.
   * @throws InvocationTargetException wrapping the JMSException if sending fails
   */
  private static void sendObject(QueueConnectionFactory qfactory, Queue queue,
    Serializable o) throws InvocationTargetException
  {
    forward(qfactory, queue, o, false);
  }

  /**
   * Sends {@code t} to {@code queue} as a TextMessage.
   * @throws InvocationTargetException wrapping the JMSException if sending fails
   */
  private static void sendText(QueueConnectionFactory qfactory, Queue queue, String t)
    throws InvocationTargetException
  {
    forward(qfactory, queue, t, true);
  }

  /**
   * Shared implementation of sendText/sendObject: opens a non-transacted,
   * auto-acknowledge session, sends one message and closes everything again.
   *
   * @param payload the body to send; must be a String (or null) when asText is true
   * @param asText  true to send a TextMessage, false to send an ObjectMessage
   */
  private static void forward(QueueConnectionFactory qfactory, Queue queue,
    Serializable payload, boolean asText) throws InvocationTargetException
  {
    QueueConnection qconnection = null;
    QueueSession qsession = null;
    QueueSender qsender = null;
    try {
      qconnection = qfactory.createQueueConnection();
      qsession = qconnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
      qsender = qsession.createSender(queue);

      Message outgoing;
      if (asText) {
        TextMessage tmessage = qsession.createTextMessage();
        tmessage.setText((String) payload);
        outgoing = tmessage;
      } else {
        ObjectMessage omessage = qsession.createObjectMessage();
        omessage.setObject(payload);
        outgoing = omessage;
      }

      qconnection.start();
      qsender.send(outgoing);
    } catch (JMSException jmse) {
      throw new InvocationTargetException(jmse, "Could not send message");
    } finally {
      // Best-effort cleanup: a failure to close must not mask the send result.
      try {
        if (qsender != null) qsender.close();
      } catch (JMSException ignored) {}
      try {
        if (qsession != null) qsession.close();
      } catch (JMSException ignored) {}
      try {
        if (qconnection != null) qconnection.close();
      } catch (JMSException ignored) {}
    }
  }
}




It has a home interface..


The MDB home interface...
ServiceHome.java


package dk.topsecurity;

import java.rmi.RemoteException;
import javax.ejb.CreateException;
import javax.ejb.EJBHome;

/**
 * Home interface exposing a single no-argument factory method that returns a
 * {@link ServiceInterface}.
 *
 * NOTE(review): the ejb-jar.xml shown with this example registers only
 * ServiceBean for the message-driven bean, so this interface looks unused by
 * the bean itself - confirm before relying on it.
 */
public interface ServiceHome extends EJBHome {

  /**
   * Creates (or obtains) a ServiceInterface reference.
   *
   * @return the remote component interface
   * @throws RemoteException on a communication failure
   * @throws CreateException if the instance cannot be created
   */
  ServiceInterface create() throws RemoteException, CreateException;
}




Followed by EJBObject interface..


EJBObject interface...
ServiceInterface.java


package dk.topsecurity;

import java.rmi.RemoteException;

/**
 * Remote component interface returned by ServiceHome.create().
 * It declares no business methods of its own; it only extends EJBObject.
 */
public interface ServiceInterface extends javax.ejb.EJBObject {
}




To the application setup... deployment descriptor and stuff..


Application descriptor for the META-INF/ in the ear-file...
application.xml


<?xml version="1.0"  encoding="UTF-8"?>

<!DOCTYPE application PUBLIC '-//Sun Microsystems, Inc.//DTD J2EE Application 1.2//EN' 'http://java.sun.com/j2ee/dtds/application_1_2.dtd'>

<application>
  <display-name>webservice1</display-name>
  <description>An archived ear containing an archived jar</description>
  <module>
     <ejb>TopsecurityMDB.jar</ejb>  
  </module>
  <module>
     <web>
       <web-uri>TopsecurityMDB.war</web-uri>
       <context-root>/TopsecurityMDB</context-root>
     </web>
  </module>
</application>




Now to the bean setup...


ejb-jar.xml for the bean-jar inside the ear-file
ejb-jar.xml


<?xml version="1.0"?>
<!DOCTYPE ejb-jar PUBLIC "-//Sun Microsystems, Inc.//DTD Enterprise JavaBeans 2.0//EN" "http://java.sun.com/dtd/ejb-jar_2_0.dtd">

<ejb-jar>
 <enterprise-beans>

    <message-driven>
      <ejb-name>TopsecurityMDB</ejb-name>
      <ejb-class>dk.topsecurity.ServiceBean</ejb-class>
      <transaction-type>Container</transaction-type>
      <message-driven-destination>
        <destination-type>javax.jms.Queue</destination-type>
      </message-driven-destination>
    </message-driven>
 </enterprise-beans>
</ejb-jar>




and


weblogic-ejb-jar.xml for the bean-jar inside the ear-file
weblogic-ejb-jar.xml


<?xml version="1.0"?>
<!DOCTYPE weblogic-ejb-jar PUBLIC "-//BEA Systems, Inc.//DTD WebLogic 7.0.0 EJB//EN" "http://www.bea.com/servers/wls700/dtd/weblogic-ejb-jar.dtd">
<weblogic-ejb-jar>

  <weblogic-enterprise-bean>
    <ejb-name>TopsecurityMDB</ejb-name>
    <message-driven-descriptor>
      <pool>
        <max-beans-in-free-pool>200</max-beans-in-free-pool>
        <initial-beans-in-free-pool>20</initial-beans-in-free-pool>
      </pool>
      <destination-jndi-name>dk.topsecurity.inqueue</destination-jndi-name>
    </message-driven-descriptor>
    <jndi-name>examplesMessageDriven</jndi-name>
  </weblogic-enterprise-bean>
</weblogic-ejb-jar>




Finally for the .war file we have an index.html, which enables us to see if anything is deployed..


web.xml for the war-jar inside the ear-file
web.xml


<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd">
<web-app>
</web-app>




and


web-services.xml for the war-jar inside the ear-file
web-services.xml


<web-services xmlns:xsd="http://www.w3.org/2001/XMLSchema" >
  <web-service 
    name="BounceService" 
    targetNamespace="http://www.foobar.com/echo"
    uri="/BounceService">
   <components>
     <jms-send-destination name="topsin" connection-factory="weblogic.jms.ConnectionFactory">
        <jndi-name path="dk.topsecurity.inqueue" />
     </jms-send-destination>
     <jms-receive-queue name="topsout" connection-factory="weblogic.jms.ConnectionFactory">
        <jndi-name path="dk.topsecurity.outqueue" />
     </jms-receive-queue>
   </components>
   <operations xmlns:xsd="http://www.w3.org/2001/XMLSchema">
     <operation invocation-style="one-way" name="submit" component="topsin" >
        <params>
	  <param name="param" style="in" type="xsd:string"/>
        </params>
     </operation>
     <operation invocation-style="request-response" name="query" component="topsout" >
        <params>
	  <param name="output_payload" style="out" type="xsd:string"/>
        </params>
     </operation>
     <operation invocation-style="one-way" name="submitObj" component="topsin" >
        <params>
	  <param name="paramObj" style="in" type="xsd:hexBinary"/>
        </params>
     </operation>
     <operation invocation-style="request-response" name="queryObj" component="topsout" >
        <params>
	  <param name="outputObj" style="out" type="xsd:hexBinary"/>
        </params>
     </operation>
   </operations>
 </web-service>
</web-services>




Next, we have to compile...


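Ant build file used for compiling and packaging (it is called config.xml here too, but it is NOT the Weblogic config.xml from above; Ant looks for build.xml by default, so either save it as build.xml or point Ant at it with -buildfile)..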
config.xml


<project name="TopsecurityMDB" default="all" basedir=".">

  <!--
    Builds two artifacts into ${compiledir}:
      ${project_name}.ear         (EJB jar + war + application.xml)
      ${project_name}_client.jar  (the classes not matched by Service*.class)
    Target chain: all -> clean -> init, all -> build -> clean -> init.
  -->

  <target name="init">
        <!-- set global properties for this build -->
        <property name="wl_home" value="C:/weblogicplatform/weblogic81"/>
        <property name="source" value="./src/dk/topsecurity"/>
        <property name="compiledir" value="./classes"/>
        <property name="warstage" value="./stage"/>
        <property name="earstage" value="./earstage"/>
        <property name="project_name"   value="TopsecurityMDB"/>
        <property name="jmsjar"         value="${project_name}.jar"/>
  </target>

  <!-- Wipes the output and staging directories and recreates the empty layout -->
  <target name="clean" depends="init">
    <delete dir="${compiledir}" />
    <mkdir dir="${compiledir}/temp" />
    <mkdir dir="${compiledir}/temp/META-INF"/>
    <delete dir="${warstage}" />
    <mkdir dir="${warstage}" />
    <mkdir dir="${warstage}/bouncer" />
    <mkdir dir="${warstage}/bouncer/WEB-INF" />
    <mkdir dir="${warstage}/bouncer/WEB-INF/classes" />
    <delete dir="${earstage}" />
    <mkdir dir="${earstage}" />
    <mkdir dir="${earstage}/META-INF" />
  </target>

  <target name="build" depends="clean">
  
    <!-- Create the EJB -->

    <copy file="ejb-jar.xml" todir="${compiledir}/temp/META-INF" overwrite="yes"/>
    <copy file="weblogic-ejb-jar.xml" todir="${compiledir}/temp/META-INF" 
      overwrite="yes"/>

    <!-- compile every source file (bean and clients) into the temp directory -->
    <javac srcdir="${source}" classpath="${wl_home}\server\lib\weblogic.jar" 
       includes="*.java" destdir="${compiledir}/temp"/>

    <!-- compile the two interfaces a second time, into the war's WEB-INF/classes -->
    <javac srcdir="${source}" classpath="${wl_home}\server\lib\weblogic.jar" 
      includes="ServiceHome.java,ServiceInterface.java"
      destdir="${warstage}/bouncer/WEB-INF/classes" />

    <!-- you need a .jar file as argument to ejbc compiler -->
    <jar jarfile="${compiledir}/${project_name}.jar" basedir="${compiledir}/temp" includes="**/Service*.class,**/*.xml" />

    <!-- client jar: whatever is left in temp once the Service*.class files and the descriptors are excluded -->
    <jar jarfile="${compiledir}/${project_name}_client.jar" basedir="${compiledir}/temp" excludes="**/Service*.class,**/*.xml" />


    <!-- run ejbc with the same jar as input and output, i.e. the jar is rewritten in place -->
    <java classname="weblogic.ejbc" fork="true" failonerror="true" 
      classpath="${wl_home}\server\lib\weblogic.jar" >
      <arg line=" -noexit -compiler sj ${compiledir}/${project_name}.jar ${compiledir}/${project_name}.jar"/>
    </java>

    <!-- stage and package the war -->
    <copy file="web.xml" todir="${warstage}/bouncer/WEB-INF" />
    <copy file="web-services.xml" todir="${warstage}/bouncer/WEB-INF" />
    <copy file="index.html" todir="${warstage}/bouncer" />

    <jar jarfile="${compiledir}/${project_name}.war" basedir="${warstage}/bouncer" />

    <!-- stage and package the ear -->
    <copy file="${compiledir}/${project_name}.jar" todir="${earstage}" />
    <copy file="${compiledir}/${project_name}.war" todir="${earstage}" />
    <copy file="application.xml" todir="${earstage}/META-INF" />

    <jar jarfile="${compiledir}/${project_name}.ear" basedir="${earstage}" />

    <!-- remove intermediates; only the .ear and the _client.jar stay in ${compiledir} -->
    <delete dir="${compiledir}/temp" />
    <delete file="${compiledir}/${project_name}.jar" />
    <delete file="${compiledir}/${project_name}.war" />
    <delete dir="${warstage}" />
    <delete dir="${earstage}" />
  </target>
  <target name="all" depends="clean,build" />
</project>




Finally... finally... we need two client applications to handle the MDB communication..

Reading from or writing to a JMS queue...


FromJMS.java reads anything thrown onto a JMS queue given as argument
FromJMS.java


package dk.topsecurity;

import java.io.BufferedReader;
import java.io.FilterOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Reader;
import java.io.Serializable;
import java.util.Dictionary;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.BytesMessage;
import javax.jms.MessageEOFException;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NamingException;
import javax.jms.MessageEOFException;

/**
 * @author Copyright (c) 2002 by BEA Systems, Inc. All Rights Reserved.
 *
 * dk.topsececurity: sorry folks at BEA, I just changed things a bit...
 *
 */

/**
 * FromJMS - listens on a JMS queue and prints every message it receives.
 * A text message whose body is "quit" (in any letter case) makes the
 * program exit.
 *
 * Usage: java dk.topsecurity.FromJMS [queue] WebLogicURL
 * When no queue name is given, {@link #QUEUE} is used.
 */
final public class FromJMS implements MessageListener {

  public final static String JNDI_FACTORY = 
    "weblogic.jndi.WLInitialContextFactory";
  public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
  /** Queue used when none is given on the command line. */
  public final static String QUEUE = "weblogic.jms.inqueue";

  /** Number of leading bytes of a BytesMessage that are printed. */
  private static final int PREVIEW_BYTES = 10;
  /** Size of the buffer used to drain a BytesMessage. */
  private static final int CHUNK_SIZE = 1024;

  private QueueConnectionFactory qconFactory;
  private QueueConnection qcon;
  private QueueSession qsession;
  private QueueReceiver qreceiver;
  private Queue queue;
  /** Set by onMessage (under this object's monitor) once "quit" has arrived. */
  private boolean quit = false;
  private static String myqueue = null;

  /**
   * Prints the received message. Text messages are printed in full; for bytes
   * messages the total length and the first few bytes are printed.
   *
   * @param jmsMsg the message delivered by the JMS provider
   */
  public void onMessage(Message jmsMsg) {
    try {
      if (jmsMsg instanceof TextMessage) {
        String msgText = ((TextMessage) jmsMsg).getText();
        System.out.println("TextMessage:" + msgText);
        // The body may be null (Send2JMS sends a null body on purpose when the
        // user types "null"), so compare with the constant on the left.
        if ("quit".equalsIgnoreCase(msgText)) {
          synchronized (this) {
            quit = true;
            this.notifyAll();  // Notify main thread to quit
          }
        }
      } else if (jmsMsg instanceof BytesMessage) {
        dumpBytes((BytesMessage) jmsMsg);
      } else {
        System.out.println("Unsupported message type.");
      }
    } catch (JMSException jmse) {
      jmse.printStackTrace();
    }
  }

  /**
   * Drains a bytes message in chunks, so that a body of any size is handled,
   * and prints its length followed by the leading bytes. As before, printing
   * stops after the first zero byte or after PREVIEW_BYTES entries.
   */
  private static void dumpBytes(BytesMessage bytesMsg) throws JMSException {
    byte[] preview = new byte[PREVIEW_BYTES];
    byte[] chunk = new byte[CHUNK_SIZE];
    int total = 0;
    int read;

    // readBytes returns -1 at the end of the body; "> 0" also guards against
    // a provider that answers 0 for an empty body.
    while ((read = bytesMsg.readBytes(chunk)) > 0) {
      if (total < PREVIEW_BYTES) {
        System.arraycopy(chunk, 0, preview, total,
                         Math.min(read, PREVIEW_BYTES - total));
      }
      total += read;
    }

    System.out.println("BytesMessage: " + total + " bytes.");
    for (int i = 0; i < PREVIEW_BYTES; i++) {
      System.out.println("  CArray[" + i + "] = " + preview[i]);
      if (preview[i] == 0) {
        break;
      }
    }
  }

  /**
   * Create all the necessary objects for receiving messages from a JMS
   * queue and start delivery to this listener.
   *
   * If the queue name cannot be found in JNDI, the queue is created through
   * the session and bound under that name.
   *
   * @param ctx       JNDI context of the server
   * @param queueName JNDI name of the queue to listen on
   */
  public void init(Context ctx, 
                   String queueName) throws NamingException, JMSException {

    qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
    qcon = qconFactory.createQueueConnection();
    qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

    try {
      queue = (Queue) ctx.lookup(queueName);
    } catch (NamingException ne) {
      queue = qsession.createQueue(queueName);

      ctx.bind(queueName, queue);
    } 

    String selector = "";

    qreceiver = qsession.createReceiver(queue, selector);

    qreceiver.setMessageListener(this);
    qcon.start();
  } 

  /** Closes receiver, session and connection, in that order. */
  public void close() throws JMSException {
    qreceiver.close();
    qsession.close();
    qcon.close();
  } 

  /**
   * Entry point: connects, then blocks until a "quit" text message arrives.
   *
   * @param args either {WebLogicURL} or {queue, WebLogicURL}
   */
  public static void main(String[] args) throws Exception {
    if (args.length < 1 || args.length > 2) {
      System.out.println("Usage: java fromJMS [queue] WebLogicURL");

      return;
    } 

    FromJMS ac = new FromJMS();

    if (args.length == 1) {
      InitialContext ic = getInitialContext(args[0]);

      ac.init(ic, QUEUE);
      myqueue = QUEUE;
    } else {
      InitialContext ic = getInitialContext(args[1]);

      ac.init(ic, args[0]);
      myqueue = args[0];
    } 

    System.out.println("fromJMS ready to recieve messages from: " + myqueue);

    // Wait until a "quit" message has been received.
    synchronized (ac) {
      while (!ac.quit) {
        try {
          ac.wait();
        } catch (InterruptedException ie) {
          // Deliberately ignored: the loop re-checks the quit flag and waits again.
        }
      } 
    } 

    System.out.println("exiting...quit received.");
    Thread.sleep(2000);
    ac.close();
  } 

  // get an initial context to the server
  private static InitialContext getInitialContext(String url) 
          throws NamingException {
    Hashtable env = new Hashtable();

    env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
    env.put(Context.PROVIDER_URL, url);

    return new InitialContext(env);
  } 
}



Send2JMS.java sends any message typed through standard console to the queue specified by argument..
Send2JMS.java


package dk.topsecurity;

import java.io.BufferedReader;
import java.io.FilterOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Reader;
import java.io.Serializable;
import java.util.Dictionary;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.BytesMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NamingException;


/**
 * @author Copyright (c) 2002 by BEA Systems, Inc. All Rights Reserved.
 *
 * dk.topsececurity: sorry folks at BEA, I just changed things a bit...
 *
 */

/**
 * Send2JMS - reads lines from standard input and sends each one as a text
 * message to a JMS queue. Typing "quit" sends that message and then exits;
 * typing "null" sends a message with a null body. End of input also exits.
 *
 * Usage: java dk.topsecurity.Send2JMS [queue] WebLogicURL
 * When no queue name is given, {@link #QUEUE} is used.
 */
final public class Send2JMS {

  public final static String JNDI_FACTORY = 
    "weblogic.jndi.WLInitialContextFactory";
  public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";
  /** Queue used when none is given on the command line. */
  public final static String QUEUE = "weblogic.jms.inqueue";

  private QueueConnectionFactory qconFactory;
  private QueueConnection qcon;
  private QueueSession qsession;
  private QueueSender qsender;
  private Queue queue;
  /** Single message object that is reused for every send. */
  private TextMessage textmsg;

  /**
   * Creates connection, session, sender and the reusable text message.
   *
   * If the queue name cannot be found in JNDI, the queue is created through
   * the session and bound under that name.
   *
   * @param ctx       JNDI context of the server
   * @param queueName JNDI name of the queue to send to
   */
  public void init(Context ctx, 
                   String queueName) throws NamingException, JMSException {
    qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
    qcon = qconFactory.createQueueConnection();
    qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

    try {
      queue = (Queue) ctx.lookup(queueName);
    } catch (NamingException ne) {
      queue = qsession.createQueue(queueName);

      ctx.bind(queueName, queue);
    } 

    qsender = qsession.createSender(queue);
    textmsg = qsession.createTextMessage();

    qcon.start();
  } 

  /** Closes sender, session and connection, in that order. */
  public void close() throws JMSException {
    qsender.close();
    qsession.close();
    qcon.close();
  } 

  /**
   * Entry point: connects, then forwards console input line by line.
   *
   * @param args either {WebLogicURL} or {queue, WebLogicURL}
   */
  public static void main(String[] args) throws Exception {

    if (args.length < 1 || args.length > 2) {
      System.out.println("Usage: send2JMS [queue] WebLogicURL");

      return;
    } 

    Send2JMS omy = new Send2JMS();

    if (args.length == 1) {
      InitialContext ic = getInitialContext(args[0]);

      omy.init(ic, QUEUE);

      System.out.println("Queue="+QUEUE+" from "+args[0]);
    } else {
      InitialContext ic = getInitialContext(args[1]);
      omy.init(ic, args[0]);
      System.out.println("Queue="+args[0]+" from "+args[1]);
    } 

    // get message from user
    BufferedReader msgStream = 
      new BufferedReader(new InputStreamReader(System.in));

    try {
      while (true) {
        System.out.print("Enter message (\"quit\" to quit): ");

        String line = msgStream.readLine();
        if (line == null) {
          // End of input (closed or redirected stdin): stop instead of
          // prompting forever.
          break;
        }
        if (line.trim().length() == 0) {
          continue;   // blank lines are not sent
        }

        boolean quitNow = line.equalsIgnoreCase("quit");

        // "null" is a test hook: it sends a message with a null body.
        omy.send(line.equalsIgnoreCase("null") ? null : line);

        if (quitNow) {
          break;      // the "quit" message itself has been sent above
        }
      }
    } finally {
      // Release the JMS resources even when reading or sending failed.
      omy.close();
    }
  } 

  // gets the initial context to the server
  private static InitialContext getInitialContext(String url) 
          throws NamingException {
    Hashtable env = new Hashtable();

    env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
    env.put(Context.PROVIDER_URL, url);

    return new InitialContext(env);
  } 

  /**
   * Sends one text message with the given body (which may be null).
   * Failures are printed to stderr and not rethrown, so the input loop in
   * main keeps running after a failed send.
   *
   * @param mymessage the message body
   */
  public void send(String mymessage) throws JMSException {
    try {
      textmsg.setText(mymessage);
      qsender.send(textmsg);
    } catch (Exception e) {
      e.printStackTrace();
    } 
  } 
}



Off we go and compile..



Leaving you with two files... TopsecurityMDB_client.jar and TopsecurityMDB.ear

The latter, you deploy on your weblogic server - the first one, you use with command scripts below to communicate with the MDB you assembled..


Throw stuff at the MDB - using the Windows platform (not by preferred choice).. we need command scripts to run our client.. - be sure to put the client jar on the classpath - and remember to set WL_HOME to your weblogic installation directory of choice..
sendTest.cmd


set BEA_HOME=C:\weblogicplatform
set WL_HOME=C:\weblogicplatform\weblogic81

java -classpath .;./classes/TopsecurityMDB_client.jar;%WL_HOME%\server\lib\weblogic.jar dk.topsecurity.Send2JMS dk.topsecurity.inqueue t3://localhost:7001




and


Read output from the MDB - using the Windows platform (not by preferred choice).. we need command scripts to run our client.. - be sure to put the client jar on the classpath - and remember to set WL_HOME to your weblogic installation directory of choice..
fromTest.cmd


set BEA_HOME=C:\weblogicplatform
set WL_HOME=C:\weblogicplatform\weblogic81

java -classpath .;./classes/TopsecurityMDB_client.jar;%WL_HOME%\server\lib\weblogic.jar dk.topsecurity.FromJMS dk.topsecurity.outqueue t3://localhost:7001




Finally... finally... all in motion...