消息队列
为什么使用消息队列?
解耦、异步、削峰
消息队列有什么优点和缺点?
优点:解耦、异步、削峰
缺点:系统的可用性降低、系统的复杂性提高了、一致性问题。
RabbitMQ上的一个queue中存放的message是否有数量限制?限制是多少
默认情况下一般是无限制,因为限制取决于机器的内存,但是消息过多会导致处理效率的下降。
可以通过参数来限制, x-max-length :对队列中消息的条数进行限制 , x-max-length-bytes :对队列中消息的总量进行限制
AMQP 是什么?
RabbitMQ 就是 AMQP 协议的 Erlang
的实现(当然 RabbitMQ 还支持 STOMP2
、 MQTT3
等协议 ) AMQP 的模型架构 和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定 。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1。
AMQP 协议的三层:
- Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
- Session Layer:中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
- TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。
AMQP 模型的三大组件:
- 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
- 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
- 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
说说生产者 Producer 和消费者 Consumer?
生产者 :
- 消息生产者,就是投递消息的一方。
- 消息一般包含两个部分:消息体(
payload
)和标签(Label
)。
消费者:
- 消费消息,也就是接收消息的一方。
- 消费者连接到 RabbitMQ 服务器,并订阅到队列上。消费消息时只消费消息体,丢弃标签。
说说 Broker 服务节点、Queue 队列、Exchange 交换器?
- Broker:可以看做 RabbitMQ 的服务节点。一般情况下一个 Broker 可以看做一个 RabbitMQ 服务器。
- Queue:RabbitMQ 的内部对象,用于存储消息。多个消费者可以订阅同一队列,这时队列中的消息会被平摊(轮询)给多个消费者进行处理。
- Exchange:生产者将消息发送到交换器,由交换器将消息路由到一个或者多个队列中。当路由不到时,或返回给生产者或直接丢弃。
什么是死信队列?如何导致的?
DLX,全称为 Dead-Letter-Exchange
,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message
) 之后,它能被重新发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。
导致的死信的几种原因:
- 消息被拒(
Basic.Reject /Basic.Nack
) 且requeue = false
。 - 消息 TTL 过期。
- 队列满了,无法再添加。
什么是延迟队列?RabbitMQ 怎么实现延迟队列?
延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
- 通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。
- 在 RabbitMQ 3.5.7 及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时,插件依赖 Erlang/OPT 18.0 及以上。
也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。
什么是优先级队列?
RabbitMQ 自 V3.5.0 有优先级队列实现,优先级高的队列会先被消费。
可以通过x-max-priority
参数来实现优先级队列。不过,当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。
RabbitMQ 有哪些工作模式?
- 简单模式
- work 工作模式
- pub/sub 发布订阅模式
- Routing 路由模式
- Topic 主题模式
RabbitMQ 消息怎么传输?
由于 TCP 链接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈,所以 RabbitMQ 使用信道的方式来传输数据。信道(Channel)是生产者、消费者与 RabbitMQ 通信的渠道,信道是建立在 TCP 链接上的虚拟链接,且每条 TCP 链接上的信道数量没有限制。就是说 RabbitMQ 在一条 TCP 链接上建立成百上千个信道来达到多个线程处理,这个 TCP 被多个线程共享,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性,每个信道对应一个线程使用。
如何保证消息的可靠性?
消息到 MQ 的过程中搞丢,MQ 自己搞丢,MQ 到消费过程中搞丢。
- 生产者到 RabbitMQ:事务机制和 Confirm 机制,注意:事务机制和 Confirm 机制是互斥的,两者不能共存,会导致 RabbitMQ 报错。
- RabbitMQ 自身:持久化、集群、普通模式、镜像模式。
- RabbitMQ 到消费者:basicAck 机制、死信队列、消息补偿机制。
如何保证 RabbitMQ 消息的顺序性?
- 拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;
- 或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
如何保证 RabbitMQ 高可用的?
- 拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue (消息队列)而已,确实是麻烦点;
- 或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内部用内存队列做排队,然后分发给底层不同的 worker 来处理。
如何保证 RabbitMQ 高可用的?
RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
如何解决重复消费?
所有的MQ 是无法保证消息不被重复消费的,只能业务系统层面考虑。利用redis进行业务方面的去重。
Rocketmq如何保证高可用性?
1、架构层面
避免用单节点或者简单的一主一从架构,可以采取多主多从的架构,并且主从之间采用同步复制的方式进行数据双写。
2、刷盘策略
RocketMQ默认的异步刷盘,可以改成同步刷盘SYNC_FLUSH。
3、生产消息的高可用
当消息发送失败了,在消息重试的时候,会尽量规避上一次发送的 Broker,选择还没推送过该消息的Broker,以增大消息发送的成功率。
4、消费消息的高可用
消费者获取到消息之后,可以等到整个业务处理完成,再进行CONSUME_SUCCESS状态确认,如果业务处理过程中发生了异常那么就会触发broker的重试机制。
RocketMq的存储机制了解吗?
消息生产者发送消息到broker,都是会按照顺序存储在CommitLog文件中,每个commitLog文件的大小为1G。
CommitLog-存储所有的消息元数据,包括Topic、QueueId以及message。文件
CosumerQueue-消费逻辑队列:存储消息在CommitLog的offset。文件
IndexFile-索引文件:存储消息的key和时间戳等信息,使得RocketMq可以采用key和时间区间来查询消息 。
也就是说,rocketMq将消息均存储在CommitLog中,并分别提供了CosumerQueue和IndexFile两个索引,来快速检索消息。
RocketMq性能比较高的原因?
1.顺序写
顺序写比随机写的性能会高很多,不会有大量寻址的过程
2.异步刷盘
相比较于同步刷盘,异步刷盘的性能会高很多
3.零拷贝
使用mmap的方式进行零拷贝,提高了数据传输的效率
有几百万消息持续积压几小时,说说怎么解决?
1.一般情况下,是消费者的问题,赶紧修复消费者。
2.如果你将消费者修复好了之后,但是mq里面积压的消息量比较大,消费完了可能需要很长的世间,在业务上不能接受,这个时候,
我们可以将原来的消费者停掉。然后征用10倍的机器来部署producer,将原来生产者里面积压的消息快速消费掉。
然后将原来的生产者也停掉。
再征用10倍的机器部署consumer,然后再快速消费倒腾过来的积压数据。
等快速消费完积压的数据之后,将征用过来的机器都释放掉,最后再恢复原来的生产者和消费者就可以了。
Rocketmq中Broker的部署方式
1.单Master 部署;
2.多Master部署
3.多Master多Slave部署
Rocketmq中Broker的刷盘策略有哪些?
同步刷盘
异步刷盘
什么是路由注册?RocketMQ如何进行路由注册与发现?
RocketMQ的路由注册是通过broker向NameServer发送心跳包实现的,首先borker每隔30s向nameserver发送心跳语句,nameserver处理。
RocketMQ的路由发现不是实时的,NameServer不会主动向客户端推送,而是客户端定时拉取主题最新的路由,然后更新。
step1:调用RouterInfoManager的方法,从路由表topicQueueTable、brokerAddrTable、filterServerTable分别填充信息;
step2:如果主题对应的消息为顺序消息,则从NameServerKVconfig中获取关于顺序消息相关的配置填充路由信息;
什么是路由剔除?RocketMQ如何进行路由剔除?
路由删除有两个触发节点:
1)NameServer定时扫描brokerLiveTable检测上次心跳包与当前系统时间的时间差,如果大于120S,就需要删除;
2)Broker在正常关闭使,会执行unregisterBroker命令。
两种方法删除的逻辑都是一致的。
step1:申请写锁
step2:从brokerLiveTable、filterServerTable移除,从brokerAddrTable、clusterAddrTable、topicQueueTable移除
step3:释放锁
使用RocketMQ过程中遇到过什么问题?
1、消息挤压问题
2、消息丢失问题
3、消息重复消费问题
4、RocketMQ内存不够OOM问题
RocketMQ的总体架构,以及每个组件的功能?
讲一讲RocketMQ中的分布式事务及实现
总结:使用的是半消息+事务回查机制来做的。
讲一讲RocketMQ中事务回查机制的实现
TODO: 太难了,先放过。
你们生产环境的rocketmq集群是怎么搭建的
就说是三主三从的架构。
面试官可能会问你为什么从从节点消费数据呀,因为主节点不是每次来一条数据就同步给从节点,而是够了一个批次之后,一起提交给从节点,这种效率比较高效。
nameserver节点之间是不会相互进行通信的,每个nameserver都维护了全量的borker信息。
如果borker-a主节点挂了之后,borker-slave-a节点是不会被提升为主节点的,它一辈子就是从节点。如果主从同步的方式你指定的是同步复制,那么基本省不会丢失数据,但是如果你指定的是异步复制的方式,那么会丢数据的。
rocketmq中有重试机制,他会看borker-a发送不成功了,会将消息发送到borker-b中。
nameserver1对应的ip是192.168.31.103
nameserver2对应的ip是192.168.31.104
rocketmq的架构是怎么样的,组件有哪些?
RocketMQ 和 kafka 之间有什么区别
适用场景:
rocketmq适合做业务的处理。
kafka适合做日志的处理。
性能:
rocketmq:tps 10w
kafka:tps 100w
结论:kafka性能更高
可靠性:
rocketmq:支持同步刷盘、异步刷盘、同步复制,异步复制
kafka:支持异步刷盘,异步复制
结论:rocketmq的可靠性较好
顺序性:
RocketMQ:支持严格的顺序消息,在顺序消费的场景下,一台broker宕机后,发送消息失败,但不会轮序·
Kakfa:kafka在某些配置下,支持顺序消息,但在一台broker宕机后,消息会乱序 结论:RocketMQ的顺序性较好
消费失败重试
rocketmq:支持失败重试,支持重试间隔时间顺延。
kafka:不支持
结论:rocketmq胜出
延时消息/定时消息
rocketmq:支持
kafka:不支持
结论:rocketmq胜出
分布式事务:
rocketmq:支持
kafka:不支持
结论:rocketmq胜出
消息回溯 RocketMQ:支持某个时间戳(毫秒级)来回溯消息 Kakfa:支持某个偏移量offset来回溯消息 结论:各有千秋,不分伯仲
消息查询机制 RocketMQ:支持messageid和消息内容查询消息 Kakfa:不支持 结论:RocketMQ胜出
rocketmq几个核心知识点
rocketmq发送消息支持同步发送和异步发送。
rocketmq中是不会自动进行主从切换的。说白了就是你只要是从节点,你这辈子也就是从节点,不可能提升为主节点。
rocketmq消息发送失败处理:
最多重试两次,这是rocketmq内部实现的
rocketmqmq主节点和从节点的数据同步方式可以采用同步复制或者是异步复制。
rocketmq是通过注册消息监听器的方式来消费消息的,消息监听器大概有这么几种
从这张图中我们能够看到有支持并发消费消息的监听器,有顺序消费消息的监听器。
rocketmq的顺序消息
顺序消息指生产者局部有序发送到一个queue,但多个queue之间是全局无序的。
生产者在发送消息的时候我们可以指定MessageQueueSelector的接口,然后在这个接口里面实现我们自定义的将消息发送到哪个messagequeue里面。
public SendResult send(Message msg, MessageQueueSelector selector, Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, selector, arg);
}
然后我们在消费消息的时候,一般使用的是注册一个监听器的方式进行消息的消费,这个时候我们使用MessageListenerOrderly这个顺序消费消息的监听器。
rocketmq怎样保证消息不丢失
消息发送到生产者是采用同步发送还是再用异步发送,如果采用的是同步发送的方式,这个时候会给我们返回一个结果,我们需要判断这个结果是否成功,如果失败就重新发送。如果采用的是异步发送,这个时候会在callback接口中提供两个方法,一个是onSuccess方法,会传给我们处理结果,这个时候我们可以根据这个结果进行判断,是否需要重新发送。还有一个方法是onException方法,也可以在这个方法里面进行消息的重新发送。说白了就是不管是同步发送还是异步发送,我们都可以根据返回的结果做相应的补偿机制。
mq采用同步刷盘还是异步刷盘,异步刷盘是消息写到缓存中就立刻返回了,为了保证消息不丢失,这个时候我们可以使用同步刷盘的方式。
rocketmq如果是集群模式部署的话,这个时候主从复制我们可以采用同步复制,不要采用异步复制。
消费者在拿到消息之后,处理完成消息,会自动提交ack应答,比方说我们在处理消息的时候出现了异常,这个时候还想重复消费这条消息,我们就需要关闭自动应答,改为手动应答的方式。
还有就是消息在发送之前,可能因为网络抖动的原因,压根就没有到达mq,这个时候我们可以在消息发送之前设置一张消息日志表,将消息先存到这张表里面(采用顺序写的这种方式),然后再进行发送。
Kafka的特性
1.消息持久化
2.高吞吐量
3.扩展性
4.多客户端支持
5.Kafka Streams
注意:当你不会说的时候,就围绕着kafka你知道的kafka的功能来说,比方说消息的持久化机制。
说一说Kafka你熟悉的参数?
创建生产者对象时有三个属性必须指定
bootstrap.servers:说白了就是指定都有哪些节点
key.serializer:键的序列化器
value:serializer: 值的序列化器
acks:指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。
acks=0:⽣产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。
acks=1: 表示消息只需要写到主分区即可,然后就相应客户端,而不等待副本分区的确认。
acks=all: 表示消息需要写到主分区,并且会等待所有的ISR副本分区确认记录。
retries:重试次数,当消息返送出现错误的时候,系统会重发消息。根客户端收到错误时,重发一样。
compression.type:指定消息的压缩类型。
max.request.size:控制生产者发送请求的最大大小。
request.timeout.ms:客户端等待请求响应的最大值。说白了就是一个超时时间。
batch.size:当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。
kafka中,可以不用zookeeper么?
在新版本的kafka中可以不使用zookeeper。但是kafka本身是自带zookeeper的。但是为了安全考虑,我们通常会外接我们自己的zookeeper。现在的新版本可以使用Kafka with Kraft,就可以完全抛弃zookeeper。
为什么Kafka不支持读写分离?
1、数据一致性问题:数据从主节点转到从节点,必然会有一个延时的时间窗口,这个时间窗口会导致主从节点之间的数据不一致。
2、延时问题:Kafka追求高性能,如果走主从复制,延时严重
Kafka中是怎么做到消息顺序性的
一个 topic,一个 partition,一个 consumer,内部单线程消费,最傻瓜式的一种做法。
构建ProducerRecord消息的时候,我们可以通过构造方法指定要发送到哪个分区。
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
}
Kafka为什么那么快?
- 利用 Partition 实现并行处理
- 顺序写磁盘
- 充分利用 Page Cache
- 零拷贝技术
- 批处理
- 数据压缩
Kafka文件高效存储设计原理?
- Kafka把Topic中一个Partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完成的文件,减少磁盘占用
- 通过索引信息可以快速定位Message和确定response的最大大小
- 通过将索引元数据全部映射到 memory,可以避免 Segment 文件的磁盘I/O操作
- 通过索引文件稀疏存储,可以大幅降低索引文件元数据占用空间大小
主题是一个逻辑上的概念,还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看做一个可追加的日志文件
,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset 是消息在分区中的唯一标识,kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,kafka保证的是分区有序而不是主题有序。
在分区中又引入了多副本(replica)的概念,通过增加副本数量可以提高容灾能力。同一分区的不同副本中保存的是相同的消息。副本之间是一主多从的关系,其中主副本负责读写,从副本只负责消息同步。副本处于不同的 broker 中,当主副本出现异常,便会在从副本中提升一个为主副本。
Kafka 中分区的原则
- 指明Partition的情况下,直接将指明的值作为Partition值
- 没有指明Partition值但有 key 的情况下,将 key 的 Hash 值与 topic 的Partition值进行取余得到Partition值
- 既没有Partition值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与Topic可用的Partition总数取余得到Parittion值,也就是常说的 round-robin 算法
Kafka 为什么要把消息分区
- 方便在集群中扩展,每个 Partition 可用通过调整以适应它所在的机器,而一个Topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了
- 可以提高并发,因为可以以Partition为单位进行读写
Kafka 中生产者运行流程
- 一条消息发过来首先会被封装成一个 ProducerRecord 对象
- 对该对象进行序列化处理(可以使用默认,也可以自定义序列化)
- 对消息进行分区处理,分区的时候需要获取集群的元数据,决定这个消息会被发送到哪个主题的哪个分区
- 分好区的消息不会直接发送到服务端,而是放入生产者的缓存区,多条消息会被封装成一个批次(Batch),默认一个批次的大小是 16KB
- Sender 线程启动以后会从缓存里面去获取可以发送的批次
- Sender 线程把一个一个批次发送到服务端
Kafka 中的消息封装
在Kafka 中 Producer 可以 Batch的方式推送数据达到提高效率的作用。Kafka Producer 可以将消息在内存中累积到一定数量后作为一个 Batch 发送请求。Batch 的数量大小可以通过 Producer 的参数进行控制,可以从三个维度进行控制
- 累计的消息的数量(如500条)
- 累计的时间间隔(如100ms)
- 累计的数据大小(如64KB)
通过增加 Batch 的大小,可以减少网络请求和磁盘I/O的频次,具体参数配置需要在效率和时效性做一个权衡。
Kafka 消息的消费模式
Kafka采用大部分消息系统遵循的传统模式:Producer将消息推送到Broker,Consumer从Broker获取消息。
如果采用 Push 模式,则Consumer难以处理不同速率的上游推送消息。
采用 Pull 模式的好处是Consumer可以自主决定是否批量的从Broker拉取数据。Pull模式有个缺点是,如果Broker没有可供消费的消息,将导致Consumer不断在循环中轮询,直到新消息到达。为了避免这点,Kafka有个参数可以让Consumer阻塞直到新消息到达。
Kafka 如何实现负载均衡与故障转移
负载均衡是指让系统的负载根据一定的规则均衡地分配在所有参与工作的服务器上,从而最大限度保证系统整体运行效率与稳定性
负载均衡
Kakfa 的负载均衡就是每个 Broker 都有均等的机会为 Kafka 的客户端(生产者与消费者)提供服务,可以负载分散到所有集群中的机器上。Kafka 通过智能化的分区领导者选举来实现负载均衡,提供智能化的 Leader 选举算法,可在集群的所有机器上均匀分散各个Partition的Leader,从而整体上实现负载均衡。
故障转移
Kafka 的故障转移是通过使用会话机制实现的,每台 Kafka 服务器启动后会以会话的形式把自己注册到 Zookeeper 服务器上。一旦服务器运转出现问题,就会导致与Zookeeper 的会话不能维持从而超时断连,此时Kafka集群会选举出另一台服务器来完全替代这台服务器继续提供服务
Kafka 中 AR、ISR、OSR 三者的概念
AR
:分区中所有副本称为 ARISR
:所有与主副本保持一定程度同步的副本(包括主副本)称为 ISROSR
:与主副本滞后过多的副本组成 OSR
分区副本什么情况下会从 ISR 中剔出
Leader 会维护一个与自己基本保持同步的Replica列表,该列表称为ISR,每个Partition都会有一个ISR,而且是由Leader动态维护。所谓动态维护,就是说如果一个Follower比一个Leader落后太多,或者超过一定时间未发起数据复制请求,则Leader将其从ISR中移除。当ISR中所有Replica都向Leader发送ACK(Acknowledgement确认)时,Leader才commit。
分区副本中的 Leader 如果宕机但 ISR 却为空该如何处理
可以通过配置unclean.leader.election
:
- true:允许 OSR 成为 Leader,但是 OSR 的消息较为滞后,可能会出现消息不一致的问题
- false:会一直等待旧 leader 恢复正常,降低了可用性
如何判断一个 Broker 是否还有效
- Broker必须可以维护和ZooKeeper的连接,Zookeeper通过心跳机制检查每个结点的连接。
- 如果Broker是个Follower,它必须能及时同步Leader的写操作,延时不能太久。
Kafka 可接收的消息最大默认多少字节,如何修改
Kafka可以接收的最大消息默认为1000000字节,如果想调整它的大小,可在Broker中修改配置参数:Message.max.bytes
的值
但要注意的是,修改这个值,还要同时注意其他对应的参数值是正确的,否则就可能引发一些系统异常。首先这个值要比消费端的fetch.Message.max.bytes(默认值1MB,表示消费者能读取的最大消息的字节数)参数值要小才是正确的设置,否则Broker就会因为消费端无法使用这个消息而挂起。
Kafka 的 ACK 机制
Kafka的Producer有三种ack机制,参数值有0、1 和 -1
- 0: 相当于异步操作,Producer 不需要Leader给予回复,发送完就认为成功,继续发送下一条(批)Message。此机制具有最低延迟,但是持久性可靠性也最差,当服务器发生故障时,很可能发生数据丢失。
- 1: Kafka 默认的设置。表示 Producer 要 Leader 确认已成功接收数据才发送下一条(批)Message。不过 Leader 宕机,Follower 尚未复制的情况下,数据就会丢失。此机制提供了较好的持久性和较低的延迟性。
- -1: Leader 接收到消息之后,还必须要求ISR列表里跟Leader保持同步的那些Follower都确认消息已同步,Producer 才发送下一条(批)Message。此机制持久性可靠性最好,但延时性最差。
Kafka 的 consumer 如何消费数据
在Kafka中,Producers将消息推送给Broker端,在Consumer和Broker建立连接之后,会主动去 Pull(或者说Fetch)消息。这种模式有些优点,首先Consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以控制每次消费的数,实现批量消费。
Kafka 的Topic中 Partition 数据是怎么存储到磁盘的
Topic 中的多个 Partition 以文件夹的形式保存到 Broker,每个分区序号从0递增,且消息有序。Partition 文件下有多个Segment(xxx.index,xxx.log),Segment文件里的大小和配置文件大小一致。默认为1GB,但可以根据实际需要修改。如果大小大于1GB时,会滚动一个新的Segment并且以上一个Segment最后一条消息的偏移量命名。
Kafka 创建Topic后如何将分区放置到不同的 Broker 中
Kafka创建Topic将分区放置到不同的Broker时遵循以下规则:
- 副本因子不能大于Broker的个数。
- 第一个分区(编号为0)的第一个副本放置位置是随机从Broker List中选择的。
- 其他分区的第一个副本放置位置相对于第0个分区依次往后移。也就是如果有3个Broker,3个分区,假设第一个分区放在第二个Broker上,那么第二个分区将会放在第三个Broker上;第三个分区将会放在第一个Broker上,更多Broker与更多分区依此类推。剩余的副本相对于第一个副本放置位置其实是由
nextReplicaShift
决定的,而这个数也是随机产生的。
Kafka 的日志保留期与数据清理策略
概念
保留期内保留了Kafka群集中的所有已发布消息,超过保期的数据将被按清理策略进行清理。默认保留时间是7天,如果想修改时间,在server.properties
里更改参数log.retention.hours/minutes/ms
的值便可。
清理策略
- 删除:
log.cleanup.policy=delete
表示启用删除策略,这也是默认策略。一开始只是标记为delete,文件无法被索引。只有过了log.Segment.delete.delay.ms
这个参数设置的时间,才会真正被删除。 - 压缩:
log.cleanup.policy=compact
表示启用压缩策略,将数据压缩,只保留每个Key最后一个版本的数据。首先在Broker的配置中设置log.cleaner.enable=true
启用 cleaner,这个默认是关闭的。
Kafka 日志存储的Message是什么格式
Kafka一个Message由固定长度的header和一个变长的消息体body组成。将Message存储在日志时采用不同于Producer发送的消息格式。每个日志文件都是一个log entries(日志项)序列:
- 每一个log entry包含一个四字节整型数(Message长度,值为1+4+N)。
- 1个字节的magic,magic表示本次发布Kafka服务程序协议版本号。
- 4个字节的CRC32值,CRC32用于校验Message。
- 最终是N个字节的消息数据。每条消息都有一个当前Partition下唯一的64位offset。
Kafka没有限定单个消息的大小,但一般推荐消息大小不要超过1MB,通常一般消息大小都在1~10KB之间。
Kafka 是否支持多租户隔离
多租户技术(multi-tenancy technology)是一种软件架构技术,它是实现如何在多用户的环境下共用相同的系统或程序组件,并且仍可确保各用户间数据的隔离性。
解决方案
通过配置哪个主题可以生产或消费数据来启用多租户,也有对配额的操作支持。管理员可以对请求定义和强制配额,以控制客户端使用的Broker资源。
Kafka 的日志分段策略与刷新策略
日志分段(Segment)策略
log.roll.hours/ms
:日志滚动的周期时间,到达指定周期时间时,强制生成一个新的Segment,默认值168h(7day)。log.Segment.bytes
:每个Segment的最大容量。到达指定容量时,将强制生成一个新的Segment。默认值1GB(-1代表不限制)。log.retention.check.interval.ms
:日志片段文件检查的周期时间。默认值60000ms。
日志刷新策略
Kafka的日志实际上是开始是在缓存中的,然后根据实际参数配置的策略定期一批一批写入到日志文件中,以提高吞吐量。
log.flush.interval.Messages
:消息达到多少条时将数据写入到日志文件。默认值为10000。log.flush.interval.ms
:当达到该时间时,强制执行一次flush。默认值为null。log.flush.scheduler.interval.ms
:周期性检查,是否需要将信息flush。默认为很大的值。
Kafka 中如何进行主从同步
Kafka动态维护了一个同步状态的副本的集合(a set of In-SyncReplicas),简称ISR,在这个集合中的结点都是和Leader保持高度一致的,任何一条消息只有被这个集合中的每个结点读取并追加到日志中,才会向外部通知“这个消息已经被提交”。
kafka 通过配置 producer.type
来确定是异步还是同步,默认是同步
同步复制
Producer 会先通过Zookeeper识别到Leader,然后向 Leader 发送消息,Leader 收到消息后写入到本地 log文件。这个时候Follower 再向 Leader Pull 消息,Pull 回来的消息会写入的本地 log 中,写入完成后会向 Leader 发送 Ack 回执,等到 Leader 收到所有 Follower 的回执之后,才会向 Producer 回传 Ack。
异步复制
Kafka 中 Producer 异步发送消息是基于同步发送消息的接口来实现的,异步发送消息的实现很简单,客户端消息发送过来以后,会先放入一个 BlackingQueue
队列中然后就返回了。Producer 再开启一个线程 ProducerSendTread
不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给 Broker。
Producer的这种在内存缓存消息,当累计达到阀值时批量发送请求,小数据I/O太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率。但是如果在达到阀值前,Producer不可用了,缓存的数据将会丢失。
Kafka 中什么情况下会出现消息丢失/不一致的问题
消息发送时
消息发送有两种方式:同步 - sync
和 异步 - async
。默认是同步的方式,可以通过 producer.type 属性进行配置,kafka 也可以通过配置 acks 属性来确认消息的生产
0
:表示不进行消息接收是否成功的确认1
:表示当 leader 接收成功时的确认-1
:表示 leader 和 follower 都接收成功的确认
当 acks = 0 时,不和 Kafka 进行消息接收确认,可能会因为网络异常,缓冲区满的问题,导致消息丢失
当 acks = 1 时,只有 leader 同步成功而 follower 尚未完成同步,如果 leader 挂了,就会造成数据丢失
消息消费时
Kafka 有两个消息消费的 consumer 接口,分别是 low-level
和 hign-level
low-level
:消费者自己维护 offset 等值,可以实现对 kafka 的完全控制high-level
:封装了对 partition 和 offset,使用简单
如果使用高级接口,可能存在一个消费者提取了一个消息后便提交了 offset,那么还没来得及消费就已经挂了,下次消费时的数据就是 offset + 1 的位置,那么原先 offset 的数据就丢失了。
Kafka 作为流处理平台的特点
流处理就是连续、实时、并发和以逐条记录的方式处理数据的意思。Kafka 是一个分布式流处理平台,它的高吞吐量、低延时、高可靠性、容错性、高可扩展性都使得Kafka非常适合作为流式平台。
- 它是一个简单的、轻量级的Java类库,能够被集成到任何Java应用中
- 除了Kafka之外没有任何其他的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性
- 支持本地状态容错,可以执行非常快速有效的有状态操作
- 支持 eexactly-once 语义
- 支持一次处理一条记录,实现 ms 级的延迟
消费者故障,出现活锁问题如何解决
活锁的概念:消费者持续的维持心跳,但没有进行消息处理。
为了预防消费者在这种情况一直持有分区,通常会利用 max.poll.interval.ms
活跃检测机制,如果调用 Poll 的频率大于最大间隔,那么消费者将会主动离开消费组,以便其他消费者接管该分区
Kafa 中如何保证顺序消费
Kafka 的消费单元是 Partition,同一个 Partition 使用 offset 作为唯一标识保证顺序性,但这只是保证了在 Partition 内部的顺序性而不是 Topic 中的顺序,因此我们需要将所有消息发往统一 Partition 才能保证消息顺序消费,那么可以在发送的时候指定 MessageKey,同一个 key 的消息会发到同一个 Partition 中。
kafka怎样做消息广播?
这里需要问面试官一下,你是想要实现消息的广播发送还是消息的广播消费。
消息的广播发送:
Kafka 本身不提供直接的“广播”机制,但可以通过创建多个主题(topics)并将消息发送到每个主题来实现广播效果。每个订阅者只能看到他所订阅的主题中的消息。
消息的广播消费:
总所周知,我们每一个消费组中只有一个消费者能消费到某个主题下的那一条消息。这个时候我们可以通过在每个消费者分组上加上UUID,这样就能保证每个项目启动的消费者的分组是不同的。这样就能够达到广播消费的目的了。
kafka中的消息粒度,partition下面还有一个消息存储的粒度,叫什么?
**Kafka中的partition下面存储的是LogSegment**。每个partition在物理上被分割成多个大小相等的LogSegment,每个LogSegment由一个数据文件(.log)和一个索引文件(.index)组成。数据文件存储实际的消息数据,而索引文件存储消息的索引信息,包括消息的物理偏移地址等元数据。
如果我配置了7天,那么我消费的时候如果指定的是earlist的话,那么有没有可能消费到7天之外的数据。
有可能 https://blog.csdn.net/weixin_42305433/article/details/109731388
kafka消息删除的粒度,是消息级别的删除吗?
Kafka的删除策略以segment为单位,而非单条消息。
kafka的消息是以主题为单位进行归类的,各个主题之间彼此是独立的,互不影响。
每个主题又可以分为一个或多个分区。
每个分区各自存在一个记录消息数据的日志文件。
分区日志文件中包含很多的LogSegment(也就是说日志分段),默认情况下一个LogSegment是1G。
kafka一共有两种消息删除策略,一种是消息删除,一种是消息压缩。
消息删除:在Kafka中,消息一旦被写入到分区中,就不可以被直接删除。这是因为Kafka的设计目标是实现高性能的消息持久化存储,而不是作为一个传统的队列,所以不支持直接删除消息。
然而,Kafka提供了消息的过期策略来间接删除消息。具体来说,可以通过设置消息的过期时间(TTL)来控制消息的生命周期。一旦消息的时间戳超过了设定的过期时间,Kafka会将其标记为过期,并在后续的清理过程中删除这些过期的消息。
Kafka的清理过程由消费者组中的消费者来执行。消费者消费主题中的消息,并将消费的进度提交到Kafka。一旦消息被提交,Kafka就可以安全地删除这些消息。
另一方面,如果需要从Kafka中完全删除消息,可以通过设置合适的保留策略来实现。Kafka支持两种保留策略:基于时间和基于大小。基于时间的保留策略会根据消息的时间戳来删除旧的消息,而基于大小的保留策略会根据分区的大小来删除旧的消息。可以根据业务需求选择适合的保留策略。
需要注意的是,删除消息并不会立即释放磁盘空间。删除的消息只是被标记为删除,并在后续的清理过程中才会真正释放磁盘空间。因此,即使消息被删除,磁盘空间也不会立即释放,而是会在清理过程中逐渐释放。
消息压缩会将所有key相同的消息进行合并。这个一般使用在大数据领域。
kafka为什么这么快?
kafka对于大数据是支持的,比如说Hadoop
主要有以下这么四点:
磁盘顺序
读写页缓存:直接使用操作系统的页缓存特性提高处理速度,进而避免了JVM GC带来的性能损耗。
零拷贝
批量操作:kafka也支持消息压缩和批量发送数据
kafka文件删除有两种方式,一种是基于时间,一种是基于分区文件的大小
kafka几个核心点(面试有被问到)
acks:
该选项控制着已发送消息的持久性。
acks=0 :⽣产者不等待broker的任何消息确认。只要将消息放到了socket的缓冲区,就认为消息 已发送。不能保证服务器是否收到该消息, retries 设置也不起作⽤,因为客户端不关⼼消息是 否发送失败。客户端收到的消息偏移量永远是-1。
acks=1 :leader将记录写到它本地⽇志,就响应客户端确认消息,⽽不等待follower副本的确 认。如果leader确认了消息就宕机,则可能会丢失消息,因为follower副本可能还没来得及同步该 消息。
acks=all :leader等待所有同步的副本确认该消息。保证了只要有⼀个同步副本存在,消息就不 会丢失。这是最强的可⽤性保证。等价于 acks=-1 。默认值为1,字符串。可选值:[all, -1, 0, 1]
retries
设置该属性为⼀个⼤于1的值,将在消息发送失败的时候重新发送消息。该重试与客户端收到异常 重新发送并⽆⼆⾄。允许重试但是不设置 max.in.flight.requests.per.connection 为1,存 在消息乱序的可能,因为如果两个批次发送到同⼀个分区,第⼀个失败了重试,第⼆个成功了,则 第⼀个消息批在第⼆个消息批后。int类型的值,默认:0,可选值:[0,...,2147483647]
kafka消费消息的时候,我们可以使用同步提交offset也可以异步提交offset。
kafka怎样保证消息不丢失(面试有被问到)
在生产者端,我们可以设置acks的值,来保证消息不丢失
在broker端,我们可以设置分区和副本的机制来保证消息的不丢失。
在消费者端,我们可以使用在消息被正确消费之后手动提交偏移量的方式来保证。
也可以在消息投递之前,添加一张日志表(采用顺序写的方式)来保证。
kafka生产者都干了哪些事情?
为什么kafka在客户端发送的时候需要做一个缓存区?
1.为了减少IO的开销(单个 -> 批次)
2.减少GC
缓存发送:主要是这两个参数控制(只要1个满足)
大小:batch.size = 16384(16K)
时间:linger.ms = 0(说白了就是只要有了消息就会进行发送)
序列化器:将消息对象序列化为字节流,以便存储和传输。
分区器:根据配置的分区策略,将消息分配到不同的分区中
kafka生产者是怎样将消息发送到broker上面的?
后台是通过一个sender线程使用TCP的方式发送到broker上面的。当然它是自定义的TCP协议。
kafka搭建为什么要使用zookeeper?
1.每个broker在启动的时候都会向zookeeper中注册自己的信息(ip port),broker创建的节点是临时节点类型,如果broker发生宕机,这个临时节点也会被删除,这个时候就会发生主从切换。
2.每个topic下面可能会有多个分区信息,这个时候topic和对应分区以及broker的关系也会被记录到zookeeper中。
3.生产者的负载均衡,因为每个broker都会注册自己的信息,每个broker上线下线,生产者都能够感知到,这样生产者就能够负载均衡的发送消息了。
4.消费者的负载均衡,每个消费者分组都包含了若干个消费者,每条消息只会发送到组里面的一个消费者中。同上。
5.维系分区和消费者的关系。
6.消息消费的偏移量,也就是每个分区消息的消费进度信息。
搭建kafka集群的时候需要注意什么?
我总结有这么以下几点:
需要搭建zookeeper集群
kafka的安装配置,肯定是要改配置文件的。
网络配置:因为是broker集群,我们一定要确保每个节点都能够正常的进行通信,如果不能通信的话,这个时候我们就需要关闭防火墙,或者是开发某个端口。
版本一致性:确保Kafka集群的各个节点使用相同版本的Kafka,以避免兼容性问题。
Topic分区和副本配置:根据需求设置Topic的分区数和副本数,副本数可以保证数据的冗余和可用性1。
Broker ID唯一性:每个Kafka节点的broker.id
必须是唯一的,且集群中的每个节点都应该有一个唯一的标识1。
环境准备:包括节点规划、JDK环境配置等,确保每个节点都安装了JDK,因为Kafka需要Java运行环境2。
说一下kafka怎样保证消息的幂等性?
说白了就是不重复消费 就说redis的setnx
说一下怎样搭建kafka集群?
要搭建Kafka集群,您需要准备以下条件:
- 至少三台机器(或者在同一台机器上运行多个虚拟实例)。
- 相同的Kafka版本在所有机器上。
- 一个ZooKeeper集群,用于管理Kafka集群状态。
以下是搭建Kafka集群的基本步骤:
- 安装Java。
- 下载并解压Kafka到每台机器上。
- 配置ZooKeeper集群(如果尚未设置)。
- 配置Kafka服务器属性,例如
server.properties
。
以下是配置Kafka集群的关键设置示例:
broker.id: 每个Kafka实例的唯一标识。
listeners: 监听的地址和端口。
zookeeper.connect: ZooKeeper集群的地址。
示例server.properties
配置:
broker.id=1
listeners=PLAINTEXT://:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
对于其他Kafka实例,您需要更改broker.id
来保证唯一性,并相应更改listeners
和zookeeper.connect
中的配置。
启动Kafka集群通常按以下顺序进行:
- 启动ZooKeeper集群。
- 启动Kafka服务。
启动Kafka服务的命令通常是:
kafka-server-start.sh /path/to/your/kafka/config/server.properties
确保每个Kafka实例的配置文件都已正确配置,并且所有端口都没有被其他服务占用。
什么是 Rebalance 机制
在 Kafka 中,Rebalance(再平衡)是一种机制,用于在消费者组(Consumer Group)内重新分配分区(Partition)的所有权。
当有消费者加入或离开消费者组、或者主题(Topic)的分区数量发生变化时,就会触发 Rebalance。它的目的是确保消费者组内的每个消费者能够公平合理地消费消息,避免某个消费者负担过重或部分消息得不到处理。
kafka自动提交的时候设置的超时时间是多少
Kafka 消费者默认的自动提交间隔时间是 5 秒。这个时间是通过auto.commit.interval.ms
参数来设置的。这意味着每隔 5 秒,消费者会自动将它所消费到的消息的偏移量提交给 Kafka 集群。
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
// 设置其他必要的消费者配置参数,如bootstrap.servers等
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS, "3000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 后续的消费者操作代码
}
}