ActiveMQ学习--消息服务安装

ActiveMQ官网地址:http://activemq.apache.org/

安装和配置步骤记录如下:

1.下载软件,下载导航页如下,进入后会自动提示最佳下载地址,然后点击即开始下载。

http://www.apache.org/dyn/closer.cgi?path=/activemq/5.9.0/apache-activemq-5.9.0-bin.tar.gz

2.解压缩,并移动到任意位置

tar zxf apache-activemq-5.9.0-bin.tar.gz
mv apache-activemq-5.9.0 /usr/local

3.创建activeMQ实例

cd /usr/local/apache-activemq-5.9.0/bin
./activemq create /alidata/myactivemq  #创建一个broker为myactivemq的实例在/alidata/myactivemq路径下。执行完命令后会自动创建/alidata/myactivemq目录
./activemq setup /etc/default/activemq #创建默认配置文件,最新的版本已经取消了setup命令所以可以忽略这一步

4.配置myactivemq,需要配置的地方大概有三个:tcp监听端口、data目录位置、登录验证用户。

配置data目录位置:

修改文件,路径是:    /alidata/myactivemq/conf/credentials.properties
添加一行配置在文件末尾     activemq.data=data   #保存数据到每个实例的data目录下

修改主配置文件,路径是: /alidata/myactivemq/conf/activemq.xml

修改后内容如下:

<?xml version="1.0" encoding="UTF-8"?>
		...
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="myactivemq" dataDirectory="${activemq.data}">
				...
    <plugins>
        <!-- 安全认证 -->
        <simpleAuthenticationPlugin>  
            <users>
            <authenticationUser username="admin-myactivemq" password="admin-myactivemq" groups="users,admins"/>
            <authenticationUser username="user-myactivemq" password="user-myactivemq" groups="users"/>
            <!--
            <authenticationUser username="guest" password="password" groups="guests"/>
            -->
            </users>
        </simpleAuthenticationPlugin>
    </plugins>

    ...
    <systemUsage>
      <systemUsage>
          <memoryUsage>
              <memoryUsage percentOfJvmHeap="70"/>
          </memoryUsage>
          <storeUsage>
              <storeUsage limit="10 gb"/>
          </storeUsage>
          <tempUsage>
              <tempUsage limit="5 gb"/>
          </tempUsage>
      </systemUsage>
  </systemUsage>

  ...
  <transportConnectors>
      <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  </transportConnectors>
  ...

  </broker>

...
<!-- 不启用内嵌服务器
<import resource="jetty.xml"/>
-->
</beans>

5.启动刚创建的ActiveMQ实例

/alidata/myactivemq/bin/myactivemq start   # 启动
#/alidata/myactivemq/bin/myactivemq stop   # 停止

如果出现如下提示,则表示启动成功。
INFO: Loading '/etc/default/activemq'
INFO: Using java '/usr/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/alidata/myactivemq/data/activemq-AY130306141237620587.pid' (pid '22650')

6.测试activemq是否工作正常,代码如下:

package com.myactivemq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class App 
{
	static String username = "user-myactivemq";
	static String password = "user-myactivemq";
	static String brokerURL = "tcp://xx.xx.xx.xx:61616";
	
    public static void main( String[] args ) throws Exception
    {
        thread(new Producer(), false);
        thread(new Producer(), false);
        thread(new Consumer(), false);
        Thread.sleep(1000);
        thread(new Consumer(), false);
        thread(new Producer(), false);
        thread(new Consumer(), false);
        thread(new Producer(), false);
        Thread.sleep(1000);
        thread(new Consumer(), false);
        thread(new Producer(), false);
        thread(new Consumer(), false);
        thread(new Consumer(), false);
        thread(new Producer(), false);
        thread(new Producer(), false);
        Thread.sleep(1000);
        thread(new Producer(), false);
        thread(new Consumer(), false);
        thread(new Consumer(), false);
        thread(new Producer(), false);
        thread(new Consumer(), false);
        thread(new Producer(), false);
        thread(new Consumer(), false);
        thread(new Producer(), false);
        thread(new Consumer(), false);
        thread(new Consumer(), false);
        thread(new Producer(), false);
    }
    
    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }
    
    static class Producer implements Runnable{

		public void run() {
			try {
				// Create a ConnectionFactory
				ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL);
				
				// Create a Connection
				Connection connection = connectionFactory.createConnection();
				connection.start();
				
				// Create a Session
				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
				
				// Create the destination (Topic or Queue)
				Destination destination = session.createQueue("TEST.FOO");
				
				// Create a MessageProducer from the Session to the Topic or Queue
				MessageProducer producer = session.createProducer(destination);
				producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
				
				// Create a messages
				String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
				TextMessage message = session.createTextMessage(text);
				
				// Tell the producer to send the message
				System.out.println("Sent message: "+ message.hashCode() + " : " + Thread.currentThread().getName());
				producer.send(message);
         
				// Clean up
				session.close();
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
    	
    }
    
    static class Consumer implements Runnable, ExceptionListener{

		public void run() {
			try {
		        // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL);
 
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
 
                connection.setExceptionListener(this);
 
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");
 
                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);
 
                // Wait for a message
                Message message = consumer.receive(1000);
 
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }
 
                consumer.close();
                session.close();
                connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}

		public void onException(JMSException arg0) {
			System.out.println("JMS Exception occured.  Shutting down client.");			
		}
    	
    }
}

运行结果如下:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Sent message: 345667 : Thread-0
Sent message: 4725080 : Thread-1
Received: Hello world! From: Thread-1 : 817096
Sent message: 20868995 : Thread-8
Sent message: 22545518 : Thread-6
Received: Hello world! From: Thread-0 : 27018084
Received: Hello world! From: Thread-8 : 22649035
Sent message: 21744361 : Thread-16
Sent message: 27432016 : Thread-15
Sent message: 25745710 : Thread-12
Received: Hello world! From: Thread-6 : 724058
Received: Hello world! From: Thread-15 : 16158948
Received: Hello world! From: Thread-12 : 11998234
Sent message: 33268399 : Thread-26
Sent message: 11160700 : Thread-22
Sent message: 20704708 : Thread-29
Sent message: 4657294 : Thread-24
Sent message: 22470836 : Thread-19
Received: Hello world! From: Thread-16 : 26336541
Received: Hello world! From: Thread-22 : 24489190
Received: Hello world! From: Thread-26 : 2273946
Received: Hello world! From: Thread-29 : 8611945
Received: Hello world! From: Thread-19 : 9921725
Received: Hello world! From: Thread-24 : 8972349


提交评论