Kafka
# Kafka 的基本概念
分布式流处理平台
提供发布订阅及 Topic 支持
吞吐量高但不保证消息有序
Kafka消费者组是Kafka消费的单位
单个Partition只能由消费者组中某个消费者消费
消费者组中的单个消费者可以消费多个Partition
常见命令
1、启动Kafka
bin/kafka-server-start.sh config/server.properties &
2、停止Kafka
bin/kafka-server-stop.sh
3、创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiangzh-topic
4、查看已经创建的Topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181
5、发送消息
bin/kafka-console-producer.sh --broker-list 192.168.220.128:9092 --topic jiangzh-topic
6、接收消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic jiangzh-topic --from-beginning
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 配置
https://www.cnblogs.com/saryli/p/13840672.html
# Kafka客户端操作类型
# AdminClient API:允许管理和检测Topic、broker以及其它Kafka对象
Kafka AdminClient API 和对应的 作用
- AdminClient: AdminClient客户端对象
- NewTopic: 创建Topic
- CreateTopicsResult: 创建Topic的返回结果
- ListTopicsResult: 查询Topic列表
- ListTopicsOptions: 查询Topic列表及选项
- DescribeTopicsResult: 查询Topics
- DescribeConfigsResult: 查询Topics配置项
# Producer API:发布消息到1个或多个topic
# Sample
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.128:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.imooc.jiangzh.kafka.producer.SamplePartition");
// Producer的主对象
Producer<String, String> producer = new KafkaProducer<>(properties);
// 消息对象 - ProducerRecoder
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record =
new ProducerRecord<>(TOPIC_NAME, "key-" + i, "value-" + i);
producer.send(record, (recordMetadata, e) ->
System.out.println("partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset()));
}
// 所有的通道打开都需要关闭
producer.close();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Producer发送模式
- 同步发送
- 异步发送
- 异步回调发送
构建 KafkaProducer
- MetricConfig
- 加载负载均衡器
- 初始化Serializer
- 初始化RecordAccumulator ——类似于计数器
- 启动newSender ——守护线程
KafkaProducer
- Producer是线程安全的
- Producer并不是接到一条发一条
- Producer是批量发送
KafkaProducer send(record) 方法
- 计算分区 —— 消息具体进入哪一个partition
- 计算批次 —— accumulator.append
- 1、创建批次 2、向批次中追加内容
消息传递保障
- 最多一次:收到0到1次
- 至少一次:收到1到多次
- 正好一次:有且仅有一次
acks 配置 producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:
(1)acks=0: 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;
(2)acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
(4)其他的设置,例如acks=2也是可以的,这将需要给定的acks数量,但是这种策略一般很少用。
# Consumer APl:订阅一个或多个topic,并处理产生的消息
# Consumer注意事项
- 单个分区的消息只能由ConsumerGroup
- 中某个Consumer消费
- Consumer从Partition中消费消息是顺序,默认从头开始消费
- 单个ConsumerGroup会消费所有Partition中的消息
# 新成员入组

# 组成员崩溃

# 组成员主动离组

# 提交位移

# Streams API:高效地将输入流转换到输出流
# Kafka Stream基本概念
- Kafka Stream是处理分析存储在Kafka数据的客户端程序库
- Kafka Stream通过state store可以实现高效状态操作
- 支持原语Processor和高层抽象DSL
# sample
public class StreamSample {
private static final String INPUT_TOPIC="jiangzh-stream-in";
private static final String OUT_TOPIC="jiangzh-stream-out";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 如果构建流结构拓扑
final StreamsBuilder builder = new StreamsBuilder();
// 构建Wordcount
wordcountStream(builder);
// 构建foreachStream
foreachStream(builder);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
// 如果定义流计算过程
static void foreachStream(final StreamsBuilder builder){
KStream<String,String> source = builder.stream(INPUT_TOPIC);
source
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.foreach((key,value)-> System.out.println(key + " : " + value));
}
// 如果定义流计算过程
static void wordcountStream(final StreamsBuilder builder){
// 不断从INPUT_TOPIC上获取新数据,并且追加到流上的一个抽象对象
KStream<String,String> source = builder.stream(INPUT_TOPIC);
// Hello World imooc
// KTable是数据集合的抽象对象
// 算子
final KTable<String, Long> count =
source
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
// 合并 -> 按value值合并
.groupBy((key, value) -> value)
// 统计出现的总数
.count();
// 将结果输入到OUT_TOPIC中
count.toStream().to(OUT_TOPIC, Produced.with(Serdes.String(),Serdes.Long()));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# Connector API:从一些源系统或应用程序中拉取数据到kafka
# Kafka Connect基本概念
- Kafka Connect是Kafka流式计算的一部分
- Kafka Connect主要用来与其他中间件建立流式通道
- Kafka Connect支持流式和批量处理集成
# Kafka核心概念
- Broker:一般指Kafka的部署节点
- Leader:用于处理消息的接收和消费
- Follower:主要用于备份消息数据
# Kafka集群部署
- Kafka天然支持集群
- Kafka集群依赖于Zookeeper进行协调
- Kafka主要通过brokerld区分不同节点
# Kafka节点故障
- Kafka与zookeeper心跳未保持视为节点故障
- follower消息落后leader太多也视为节点故障
- Kafka会对故障节点进行移除
# Kafka节点故障处理
- Kafka基本不会因为节点故障而丢失数据
- Kafka的语义担保也很大程度上避免数据丢失
- Kafka会对消息进行集群内平衡,减少消息在某些节点热度过高
# Kafka集群之Leader选举
- Kafka并没有采用多数投票来选举leader
- Kafka会动态维护—组Leader数据的副本(ISR)
- Kafka会在ISR中选择一个速度比较快的设为Leader
- Kafka有一种无奈的情况,ISR中副本全部宕机
- 对于上述情况,Kafka会进行unclean leader选举
- Kafka提供了两种不同的选择处理该部分内容
# Leader选举配置建议
- 禁用“unclean leader”选举
- 手动指定最小ISR
# Kafka面试题类型
# Kafka面试题分析
Kafka概念:分布式流处理平台
Kafka特性一:提供发布订阅及Topic支持
Kafka特性二:吞吐量高但不保证消息有序
Kafka消费者组是Kafka消费的单位
单个Partition只能由消费者组中某个消费者消费
消费者组中的单个消费者可以消费多个Partition
# Kafka常见应用场景
- 日志收集或流式系统
- 消息系统
- 用户活动跟踪或运营指标监控
# Kafka与其他消息中间件异同点
todo
# Kafka 吞吐量为什么大? 速度为什么快?
- 日志顺序读写和快速检索
- Partition机制
- 批量发送接收及数据压缩机制
- 通过sendfile实现零拷贝
# Kafka底层原理之日志
- Kafka的日志是以partition为单位存储的
- 日志目录格式为 topic名称+数字
- 日志文件格式是一个 ”日志条目“ 序列
- 每条日志消息由4字节整形与N字节消息组成
message length : 4 bytes (value: 1+4+n) //消息长度 "magic" value : 1 byte //版本号 crc : 4 bytes //CRC校验码 payload : n bytes //具体的消息1
2
3
4
# 日志分段
每个partition的日志会分为N个大小相等的segment中
每个segment中消息数量不一定相等
每个partition支持顺序读写(磁盘io顺序读写不一定比内存慢)

# segment存储结构
- Partition会将消息添加到最后一个 segment 上
- 当segment达到一定阈值会flush到磁盘上(consumer只有flush在磁盘上之后才能读到)
- segment 文件分为两个部分:index文件和 data文件(.log文件)

# 日志读操作
- 首先需要在存储的数据中找出 segment 文件
- 然后通过全局的 offset 计算出 segment 中的 offset
- 通过 index 中的 offset 寻找具体数据内容
# 日志写操作
- 日志允许串行的追加消息到文件最后
- 当日志文件达到阈值则滚动到新 segment 文件上
# Kafka 通过sendfile实现零拷贝原理

- 直接使用Linux的Sendfile(不建议Kafka部署在windows上面)

# Kafka Producer
- 创建producer的时候就会创建一个守护线程(RecordAccumulator)
- 这个守护线程一直轮询把消息发到Kafka


# Kafka 消息有序性处理
- Kafka的特性只支持单 Partition 有序
- 使用Kafka Key+offset可以做到业务有序
- eg:订单系统的id作为 key 加上 offset
# Kafka Topic 删除
- 流程

- 建议
- 建议设置auto.create.topics.enable=false
- 建议设置delete.topic.enable=true