package org.mockejb.jms;

import javax.jms.*;
import java.util.*;

import org.mockejb.interceptor.InterceptableProxy;

/**
 * Mock implementation of <code>javax.jms.MessageConsumer</code>.
 * <p>
 * Messages handed to this consumer by its destination are either delivered
 * straight to the registered <code>MessageListener</code> (when one is set and
 * the owning connection is started) or stored in an internal list until they
 * are fetched with one of the <code>receive</code> methods or flushed to the
 * listener by <code>consume()</code>.
 * <p>
 * This class performs no synchronization of its own.
 *
 * @author Dimitar Gospodinov
 */
public class MockConsumer implements MessageConsumer {

    /** Prefix of the error raised when a listener lets an exception escape. */
    private static final String FAULTY_LISTENER_MESSAGE =
        "Escaped exception from MessageListener (faulty listener):\n";

    private boolean closed = false;

    private final MockSession sess;
    private MockDestination destination;
    /** Received but not yet consumed messages, oldest first. */
    private final List messages = new ArrayList();
    private MessageListener listener = null;

    /**
     * Creates a consumer bound to the given session and destination.
     *
     * @param sess session that owns this consumer; used to find out whether
     *        the connection is started
     * @param destination destination this consumer reads from
     */
    MockConsumer(MockSession sess, MockDestination destination) {
        this.sess = sess;
        this.destination = destination;
    }

    /**
     * Always returns <code>null</code>; this mock does not keep a message selector.
     *
     * @see javax.jms.MessageConsumer#getMessageSelector()
     */
    public String getMessageSelector() throws JMSException {
        return null;
    }

    /**
     * Returns the currently registered listener (the interceptable proxy
     * created by <code>setMessageListener</code>), or <code>null</code> if none is set.
     *
     * @see javax.jms.MessageConsumer#getMessageListener()
     */
    public MessageListener getMessageListener() throws JMSException {
        return listener;
    }

    /**
     * Registers the listener, wrapped in an <code>InterceptableProxy</code>.
     * Passing <code>null</code> unsets the current listener, as required by the
     * JMS contract, instead of wrapping <code>null</code> in a proxy.
     *
     * @param listener listener to register, or <code>null</code> to unset
     * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
     */
    public void setMessageListener(MessageListener listener)
        throws JMSException {

        if (listener == null) {
            this.listener = null;
            return;
        }
        this.listener = (MessageListener) InterceptableProxy.create(javax.jms.MessageListener.class, listener);
    }

    /**
     * Returns and removes the oldest stored message.
     * <p>
     * A real consumer would block until a message arrives; this mock instead
     * fails fast with a <code>RuntimeException</code> when nothing is stored.
     *
     * @return the oldest stored message, or <code>null</code> if this consumer is closed
     * @throws RuntimeException if the consumer is open and no message is stored
     * @see javax.jms.MessageConsumer#receive()
     */
    public Message receive() throws JMSException {
        if (isClosed()) {
            return null;
        }
        if (messages.isEmpty()) {
            throw new RuntimeException("No messages received");
        }
        return (Message) messages.remove(0);
    }

    /**
     * Returns and removes the oldest stored message without ever waiting.
     * <p>
     * With a non-zero timeout an empty consumer yields <code>null</code>. A
     * timeout of zero means "wait forever" in JMS, so that case is delegated
     * to <code>receive()</code>, which throws when nothing is stored.
     *
     * @param timeout timeout in milliseconds; only zero versus non-zero matters
     * @return the oldest stored message, or <code>null</code> if the consumer is
     *         closed or (for a non-zero timeout) no message is stored
     * @see javax.jms.MessageConsumer#receive(long)
     */
    public Message receive(long timeout) throws JMSException {
        if (isClosed() || (messages.isEmpty() && timeout != 0)) {
            return null;
        }
        return receive();
    }

    /**
     * Returns and removes the oldest stored message if there is one.
     *
     * @return the oldest stored message, or <code>null</code> if the consumer is
     *         closed or no message is stored
     * @see javax.jms.MessageConsumer#receiveNoWait()
     */
    public Message receiveNoWait() throws JMSException {
        if (isClosed() || messages.isEmpty()) {
            return null;
        }
        return (Message) messages.remove(0);
    }

    /**
     * Closes this consumer: detaches it from its destination, discards all
     * stored messages and drops the listener. Closing an already closed
     * consumer is a no-op.
     *
     * @see javax.jms.MessageConsumer#close()
     */
    public void close() throws JMSException {
        if (closed) {
            // Second close must not touch the (now null) destination.
            return;
        }
        closed = true;
        destination.removeConsumer(this);
        messages.clear();
        listener = null;
        destination = null;
    }

    // Non-standard methods

    /**
     * Delivers all stored messages to the listener, oldest first.
     * Does nothing when no listener is set or no message is stored.
     * A message is removed from the store only after the listener returned
     * normally, so a message whose delivery failed stays stored.
     *
     * @throws RuntimeException if the listener throws; the original
     *         throwable is attached as the cause
     */
    void consume() throws JMSException {
        if (listener == null || messages.isEmpty()) {
            return;
        }
        ListIterator it = messages.listIterator();
        while (it.hasNext()) {
            try {
                listener.onMessage((Message) it.next());
            } catch (Throwable t) {
                throw faultyListener(t);
            }
            it.remove();
        }
    }

    /**
     * Consumes a message sent from the destination.
     * If a MessageListener is available and the connection is started, a copy
     * of the message is delivered to the listener immediately.
     * In all other cases the copy is stored and will be consumed later.
     *
     * @param msg message to consume; it is copied with
     *        <code>MessageUtility.copyMessage</code> before delivery or storage
     * @throws JMSException
     * @throws RuntimeException if the listener throws; the original
     *         throwable is attached as the cause
     */
    void consume(Message msg) throws JMSException {

        MessageImpl receivedMsg = MessageUtility.copyMessage(msg, true);
        MessageListener l = getMessageListener();

        if (l != null && isStarted()) {
            try {
                l.onMessage(receivedMsg);
            } catch (Throwable t) {
                throw faultyListener(t);
            }
        } else {
            messages.add(receivedMsg);
        }
    }

    /**
     * Consumes all specified messages, in iteration order, by calling
     * <code>consume(Message)</code> for each element.
     *
     * @param messages collection of <code>Message</code>
     */
    void consume(Collection messages) throws JMSException {
        Iterator it = messages.iterator();
        while (it.hasNext()) {
            consume((Message) it.next());
        }
    }

    /**
     * @return <code>true</code> once <code>close()</code> has been called
     */
    boolean isClosed() {
        return closed;
    }

    /**
     * @return the destination this consumer reads from, or <code>null</code>
     *         after the consumer has been closed
     */
    MockDestination getDestination() {
        return destination;
    }

    /**
     * @return whether the connection of the owning session is started
     */
    private boolean isStarted() {
        return sess.getConnection().isStarted();
    }

    /**
     * Builds the exception reported when a listener lets a throwable escape.
     * The message text keeps the original format and the throwable is chained
     * as the cause so its stack trace is not lost.
     */
    private static RuntimeException faultyListener(Throwable t) {
        return new RuntimeException(FAULTY_LISTENER_MESSAGE + t.getMessage(), t);
    }

}