一文搞懂消息推送技术选型(消息推送原理以及实现过程)

createh51个月前 (01-31)技术教程19


  1. Ajax短轮询
  2. MQ
  3. redis 订阅/发布

Ajax短轮询

优点:

简单高效、浏览器使用循环不断地、间隔地发送请求获取数据

缺点:

频繁创建/断开连接,每次请求都会查询一遍数据不管有无都返回,对服务器业务处理的性能有很大的需求和压力;因为请求间有间隔时间,获取的数据是伪实时的,不适应对实时性要求很高的项目。

典型运用:

扫码登录

MQ

MQ的引入虽然 会造成技术的复杂度提升,但是合理的使用会极大的提高系统的 容错能力。

  • 优点:
  • 一般MQ都用于 系统解耦、流量削峰、数据分发
  • 缺点:

如果MQ服务挂了,导致消息发送和接收就无法使用了

复杂度提高。

MQ的对比

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

单机吞吐量

万级,吞吐量比RocketMQ和Kafka要低了一个数量级

万级,吞吐量比RocketMQ和Kafka要低了一个数量级

10万级,RocketMQ也是可以支撑高吞吐的一种MQ

10万级别,这是kafka最大的优点,就是吞吐量高。 一般配合大数据类的系统来进行实时数据计算、日志采集等场景

topic数量对吞吐量的影响



topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 这是RocketMQ的一大优势,在同等机器下,可以支撑大量的topic

topic从几十个到几百个的时候,吞吐量会大幅度下降 所以在同等机器下,kafka尽量保证topic数量不要过多。如果要支撑大规模topic,需要增加更多的机器资源

时效性

ms级

微秒级,这是rabbitmq的一大特点,延迟是最低的

ms级

延迟在ms级以内

可用性

高,基于主从架构实现高可用性

高,基于主从架构实现高可用性

非常高,分布式架构

非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用

消息可靠性

有较低的概率丢失数据

有较低的概率丢失数据

经过参数优化配置,可以做到0丢失

经过参数优化配置,消息可以做到0丢失

rabbitmq 基于Elang语言编写 虽然提供了天然的高并发能力,但是 不利于 深入了解与掌握。

MQ的引入 需要保证两点:可靠性、高可用、幂等性。

mq想如果需要保证可靠性、在某些 对于实时性要求较高的 业务中,那么需要对消息进行持久化、以及保证消息的不丢失。

可靠性

结合三点就是生产者丢失消息、mq自身丢失消息、消费者丢失消息

MQ 若要保障消息的不丢失,对于rabbitmq来讲,常用的有两种方式:

1、开启事务

