Kafka

本文记录了Kafka消息引擎系统(Messaging System)的大致原理和用途及一些细节

消息引擎系统

维基定义:消息引擎系统是一组规范。企业利用这组规范在不同系统之间传递语义准确的消息,实现松耦合的异步式数据传递

民间定义:系统A发送消息给消息引擎系统,系统B从消息引擎系统中读取A发送的消息

因此:

  • 消息引擎传输的对象是消息
  • 如何传输消息属于消息引擎设计机制的一部分

既然消息引擎是用于在不同系统之间传输消息的,那么如何设计待传输消息的格式就变得特别重要。一个比较容易想到的是使用已有的一些成熟解决方案,比如使用CSV、XML或是JSON;又或者是国外大厂开源的一些序列化框架,如Google的Protocol Buffer或Facebook的Thrift。

而Kafka的选择:使用纯二进制的字节序列。当然消息还是结构化的,只是在使用之前都要将其转换成二进制的字节序列

消息设计出来之后还不够,消息引擎系统还要设定具体的传输协议,即我用什么方法把消息传输出去。常见有两种方法:

  • 点对点模型:也叫消息队列模型。按“民间定义”来说,就是系统A发送的消息只能被系统B接受,其他任何系统都不能读取A发送的消息。日常生活的例子比如电话客服就属于这种模型:同一个客户呼入电话只能被一位客服人员处理,第二个客服人员不能为该客户服务
  • 发布/订阅模型:与上面不同的是,它有一个主题(topic)的概念,可以理解为逻辑语义相近的消息容器。该模型也有发送方和接收方,只不过提法不同。发送者也称为发布者(Publisher),接收方称为订阅者(Subscriber)。和点对点模型不同的是,这个模型可能存在多个发布者向相同的主题发送消息,而订阅者也可能存在多个,它们都能接收到相同主题的消息。生活中的报纸订阅就是一种典型的发布/订阅模型。

Kafka同时支持这两种消息引擎模型。

为什么要使用消息引擎,而不能让系统A直接发送消息给系统B?

答案是“削峰填谷”!这是指缓冲上下游瞬时突发流量,使其更平滑。特别是对于那些发送能力很强的上游系统,如果没有消息引擎的保护,“脆弱”的下有系统可能会直接被压垮导致全链路服务“雪崩”。但是一旦有了消息引擎,它能够有效地对抗上游的流量冲击,真正做到将上游的“峰”填满到“谷”中,避免了流量的震荡。(上游操作比较简单,TPS远高于处理订单的下游,因此需要Kafka这样的消息引擎系统来对抗这种上下游系统TPS的错配以及瞬时峰值流量)

消息引擎系统的另一大好处在与发送方和接收方的松耦合,这也在一定程度上简化了应用的开发,减少了系统间不必要的交互。

当引入Kafka之后,上游服务不再直接与下游服务进行交互。上游向Kafka Broker发送一条消息即可,下游的各个子服务订阅Kafka中的对应主题,并实时从该主题的各个分区(Partition)中获取到消息进行处理,从而实现了上游服务与下游处理服务的解耦。

Kafka相关术语

  • 主题(Topic),在Kafka中发布订阅的对象。可以为每个业务、应用设置是每类数据都创建专属的主题
  • 生产者(Producer),向主题发布消息的客户端应用程序。生产者程序通常持续不断地向一个或多个主题发送消息。
  • 消费者(Consumer),订阅这些主题消息的客户端应用程序。和生产者类似,消费者也能够同时订阅多个主题的消息。
  • 客户端(Clients),把生产者和消费者统称为客户端。

可以同时运行多个生产者和消费者实例,这些实例会不断地向Kafka集群中的多个主题生产和消费消息。

