kafka java项目测试使用

引入依赖

<!-- Client-only artifact: it provides org.apache.kafka.clients.* and
     org.apache.kafka.common.serialization.*, which is everything the
     producer/consumer examples below import. The kafka_2.11 artifact is the
     Scala broker and is not required for a pure Java client. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>

生产者

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
 * Minimal Kafka producer demo: sends one string message per second to the
 * {@code eagle} topic on the broker at {@code 192.168.91.128:9092}.
 *
 * <p>The instance owns a {@link KafkaProducer}; call {@link #close()} (or use
 * try-with-resources) when finished so the client is released.
 */
public class Producer implements AutoCloseable {

    /** Underlying Kafka client, created in the constructor and released by {@link #close()}. */
    final KafkaProducer<String, String> kafkaProducer;

    /**
     * Creates the Kafka client with String key/value serializers, pointing at
     * the hard-coded test broker address.
     */
    public Producer() {
        Properties map = new Properties();
        map.put("bootstrap.servers", "192.168.91.128:9092");
        map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProducer = new KafkaProducer<>(map);
    }

    /** Sequence number appended to each message; incremented once per {@link #produce()} call. */
    int i = 0;

    /** Fixed message prefix. */
    String msg = "bb hh ";

    /**
     * Sends the next message ({@code msg + i}) to the {@code eagle} topic and
     * advances the sequence number.
     *
     * <p>The send is asynchronous. The completion callback prints
     * {@code "send success"} only when the broker acknowledged the record; if
     * the callback receives an exception, its stack trace is printed instead.
     * Exceptions thrown synchronously by {@code send} are printed as well and
     * not propagated.
     */
    public void produce() {
        ProducerRecord<String, String> record = new ProducerRecord<>("eagle", msg + i);
        i++;
        try {
            kafkaProducer.send(record, (recordMetadata, e) -> {
                // A non-null exception means the record was NOT delivered.
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("send success");
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Closes the underlying Kafka client. */
    @Override
    public void close() {
        kafkaProducer.close();
    }

    /**
     * Sends one message per second until the thread is interrupted, then
     * closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try (Producer producer = new Producer()) {
            while (true) {
                producer.produce();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of looping forever.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * Minimal Kafka consumer demo: joins consumer group {@code local-test-1},
 * subscribes to the {@code eagle} topic on the broker at
 * {@code 192.168.91.128:9092}, and prints every record it receives.
 */
public class Consumer {

    /** Underlying Kafka client, created and subscribed in the constructor. */
    final KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Creates the Kafka client with String key/value deserializers and
     * subscribes it to the {@code eagle} topic.
     */
    public Consumer() {
        Properties map = new Properties();
        // Several brokers can be listed comma-separated, e.g.:
        //map.put("bootstrap.servers", "59.111.60.130:9092,59.111.60.126:9092,59.111.60.127:9092");
        map.put("bootstrap.servers", "192.168.91.128:9092");
        map.put("group.id", "local-test-1");
        map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaConsumer = new KafkaConsumer<>(map);
        kafkaConsumer.subscribe(Collections.singleton("eagle"));
    }

    /**
     * Polls the topic in an endless loop and prints key, value, partition,
     * topic, offset and timestamp of each record.
     *
     * <p>The loop only ends when polling or printing throws. In that case the
     * stack trace is printed and the Kafka client is closed, so this instance
     * cannot be reused afterwards.
     */
    public void consumer() {
        System.out.println("wait for consume...");
        try {
            while (true) {
                // 100 is the poll timeout passed to the client (poll(long) of the 1.1.0 API).
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("key = " + record.key());
                    System.out.println("value = " + record.value());
                    System.out.println("partition = " + record.partition());
                    System.out.println("topic = " + record.topic());
                    System.out.println("offset = " + record.offset());
                    System.out.println("timestamp = " + record.timestamp());
                    System.out.println();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the client whenever the loop is left.
            kafkaConsumer.close();
        }
    }

    /**
     * Starts the consumer and blocks in the poll loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        consumer.consumer();
    }
}

如果连接kafka超时:

修改kafka的config/server.properties文件中的如下内容:

advertised.listeners=PLAINTEXT://IP地址:9092

使用ifconfig或者ip a指令获取安装机器的ip地址。假如获取到的测试机器的IP地址为192.168.91.128,就将上述配置参数修改为如下内容:

advertised.listeners=PLAINTEXT://192.168.91.128:9092

修改完成后保存退出,并重新启动zk和kafka。

参考:https://www.jianshu.com/p/94349568533c