// 开启事务
channel.txSelect
try {
// 这里发送消息
} catch (Exception e) {
channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit

但是 对于rabbitmq 来说 开启事务 造成性能上的 浪费是很大的

消息数量

开启事务

未开启事务

10w

320796ms

10246ms

开启事务 与不开启事务 对于 性能上的开销是 320倍。因为 其被 @Transaction注解 标注过,对于每条消息都会被事务拦截器拦截处理。

2、ACK机制

关闭自动ACK,使用手动ACK。RabbitMQ中有一个ACK机制,默认情况下消费者接收到到消息,RabbitMQ会自动提交ACK,之后这条消息就不会再发送给消费者了。我们可以更改为手动ACK模式,每次处理完消息之后,再手动ack一下。不过这样可能会出现刚处理完还没手动ack确认,消费者挂了,导致消息重复消费。

spring:
rabbitmq:
addresses: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送者开启 confirm 确认机制
publisher-confirm-type: correlated
# 发送者开启 return 确认机制
publisher-returns: true
listener:
simple:
concurrency: 10
max-concurrency: 10
prefetch: 1
auto-startup: true
default-requeue-rejected: true
# 设置消费端手动 ack
acknowledge-mode: manual
# 是否支持重试
retry:
enabled: true

@RabbitHandler
public void handlerMq(String msg, Channel channel, Message message) throws IOException {
try {
channel.basicAck(
message.getMessageProperties().getDeliveryTag(), false);


} catch (Exception e) {
if (
message.getMessageProperties().getRedelivered()) {

log.error("消息已重复处理失败,拒绝再次接收...", e);
channel.basicReject(
message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息

} else {
log.error("消息即将再次返回队列处理...", e);
channel.basicNack(
message.getMessageProperties().getDeliveryTag(), false, true);

}
}

}

rabbitq提供了两个回调方法

confirm 与return 回调

confirm 是用于生产者发送消息,保证交换机exchange能正常收到,但是无法保证 从exchange的消息 正常发送给队列去消费。

return回调是处理一些 不可正确路由的消息,如exchange 不存在,或者就是路由key 无法正确找到队列。

这两种机制 是可靠性的 重要保障,可以保证消息正常的在mq中传递。

@Component
@Slf4j
public class RabbitMQConfirmAndReturn implements
RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnCallback {

/**
* confirm机制只保证消息到达exchange,不保证消息可以路由到正确的queue,如果exchange错误,就会触发confirm机制
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("rabbitmq confirm fail,cause:{}", cause);
}
}
/**
* Return 消息机制用于处理一个不可路由的消息。在某些情况下,如果我们在发送消息的时候,当前的 exchange 不存在或者指定路由 key 路由不到,这个时候我们需要监听这种不可达的消息
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("mq消息不可达,message:{},replyCode:{},replyText:{},exchange:{},routing:{}", message.toString(), replyCode, replyText, exchange, routingKey);
String messageId =
message.getMessageProperties().getMessageId();

}
}

MQ的幂等性搭建:

ACK机制能保证消息一定能被消费但是无法保证消息被消息了几次,这就需要额外编码来保证幂等性,而rabbitmq没有提供额外的幂等操作需要额外代码保证。

MQ的高可用

rabbitmq的消息是储存在一个节点中,让mq的节点崩溃后 其存储的消息就会丢失,会造成服务的不可用,如果使用缓存使用一个持久化的queue,但是在message发送并写入磁盘之间会存在一个虽然短暂的时间差。

为了避免节点失效,将mq节点进行集群处理,当一个节点失效后 就有第二个节点接替前一个节点工作。单失效的那个节点上的消息无法被找回。

镜像队列的配置:

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:指n明镜像队列的模式,有效值为 all/exactly/nodes
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
priority:可选参数,policy的优先级

rabbitmqctl set_policy --priority 0 --apply-to queues mirror_queue "^queue_" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

可以通过下面命令判断那些slaves已经完成同步

rabbitmqctl list_queues name slave_pids synchronised_slave_pids

镜像队列的原理:

redis 订阅/通知

优缺点:

redis的 订阅通知 与rabbitmq相比,其优势体现在不想要搭建复杂笨重的 MQ ,简单轻量。但是由于redis没有类似于mq的消息持久化与ACK的保证,所以redis实现的发布/订阅功能并不可靠,仅适用于实时、且可靠性不高的场景(因为redis的订阅/发布目前是发送即忘的形式,如果客户端短线即会丢失)。如一些列消息的弹窗通知、有效期等等。

实现方式之一:

redis的键空间通知

配置:

  1. 首先找到redis.conf配置文件,打开文件,查找notify-keyspace-events,将前面的#去掉便可。注意:这里配置的是notify-keyspace-events的Ex参数,即说明,当键过时的时候会触发通知,若是只须要哈希命令键触发通知则能够设置为notify-keyspace-events Eh。
  2. 重启redis-server。
  3. 配置完成。

redis:
localhost: localhost
port: 6379
database: 7
password:
# 过期事件订阅,接收7号数据库中所有key的过期事件
listen-pattern: __keyevent@7__:expired

@Configuration
public class
RedisListenerConfiguration {

@Value("${
spring.redis.listen-pattern}")

public String pattern;
@Bean
public
RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection) {


RedisMessageListenerContainer container = new
RedisMessageListenerContainer();


container.setConnectionFactory(redisConnection);

/**
* Topic是消息发布(Pub)者和订阅(Sub)者之间的传输中介
*/
Topic topic = new PatternTopic(this.pattern);

container.addMessageListener(new RedisMessageListener(), topic);

return container;
}
}

监听:
public class RedisMessageListener implements MessageListener {
/**
* Redis 事件监听回调
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {

}
}

当向redis订阅一个 过期时间的时候,当key过期的时候 redis会发送一个通知高速服务器,key事件已过期,然后 服务器可以执行自己的相关逻辑,可以在key过期的时候 执行一系列操作

4】三种通信方式的优缺点


短轮询

长轮询

WebSocket

浏览器支持

几乎所有现代浏览器

几乎所有现代浏览器

IE 10+ Edge Firefox 4+ Chrome 4+ Safari 5+ Opera 11.5+

服务器负载

较少的CPU资源,较多的内存资源和带宽资源

与传统轮询相似,但是占用带宽较少

无需循环等待(长轮询),CPU和内存资源不以客户端数量衡量,而是以客户端事件数衡量。三种方式里性能最佳。

客户端负载

占用较多的内存资源与请求数。

与传统轮询相似。

同Server-Sent Event。

延迟

非实时,延迟取决于请求间隔。

同传统轮询。

实时。

实现复杂度

非常简单。

需要服务器配合,客户端实现非常简单。

需要Socket程序实现和额外端口,客户端实现简单。

技术方案的选型:

关于业务场景,如果并发量不大,请求频率不高的情况下 选用轮询难度实现上小很多,而且容错率更高。如果在频繁请求资源,一次请求无法返回所有数据的情况下 适合使用websocket。具体需要看业务场景决定。

MQ的典型应用:

哔哩哔哩的弹幕技术架构

  • Kafka(第三方服务)
    消息队列系统。Kafka 是一个分布式的基于发布/订阅的消息系统,它是支持水平扩展的。每条发布到 Kafka 集群的消息都会打上一个名为 Topic(逻辑上可以被认为是一个 queue)的类别,起到消息分布式分发的作用。
  • Router
    存储消息。Comet 将信息传送给 Logic 之后,Logic 会对所收到的信息进行存储,采用 register session 的方式在 Router 上进行存储。Router 里面会收录用户的注册信息,这样就可以知道用户是与哪个机器建立的连接。
  • Logic
    对消息进行逻辑处理。用户建立连接之后会将消息转发给 Logic ,在 Logic 上可以进行账号验证。当然,类似于 IP 过滤以及黑名单设置此类的操作也可以经由 Logic 进行。
  • Comet
    维护客户端长链接。在上面可以规定一些业务需求,比如可以规定用户传送的信息的内容、输送用户信息等。Comet 提供并维持服务端与客户端之间的链接,这里保证链接可用性的方法主要是发送链接协议(如 Socket 等)。
  • Client
    客户端。与 Comet 建立链接。
  • Jop
    消息分发。可以起多个 Jop 模块放到不同的机器上进行覆盖,将消息收录之后,分发到所有的 Comet 上,之后再由 Comet 转发出去。

MQ的实现延迟的方式

rabbitmq:

rabbitmq并没有提供 原生的 延迟队列的实现方式,如果要实现延迟的效果可以使用 死信队列的方式

“死信”是RabbitMQ中的一种消息机制,死信是当MQ出现以下情况的时候:

  1. 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度

如何配置死信队列

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "
dead.letter.demo.simple.business.exchange";

public static final String BUSINESS_QUEUEA_NAME = "
dead.letter.demo.simple.business.queuea";

public static final String BUSINESS_QUEUEB_NAME = "
dead.letter.demo.simple.business.queueb";

public static final String DEAD_LETTER_EXCHANGE = "
dead.letter.demo.simple.deadletter.exchange";

public static final String
DEAD_LETTER_QUEUEA_ROUTING_KEY = "
dead.letter.demo.simple.deadletter.queuea.routingkey";

public static final String
DEAD_LETTER_QUEUEB_ROUTING_KEY = "
dead.letter.demo.simple.deadletter.queueb.routingkey";

public static final String DEAD_LETTER_QUEUEA_NAME = "
dead.letter.demo.simple.deadletter.queuea";

public static final String DEAD_LETTER_QUEUEB_NAME = "
dead.letter.demo.simple.deadletter.queueb";

// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key",
DEAD_LETTER_QUEUEA_ROUTING_KEY);

return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(
DEAD_LETTER_QUEUEA_ROUTING_KEY);

}


最后总结:选用哪种推送技术需要根据具体的业务场景,一句话一切脱离业务的设计都是耍流氓,不能杀鸡用牛刀,也不能不考虑未来。

相关文章

Java开发数据分析人才需求大(java数据分析项目)

昨日,由广东省教育厅主办、广州大学华软软件学院承办的2018届高校毕业生系列供需见面活动(从化地区IT专场)于12月1日在华软学院举行。全省近300家企业参加,为各高校毕业生提供3360多个就业岗位,...

两成大数据岗位要求Java技能(大数据需要java什么水平)

受新基建、数字化转型及数字中国愿景目标等一系列政策促进,大数据产业高速增长,并与人工智能、云计算和区块链等领域深度融合,为各行各业带来革命性的变革。相关人才需求量正在持续增长,且呈现出供不应求的趋势。...

42 张图带你揭秘后端技术都要学啥?

作者 | L的存在来源 | 我是程序员小贱(ID:Lanj1995Q)说到后端开发,难免会遇到各种所谓高大上的「关键词 」,对于我们应届生小白,难免会觉得比较陌生,因为在学校确实比较少遇见这些所谓高大...

为什么JVM要用到压缩指针?Java对象要求8字节的整数倍?

作者:苏易困原文链接:https://juejin.cn/post/7076013091040788494前言前两天在一个帖子中看到一道面试题:堆内存超过32G时,为什么压缩指针失效?之前没有了解过这...

Java的前景怎么样,适合哪类人群学习

作为互联网小白,选择行业时一定要慎重。如果你想选择编程类的技术工作,可以先从Java了解~那么,Java语言在IT行业内的发展前景如何?本篇文章会详解讲解。Java前景怎么样Java有非常广泛的应用市...