有客户端自然也就有服务器端:

  • Broker,Kafka的服务器端由被称为Broker的服务进程构成,即一个Kafka集群由多个Broker组成,Broker负责接收和处理客户端发送过来的请求,以及对消息进行持久化。虽然Broker进程能够运行在同一台机器上,但更常见的做法是将不同的Broker分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面运行的所有Broker进程都挂掉了,其他机器上的Broker也依然能够对外提供服务。这其实就是Kafka提供高可用的手段之一

  • 备份机制(Replication),是实现高可用的另一个手段。备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝在Kafka中被称为副本(Replica)。副本的数量是可以配置的,这些副本保存着相同的数据,但却有不同的角色和作用。Kafka定义了两类副本:

    • 领导者副本(Leader Replica),对外提供服务(指与客户端程序进行交互)
    • 追随者副本(Follower Replica),被动地追随领导者副本,不能与外界进行交互

    副本的工作机制也很简单:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步

  • 伸缩性(Scalability),虽然有了副本机制可以保证数据的持久化或消息不丢失,但是没有解决伸缩性问题,。拿副本来说,虽然现在有了领导者副本和追随者副本,但倘若领导者副本积累了太多的数据以至于单台Broker及其都无法容纳了。

  • 分区(Partitioning),为了解决伸缩性的问题,此时应该把数据分割成多份保存在不同的Broker上,Kafka也是这么设计的。Kafka中的分区机制指的是将每个主题划分为多个分区,每个分区是一组有序的消息日志。生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区0中,要么在分区1中。注意Kafka的分区编号是从0开始的,如果Topic有100个分区,那么分区编号就是从0到99.

    注意副本是在分区这个层级定义的。每个分区下可以配置若干个副本,其中只能有一个领导者副本和N-1个追随者副本。生产者向分区写入消息,每条消息在分区中的位置信息由一个叫位移(Offset)的数据来表征。分区位移总是从0开始,假设一个生产者向一个空分区写入了10条消息,那么这10条消息的位移依次是0,1,2 … 9

至此能完整串联起Kafka的三层消息架构

  • 第一层是主题层,每个主题可以配置M个分区,而每个分区又可以配置N个副本
  • 第二层是分区层,每个分区的N个副本中只能有一个充当领导者角色,对外提供服务;其他N-1个副本是追随者副本,只是提供数据冗余之用
  • 第三层是消息层,分区中包含若干条消息,每条消息的位移从0开始,依次递增
  • 最后,客户端程序只能与分区的领导者副本进行交互

Kafka Broker如何持久化数据?

Kafka使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故避免了缓慢的随机I/O操作,改为性能较好的顺序I/O写操作,这也是实现Kafka高吞吐量特性的一个重要手段。不过如果不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此Kafka要定期地删除消息以回收磁盘。

具体就是通过日志段(Log Segment)机制。在Kafka底层,一个日志又进一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段,Kafka会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘的目的。

消费者细节

前面提过两种消息模型,即点对点模型(Peer to Peer, P2P)和发布订阅模型。

  • 点对点指得是同一条消息只能被下游的一个消费者消费,其他消费者则不能染指。在Kafka中实现这种P2P模型的方法就是引入了消费者组(Consumer Group)。所谓的消费者组,指的是多个消费者实例共同组成一个组来消费一组主题。这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它。

    为什么要引入消费者组?主要是为了提升消费者端的吞吐量。多个消费者实例(Consumer Instance,可以是运行消费者应用的进程,也可以是一个线程)同时消费,加速整个消费端的吞吐量(TPS)。

  • 消费者组里面的所有消费者实例不仅“瓜分”订阅主题的数据,而且它们还能彼此协助。假设组内某个实例挂掉了,Kafka能够自动检测到,然后把这个Failed实例之前负责的分区转移给其他或者的消费者。这个过程就是Kafka中大名鼎鼎的“重平衡”(Rebalance)。但是其实由重平衡引发的消费者问题比比皆是,事实上很多重平衡的Bug社区都无力解决。

  • 每个消费者在消费消息的过程中必然需要有个字段记录它当前消费到了分区的哪个位置上,这个字段就是消费者位移(Consumer Offset)。注意,这和上面所说的位移完全不是一个概念。上面的Offset表征的是分区内的消息为止,它是不变的,即一旦消息被成功写个消费者有着自己的消费者位移,因此一定要区分这两类的区别。

