0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

golang中使用kafka的综合指南

马哥Linux运维 来源:稀土掘金技术社区 2023-11-30 11:18 次阅读

介绍

kafka是一个比较流行的分布式、可拓展、高性能、可靠的流处理平台。在处理kafka的数据时,这里有确保处理效率和可靠性的多种最佳实践。本文将介绍这几种实践方式,并通过sarama实现他们。

以下是一些kafka消费的最佳实践:

选择合适的提交策略:Kafka提供两种提交策略,自动和手动。虽然自动操作很容易使用,但它可能会导致数据丢失或重复。手动提交提供了更高级别的控制,确保消息至少处理一次或恰好一次,具体取决于用例。

尽可能减少Kafka的传输次数:大批量读取消息可以显著提高吞吐量。这可以通过调整 fetch.min.bytes 和 fetch.max.wait.ms 等参数来实现。

尽可能使用消费者组:Kafka允许多个消费者组成一个消费者组来并行消费数据。这使得 Kafka 能够将数据分发给一个组中的所有消费者,从而实现高效的数据消费。

调整消费者缓冲区大小:通过调整消费者的缓冲区大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根据消息的预期大小和消费者的内存容量进行调整。这可以提高消费者的表现。

处理rebalance:当新的消费者加入消费者组,或者现有的消费者离开时,Kafka会触发rebalance以重新分配负载。在此过程中,消费者停止消费数据。因此,快速有效地处理重新平衡可以提高整体吞吐量。

监控消费者:使用 Kafka 的消费者指标来监控消费者的性能。定期监控可以帮助我们识别性能瓶颈并调整消费者的配置。

选择合适的提交策略

1.自动提交

Sarama 的 ConsumerGroup 默认情况下会自动提交偏移量。这意味着它会定期提交已成功消费的消息的偏移量,这允许消费者在重新启动或消费失败时从中断的地方继续。

下面是一个自动提交的消费者组消费消息的例子:


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = true  
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second  
  
ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Panicf( "创建消费者组客户端时出错: %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "来自消费者的错误: %v" , err)  
    }  
}

根据config.Consumer.Offsets.AutoCommit.Interval可以看到,消费者会每秒自动提交offset。

2. 手动提交

手动提交使我们更好地控制何时提交消息偏移量。下面是一个手动提交的消费者组消费消息的例子:


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = false 
  
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config)  
if err != nil {  
    log.Panicf( "创建消费者组客户端时出错: %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "Error from Consumer: %v" , err)  
    }  
}  
  


type Consumer struct {}  
  
