kafka connector 使用总结以及自定义connector开发

2019年11月26日 阅读数:113
这篇文章主要向大家介绍kafka connector 使用总结以及自定义connector开发,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

Kafaka connect 是一种用于在Kafka和其余系统之间可扩展的、可靠的流式传输数据的工具。它使得可以快速定义将大量数据集合移入和移出Kafka的链接器变得简单。Kafka Connect能够从数据库或应用程序服务器收集数据到Kafka topic,使数据可用于低延迟的流处理。导出做业能够将数据从Kafka topic传输到二次存储和查询系统,或者传递到批处理系统以进行离线分析。html

Kafaka connect的核心组件:
Source:负责将外部数据写入到kafka的topic中。
Sink:负责从kafka中读取数据到本身须要的地方去,好比读取到HDFS,hbase等。java


Connectors :经过管理任务来协调数据流的高级抽象
Tasks:数据写入kafk和从kafka中读出数据的具体实现,source和sink使用时都须要Taskgit

Workers:运行connectors和tasks的进程github

Converters:kafka connect和其余存储系统直接发送或者接受数据之间转换数据,mongodb

converter会把bytes数据转换成kafka connect内部的格式,也能够把kafka connect内部存储格式的数据转变成bytes,converter对connector来讲是解耦的,因此其余的connector均可以重用,例如,使用了avro converter,那么jdbc connector能够写avro格式的数据到kafka,固然,hdfs connector也能够从kafka中读出avro格式的数据。数据库

 

Transforms:一种轻量级数据调整的工具
Kafka connect 工做模式:
Kafka connect 有两种工做模式:
standalone:在standalone模式中,全部的worker都在一个独立的进程中完成。
distributed:distributed模式具备高扩展性,以及提供自动容错机制。你可使用一个group.ip来启动不少worker进程,在有效的worker进程中它们会自动的去协调执行connector和task,若是你新加了一个worker或者挂了一个worker,其余的worker会检测到而后在从新分配connector和task。express

本文做者:张永清,转载请注明出处:https://www.cnblogs.com/laoqing/p/11927958.html apache

在分布式模式下经过rest api来管理connector。
connector的常见管理操做API:json

GET /connectors – 返回全部正在运行的connector名。
POST /connectors – 新建一个connector; 请求体必须是json格式而且须要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、中止、或者失败,若是发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
PUT /connectors/{name}/pause – 暂停connector和它的task,中止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
POST /connectors/{name}/restart – 重启一个connector,尤为是在一个connector运行失败的状况下比较经常使用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,通常是由于它运行失败才这样作。
DELETE /connectors/{name} – 删除一个connector,中止它的全部task并删除配置。

如何开发本身的Connector:bootstrap

一、引入maven依赖。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.version}</version>
        </dependency>

二、开发自定义的Source

开发自定义的Source 须要继承实现SourceConnector和SourceTask这两个抽象类,实现抽象类中的未实现的方法或者重写抽象类中的方法。

本文做者:张永清,转载请注明出处:https://www.cnblogs.com/laoqing/p/11927958.html 

A、开发自定义的SourceConnector

/**
 *
 */
public class ExampleSourceConnector extends SourceConnector{
    @Override
    public void start(Map<String, String> map) {

    }
    //返回须要指定的TASK
    @Override
    public Class<? extends Task> taskClass() {
        return ExampleSourceTask.class;
    }
    //TASK的配置
    @Override
    public List<Map<String, String>> taskConfigs(int i) {
        return null;
    }

    @Override
    public void stop() {

    }

    @Override
    public ConfigDef config() {
        return null;
    }

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }
}

B、开发Source对应的Task

public class ExampleSourceTask extends SourceTask {
    @Override
    public String version() {
        return new ExampleSourceConnector().version();
    }
    //任务启动
    @Override
    public void start(Map<String, String> map) {

    }
    //须要发送到kafka的数据。
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return null;
    }
    //任务中止
    @Override
    public void stop() {

    }
}

三、开发自定义的Sink

  开发自定义的Sink 须要继承实现SinkConnector和SinkTask这两个抽象类,实现抽象类中的未实现的方法或者重写抽象类中的方法。

