naobe @ ウィキ

ActiveMQ

最終更新:

naobe

- view
管理者のみ編集可
OpenSourceに戻る

対象バージョン

5.6.0

インストール

 drwxr-xr-x  5 activemq activemq    4096  7月  1 18:32 data
 drwxr-xr-x  8 activemq activemq    4096  7月  1 18:32 example
 drwxr-xr-x 10 root     admin       4096  7月  1 17:36 ..
 drwxr-xr-x  5 activemq activemq    4096  7月  1 14:04 bin
 drwxrwxr-x  5 activemq activemq    4096  7月  1 13:52 tmp
 drwxr-xr-x 10 activemq activemq    4096  7月  1 13:52 .
 drwxr-xr-x  2 activemq activemq    4096  6月 30 22:39 conf
 drwxr-xr-x  2 activemq activemq    4096  6月 30 22:39 docs
 drwxr-xr-x  4 activemq activemq    4096  6月 30 22:39 lib
 drwxr-xr-x  6 activemq activemq    4096  6月 30 22:39 webapps
 -rw-r--r--  1 activemq activemq   40581  5月  2 21:07 LICENSE
 -rw-r--r--  1 activemq activemq    3335  5月  2 21:07 NOTICE
 -rw-r--r--  1 activemq activemq    2613  5月  2 21:07 README.txt
 -rw-r--r--  1 activemq activemq    2038  5月  2 21:07 WebConsole-README.txt
 -rw-r--r--  1 activemq activemq    2812  5月  2 21:07 user-guide.html
 -rwxr-xr-x  1 activemq activemq 4984024  5月  2 20:18 activemq-all-5.6.0.jar
  • bin/activemqの以下を修正
 if [ -z "$ACTIVEMQ_USER" ] ; then
     ACTIVEMQ_USER="activemq"
 #    ACTIVEMQ_USER=""
 fi
 
 #JAVA_HOME=""
 JAVA_HOME=Javaインストールディレクトリ

起動

  • sudo bin/activemq start で起動。ActiveMQのホームページには、bin/activemqで起動と書いてあったが、Usageを表示してサーバが起動しない。
  • ps -ef | grep activemqで起動を確認
 activemq 12032     1  0 Jul01 ?        00:00:35 /usr/java/jdk1.6.0_21/bin/java -Dorg.apache.activemq.audit=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote -Djava.io.tmpdir=/opt/apache-activemq-5.6.0/tmp -Dactivemq.classpath=/opt/apache-activemq-5.6.0/conf; -Dactivemq.home=/opt/apache-activemq-5.6.0 -Dactivemq.base=/opt/apache-activemq-5.6.0 -Dactivemq.conf=/opt/apache-activemq-5.6.0/conf -Dactivemq.data=/opt/apache-activemq-5.6.0/data -jar /opt/apache-activemq-5.6.0/bin/run.jar start
  • netstat -an | grep 61616で、ポート確認
 tcp        0      0 :::61616                    :::*                        LISTEN
  • http://ホスト:8161/adminで管理画面表示
  • exampleのproducerを実行
 cd example
 ant producer

  • 管理画面のqueueをクリック。キューに2000件のメッセージが登録されている。

  • TEST.FOOをクリックするとキューの詳細を表示

  • exampleのconsumerを実行。キューが0件になる。
 ant consumer

サンプルプログラム

ActiveMQと異なるホストからJNDIでConnectionFactoryを取得してキューに送信。
 package test;
 
 import java.util.Date;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.naming.Context;
 import javax.naming.NamingException;
 
 public class ActiveMQTest {
 	
 	public static void main(String[] args) {
 		ActiveMQTest test = new ActiveMQTest();
 		
 		try {
 			test.execute();
 		} catch (Exception e) {
 			e.printStackTrace();
 		}
 	}
 
 	private int messageSize = 255;
 
 	public void execute() throws NamingException, JMSException {
 		Connection connection = null;
 		try {
 			Context jndiContext = new javax.naming.InitialContext();
 			ConnectionFactory connectionFactory = (ConnectionFactory)jndiContext.lookup("queueConnectionFactory");
 			connection = connectionFactory.createConnection();
 			connection.start();
 			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 			Destination destination = session.createQueue("TEST.FOO");
 			
 			MessageProducer producer = session.createProducer(destination);
 			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 			
 			sendLoop(session, producer);
 		} finally {
             try {
             	if( connection != null) {
                     connection.close();
             	}
             } catch (Throwable ignore) {
             }
 		}
 	}
 
 	private void sendLoop(Session session, MessageProducer producer)
 			throws JMSException {
 		for (int i = 0; i < 100; i++) {
 
 			TextMessage message = session
 					.createTextMessage(createMessageText(i));
 
 			String msg = message.getText();
 			if (msg.length() > 50) {
 				msg = msg.substring(0, 50) + "...";
 			}
 			System.out.println("Sending message: '" + msg + "'");
 
 			producer.send(message);
 		}
 
 	}
 
 	private String createMessageText(int index) {
         StringBuffer buffer = new StringBuffer(messageSize);
         buffer.append("Message: " + index + " sent at: " + new Date());
         if (buffer.length() > messageSize) {
             return buffer.substring(0, messageSize);
         }
         for (int i = buffer.length(); i < messageSize; i++) {
             buffer.append(' ');
         }
         return buffer.toString();
 	}
 }

jndi.propertiesを作成しておく。
 # START SNIPPET: jndi
 java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
 
 # use the following property to configure the default connector
 java.naming.provider.url = tcp://ホスト:61616
 
 # use the following property to specify the JNDI name the connection factory
 # should appear as.
 connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry
 
 # register some queues in JNDI using the form
 # queue.[jndiName] = [physicalName]
 queue.MyQueue = example.MyQueue
 
 # register some topics in JNDI using the form
 # topic.[jndiName] = [physicalName]
 topic.MyTopic = example.MyTopic
 
 # END SNIPPET: jndi

クラスパスに、activemq-all-5.6.0.jar、spring-core-3.0.6.RELEASE.jar(これは本当に必要?)を設定しておく。

キューから受信

 private void receiveQueueTest() throws NamingException, JMSException {
 	Connection connection = null;
 	Context jndiContext = new javax.naming.InitialContext();
 	ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext
 			.lookup("queueConnectionFactory");
 	connection = connectionFactory.createConnection();
 	connection.start();
 	Session session = connection.createSession(false,
 			Session.AUTO_ACKNOWLEDGE);
 	Destination destination = session.createQueue("TEST.FOO");
 	MessageConsumer consumer = session.createConsumer(destination);
 	consumer.setMessageListener(this);
 }	
 
 public void onMessage(Message message) {
 	TextMessage msg = (TextMessage) message;
 	try {
 		System.out.println( msg.getText());
 	} catch (JMSException e) {
 		e.printStackTrace();
 	}
 }

たまっていたキューを全て受信。consumer.receive()とすると1件だけ受信。

Request-Respopnseの実現


JMS上でリクエストーレスポンスを実現する最も良い方法は、一時キューとクライアント毎のconsumerをスタートアップ時に作成することです。一時キュー向けのすべてのメッセージにJMSReplyToプロパティをセットし、リクエストメッセージとレスポンスメッセージを関連付けるIDを使うことである。これでconsumer, producerを毎回作成するオーバヘッドを防ぐことができる。また希望するなら多くのスレッド間で、producer, consumerを共有することができる。


Tomcatとの連携

人気記事ランキング
目安箱バナー