naobe @ ウィキ
ActiveMQ
最終更新:
Bot(ページ名リンク)
-
view
OpenSourceに戻る
対象バージョン
5.6.0
インストール
- http://activemq.apache.org/download.html からバイナリをダウンロード
- 展開
- ファイル、ディレクトリともに権限をactivemq:activemqに修正
drwxr-xr-x 5 activemq activemq 4096 7月 1 18:32 data drwxr-xr-x 8 activemq activemq 4096 7月 1 18:32 example drwxr-xr-x 10 root admin 4096 7月 1 17:36 .. drwxr-xr-x 5 activemq activemq 4096 7月 1 14:04 bin drwxrwxr-x 5 activemq activemq 4096 7月 1 13:52 tmp drwxr-xr-x 10 activemq activemq 4096 7月 1 13:52 . drwxr-xr-x 2 activemq activemq 4096 6月 30 22:39 conf drwxr-xr-x 2 activemq activemq 4096 6月 30 22:39 docs drwxr-xr-x 4 activemq activemq 4096 6月 30 22:39 lib drwxr-xr-x 6 activemq activemq 4096 6月 30 22:39 webapps -rw-r--r-- 1 activemq activemq 40581 5月 2 21:07 LICENSE -rw-r--r-- 1 activemq activemq 3335 5月 2 21:07 NOTICE -rw-r--r-- 1 activemq activemq 2613 5月 2 21:07 README.txt -rw-r--r-- 1 activemq activemq 2038 5月 2 21:07 WebConsole-README.txt -rw-r--r-- 1 activemq activemq 2812 5月 2 21:07 user-guide.html -rwxr-xr-x 1 activemq activemq 4984024 5月 2 20:18 activemq-all-5.6.0.jar
- bin/activemqの以下を修正
if [ -z "$ACTIVEMQ_USER" ] ; then ACTIVEMQ_USER="activemq" # ACTIVEMQ_USER="" fi #JAVA_HOME="" JAVA_HOME=[[Java]]インストールディレクトリ
起動
- sudo bin/activemq start で起動。ActiveMQのホームページには、bin/activemqで起動と書いてあったが、Usageを表示してサーバが起動しない。
- ps -ef | grep activemqで起動を確認
activemq 12032 1 0 Jul01 ? 00:00:35 /usr/java/jdk1.6.0_21/bin/java -Dorg.apache.activemq.audit=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote -Djava.io.tmpdir=/opt/apache-activemq-5.6.0/tmp -Dactivemq.classpath=/opt/apache-activemq-5.6.0/conf; -Dactivemq.home=/opt/apache-activemq-5.6.0 -Dactivemq.base=/opt/apache-activemq-5.6.0 -Dactivemq.conf=/opt/apache-activemq-5.6.0/conf -Dactivemq.data=/opt/apache-activemq-5.6.0/data -jar /opt/apache-activemq-5.6.0/bin/run.jar start
- netstat -an | grep 61616で、ポート確認
tcp 0 0 :::61616 :::* LISTEN
- http://ホスト:8161/adminで管理画面表示
- exampleのproducerを実行
cd example ant producer
- 管理画面のqueueをクリック。キューに2000件のメッセージが登録されている。
- TEST.FOOをクリックするとキューの詳細を表示
- exampleのconsumerを実行。キューが0件になる。
ant consumer
サンプルプログラム
ActiveMQと異なるホストからJNDIでConnectionFactoryを取得してキューに送信。
package test; import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.naming.Context; import javax.naming.NamingException; public class ActiveMQTest { public static void main(String[] args) { ActiveMQTest test = new ActiveMQTest(); try { test.execute(); } catch (Exception e) { e.printStackTrace(); } } private int messageSize = 255; public void execute() throws NamingException, JMSException { Connection connection = null; try { Context jndiContext = new javax.naming.InitialContext(); ConnectionFactory connectionFactory = (ConnectionFactory)jndiContext.lookup("queueConnectionFactory"); connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); sendLoop(session, producer); } finally { try { if( connection != null) { connection.close(); } } catch (Throwable ignore) { } } } private void sendLoop(Session session, MessageProducer producer) throws JMSException { for (int i = 0; i < 100; i++) { TextMessage message = session .createTextMessage(createMessageText(i)); String msg = message.getText(); if (msg.length() > 50) { msg = msg.substring(0, 50) + "..."; } System.out.println("Sending message: '" + msg + "'"); producer.send(message); } } private String createMessageText(int index) { StringBuffer buffer = new StringBuffer(messageSize); buffer.append("Message: " + index + " sent at: " + new Date()); if (buffer.length() > messageSize) { return buffer.substring(0, messageSize); } for (int i = buffer.length(); i < messageSize; i++) { buffer.append(' '); } return buffer.toString(); } }
jndi.propertiesを作成しておく。
# START SNIPPET: jndi java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory # use the following property to configure the default connector java.naming.provider.url = tcp://ホスト:61616 # use the following property to specify the JNDI name the connection factory # should appear as. connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] queue.MyQueue = example.MyQueue # register some topics in JNDI using the form # topic.[jndiName] = [physicalName] topic.MyTopic = example.MyTopic # END SNIPPET: jndi
クラスパスに、activemq-all-5.6.0.jar、spring-core-3.0.6.RELEASE.jar(これは本当に必要?)を設定しておく。
キューから受信
private void receiveQueueTest() throws NamingException, JMSException { Connection connection = null; Context jndiContext = new javax.naming.InitialContext(); ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext .lookup("queueConnectionFactory"); connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue("TEST.FOO"); MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(this); } public void onMessage(Message message) { TextMessage msg = (TextMessage) message; try { System.out.println( msg.getText()); } catch (JMSException e) { e.printStackTrace(); } }
たまっていたキューを全て受信。consumer.receive()とすると1件だけ受信。
Request-Respopnseの実現
JMS上でリクエストーレスポンスを実現する最も良い方法は、一時キューとクライアント毎のconsumerをスタートアップ時に作成することです。一時キュー向けのすべてのメッセージにJMSReplyToプロパティをセットし、リクエストメッセージとレスポンスメッセージを関連付けるIDを使うことである。これでconsumer, producerを毎回作成するオーバヘッドを防ぐことができる。また希望するなら多くのスレッド間で、producer, consumerを共有することができる。