Kafka
本系列主要介绍kafka基础知识。kafka是一个开源分布式事件流平台,被用于高性能数据通道、流分析、数据集成和关键任务应用程序。
核心能力
高吞吐量 可扩展 持久化存储 高可用
基础知识
kafka是一种消息队列,主要处理大量数据状态下的消息队列。 kafka教程
安装
安装zookeeper
docker pull wurstmeister/zookeeper
安装kafka
docker pull wurstmeister/zookeeper
使用docker-compose进行安装
version: '1'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.56.102
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
启动命令
docker-compose -f kafka.yml up -d
概要描述
版本:kafka 3.x
2种模式:点到点、发布订阅
作用:消息队列,流计算,存储
应用场景:解耦,异步通信,流量消峰
概念:
Topic
对应多个分区,多个副本(leader 副本,follower副本,只针对leader副本进行消费)
Broker Server
broker 0/1/2
Consumer
消费者,可以分组消费,某个分区只能有一个消费者进行消费
Producer
生产者
Zookeeper
记录谁是leader, kafka2.8.0以后也可以配置不采用ZK,使用kraft
linux脚本:
#!/bin/bash
case $1 in
"start")
for i in 1 2 3
do
ssh $i "xxx"
done
;;
"stop")
;;
esac
常用命令:
zookeeper-server-start.bat D:/soft/kafka_2.12-2.4.1/config/zookeeper.properties
zookeeper-shell.bat 127.0.0.1:2181
kafka-server-start.bat D:/soft/kafka_2.12-2.4.1/config/server.properties
# 3.x版本
kafka-server-start.bat -daemon D:/soft/kafka_2.12-2.4.1/config/server.properties
./kafka-topics.bat --bootstrap-server localhost:9092 -list
./kafka-topics.bat --bootstrap-server localhost:9092 --create --topic topic_test_20241213 --partitions 3
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --list
# 查看当前topic信息
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic {topic.name} --describe
# 增量消费数据
./kafka-console-producer.bat --topic topic_test_20241213 --broker-list 127.0.0.1:9092
# 从最开始消费数据
./kafka-console-consumer.bat --topic topic_test_20241213 --from-beginning --bootstrap-server 127.0.0.1:9092
./kafka-topics.bat --bootstrap-server localhost:9092 --create --topic demo.topic.20241213
./kafka-console-producer.bat --topic demo.topic.20241213 --broker-list 127.0.0.1:9092
./kafka-console-consumer.bat --topic demo.topic.20241213 --from-beginning --bootstrap-server 127.0.0.1:9092
./kafka-console-consumer.bat --topic demo.topic.20241213 --bootstrap-server 127.0.0.1:9092
集群启动
1zk+3kafka,kafka broker id: [0,1,2]
kafka-server-start.bat D:/soft/kafka_2.12-2.4.1/config/server.properties
kafka-server-start.bat D:/soft/kafka_2.12-2.4.1/config/server-9093.properties
kafka-server-start.bat D:/soft/kafka_2.12-2.4.1/config/server-9094.properties
#创建topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic --partitions 3 --replication-factor 3
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic --describe
#创建topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic.2 --partitions 2 --replication-factor 2
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.2 --describe
#创建topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic.4 --partitions 4 --replication-factor 2
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.4 --describe
生产者
生产者发送原理
Producer(主线程) -> send(ProducerRecord) -> Interceptors(拦截器),配合spark/flume使用,不使用原生的拦截器 -> Serializer(序列化器) -> Partitioner(分区器), RecordAccumulator(内存队列中操作,默认32M,多个Dqueue双端队列,每一个Dqueue中的元素ProducerBatch默认16K) -> Sender(读取线程),当Batch.size=16K,读取数据;或者linger.ms设置的时间,单位ms,到达时间就会进行数据读取 -> 发送到kafka集群,以brokerid为key,创建一个队列,最多缓存5个请求 -> selector: 通道,发送数据到kafka集群 -> kafka应答: 0(生产者发送数据不需要等待数据落盘应答);1(生产者发送数据,leader收到数据应答);-1/all(生产者发送数据,leader和isr队列所有节点收到数据后应答) org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG -> 成功:清除数据 -> 失败: 重试,int最大值
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class KafkaProducerTest {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configs, new StringSerializer(), new StringSerializer());
// kafkaProducer.send(new ProducerRecord<>("demo.topic","demo"));
kafkaProducer.send(new ProducerRecord<>("demo.topic.2","demo2"));
kafkaProducer.send(new ProducerRecord<>("demo.topic.4","demo4"));
kafkaProducer.close();
}
}
生产者分区
分区作用
- 存储更灵活,可以将大数据进行切分,存储到多个分区上。
- 提高并行度,可以进行分组消费。
- 分区数量建议最大不能超过Broker Server的3倍,否则会造成分区管理困难。
代码实现
核心类:ProducerRecord.class
大致分为3中实现,具体可看org.apache.kafka.clients.producer.internals.DefaultPartitioner注释
指定partition
直接使用指定的partition进行发送数据
未指定partition,存在key:
使用key的hash值,和所有的partition数进行取模,得出partition
未指定partition,不存在key
使用Sticky Partition(黏性分区),会随机选一个分区,并尽可能使用该分区,直到该分区batch.size=16k,或者linger.ms时间到,再随机一个分区(与上次不同)
自定义分区
实现
org.apache.kafka.clients.producer.Partitioner
,参考:org.apache.kafka.clients.producer.RoundRobinPartitioner将自定义实现类注册到
KafkaProducer
java//Properties: //org.apache.kafka.clients.producer.ProducerConfig#PARTITIONER_CLASS_CONFIG //org.apache.kafka.clients.producer.ProducerConfig#PARTITIONER_CLASS_DOC
生产者吞吐量
调整批次数据大小
batch.size=16k~32k ,
配置信息:org.apache.kafka.clients.producer.ProducerConfig#BATCH_SIZE_DOC
linger.ms=5~100ms(会造成消息延迟)
配置信息:org.apache.kafka.clients.producer.ProducerConfig#LINGER_MS_DOC
压缩数据
compression.type=snappy
配置信息:org.apache.kafka.clients.producer.ProducerConfig#COMPRESSION_TYPE_DOC
调整缓冲区(RecordAccumulator)大小,默认32M
buffer.memory=64M
配置信息:org.apache.kafka.clients.producer.ProducerConfig#BUFFER_MEMORY_DOC
生产者可靠消息传递
配置项
acks
//org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG
//org.apache.kafka.clients.producer.ProducerConfig#ACKS_DOC
配置详解
acks=0
丢数,只保证数据发送
acks=1
丢数,只保证数据发送到leader,并且leader已经接收,如果在同步副本前down机会造成新选举的leader没有数据。
acks=all/-1
发送数据需leader和ISR队列所有节点都接收完成。
扩展问题:
如果follower节点故障,导致长时间阻塞
leader维护了一个动态的in-sync replica set(ISR)队列,意为和leader保持同步的follower+leader集合,如果follower长时间没有向leader发送通讯请求,则会被移除ISR队列。 该时间阈值由
replica.lag.time.max.ms
参数配置,默认30s。如果由于节点故障,导致leader和ISR队列节点一致(ISR最小副本数量
min.insync.replicas
默认为1),此时和acks=1的效果一样,存在丢数的风险,数据可靠性问题数据完全可靠条件:(ACK级别设置为-1) + (分区副本大于等于2) + (ISR应答的最小副本数量大于等于2)
总结
acks=0,基本不使用;acks=1,传输普通日志,允许个别丢数;acks=-1,数据可靠性要求比较高。
生产者数据重复消费
acks=-1, leader+ISR队列节点已全部接收完成,此时leader下线,选举新leader,会接收重复数据?
消息传递语义: 至少一次:(ACK级别设置为-1) + (分区副本大于等于2) + (ISR应答的最小副本数量大于等于2),不保证不重复 至多一次:acks=0,不保证数据不丢失
[精确一次]:至少一次 + 幂等性
幂等性:
配置项:enable.idempotence
,默认是true
Producer无论发送多少次重复数据,Broker端只会持久化一条,保证了不重复。
重复数据判断标准:
具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID在kafka重启时都会分配一个新的,Partition表示分区号,Sequence Number是单调自增的。
总结
幂等性只能保证单分区单会话不重复。
生产者事务
解决幂等性缺陷,只能保证单分区单会话保证不重复。
配置项
//org.apache.kafka.clients.producer.ProducerConfig#TRANSACTION_TIMEOUT_CONFIG
//org.apache.kafka.clients.producer.ProducerConfig#TRANSACTIONAL_ID_CONFIG
前提条件
开启事务,必须开启幂等性(enable.idempotence)
Producer使用事务功能前,必须先自定义一个唯一的transaction.id。即使客户端挂掉,重启后也能保证继续处理未完成的事务。
底层原理
名词解释 broker(transaction coordinator): 事务协调器 broker(Topic-partition Leader): 消息Topic对应的分区leader broker(transaction_topic): 存储事务相关信息的主题;默认有50个分区,每个分区负责一部分事务。事务划分根据transaction.id的hashcode%50,计算出该事务属于哪个分区。该分区Leader副本所在的broker节点即为这个transaction.id对应的transaction coordinator节点 步骤
- kafka producer向broker(transaction coordinator)请求producer id;
- broker (transaction coordinator) 返回producer id;
- kafka producer向broker (Topic-partition Leader)发送Topic;
- kafka producer向broker(transaction coordinator)发送commit请求;
- broker(transaction coordinator)持久化commit请求, 事务协调器向事务主题
- (transaction_topic,_transaction_state-分区-Leader存储事务信息的特殊主题);
- broker(transaction coordinator)返回commit成功;
- broker(transaction coordinator)后台发送commit请求到broker (Topic-partition Leader);
- broker (Topic-partition Leader)返回成功到broker(transaction coordinator);
- broker(transaction coordinator)持久化事务信息,事务协调器向事务主题
- (transaction_topic,_transaction_state-分区-Leader存储事务信息的特殊主题);
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerTransaction {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//开启事务必须幂等性开启+transaction.id
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo.transaction.id");
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configs, new StringSerializer(), new StringSerializer());
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("demo.topic.20241213","transaction"+ i));
}
int i = 1 / 0;
}catch (Exception e) {
kafkaProducer.abortTransaction();
}finally {
kafkaProducer.commitTransaction();
}
kafkaProducer.close();
}
}
生产者生产数据有序
多分区,分区与分区间无序(可在消费者端加载所有数据,进行数据排序)。
单分区内,有序,条件如下:
配置项:
org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC
kafka 1.x版本: max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性),该值是broker缓存的请求数,默认最多是5个。
kafka 1.x版本后:
未开启幂等性> max.in.flight.requests.per.connection=1
开启幂等性> max.in.flight.requests.per.connection <= 5, 启用幂等性后,kafka服务端会缓存producer发来的最近5个request的元数据,[如果顺序不一致,会在服务端进行一次排序],都可保证最近5个request的数据有序。
zookeeper存储信息
可视化工具
prettyzoo
配置文件
多节点部署,[server.proterties]zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182/kafka
此配置是将kafka配置到zookeeper的kafka节点下, /kafka/ /kafka/brokers /kafka/brokers/ids
存储节点信息
/kafka/ /kafka/admin /kafka/brokers ->/kafka/brokers/ids 记录在线brokers信息 ->/kafka/brokers/topics/{topic_name}/partition/{partition_num}/state 记录leader和isr队列信息 /kafka/cluster /kafka/consumers ->kafka0.9版本之前存储offset信息 ->kafka0.9版本之后存储在kafka主题中 /kafka/controller -> 辅助leader选举 /kafka/config /kafka/log_dir_event_notification /kafka/latest_producer_id_block /kafka/lsr_change_notification /kafka/Controller_epoch
kafka Broker
工作流程
名词解释:
AR: kafka分区中所有副本统称
工作流程:
broker启动的时候在zookeeper注册节点/kafka/brokers
/kafka/controller, 哪个broker先注册就为Controller
由Controller监听brokers节点变化
controller决定leader选举
选举规则:在isr中存活为前提,按照AR中排在前面的优先。例如:AR[1,0,2],ISR[0,1,2]那么leader就会按照1,0,2顺序轮询
broker leader controller将节点信息上传到zk,/kafka/brokers/topics/{topic_name}/partition/{partition_num}/state
其他broker controller从zk同步信息,以segement存储到本地硬盘;如果leader下线,就会重新选举,重复controller选举的过程
segement:
log data [segement: 1G, *.log *.index *.timeindex]
kafka 节点服役和退役
新增节点
前提条件:安装jdk,kafka,修改kafka, server.properties: broker.id
执行负载均衡操作:
创建负载均衡文件,包含要迁移的主题信息topic-to-move.json
json{ "topics": [ { "topic": "first" } ], "version": 1 }
执行脚本
batbin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --topic-to-move-json-file {topic-to-move.json} --broker-list "0,1,2,3" --generate
broker-list: 包含新增broker-server id
创建副本存储计划,第2步执行的输出内容[proposed partition reassignment configuration],increase-replication-factor.json
执行脚本
batbin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --reassignment-json-file {increase-replication-factor.json} --zookeeper 127.0.0.1:2181 --execute
验证副本存储计划
batbin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --reassignment-json-file {increase-replication-factor.json} --zookeeper 127.0.0.1:2181 --verify
移除节点
前提条件:已存在broker节点
执行负载均衡操作:
创建负载均衡文件,包含要迁移的主题信息topic-to-move.json
json{ "topics": [ { "topic": "first" } ], "version": 1 }
执行脚本
bat./kafka-reassign-partitions.bat --bootstrap-server 127.0.0.1:9092 --topics-to-move-json-file topic-to-move.json --broker-list "0,1,2" --zookeeper 127.0.0.1:2181 --generate
broker-list: 排除要移除的broker-server id
创建副本存储计划,第2步执行的输出内容[proposed partition reassignment configuration],decrease-replication-factor.json
执行脚本
batbin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --reassignment-json-file {decrease-replication-factor.json} --zookeeper 127.0.0.1:2181 --execute
验证副本存储计划
batbin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --reassignment-json-file {decrease-replication-factor.json} --zookeeper 127.0.0.1:2181 --verify
停止要移除的kafka broker server
kafka副本
副本基本信息
副本作用:提高数据可靠性。
默认副本数量为1个,生产环境一般配置2个,确保数据可靠性。太多会导致增加磁盘空间,增加网络数据传输,降低效率。数量不能超过Broker Server的数量。
副本分为leader和follower,生产者只会将数据发送到leader,follower会从leader上进行同步。
分区所有的副本统称为AR(Assigned Replicas)。
AR = ISR + OSR
ISR: 表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通讯请求或者同步数据,则该Follower被提出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障后,会从ISR中选择新的Leader。
OSR: 表示Follower与Leader副本同步时,延迟过多的副本。
Leader选举流程
同kafka Broker工作流程,主要是以AR副本顺序进行Leader节点的选举。
Leader、Follower故障处理
LEO: Log End Offset,每个副本的最后一个offset,LEO就是最新的offset+1。
HW: High Watermark,所有副本中最小的LEO。
Follower故障处理细节
- follower故障会被踢出ISR队列。
- leader和其他follower会继续正常接收数据,HW和LEO会继续往前移动。
- follower故障恢复后,会从本地的log文件中读取最后一次的HW,将log文件中大于HW的数据清除掉,重新从leader中同步数据。
- 直到follower的LEO大于等于Leader的Hw为止,此时会将该follower重新加入到ISR队列中。
Leader故障处理细节
- 该leader节点会被踢出ISR队列。
- 重新进行leader选举,其他的follower与leader进行数据同步,若follower的LEO大于leader的HW,会将自己的数据清除掉,与leader保持一致,直到follower的LEO大于等于Leader的Hw为止。
总结:leader故障处理,只能保证数据一致性,不能保证数据完整性,会造成数据丢失。
分区副本分配
leader和follower尽量均匀分配到每一台节点上。每一个分区对应的副本分配尽量不一致,保证副本在每个节点尽量平均。
#创建topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic.4 --partitions 4 --replication-factor 2
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.4 --describe
上图,节点数为3,分区为4,副本数为2
Replicas: 以节点数为一组,开始为Partition0~3。开始副本为0,1;那么第二组Partition4, 开始副本则为0,2。
生产经验-手动调整分区
windows版本kafka_2.12-2.4.1未调整成功。
#创建topic, 当前broker [0,1,2]
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic.self --partitions 3 --replication-factor 2
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.self --describe
上图,节点数为3,分区为3,副本数为2 。
调整副本全部放到broker.ids[0,1]上。
创建副本存储计划。increase-replication-factor.json
如果实际的分区数比节点数多,可能会存在问题。
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --alter--topic demo.topic.self --partitions 3
json{ "version": 1, "partitions": [ { "topic": "demo.topic.self", "partition": 0, "replicas": [0] //必须要是Replicas中存在的 }, { "topic": "demo.topic.self", "partition": 1, "replicas": [0] //必须要是Replicas中存在的 }, { "topic": "demo.topic.self", "partition": 2, "replicas": [1] //必须要是Replicas中存在的 } ] }
执行副本存储计划。
bat./kafka-reassign-partitions.bat --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --zookeeper 127.0.0.1:2181 --execute
验证副本存储计划。
bat./kafka-reassign-partitions.bat --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --zookeeper 127.0.0.1:2181 --verify
查看该topic
batkafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.self --describe
Leader Partition自动平衡
正常情况,kafka会自动把leader partition均匀分散在各个机器上,保证各节点读写吞吐均匀。如果有的节点down机,会导致leader partition过于集中在存活的机器上,导致请求压力增高。当down机节点恢复后,会变成follower partation,读写请求很低,容易造成集群负载步均衡。
自动平衡参数:auto.leader.rebalance.enable
默认是true。
leader.imbalance.per.broker.percentage
默认是10%。表示每个broker允许的不平衡的leader比率,如果超过该值,控制器会触发leader的平衡。
leader.imbalance.check.interval.seconds
默认是300s。表示leader负载是否平衡的间隔时间。
算法规则:
针对于broker server某一个分区的leader和Replicas中展示首位的顺序不一致,Replicas 优先副本不是 Leader节点,所以不平衡加1,AR的副本数=Replicas数量。
所以broker server 不平衡率1/4>10%,需要再平衡。
Replicas: 表示broker ids
建议不开启自平衡参数。或者调大不平衡比率值,避免频繁进行自动平衡。
增加副本因子
不能通过命令行方式增加topic的replication-factor
可以使用kafka-reassign-partitions
脚本,创建存储计划。调整方式同上。
文件存储
文件存储机制
Topic是逻辑概念,而partition是物理概念,每个partition对应一个log文件。partition最小的存储单元是segment,默认大小为1G。
segment组成:
- log日志文件,当前segment第一条消息的offset命名
- index偏移量索引文件,当前segment第一条消息的offset命名
- timeindex时间戳索引文件
- 其他文件
数据默认是7天进行清理,由timeindex进行日期判断。
为防止log文件过大导致定位效率低下,kafka采用分片和索引机制,将每个partition分为多个segment,多个segment是追加的方式存储。segment文件夹命名规则:topic名称+分区序号,demo-0。
index文件为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引,offset为相对offset。参数log.index.interval.bytes
默认4kb。
查看segment中log和index文件内容:
kafka-run-class.bat kafka.tools.DumpLogSegments --files D:\soft\kafka_2.12-2.4.1\9092\logs\demo-0\00000000000000000000.index
文件清理策略
kafka日志默认保存时间为7天。可以通过如下参数修改保存时间:
- log.retention.hours,最低优先级小时,默认168h=7天。
- log.retention.minutes,分钟。
- log.retention.ms,最高优先级毫秒。
- log.retention.check.interval.ms,检查周期,默认5分钟。
日志清理策略由delete和compact两种。
delete日志删除,将过期数据删除
log.cleanup.policy=delete
所有数据启用删除策略,默认启用删除策略。
- 基于时间,默认打开,以segment中所有记录中的最大时间作为该文件时间戳。
- 基于大小,默认关闭,超过设置的所有日志总大小,删除最早的segment。
log.retention.bytes
默认等于-1,表示无穷大。
compact日志压缩
log.cleanup.policy=compact
对于相同key,不同的value值只保留最后一个版本。
只适用于特殊场景,比如消息的key为用户ID,value是用户的资料。只保留所有用户最新的数据。
高效读写数据
分布式集群,采用分区技术,并行度高。
读数据采用稀疏索引,可以快速定位要消费的数据。
顺序写磁盘。写入数据的是按照追加的方式进行写入,为顺序写。顺序写600M/S,随机读写100k/s。
页缓存+零拷贝技术。
零拷贝:kafka broker不关心存储数据,交由linux内核处理(网卡零拷贝),不用kafka应用层进行数据处理,传输效率高。
PageCahe页缓存,操作系统底层提供。
消费者
消费方式
- 拉模式(pull),采用该消费方式,由消费者自己决定消费数据的速率。如果broker没有数据,也会造成一直空转。
- 推模式(push)
整体工作流程
工作流程
生产者生产数据->分区leader节点->消费者从分区leader节点拉取数据
消费者组内的消费者不能消费同一分区的数据,会造成消息重复消费。
消费者可以消费多分区的数据。
消费者消费消息的位置由offset表示,低版本(<0.9)存储在zk下的/kafka/consumer节点下;现在存储在kafka的特定主题__consumer_offsets。
消费者组原理
Consumer Group(CG) :消费者组,由多个consumer组成,形成一个消费者组的条件,是所有消费者的groupid相同。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费。
- 消费者组之间互不影响。所有消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者组消费消息:
单消费者组,组内消费者数量小于分区数量
按顺序分配分区数到消费者
单消费者组,组内消费者数量等于分区数量
一一对应分配到消费者
单消费者组,组内消费者数量大于分区数量
先一一对应分配到消费者,剩余的消费者保持空闲。
多消费者组,一组大于分区数量,一组小于分区数量
组内消费者数量等于分区数量,一一对应分配到消费者
组内消费者数量小于分区数量,按顺序分配分区数到消费者
消费者组初始化流程:
coordinator
: 辅助实现消费者组的初始化和分区分配,存储在每一个broker server。
coordinator节点选择:groupid的hashcode值%50(__consumer_offsets的分区数量)
由于__consumer_offsets
默认50分区,会分布在broker server的节点上,根据上一步的取模值,会计算__consumer_offsets
对应的分区落到哪个broker server节点上,以此节点的coordinator
作为该消费者组的初始化和分配工作。
- 消费者组中每一个consumer会向coordinator发送加入消费者组(joinGroup)的请求
- coordinator会选举该消费者组的leader
- coordinator把要消费的topic情况发送给leader消费者
- leader 消费者指定具体的消费方案
- leader 消费者将消费方案发送给coordinator
- coordinator将消费方案下发给其他的consumer
每个消费者都会和coordinator保持心跳(heartbeat.interval.ms
=3s),一旦超时(session.timeout.ms
=45s)该消费者就会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms
=5min),也会触发再平衡。
配置属性:
org.apache.kafka.clients.consumer.ConsumerConfig#HEARTBEAT_INTERVAL_MS_CONFIG
org.apache.kafka.clients.consumer.ConsumerConfig#SESSION_TIMEOUT_MS_CONFIG
org.apache.kafka.clients.consumer.ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG
消费者组详细消费流程
消费者先创建网络连接客户端,发送sendFetches请求Kafka broker消费;参数配置如下:
fetch.min.bytes
,每批次最小抓取大小,默认1字节org.apache.kafka.clients.consumer.ConsumerConfig#FETCH_MIN_BYTES_CONFIG
fetch.max.wait.ms
,一批数据最小值未达到的超时时间,默认500msorg.apache.kafka.clients.consumer.ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG
fetch.max.bytes
,每批次最大抓取大小,默认50Morg.apache.kafka.clients.consumer.ConsumerConfig#FETCH_MAX_BYTES_CONFIG
fetch.min.bytes
和fetch.max.wait.ms
任一满足都会触发拉取。kafka broker会进行回调,将拉取的数据放到消息队列中。
消费者会从回调队列中抓取数据,参数配置如下:
max.poll.records
一次拉取数据的最大条数,默认500条org.apache.kafka.clients.consumer.ConsumerConfig#MAX_POLL_RECORDS_CONFIG
消费者抓取数据后,先经过反序列化,再经过拦截器处理,最终得到需要的数据。
kafka broker不进行数据处理,只存储数据,只在生产者和消费者端进行数据处理,可以通过拦截器进行消息处理。
消费者API
前提条件
消费者必须配置group.id,发序列化,broker server等。
订阅主题
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
public class KafkaConsumerTopicTest {
// kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo
// kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
public static void main(String[] args) {
Map<String, Object> config = new HashMap<>();
//设置boorstrp-server
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//设置group.id, 命令行会随机生成一个
config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer");
//设置配置参数和反序列化
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
new StringDeserializer(), new StringDeserializer());
// kafkaConsumer.subscribe(Lists.asList("demo", new String[]{}));
kafkaConsumer.subscribe(Pattern.compile("demo"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
订阅分区
生产者按指定分区发送数据
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class KafkaProducerPartitionTest {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configs, new StringSerializer(), new StringSerializer());
for (int i = 0; i < 5; i++) {
//指定分区发送数据
kafkaProducer.send(new ProducerRecord<>("demo",0, "demo_key" + i,"demo_value" + i));
}
kafkaProducer.close();
}
}
消费者订阅指定分区数据
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class KafkaConsumerPartitionTest {
// kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
public static void main(String[] args) {
Map<String, Object> config = new HashMap<>();
//设置boorstrp-server
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//设置group.id, 命令行会随机生成一个
config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer");
//设置配置参数和反序列化
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
new StringDeserializer(), new StringDeserializer());
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("demo", 0));
kafkaConsumer.assign(topicPartitions);
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
消费者组
多个消费者保证同一group.id即可成为一个消费者组。
分区分配,再平衡
分区(消费者组怎样分配消费者消费哪个分区)主流策略:
- Range
- RoundRobin
- Stickly
- CooperativeSticky
可以通过配置参数partition.assignment.strategy
修改分区分配策略。
org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG
默认策略是Range+CooperativeSticky, kafka 可以同时使用多个分区分配策略。
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor
自定义策略,需要实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
接口。
Range策略
org.apache.kafka.clients.consumer.RangeAssignor
针对每个topic而言,对同一个topic的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
如果分区的topic过多,容易产生数据倾斜,消费消息时过于集中到某一个消费者。
若某一个消费者下线,待超时时间45s到,触发再平衡会将下线消费者的监听分区,全部划分到存活的某一消费者中。
公式:分区数/消费者数;需对分区和消费者进行排序。详情参考RangeAssignor
docs。
- 余数为0,平均分配
- 余数不为0,先按整除部分分配,将余数按照消费者从上到下进行依次补充,直到补充完整。(exp: 如果余数为1,则只补充道消费者1;余数为2,补充消费者1,2)
RoundRobin
org.apache.kafka.clients.consumer.RoundRobinAssignor
针对多有topic而言,partition和consumer进行hashcode排序,最后轮询分配到consumer。
若某一个消费者下线,待超时时间45s到,触发再平衡会将下线消费者的监听分区,追加轮询到存活消费者中,存存活消费者已分配的分区不变。之后再有消息传输,会重新进行轮询计算,按照存活消费者进行轮询。
Stickly
执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配变动,节省大量的开销。
org.apache.kafka.clients.consumer.StickyAssignor
不用排序随机分配,尽可能保持平均分配。
若某一个消费者下线,待超时时间45s到,触发再平衡会将下线消费者的监听分区,追加平均分配到存活消费者中,存存活消费者已分配的分区不变。之后再有消息传输,消费者会保证之前的分配策略,将新的分区平均分配到存活的消费者,按照存活消费者进行轮询。
CooperativeStickly
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
合作者粘性策略,需要根据版本的描述信息进行使用,2.4.1版本如下:
offset位移
Offset的默认维护位置
0.9x之后维护在系统主题里,之前维护再zk中。
__consumer_offsets主题采用key和value的方式存储数据,key是group.id+topic+分区号,value是当前的offset值。每隔一段时间,kafka内部就会对这个topic进行compact,因此每个key对应的是最新的value值。
消费offset案例:
通过参数配置该topic可见,exclude.internal.topics
默认为false。将该值改为true。
org.apache.kafka.clients.consumer.ConsumerConfig#EXCLUDE_INTERNAL_TOPICS_CONFIG
通过往topic中写入数据,查看__consumer_offsets主题的内容。
自动提交offset
kafka提供了自动提交offset功能,为了使用户只关注于业务逻辑的处理
自动提交offset相关参数:
enable.auto.commit
是否开启自动提交offset功能,默认是true。
auto.commit.interval.ms
自动提交offset的时间间隔,默认是5s。
org.apache.kafka.clients.consumer.ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG
org.apache.kafka.clients.consumer.ConsumerConfig#AUTO_COMMIT_INTERVAL_MS_CONFIG
producer生产数据到分区,consumer组的消费者不断从分区从消费数据,offset的维护由consumer每5s进行触发一次,与消费数据相互隔离。
手动提交offset
手动提交分为2种方式:同步提交、异步提交
同步提交:commitSync,提交最新offset值时,会阻塞cosumer消费数据,必须等offset提交完毕,才能消费下一批数据。
异步提交:commitAsync,提交最新offset值时,不会阻塞consumer消费,提交完后就开始消费下一批数据了。
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
public class KafkaConsumerOffsetTest {
// kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo
// kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
public static void main(String[] args) {
Map<String, Object> config = new HashMap<>();
//设置boorstrp-server
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//设置group.id, 命令行会随机生成一个
config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer");
//设置offset是否自动提交
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//offset自动提交间隔时间
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
//设置配置参数和反序列化
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
new StringDeserializer(), new StringDeserializer());
// kafkaConsumer.subscribe(Lists.asList("demo", new String[]{}));
kafkaConsumer.subscribe(Pattern.compile("demo"));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
while (iterator.hasNext()) {
ConsumerRecord<String, String> next = iterator.next();
System.out.println(next);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//同步提交
kafkaConsumer.commitSync();
//异步提交
// kafkaConsumer.commitAsync();
}
}
}
指定offset消费
auto.offset.reset
=earlist | latest | none, 默认是latest
- earlist: 自动将偏移量重置为最早的偏移量,类似命令:--from-beginning
- latest: 默认值,自动将偏移量置为最新的偏移量
- none: 如果未找到消费者组的偏移量,则向消费者排除异常
org.apache.kafka.clients.consumer.ConsumerConfig#AUTO_OFFSET_RESET_CONFIG
import ch.qos.logback.classic.Level;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
public class KafkaConsumerOffsetSeekTest {
// kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo
// kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
public static void main(String[] args) throws InterruptedException {
//org.apache.logging.log4j.simple.SimpleLogger.SimpleLogger
// System.setProperty("org.apache.logging.log4j.simplelog.ROOT.level", "error");
//配置kafka使用的日志等级,不打印info、debug等日志信息
ch.qos.logback.classic.Logger logger1 = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger("org.apache.kafka");
logger1.setLevel(Level.ERROR);
Map<String, Object> config = new HashMap<>();
//设置boorstrp-server
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//设置group.id, 命令行会随机生成一个
config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer_seek_10");
//设置配置参数和反序列化
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
new StringDeserializer(), new StringDeserializer());
kafkaConsumer.subscribe(Pattern.compile("demo"));
//进行分区信息的获取
Set<TopicPartition> assignment = kafkaConsumer.assignment();
//此处初始化的时候会有coordinator进行消费者leader选举,因此需要再次阻塞,直到获取到值
while (assignment.isEmpty()) {
// 辅助进行leader选举
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
for (TopicPartition topicPartition : assignment) {
System.out.println("topicPartition: " + topicPartition);
kafkaConsumer.seek(topicPartition, 10);
}
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
while (iterator.hasNext()) {
ConsumerRecord<String, String> next = iterator.next();
System.out.println(next);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
指定时间消费
中转的方式实现以指定时间进行消费,将分区和时间参数传递,使用api查询分区下,该时间对应的offset信息。
注意:该时间在分区下必须存在数据,否则转换结果为空。
import ch.qos.logback.classic.Level;
import cn.hutool.core.lang.Console;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
public class KafkaConsumerOffsetSeekTimeStampTest {
// kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo
// kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
public static void main(String[] args) throws InterruptedException {
//org.apache.logging.log4j.simple.SimpleLogger.SimpleLogger
// System.setProperty("org.apache.logging.log4j.simplelog.ROOT.level", "error");
//配置kafka使用的日志等级,不打印info、debug等日志信息
ch.qos.logback.classic.Logger logger1 = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger("org.apache.kafka");
logger1.setLevel(Level.ERROR);
Map<String, Object> config = new HashMap<>();
//设置boorstrp-server
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
//设置group.id, 命令行会随机生成一个
config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer_seek_time");
//设置配置参数和反序列化
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
new StringDeserializer(), new StringDeserializer());
kafkaConsumer.subscribe(Pattern.compile("demo"));
//进行分区信息的获取
Set<TopicPartition> assignment = kafkaConsumer.assignment();
//此处初始化的时候会有coordinator进行消费者leader选举,因此需要再次阻塞,直到获取到值
while (assignment.isEmpty()) {
// 辅助进行leader选举
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
//构造要获取的时间段
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
for (TopicPartition topicPartition : assignment) {
//获取前一天
LocalDateTime localDateTime = LocalDateTime.now().plusDays(-2);
long dateTimeLong = localDateTime.toEpochSecond(ZoneOffset.UTC) * 1000;
Console.error("datetime: {}", dateTimeLong);
// timestampsToSearch.put(topicPartition, dateTimeLong);
//1737453269000
//1737444973545l
// timestampsToSearch.put(topicPartition, dateTimeLong);
timestampsToSearch.put(topicPartition, 1737444973545l);
}
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);
for (TopicPartition topicPartition : assignment) {
System.out.println("topicPartition: " + topicPartition);
//此处可以进行鲁棒性处理,将异常信息捕获
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
Console.error("offset: {}", offsetAndTimestamp.offset());
kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
}
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
while (iterator.hasNext()) {
ConsumerRecord<String, String> next = iterator.next();
System.out.println(next);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
漏消费和重复消费
重复消费
自动offset,已消费了数据,由于异常原因,导致offset没有提交。
漏消费
手动offset,已提交offset,数据未消费完成。可能是异步提交offset,但是消费者消费时由于异常因素导致未正常消费消息。
解决重复消费和漏消费,需采用消费者事务。
消费者事务
消费者精确一次性消费,需要kafka消费端将消费过程和offset过程做原子绑定,如果存在下游消费者,也必须支持事务。
数据积压(消费者吞吐量)
增加Topic的分区数量,配套增加消费者数量,尽量保持消费者数=分区数。
提高每批次拉取的数据条数,默认是500条。一次拉取的数据过少(拉取条数/处理时间 < 生产速度)也会造成消息积压。配套调整参数,一次最大拉取的消息大小50M。
max.poll.records
一次拉取数据的最大条数,默认500条fetch.max.bytes
每批次最大抓取大小,默认50M
kafka-eagle监控
Mysql安装
todo...
Kafka-eagle安装
todo...
kafka-kraft模式
安装
kraft支持节点共用,既可以是broker,也可以是controller。
单安装kafka就可以启动,不需要依赖zookeeper。
启动
修改conf/kraft/server.properties种的配置:
node.id,controller.quorum.voters(node.id+主机:端口),advertised.listerners,log.dirs
初始化命令:
- 生成唯一编号
/bin/kafka-storage.sh random-uuid
- 使用该id格式化kafka存储目录
/bin/kafka-storage.sh format -t {uuid:A7B5c56D5d123235} -c {/usr/local/kafka/conf/kraft/server.properties}
- 启动kafka
/bin/kafka-server.start.sh -daemon conf/kraft/server.properties
外部系统集成
Flume
高可用、高可靠的分布式海量日志采集、聚合和传输系统,支持在日志系统中定制各类数据发送方,用于收集数据集;同时,也提供了对数据的简单处理,并写到各种数据接收方。
安装
安装hadoop,flume
可能出现的问题:
flume/lib下的guava-*.jar 可能会与handoop下的jar包冲突,建议删除该路径下jar,并配置handoop的环境变量。
生产者
监听linux本地文件->flume tailddirresource -> memory channel -> kafka sink -> kafka broker -> consumer
配置flume配置文件:jobs/file-to-kafka.conf
消费者
kafka producer生产数据 -> kafka broker -> flume kafkasource -> memory channel -> logger sink
配置flume配置文件:jobs/kafka-to-file.conf
Flink
流式处理框架,能够实现低延迟的实时数据处理。与Spark相比,Flink更专注于流处理,可以提供更好的事件处理和状态管理。并且还支持批处理任务。
安装
导入依赖jar包。
flink-java, flink-streaming-.jar, flink-clinets-.jar, flink-connector-kafka-*.jar,
生产者
java-api操作
消费者
java-api操作
Spark
快速、通用的大数据处理框架,拥有比Hadoop更好的性能和更广泛的应用领域。支持多种数据处理模式,如批处理、流处理、机器学习等。
安装
先进行jdk-8、scala-2.12.11的安装,并配置环境变量
生产者
导入依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
需要使用scala去操作。
消费者
导入依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
需要使用scala去操作。
Springboot
配置文件
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<!--
http://190.2.37.136:8081/#browse/search=keyword%3Dorg.springframework.boot%20AND%20version%3D2.6.14
-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.10.RELEASE</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<junit.version>4.12</junit.version>
<log4j.version>1.2.17</log4j.version>
<lombok.version>1.16.18</lombok.version>
<jedis.version>4.3.1</jedis.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
<!--<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.1.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.10</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!--swagger2-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>nfh-maven-public</id>
<name>nfh-maven-public</name>
<url>http://190.2.37.136:8081/repository/maven-public/</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
</project>
application.yml
server:
port: 7777
spring:
application:
name: kafka
swagger2:
enabled: true
mvc:
## org.springframework.boot.autoconfigure.web.servlet.WebMvcProperties.Pathmatch.matchingStrategy: springboot 2.6x 将pathmatch 默认值调整
pathmatch:
matching-strategy: ANT_PATH_MATCHER
jackson:
serialization:
FAIL_ON_EMPTY_BEANS: false
logging:
level:
root: info
org.example: info
# pattern:
# console: %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level &logger- %msg%n
# file: %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level &logger- %msg%n
file:
name: test.log
kafka:
bootstrapServers:
-localhost:9092
properties:
# 默认值:org.springframework.boot.autoconfigure.kafka.KafkaProperties.Producer#keySerializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
启动类:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
}
配置类:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@EnableSwagger2
@Configuration
public class Swagger2Config {
@Value("${spring.swagger2.enabled}")
private Boolean enabled;
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2)
.apiInfo(apiInfo())
.enable(enabled)
.select()
.apis(RequestHandlerSelectors.basePackage("org.example"))
.paths(PathSelectors.any())
.build();
}
private ApiInfo apiInfo() {
return new ApiInfoBuilder().build();
}
}
生产者
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
//http://localhost:7777/swagger-ui.html
//http://localhost:7777/swagger-ui.html
@Api(tags = "测试消息发送")
@RestController
public class KafkaDemoController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@ApiOperation("test")
@GetMapping(path = "/testKafka")
public Object test(String para) throws ExecutionException, InterruptedException {
ListenableFuture<SendResult<String, String>> helloKafka = kafkaTemplate.send("demo.topic.20241213", para);
SendResult<String, String> result = helloKafka.get();
return result;
}
}
消费者
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@KafkaListener(groupId = "demo.groupid", topics = "demo.topic.20241213")
public class KafkaDemoListener {
@KafkaHandler
public void handleListener(String message, @Headers Map<String, Object> map) {
System.out.println("message: " + message);
System.out.println("Headers: " + map);
}
}
生产调优
硬件选择
例:100万日活人数,每人生产日志100条,每日数据量大概 1亿条。
处理日志速度:1亿条 / (24 * 3600s) ≈ 1150/s
1条日志存储:0.5k ~ 2k , 按 1k计算,1150条 * 1k ≈ 1M/s
高峰值大约是平均值的20倍左右:1m/s * 20倍 = 20m/s ~ 40m/s(1条日志存储最大值计算)
服务器台数
经验公式:
台数 = 2 * (生产者峰值生产速率 * 副本数 / 100) + 1,副本数一般设置为2。
代入公式:2 * (20m/s * 2 / 100 )+ 1 = 3台
磁盘选择
kafka按照顺序读写,对磁盘要求不高,机械硬盘和固态硬盘顺序读写速度差不多,因此选择机械硬盘。(固态硬盘对随机读写效率高)
1亿条 * 1k ≈ 100g
100g * 2个副本 * 3天(日志清除策略时间) / 0.7(预留30%的磁盘空间) ≈ 1t
三台服务器总磁盘容量大于等于1t。
内存选择
kafka内存一般分为2部分,堆内存(启动配置文件中进行配置)和页缓存(操作系统内存)。
- 堆内存,一般建议每个节点设置为10G ~ 15G。
在kafka-server-start.sh中修改 -Xmx -Xms。
堆内存配置查看方式如下:
查看进程号
jps
根据进程号,查看kafka的GC情况
jstat -gc {pid} 10
,主要观测YGC次数。根据进程号,查看kafka的堆内存
jmap -heap {pid}
查看内存使用率,以便及时调整推内存大小。
- 页缓存
segment大小为1G,默认取25%进行页缓存。分区数是平摊到每一个节点上,因此需要先计算总体在进行平均。
计算公式:(分区数 * 1g * 25%) / 3 = 1g
一台服务器内存 10g + 1g
CPU选择
num.io.threads=8,负责写磁盘的线程数,占CPU总核数的50%。
num.replica.fetchers=1,副本拉取线程数,占CPU总核数50%的1/3。
num.network.threads=3,数据传输线程,占CPU总核数50%的2/3。
.....还有其他的线程。
因此,建议选择32个CPU core。将此CPU核数分为24(kafka) + 8(操作系统),
调整上述参数:
num.io.threads=12
num.replica.fetchers=4
num.network.threads=8
网络选择
网络带宽 = 峰值吞吐量 ≈ 20MB/s , 建议选择千兆网卡。
100Mbps单位是bit;10M/s单位是byte(字节);1byte = 8bit;100Mbps/8 = 12.5M/s。
一般百兆的网卡(100Mbps),千兆网卡(1000Mbps),万兆网卡(10000Mbps)