Kafka只是消息引擎系统?

注意,Apache Kafka是消息引擎系统,也是一个分布式流处理平台(Distributed Streaming Platform)

Kafka在设计之初就旨在提供三个方面的特性:

  • 提供一套API实现生产者和消费者
  • 降低网络传输和磁盘存储开销
  • 实现高伸缩性架构

作为流处理平台,Kafka与其他主流大数据流式计算框架相比的优势:

  • 更容易实现端到端的正确性(Correctness)。实现正确性是流处理能够匹敌批处理的基石。

    正确性一直是批处理的强项,而实现正确性的基石是要求框架能提供精确一次(Exactly-once)处理语义,即处理一条消息有且只有一次机会能够影响系统状态。目前主流的大数据流处理框架只能实现框架内的精确一次处理语义

  • Kafka自己对于流式计算的定位是一个用于搭建实时流处理的客户端库而非一个完整的功能系统。这意味着对于一些中小企业,流处理数据量不大,逻辑也不复杂,无需使用重量级的完整性平台,这时就可以使用Kafka流处理组件。

除此之外,Kafka能够被用作分布式存储系统,不过这方面应用较少。

应该选择哪种Kafka?

Kafka是一个开源框架,但是存在多个组织或公司发布不同的Kafka,类似于Linux的不同发行版的概念(CentOS、Ubuntu…)

  • Apache Kafka

    • 最“正宗”的版本,是后面其他所有版本的基础。即后面的版本要么原封不动地继承了Apache Kafka,要么是在此之上扩展了新功能。
    • 优势:开发人数最多、版本迭代速度最快,社区响应及时
    • 劣势:仅提供最基础的组件,并且没有提供任何监控框架或工具
  • Confluent Kafka

    • 提供了一些Apache Kafka没有的高级特性,比如跨数据中心备份、Schema注册中心以及集群监控工具等。
  • Cloudera/Hortonworks Kafka

    • Cloudera提供的CDH和Hortonworks提供的HDP是非常著名的大数据平台,里面集成了目前主流的大数据框架,能够帮助用户实现从分布式存储、集群调度、流处理到机器学习、实时数据库等全方位的数据处理

Kafka的版本更迭

  • 0.7版本:只提供了最基础的消息队列功能
  • 0.8版本:引入了副本机制,至此Kafka成为了一个真正意义上完备的分布式高可靠消息队列解决方案
  • 0.9.0.0版本:增加了基础的安全认证/权限功能;使用Java重写了新版本消费者API;引入了Kafka Connect组件
  • 0.10.0.0版本:引入了Kafka Streams,正式升级成分布式流处理平台
  • 0.11.0.0版本:提供了幂等性Producer API以及事务API;对Kafka消息格式做了重构
  • 1.0和2.0版本:主要还是Kafka Streams的各种改进

Kafka的线上集群部署方案

操作系统

Linux系统的优势:

  • I/O模型的使用:在Linux上能获得更高效的I/O性能
  • 数据网络传输效率:在Linux上能够享受到零拷贝技术所带来的的快速数据传输特性
  • 社区支持度:社区对Windows平台上的Bug不做任何承诺,因此不能用于生产环境

磁盘

机械磁盘:成本低容量大,但易损坏

固态硬盘:性能优势大,但单价高

由于Kafka使用磁盘大多是顺序读写操作,一定程度上规避了机械磁盘最大的劣势,即随机读写操作慢。因此推荐使用机械磁盘即可,而可靠性差的缺点可以由Kafka在软件层面提供机制来保证。

磁盘容量

考虑以下几个因素:

  • 新增消息数
  • 消息留存时间
  • 平均消息大小
  • 备份数
  • 是否启用压缩

