Skip to content

Kafka

本系列主要介绍kafka基础知识。kafka是一个开源分布式事件流平台,被用于高性能数据通道、流分析、数据集成和关键任务应用程序。

核心能力

高吞吐量 可扩展 持久化存储 高可用

基础知识

kafka是一种消息队列,主要处理大量数据状态下的消息队列。 kafka教程

安装


安装zookeeper

shell
docker pull wurstmeister/zookeeper

安装kafka

shell
docker pull wurstmeister/zookeeper

使用docker-compose进行安装

yml
version: '1'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.56.102
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

启动命令

shell
docker-compose -f kafka.yml up -d

概要描述

版本:kafka 3.x

2种模式:点到点、发布订阅

作用:消息队列,流计算,存储

应用场景:解耦,异步通信,流量消峰

概念:

  1. Topic

    对应多个分区,多个副本(leader 副本,follower副本,只针对leader副本进行消费)

  2. Broker Server

    broker 0/1/2

  3. Consumer

    消费者,可以分组消费,某个分区只能有一个消费者进行消费

  4. Producer

    生产者

  5. Zookeeper

    记录谁是leader, kafka2.8.0以后也可以配置不采用ZK,使用kraft

linux脚本:

shell
#!/bin/bash

case $1 in
"start")
    for i in 1 2 3
    do
        ssh $i "xxx"
    done
;;

"stop")

;;
esac

常用命令:

bat
zookeeper-server-start.bat D:/soft/kafka_2.12-2.4.1/config/zookeeper.properties
zookeeper-shell.bat 127.0.0.1:2181

kafka-server-start.bat D:/soft/kafka_2.12-2.4.1/config/server.properties
# 3.x版本
kafka-server-start.bat -daemon D:/soft/kafka_2.12-2.4.1/config/server.properties

./kafka-topics.bat --bootstrap-server localhost:9092 -list
./kafka-topics.bat --bootstrap-server localhost:9092 --create --topic topic_test_20241213 --partitions 3
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --list
# 查看当前topic信息
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic {topic.name} --describe

# 增量消费数据
./kafka-console-producer.bat --topic topic_test_20241213 --broker-list 127.0.0.1:9092
# 从最开始消费数据
./kafka-console-consumer.bat --topic topic_test_20241213 --from-beginning --bootstrap-server 127.0.0.1:9092



./kafka-topics.bat --bootstrap-server localhost:9092 --create --topic demo.topic.20241213
./kafka-console-producer.bat --topic demo.topic.20241213 --broker-list 127.0.0.1:9092
./kafka-console-consumer.bat --topic demo.topic.20241213 --from-beginning --bootstrap-server 127.0.0.1:9092
./kafka-console-consumer.bat --topic demo.topic.20241213 --bootstrap-server 127.0.0.1:9092

集群启动

1zk+3kafka,kafka broker id: [0,1,2]

kafka-server-start.bat D:/soft/kafka_2.12-2.4.1/config/server.properties
kafka-server-start.bat D:/soft/kafka_2.12-2.4.1/config/server-9093.properties
kafka-server-start.bat D:/soft/kafka_2.12-2.4.1/config/server-9094.properties
bat
#创建topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic --partitions 3 --replication-factor 3
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic --describe

#创建topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic.2 --partitions 2 --replication-factor 2
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.2 --describe

#创建topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic.4 --partitions 4 --replication-factor 2
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.4 --describe

生产者

生产者发送原理

error.图片加载失败

Producer(主线程) -> send(ProducerRecord) -> Interceptors(拦截器),配合spark/flume使用,不使用原生的拦截器 -> Serializer(序列化器) -> Partitioner(分区器), RecordAccumulator(内存队列中操作,默认32M,多个Dqueue双端队列,每一个Dqueue中的元素ProducerBatch默认16K) -> Sender(读取线程),当Batch.size=16K,读取数据;或者linger.ms设置的时间,单位ms,到达时间就会进行数据读取 -> 发送到kafka集群,以brokerid为key,创建一个队列,最多缓存5个请求 -> selector: 通道,发送数据到kafka集群 -> kafka应答: 0(生产者发送数据不需要等待数据落盘应答);1(生产者发送数据,leader收到数据应答);-1/all(生产者发送数据,leader和isr队列所有节点收到数据后应答) org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG -> 成功:清除数据 -> 失败: 重试,int最大值

java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class KafkaProducerTest {

    public static void main(String[] args) {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configs, new StringSerializer(), new StringSerializer());

//        kafkaProducer.send(new ProducerRecord<>("demo.topic","demo"));
        kafkaProducer.send(new ProducerRecord<>("demo.topic.2","demo2"));
        kafkaProducer.send(new ProducerRecord<>("demo.topic.4","demo4"));

        kafkaProducer.close();
    }
}

生产者分区

分区作用

  1. 存储更灵活,可以将大数据进行切分,存储到多个分区上。
  2. 提高并行度,可以进行分组消费。
  3. 分区数量建议最大不能超过Broker Server的3倍,否则会造成分区管理困难。

代码实现

核心类:ProducerRecord.class

大致分为3中实现,具体可看org.apache.kafka.clients.producer.internals.DefaultPartitioner注释

  1. 指定partition

    直接使用指定的partition进行发送数据

  2. 未指定partition,存在key:

    使用key的hash值,和所有的partition数进行取模,得出partition

  3. 未指定partition,不存在key

    使用Sticky Partition(黏性分区),会随机选一个分区,并尽可能使用该分区,直到该分区batch.size=16k,或者linger.ms时间到,再随机一个分区(与上次不同)

自定义分区

  • 实现org.apache.kafka.clients.producer.Partitioner,参考:org.apache.kafka.clients.producer.RoundRobinPartitioner

  • 将自定义实现类注册到KafkaProducer

    java
    //Properties: 
    //org.apache.kafka.clients.producer.ProducerConfig#PARTITIONER_CLASS_CONFIG
    //org.apache.kafka.clients.producer.ProducerConfig#PARTITIONER_CLASS_DOC

