naobe @ ウィキ
ActiveMQ
最終更新:
Bot(ページ名リンク)
-
view
OpenSourceに戻る
対象バージョン
5.6.0
インストール
- http://activemq.apache.org/download.html からバイナリをダウンロード
- 展開
- ファイル、ディレクトリともに権限をactivemq:activemqに修正
drwxr-xr-x 5 activemq activemq 4096 7月 1 18:32 data drwxr-xr-x 8 activemq activemq 4096 7月 1 18:32 example drwxr-xr-x 10 root admin 4096 7月 1 17:36 .. drwxr-xr-x 5 activemq activemq 4096 7月 1 14:04 bin drwxrwxr-x 5 activemq activemq 4096 7月 1 13:52 tmp drwxr-xr-x 10 activemq activemq 4096 7月 1 13:52 . drwxr-xr-x 2 activemq activemq 4096 6月 30 22:39 conf drwxr-xr-x 2 activemq activemq 4096 6月 30 22:39 docs drwxr-xr-x 4 activemq activemq 4096 6月 30 22:39 lib drwxr-xr-x 6 activemq activemq 4096 6月 30 22:39 webapps -rw-r--r-- 1 activemq activemq 40581 5月 2 21:07 LICENSE -rw-r--r-- 1 activemq activemq 3335 5月 2 21:07 NOTICE -rw-r--r-- 1 activemq activemq 2613 5月 2 21:07 README.txt -rw-r--r-- 1 activemq activemq 2038 5月 2 21:07 WebConsole-README.txt -rw-r--r-- 1 activemq activemq 2812 5月 2 21:07 user-guide.html -rwxr-xr-x 1 activemq activemq 4984024 5月 2 20:18 activemq-all-5.6.0.jar
- bin/activemqの以下を修正
if [ -z "$ACTIVEMQ_USER" ] ; then
ACTIVEMQ_USER="activemq"
# ACTIVEMQ_USER=""
fi
#JAVA_HOME=""
JAVA_HOME=[[Java]]インストールディレクトリ
起動
- sudo bin/activemq start で起動。ActiveMQのホームページには、bin/activemqで起動と書いてあったが、Usageを表示してサーバが起動しない。
- ps -ef | grep activemqで起動を確認
activemq 12032 1 0 Jul01 ? 00:00:35 /usr/java/jdk1.6.0_21/bin/java -Dorg.apache.activemq.audit=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote -Djava.io.tmpdir=/opt/apache-activemq-5.6.0/tmp -Dactivemq.classpath=/opt/apache-activemq-5.6.0/conf; -Dactivemq.home=/opt/apache-activemq-5.6.0 -Dactivemq.base=/opt/apache-activemq-5.6.0 -Dactivemq.conf=/opt/apache-activemq-5.6.0/conf -Dactivemq.data=/opt/apache-activemq-5.6.0/data -jar /opt/apache-activemq-5.6.0/bin/run.jar start
- netstat -an | grep 61616で、ポート確認
tcp 0 0 :::61616 :::* LISTEN
- http://ホスト:8161/adminで管理画面表示
- exampleのproducerを実行
cd example ant producer
- 管理画面のqueueをクリック。キューに2000件のメッセージが登録されている。
- TEST.FOOをクリックするとキューの詳細を表示
- exampleのconsumerを実行。キューが0件になる。
ant consumer
サンプルプログラム
ActiveMQと異なるホストからJNDIでConnectionFactoryを取得してキューに送信。
package test;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.NamingException;
public class ActiveMQTest {
public static void main(String[] args) {
ActiveMQTest test = new ActiveMQTest();
try {
test.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
private int messageSize = 255;
public void execute() throws NamingException, JMSException {
Connection connection = null;
try {
Context jndiContext = new javax.naming.InitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory)jndiContext.lookup("queueConnectionFactory");
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendLoop(session, producer);
} finally {
try {
if( connection != null) {
connection.close();
}
} catch (Throwable ignore) {
}
}
}
private void sendLoop(Session session, MessageProducer producer)
throws JMSException {
for (int i = 0; i < 100; i++) {
TextMessage message = session
.createTextMessage(createMessageText(i));
String msg = message.getText();
if (msg.length() > 50) {
msg = msg.substring(0, 50) + "...";
}
System.out.println("Sending message: '" + msg + "'");
producer.send(message);
}
}
private String createMessageText(int index) {
StringBuffer buffer = new StringBuffer(messageSize);
buffer.append("Message: " + index + " sent at: " + new Date());
if (buffer.length() > messageSize) {
return buffer.substring(0, messageSize);
}
for (int i = buffer.length(); i < messageSize; i++) {
buffer.append(' ');
}
return buffer.toString();
}
}
jndi.propertiesを作成しておく。
# START SNIPPET: jndi java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory # use the following property to configure the default connector java.naming.provider.url = tcp://ホスト:61616 # use the following property to specify the JNDI name the connection factory # should appear as. connectionFactoryNames = connectionFactory, queueConnectionFactory, topicConnectionFactry # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] queue.MyQueue = example.MyQueue # register some topics in JNDI using the form # topic.[jndiName] = [physicalName] topic.MyTopic = example.MyTopic # END SNIPPET: jndi
クラスパスに、activemq-all-5.6.0.jar、spring-core-3.0.6.RELEASE.jar(これは本当に必要?)を設定しておく。
キューから受信
private void receiveQueueTest() throws NamingException, JMSException {
Connection connection = null;
Context jndiContext = new javax.naming.InitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory) jndiContext
.lookup("queueConnectionFactory");
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
}
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println( msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
たまっていたキューを全て受信。consumer.receive()とすると1件だけ受信。
Request-Respopnseの実現
JMS上でリクエストーレスポンスを実現する最も良い方法は、一時キューとクライアント毎のconsumerをスタートアップ時に作成することです。一時キュー向けのすべてのメッセージにJMSReplyToプロパティをセットし、リクエストメッセージとレスポンスメッセージを関連付けるIDを使うことである。これでconsumer, producerを毎回作成するオーバヘッドを防ぐことができる。また希望するなら多くのスレッド間で、producer, consumerを共有することができる。