实际使用中建议预留20%-30%的磁盘空间

带宽

根据实际带宽资源和业务SLA预估服务器数量

对于千兆网络,建议每台服务器按照700Mbps来计算,避免大流量下的丢包

集群参数配置(重要!)

包括Kafka服务端的配置,其中既有Broker端参数,也有主题(Topic)级别的参数、JVM端参数和操作系统级别的参数。

Broker端的参数也被称为静态参数(Static Config)。所谓静态参数,是指你必须在Kafka的配置文件server.properties中进行设置的参数,不管是新增、修改还是删除。同时,必须重启Broker进程才能令它们生效。

而主题级别的参数设置有所不同,Kafka提供了专门的kafka-configs领命来修改它们。

置于JVM和操作系统级别参数,它们的设置方法比较通用化。

Broker端参数

Kafka Broker提供了近200个参数,按照大的用途类别来介绍:

信息存储重要参数

  • log.dirs:非常重要的参数,制定了Broker需要使用的若干个文件目录路径。这个参数是没有默认值的,这说明它必须亲自指定
  • log.dir:注意结尾没有s,说明它只能表示单个路径,它是补充上一个参数用的

这两个参数中只要设置 log.dirs就好了。更重要的是,在线上生产环境中一定要为 log.dirs配置多个路径,具体格式是一个CSV格式,也就是用逗号分隔的多个路径。如果有条件最好保证这些目录挂载到不同的物理磁盘上,这样做的好处:

  • 提升读写性能:多块物理磁盘同时读写数据有更高的吞吐量
  • 实现故障转移:即Failover。自1.1版本开始,坏掉的磁盘上的数据会自动地转移到其他正常的磁盘上,而且Broker还能正常工作

ZooKeeper相关设置

ZooKeeper是一个分布式协调框架,负责协调管理并保存Kafka集群的所有元数据信息,比如集群都有哪些Broker在运行、创建了哪些Topic,每个Topic有多少分区以及这些分区的Leader副本都在哪些机器上等等。

  • zookeeper.connect:Kafka与ZooKeeper相关的最重要参数。这是一个CSV格式的参数,可以指定它的值为 zk1:2181, zk2:2181,zk3:2181。2181是ZooKeeper的默认端口

如果让多个Kafka集群使用同一套ZooKeeper集群,这个时候就要用到chroot(在ZooKeeper中类似别名的概念):

如果有两套Kafka集群,假设分别叫kafka1和kafka2,那么两套集群的 zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1zk1:2181,zk2:2181,zk3:2181/kafka2切记chroot只需要写一次,而且是加到最后的。经常碰到有人这样指定:zk1:2181/kafka1,zk2:2181/kafka2,zk3:2181/kafka3,这样的格式是不对的

Broker连接相关

关于客户端程序或其他Broker如何与该Broker进行通信的设置。有以下三个参数:

  • listeners:监听器,其实就是告诉外部连接者要通过什么协议访问指定主机名和端口开放的kafka服务
  • advertised.lisenters:多了个advertised,表示宣称的、公平的,就是说这组监听器是Broker用于对外发布的
  • host.name/port:列出这两个参数就是想说把它们忘掉吧,压根不要为它们指定值,毕竟都是过期的参数了

具体说说监听器的概念,从构成上说,它是若干个逗号分隔的三元组,每个三元组的格式为 <协议名称, 主机名, 端口号>

  • 协议名称可能是标准的名字,比如PLAINTEXT表示明文传输、SSL表示使用SSL或TLS加密传输等;也可能是自己定义的协议名字,比如 CONTROLLER://localhost:9092

    一旦自己定义了协议名称,必须还要指定 listener.security.protocol.map参数告诉这个协议底层使用了哪种安全协议,比如指定 listener.security.protocal=CONTROLLER:PLAINTEXT表示CONTROLLER这个自定义协议底层使用明文不加密传输数据

  • 主机号和端口号比较直观。不过注意主机名这个设置中可以使用IP地址或主机名,但是建议最好全部使用主机名,即Broker端和Client端应用配置中全部填写主机名。Broker源代码中也使用的是主机名,如果在某些地方使用了IP地址进行连接,可能会发生无法连接的问题

