Working with Kafka in Go

import "github.com/Shopify/sarama"

  Sarama is a client library for Apache Kafka (version 0.8 and later) written in pure Go. It offers a high-level API for easily producing and consuming messages, as well as a low-level API for controlling the bytes on the wire when the high-level API is not enough. Usage examples for the high-level API and full documentation ship with the package.

Producers:

There are several kinds of producers:

  sarama.NewSyncProducer() // synchronous producer

  sarama.NewAsyncProducer() // asynchronous producer

Synchronous mode:

func main() {
   config := sarama.NewConfig()                               // create a sarama Config
   config.Producer.Return.Successes = true                    // report successful deliveries on the Successes channel (the SyncProducer requires this to be true)
   config.Producer.Partitioner = sarama.NewRandomPartitioner  // use the random partitioner
   client, err := sarama.NewClient([]string{"10.180.18.60:9092"}, config) // create the client
   if err != nil {
      panic(err)
   }
   defer client.Close()
   producer, err := sarama.NewSyncProducerFromClient(client)
   if err != nil {
      panic(err)
   }
   defer producer.Close()
   partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{Topic: "liangtian_topic", Key: nil, Value: sarama.StringEncoder("hahaha")})
   if err != nil {
      log.Fatalf("unable to produce message: %q", err)
   }
   fmt.Println("partition", partition)
   fmt.Println("offset", offset)
}

  

Asynchronous mode:

  In asynchronous mode, as the name suggests, producing a message does not wait for the send to complete before returning, so the caller can carry on with other work.

 config := sarama.NewConfig()
    // config.Producer.Return.Successes = true
    client, err := sarama.NewClient([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("unable to create kafka client: %q", err)
    }

    producer, err := sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        log.Fatalf("unable to create kafka producer: %q", err)
    }
    defer producer.Close()

    // inside a produce loop: i is the loop counter, topic is the target topic
    text := fmt.Sprintf("message %08d", i)
    producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
    // wait for a response
    select {
            //case msg := <-producer.Successes():
            //    log.Printf("Produced message successes: [%s]\n",msg.Value)
            case err := <-producer.Errors():
                log.Println("Produced message failure: ", err)
            default:
                log.Println("Produced message default")
    }
    ...

  

There are a few things to note about the async producer.

  1. In asynchronous mode, producing a message does not report a success status by default.
config.Producer.Return.Successes = false
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
}

  

This code hangs: because config.Producer.Return.Successes = false means no success reports were requested, producer.Successes() never delivers anything while the select is waiting, and producer.Errors() does not deliver either (assuming no error occurs), so execution is stuck there. You can of course add a default branch to bypass the wait, and then it no longer hangs:

select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default")
}
  2. If the Return.Successes option is enabled, the code above becomes equivalent to the synchronous mode.
config.Producer.Return.Successes = true
...
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    case msg := <-producer.Successes():
        log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
}

  

As the log shows, one Return.Successes notification arrives for every message sent, something like:

2018/12/25 08:51:51 Produced message: [message 00002537]
2018/12/25 08:51:51 Produced message successes: [message 00002537]
2018/12/25 08:51:51 Produced message: [message 00002538]
2018/12/25 08:51:51 Produced message successes: [message 00002538]
2018/12/25 08:51:51 Produced message: [message 00002539]
2018/12/25 08:51:51 Produced message successes: [message 00002539]
2018/12/25 08:51:51 Produced message: [message 00002540]
2018/12/25 08:51:51 Produced message successes: [message 00002540]
2018/12/25 08:51:51 Produced message: [message 00002541]
2018/12/25 08:51:51 Produced message successes: [message 00002541]
2018/12/25 08:51:51 Produced message: [message 00002542]
2018/12/25 08:51:51 Produced message successes: [message 00002542]
2018/12/25 08:51:51 Produced message: [message 00002543]
2018/12/25 08:51:51 Produced message successes: [message 00002543]
...

So it behaves just like synchronous producing.

  3. If Return.Successes is enabled but nothing reads from producer.Successes(), the Successes() channel fills up.
config.Producer.Return.Successes = true
...
log.Printf("Reade to Produced message: [%s]\n",text)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
log.Printf("Produced message: [%s]\n",text)

// wait response
select {
    //case msg := <-producer.Successes():
    //    log.Printf("Produced message successes: [%s]\n",msg.Value)
    case err := <-producer.Errors():
        log.Println("Produced message failure: ", err)
    default:
        log.Println("Produced message default",)
}

Once the channel is full, nothing more can be written to it: subsequent Return.Successes notifications are lost, and the producer itself hangs, because the shared buffer is occupied by a backlog of Return.Successes messages that nobody consumes.

After running for a while:

2018/12/25 08:58:24 Ready to produce message: [message 00000603]
2018/12/25 08:58:24 Produced message: [message 00000603]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000604]
2018/12/25 08:58:24 Produced message: [message 00000604]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000605]
2018/12/25 08:58:24 Produced message: [message 00000605]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000606]
2018/12/25 08:58:24 Produced message: [message 00000606]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000607]
2018/12/25 08:58:24 Produced message: [message 00000607]
2018/12/25 08:58:24 Produced message default
2018/12/25 08:58:24 Ready to produce message: [message 00000608]

  

Producing message 00000608 hangs because the message buffer is full. The size of this buffer is configurable (most likely config.ChannelBufferSize, which defaults to 256, rather than MaxRequestSize), but however large you make it, it will eventually fill up if the Successes messages are never drained.

The conclusion: the config.Producer.Return.Successes = true setting and the <-producer.Successes() read must be used as a pair. If it is set to true, you must read from Successes(); if it is set to false, you must not read from Successes().
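
In practice this means the cleanest way to use the async producer is to enable Return.Successes and drain Successes() and Errors() in dedicated goroutines, so nothing ever blocks on a full channel. Below is a minimal, self-contained sketch of that pattern; the broker address, topic name and message count are placeholders.

package main

import (
    "fmt"
    "log"
    "sync"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // must be paired with draining Successes()
    config.Producer.Return.Errors = true    // this is the default
    // config.ChannelBufferSize = 256       // size of the internal and result channels

    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("unable to create kafka producer: %q", err)
    }

    // drain both result channels in their own goroutines so the producer
    // never blocks on a full Successes() or Errors() channel
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        for msg := range producer.Successes() {
            log.Printf("Produced message successes: [%s]\n", msg.Value)
        }
    }()
    go func() {
        defer wg.Done()
        for err := range producer.Errors() {
            log.Println("Produced message failure: ", err)
        }
    }()

    for i := 0; i < 10; i++ {
        text := fmt.Sprintf("message %08d", i)
        producer.Input() <- &sarama.ProducerMessage{Topic: "liangtian_topic", Key: nil, Value: sarama.StringEncoder(text)}
    }

    // Close flushes buffered messages and then closes Successes() and Errors(),
    // which lets the two drain goroutines exit
    producer.Close()
    wg.Wait()
}

Compared with the select/default loop above, no success notification is lost and Input() never backs up.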

Consumers:

Consuming with a consumer group:

Each partition of a topic can be consumed by at most one consumer within a consumer group, and one consumer can consume several topics at the same time. (Note that the example below creates three groups with different group ids, c1, c2 and c3, so each group independently receives all partitions; partitions are balanced across consumers only when they share the same group id.)

 

type consumerGroupHandler struct{
   name string
}
func main1() {
   var wg sync.WaitGroup
   config := sarama.NewConfig()
   config.Consumer.Return.Errors = false
   config.Version = sarama.V0_10_2_0
   config.Consumer.Offsets.Initial = sarama.OffsetOldest
   client, err := sarama.NewClient([]string{"10.180.18.60:9092"}, config)
   if err != nil {
      panic(err)
   }
   defer client.Close()
   group1, err := sarama.NewConsumerGroupFromClient("c1", client)
   if err != nil {
      panic(err)
   }
   group2, err := sarama.NewConsumerGroupFromClient("c2", client)
   if err != nil {
      panic(err)
   }
   group3, err := sarama.NewConsumerGroupFromClient("c3", client)
   if err != nil {
      panic(err)
   }
   defer group1.Close()
   defer group2.Close()
   defer group3.Close()
   wg.Add(3)
   go consume(&group1,&wg,"c1")
   go consume(&group2,&wg,"c2")
   go consume(&group3,&wg,"c3")
   wg.Wait()
   signals := make(chan os.Signal, 1)
   signal.Notify(signals, os.Interrupt)
   select {
   case <-signals:
   }

}
func consume(group sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
   fmt.Println(name + " start")
   wg.Done()
   ctx := context.Background()
   for {
      //topics := []string{"tiantian_topic1", "tiantian_topic2"} // a single consumer can consume multiple topics
      topics := []string{"liangtian_topic"}
      handler := consumerGroupHandler{name: name}
      err := group.Consume(ctx, topics, handler)
      if err != nil {
         panic(err)
      }
   }
}
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
   claim sarama.ConsumerGroupClaim) error {
   for msg := range claim.Messages() {
      fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n",h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
      // mark the message as processed so its offset gets committed
      sess.MarkMessage(msg, "")
   }
   return nil
}

func handleErrors(group sarama.ConsumerGroup, wg *sync.WaitGroup) {
   wg.Done()
   for err := range group.Errors() {
      fmt.Println("ERROR", err)
   }
}
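
The handleErrors function above is defined but never started, and the example sets config.Consumer.Return.Errors = false, so Errors() would stay silent anyway. A rough sketch of wiring it up inside main1 (the exact placement is up to you):

config.Consumer.Return.Errors = true // deliver consumer errors on the group's Errors() channel
...
wg.Add(1)
go handleErrors(group1, &wg)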

  

 

  The plain consumer (for lack of a better name). In some cases a consumer does not belong to any consumer group. A normal consumer group automatically assigns partitions to its consumers and rebalances automatically when consumers are added to or removed from the group.

  Sometimes, though, you want something simpler. You may know that a single consumer always needs to read all partitions of a topic, or just one specific partition. In that case there is no need for a group or for rebalancing; you simply subscribe to the specific topic or partition, consume messages, and occasionally commit offsets.

  One caveat: apart from the lack of rebalancing and the need to look up partitions yourself, everything works as usual. Keep in mind, however, that the consumer is not notified when someone adds new partitions to the topic; the application has to handle that itself, either by periodically re-checking the partition list (consumer.Partitions() in sarama, the equivalent of partitionsFor() in the Java client; see the sketch after the code below) or by simply knowing when an administrator adds partitions. Also note that a consumer can either subscribe to topics (and join a consumer group) or assign itself partitions, but not both at the same time. The code below shows the standalone approach; in practice it is rarely used, and the consumer group + consumer combination is the norm.

func main() {
   var wg sync.WaitGroup
   //create the consumer
   config := sarama.NewConfig()
   config.Consumer.Return.Errors = true
   client, err := sarama.NewClient([]string{"10.180.18.60:9092"}, config)
   if err != nil {
      panic(err)
   }
   defer client.Close()
   consumer, err := sarama.NewConsumerFromClient(client)
   if err != nil {
      panic(err)
   }
   defer consumer.Close()
   //look up the topic's partitions
   partitionList, err := consumer.Partitions("liangtian_topic")
   if err != nil {
      fmt.Println("failed to get the list of partitions", err)
   }
   //[0 1 2]
   fmt.Println(partitionList)
   //start one consumer per partition (iterate over the values, which are the partition ids)
   for _, partition := range partitionList {
      pc, err := consumer.ConsumePartition("liangtian_topic", partition, sarama.OffsetOldest)
      if err != nil {
         fmt.Printf("Failed to start consumer for partition %d: %s\n", partition, err)
         return
      }
      defer pc.AsyncClose()
      wg.Add(1)
      go func(pc sarama.PartitionConsumer) {
         defer wg.Done()
         for msg := range pc.Messages() {
            fmt.Printf("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            fmt.Println()
         }
      }(pc)
   }
   //time.Sleep(time.Hour)
   wg.Wait()
}
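
As mentioned earlier, a standalone consumer is not notified when partitions are added to a topic, so the application has to re-check the partition list itself. Below is a hypothetical sketch of that check: the watchPartitions name, the interval and the log lines are all illustrative, and it assumes the same imports as above plus the time package.

// watchPartitions periodically re-reads the partition list of a topic and
// reports any partition that appeared since the last check; starting a
// PartitionConsumer for a new partition is left to the caller.
func watchPartitions(consumer sarama.Consumer, topic string, interval time.Duration) {
   known := make(map[int32]bool)
   for {
      partitions, err := consumer.Partitions(topic)
      if err != nil {
         fmt.Println("failed to get the list of partitions", err)
      } else {
         for _, p := range partitions {
            if !known[p] {
               known[p] = true
               fmt.Printf("partition %d of topic %q is now visible\n", p, topic)
            }
         }
      }
      time.Sleep(interval)
   }
}

It would typically run alongside the per-partition consumers above, e.g. go watchPartitions(consumer, "liangtian_topic", 30*time.Second).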