pursue wind pursue wind
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
首页
Java
Python
数据库
框架
Linux
中间件
前端
计算机基础
DevOps
项目
面试
书
关于
归档
MacOS🤣 (opens new window)
GitHub (opens new window)
  • RabbitMQ

    • RabbitMQ
    • 中间件介绍
    • 消息队列介绍
    • RabbitMQ - 安装
    • RabbitMQ - 简单案例
    • RabbitMQ - 发布确认
    • RabbitMQ - 交换机
    • RabbitMQ - 死信队列
    • RabbitMQ - 延迟队列
    • RabbitMQ - 发布确认高级
    • RabbitMQ - 幂等性、优先级、惰性
  • RocketMQ

  • Kafka
    • 配置
    • Kafka客户端操作类型
      • AdminClient API:允许管理和检测Topic、broker以及其它Kafka对象
      • Producer API:发布消息到1个或多个topic
      • Sample
      • Consumer APl:订阅一个或多个topic,并处理产生的消息
      • Consumer注意事项
      • 新成员入组
      • 组成员崩溃
      • 组成员主动离组
      • 提交位移
      • Streams API:高效地将输入流转换到输出流
      • Kafka Stream基本概念
      • sample
      • Connector API:从一些源系统或应用程序中拉取数据到kafka
      • Kafka Connect基本概念
      • Kafka核心概念
      • Kafka集群部署
      • Kafka节点故障
      • Kafka节点故障处理
      • Kafka集群之Leader选举
      • Leader选举配置建议
      • Kafka面试题类型
      • Kafka面试题分析
      • Kafka常见应用场景
      • Kafka与其他消息中间件异同点
      • Kafka 吞吐量为什么大? 速度为什么快?
      • Kafka底层原理之日志
      • 日志分段
      • segment存储结构
      • 日志读操作
      • 日志写操作
      • Kafka 通过sendfile实现零拷贝原理
      • Kafka Producer
      • Kafka 消息有序性处理
      • Kafka Topic 删除
  • Nexus
  • 中间件
pursuewind
2022-12-14
目录

Kafka

# Kafka 的基本概念

  • 分布式流处理平台

  • 提供发布订阅及 Topic 支持

  • 吞吐量高但不保证消息有序

  • Kafka消费者组是Kafka消费的单位

  • 单个Partition只能由消费者组中某个消费者消费

  • 消费者组中的单个消费者可以消费多个Partition

常见命令

1、启动Kafka
bin/kafka-server-start.sh config/server.properties &

2、停止Kafka
bin/kafka-server-stop.sh

3、创建Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic jiangzh-topic

4、查看已经创建的Topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181

5、发送消息
bin/kafka-console-producer.sh --broker-list 192.168.220.128:9092 --topic jiangzh-topic

6、接收消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.220.128:9092 --topic jiangzh-topic --from-beginning
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 配置

https://www.cnblogs.com/saryli/p/13840672.html

# Kafka客户端操作类型

# AdminClient API:允许管理和检测Topic、broker以及其它Kafka对象

Kafka AdminClient API 和对应的 作用

  • AdminClient: AdminClient客户端对象
  • NewTopic: 创建Topic
  • CreateTopicsResult: 创建Topic的返回结果
  • ListTopicsResult: 查询Topic列表
  • ListTopicsOptions: 查询Topic列表及选项
  • DescribeTopicsResult: 查询Topics
  • DescribeConfigsResult: 查询Topics配置项

# Producer API:发布消息到1个或多个topic

# Sample

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.220.128:9092");
properties.put(ProducerConfig.ACKS_CONFIG, "all");
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.imooc.jiangzh.kafka.producer.SamplePartition");

// Producer的主对象
Producer<String, String> producer = new KafkaProducer<>(properties);

// 消息对象 - ProducerRecoder
for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record =
            new ProducerRecord<>(TOPIC_NAME, "key-" + i, "value-" + i);

    producer.send(record, (recordMetadata, e) ->
            System.out.println("partition : " + recordMetadata.partition() + " , offset : " + recordMetadata.offset()));
}

// 所有的通道打开都需要关闭
producer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