Topic管理相关

  • auto.create.topics.enable:是否允许自动创建Topic。建议设置成false,防止因为拼写错误自动生成了一个新的Topic
  • unclean.leader.election.enable:是否允许Unclean Leader选举,建议显式设置成false
  • auto.leader.rebalance.enable:是否允许定期进行Leader选举,建议设置成false(换Leader代价很高并且本质上没有任何性能收益)

数据留存相关

  • log.retention.{hours|minutes|ms}:这是个”三兄弟“,都是控制一条消息数据被保存多长时间。从优先级上来说ms设置最高、minutes次之、hours最低。

    但是通常情况下还是设置hours级别多一点,比如 log.retention.hours=168表示默认保存7天的数据,自动删除7天前的数据。很多公司把Kafka当做存储来使用,那么这个值就要相应地调大。

  • log.retention.bytes:这是指定Broker为消息保存的总磁盘容量大小。

    默认是-1,表明想在这台Broker上保存多少数据都可以,至少在容量方面Broker绝对不做任何阻拦。这个参数真正发挥作用的场景其实是在云上构建多租户的Kafka集群:设想要做一个云上Kafka服务,每个租户只能使用100GB的磁盘空间,为了避免有个“恶意”租户使用过多的磁盘空间,设置这个参数就显得很重要了。

  • message.max.byte:控制Broker能够接收的最大消息大小

    这个参数不能使用默认值,默认的1000012太少了,还不到1MB。实际场景中突破1MB的消息都是屡见不鲜的,因此在线上环境中设置一个比较大的值还是比较保险的做法。

Topic级别参数

注意如果同时设置了Topic参数和全局Broker参数,Topic级别参数会覆盖全局Broker参数的值,而每个Topic都能设置自己的参数值。

例如前面提到的消息数据的留存时间参数,在实际环境中,如果为所有Topic数据都保存相当长的时间,这样做既不高效也无必要。更适当的做法是允许不同部门的Topic根据自身也无需要,设置自己的留存时间。如果只能设置全局Broker参数,那么势必要提取所有业务留存时间的最大值作为全局参数值,此时设置Topic级别参数把它覆盖是一个不错的选择。

依旧按照用途分组引出重要的Topic级别参数:

消息保存相关

  • retention.ms:规定了该Topic消息被保存的时长,默认是7天,即该Topic只保存最近7天的消息。一旦设置了这个值,它会覆盖掉Broker端的全局参数值
  • retention.bytes:规定了要为该Topic预留多大的磁盘空间。和全局参数作用相似,通常在多租户的Kafka集群中有用武之地,默认为-1

如何设置Topic级别参数

  • 创建Topic时进行设置

    假设你的部门需要将交易数据发送到Kafka进行处理,需要保存最近半年的交易数据,同时这些数据很大,通常都有几MB,但一般不会超过5MB,那么可以用以下命令创建Topic:

    1
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partions 1 --replication-factor 1 --cofig retention.ms=15552000000 --config max.message.bytes=5242880

    我们只需知道Kafka开放了kafka-topics命令供我们来创建Tipic即可。注意结尾处的–config设置,就是在config后面制定了想要设置的Topic级别参数。

  • 修改Topic时设置

    可以使用另一个自带的命令kafka-configs来修改Topic级别参数。假设我们现在要发送最大值10MB的消息,修改的命令如下:

    1
    bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

总体来说只有这两种方式进行设置,推荐始终坚持使用第二种方式来设置