生产者吞吐量

  1. 调整批次数据大小

    batch.size=16k~32k ,

    配置信息:org.apache.kafka.clients.producer.ProducerConfig#BATCH_SIZE_DOC

    linger.ms=5~100ms(会造成消息延迟)

    配置信息:org.apache.kafka.clients.producer.ProducerConfig#LINGER_MS_DOC

  2. 压缩数据

    compression.type=snappy

    配置信息:org.apache.kafka.clients.producer.ProducerConfig#COMPRESSION_TYPE_DOC

  3. 调整缓冲区(RecordAccumulator)大小,默认32M

    buffer.memory=64M

    配置信息:org.apache.kafka.clients.producer.ProducerConfig#BUFFER_MEMORY_DOC

生产者可靠消息传递

配置项

acks

java
//org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG
//org.apache.kafka.clients.producer.ProducerConfig#ACKS_DOC

配置详解

  1. acks=0

    丢数,只保证数据发送

  2. acks=1

    丢数,只保证数据发送到leader,并且leader已经接收,如果在同步副本前down机会造成新选举的leader没有数据。

  3. acks=all/-1

    发送数据需leader和ISR队列所有节点都接收完成。

    扩展问题:

    • 如果follower节点故障,导致长时间阻塞

      leader维护了一个动态的in-sync replica set(ISR)队列,意为和leader保持同步的follower+leader集合,如果follower长时间没有向leader发送通讯请求,则会被移除ISR队列。 该时间阈值由replica.lag.time.max.ms参数配置,默认30s。

    • 如果由于节点故障,导致leader和ISR队列节点一致(ISR最小副本数量min.insync.replicas默认为1),此时和acks=1的效果一样,存在丢数的风险,数据可靠性问题

      数据完全可靠条件:(ACK级别设置为-1) + (分区副本大于等于2) + (ISR应答的最小副本数量大于等于2)

总结

acks=0,基本不使用;acks=1,传输普通日志,允许个别丢数;acks=-1,数据可靠性要求比较高。

生产者数据重复消费

acks=-1, leader+ISR队列节点已全部接收完成,此时leader下线,选举新leader,会接收重复数据?

消息传递语义: 至少一次:(ACK级别设置为-1) + (分区副本大于等于2) + (ISR应答的最小副本数量大于等于2),不保证不重复 至多一次:acks=0,不保证数据不丢失

​ [精确一次]:至少一次 + 幂等性

幂等性

配置项:enable.idempotence,默认是true

Producer无论发送多少次重复数据,Broker端只会持久化一条,保证了不重复。

重复数据判断标准

具有<PID,Partition,SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其中PID在kafka重启时都会分配一个新的,Partition表示分区号,Sequence Number是单调自增的。

总结

幂等性只能保证单分区单会话不重复。

生产者事务

解决幂等性缺陷,只能保证单分区单会话保证不重复。

配置项

java
//org.apache.kafka.clients.producer.ProducerConfig#TRANSACTION_TIMEOUT_CONFIG
//org.apache.kafka.clients.producer.ProducerConfig#TRANSACTIONAL_ID_CONFIG

前提条件

开启事务,必须开启幂等性(enable.idempotence)

Producer使用事务功能前,必须先自定义一个唯一的transaction.id。即使客户端挂掉,重启后也能保证继续处理未完成的事务。

底层原理

名词解释 broker(transaction coordinator): 事务协调器 broker(Topic-partition Leader): 消息Topic对应的分区leader broker(transaction_topic): 存储事务相关信息的主题;默认有50个分区,每个分区负责一部分事务。事务划分根据transaction.id的hashcode%50,计算出该事务属于哪个分区。该分区Leader副本所在的broker节点即为这个transaction.id对应的transaction coordinator节点 步骤

  1. kafka producer向broker(transaction coordinator)请求producer id;
  2. broker (transaction coordinator) 返回producer id;
  3. kafka producer向broker (Topic-partition Leader)发送Topic;
  4. kafka producer向broker(transaction coordinator)发送commit请求;
  5. broker(transaction coordinator)持久化commit请求, 事务协调器向事务主题
  6. (transaction_topic,_transaction_state-分区-Leader存储事务信息的特殊主题);
  7. broker(transaction coordinator)返回commit成功;
  8. broker(transaction coordinator)后台发送commit请求到broker (Topic-partition Leader);
  9. broker (Topic-partition Leader)返回成功到broker(transaction coordinator);
  10. broker(transaction coordinator)持久化事务信息,事务协调器向事务主题
  11. (transaction_topic,_transaction_state-分区-Leader存储事务信息的特殊主题);
java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerTransaction {

    public static void main(String[] args) {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        //开启事务必须幂等性开启+transaction.id
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo.transaction.id");
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configs, new StringSerializer(), new StringSerializer());

        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();

       try {
           for (int i = 0; i < 5; i++) {
               kafkaProducer.send(new ProducerRecord<>("demo.topic.20241213","transaction"+ i));
           }

           int i = 1 / 0;
       }catch (Exception e) {
           kafkaProducer.abortTransaction();
       }finally {
           kafkaProducer.commitTransaction();
       }

        kafkaProducer.close();
    }
}

生产者生产数据有序

多分区,分区与分区间无序(可在消费者端加载所有数据,进行数据排序)。

单分区内,有序,条件如下

配置项:

org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC

kafka 1.x版本: max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性),该值是broker缓存的请求数,默认最多是5个。

kafka 1.x版本后:

未开启幂等性> max.in.flight.requests.per.connection=1

开启幂等性> max.in.flight.requests.per.connection <= 5, 启用幂等性后,kafka服务端会缓存producer发来的最近5个request的元数据,[如果顺序不一致,会在服务端进行一次排序],都可保证最近5个request的数据有序。

