package com.nixon;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Command-line utility that moves up to a given number of messages from one
 * queue to another queue on the same ActiveMQ broker.
 *
 * <p>Each message is received from the source queue, sent to the destination
 * queue with persistent delivery, and acknowledged only after the send
 * returned without throwing. The operator must confirm on standard input
 * before anything is moved.
 *
 * <p>Depends on activemq-all-5.11.1.jar
 */
public class MqBulkMove {

    /** Pause between two consecutive message moves, in milliseconds. */
    private static final int DEFAULT_THROTTLE_MS = 20;

    /** How long a single receive call waits for a message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 1000;

    /**
     * Entry point. Expects exactly four arguments: broker URL, source queue,
     * destination queue and the maximum number of messages to move.
     *
     * <p>Prints usage and returns when the argument count is wrong. Exits with
     * status 0 when the move succeeded and -1 when it failed, was declined by
     * the operator, or the message count is not an integer.
     *
     * @param args command-line arguments as described above
     */
    public static void main(String... args) {
        if (args == null || args.length != 4) {
            printUsage();
            return;
        }
        String brokerUrl = args[0];
        String sourceQueue = args[1];
        String destQueue = args[2];
        int maxNumberOfMessagesToMove;
        try {
            maxNumberOfMessagesToMove = Integer.parseInt(args[3].trim());
        } catch (NumberFormatException ex) {
            // Report a bad count as a usage error instead of an uncaught stack trace.
            System.out.println("max-messages must be an integer but was: " + args[3]);
            printUsage();
            System.exit(-1);
            return;
        }
        boolean success = new MqBulkMove().bulkMove(brokerUrl, sourceQueue, destQueue, maxNumberOfMessagesToMove);
        System.exit(success ? 0 : -1);
    }

    /** Prints the command-line synopsis and an example invocation. */
    private static void printUsage() {
        System.out.println("Usage: java -jar mqmove.jar broker-url source-queue dest-queue max-messages");
        System.out.println("Example: java -jar mqmove.jar tcp://activemq01.nixon.com:61616 REQUEST_DQ REQUEST 1");
    }