JVM参数

  • 堆大小的设置:通用建议,将JVM堆大小设置成6GB,默认的Heap Size为1GB,有点小。由于Kafka Broker在与客户端进行交互时会在JVM堆上创建大量的ByteBuffer实例,Heap Size不能太小。
  • 垃圾回收器的设置(GC设置):如果使用Java 8,使用默认的G1收集器就好了。在没有任何调优的情况下,主要体现在更少的Full GC,需要调整的参数更少等

如何设置

只需要设置两个环境变量即可:

  • KAFKA_HEAP_OPTS:指定堆大小
  • KAFKA_JVM_PERFORMANCE_OPTS:指定GC参数

例如可以这样启动Kafka Broker,即在启动Kafka Broker之前,先设置上这两个环境变量

1
2
3
4
export KAFKA_HEAP_OPTS=--Xms6g --Xms6g
export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokeConcurrent -Djava.awt.headless=true

bin/kafka-server-start.sh config/server.properties

操作系统参数

通常情况下Kafka并不需要设置太多的OS参数,但有些因素还是需要关注一下:

  • 文件描述符限制
    • ulimit -n:推荐任何一个Java项目调整一下这个值,实际上文件描述系统资源并不像想象中那样昂贵,不用太担心调大此值会有什么不利影响。通常情况下将它设置成一个超大的值是合理的做法,比如 ulimit -n 1000000。如果不设置的话会经常看到“Too many open files”的错误
  • 文件系统类型
    • 文件系统指的是如ext3、ext4或XFS这样的日志型文件系统。根据官网的测试报告,XFS的性能强于ext4,所以生产环境最好还是使用XFS
  • Swappiness
    • 对于swap的调优,很多文章提到设置其为0,个人推荐设置成一个较小的值。因为一旦设置为0,当物理内存耗尽,操作系统会出发OOM killer组件,它会随机挑选一个进程然后kill掉,根本不给用户任何的预警。但如果设置成一个较小的值比如1,当开始使用swap空间时,至少能观测到Broker性能开始出现急剧下降,从而给你进一步调优和诊断问题的时间
  • 提交时间
    • 或说是Flush落盘时间。默认为5秒,一般认为太频繁了,可以适当增加提交间隔来降低物理磁盘的写操作。

无消息丢失配置

最佳实践:

  • 不要使用producer.send(msg),而要使用producer.send(msg, callback)。记住,一定要使用带有回调通知的send方法
  • 设置acks=all。acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”、这是最高等级的“已提交”定义
  • 设置retries为一个较大的值。这里的retries同样是Producer的参数,对应Producer的自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries>0的Producer能够自动重试消息发送,避免消息丢失
  • 设置unclean.leader.election.enable=false。这是Broker端的参数,它控制的是哪些Broker有资格竞选分区的Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,必然会造成消息的丢失。故一般设置成false不允许该情况发生
  • 设置replication.factor>=3。这也是Broken端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余
  • 设置min.insync.replicas>1。仍然是Broker端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于1可以提升消息持久性。在实际环境中千万不要使用默认值1.
  • 确保replication.factot>min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要再不降低可用性的基础上完成。推荐设置成replication.factot=min.insync.replicas+1。
  • 确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好设置成false,并采用手动提交位移的方式。这对于单Consumer多线程处理的场景而言是至关重要的。

Java生产者是如何管理TCP连接的

为何采用TCP?

Apache Kafka的所有通信都是基于TCP的,而不是HTTP或其他协议。无论是生产者、消费者,还是Broker之间的通信都是如此。

主要原因在与TCP和HTTP之间的区别:

  • 从社区角度看,在开发客户端时,人们能够利用TCP本身的一些高级功能,比如多路复用请求以及同时轮询多个连接的能力
    • 所谓的多路复用请求,即multiplexing request,是指将两个或多个数据流合并到底层单一物理连接中的过程。TCP的多路复用请求会在一条物理连接上创建若干个虚拟连接,每个虚拟连接负责流转各自对应的数据流。
    • 其实严格来说,TCP并不能多路复用,它只是提供可靠的消息交付语义保证,比如自动重传丢失的报文。更严谨地说,作为一个基于报文的协议,TCP能够被用于多路复用连接场景的前提是,上层的应用协议(如HTTP)允许发送多条消息。
  • 除了这些高级功能有可能被Kafka客户端的开发人员使用之外,社区还发现,目前已知的HTTP库在很多编程语言中都略显简陋