zookeeper存储信息

可视化工具

prettyzoo

配置文件

多节点部署,[server.proterties]zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182/kafka

此配置是将kafka配置到zookeeper的kafka节点下, /kafka/ /kafka/brokers /kafka/brokers/ids

存储节点信息

/kafka/ /kafka/admin /kafka/brokers ->/kafka/brokers/ids 记录在线brokers信息 ->/kafka/brokers/topics/{topic_name}/partition/{partition_num}/state 记录leader和isr队列信息 /kafka/cluster /kafka/consumers ->kafka0.9版本之前存储offset信息 ->kafka0.9版本之后存储在kafka主题中 /kafka/controller -> 辅助leader选举 /kafka/config /kafka/log_dir_event_notification /kafka/latest_producer_id_block /kafka/lsr_change_notification /kafka/Controller_epoch

kafka Broker

工作流程

名词解释:

AR: kafka分区中所有副本统称

工作流程:

  1. broker启动的时候在zookeeper注册节点/kafka/brokers

  2. /kafka/controller, 哪个broker先注册就为Controller

  3. 由Controller监听brokers节点变化

  4. controller决定leader选举

    选举规则:在isr中存活为前提,按照AR中排在前面的优先。例如:AR[1,0,2],ISR[0,1,2]那么leader就会按照1,0,2顺序轮询
  5. broker leader controller将节点信息上传到zk,/kafka/brokers/topics/{topic_name}/partition/{partition_num}/state

  6. 其他broker controller从zk同步信息,以segement存储到本地硬盘;如果leader下线,就会重新选举,重复controller选举的过程

segement:

log data [segement: 1G, *.log *.index *.timeindex]

kafka 节点服役和退役

新增节点

前提条件:安装jdk,kafka,修改kafka, server.properties: broker.id

执行负载均衡操作:

  1. 创建负载均衡文件,包含要迁移的主题信息topic-to-move.json

    json
    {
       "topics": [
           {
               "topic": "first"
           }
       ],
       "version": 1
    }
  2. 执行脚本

    bat
    bin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --topic-to-move-json-file {topic-to-move.json} --broker-list "0,1,2,3" --generate

    broker-list: 包含新增broker-server id

  3. 创建副本存储计划,第2步执行的输出内容[proposed partition reassignment configuration]increase-replication-factor.json

  4. 执行脚本

    bat
    bin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --reassignment-json-file {increase-replication-factor.json} --zookeeper 127.0.0.1:2181 --execute
  5. 验证副本存储计划

    bat
    bin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --reassignment-json-file {increase-replication-factor.json} --zookeeper 127.0.0.1:2181 --verify

移除节点

前提条件:已存在broker节点

执行负载均衡操作:

  1. 创建负载均衡文件,包含要迁移的主题信息topic-to-move.json

    json
    {
       "topics": [
           {
               "topic": "first"
           }
       ],
       "version": 1
    }
  2. 执行脚本

    bat
    ./kafka-reassign-partitions.bat --bootstrap-server 127.0.0.1:9092 --topics-to-move-json-file topic-to-move.json --broker-list "0,1,2" --zookeeper 127.0.0.1:2181 --generate

    broker-list: 排除要移除的broker-server id

  3. 创建副本存储计划,第2步执行的输出内容[proposed partition reassignment configuration]decrease-replication-factor.json

  4. 执行脚本

    bat
    bin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --reassignment-json-file {decrease-replication-factor.json} --zookeeper 127.0.0.1:2181 --execute
  5. 验证副本存储计划

    bat
    bin/kafka-reassign-partitions.bat --bootstrap-server {broker-server} --reassignment-json-file {decrease-replication-factor.json} --zookeeper 127.0.0.1:2181 --verify
  6. 停止要移除的kafka broker server

kafka副本

副本基本信息

  1. 副本作用:提高数据可靠性。

  2. 默认副本数量为1个,生产环境一般配置2个,确保数据可靠性。太多会导致增加磁盘空间,增加网络数据传输,降低效率。数量不能超过Broker Server的数量

  3. 副本分为leader和follower,生产者只会将数据发送到leader,follower会从leader上进行同步。

  4. 分区所有的副本统称为AR(Assigned Replicas)。

    AR = ISR + OSR

    ISR: 表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通讯请求或者同步数据,则该Follower被提出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障后,会从ISR中选择新的Leader。

    OSR: 表示Follower与Leader副本同步时,延迟过多的副本。

Leader选举流程

同kafka Broker工作流程,主要是以AR副本顺序进行Leader节点的选举。

Leader、Follower故障处理

LEO: Log End Offset,每个副本的最后一个offset,LEO就是最新的offset+1。

HW: High Watermark,所有副本中最小的LEO。

Follower故障处理细节

  1. follower故障会被踢出ISR队列。
  2. leader和其他follower会继续正常接收数据,HW和LEO会继续往前移动。
  3. follower故障恢复后,会从本地的log文件中读取最后一次的HW,将log文件中大于HW的数据清除掉,重新从leader中同步数据。
  4. 直到follower的LEO大于等于Leader的Hw为止,此时会将该follower重新加入到ISR队列中。

Leader故障处理细节

  1. 该leader节点会被踢出ISR队列。
  2. 重新进行leader选举,其他的follower与leader进行数据同步,若follower的LEO大于leader的HW,会将自己的数据清除掉,与leader保持一致,直到follower的LEO大于等于Leader的Hw为止。

总结:leader故障处理,只能保证数据一致性,不能保证数据完整性,会造成数据丢失。

分区副本分配

leader和follower尽量均匀分配到每一台节点上。每一个分区对应的副本分配尽量不一致,保证副本在每个节点尽量平均。

bat
#创建topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic.4 --partitions 4 --replication-factor 2
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.4 --describe