    /**
     * Connects to the broker, asks the operator for confirmation and then moves
     * messages one at a time until the requested count is reached.
     *
     * <p>The move stops with a failure as soon as a receive fails, no message
     * arrives within the receive timeout, or a send/acknowledge fails. The
     * broker resources are always released before returning.
     *
     * @param activeMqBrokerUrl broker URL handed to the ActiveMQ connection factory
     * @param sourceQueue name of the queue to consume from
     * @param destQueue name of the queue to produce to
     * @param maxNumberOfMessagesToMove number of messages to move before stopping
     * @return {@code true} if the requested number of messages was moved,
     *         {@code false} on any failure or if the operator declined
     */
    private boolean bulkMove(String activeMqBrokerUrl, String sourceQueue, String destQueue, int maxNumberOfMessagesToMove) {
        AMQ amq = null;
        try {
            amq = new AMQ(activeMqBrokerUrl, sourceQueue, destQueue);
            amq.producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            if (!confirm(maxNumberOfMessagesToMove, sourceQueue, destQueue)) {
                System.out.println("EXIT");
                return false;
            }
            for (int messageNumber = 1; messageNumber <= maxNumberOfMessagesToMove; messageNumber++) {
                Message message;
                try {
                    message = amq.consumer.receive(RECEIVE_TIMEOUT_MS);
                } catch (JMSException ex) {
                    ex.printStackTrace();
                    System.out.println("Failed to receive message " + messageNumber);
                    System.out.println("Exit:ERROR");
                    return false;
                }
                if (message == null) {
                    // Nothing arrived within the receive timeout; treated as a failure.
                    System.out.println("Message is null");
                    System.out.println("Exit::ERROR");
                    return false;
                }
                // Logged only once a message is actually in hand.
                System.out.println("Received message " + messageNumber);
                try {
                    amq.producer.send(message);
                    // Acknowledge the message only if copy was successful
                    // NOTE(review): if acknowledge() fails after a successful send, the
                    // message has already been written to the destination and may also
                    // remain on the source - confirm whether duplicates are acceptable
                    // or whether a transacted session should be used instead.
                    message.acknowledge();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                    System.out.println("Failed to send message to " + destQueue + " : " + describe(message));
                    System.out.println("Exit::ERROR");
                    return false;
                }
                System.out.println("Sent message " + messageNumber);
                Thread.sleep(DEFAULT_THROTTLE_MS);
            }
            System.out.println("Exit::SUCCESS");
            return true;
        } catch (InterruptedException ex) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
            System.out.println("Exit:ERROR");
            return false;
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("Exit:ERROR");
            return false;
        } finally {
            if (amq != null) {
                amq.close();
            }
        }
    }

    /**
     * Renders a message for the failure log: the text body for a
     * {@link TextMessage}, otherwise the message's string form. Never throws,
     * so the failure report is not lost if the body cannot be read.
     *
     * @param message the message that could not be moved
     * @return printable description of the message
     */
    private static String describe(Message message) {
        if (message instanceof TextMessage) {
            try {
                return ((TextMessage) message).getText();
            } catch (JMSException ignored) {
                // Body unreadable; fall back to the generic representation below.
            }
        }
        return String.valueOf(message);
    }

    /**
     * Prints a summary of the pending move and asks the operator to confirm it
     * on standard input.
     *
     * @param maxNumberOfMessagesToMove number of messages shown in the summary
     * @param sourceQueue source queue name shown in the summary
     * @param destQueue destination queue name shown in the summary
     * @return {@code true} only if the operator answered "y" (case-insensitive,
     *         surrounding whitespace ignored); {@code false} on any other
     *         answer, end of input, or a read error
     */
    private boolean confirm(int maxNumberOfMessagesToMove, String sourceQueue, String destQueue) {
        System.out.println("-------------------------------------------------------------");
        System.out.println("Moving " + maxNumberOfMessagesToMove + " messages (at most) from " + sourceQueue + " to " + destQueue);
        // Intentionally not closed: closing the reader would close System.in.
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        System.out.print("confirm? [y/n]");
        boolean isConfirmed = false;
        try {
            String input = reader.readLine();
            // readLine() yields null at end of input.
            isConfirmed = input != null && "y".equalsIgnoreCase(input.trim());
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        if (!isConfirmed) {
            return false;
        }
        System.out.println("-------------------------------------------------------------");
        return true;
    }

    /**
     * Bundle of the JMS objects needed for one move run: a started connection,
     * a non-transacted client-acknowledge session, and a consumer/producer pair
     * bound to the source and destination queues.
     */
    private static final class AMQ {
        Connection connection;
        Session session;
        Destination source;
        Destination destination;
        MessageConsumer consumer;
        MessageProducer producer;

        /**
         * Opens the connection and creates the session, queues, consumer and
         * producer. If any step fails, whatever was already opened is closed
         * before the exception is rethrown.
         *
         * @param activeMqBrokerUrl broker URL for the connection factory
         * @param sourceQueue name of the queue the consumer reads from
         * @param destQueue name of the queue the producer writes to
         * @throws JMSException if any JMS object cannot be created
         */
        public AMQ(String activeMqBrokerUrl, String sourceQueue, String destQueue) throws JMSException {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(activeMqBrokerUrl);
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                source = session.createQueue(sourceQueue);
                destination = session.createQueue(destQueue);
                consumer = session.createConsumer(source);
                producer = session.createProducer(destination);
            } catch (JMSException ex) {
                // The caller never receives a reference, so release here.
                close();
                throw ex;
            } catch (RuntimeException ex) {
                close();
                throw ex;
            }
        }

        /**
         * Closes producer, consumer, session and connection in that order.
         * Best effort: a failure to close one resource is printed and does not
         * prevent the remaining ones from being closed. Resources that were
         * never created are skipped.
         */
        public void close() {
            if (producer != null) {
                try {
                    producer.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
import java.io.BufferedReader;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Command-line utility that moves up to a given number of messages from one
 * queue to another queue on the same ActiveMQ broker.
 *
 * <p>Each message is received from the source queue, sent to the destination
 * queue with persistent delivery, and acknowledged only after the send
 * returned without throwing. The operator must confirm on standard input
 * before anything is moved.
 *
 * <p>Depends on activemq-all-5.11.1.jar
 */
public class MqBulkMove {

    /** Pause between two consecutive message moves, in milliseconds. */
    private static final int DEFAULT_THROTTLE_MS = 20;

    /** How long a single receive call waits for a message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 1000;

    /**
     * Entry point. Expects exactly four arguments: broker URL, source queue,
     * destination queue and the maximum number of messages to move.
     *
     * <p>Prints usage and returns when the argument count is wrong. Exits with
     * status 0 when the move succeeded and -1 when it failed, was declined by
     * the operator, or the message count is not an integer.
     *
     * @param args command-line arguments as described above
     */
    public static void main(String... args) {
        if (args == null || args.length != 4) {
            printUsage();
            return;
        }
        String brokerUrl = args[0];
        String sourceQueue = args[1];
        String destQueue = args[2];
        int maxNumberOfMessagesToMove;
        try {
            maxNumberOfMessagesToMove = Integer.parseInt(args[3].trim());
        } catch (NumberFormatException ex) {
            // Report a bad count as a usage error instead of an uncaught stack trace.
            System.out.println("max-messages must be an integer but was: " + args[3]);
            printUsage();
            System.exit(-1);
            return;
        }
        boolean success = new MqBulkMove().bulkMove(brokerUrl, sourceQueue, destQueue, maxNumberOfMessagesToMove);
        System.exit(success ? 0 : -1);
    }

    /** Prints the command-line synopsis and an example invocation. */
    private static void printUsage() {
        System.out.println("Usage: java -jar mqmove.jar broker-url source-queue dest-queue max-messages");
        System.out.println("Example: java -jar mqmove.jar tcp://activemq01.nixon.com:61616 REQUEST_DQ REQUEST 1");
    }

    /**
     * Connects to the broker, asks the operator for confirmation and then moves
     * messages one at a time until the requested count is reached.
     *
     * <p>The move stops with a failure as soon as a receive fails, no message
     * arrives within the receive timeout, or a send/acknowledge fails. The
     * broker resources are always released before returning.
     *
     * @param activeMqBrokerUrl broker URL handed to the ActiveMQ connection factory
     * @param sourceQueue name of the queue to consume from
     * @param destQueue name of the queue to produce to
     * @param maxNumberOfMessagesToMove number of messages to move before stopping
     * @return {@code true} if the requested number of messages was moved,
     *         {@code false} on any failure or if the operator declined
     */
    private boolean bulkMove(String activeMqBrokerUrl, String sourceQueue, String destQueue, int maxNumberOfMessagesToMove) {
        AMQ amq = null;
        try {
            amq = new AMQ(activeMqBrokerUrl, sourceQueue, destQueue);
            amq.producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            if (!confirm(maxNumberOfMessagesToMove, sourceQueue, destQueue)) {
                System.out.println("EXIT");
                return false;
            }
            for (int messageNumber = 1; messageNumber <= maxNumberOfMessagesToMove; messageNumber++) {
                Message message;
                try {
                    message = amq.consumer.receive(RECEIVE_TIMEOUT_MS);
                } catch (JMSException ex) {
                    ex.printStackTrace();
                    System.out.println("Failed to receive message " + messageNumber);
                    System.out.println("Exit:ERROR");
                    return false;
                }
                if (message == null) {
                    // Nothing arrived within the receive timeout; treated as a failure.
                    System.out.println("Message is null");
                    System.out.println("Exit::ERROR");
                    return false;
                }
                // Logged only once a message is actually in hand.
                System.out.println("Received message " + messageNumber);
                try {
                    amq.producer.send(message);
                    // Acknowledge the message only if copy was successful
                    // NOTE(review): if acknowledge() fails after a successful send, the
                    // message has already been written to the destination and may also
                    // remain on the source - confirm whether duplicates are acceptable
                    // or whether a transacted session should be used instead.
                    message.acknowledge();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                    System.out.println("Failed to send message to " + destQueue + " : " + describe(message));
                    System.out.println("Exit::ERROR");
                    return false;
                }
                System.out.println("Sent message " + messageNumber);
                Thread.sleep(DEFAULT_THROTTLE_MS);
            }
            System.out.println("Exit::SUCCESS");
            return true;
        } catch (InterruptedException ex) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
            System.out.println("Exit:ERROR");
            return false;
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("Exit:ERROR");
            return false;
        } finally {
            if (amq != null) {
                amq.close();
            }
        }
    }

    /**
     * Renders a message for the failure log: the text body for a
     * {@link TextMessage}, otherwise the message's string form. Never throws,
     * so the failure report is not lost if the body cannot be read.
     *
     * @param message the message that could not be moved
     * @return printable description of the message
     */
    private static String describe(Message message) {
        if (message instanceof TextMessage) {
            try {
                return ((TextMessage) message).getText();
            } catch (JMSException ignored) {
                // Body unreadable; fall back to the generic representation below.
            }
        }
        return String.valueOf(message);
    }

    /**
     * Prints a summary of the pending move and asks the operator to confirm it
     * on standard input.
     *
     * @param maxNumberOfMessagesToMove number of messages shown in the summary
     * @param sourceQueue source queue name shown in the summary
     * @param destQueue destination queue name shown in the summary
     * @return {@code true} only if the operator answered "y" (case-insensitive,
     *         surrounding whitespace ignored); {@code false} on any other
     *         answer, end of input, or a read error
     */
    private boolean confirm(int maxNumberOfMessagesToMove, String sourceQueue, String destQueue) {
        System.out.println("-------------------------------------------------------------");
        System.out.println("Moving " + maxNumberOfMessagesToMove + " messages (at most) from " + sourceQueue + " to " + destQueue);
        // Intentionally not closed: closing the reader would close System.in.
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        System.out.print("confirm? [y/n]");
        boolean isConfirmed = false;
        try {
            String input = reader.readLine();
            // readLine() yields null at end of input.
            isConfirmed = input != null && "y".equalsIgnoreCase(input.trim());
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        if (!isConfirmed) {
            return false;
        }
        System.out.println("-------------------------------------------------------------");
        return true;
    }

    /**
     * Bundle of the JMS objects needed for one move run: a started connection,
     * a non-transacted client-acknowledge session, and a consumer/producer pair
     * bound to the source and destination queues.
     */
    private static final class AMQ {
        Connection connection;
        Session session;
        Destination source;
        Destination destination;
        MessageConsumer consumer;
        MessageProducer producer;

        /**
         * Opens the connection and creates the session, queues, consumer and
         * producer. If any step fails, whatever was already opened is closed
         * before the exception is rethrown.
         *
         * @param activeMqBrokerUrl broker URL for the connection factory
         * @param sourceQueue name of the queue the consumer reads from
         * @param destQueue name of the queue the producer writes to
         * @throws JMSException if any JMS object cannot be created
         */
        public AMQ(String activeMqBrokerUrl, String sourceQueue, String destQueue) throws JMSException {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(activeMqBrokerUrl);
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                source = session.createQueue(sourceQueue);
                destination = session.createQueue(destQueue);
                consumer = session.createConsumer(source);
                producer = session.createProducer(destination);
            } catch (JMSException ex) {
                // The caller never receives a reference, so release here.
                close();
                throw ex;
            } catch (RuntimeException ex) {
                close();
                throw ex;
            }
        }

        /**
         * Closes producer, consumer, session and connection in that order.
         * Best effort: a failure to close one resource is printed and does not
         * prevent the remaining ones from being closed. Resources that were
         * never created are skipped.
         */
        public void close() {
            if (producer != null) {
                try {
                    producer.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (consumer != null) {
                try {
                    consumer.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (session != null) {
                try {
                    session.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}