Kafka生产者程序

Kafka的Java生产者API主要的对象就是KafkaProducer。通常开发一个生产者的步骤有4步:

  • 构造生产者对象所需的参数对象
  • 利用第一步的参数对象,创建KafkaProducer对象实例
  • 使用KafkaProducer的send方法发送消息
  • 调用KafkaProducer的close方法关闭生产者并释放各种系统资源
1
2
3
4
5
6
7
8
9
10
//代码实现
Properties props = new Properties();
props.put("参数1","参数1的值");
props.put("参数2","参数2的值");
//......
try(Producer producer = new KafkaProducer<>(props)){
//producer.send(msg,callback);
producer.send(new ProducerRecord(......),callback);
//......
}

这段代码使用了Java 7提供的try-with-resource特性,所以并没有显式调用producer.close()方法。无论是否显式调用close方法,所有生产者程序大致都是这个路数。

现在有一个问题,当我们开发一个Producer应用时,生产者会向Kafka集群中指定的主题(Topic)发送消息,这必然涉及与Kafka Broker创建TCP连接。那么,Kafka的Producer客户端是如何管理这些TCP连接的呢?

何时创建TCP连接

创建KafkaProducer实例时,生产者应用会在后台创建并启动一个名为Sender的线程,该Sender线程开始运行时首先会创建与Broker的连接。

当不调用send方法时,这个Producer不知道给哪个主题发送消息,也不知道连接哪个Broker,因此它会连接bootstrap.servers参数指定的所有Broker。

bootstrap.servers是Producer的核心参数之一,指定了这个Producer启动时要连接的Broker地址。因此如果制定了1000个Broker连接信息,那么Producer启动时会首先创建与这1000个Broker的TCP连接。

实际使用中不推荐把集群中所有的Broker信息都配置到bootstrap.servers中,通常指定3-4台就够了。因为Producer一旦连接到集群中的任一台Broker,就能拿到整个集群的Broker信息。

此外TCP连接还可能在两个地方被创建:

  • 一个是在更新元数据后,如果发现与某些Broker当前没有连接,那么它就会创建一个TCP连接
  • 另一个是在消息发送时,如果Producer发现尚不存在与目标Broker的连接,也会创建一个

何时关闭TCP连接

  • 用户主动关闭。包括用户调用kill -9主动“杀掉”Producer应用。当然最推荐的还是调用producer.close()方法来关闭

  • Kafka自动关闭。这与Producer端参数connection.max.idle.ms的值有关。默认情况下该参数值是9分钟,即如果在9分钟内没有任何请求“流过”TCP连接,那么Kafka会主动帮你把该TCP连接关闭。可通过设置为-1禁掉这种机制。

    注意这种方式的TCP连接是在Broker端被关闭的,但其实这个TCP连接的发起方是客户端,因此在TCP看来属于被动关闭(passive close)的场景。后果是会产生大量的CLOSE_WAIT,因此Producer端或Client端没有机会显式地观测到此连接已被中断

Java消费者是如何管理TCP连接的

何时创建TCP连接

消费者端主要的程序入口是KafkaConsumer类。和生产者不同的是,构建KafkaConsumer实例时是不会创建任何TCP连接的,也就是说当执行完new KafkaConsumer(properties)语句后,会发现没有Socket连接被创建出来。这一点和Java生产者是有区别的。

