【KafKa】核心原理

一、为什么使用消息中间件(MQ)

异步调用
  • 同步变异步
应用解耦、可拓展性
  • 提供基于数据的接口层
流量削峰
  • 缓解瞬时高流量压力(双十一)
可恢复性
顺序保障

 

 

二、常见消息中间件

ActiveMQ
RabbitMQ
RocketMQ
Kafka
Redis
……

三、消息中间件中的术语

Broker:消息服务器,提供核心服务
Producer:消息生产者
Consumer:消息消费者
Topic:主题,发布订阅模式下的消息统一汇集地,逻辑上的存储单元
Queue:队列,P2P模式下的消息队列

 

 

四、消息中间件工作模式

点对点模式(P2P)
  • 一对一,消费者主动拉取数据
发布订阅模式(Pub/Sub)
  • 一对多,数据生产后,推送给所有订阅者

 

 

五、Apache Kafka

基于scala语言编写
Kafka是一种高吞吐量的分布式发布-订阅消息系统,专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计
  • 快速,单Broker每秒几百MB读取
  • 不停机扩展集群
  • 消息副本冗余
  • 实时数据管道
使用Scala编写

 

六、Kafka安装

1. 下载解压
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
2. 修改server.properties中的
vi /opt/install/kafka/config/server.properties log.dirs=/var/kafka/kafka-logs zookeeper.connect=nodethree:2181,nodefour:2181,nodetwo:2181
3. 分发到各个节点,修改
vi /opt/install/kafka/config/server.properties broker.id=0 broker.id=1 broker.id=2
4. 编写启动脚本
cd /opt/install/kafka vi start-kafka.sh nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
5. 创建topic
bin/kafka-topics.sh –zookeeper nodethree:2181,nodefour:2181,nodetwo:2181 –create –topic hydra –partitions 1 -replication-factor 1
6. 显示topic
bin/kafka-topics.sh –zookeeper nodethree:2181,nodefour:2181,nodetwo:2181 –list
7. 详细显示topic
bin/kafka-topics.sh –zookeeper nodethree:2181,nodefour:2181,nodetwo:2181 –describe –topic hydra
8. 启动生产者
bin/kafka-console-producer.sh –broker-list nodethree:9092,nodefour:9092,nodetwo:9092 –topic hydra
9. 启动消费者
bin/kafka-console-consumer.sh –zookeeper nodethree:2181,nodefour:2181,nodetwo:2181 –topic hydra –from-beginning
10. 删除Topic
//关闭集群 /root/script/hdfs-ha.close //修改kafka配置 vi config/server.properties delete.topic.enable=true //指令删除topic bin/kafka-topics.sh –delete –zookeeper nodethree:2181,nodefour:2181,nodetwo:2181 –topic hydra //删除kafka存储目录 rm -rf /var/kafka/kafka-logs/hydra rm -rf /var/kafka/kafka-logs/hydra rm -rf /var/flume/checkpoint/hydra rm -rf /var/flume/data/hydra rm -rf /events/input/intra/hydra //删除zk下的服务名 bin/zkServer.sh start bin/zkCli.sh -server nodetwo:2181 rmr /brokers/topics/hydra rmr /admin/delete_topics/hydra rmr /consumers/hydra rmr /config/topics/hydra //检查topic数量 bin/kafka-topics.sh –zookeeper nodethree:2181,nodefour:2181,nodetwo:2181 –list

 

 

七、练习1:完成Kafka安装

需求说明
  1. 完成Kafka单节点安装
  2. 使用kafka-topics.sh创建并查看主题详情
  3. 使用kafka-console-producer.sh生产任意消息
  4. 使用kafka-console-consumer.sh消费上一步生产的消息

 

八、Kafka架构

Broker:Kafka集群中的服务器
Topic:维护一个主题中的消息,可视为消息分类
Producer:向Kafka主题发布(生产)消息
Consumer:订阅(消费)主题并处理消息
Log:每个分区维护一个日志,每个分区内有序
Retention Period:消息停留时间
解耦,同步调用,如果某一端有代码调整,所有流程要重新测试

 

九、Kafka Topic

Topic
  • 主题是已发布消息的类别名称
  • 发布和订阅数据必须指定主题
  • 主题副本数量不大于Brokers个数
Partition(提高并发)
  • 一个主题包含多个分区,默认按Key Hash分区
  • 每个Partition对应一个文件夹<topic_name>-<partition_id>
  • 每个Partition被视为一个有序的日志文件(LogSegment)
  • Replication策略是基于Partition,而不是Topic
  • 每个Partition都有一个Leader,0或多个Followers
查看topic详细信息
bin/kafka-topics.sh –describe –zookeeper nodetwo:2181 –topic hydra
leader+follower=relicas

十、练习2:查看Kafka底层存储结构

需求说明
  • 确认server.properties中的“log.dirs”属性
  • 打开“log.dirs”对应目录
    • 找出你创建的所有主题
    • 找出系统内置的主题
    • 查看任意主题分区目录包含哪些文件

 