error.图片加载失败

上图,节点数为3,分区为4,副本数为2

Replicas: 以节点数为一组,开始为Partition0~3。开始副本为0,1;那么第二组Partition4, 开始副本则为0,2。

生产经验-手动调整分区

windows版本kafka_2.12-2.4.1未调整成功。

bat
#创建topic, 当前broker [0,1,2]
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --create --topic demo.topic.self --partitions 3 --replication-factor 2
#查看topic
./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.self --describe

上图,节点数为3,分区为3,副本数为2 。

调整副本全部放到broker.ids[0,1]上。

  1. 创建副本存储计划。increase-replication-factor.json

    如果实际的分区数比节点数多,可能会存在问题。

    ./kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --alter--topic demo.topic.self --partitions 3

    json
    {
       "version": 1,
       "partitions": [
           {
               "topic": "demo.topic.self",
               "partition": 0,
               "replicas": [0] //必须要是Replicas中存在的
           },
           {
               "topic": "demo.topic.self",
               "partition": 1,
               "replicas": [0] //必须要是Replicas中存在的
           },
           {
               "topic": "demo.topic.self",
               "partition": 2,
               "replicas": [1] //必须要是Replicas中存在的
           }
       ]
    }
  2. 执行副本存储计划。

    bat
    ./kafka-reassign-partitions.bat --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --zookeeper 127.0.0.1:2181 --execute
  3. 验证副本存储计划。

    bat
    ./kafka-reassign-partitions.bat --bootstrap-server 127.0.0.1:9092 --reassignment-json-file increase-replication-factor.json --zookeeper 127.0.0.1:2181 --verify
  4. 查看该topic

    bat
    kafka-topics.bat --bootstrap-server 127.0.0.1:9092 --topic demo.topic.self --describe

Leader Partition自动平衡

正常情况,kafka会自动把leader partition均匀分散在各个机器上,保证各节点读写吞吐均匀。如果有的节点down机,会导致leader partition过于集中在存活的机器上,导致请求压力增高。当down机节点恢复后,会变成follower partation,读写请求很低,容易造成集群负载步均衡。

自动平衡参数:auto.leader.rebalance.enable默认是true

leader.imbalance.per.broker.percentage默认是10%。表示每个broker允许的不平衡的leader比率,如果超过该值,控制器会触发leader的平衡。

leader.imbalance.check.interval.seconds默认是300s。表示leader负载是否平衡的间隔时间。

算法规则:

针对于broker server某一个分区的leader和Replicas中展示首位的顺序不一致,Replicas 优先副本不是 Leader节点,所以不平衡加1,AR的副本数=Replicas数量。

所以broker server 不平衡率1/4>10%,需要再平衡。

Replicas: 表示broker ids

建议不开启自平衡参数。或者调大不平衡比率值,避免频繁进行自动平衡。

增加副本因子

不能通过命令行方式增加topic的replication-factor

可以使用kafka-reassign-partitions脚本,创建存储计划。调整方式同上。

文件存储

文件存储机制

Topic是逻辑概念,而partition是物理概念,每个partition对应一个log文件。partition最小的存储单元是segment,默认大小为1G。

segment组成:

  1. log日志文件,当前segment第一条消息的offset命名
  2. index偏移量索引文件,当前segment第一条消息的offset命名
  3. timeindex时间戳索引文件
  4. 其他文件

数据默认是7天进行清理,由timeindex进行日期判断。

为防止log文件过大导致定位效率低下,kafka采用分片和索引机制,将每个partition分为多个segment,多个segment是追加的方式存储。segment文件夹命名规则:topic名称+分区序号,demo-0

index文件为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引,offset为相对offset。参数log.index.interval.bytes默认4kb。

查看segment中log和index文件内容:

bat
kafka-run-class.bat kafka.tools.DumpLogSegments --files D:\soft\kafka_2.12-2.4.1\9092\logs\demo-0\00000000000000000000.index

文件清理策略

kafka日志默认保存时间为7天。可以通过如下参数修改保存时间:

  1. log.retention.hours,最低优先级小时,默认168h=7天。
  2. log.retention.minutes,分钟。
  3. log.retention.ms,最高优先级毫秒。
  4. log.retention.check.interval.ms,检查周期,默认5分钟。

日志清理策略由deletecompact两种。

delete日志删除,将过期数据删除

log.cleanup.policy=delete 所有数据启用删除策略,默认启用删除策略

  • 基于时间,默认打开,以segment中所有记录中的最大时间作为该文件时间戳。
  • 基于大小,默认关闭,超过设置的所有日志总大小,删除最早的segment。log.retention.bytes默认等于-1,表示无穷大。

compact日志压缩

log.cleanup.policy=compact

对于相同key,不同的value值只保留最后一个版本。

只适用于特殊场景,比如消息的key为用户ID,value是用户的资料。只保留所有用户最新的数据。

高效读写数据

  1. 分布式集群,采用分区技术,并行度高。

  2. 读数据采用稀疏索引,可以快速定位要消费的数据。

  3. 顺序写磁盘。写入数据的是按照追加的方式进行写入,为顺序写。顺序写600M/S,随机读写100k/s。

  4. 页缓存+零拷贝技术。

    零拷贝:kafka broker不关心存储数据,交由linux内核处理(网卡零拷贝),不用kafka应用层进行数据处理,传输效率高。

    PageCahe页缓存,操作系统底层提供。

消费者

消费方式

  • 拉模式(pull),采用该消费方式,由消费者自己决定消费数据的速率。如果broker没有数据,也会造成一直空转。
  • 推模式(push)

整体工作流程

工作流程

生产者生产数据->分区leader节点->消费者从分区leader节点拉取数据

消费者组内的消费者不能消费同一分区的数据,会造成消息重复消费。

消费者可以消费多分区的数据。