Producer发送模式

  • 同步发送
  • 异步发送
  • 异步回调发送

构建 KafkaProducer

  • MetricConfig
  • 加载负载均衡器
  • 初始化Serializer
  • 初始化RecordAccumulator ——类似于计数器
  • 启动newSender ——守护线程

KafkaProducer

  • Producer是线程安全的
  • Producer并不是接到一条发一条
  • Producer是批量发送

KafkaProducer send(record) 方法

  • 计算分区 —— 消息具体进入哪一个partition
  • 计算批次 —— accumulator.append
  • 1、创建批次 2、向批次中追加内容

消息传递保障

  • 最多一次:收到0到1次
  • 至少一次:收到1到多次
  • 正好一次:有且仅有一次

acks 配置 producer需要server接收到数据之后发出的确认接收的信号,此项配置就是指procuder需要多少个这样的确认信号。此配置实际上代表了数据备份的可用性。以下设置为常用选项:

(1)acks=0: 设置为0表示producer不需要等待任何确认收到的信息。副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1;

(2)acks=1: 这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。

(3)acks=all: 这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。

(4)其他的设置,例如acks=2也是可以的,这将需要给定的acks数量,但是这种策略一般很少用。

# Consumer APl:订阅一个或多个topic,并处理产生的消息

# Consumer注意事项

  • 单个分区的消息只能由ConsumerGroup
  • 中某个Consumer消费
  • Consumer从Partition中消费消息是顺序,默认从头开始消费
  • 单个ConsumerGroup会消费所有Partition中的消息

# 新成员入组

# 组成员崩溃

# 组成员主动离组

# 提交位移

![](C:\Users\pursue wind\AppData\Roaming\Typora\typora-user-images\image-20220824145555874.png)

# Streams API:高效地将输入流转换到输出流

# Kafka Stream基本概念

  • Kafka Stream是处理分析存储在Kafka数据的客户端程序库
  • Kafka Stream通过state store可以实现高效状态操作
  • 支持原语Processor和高层抽象DSL

# sample

public class StreamSample {

    private static final String INPUT_TOPIC="jiangzh-stream-in";
    private static final String OUT_TOPIC="jiangzh-stream-out";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.220.128:9092");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordcount-app");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 如果构建流结构拓扑
        final StreamsBuilder builder = new StreamsBuilder();
        // 构建Wordcount
        wordcountStream(builder);
        // 构建foreachStream
        foreachStream(builder);

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);

        streams.start();
    }

    // 如果定义流计算过程
    static void foreachStream(final StreamsBuilder builder){
        KStream<String,String> source = builder.stream(INPUT_TOPIC);
        source
                .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                .foreach((key,value)-> System.out.println(key + " : " + value));
    }

    // 如果定义流计算过程
    static void wordcountStream(final StreamsBuilder builder){
        // 不断从INPUT_TOPIC上获取新数据,并且追加到流上的一个抽象对象
        KStream<String,String> source = builder.stream(INPUT_TOPIC);
        // Hello World imooc
        // KTable是数据集合的抽象对象
        // 算子
        final KTable<String, Long> count =
                source
                        .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
                        // 合并 -> 按value值合并
                        .groupBy((key, value) -> value)
                        // 统计出现的总数
                        .count();

        // 将结果输入到OUT_TOPIC中
        count.toStream().to(OUT_TOPIC, Produced.with(Serdes.String(),Serdes.Long()));
    }

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

# Connector API:从一些源系统或应用程序中拉取数据到kafka

# Kafka Connect基本概念

  • Kafka Connect是Kafka流式计算的一部分
  • Kafka Connect主要用来与其他中间件建立流式通道
  • Kafka Connect支持流式和批量处理集成

# Kafka核心概念

  • Broker:一般指Kafka的部署节点
  • Leader:用于处理消息的接收和消费
  • Follower:主要用于备份消息数据

# Kafka集群部署

  • Kafka天然支持集群
  • Kafka集群依赖于Zookeeper进行协调
  • Kafka主要通过brokerld区分不同节点

# Kafka节点故障

  • Kafka与zookeeper心跳未保持视为节点故障
  • follower消息落后leader太多也视为节点故障
  • Kafka会对故障节点进行移除