十一、Kafka Message

header:消息头,固定长度
  • offset:唯一确定每条消息在分区内的位置
  • CRC32:用crc32校验消息
  • “magic”:表示本次发布Kafka服务程序协议版本号
  • “attributes”:表示为独立版本、或标识压缩类型、或编码类型
body:消息体
  • key:表示消息键,可选
  • value bytes payload:表示实际消息数据

十二、Kafka Producer

生产者将消息写入到Broker
  • Producer直接发送消息到Broker上的Leader Partition
  • Producer客户端自己控制着消息被推送到哪些Partition
    • 随机分配、自定义分区算法等
  • Batch推送提高效率

十三、Kafka Broker

Kafka集群中每个Broker都可以响应Producer的请求
  • 哪些Broker是存活的?
  • Topic的Leader Partition在哪?
每个Broker充当Leader和Followers保持负载平衡
  • Leader处理所有读写请求
  • Followers被动复制Leader

 

 

十四、Kafka Consumer

消费者通过订阅消费消息
  • offset的管理是基于消费组(group.id)的级别
  • 每个Partition只能由同一消费组内的一个Consumer来消费
  • 每个Consumer可以消费多个分区
  • 消费过的数据仍会保留在Kafka中
  • 消费者不能超过分区数量
消费模式
  • 队列:所有消费者在一个消费组内
  • 发布/订阅:所有消费者被分配到不同的消费组

 

 

十五、Kafka数据流

副本同步
  • ISR(In-Sync Replica)
容灾
  • Leader Partition
高并发
  • 读写性能
  • Consumer Group
负载均衡
数据不丢失(ack机制)

十六、ZooKeeper在Kafka中的作用

Broker注册并监控状态
  • /brokers/ids

 

Topic注册
  • /brokers/topics
生产者负载均衡
  • 每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更
offset维护
  • Kafka早期版本使用ZooKeeper为每个消费者存储offset,由于ZooKeeper写入性能较差,从0.10版本后,Kafka使用自己的内部主题维护offset

 

 

十七、Kafka API

四种API:
  • Producer API
  • Consumer API
  • Streaming API
    • 从一个topic读取数据,经过处理转换,送到另一个topic
  • Connector API

 

十八、Kafka Producer API

关键类
  • KafkaProducer
  • ProducerRecord
依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
代码
public class Producer { public static void main(String[] args) { Properties props = new Properties(); props.put(“bootstrap.servers”, “nodetwo:9092,nodethree:9092,nodefour:9092”); props.put(“acks”, “all”); props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 100; i++) { producer.send(new ProducerRecord<String, String>(“topic1”, Integer.toString(i), “dd:” + i)); } producer.close(); } }
消费
/opt/install/kafka/bin/kafka-console-consumer.sh –zookeeper nodethree:2181,nodefour:2181,nodetwo:2181 –topic hydra –from-beginning
配置项
参数名称
说明
默认值
bootstrap.servers
kafka集群的broker-list
acks
确保生产者可靠性设置
acks=0:不等待成功返回
acks=1:等Leader写成功返回
acks=all:等Leader和所有ISR中的Follower写成功返回,all也可以用-1代替
-1
key.serializer
key的序列化器
value.serializer
value的序列化器
retries
发送失败尝试重发次数
0
batch.size
每个partition的未发送消息大小
16384
partitioner.class
分区类,可以自定义分区类,实现partitioner接口
默认是哈希值%partitions
max.block.ms
最大阻塞时长
60000

 

 

十九、练习3:使用API生产消息

需求说明
  • 创建主题“my-topic”
  • 使用Producer API向主题“my-topic”发送消息“hello world”
  • 使用kafka-console-consumer.sh查看“my-topic”中的消息

 

 

二十、Kafka Consumer API

KafkaConsumer
ConsumerRecords
Properties props = new Properties(); props.put(“bootstrap.servers”, “node01:9092,node02:9092,node03:9092”); props.put(“group.id”, “testGroup1”); props.put(“enable.auto.commit”, “true”);//默认值true props.put(“auto.commit.interval.ms”, “1000”);//默认值5000 props.put(“key.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”); props.put(“value.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”); KafkaConsumer<String, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(“20190626”)); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) System.out.printf(“partition=%d, offset = %d, key = %s, value = %s%n”, record.partition(),record.offset(), record.key(), record.value()); }
手动提交Offset
Properties props = new Properties(); props.put(“bootstrap.servers”, “node01:9092,node02:9092,node03:9092”); props.put(“group.id”, “testGroup1”); props.put(“enable.auto.commit”, “false”); props.put(“key.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”); props.put(“value.deserializer”,”org.apache.kafka.common.serialization.StringDeserializer”); KafkaConsumer<String, String> consumer = new KafkaConsumer(props); consumer.subscribe(Arrays.asList(“20190626”)); List<ConsumerRecord<String, String>> buffer = new ArrayList(); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if(buffer.size()>5){//此处可进行业务逻辑处理,如保存数据库 consumer.commitAsync(); //异步调用,非阻塞方式 buffer.clear(); }}
配置参数
参数名称
说明
默认值
bootstrap.servers
kafka集群的broker-list
group.id
用于表示该consumer想要加入到哪个group中
“”
key.deserializer
key的反序列化器
value.deserializer
value的反序列化器
enable.auto.commit
是否自动提交
true
auto.commit.interval.ms
设置自动提交的频率
5000(5s)
auto.offset.reset
1) earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
2) latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
3) none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
latest

 

