RabbitMQ 学习一 了解+点对点模式

RabbitMQ 学习一 了解+点对点模式

消息队列

1、了解

RabbitMQ:

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

消息中间件 用于消息的接收 基于AMPQ协议,具有高度的一致性,安全性,跨平台性等。

生产者,RabbitMQ,消费者:

生产者将消息放在消息中间件的交换机中,交换机对应着消息队列,消费者从消息队列中取出生产者所存放的消息。实现消息的分发。消费者与消息队列绑定。

点对点的模式中是将消息放在默认的交换机中。

2、安装

尽量不要安装在windows中,安装在虚拟机中,可以直接下载安装包,也可以使用docker拉取。

docker拉取:先查询是否有该镜像

docker search rabbitmq:management

有则可以拉取

docker pull rabbitmq:management

然后创建容器并发布

docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

命令中的【RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin】是web管理平台的用户名和密码

【 -p 15672:15672】 是控制平台docker映射到系统的对应端口

【 -p 5672:5672】 是应用程序的访问端口

http://自己的虚拟机ip:15672

3、使用说明:

首先要新建一个虚拟主机,然后给虚拟主机绑定用户,通过生产者发送消息时,要连接到server,连接到server中对应的某一个虚机主机,通过具体的用户名密码,紧接着才可以把消息发布给交换机或者消息队列中,进而消费者才能去消息队列中消费消息(消费者也需要连接到rabbitMQ中的server,以及它对应的虚拟主机),当生产者发送完消息后,它就可以进行别的工作了,它与消费者是完全解耦的,消费者只需要去消息队列中查是否有对应的消息即可,当需要再次发送消息时,是需要重新建立连接即可。

虚拟主机就类似于数据库中的库。

学习时,生产者可以使用Test环境进行测试,而消费不行,因为消费者需要一直监听队列中的信息,若使用Test,那么或许当消费岗刚消费了这个消息时,线程控制权已经失去了,那么它便没有可能去对body做处理。

所以要放在main函数中。

点对点模式“Hello World”

引入依赖

com.rabbitmq

amqp-client

5.7.3

需要一个消息提供者,一个消息消费者和RabbitMQ

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Consumer {

/**

* 点对点 只能有一个消费者 业务场景 用户注册进行短信验证时,短信验证部分可以交由消费这边的系统去完成

* @param args

* @throws IOException

* @throws TimeoutException

*/

//消费消息的代码

public static void main(String[] args) throws IOException, TimeoutException {

//获取虚拟主机的连接

ConnectionFactory connectionFactory = new ConnectionFactory();

connectionFactory.setHost("192.168.235.130");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/ems");

connectionFactory.setUsername("ems");

connectionFactory.setPassword("ems");

Connection connection = connectionFactory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare("hello",false,false,false,null);

channel.basicConsume("hello", true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:"+new String(body)); } } ); } }

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import org.junit.jupiter.api.Test;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Provider {

//生产消息的代码

@Test

public void testSendMessage() throws IOException, TimeoutException {

//创建连接工厂

ConnectionFactory connectionFactory = new ConnectionFactory();

//设置连接rabbitmq的主机

connectionFactory.setHost("192.168.235.130");

//设置端口号

connectionFactory.setPort(5672);

//设置连接哪个虚拟主机

connectionFactory.setVirtualHost("/ems");

//设置访问虚拟主机的用户名密码

connectionFactory.setUsername("ems");

connectionFactory.setPassword("ems");

//获取连接对象

Connection connection = connectionFactory.newConnection();

//通过连接获取连接中的通道对象

Channel channel = connection.createChannel();

//通道绑定对应的消息队列

//参数一 队列的名称,不存在会自动创建

//参数二 用来定义队列的特性 是否持久化 true持久化(不管是否重启Rabbitmq,队列都会存在,当RabbitMQ关闭时他会把队列存在磁盘中)

//参数三 是否独占队列 true独占 false可以被其他连接使用

//参数四 是否在消费完成后自动删除队列 true自动删除 false 不会自动删除

//参数五 附加参数

channel.queueDeclare("hello",false,false,false,null);

//发布消息

//参数一 交换机名称

//参数二 队列名称

//参数三 传递消息的额外设置

//参数四 传递的消息

channel.basicPublish("","hello",null,"hello rabbitmq".getBytes());

//关闭通道和连接

channel.close();

connection.close();

}

}

为了方便后续学习,可以将共同的代码抽取出来

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.util.Objects;

public class RabbitMQConnection {

//工厂是重量级的 每次都创建耗费资源 所以做一个静态的成员 只创建一次

private static ConnectionFactory connectionFactory;

//具体赋值是在类加载时执行,只执行一次

static{

connectionFactory = new ConnectionFactory();

//属性的赋值一旦确定下来很少变 所以也放在static中

connectionFactory.setHost("192.168.235.130");

connectionFactory.setPort(5672);

connectionFactory.setVirtualHost("/ems");

connectionFactory.setUsername("ems");

connectionFactory.setPassword("ems");

}

//定义提供连接对象的方法

public static Connection getConnection(){

try {

return connectionFactory.newConnection();

} catch (Exception e) {

e.printStackTrace();

}

return null;

}

//关闭通道和关闭连接的方法

public static void closeConnectionAndChanel(Channel channel,Connection connection){

try {

if(Objects.nonNull(channel))

{channel.close();}

if (Objects.nonNull(connection))

{connection.close();}

} catch (Exception e) {

e.printStackTrace();

}

}

}

相关推荐

常见音频接口的区别、对比和应用
mobile.365-588

常见音频接口的区别、对比和应用

08-04 👁️‍🗨️ 3807
袁姗姗最新电视剧
365bet信誉怎么样

袁姗姗最新电视剧

07-21 👁️‍🗨️ 5726
老话长谈 微商吸粉锁粉需要知道的事
365bet信誉怎么样

老话长谈 微商吸粉锁粉需要知道的事

06-29 👁️‍🗨️ 428
为什么奥特曼真的存在?
365bet信誉怎么样

为什么奥特曼真的存在?

07-14 👁️‍🗨️ 8043
祉的解释
mobile.365-588

祉的解释

07-13 👁️‍🗨️ 879
115个人如何离线下载 115个人离线下载教程
365bet信誉怎么样

115个人如何离线下载 115个人离线下载教程

07-23 👁️‍🗨️ 8561
小型快艇驾驶教程
365bet信誉怎么样

小型快艇驾驶教程

08-07 👁️‍🗨️ 6665
教你怎么注册微信公共账号
365bet体育滚球

教你怎么注册微信公共账号

08-08 👁️‍🗨️ 7636
为什么逢赌必输?看完这个后我终于顿悟了
365bet体育滚球

为什么逢赌必输?看完这个后我终于顿悟了

07-14 👁️‍🗨️ 4910