消费者消费消息的位置由offset表示,低版本(<0.9)存储在zk下的/kafka/consumer节点下;现在存储在kafka的特定主题__consumer_offsets

消费者组原理

Consumer Group(CG) :消费者组,由多个consumer组成,形成一个消费者组的条件,是所有消费者的groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内的消费者消费。
  • 消费者组之间互不影响。所有消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组消费消息:

  • 单消费者组,组内消费者数量小于分区数量

    按顺序分配分区数到消费者

  • 单消费者组,组内消费者数量等于分区数量

    一一对应分配到消费者

  • 单消费者组,组内消费者数量大于分区数量

    先一一对应分配到消费者,剩余的消费者保持空闲。

  • 多消费者组,一组大于分区数量,一组小于分区数量

    组内消费者数量等于分区数量,一一对应分配到消费者

    组内消费者数量小于分区数量,按顺序分配分区数到消费者

消费者组初始化流程:

coordinator: 辅助实现消费者组的初始化和分区分配,存储在每一个broker server。

coordinator节点选择:groupid的hashcode值%50(__consumer_offsets的分区数量)

由于__consumer_offsets默认50分区,会分布在broker server的节点上,根据上一步的取模值,会计算__consumer_offsets对应的分区落到哪个broker server节点上,以此节点的coordinator作为该消费者组的初始化和分配工作。

  1. 消费者组中每一个consumer会向coordinator发送加入消费者组(joinGroup)的请求
  2. coordinator会选举该消费者组的leader
  3. coordinator把要消费的topic情况发送给leader消费者
  4. leader 消费者指定具体的消费方案
  5. leader 消费者将消费方案发送给coordinator
  6. coordinator将消费方案下发给其他的consumer

每个消费者都会和coordinator保持心跳(heartbeat.interval.ms=3s),一旦超时(session.timeout.ms=45s)该消费者就会被移除,并触发再平衡;或者消费者处理消息的时间过长(max.poll.interval.ms=5min),也会触发再平衡。

配置属性:

java
org.apache.kafka.clients.consumer.ConsumerConfig#HEARTBEAT_INTERVAL_MS_CONFIG
org.apache.kafka.clients.consumer.ConsumerConfig#SESSION_TIMEOUT_MS_CONFIG
org.apache.kafka.clients.consumer.ConsumerConfig#MAX_POLL_INTERVAL_MS_CONFIG

消费者组详细消费流程

  1. 消费者先创建网络连接客户端,发送sendFetches请求Kafka broker消费;参数配置如下:

    fetch.min.bytes,每批次最小抓取大小,默认1字节

    org.apache.kafka.clients.consumer.ConsumerConfig#FETCH_MIN_BYTES_CONFIG

    fetch.max.wait.ms,一批数据最小值未达到的超时时间,默认500ms

    org.apache.kafka.clients.consumer.ConsumerConfig#FETCH_MAX_WAIT_MS_CONFIG

    fetch.max.bytes,每批次最大抓取大小,默认50M

    org.apache.kafka.clients.consumer.ConsumerConfig#FETCH_MAX_BYTES_CONFIG

    fetch.min.bytesfetch.max.wait.ms任一满足都会触发拉取。

  2. kafka broker会进行回调,将拉取的数据放到消息队列中。

  3. 消费者会从回调队列中抓取数据,参数配置如下:

    max.poll.records一次拉取数据的最大条数,默认500条

    org.apache.kafka.clients.consumer.ConsumerConfig#MAX_POLL_RECORDS_CONFIG

  4. 消费者抓取数据后,先经过反序列化,再经过拦截器处理,最终得到需要的数据。

kafka broker不进行数据处理,只存储数据,只在生产者和消费者端进行数据处理,可以通过拦截器进行消息处理。

消费者API

前提条件

消费者必须配置group.id,发序列化,broker server等。

订阅主题

java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;

public class KafkaConsumerTopicTest {

    // kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo
    // kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
    public static void main(String[] args) {

        Map<String, Object> config = new HashMap<>();

        //设置boorstrp-server
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        //设置group.id, 命令行会随机生成一个
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer");

        //设置配置参数和反序列化
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
                new StringDeserializer(), new StringDeserializer());

//        kafkaConsumer.subscribe(Lists.asList("demo", new String[]{}));
        kafkaConsumer.subscribe(Pattern.compile("demo"));

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

    }
}

订阅分区

生产者按指定分区发送数据

java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class KafkaProducerPartitionTest {

    public static void main(String[] args) {

        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(configs, new StringSerializer(), new StringSerializer());


        for (int i = 0; i < 5; i++) {
            //指定分区发送数据
            kafkaProducer.send(new ProducerRecord<>("demo",0, "demo_key" + i,"demo_value" + i));
        }

        kafkaProducer.close();
    }
}

消费者订阅指定分区数据

java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class KafkaConsumerPartitionTest {

    // kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
    public static void main(String[] args) {

        Map<String, Object> config = new HashMap<>();

        //设置boorstrp-server
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        //设置group.id, 命令行会随机生成一个
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer");

        //设置配置参数和反序列化
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
                new StringDeserializer(), new StringDeserializer());

        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("demo", 0));
        kafkaConsumer.assign(topicPartitions);

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
            while (iterator.hasNext()) {
                System.out.println(iterator.next());
            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

    }
}

消费者组

多个消费者保证同一group.id即可成为一个消费者组。

分区分配,再平衡

分区(消费者组怎样分配消费者消费哪个分区)主流策略:

  • Range
  • RoundRobin
  • Stickly
  • CooperativeSticky

可以通过配置参数partition.assignment.strategy修改分区分配策略。

org.apache.kafka.clients.consumer.ConsumerConfig#PARTITION_ASSIGNMENT_STRATEGY_CONFIG

默认策略是Range+CooperativeSticky, kafka 可以同时使用多个分区分配策略。

org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor

自定义策略,需要实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口。

