Message Driven POJOs

We are prototyping some possible extensions to MDBs to see if we can make things a little easier for development. The idea is to give a message consumer (an MDB) a typed interface that message producers can send messages through. Both the publisher and subscriber would be typed interfaces.

The model

Message Driven POJOs will have the same model as Stateless/Stateful beans. There is a bean class tagged as @Consumer that must implement one or more @Producer interfaces, just as a stateless bean is tagged as @Stateless and implements one or more @Remote or @Local interfaces.

It might be best to look at a full example; one is available in our testsuite. First, here is how the @Consumer annotation itself is defined:

package org.jboss.annotation.ejb;

@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME)
public @interface Consumer
{
   String name() default "";

   TransactionManagementType transactionManagement() default TransactionManagementType.CONTAINER;

   ActivationConfigProperty[] activationConfig() default {};
}

@Consumer looks exactly like @MessageDriven of the EJB3 specification and is configured as such. Place @Consumer on a bean class. That bean class implements interfaces tagged as @Producer.

package org.jboss.annotation.ejb;

@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME)
public @interface Producer
{
   String connectionFactory() default "";
   boolean transacted() default false;
   int acknowledgeMode() default 1; // autoacknowledge
}

For each @Producer interface the @Consumer implements, there will be a proxy that implements that @Producer registered in JNDI under the FQN of that @Producer interface.

Simple example

Let's do a simple example. First define the consumer:

import javax.ejb.ActivationConfigProperty;

import org.jboss.annotation.ejb.Consumer;

@Consumer(activationConfig =
        {
        @ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
        @ActivationConfigProperty(propertyName="destination", propertyValue="queue/mdbtest")
        })
public class QueueTestConsumer implements QueueTestRemote
{
   public void method1(String msg, int num)
   {
      System.out.println("method1(" + msg + ", " + num + ")");
   }
}

QueueTestConsumer is our bean class and defines the methods that can receive JMS messages. You see that it implements one interface, QueueTestRemote.

import org.jboss.annotation.ejb.Producer;

@Producer
public interface QueueTestRemote 
{
   public void method1(String msg, int num);
}

This interface will be used by clients (JMS publishers) to send messages to the consumer via JMS. Let's now take a look at the client, which is really the producer of the actual messages.

import javax.naming.InitialContext;

import org.jboss.ejb3.mdb.ProducerObject;
import org.jboss.ejb3.mdb.ProducerManager;

public class Client
{
   public static void main(String[] args) throws Exception
   {
      // The producer proxy is bound in JNDI under the interface's fully qualified name
      InitialContext ctx = new InitialContext();
      QueueTestRemote tester = (QueueTestRemote)ctx.lookup(QueueTestRemote.class.getName());

      // Every producer proxy also implements ProducerObject
      ProducerObject po = (ProducerObject)tester;
      ProducerManager manager = po.getProducerManager();

      manager.connect(); // internally create a JMS connection
      try
      {
         tester.method1("hello world", 55); // sent as a JMS message
      }
      finally
      {
         manager.close(); // clean up the JMS connection
      }
   }
}

When the @Consumer is deployed by the EJB3 container, the container looks for all of its @Producer interfaces and registers a proxy for each one in JNDI under the interface's fully qualified class name. The client code above looks up the QueueTestRemote interface in JNDI. Each producer proxy also implements the ProducerObject interface, so the client casts the QueueTestRemote to ProducerObject. It then gets a ProducerManager, which manages the JMS connection for this proxy. Before it can send messages to the queue, the client must call connect() on the manager. After that it can call methods on the tester. When it calls method1(), the method call is converted into a JMS message and published to the consumer's queue. The consumer receives the message and invokes its own method1() method.
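The call-to-message conversion described above can be illustrated without a container. The following is only a sketch under stated assumptions, not the JBoss implementation: an in-memory queue stands in for the JMS destination, java.lang.reflect.Proxy stands in for the producer proxy, and InvocationMessage, ProducerProxySketch, producerFor() and deliverAll() are hypothetical names invented for this illustration. The annotations are omitted so that the sketch compiles without JBoss on the classpath.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stand-in for the @Producer interface from the example above.
interface QueueTestRemote {
    void method1(String msg, int num);
}

// Stand-in for the @Consumer bean; it records calls instead of printing them.
class QueueTestConsumer implements QueueTestRemote {
    final List<String> received = new ArrayList<>();

    public void method1(String msg, int num) {
        received.add("method1(" + msg + ", " + num + ")");
    }
}

// Hypothetical message payload: method name, parameter types and arguments.
class InvocationMessage {
    final String methodName;
    final Class<?>[] paramTypes;
    final Object[] args;

    InvocationMessage(Method method, Object[] args) {
        this.methodName = method.getName();
        this.paramTypes = method.getParameterTypes();
        this.args = args;
    }
}

class ProducerProxySketch {
    // In-memory queue standing in for the JMS queue (an assumption of this sketch).
    static final Queue<InvocationMessage> QUEUE = new ArrayDeque<>();

    // Producer side: every call on the interface becomes a message on the queue.
    static <T> T producerFor(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            QUEUE.add(new InvocationMessage(method, args));
            return null; // one-way call: nothing is returned to the caller
        };
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] { iface }, handler);
        return iface.cast(proxy);
    }

    // Consumer side: drain the queue and invoke the matching bean method.
    static void deliverAll(Object bean) {
        InvocationMessage msg;
        while ((msg = QUEUE.poll()) != null) {
            try {
                Method target = bean.getClass().getMethod(msg.methodName, msg.paramTypes);
                target.invoke(bean, msg.args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("could not dispatch " + msg.methodName, e);
            }
        }
    }

    public static void main(String[] args) {
        QueueTestConsumer consumer = new QueueTestConsumer();
        QueueTestRemote tester = producerFor(QueueTestRemote.class);

        tester.method1("hello world", 55); // queued, not yet delivered
        deliverAll(consumer);              // the "container" dispatches it

        System.out.println(consumer.received); // prints [method1(hello world, 55)]
    }
}
```

The point of the sketch is the decoupling: the caller returns as soon as the message is queued, and the consumer's method runs only when the message is delivered.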

@Producer default values

The proxy registered in JNDI will know how to contact the JMS Queue/Topic to publish messages. You can specify the JNDI name of the JMS ConnectionFactory explicitly through the connectionFactory attribute of the annotation, or you can rely on defaults.

The default value is "ConnectionFactory" for the ConnectionFactory JNDI name.

If you tag the producer as @ProducerLocal instead of @Producer, then "java:/ConnectionFactory" will be used.
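The default rules above amount to a small lookup. The sketch below restates them as plain Java; it is not taken from the JBoss sources. The class and method names are hypothetical, "jms/MyFactory" is a made-up JNDI name, and it is an assumption that an explicit value takes precedence for both kinds of producer.

```java
class ConnectionFactoryNameSketch {
    // Returns the JNDI name the proxy would use, following the defaults described above.
    static String resolve(String configured, boolean producerLocal) {
        if (configured != null && !configured.isEmpty()) {
            return configured; // assumption: an explicit connectionFactory value always wins
        }
        return producerLocal ? "java:/ConnectionFactory" : "ConnectionFactory";
    }

    public static void main(String[] args) {
        System.out.println(resolve("", false));              // prints ConnectionFactory
        System.out.println(resolve("", true));               // prints java:/ConnectionFactory
        System.out.println(resolve("jms/MyFactory", false)); // prints jms/MyFactory
    }
}
```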

@ProducerLocal

If you tag a producer as @ProducerLocal, the proxy will look up the connection factory via the default InitialContext when connect() is called. Otherwise, the ConnectionFactory reference will be embedded directly within the proxy.

@MessageProperties

The methods defined in a Producer are turned into JMS messages. The default message properties are a Time To Live of 0, a Priority of 4, and a delivery mode of PERSISTENT. You can override these default values in a couple of ways.

@Producer
@MessageProperties(delivery=DeliveryMode.NON_PERSISTENT, timeToLive=1000, priority=1)
public interface QueueTestRemote 
{
   public void method1(String msg, int num);
}

In this configuration, all method calls on QueueTestRemote will use the JMS message properties defined with the @MessageProperties annotation on the interface.

@Producer
public interface QueueTestRemote 
{
   public void method1(String msg, int num);

   @MessageProperties(delivery=DeliveryMode.NON_PERSISTENT, timeToLive=1000, priority=1)
   public void method2(String msg, int num);
}

So, in the above example, method1() uses the default message properties, and method2() overrides the defaults via the @MessageProperties annotation attached to it.
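To make the lookup order concrete, here is a sketch of how the effective properties for a method could be resolved with reflection. It is an illustration only: @MessageProperties and DeliveryMode are redeclared locally as stand-ins so the code compiles without JBoss, the interface names are hypothetical, and checking the method before the interface is an assumption about precedence when both carry the annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Local stand-ins for the JBoss types used in the examples above.
enum DeliveryMode { PERSISTENT, NON_PERSISTENT }

@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@interface MessageProperties {
    DeliveryMode delivery() default DeliveryMode.PERSISTENT;
    int timeToLive() default 0;
    int priority() default 4;
}

// First configuration: properties set once on the interface.
@MessageProperties(delivery = DeliveryMode.NON_PERSISTENT, timeToLive = 1000, priority = 1)
interface InterfaceLevelRemote {
    void method1(String msg, int num);
}

// Second configuration: only method2 overrides the defaults.
interface MethodLevelRemote {
    void method1(String msg, int num);

    @MessageProperties(delivery = DeliveryMode.NON_PERSISTENT, timeToLive = 1000, priority = 1)
    void method2(String msg, int num);
}

class MessagePropertiesSketch {
    // Describes the effective properties for the named method of a producer interface.
    static String describe(Class<?> iface, String methodName) {
        for (Method m : iface.getMethods()) {
            if (m.getName().equals(methodName)) {
                // Assumed precedence: method annotation, then interface annotation, then defaults.
                MessageProperties p = m.getAnnotation(MessageProperties.class);
                if (p == null) {
                    p = iface.getAnnotation(MessageProperties.class);
                }
                if (p == null) {
                    return "PERSISTENT ttl=0 priority=4"; // defaults stated in the text
                }
                return p.delivery() + " ttl=" + p.timeToLive() + " priority=" + p.priority();
            }
        }
        throw new IllegalArgumentException("no such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(describe(InterfaceLevelRemote.class, "method1"));
        System.out.println(describe(MethodLevelRemote.class, "method1"));
        System.out.println(describe(MethodLevelRemote.class, "method2"));
    }
}
```

Run as written, the first and third lines report the overridden values (NON_PERSISTENT, a time to live of 1000, priority 1), while the second reports the defaults.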

Obtaining the current message

Sometimes you may need to access the real JMS message, for example to obtain the replyTo destination or to acknowledge the message. You can obtain it by using the @org.jboss.annotation.ejb.CurrentMessage annotation.


   @CurrentMessage private javax.jms.Message currentMessage;

This annotation will inject the current JMS message into your Consumer bean before your target method is invoked.
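The ordering matters here: the field is set first, then the target method runs and can read it. The sketch below imitates that sequence with plain reflection. It does not use the JBoss container or the JMS API: @CurrentMessage is redeclared locally, FakeMessage is a hypothetical stand-in for javax.jms.Message, and "queue/replies" is a made-up destination name.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Local stand-in for @org.jboss.annotation.ejb.CurrentMessage.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface CurrentMessage {}

// Hypothetical stand-in for javax.jms.Message, carrying only a reply destination name.
class FakeMessage {
    final String replyTo;

    FakeMessage(String replyTo) {
        this.replyTo = replyTo;
    }
}

// A consumer bean that reads the injected message inside its target method.
class ReplyingConsumer {
    @CurrentMessage private FakeMessage currentMessage;

    String lastReplyTo;

    public void method1(String msg, int num) {
        lastReplyTo = currentMessage.replyTo;
    }
}

class CurrentMessageSketch {
    // Sets every @CurrentMessage field of a compatible type on the bean.
    static void inject(Object bean, FakeMessage message) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(CurrentMessage.class)
                    && field.getType().isInstance(message)) {
                field.setAccessible(true); // the field is private in the bean
                try {
                    field.set(bean, message);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    public static void main(String[] args) {
        ReplyingConsumer bean = new ReplyingConsumer();
        inject(bean, new FakeMessage("queue/replies")); // injection happens first
        bean.method1("hello world", 55);                // then the target method runs
        System.out.println(bean.lastReplyTo);           // prints queue/replies
    }
}
```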

A Full Example

A full example can be found in our testsuite. The QueueTestConsumer defines three producers: QueueTestRemote.java for remote invocations on the queue, QueueTestLocal.java for local invocations on the queue, and QueueTestXA.java for using the JCA adapter to send messages transactionally through CMT.

The test client sends messages through the QueueTestRemote interface. It interacts with a stateless bean to test the local and XA producers.

FEEDBACK NEEDED

Please provide feedback on this proposed feature on our forum.

Building and Running

To build and run the example, make sure you have ejb3.deployer installed in JBoss 4.0.x and have JBoss running. See the reference manual on how to install EJB 3.0.
Unix:    $ export JBOSS_HOME=<where your jboss 4.0 distribution is>
Windows: $ set JBOSS_HOME=<where your jboss 4.0 distribution is>
$ ant
$ ant run

Look in the JBoss console window to confirm that the messages were sent and received.

12:25:59,734 INFO  [Ejb3Module] found EJB3 consumer bean: org.jboss.tutorial.consumer.bean.ExampleConsumerBean
12:25:59,734 INFO  [Ejb3Module] found EJB3 stateless session bean: org.jboss.tutorial.consumer.bean.TesterBean
12:25:59,796 INFO  [ConsumerContainer] Producer: org.jboss.tutorial.consumer.bean.ExampleProducerRemote
12:25:59,796 INFO  [ConsumerContainer] Producer: org.jboss.tutorial.consumer.bean.ExampleProducerLocal
12:25:59,796 INFO  [ConsumerContainer] Producer: org.jboss.tutorial.consumer.bean.ExampleProducerXA
12:25:59,812 INFO  [ProxyDeployer] no declared remote bindings
12:25:59,812 INFO  [ProxyDeployer] there is remote interfaces
12:25:59,812 INFO  [ProxyDeployer] default remote binding has jndiName of org.jboss.tutorial.consumer.bean.Tester
12:25:59,828 INFO  [EJB3Deployer] Deployed: file[the deployment]
12:26:09,000 INFO  [STDOUT] method1(Remote method1 called, 1)
12:26:09,015 INFO  [STDOUT] method2: Remote method2 called
12:26:09,015 INFO  [STDOUT] method2 key/val: great:ejb3
12:26:09,015 INFO  [STDOUT] method2 key/val: hello:world
12:26:09,109 INFO  [STDOUT] method1(testLocal, 1)
12:26:09,125 INFO  [STDOUT] method2: testLocal2
12:26:09,125 INFO  [STDOUT] method2 key/val: great:ejb3
12:26:09,125 INFO  [STDOUT] method2 key/val: hello:world
12:26:09,125 INFO  [STDOUT] end TESTXA **
12:26:09,140 INFO  [STDOUT] method2: testXA2
12:26:09,140 INFO  [STDOUT] method2 key/val: great:ejb3
12:26:09,140 INFO  [STDOUT] method2 key/val: hello:world
12:26:09,140 INFO  [STDOUT] method1(testXA, 1)