A、开发自定义的SinkConnector

/**
 *
 */
public class ExampleSinkConnector extends SinkConnector{
    @Override
    public void start(Map<String, String> map) {

    }
    //指定Task执行的类
    @Override
    public Class<? extends Task> taskClass() {
        return ExampleSinkTask.class;
    }
    //task对应的config
    @Override
    public List<Map<String, String>> taskConfigs(int i) {
        return null;
    }

    @Override
    public void stop() {

    }
    //配置定义
    @Override
    public ConfigDef config() {
        return null;
    }

    @Override
    public String version() {
        return AppInfoParser.getVersion();
    }
}

B、开发Sink对应的Task  

/**
 *
 */
public class ExampleSinkTask extends SinkTask {
    @Override
    public String version() {
        return new ExampleSinkConnector().version();
    }
    //task启动
    @Override
    public void start(Map<String, String> map) {

    }
    //数据put
    @Override
    public void put(Collection<SinkRecord> collection) {

    }
    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets){
    //Task中止。
    }
    @Override
    public void stop() {

    }
}

Kafka Connect Configs

 开源的实现的比较好的connector项目:

https://github.com/debezium/debezium 

https://github.com/confluentinc

 

 

 https://docs.confluent.io/current/connect/managing/connectors.html

 

 这里咱们以https://github.com/debezium/debezium 中的debezium-connector-mongodb 为例配置connector的standalone模式运行

从github中获取debezium-connector-mongodb-0.9.5.Final.jar 包,放到kafka的libs目录下,而且把mongodb相关的jar包一块儿放入到libs下。

在config目录下新建对应的mongodb.properties 属性配置文件

name=mongodb
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=configs/10.100.xx.xx:27017
tasks.max=1
mongodb.name=mongo-test
#mongodb.user=root
#mongodb.password=123456
database.whitelist=kafkaTest
collection.whitelist=kafkaTest.kafkaTest
connect.max.attempts=12
max.queue.size=8192
max.batch.size=2048
poll.interval.ms=1000
connect.backoff.initial.delay.ms=1000
connect.backoff.max.delay.ms=2000
mongodb.ssl.enabled=false
mongodb.ssl.invalid.hostname.allowed=false
snapshot.mode=initial
initial.sync.max.threads=2
tombstones.on.delete=true
mongodb.members.auto.discover=true
source.struct.version=v2

 配置解释以下:

详情参考:https://debezium.io/documentation/reference/0.10/connectors/mongodb.html

https://docs.confluent.io/current/connect/debezium-connect-mongodb/mongodb_source_connector_config.html

Property Default Description

name

 

Unique name for the connector. Attempting to register again with the same name will fail. (This property is required by all Kafka Connect connectors.)

connector.class

 

The name of the Java class for the connector. Always use a value of io.debezium.connector.mongodb.MongoDbConnector for the MongoDB connector.

mongodb.hosts

 

The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list can contain a single hostname and port pair. If mongodb.members.auto.discover is set to false, then the host and port pair should be prefixed with the replica set name (e.g., rs0/localhost:27017).

mongodb.name

 

A unique name that identifies the connector and/or MongoDB replica set or sharded cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.

mongodb.user

 

Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.password

 

Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.

mongodb.ssl.enabled

false

Connector will use SSL to connect to MongoDB instances.

mongodb.ssl.invalid.hostname.allowed

false

When SSL is enabled this setting controls whether strict hostname checking is disabled during connection phase. If true the connection will not prevent man-in-the-middle attacks.

database.whitelist

empty string

An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored. May not be used with database.blacklist.

database.blacklist

empty string

An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in the blacklist will be monitored. May not be used with database.whitelist.

collection.whitelist

empty string

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be monitored; any collection not included in the whitelist will be excluded from monitoring. Each identifier is of the form databaseName.collectionName. By default the connector will monitor all collections except those in the local and admin databases. May not be used with collection.blacklist.

collection.blacklist

empty string

