package org.mockejb.jms;
import javax.jms.*;
import java.util.*;
import org.mockejb.interceptor.InterceptableProxy;
/**
 * In-memory {@link MessageConsumer} used by the mock JMS implementation.
 *
 * <p>Messages handed to this consumer are either delivered straight to the
 * registered {@link MessageListener} (when one is set and the owning
 * connection is started) or buffered in an internal list until they are
 * fetched with one of the {@code receive} methods or flushed to the listener
 * by {@link #consume()}.
 *
 * <p>The mock never blocks: where a real provider would wait for a message,
 * this class either returns {@code null} or throws a
 * {@link RuntimeException} (see the individual methods).
 */
public class MockConsumer implements MessageConsumer {

    /** Prefix of the error raised when a listener lets an exception escape. */
    private static final String FAULTY_LISTENER_MESSAGE =
        "Escaped exception from MessageListener (faulty listener):\n";

    private boolean closed = false;
    private final MockSession sess;
    /** Destination this consumer is attached to; set to {@code null} by {@link #close()}. */
    private MockDestination destination;
    /** Messages received but not yet handed to a listener or a {@code receive} caller. */
    private final List messages = new ArrayList();
    private MessageListener listener = null;

    /**
     * Creates a consumer bound to the given session and destination.
     *
     * @param sess session that owns this consumer; used to find out whether
     *        the connection is started
     * @param destination destination the consumer reads from
     */
    MockConsumer(MockSession sess, MockDestination destination) {
        this.sess = sess;
        this.destination = destination;
    }

    /**
     * Message selectors are not supported by this mock.
     *
     * @return always {@code null}
     */
    public String getMessageSelector() throws JMSException {
        return null;
    }

    /**
     * @return the listener currently registered (the intercepting proxy, not
     *         the object originally passed in), or {@code null} if none
     */
    public MessageListener getMessageListener() throws JMSException {
        return listener;
    }

    /**
     * Registers the listener that will receive messages asynchronously.
     *
     * <p>A non-null listener is wrapped in an {@link InterceptableProxy}
     * for the {@code MessageListener} interface. Passing {@code null} unsets
     * the current listener instead of wrapping {@code null} in a proxy, so
     * that {@link #getMessageListener()} reports "no listener" afterwards.
     *
     * @param listener listener to register, or {@code null} to unset
     */
    public void setMessageListener(MessageListener listener)
        throws JMSException {
        if (listener == null) {
            this.listener = null;
            return;
        }
        this.listener = (MessageListener) InterceptableProxy.create(
            javax.jms.MessageListener.class, listener);
    }

    /**
     * Returns the oldest buffered message.
     *
     * @return the next message, or {@code null} if this consumer is closed
     * @throws RuntimeException if no message is buffered; the mock cannot
     *         block waiting for one
     */
    public Message receive() throws JMSException {
        if (isClosed()) {
            return null;
        }
        if (messages.isEmpty()) {
            throw new RuntimeException("No messages received");
        }
        return (Message) messages.remove(0);
    }

    /**
     * Returns the oldest buffered message without waiting.
     *
     * <p>With a non-zero timeout an empty buffer yields {@code null}. A
     * timeout of {@code 0} delegates to {@link #receive()}, which throws
     * when nothing is buffered.
     *
     * @param timeout timeout value; only "zero" versus "non-zero" matters,
     *        no actual waiting takes place
     * @return the next message, or {@code null} if closed or (for a non-zero
     *         timeout) nothing is buffered
     */
    public Message receive(long timeout) throws JMSException {
        if (isClosed() || (messages.isEmpty() && timeout != 0)) {
            return null;
        }
        return receive();
    }

    /**
     * Returns the oldest buffered message, or {@code null} if there is none.
     */
    public Message receiveNoWait() throws JMSException {
        // Any non-zero timeout makes receive(long) return null on an empty buffer.
        return receive(1);
    }

    /**
     * Closes this consumer: detaches it from its destination, drops buffered
     * messages and forgets the listener.
     *
     * <p>Calling {@code close()} on an already closed consumer is a no-op.
     * Without this guard a second call would dereference the destination
     * that the first call already set to {@code null}.
     */
    public void close() throws JMSException {
        if (closed) {
            return;
        }
        closed = true;
        destination.removeConsumer(this);
        messages.clear();
        listener = null;
        destination = null;
    }

    /**
     * Delivers every buffered message to the registered listener, removing
     * each message from the buffer once the listener has returned normally.
     * Does nothing when no listener is set or the buffer is empty.
     *
     * @throws RuntimeException if the listener throws; the message being
     *         delivered stays in the buffer and the original exception is
     *         attached as the cause
     */
    void consume() throws JMSException {
        if (listener == null || messages.isEmpty()) {
            return;
        }
        ListIterator it = messages.listIterator();
        while (it.hasNext()) {
            deliver(listener, (Message) it.next());
            it.remove();
        }
    }

    /**
     * Accepts a single message for this consumer.
     *
     * <p>A copy of the message is made via {@code MessageUtility.copyMessage}.
     * The copy goes directly to the listener when one is registered and the
     * connection is started; otherwise it is buffered.
     *
     * @param msg message to consume
     * @throws RuntimeException if the listener throws; the original
     *         exception is attached as the cause
     */
    void consume(Message msg) throws JMSException {
        MessageImpl receivedMsg = MessageUtility.copyMessage(msg, true);
        MessageListener l = getMessageListener();
        if (l != null && isStarted()) {
            deliver(l, receivedMsg);
        } else {
            messages.add(receivedMsg);
        }
    }

    /**
     * Consumes each element of the given collection via
     * {@link #consume(Message)}, in iteration order.
     *
     * @param messages collection of {@link Message} objects (note: this
     *        parameter shadows the internal buffer field of the same name)
     */
    void consume(Collection messages) throws JMSException {
        Iterator it = messages.iterator();
        while (it.hasNext()) {
            consume((Message) it.next());
        }
    }

    /** @return {@code true} once {@link #close()} has been called */
    boolean isClosed() {
        return closed;
    }

    /** @return the destination of this consumer, or {@code null} after {@link #close()} */
    MockDestination getDestination() {
        return destination;
    }

    /** Reports whether the connection owning this consumer's session is started. */
    private boolean isStarted() {
        return sess.getConnection().isStarted();
    }

    /**
     * Invokes the listener and converts anything it throws into a
     * {@link RuntimeException} that keeps the original throwable as its
     * cause, so the listener's stack trace is not lost.
     */
    private static void deliver(MessageListener target, Message msg) {
        try {
            target.onMessage(msg);
        } catch (Throwable t) {
            throw new RuntimeException(FAULTY_LISTENER_MESSAGE + t.getMessage(), t);
        }
    }
}