Apache Kafka - KIP-42: Add Producer and Consumer Interceptors

Released in Kafka 0.10.0.0.

The concept of interceptors most likely comes from Flume.

Reference: http://blog.csdn.net/xiao_jun_0820/article/details/38111305

For example, Flume provides:

Timestamp Interceptor

Host Interceptor

Static Interceptor

Regex Filtering Interceptor

Regex Extractor Interceptor

These interceptors can wrap messages as they flow through, e.g. inserting a timestamp or the host, or doing filtering and other lightweight ETL operations.

Kafka therefore provides similar interceptor interfaces on both the producer and the consumer side:

ProducerInterceptor

/**
 * A plugin interface to allow things to intercept events happening to a producer record,
 * such as sending producer record or getting an acknowledgement when a record gets published
 */
public interface ProducerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the client sends a record to KafkaProducer, before the key and value get serialized.
     * @param record the record from client
     * @return record that is either original record passed to this method or new record with modified key and value.
     */
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
 
    /**
     * This is called when the send has been acknowledged
     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). The metadata may be only partially filled if an error occurred. Topic will always be set, and if partition is not -1, it will be the partition assigned to this record.
     * @param exception The exception thrown during processing of this record. Null if no error occurred.
     */
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
   
    /**
     * This is called when interceptor is closed
     */
    public void close();
}

onSend() will be called in KafkaProducer.send(), before the key and value get serialized and before the partition gets assigned.

If the implementation modifies the key and/or value, it must return them in a new ProducerRecord object.

onAcknowledgement() will be called when the send is acknowledged. It has the same API as Callback.onCompletion(), and is called just before Callback.onCompletion() is called.

Multiple interceptors can be chained together.

ProducerInterceptor APIs will be called from multiple threads: onSend() will be called on the submitting thread and onAcknowledgement() on the producer I/O thread.
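
A minimal sketch of what an implementation might look like (the class name CountingProducerInterceptor is hypothetical): onSend() passes records through untouched, while onAcknowledgement() counts successes and failures on the producer I/O thread, so it must stay cheap and thread-safe.

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

    private final AtomicLong acked = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        // Called on the submitting thread, before serialization and partition assignment.
        // Return the record unchanged; a modifying interceptor would return a new ProducerRecord.
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called on the producer I/O thread, just before the user's Callback.
        if (exception == null) {
            acked.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }

    @Override
    public void close() {
        System.out.printf("acked=%d failed=%d%n", acked.get(), failed.get());
    }
}

Interceptors are registered through the producer configuration interceptor.classes (a comma-separated list of class names); when several classes are listed, they are chained and invoked in the given order.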

ConsumerInterceptor

/**
 * A plugin interface to allow things to intercept Consumer events such as receiving a record or record being consumed
 * by a client.
 */
public interface ConsumerInterceptor<K, V> extends Configurable {
    /**
     * This is called when the records are about to be returned to the client.
     * @param records records to be consumed by the client, or null if the records were dropped/ignored/discarded (not consumable)
     * @return either the original 'records' passed to this method or a modified set of records
     */
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
   
    /**
     * This is called when offsets get committed
     * This method will be called when the commit request sent to the server has been acknowledged.
     * @param offsets A map of the offsets and associated metadata that this callback applies to
     */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
   
    /**
     * This is called when interceptor is closed
     */
    public void close();
}

onConsume() will be called in KafkaConsumer.poll(), just before poll() returns ConsumerRecords.

onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called and in ConsumerCoordinator.commitOffsetsSync() on successful commit.

Since the new consumer is single-threaded, the ConsumerInterceptor API will be called from a single thread.
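
For illustration, a minimal ConsumerInterceptor sketch (the class name LoggingConsumerInterceptor is hypothetical): onConsume() passes records through while counting them, and onCommit() logs the acknowledged offsets; a real interceptor could also filter or decorate the records before returning them.

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LoggingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

    private long consumed = 0; // the new consumer is single-threaded, so a plain field suffices

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        // Called in poll(), just before the records are returned to the client.
        consumed += records.count();
        return records; // pass through unchanged; could return a filtered/modified set instead
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Called once the commit has been acknowledged.
        System.out.println("committed " + offsets + ", records consumed so far: " + consumed);
    }

    @Override
    public void close() {
        System.out.println("total records consumed: " + consumed);
    }
}

On the consumer side the registration key is likewise interceptor.classes.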

Summary:

As a plugin, an interceptor can do some lightweight work on messages, such as decorating, cleaning, or filtering; its main use, however, is monitoring and tracing messages.

Interceptors can be chained and executed in sequence.

Interceptors must be lightweight: any time spent in them directly reduces the throughput of the pipeline.

Confluent also offers interceptor-based products for monitoring data streams:

http://docs.confluent.io/3.0.0/control-center/docs/clients.html

In addition, for better monitoring and auditing:

Currently, RecordMetadata contains topic/partition, offset, and timestamp (KIP-32).

We propose to add the remaining record metadata to RecordMetadata: checksum and record size. Both are useful for monitoring and audit.

For symmetry, we also propose to expose the same metadata on the consumer side and make it available to interceptors.

We will add checksum and record size fields to RecordMetadata and ConsumerRecord.

public final class RecordMetadata {
    private final long offset;
    private final TopicPartition topicPartition;
    private final long checksum;   // <<== NEW: checksum of the record
    private final int size;        // <<== NEW: record size in bytes (before compression)
    ...
}

public final class ConsumerRecord<K, V> {
    .......
    private final long checksum;   // <<== NEW: checksum of the record
    private final int size;        // <<== NEW: record size in bytes (after decompression)
    ...
}
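
With these fields exposed, a producer interceptor can log the full record metadata for audit purposes. Below is a sketch (hypothetical class name AuditProducerInterceptor) assuming the new metadata ends up exposed through checksum()/serializedKeySize()/serializedValueSize() accessors; the released API may name them differently than the fields shown in the KIP snippet above.

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AuditProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        return record; // pass through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Log topic/partition/offset plus the new checksum and size information (accessor names assumed).
        if (exception == null && metadata != null) {
            System.out.printf("topic=%s partition=%d offset=%d checksum=%d keySize=%d valueSize=%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset(),
                    metadata.checksum(), metadata.serializedKeySize(), metadata.serializedValueSize());
        }
    }

    @Override
    public void close() { }
}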