An optional comma-separated list of regular expressions that match fully-qualified namespaces for MongoDB collections to be excluded from monitoring; any collection not included in the blacklist will be monitored. Each identifier is of the form databaseName.collectionName. May not be used with collection.whitelist.

snapshot.mode

initial

Specifies the criteria for running a snapshot (eg. initial sync) upon startup of the connector. The default is initial, and specifies the connector reads a snapshot when either no offset is found or if the oplog no longer contains the previous offset. The never option specifies that the connector should never use snapshots, instead the connector should proceed to tail the log.

field.blacklist

empty string

An optional comma-separated list of the fully-qualified names of fields that should be excluded from change event message values. Fully-qualified names for fields are of the form databaseName.collectionName.fieldName.nestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters.

field.renames

empty string

An optional comma-separated list of the fully-qualified replacements of fields that should be used to rename fields in change event message values. Fully-qualified replacements for fields are of the form databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName, where databaseName and collectionName may contain the wildcard (*) which matches any characters, the colon character (:) is used to determine rename mapping of field. The next field replacement is applied to the result of the previous field replacement in the list, so keep this in mind when renaming multiple fields that are in the same path.

tasks.max

1

The maximum number of tasks that should be created for this connector. The MongoDB connector will attempt to use a separate task for each replica set, so the default is acceptable when using the connector with a single MongoDB replica set. When using the connector with a MongoDB sharded cluster, we recommend specifying a value that is equal to or more than the number of shards in the cluster, so that the work for each replica set can be distributed by Kafka Connect.

initial.sync.max.threads

1

Positive integer value that specifies the maximum number of threads used to perform an intial sync of the collections in a replica set. Defaults to 1.

tombstones.on.delete

true

Controls whether a tombstone event should be generated after a delete event.
When true the delete operations are represented by a delete event and a subsequent tombstone event. When false only a delete event is sent.
Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record got deleted.

snapshot.delay.ms

 

An interval in milli-seconds that the connector should wait before taking a snapshot after starting up;
Can be used to avoid snapshot interruptions when starting multiple connectors in a cluster, which may cause re-balancing of connectors.

snapshot.fetch.size

0

Specifies the maximum number of documents that should be read in one go from each collection while taking a snapshot. The connector will read the collection contents in multiple batches of this size.
Defaults to 0, which indicates that the server chooses an appropriate fetch size.

The following advanced configuration properties have good defaults that will work in most situations and therefore rarely need to be specified in the connector’s configuration.

Property Default Description

max.queue.size

8192

Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the oplog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than the maximum batch size specified in the max.batch.size property.

max.batch.size

2048

Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048.

poll.interval.ms

1000

Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second.

connect.backoff.initial.delay.ms

1000

Positive integer value that specifies the initial delay when trying to reconnect to a primary after the first failed connection attempt or when no primary is available. Defaults to 1 second (1000 ms).

connect.backoff.max.delay.ms

1000

Positive integer value that specifies the maximum delay when trying to reconnect to a primary after repeated failed connection attempts or when no primary is available. Defaults to 120 seconds (120,000 ms).

connect.max.attempts

16

Positive integer value that specifies the maximum number of failed connection attempts to a replica set primary before an exception occurs and task is aborted. Defaults to 16, which with the defaults for connect.backoff.initial.delay.ms and connect.backoff.max.delay.msresults in just over 20 minutes of attempts before failing.

mongodb.members.auto.discover

true

Boolean value that specifies whether the addresses in 'mongodb.hosts' are seeds that should be used to discover all members of the cluster or replica set (true), or whether the address(es) in mongodb.hosts should be used as is (false). The default is true and should be used in all cases except where MongoDB is fronted by a proxy.

source.struct.version

v2

Schema version for the source block in CDC events; Debezium 0.10 introduced a few breaking
changes to the structure of the source block in order to unify the exposed structure across all the connectors.
By setting this option to v1 the structure used in earlier versions can be produced. Note that this setting is not recommended and is planned for removal in a future Debezium version.

heartbeat.interval.ms

0