Range策略

org.apache.kafka.clients.consumer.RangeAssignor

针对每个topic而言,对同一个topic的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

如果分区的topic过多,容易产生数据倾斜,消费消息时过于集中到某一个消费者。

若某一个消费者下线,待超时时间45s到,触发再平衡会将下线消费者的监听分区,全部划分到存活的某一消费者中。

公式:分区数/消费者数;需对分区和消费者进行排序。详情参考RangeAssignordocs。

  • 余数为0,平均分配
  • 余数不为0,先按整除部分分配,将余数按照消费者从上到下进行依次补充,直到补充完整。(exp: 如果余数为1,则只补充道消费者1;余数为2,补充消费者1,2)

RoundRobin

org.apache.kafka.clients.consumer.RoundRobinAssignor

针对多有topic而言,partition和consumer进行hashcode排序,最后轮询分配到consumer。

若某一个消费者下线,待超时时间45s到,触发再平衡会将下线消费者的监听分区,追加轮询到存活消费者中,存存活消费者已分配的分区不变。之后再有消息传输,会重新进行轮询计算,按照存活消费者进行轮询。

Stickly

执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配变动,节省大量的开销。

org.apache.kafka.clients.consumer.StickyAssignor

不用排序随机分配,尽可能保持平均分配。

若某一个消费者下线,待超时时间45s到,触发再平衡会将下线消费者的监听分区,追加平均分配到存活消费者中,存存活消费者已分配的分区不变。之后再有消息传输,消费者会保证之前的分配策略,将新的分区平均分配到存活的消费者,按照存活消费者进行轮询。

CooperativeStickly

org.apache.kafka.clients.consumer.CooperativeStickyAssignor

合作者粘性策略,需要根据版本的描述信息进行使用,2.4.1版本如下:

error.图片加载失败

offset位移

Offset的默认维护位置

0.9x之后维护在系统主题里,之前维护再zk中。

__consumer_offsets主题采用key和value的方式存储数据,key是group.id+topic+分区号,value是当前的offset值。每隔一段时间,kafka内部就会对这个topic进行compact,因此每个key对应的是最新的value值。

消费offset案例:

通过参数配置该topic可见,exclude.internal.topics默认为false。将该值改为true。

org.apache.kafka.clients.consumer.ConsumerConfig#EXCLUDE_INTERNAL_TOPICS_CONFIG

通过往topic中写入数据,查看__consumer_offsets主题的内容。

自动提交offset

kafka提供了自动提交offset功能,为了使用户只关注于业务逻辑的处理

自动提交offset相关参数:

enable.auto.commit是否开启自动提交offset功能,默认是true。

auto.commit.interval.ms自动提交offset的时间间隔,默认是5s。

org.apache.kafka.clients.consumer.ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG

org.apache.kafka.clients.consumer.ConsumerConfig#AUTO_COMMIT_INTERVAL_MS_CONFIG

producer生产数据到分区,consumer组的消费者不断从分区从消费数据,offset的维护由consumer每5s进行触发一次,与消费数据相互隔离。

手动提交offset

手动提交分为2种方式:同步提交、异步提交

同步提交:commitSync,提交最新offset值时,会阻塞cosumer消费数据,必须等offset提交完毕,才能消费下一批数据。

异步提交:commitAsync,提交最新offset值时,不会阻塞consumer消费,提交完后就开始消费下一批数据了。

java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;

public class KafkaConsumerOffsetTest {

    // kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo
    // kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
    public static void main(String[] args) {

        Map<String, Object> config = new HashMap<>();

        //设置boorstrp-server
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        //设置group.id, 命令行会随机生成一个
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer");

        //设置offset是否自动提交
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //offset自动提交间隔时间
        config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);

        //设置配置参数和反序列化
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
                new StringDeserializer(), new StringDeserializer());

//        kafkaConsumer.subscribe(Lists.asList("demo", new String[]{}));
        kafkaConsumer.subscribe(Pattern.compile("demo"));

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> next = iterator.next();
                System.out.println(next);

            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            //同步提交
            kafkaConsumer.commitSync();
            //异步提交
//            kafkaConsumer.commitAsync();

        }

    }
}

指定offset消费

auto.offset.reset=earlist | latest | none, 默认是latest

  • earlist: 自动将偏移量重置为最早的偏移量,类似命令:--from-beginning
  • latest: 默认值,自动将偏移量置为最新的偏移量
  • none: 如果未找到消费者组的偏移量,则向消费者排除异常

org.apache.kafka.clients.consumer.ConsumerConfig#AUTO_OFFSET_RESET_CONFIG

java
import ch.qos.logback.classic.Level;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

public class KafkaConsumerOffsetSeekTest {

    // kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo
    // kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
    public static void main(String[] args) throws InterruptedException {

        //org.apache.logging.log4j.simple.SimpleLogger.SimpleLogger
//        System.setProperty("org.apache.logging.log4j.simplelog.ROOT.level", "error");

        //配置kafka使用的日志等级,不打印info、debug等日志信息
        ch.qos.logback.classic.Logger logger1 = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger("org.apache.kafka");
        logger1.setLevel(Level.ERROR);

        Map<String, Object> config = new HashMap<>();

        //设置boorstrp-server
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        //设置group.id, 命令行会随机生成一个
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer_seek_10");

        //设置配置参数和反序列化
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
                new StringDeserializer(), new StringDeserializer());

        kafkaConsumer.subscribe(Pattern.compile("demo"));

        //进行分区信息的获取
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        //此处初始化的时候会有coordinator进行消费者leader选举,因此需要再次阻塞,直到获取到值
        while (assignment.isEmpty()) {
            // 辅助进行leader选举
            kafkaConsumer.poll(Duration.ofSeconds(1));

            assignment = kafkaConsumer.assignment();
        }

