⟩ Is it possible to send or receive a message from within a message listener?
Yes. You can send to or receive from any queue or topic from within in a message listener.
If it's not an MDB, you can use the same Connection or Session that the onMessage() is part of to do this. When you create your message listener, you pass in a session in your constructor. Then you have access to the session in your onMessage method and you would be able to make synchronous, not asynchronous, calls from within the onMessage method. Do not use another Session that is servicing another onMessage() because that would multi-thread that Session and Sessions don't support multi-threading.
When things are done non-transactionally, there can be duplicates or lost messages (assuming your onMessage() code is attempting to forward messages):
1. If you call acknowledge after the publish() and the acknowledge fails for whatever reason (network/server failure), then you will see the message again and will end up publishing twice (possible duplicate semantics). You can try to keep track of sequence numbers to detect duplicates but this is not easy.
2. If you call acknowledge before the publish(), you get at-most-once semantics. If the publish() fails, you don't know if the failure occurred before or after the message reached the server.
If you want exactly once, transactional semantics using onMessage, you must use transactional MDBs. The onMessage() for a transactional MDB starts the transaction, includes the WebLogic Server JMS message received within that transaction and the publish() would also be in the same transaction. The following code sends a response to each message that it receives. It creates the connection, etc. in the ejbCreate method so that it doesn't need to create it every time onMessage is called. The QueueSender is anonymous (null Queue) since we don't know to whom we will have to reply. The ejbRemove method cleans up by closing the connection. This same approach can be used to create a receiver, subscriber or publisher.
import javax.ejb.CreateException;
import javax.ejb.EJBContext;
import javax.naming.*;
import javax.naming.directory.*;
import java.util.Hashtable;
import javax.ejb.MessageDrivenBean;
import javax.ejb.MessageDrivenContext;
import javax.jms.*;
public class MDB
implements MessageDrivenBean, MessageListener {
public static final String WLSqcf =
"javax.jms.QueueConnectionFactory";
public static final String WLSqname =
"jms.queue.TestQueue1";
public static final String WLSurl =
"t3://localhost:7001";
public static final String WLSJNDIfactory =
"weblogic.jndi.WLInitialContextFactory";
private MessageDrivenContext context;
private QueueSession session;
private QueueConnection connection = null;
private QueueConnectionFactory factory;
private InitialContext ctx;
private QueueSender QueueSender;
// Required - public constructor with no argument
public MDB() {}
// Required - ejbActivate
public void ejbActivate() {}
// Required - ejbRemove
public void ejbRemove() {
context = null;
if (connection != null) {
try {
connection.close();
} catch(Exception e) {}
connection = null;
}
}
// Required - ejbPassivate
public void ejbPassivate() {}
public void setMessageDrivenContext(
MessageDrivenContext mycontext) {
context = mycontext;
}
// Required - ejbCreate() with no arguments
public void ejbCreate () throws CreateException {
try {
// Get the initial context
Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY, WLSJNDIfactory);
env.put(Context.PROVIDER_URL, WLSurl);
env.put(Context.REFERRAL, "throw");
ctx = new InitialContext(env);
factory = (QueueConnectionFactory)ctx.lookup(WLSqcf);
// Create a QueueConnection, QueueSession, QueueSender
connection = factory.createQueueConnection();
session = connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queueSender = session.createSender(null);
connection.start();
} catch (Exception e) {
throw(new CreateException(e.toString()));
}
}
// Implementation of MessageListener
// Throws no exceptions
public void onMessage(Message msg) {
try {
System.out.println("MDB: " +
((TextMessage)msg).getText());
msg.clearBody();
((TextMessage)msg).setText("reply message");
queueSender.send((Queue)msg.getJMSReplyTo(), msg);
}
catch(Exception e) { // Catch any exception
e.printStackTrace();
}
}
}
This approach creates a connection per EJB/MDB instance, so you might want to create a producer pool that is shared by the EJB instances. This is done by writing a class that populates a static pool with producers (see the next question for a sample producer pool). The onMessage call grabs a producer when needed. Since Sessions must be single threaded, make sure there is only one producer per session within the producer pool.