func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {  
    for msg : = range Claim.Messages() {  
        
        fmt.Printf( "Message topic:%q partition:%d offset:%d
" , msg.Topic, msg.Partition, msg.Offset)  


        
        sess.MarkMessage(msg, "" )  
    }  
    return nil  
}

在该示例中, 使用MarkMessage手动将消息标记为已处理,最终根据Consumer.Offsets.CommitInterval配置提交。另外这个例子省略了错误处理部分,开发时需要注意正确处理生产过程中出现的错误。

译者注:这篇文章虽然是今年5月发布,但是这里的提交方式还是有些过时了,目前sarama已经废弃了Consumer.Offsets.CommitInterval,相关配置目前在Consumer.Offsets.AutoCommit

尽可能减少Kafka的传输次数

减少kafka的传输次数可以通过优化从kafka中读取和写入数据的方式来实现:

1. 增加批次的大小

使用kafka批量发送消息的效果优于逐个发送消息,批次越大,kafka发送数据效率就越高。但是需要权衡延迟和吞吐量之间的关系。较大的批次虽然代表着更大的吞吐量,但也会增加延迟。因为批次越大,填充批次的时间也越久。

在Go中,我们可以在使用sarama包生成消息时设置批次大小:


config := sarama.NewConfig()  
config.Producer.Flush.Bytes = 1024 * 1024

以及获取消息的批次大小


config := sarama.NewConfig()  
config.Consumer.Fetch.Default = 1024 * 1024

2. 使用长轮询

长轮询是指消费者轮询时如果Kafka中没有数据,则消费者将等待数据到达。这减少了往返次数,因为消费者不需要在没有数据时不断请求数据。


config := sarama.NewConfig() 
config .Consumer.MaxWaitTime = 500 *time.Millisecond

该配置告诉消费者在返回之前会等待500毫秒

3. 尽可能使用消费者组

消费者组是一组协同工作消费来自kafka主题的消息的消费者。消费者组允许我们在多个消费者之间分配消息,从而提供横向拓展能力。使用消费者组时,kafka负责将分区分配给组中的消费者,并确保每个分区同时仅被一个消费者消费。

接下来是sarama中消费者组的使用:

使用消费者组需要实现一个ConsumerGroupHandler接口

该接口具有三个方法:Setup、Cleanup、 和ConsumeClaim


type exampleConsumerGroupHandler struct { 
} 


func  (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { 
    for message := range Claim.Messages() { 
        
        fmt.Printf( "Message: %s
" , string (message.Value)) 


        
        session.MarkMessage(message, "" ) 
    }
    返回 nil
 }

创建sarama.ConsumerGroup并开始消费:


brokers := []string{"localhost:9092"} 
topic := "example_topic"  
groupID := "example_consumer_group"  
  


consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Fatalf("Error creating consumer group: %v", err)  
}  
defer consumerGroup.Close()  
  


handler := &exampleConsumerGroupHandler{}  
  


for {  
    err := consumerGroup.Consume(context.Background(), []string{topic}, handler)  
    if err != nil {  
        log.Printf("Error consuming messages: %v", err)  
    }  
}

该示例设置了一个消费组,用于消费来自“example_topic”的消息。消费者组可以通过添加更多消费者来提高处理能力。

使用消费者组时,记得处理消费期间rebalance和错误。

调整消费者缓冲区大小

在sarama中,我们可以调整消费者缓冲区的大小,以调整消费者在处理消息之前可以在内存中保存的消息数量。

默认情况下,缓冲区大小设置为256,这代表Sarama在开始处理消息之前将在内存中保存最多256条消息。如果消费者速度很慢,增加缓冲区大小可能有助于提高吞吐量。但是,更大的缓冲区也会消耗更多的内存。

以下是如何增加缓冲区大小的例子:


config := sarama.NewConfig()  
config.Consumer.Return.Errors = true  
config.Version = sarama.V2_1_0_0  
config.Consumer.Offsets.Initial = sarama.OffsetOldest  


config.ChannelBufferSize = 500  
  


group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config)  
if err != nil {  
    panic(err)  
}  
  


ctx := context.Background()  
for {  
    topics := []string{topic}  
    handler := exampleConsumerGroupHandler{}  


    err := group.Consume(ctx, topics, &handler)  
    if err != nil {  
        panic(err)  
    }  
}

处理rebalance

当新消费者添加到消费者组或现有消费者离开消费者组时,kafka会重新平衡该组中的消费者。rebalance是kafka确保消费者组中的所有消费者不会消费同一分区的保证。

在sarama中,处理rebalance是通过 Setup 和CleanUp函数来完成的。

通过正确处理重新平衡事件,您可以确保应用程序正常处理消费者组的更改,例如消费者离开或加入,并且在这些事件期间不会丢失或处理两次消息。

译者注:其实更重要的是在ConsumeClaim函数在通道关闭时尽早退出,才能正确的进入CleanUp函数。

监控消费者

监控Kafka消费者对于确保系统的健康和性能至关重要,我们需要时刻关注延迟、处理时间和错误率的指标。

Golang没有内置对 Kafka 监控的支持,但有几个库和工具可以帮助我们。让我们看一下其中的一些:

Sarama的Metrics:Sarama 提供了一个指标注册表,它报告了有助于监控的各种指标,例如请求、响应的数量、请求和响应的大小等。这些指标可以使用 Prometheus 等监控系统来收集和监控。

JMX Exporter:如果您在 JVM 上运行 Kafka, 则可以使用 JMX Exporter 将kafka的 MBeans 发送给Prometheus

Kafka Exporter:Kafka Exporter是一个第三方工具,可以提供有关Kafka的更详细的指标。它可以提供消费者组延迟,这是消费kafka消息时要监控的关键指标。

Jaeger 或 OpenTelemetry:这些工具可用于分布式追踪,这有助于追踪消息如何流经系统以及可能出现瓶颈的位置。

日志:时刻关注应用程序日志,记录消费者中的任何错误或异常行为。这些日志可以帮助我们诊断问题。

消费者组命令, 可以使用kafka-consumer-groups命令来描述消费者组的状态。

请记住,不仅要追踪这些指标,还要针对任何需要关注的场景设置警报。通过这些方法,我们可以在问题还在初始阶段时快速做出响应。

以上工作有助于确保使用kafka的应用程序健壮、可靠且高效。

审核编辑:汤梓红

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    7030

    浏览量

    89038
  • 参数
    +关注

    关注

    11

    文章

    1834

    浏览量

    32224
  • kafka
    +关注

    关注

    0

    文章

    51

    浏览量

    5222

原文标题:golang中使用kafka的综合指南

文章出处:【微信号:magedu-Linux,微信公众号:马哥Linux运维】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    Kafka读取数据操作指南

    Kafka消费者——从 Kafka读取数据
    发表于 09-16 06:42

    浅析kafka

    kafka常见问题
    发表于 09-29 10:09

    基于发布与订阅的消息系统Kafka

    Kafka权威指南》——初识 Kafka
    发表于 03-05 13:46

    Kafka基础入门文档

    kafka系统入门教程(原理、配置、集群搭建、Java应用、Kafka-manager)
    发表于 03-12 07:22

    Kafka集群环境的搭建

    1、环境版本版本:kafka2.11,zookeeper3.4注意:这里zookeeper3.4也是基于集群模式部署。2、解压重命名tar -zxvf
    发表于 01-05 17:55

    现代的服务端技术栈:Golang/Protobuf/gRPC详解

    Golang又称Go语言,是一个开源的、多用途的编程语言,由Google研发,并由于种种原因,正在日益流行。Golang已经有10年的历史,并且据Google称已经在生产环境中使用了接近7年的时间,这一点可能让大多数人大跌眼镜。
    的头像 发表于 12-25 17:32 1142次阅读

    Kafka的概念及Kafka的宕机

    问题要从一次Kafka的宕机开始说起。 笔者所在的是一家金融科技公司,但公司内部并没有采用在金融支付领域更为流行的 RabbitMQ ,而是采用了设计之初就为日志处理而生的 Kafka ,所以我一直
    的头像 发表于 08-27 11:21 2105次阅读
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕机

    初探Golang内联

    今天我们来聊聊 Golang 中的内联。
    的头像 发表于 12-13 09:51 943次阅读

    GoLang的安装和使用

    GoLang的安装和使用
    的头像 发表于 01-13 14:06 1275次阅读
    <b class='flag-5'>GoLang</b>的安装和使用

    Kafka 的简介

      1 kafka简介 2 为什么要用消息系统 3 kafka基础知识 4 kafka集群架构 5 总结   1 kafka简介 其主要设计目标如下: 以时间复杂度为O(1)的方式提供
    的头像 发表于 07-03 11:10 614次阅读
    <b class='flag-5'>Kafka</b> 的简介

    物通博联5G-kafka工业网关实现kafka协议对接到云平台

    Kafka协议是一种基于TCP层的网络协议,用于在分布式消息传递系统Apache Kafka中发送和接收消息。Kafka协议定义了客户端和服务器之间的通信方式和数据格式,允许客户端发送消息到K
    的头像 发表于 07-11 10:44 519次阅读

    Spring Kafka的各种用法

    最近业务上用到了Spring Kafka,所以系统性的探索了下Spring Kafka的各种用法,发现了很多实用的特性,下面介绍下Spring Kafka的消息重试机制。 0. 前言 原生
    的头像 发表于 09-25 17:04 1012次阅读

    Kafka架构技术:Kafka的架构和客户端API设计

    Kafka 给自己的定位是事件流平台(event stream platform)。因此在消息队列中经常使用的 "消息"一词,在 Kafka 中被称为 "事件"。
    的头像 发表于 10-10 15:41 2381次阅读
    <b class='flag-5'>Kafka</b>架构技术:<b class='flag-5'>Kafka</b>的架构和客户端API设计

    kafka相关命令详解

    kafka常用命令详解
    的头像 发表于 10-20 11:34 943次阅读

    kafka基本原理详解

    今天浩道跟大家分享一篇关于kafka相关原理的硬核干货,可以说即使你没有接触过kafka,也可以秒懂,一起看看!
    的头像 发表于 01-03 09:57 895次阅读
    <b class='flag-5'>kafka</b>基本原理详解