        for (TopicPartition topicPartition : assignment) {
            System.out.println("topicPartition: " + topicPartition);
            kafkaConsumer.seek(topicPartition, 10);
        }

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> next = iterator.next();
                System.out.println(next);

            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

        }

    }
}

指定时间消费

中转的方式实现以指定时间进行消费,将分区和时间参数传递,使用api查询分区下,该时间对应的offset信息。

注意:该时间在分区下必须存在数据,否则转换结果为空。

java
import ch.qos.logback.classic.Level;
import cn.hutool.core.lang.Console;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

public class KafkaConsumerOffsetSeekTimeStampTest {

    // kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic demo
    // kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic demo
    public static void main(String[] args) throws InterruptedException {

        //org.apache.logging.log4j.simple.SimpleLogger.SimpleLogger
//        System.setProperty("org.apache.logging.log4j.simplelog.ROOT.level", "error");

        //配置kafka使用的日志等级,不打印info、debug等日志信息
        ch.qos.logback.classic.Logger logger1 = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger("org.apache.kafka");
        logger1.setLevel(Level.ERROR);

        Map<String, Object> config = new HashMap<>();

        //设置boorstrp-server
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        //设置group.id, 命令行会随机生成一个
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_consumer_seek_time");

        //设置配置参数和反序列化
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String, String>(config,
                new StringDeserializer(), new StringDeserializer());

        kafkaConsumer.subscribe(Pattern.compile("demo"));

        //进行分区信息的获取
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        //此处初始化的时候会有coordinator进行消费者leader选举,因此需要再次阻塞,直到获取到值
        while (assignment.isEmpty()) {
            // 辅助进行leader选举
            kafkaConsumer.poll(Duration.ofSeconds(1));

            assignment = kafkaConsumer.assignment();
        }

        //构造要获取的时间段
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (TopicPartition topicPartition : assignment) {
            //获取前一天
            LocalDateTime localDateTime = LocalDateTime.now().plusDays(-2);
            long dateTimeLong = localDateTime.toEpochSecond(ZoneOffset.UTC) * 1000;
            Console.error("datetime: {}", dateTimeLong);
//            timestampsToSearch.put(topicPartition, dateTimeLong);
            //1737453269000
            //1737444973545l
//            timestampsToSearch.put(topicPartition, dateTimeLong);
            timestampsToSearch.put(topicPartition, 1737444973545l);
        }


        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch);

        for (TopicPartition topicPartition : assignment) {
            System.out.println("topicPartition: " + topicPartition);
            //此处可以进行鲁棒性处理,将异常信息捕获
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            Console.error("offset: {}", offsetAndTimestamp.offset());
            kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
        }

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
            System.out.println("开始拉取数据:" + JSON.toJSONString(consumerRecords));
            while (iterator.hasNext()) {
                ConsumerRecord<String, String> next = iterator.next();
                System.out.println(next);

            }
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

        }

    }
}

漏消费和重复消费

重复消费

自动offset,已消费了数据,由于异常原因,导致offset没有提交。

漏消费

手动offset,已提交offset,数据未消费完成。可能是异步提交offset,但是消费者消费时由于异常因素导致未正常消费消息。

解决重复消费和漏消费,需采用消费者事务

消费者事务

消费者精确一次性消费,需要kafka消费端将消费过程和offset过程做原子绑定,如果存在下游消费者,也必须支持事务。

数据积压(消费者吞吐量)

  1. 增加Topic的分区数量,配套增加消费者数量,尽量保持消费者数=分区数。

  2. 提高每批次拉取的数据条数,默认是500条。一次拉取的数据过少(拉取条数/处理时间 < 生产速度)也会造成消息积压。配套调整参数,一次最大拉取的消息大小50M。

    max.poll.records一次拉取数据的最大条数,默认500条

    fetch.max.bytes每批次最大抓取大小,默认50M

kafka-eagle监控

Mysql安装

todo...

Kafka-eagle安装

todo...

kafka-kraft模式

安装

kraft支持节点共用,既可以是broker,也可以是controller。

单安装kafka就可以启动,不需要依赖zookeeper。

启动

修改conf/kraft/server.properties种的配置:

node.id,controller.quorum.voters(node.id+主机:端口),advertised.listerners,log.dirs

初始化命令:

  1. 生成唯一编号
shell
/bin/kafka-storage.sh random-uuid
  1. 使用该id格式化kafka存储目录
shell
/bin/kafka-storage.sh format -t {uuid:A7B5c56D5d123235} -c {/usr/local/kafka/conf/kraft/server.properties}
  1. 启动kafka
shell
/bin/kafka-server.start.sh -daemon conf/kraft/server.properties

外部系统集成

Flume

高可用、高可靠的分布式海量日志采集、聚合和传输系统,支持在日志系统中定制各类数据发送方,用于收集数据集;同时,也提供了对数据的简单处理,并写到各种数据接收方。

安装

安装hadoop,flume

可能出现的问题

flume/lib下的guava-*.jar 可能会与handoop下的jar包冲突,建议删除该路径下jar,并配置handoop的环境变量。

生产者

监听linux本地文件->flume tailddirresource -> memory channel -> kafka sink -> kafka broker -> consumer

配置flume配置文件:jobs/file-to-kafka.conf

消费者

kafka producer生产数据 -> kafka broker -> flume kafkasource -> memory channel -> logger sink

配置flume配置文件:jobs/kafka-to-file.conf

流式处理框架,能够实现低延迟的实时数据处理。与Spark相比,Flink更专注于流处理,可以提供更好的事件处理和状态管理。并且还支持批处理任务。

安装

导入依赖jar包。

flink-java, flink-streaming-.jar, flink-clinets-.jar, flink-connector-kafka-*.jar,

生产者

java-api操作

消费者

java-api操作

Spark

快速、通用的大数据处理框架,拥有比Hadoop更好的性能和更广泛的应用领域。支持多种数据处理模式,如批处理、流处理、机器学习等。

