[TOC]
# 什么是JMS
JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity):
这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。
JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
# JMS规范
1. 专业技术规范
JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。
2. 体系架构
JMS由以下元素组成。
JMS提供者:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
JMS客户:生产或消费基于消息的Java的应用程序或对象。
JMS生产者:创建并发送消息的JMS客户。
JMS消费者:接收消息的JMS客户。
JMS消息:包括可以在JMS客户之间传递的数据的对象
JMS队列:一个容纳那些被发送的等待阅读的消息的区域。一旦一个消息被阅读,该消息将被从队列中移走。
JMS主题:一种支持发送消息给多个订阅者的机制。
# Java消息服务应用程序结构支持两种模型
1. 点对点或队列模型
在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列
![](https://box.kancloud.cn/a76f56e71eea6602f63c33bc4cd8c061_793x145.png)
这种模式被概括为:
只有一个消费者将获得消息
生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
每一个成功处理的消息都由接收者签收
2. 发布者/订阅者模型
发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。
![](https://box.kancloud.cn/16600f938c0732f2c1607842eb51c57a_789x375.png)
这种模式被概括为:
多个消费者可以获得消息
在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。
# activemq
## 安装
1. 下载ActiveMQ
去官方网站下载:http://activemq.apache.org/
2. 运行ActiveMQ
解压缩apache-activemq-5.5.1-bin.zip,
修改配置文件activeMQ.xml,将0.0.0.0修改为localhost
~~~
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector uri="http://localhost:8081"/>
<transportConnector uri="udp://localhost:61618"/>
~~~
然后双击`apache-activemq-5.5.1\bin\win64\activemq.bat`运行ActiveMQ程序。
启动ActiveMQ以后,登陆:`http://localhost:8161/admin/`,创建一个Queue,命名为FirstQueue。
3. 运行代码
~~~
package cn.itcast_03_mq.queue
package cn.itcast_03_mq.topic
~~~
账号密码都是admin
4. 配置文件activemq.xml
![](https://box.kancloud.cn/bfdadfb488b4ca2806f3792dfeef4215_2354x516.png)
## 代码
### 生产者
~~~
package mq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ProducerTool {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
// private String url = "failover://tcp://localhost:61616";
//主题
private String subject = "myqueue";
//目标
private Queue destination = null;
//连接
private Connection connection = null;
//会话
private Session session = null;
private MessageProducer producer = null;
//初始化
private void initialize() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
//发送消息
public void produceMessage(String message) throws JMSException {
initialize();
//创建文本消息,把消息变成他的格式
TextMessage msg = session.createTextMessage(message);
connection.start();
System.out.println("Producer:->Sending message: " + message);
producer.send(msg);
System.out.println("Producer:->Message sent complete!");
}
//关闭连接
public void close() throws JMSException {
System.out.println("Producer:->Closing connection");
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
~~~
生产者内部是开启多个线程在生产的
**测试**
~~~
package mq;
import javax.jms.JMSException;
public class ProducerTest {
public static void main(String[] args) throws JMSException {
ProducerTool producerTool = new ProducerTool();
for (int i=0; i<10; i++) {
producerTool.produceMessage("hello,world"+i);
}
producerTool.close();
}
}
~~~
**网页**
![](https://box.kancloud.cn/a1cc8f227d923e767a8747d77535237f_2702x546.png)
### 消费者
~~~
package mq;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class ConsumerTool implements MessageListener, ExceptionListener {
private String user = ActiveMQConnection.DEFAULT_USER;
private String password = ActiveMQConnection.DEFAULT_PASSWORD;
// private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private String url = "failover://tcp://localhost:41414";
private String queue = "myqueue";
private Destination destination = null;
private Connection connection = null;
private Session session = null;
private MessageConsumer consumer = null;
private ActiveMQConnectionFactory connectionFactory = null;
public static Boolean isconnection = false;
// 初始化
private void initialize() throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(
user, password, url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(queue);
consumer = session.createConsumer(destination);
}
public void consumeMessage() throws JMSException {
initialize();
connection.start();
//activemq采用推送机制来消耗消息
consumer.setMessageListener(this);
connection.setExceptionListener(this);
System.out.println("Consumer " + Thread.currentThread().getName() + " :=>local listening...");
isconnection = true;
//开始监听
Message message = consumer.receive();
System.out.println(message.getJMSMessageID());
}
//如果是异常的话
@Override
public void onException(JMSException e) {
isconnection = false;
}
// 消息处理函数
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
System.out.println("Consumer " + Thread.currentThread().getId() + " :=>Received: " + msg);
} else {
System.out.println("Consumer " + Thread.currentThread().getId() + " :=>Received: " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 关闭连接
public void close() throws JMSException {
System.out.println("Consumer" + Thread.currentThread().getName() + ":->Closing connection");
if (consumer != null)
consumer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
}
~~~
**测试**
~~~
package mq;
public class ConsumerTest implements Runnable{
static Thread t1 = null;
static Thread t2 = null;
public static void main(String[] args) {
//创建消费者
t1 = new Thread(new ConsumerTest());
t1.start();
t2 = new Thread(new ConsumerTest());
t2.start();
/*
* while (true) { System.out.println(t1.isAlive()); if (!t1.isAlive()) {
* t1 = new Thread(new ConsumerTest()); t1.start();
* System.out.println("重新启动"); } Thread.sleep(5000); } 延时500毫秒之后停止接受消息
* Thread.sleep(500); consumer.close();
*/
}
@Override
public void run() {
try{
ConsumerTool consumer = new ConsumerTool();
consumer.consumeMessage();
//如果断开连接,主线程就不走
while (ConsumerTool.isconnection) {
}
}catch (Exception e) {
}
}
}
~~~
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介