一、下载与安装

直接去官网(http://activemq.apache.org/)下载最新版本即可,由于这是免安装的,只需要解压就行了。安装完之后进入bin目录,双击 activemq.bat文件(linux下在bin目录下执行 activemq start)

二、访问控制台

在浏览器输入:http://ip:8161/admin/,出现如下界面表示启动成功,默认的用户名密码都是admin

activemq端口怎么调(activemq详解)(1)

三、修改端口号

61616为对外服务端口号

8161为控制器端口号

当端口号冲突时,可以修改这两个端口号。cd conf ,修改activemq.xml 修改里面的61616端口。修改jetty.xml,修改里面的8161端口。

queue队列模式:

和rabbitmq简单队列模式一样,若是有多个消费者消费同一个队列中的消息的话,默认也是轮询机制的消费

示例代码:

public class Productor {    public static final String BORKER_URL = "tcp://127.0.0.1:61616";    public static final String QUEUE_NAME = "queue1";    public static void main(String[] args) throws JMSException {        //创建工厂        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);        //创建tcp连接        Connection connection = factory.createConnection();        //建立连接        connection.start();        /**         * 创建会话,1.是否开启事务,2.签收模式         */        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //创建队列(消息的目的地)        Queue queue = session.createQueue(QUEUE_NAME);        //创建生产者        MessageProducer producer = session.createProducer(queue);        //消息非持久化        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);        //消息持久化 默认是持久化的//        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        //创建消息        TextMessage message = session.createTextMessage("你好吗");        //发送消息        producer.send(message);        producer.close();        session.close();        connection.close();        System.out.println("发送成功!");    }}public class Consumer {    public static final String BORKER_URL = "tcp://127.0.0.1:61616";    public static final String QUEUE_NAME = "queue1";    public static void main(String[] args) throws JMSException {        //创建工厂        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);        //创建tcp连接        Connection connection = factory.createConnection();        //建立连接        connection.start();        /**         * 创建会话,1.是否开启事务,2.签收模式         */        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //创建/声明队列(消息的目的地)        Queue queue = session.createQueue(QUEUE_NAME);        //创建消费者        MessageConsumer consumer = session.createConsumer(queue);        /*while (true) {            //receive会阻塞线程            TextMessage message = (TextMessage)consumer.receive();            System.out.println("接收到消息:" + message.getText());        }*/        //监听的方式消费        consumer.setMessageListener(message -> {            TextMessage textMessage = (TextMessage)message;            try {                System.out.println("1号接收到消息:" + textMessage.getText());            } catch (JMSException e) {                e.printStackTrace();            }        });    }}

topic队列模式:

称为发布订阅模式,生产者把消息发送给订阅给某个topic主题的消费者,是分发的模式,这种模式默认需要先启动消费者,不然就算生产者发布了某个topic主题的消息,消费者也消费不了;除非消费者提前订阅,并且做了消息持久化的处理,这样后启动消费者才能消费提前推送的消息。

代码:

public class Productor {    public static final String BORKER_URL = "tcp://127.0.0.1:61616";    public static final String TOPIC_NAME = "topic1";    public static void main(String[] args) throws JMSException {        //创建工厂        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);        //异步投递        factory.setUseAsyncSend(true);        //创建tcp连接        Connection connection = factory.createConnection();        /**         * 创建会话,1.是否开启事务,2.签收模式         */        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //创建/声明topic(消息的目的地)        Topic topic = session.createTopic(TOPIC_NAME);        //创建生产者        ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);        //持久化        producer.setDeliveryMode(DeliveryMode.PERSISTENT);        //建立连接        connection.start();        //创建消息        TextMessage message = session.createTextMessage("你好吗");        //发送消息,异步发送回调函数        producer.send(message, new AsyncCallback() {            @Override            public void onSuccess() {                System.out.println("success");            }            @Override            public void onException(JMSException e) {                System.out.println("fail");            }        });        producer.close();        session.close();        connection.close();        System.out.println("发送成功!");    }}public class Consumer1 {    public static final String BORKER_URL = "tcp://127.0.0.1:61616";    public static final String TOPIC_NAME = "topic1";    public static void main(String[] args) throws JMSException {        //创建工厂        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BORKER_URL);        //创建tcp连接        Connection connection = factory.createConnection();        //制定clientId        connection.setClientID("my");        /**         * 创建会话,1.是否开启事务,2.签收模式         */        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //创建/声明topic(消息的目的地)        Topic topic = session.createTopic(TOPIC_NAME);        //订阅主题        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark");        //建立连接        connection.start();        while (true) {            //receive会阻塞线程            //接收订阅的消息            TextMessage message = (TextMessage) subscriber.receive();            System.out.println("接收到消息:" + message.getText());        }        /*//创建消费者        MessageConsumer consumer = session.createConsumer(topic);        //建立连接        connection.start();        *//*while (true) {            //receive会阻塞线程            TextMessage message = (TextMessage)consumer.receive();            System.out.println("接收到消息:" + message.getText());        }*//*        //监听的方式消费        consumer.setMessageListener(message -> {            TextMessage textMessage = (TextMessage)message;            try {                System.out.println("1号接收到消息:" + textMessage.getText());            } catch (JMSException e) {                e.printStackTrace();            }        });*/    }}

如何保证消息的可靠性

回答这个问题主要从持久化,事务,签收这几个方面入手

消息持久化的核心代码:

//queue模式的消息持久化 默认是持久化的 producer.setDeliveryMode(DeliveryMode.PERSISTENT);  /** * topic模式的持久化 */Topic topic = session.createTopic(TOPIC_NAME);ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(topic);producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();

事务的核心代码(偏生产者):

//参数设置成trueconnection.createSession(false, Session.AUTO_ACKNOWLEDGE);//事务提交session.commit();

签收的核心代码(偏消费者):

//参数设置成手动提交connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//消息签收message.acknowledge();

注意:若是既开启事务,又开启手动签收,以事务为准,只要事务被提交了也默认消息被签收了

性能提升:

1.利用nio的协议比tcp的性能高,

  • 配置方式:在conf目录下activemq.xml照着下面配置
<broker>  ...  <transportConnectors>    <transportConnector name="nio" uri="nio://0.0.0.0:61616"/>    </<transportConnectors>  ...</broker>

  • 第二步是代码访问方式由tcp改为nio
//创建工厂ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("nio://127.0.0.1:61616");

2.jdbc+Journaling提高只有jdbc持久化的性能,它在做持久化入数据库之前,会先将数据保存到Journaling文件中,之后才慢慢同步到数据库中,等于中间加了一层缓冲层。

  • 把数据库mysql的驱动包放到lib目录下
  • 配置方式:在conf目录下activemq.xml照着下面配置,其中有个createTablesOnStartup属性,默认值是true,表示每次启动后去数据库自动建表
<persistenceAdapter>  <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> //上面是默认配置找到改成下面的配置<persistenceAdapter>   <journalPersistenceAdapterFactory journalLogFiles="5" dataDirectory="${basedir}/activemq-data" dataSource="#mysql-ds"/></persistenceAdapter> //下面的代码写在<beans>节点中<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>  <property name="username" value="activemq"/>  <property name="password" value="activemq"/>  <property name="poolPreparedStatements" value="true"/></bean>