安装

先进行jdk-8、scala-2.12.11的安装,并配置环境变量

生产者

导入依赖:

xml
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

需要使用scala去操作。

消费者

导入依赖:

xml
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

需要使用scala去操作。

Springboot

配置文件

pom.xml

xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--
    http://190.2.37.136:8081/#browse/search=keyword%3Dorg.springframework.boot%20AND%20version%3D2.6.14
    -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.10.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <junit.version>4.12</junit.version>
        <log4j.version>1.2.17</log4j.version>
        <lombok.version>1.16.18</lombok.version>
        <jedis.version>4.3.1</jedis.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
        <!--<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>3.1.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.10</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>

        <!--swagger2-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>nfh-maven-public</id>
            <name>nfh-maven-public</name>
            <url>http://190.2.37.136:8081/repository/maven-public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>



</project>

application.yml

yml
server:
  port: 7777

spring:
  application:
    name: kafka
  swagger2:
    enabled: true
  mvc:
    ## org.springframework.boot.autoconfigure.web.servlet.WebMvcProperties.Pathmatch.matchingStrategy: springboot 2.6x 将pathmatch 默认值调整
    pathmatch:
      matching-strategy: ANT_PATH_MATCHER
  jackson:
    serialization:
      FAIL_ON_EMPTY_BEANS: false

logging:
  level:
    root: info
    org.example: info
#  pattern:
#    console: %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level &logger- %msg%n
#    file: %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level &logger- %msg%n
  file:
    name: test.log


  kafka:
    bootstrapServers:
      -localhost:9092
    properties:
      # 默认值:org.springframework.boot.autoconfigure.kafka.KafkaProperties.Producer#keySerializer
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

启动类:

java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Main {
    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}

配置类:

java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@EnableSwagger2
@Configuration
public class Swagger2Config {

    @Value("${spring.swagger2.enabled}")
    private Boolean enabled;

    @Bean
    public Docket createRestApi() {
        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .enable(enabled)
                .select()
                .apis(RequestHandlerSelectors.basePackage("org.example"))
                .paths(PathSelectors.any())
                .build();
    }

    private ApiInfo apiInfo() {
        return new ApiInfoBuilder().build();
    }
}

生产者

java
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

//http://localhost:7777/swagger-ui.html
//http://localhost:7777/swagger-ui.html
@Api(tags = "测试消息发送")
@RestController
public class KafkaDemoController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @ApiOperation("test")
    @GetMapping(path = "/testKafka")
    public Object test(String para) throws ExecutionException, InterruptedException {
        ListenableFuture<SendResult<String, String>> helloKafka = kafkaTemplate.send("demo.topic.20241213", para);
        SendResult<String, String> result = helloKafka.get();
        return result;
    }
}

消费者

java
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
@KafkaListener(groupId = "demo.groupid", topics = "demo.topic.20241213")
public class KafkaDemoListener {

    @KafkaHandler
    public void handleListener(String message, @Headers Map<String, Object> map) {
        System.out.println("message: " + message);
        System.out.println("Headers: " + map);
    }
}

生产调优

硬件选择

例:100万日活人数,每人生产日志100条,每日数据量大概 1亿条。

处理日志速度:1亿条 / (24 * 3600s) ≈ 1150/s

1条日志存储:0.5k ~ 2k , 按 1k计算,1150条 * 1k ≈ 1M/s

高峰值大约是平均值的20倍左右:1m/s * 20倍 = 20m/s ~ 40m/s(1条日志存储最大值计算)

服务器台数

经验公式:

台数 = 2 * (生产者峰值生产速率 * 副本数 / 100) + 1,副本数一般设置为2。

代入公式:2 * (20m/s * 2 / 100 )+ 1 = 3台

磁盘选择

kafka按照顺序读写,对磁盘要求不高,机械硬盘和固态硬盘顺序读写速度差不多,因此选择机械硬盘。(固态硬盘对随机读写效率高)

1亿条 * 1k ≈ 100g

100g * 2个副本 * 3天(日志清除策略时间) / 0.7(预留30%的磁盘空间) ≈ 1t

三台服务器总磁盘容量大于等于1t。

内存选择

kafka内存一般分为2部分,堆内存(启动配置文件中进行配置)和页缓存(操作系统内存)。

  1. 堆内存,一般建议每个节点设置为10G ~ 15G。

在kafka-server-start.sh中修改 -Xmx -Xms。

堆内存配置查看方式如下:

  • 查看进程号 jps

  • 根据进程号,查看kafka的GC情况jstat -gc {pid} 10,主要观测YGC次数。

  • 根据进程号,查看kafka的堆内存jmap -heap {pid}查看内存使用率,以便及时调整推内存大小。

  1. 页缓存

segment大小为1G,默认取25%进行页缓存。分区数是平摊到每一个节点上,因此需要先计算总体在进行平均。

计算公式:(分区数 * 1g * 25%) / 3 = 1g

一台服务器内存 10g + 1g

CPU选择

num.io.threads=8,负责写磁盘的线程数,占CPU总核数的50%。

num.replica.fetchers=1,副本拉取线程数,占CPU总核数50%的1/3。

num.network.threads=3,数据传输线程,占CPU总核数50%的2/3。

.....还有其他的线程。

因此,建议选择32个CPU core。将此CPU核数分为24(kafka) + 8(操作系统),

调整上述参数:

num.io.threads=12

num.replica.fetchers=4

num.network.threads=8

网络选择

网络带宽 = 峰值吞吐量 ≈ 20MB/s , 建议选择千兆网卡

100Mbps单位是bit;10M/s单位是byte(字节);1byte = 8bit;100Mbps/8 = 12.5M/s。

一般百兆的网卡(100Mbps),千兆网卡(1000Mbps),万兆网卡(10000Mbps)

Released under the MIT License.