二十一、练习4:使用API消费消息

需求说明
  • 使用Consumer API消费“my-topic”中的消息并输出 至控制台

 

二十二、Kafka优化

消息有序
  • Kafka保证在同一主题同一分区内有序
  • 如何确保基于主题全局有序
    • 一个主题一个分区
  • 局部有序
    • 生产者将消息按Key分组如(Table+PK),一个分组写入一个分区
消息副本保证
  • request.required.acks
    • 0 -生产者从不等待ack
    • 1 -生产者等Leader写成功后返回
    • -1 /all -生产者Leader和所有ISR中的Follower写成功后返回
  • min.insync.replicas
    • 该属性规定了最小的ISR数。当producer设置request.required.acks为all或-1时,指定副本(replicas)的最小数目,如果这个数目没有达到,producer会产生异常
如何使Kafka数据不丢失
  • 面对Kafka数据丢失问题的解决思路
    • 判断是否Kafka环节丢失数据
  • Producer端保证数据不丢失
    • acks配置策略+副本策略
    • retries配置策略
  • Broker端保证数据不丢失
  • Consumer端保证数据不丢失
    • Offset的控制
最佳实践
  • 结合Spark Streaming的实时流处理
  • 通用消息总线
  • 收集用户活动数据
  • 从应用程序、服务器或设备收集操作指标
  • 日志聚合(结合ELK)
  • 分布式系统提交日志

 

 

二十三、Kafka高吞吐实现

批量发送
  • 使用缓存发送
数据压缩
Consumer负载均衡
分区
  • Kafka本身是分布式集群,支持分区技术,并发度高
顺序读写磁盘
  • Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写入能到600M/s,而随机写入只有100K/s
  • 随机寻址浪费时间
零拷贝技术
  • 数据不再复制到“用户态缓冲区”,内核读取缓冲区直接连接socket缓冲区

 

 

 

二十四、Kafka精确消费一次 Exactly-once

1. 思路:手动维护偏移量offset(不丢失)+幂等处理(不重复)
props.put(“enable.auto.commit”, “false”); kafkaConsumer.commitSync();
//开启kafka幂等性 //注意:在使用幂等时 必须开启acks=all和retires props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true); //设置kafka的 Acks以及retries props.put(ProducerConfig.ACKS_CONFIG,”all”); //不包含第一次发送,如果尝试3次都失败,则系统放弃发送 props.put(ProducerConfig.RETRIES_CONFIG,3); //保证信息有序 props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1); //将检测超时的时间为1ms props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1000);
2. 幂等性可以保证分区内发送的原子性,多分区多记录需要使用事务
public static void main(String[] args) { KafkaProducer<String,String> producer = buildKafkaProducer(); //初始化事务控制 producer.initTransactions(); try { producer.beginTransaction(); for (int i = 0; i < 3; i++) { //测试消费者消费等级 ProducerRecord<String, String> record = new ProducerRecord<>(“topic01”, “key” + i, “value” + i+”fufu”); producer.send(record); producer.flush(); } //提交事务 producer.commitTransaction(); } catch (Exception e) { System.err.println(“出错了:~”+e.getMessage()); //关闭事务 producer.abortTransaction(); }finally { producer.close(); } }

 

 

二十五、Kafka优化

1. 日志保存时间
//减少日志保存时间 log.retention.hours=72
2. 副本数量
//默认副本数为1 default.replication.factor:1
3. 网络通讯延迟
//网络不稳定或机器配置不足时,调大延迟参数 replica.socket.timeout.ms=30000 //网络不好,或kafka集群压力较大,会出现副本丢失,然后频繁复制副本。调大以下参数 replica.lag.time.max.ms=600000
4. Producer压缩方式
//默认发送不进行压缩,推荐配置一个合适的压缩算法 compression.type:none
5. 内存调整
//默认一个G,可以提高到4-6G,但生产环境尽量不要超过6个G export KAFKA_HEAP_OPTS=”-Xms4g -Xmx4g”
6. 单条日志传输大小调整
//kafka默认单条最大值是1M,如果大于1M,消费无法从生产者推送到消费者,需要更改配置 //kafka单条可复制的最大字节数 replica.fetch.max.bytes:1048576 //kafka单条可接受的最大字节数 message.max.bytes:1000012
7. 过期数据清理
//首先保证数据没有被引用 //delete和compact两种策略 //启动删除策略 log.cleanup.policy=delete //启动压缩策略 log.cleanup.policy=compact

发表回复