实际上TCP连接是在调用KafkaConsumer.poll方法时被创建的。再细粒度地说,在poll方法内部有3个时机可以创建TCP连接:

  • 发起FindCoordinator请求时

    消费者端有个组件叫协调者(Coordinator),它驻留在Broker端的内存中,负责消费者组的组成员管理和各个消费者的位移提交管理。当消费者程序首次启动调用poll方法时,它需要向Kafka集群发送一个名为FindCoordinator的请求,希望Kafka集群告诉它哪个Broker是管理它的协调者。消费者会向集群中当前负载最小的那台Broker发送请求。

  • 连接协调者时

    Broker处理完上一步发送的FindCoordinator请求之后,会返还对应的响应结果(Response),显式地告诉消费者哪个Broker是真正的协调者,因此在这一步,消费者知晓了真正的协调者后,会创建连向该Broker的Socket连接。只有成功连入协调者,协调者才能开启正常的组协调操作,比如加入组、等待组分配方案、心跳请求处理、位移获取、位移提交等

  • 消费数据时

    消费者会为每个要消费的分区创建与该分区领导者副本所在Broker连接的TCP。

Mafka的学习

Producer Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DemoSyncProducer{
/**
* producer实例请使用长链接,启动一个实例一直发消息
* 请不要使用短连接:发送一条消息初始化一次,一是性能不好,二是服务端会限制连接次数影响消息发送
*/
private static IProducerProcessor producer;

//注意:执行main函数若抛出ERROR级别异常,请务必进行观察处理
public static void main(String[] args) throws Exception{
Properties properties = new Properties();
//设置业务所在BG的namespace
properties.setProperty(ConsumerConstants.MafkaBGNamespace, "common");
//设置生产者appkey
properties.setProperty(ConsumerConstants.MafkaClientAppkey, "notify");

//创建topic对应的producer对象(注意每次build调用会产生一个新的实例)
//请注意:若调用MafkaClient.buildProduceFactory()创建实例抛出有异常,请重点关注并排查异常原因,不可频繁调用该方法给服务端带来压力
producer = MafkaClient.buildProduceFactory(properties, "notify.message.send");
for (int i = 0; i < 10; ++i){
try{
//同步发送,注意:producer只实例化一次,不要每次调用sendMessage方法前都创建producer实例
ProducerResult result = producer.sendMessage("send sync message" + i);
System.out.println("send " + i + " status: " + result.getProducerStatus());
}catch(Exception e){
e.printStackTrace(e);
}
}
}
}

Consumer Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DemoConsumer{

private static IConsumerProcessor consumer;

public static void main(String[] args) throws Exception{
Properties properties = new Properties();
//设置业务所在BG的namespace
properties.setProperty(ConsumerConstants.MafkaBGNamespace, "common");
//设置消费者appkey
properties.setProperty(ConsumerConstants.MafkaClientAppkey, "notify");
//设置订阅组group
properties.setProperty(ConsumerConstants.SubscribeGroup, "notify.message.send");

//创建topic对应的consumer对象(注意每次build调用会产生一个新的实例)
consumer = MafkaClient.buildConsumerFactory(properties, "notify.message.send");

//调用recvMessageWithParallel设置listener
//注意1:可以修改String.class以支持自定义数据类型
//注意2:针对同一个consumer对象,只能调用一次该方法;多次调用的话,后面的调用都会报异常
consumer.recvMessageWithParallel(String.class, new IMessageListener(){
@Override
public ConsumeStatus recvMessage(MafkaMessage message, MessageContext context){
try{
System.out.println("message=[" + message.getBody() + "] partition=" + message.getPartition());
} catch (Exception e){
e.printStackTrace();
}
return ConsumeStatus.CONSUME_SUCCESS;
}
});
}
}

注意点:

  • 将producer资源在业务应用初始化的时候创建好
  • producer资源创建好后,再开放业务流量
  • 不要频繁创建producer,即不要每次发送消息的时候,创建一个producer。发送完毕后,进行close
  • 若调用MafkaClient.buildProduceFactory()创建实例抛出有异常,请充电关注并排查异常原因,不可频繁调用该方法给服务端带来压力
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2022 ZHU
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信