Controls how frequently heartbeat messages are sent.
This property contains an interval in milli-seconds that defines how frequently the connector sends messages into a heartbeat topic. This can be used to monitor whether the connector is still receiving change events from the database. You also should leverage heartbeat messages in cases where only records in non-captured collections are changed for a longer period of time. In such situation the connector would proceed to read the oplog from the database but never emit any change messages into Kafka, which in turn means that no offset updates will be committed to Kafka. This will cause the oplog files to be rotated out but connector will not notice it so on restart some events are no longer available which leads to the need of re-execution of the initial snapshot.

Set this parameter to 0 to not send heartbeat messages at all.
Disabled by default.

heartbeat.topics.prefix

__debezium-heartbeat

Controls the naming of the topic to which heartbeat messages are sent.
The topic is named according to the pattern <heartbeat.topics.prefix>.<server.name>.

sanitize.field.names

true when connector configuration explicitly specifies the key.converter or value.converterparameters to use Avro, otherwise defaults to false.

Whether field names will be sanitized to adhere to Avro naming requirements. See Avro namingfor more details.

这里以standalone的模式运行,在connect-standalone.properties中作以下配置:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
rest.port=9093
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
rest.host.name=0.0.0.0
offset.storage.file.filename=/data4/kafka/connect/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/opt/kafka/kafka_2.11-2.0.0/plugin

standalone模式下启动方式以下:

bin/connect-standalone.sh config/connect-standalone.properties connector1.properties[connector2.properties ...]   一次能够启动多个connector,只须要在参数中加上connector的配置文件路径便可。

例如:connect-standalone.sh config/connect-standalone.properties mongodb.properties

distribute模式部署:

一、修改配置connect-distributed.properties

# broker列表
bootstrap.servers=10.120.241.1:9200
 
# 同一集群中group.id须要配置一致,且不能和别的消费者同名
group.id=connect-cluster
 
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 使用json数据一样配置成false
key.converter.schemas.enable=false
value.converter.schemas.enable=false
····

二、手动建立集群模式所必须的kafka的几个topic

# config.storage.topic=connect-configs
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
 
# offset.storage.topic=connect-offsets
$ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
 
# status.storage.topic=connect-status
$ $ bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
  • config.storage.topic:topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic
  • offset.storage.topic:用于存储offsets;这个topic应该配置多个partition和副本。
  • status.storage.topic:用于存储状态;这个topic 能够有多个partitions和副本

三、 启动worker

启动distributed模式命令以下:

./bin/connect-distributed ./etc/kafka/connect-distributed.properties   

四、使用restful启动connect
curl 'http://localhost:8083/connectors' -X POST -i -H "Content-Type:application/json" -d   
    '{ "name":"elasticsearch-sink",  
       "config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",  
                "tasks.max":10,  
                "topics":"estest1012",  
                "key.ignore":true,  
                "schema.ignore":true,  
                "connection.url":"http://10.120.241.194:9200",  
                "type.name":"kafka-connect"}  
    }' 

常见问题:

一、在启动的过程当中出现各类各样的java.lang.ClassNotFoundException。

在启动connector的时候,一开始老是会报各个各样的ClassNotFoundException,不是这个包就是那个包,查找问题一直说要么缺乏包要么是包冲突,那么要排除依赖冲突或者看下是否是少了jar包。

二、在connector.properties中的key.converter.schemas.enable=false和value.converter.schemas.enable=false的问题。

这个选项默认在connect-standalone.properties中是true的,这个时候发送给topic的Json格式是须要使用avro格式。例如:
{
    "schema": {
        "type": "struct",
        "fields": [{
            "type": "int32",
            "optional": true,
            "field": "c1"
        }, {
            "type": "string",
            "optional": true,
            "field": "c2"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "create_ts"
        }, {
            "type": "int64",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Timestamp",
            "version": 1,
            "field": "update_ts"
        }],
        "optional": false,
        "name": "foobar"
    },
    "payload": {
        "c1": 10000,
        "c2": "bar",
        "create_ts": 1501834166000,
        "update_ts": 1501834166000
    }
}

若是想发送普通的json格式而不是avro格式的话,很简单key.converter.schemas.enable和value.converter.schemas.enable设置为false就行。这样就能发送普通的json格式数据。