Basic setup for getting a Kafka Go producer running

1. Download the Kafka release package from the official site: http://kafka.apache.org/downloads.html

2. Start a ZooKeeper instance (single node):

   bin/zookeeper-server-start.sh config/zookeeper.properties


3. Start the Kafka broker:


bin/kafka-server-start.sh config/server.properties


Some settings in server.properties need to be changed:


listeners=PLAINTEXT://hostName:9092


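If the producer runs on a remote machine, set hostName to the broker's IP address, so that the remote machine does not need a hosts entry for the broker.


log.dir is the directory where the broker stores its log data (the message logs themselves, not its application logs).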


4. Using the Go client package Shopify/sarama:


(1) go get "github.com/Shopify/sarama"


(2) Set the Kafka version in the sarama config:


config.Version = sarama.V0_10_0_0 // Kafka 0.10.0.0 is the version used here


(3) Producer test code:


package main

import (
	"log"
	"os"
	"strings"

	"github.com/Shopify/sarama"
)

var logger = log.New(os.Stderr, "[TEST] ", log.LstdFlags)

func main() {
	// Send sarama's internal log output to stderr.
	sarama.Logger = logger

	config := sarama.NewConfig()
	config.ClientID = "newsDataSource"
	config.Version = sarama.V0_10_0_0
	// Wait until all in-sync replicas have acknowledged the message.
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Newer sarama releases require this for a SyncProducer.
	config.Producer.Return.Successes = true

	msg := &sarama.ProducerMessage{}
	msg.Topic = "hello"
	// Overwritten by the random partitioner; only a manual partitioner uses it.
	msg.Partition = int32(-1)
	msg.Key = sarama.StringEncoder("key")
	msg.Value = sarama.ByteEncoder("hello")

	producer, err := sarama.NewSyncProducer(strings.Split("localhost:9092", ","), config)
	if err != nil {
		logger.Printf("Failed to create producer: %s", err)
		os.Exit(1)
	}
	defer producer.Close()

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		logger.Printf("Failed to produce message: %s", err)
		return
	}
	logger.Printf("partition: %d, offset: %d\n", partition, offset)
}