Monday, November 7, 2016

A Tool to Bulk Move ActiveMQ Messages

This Java command-line program bulk-moves messages from one ActiveMQ queue to another, up to a caller-supplied maximum number of messages.

package com.nixon;
 
import java.io.BufferedReader;
import java.io.InputStreamReader;
 
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
/**
 * Depends on activemq-all-5.11.1.jar
 */
public class MqBulkMove {
 
    private int DEFAULT_THROTTLE_MS = 20;
 
    public static void main(String... args) {
 
        if (args == null || args.length != 4) {
            System.out.println("Usage: java -jar mqmove.jar broker-url source-queue dest-queue max-messages");
            System.out.println("Example: java -jar mqmove.jar tcp://activemq01.nixon.com:61616 REQUEST_DQ REQUEST 1");
            return;
        }
 
        String brokerUrl = args[0];
        String sourceQueue = args[1];
        String destQueue = args[2];
        int maxNumberOfMessagesToMove = Integer.valueOf(args[3]);
 
        try {
            boolean success = new MqBulkMove().bulkMove(brokerUrl, sourceQueue, destQueue, maxNumberOfMessagesToMove);
            System.exit(success ? 0 : -1);
        }
        catch (Exception ex) { throw new RuntimeException(ex); }
    }
 
    private boolean bulkMove(String activeMqBrokerUrl, String sourceQueue, String destQueue, int maxNumberOfMessagesToMove) throws Exception {
        AMQ amq = null;
        try {
            amq = new AMQ(activeMqBrokerUrl, sourceQueue, destQueue);
 
            amq.producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 
            if (!confirm(maxNumberOfMessagesToMove, sourceQueue, destQueue)) {
                System.out.println("EXIT");
                return false;
            }
 
            int numberOfMessagesMoved = 1;
            while (numberOfMessagesMoved <= maxNumberOfMessagesToMove) {
                Message message = null;
                try {
                    message = amq.consumer.receive(1000);
 
                    System.out.println("Received message " + numberOfMessagesMoved);
                }
                catch (JMSException ex) {
                    ex.printStackTrace();
 
                    System.out.println("Failed to receive message " + numberOfMessagesMoved);
                    System.out.println("Exit:ERROR");
                    return false;
                }
 
                if (message == null) {
                    System.out.println("Message is null");
                    System.out.println("Exit::ERROR");
                    return false;
                }
 
                try {
                    amq.producer.send(message);
 
                    // Acknowledge the message only if copy was successful
                    message.acknowledge();
                }
                catch (JMSException ex) {
                    ex.printStackTrace();
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        String text = textMessage.getText();
                        System.out.println("Failed to send message to " + destQueue + " : " + text);
                    } else {
                        System.out.println("Failed to send message to " + destQueue + " : " + message);
                    }
                    System.out.println("Exit::ERROR");
                    return false;
                }
                System.out.println("Sent message " + numberOfMessagesMoved);
 
                Thread.sleep(DEFAULT_THROTTLE_MS);
 
                numberOfMessagesMoved++;
            }
 
            System.out.println("Exit::SUCCESS");
        }
        catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("Exit:ERROR");
            return false;   
        }
        finally {
            if (amq!=null){amq.close();}
        }
       
        return true;
    }
 
    private boolean confirm(int maxNumberOfMessagesToMove, String sourceQueue, String destQueue) {
        System.out.println("-------------------------------------------------------------");
        System.out.println("Moving " + maxNumberOfMessagesToMove + " messages (at most) from " + sourceQueue + " to " + destQueue);
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        System.out.print("confirm? [y/n]");
        boolean isConfirmed = false;
        try {
            String input = reader.readLine();
            isConfirmed = "y".equalsIgnoreCase(input);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        if (!isConfirmed) {
            return false;
        }
 
        System.out.println("-------------------------------------------------------------");
        return true;
    }
 
    private class AMQ {
       
        Connection connection;
        Session session;
 
        Destination source;
        Destination destination;
        MessageConsumer consumer;
        MessageProducer producer;
 
        public AMQ(String activeMqBrokerUrl, String sourceQueue, String destQueue) throws JMSException {
 
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(activeMqBrokerUrl);
 
            connection = connectionFactory.createConnection();
            connection.start();
 
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 
            source = session.createQueue(sourceQueue);
            destination = session.createQueue(destQueue);
 
            consumer = session.createConsumer(source);
 
            producer = session.createProducer(destination);
        }
 
        public void close() throws JMSException {
            try{producer.close();}catch(Exception ex){ex.printStackTrace();}
            try{consumer.close();}catch(Exception ex){ex.printStackTrace();}
            try{session.close();}catch(Exception ex){ex.printStackTrace();}
            try{connection.close();}catch(Exception ex){ex.printStackTrace();}
        }
    }
 
}