# Kafka节点故障处理

  • Kafka基本不会因为节点故障而丢失数据
  • Kafka的语义担保也很大程度上避免数据丢失
  • Kafka会对消息进行集群内平衡,减少消息在某些节点热度过高

# Kafka集群之Leader选举

  • Kafka并没有采用多数投票来选举leader
  • Kafka会动态维护—组Leader数据的副本(ISR)
  • Kafka会在ISR中选择一个速度比较快的设为Leader
  • Kafka有一种无奈的情况,ISR中副本全部宕机
  • 对于上述情况,Kafka会进行unclean leader选举
  • Kafka提供了两种不同的选择处理该部分内容

# Leader选举配置建议

  • 禁用“unclean leader”选举
  • 手动指定最小ISR

# Kafka面试题类型

# Kafka面试题分析

  • Kafka概念:分布式流处理平台

  • Kafka特性一:提供发布订阅及Topic支持

  • Kafka特性二:吞吐量高但不保证消息有序

  • Kafka消费者组是Kafka消费的单位

  • 单个Partition只能由消费者组中某个消费者消费

  • 消费者组中的单个消费者可以消费多个Partition

# Kafka常见应用场景

  • 日志收集或流式系统
  • 消息系统
  • 用户活动跟踪或运营指标监控

# Kafka与其他消息中间件异同点

todo

# Kafka 吞吐量为什么大? 速度为什么快?

  • 日志顺序读写和快速检索
  • Partition机制
  • 批量发送接收及数据压缩机制
  • 通过sendfile实现零拷贝

# Kafka底层原理之日志

  • Kafka的日志是以partition为单位存储的
  • 日志目录格式为 topic名称+数字
  • 日志文件格式是一个 ”日志条目“ 序列
  • 每条日志消息由4字节整形与N字节消息组成
    message length : 4 bytes (value: 1+4+n)   //消息长度
          "magic" value : 1 byte                    //版本号
          crc : 4 bytes                             //CRC校验码
          payload : n bytes                         //具体的消息
    
    1
    2
    3
    4

# 日志分段

  • 每个partition的日志会分为N个大小相等的segment中

  • 每个segment中消息数量不一定相等

  • 每个partition支持顺序读写(磁盘io顺序读写不一定比内存慢)

    image-20220825163004870

# segment存储结构

  • Partition会将消息添加到最后一个 segment 上
  • 当segment达到一定阈值会flush到磁盘上(consumer只有flush在磁盘上之后才能读到)
  • segment 文件分为两个部分:index文件和 data文件(.log文件)

image-20220825165711269

# 日志读操作

  • 首先需要在存储的数据中找出 segment 文件
  • 然后通过全局的 offset 计算出 segment 中的 offset
  • 通过 index 中的 offset 寻找具体数据内容

# 日志写操作

  • 日志允许串行的追加消息到文件最后
  • 当日志文件达到阈值则滚动到新 segment 文件上

# Kafka 通过sendfile实现零拷贝原理

image-20220825172021704

  • 直接使用Linux的Sendfile(不建议Kafka部署在windows上面)

image-20220825172207956

# Kafka Producer

  • 创建producer的时候就会创建一个守护线程(RecordAccumulator)
  • 这个守护线程一直轮询把消息发到Kafka

image-20220825173040436

image-20221214164658004

# Kafka 消息有序性处理

  • Kafka的特性只支持单 Partition 有序
  • 使用Kafka Key+offset可以做到业务有序
    • eg:订单系统的id作为 key 加上 offset

# Kafka Topic 删除

  • 流程

image-20220825174324412

  • 建议
    • 建议设置auto.create.topics.enable=false
    • 建议设置delete.topic.enable=true
Last Updated: 2023/02/14, 18:02:00
RocketMQ 4.5.1安装教程
Nexus

← RocketMQ 4.5.1安装教程 Nexus→

Theme by Vdoing | Copyright © 2019-2023 pursue-wind | 粤ICP备2022093130号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式
  • 飙升榜
  • 新歌榜
  • 云音乐民谣榜
  • 美国Billboard榜
  • UK排行榜周榜
  • 网络DJ