kafka-php

kafka-php 的 GitHub 地址：https://github.com/jacky5059/kafka-php

生产者produce示例代码

<?php
// Interactive producer example: repeatedly prompts on STDOUT, reads one line
// of comma-separated messages from STDIN and sends them to a Kafka topic.
// The loop ends when STDIN is closed (EOF) or can no longer be read.

// Put the library directory (../lib relative to this script) on the include
// path so that 'autoloader.php' can be resolved.
set_include_path(
    implode(PATH_SEPARATOR, array(
        realpath(__DIR__ . '/../lib'),
        get_include_path(),
    ))
);
require 'autoloader.php';

$host = 'localhost';
$port = 9092;
$topic = 'test';

// Producer configured without compression.
$producer = new Kafka_Producer($host, $port, Kafka_Encoder::COMPRESSION_NONE);

$in = fopen('php://stdin', 'r');
while (true) {
    echo "\nEnter comma separated messages:\n";

    $line = fgets($in);
    // fgets() returns false on EOF or on a read error. Without this guard the
    // loop would never terminate: explode(',', false) yields a single empty
    // string, which would be sent as a message on every iteration.
    if ($line === false) {
        break;
    }

    // Messages are sent exactly as typed (no trimming), so the last message of
    // each line still carries the trailing newline read from STDIN.
    $messages = explode(',', $line);

    // The return value of send() is reported below as a byte count.
    $bytes = $producer->send($messages, $topic);
    printf("\nSuccessfully sent %d messages (%d bytes)\n\n", count($messages), $bytes);
}
fclose($in);

简单消费者simple consumer示例代码

<?php
// Simple consumer example: polls a single partition of one topic in an endless
// loop, prints every fetched message and advances the read offset after each
// fetch.

// Prepend the library directory (../lib relative to this script) to the
// include path so that 'autoloader.php' can be resolved.
$libraryDir = realpath(__DIR__ . '/../lib');
set_include_path($libraryDir . PATH_SEPARATOR . get_include_path());
require 'autoloader.php';

// Connection settings.
$host = 'localhost';
$zkPort = 2181; // zookeeper (not used by this example)
$kPort = 9092;  // kafka server
$socketTimeout = 2;

// What to read and how much to request per fetch (in bytes).
$topic = 'test';
$partition = 0;
$maxSize = 10000000;

// Running state: current offset and total number of messages printed so far.
$offset = 0;
$consumedCount = 0;

$consumer = new Kafka_SimpleConsumer($host, $kPort, $socketTimeout, $maxSize);

for (;;) {
    try {
        // Build a fetch request for the topic/partition at the current offset,
        // asking for at most $maxSize bytes.
        $fetchRequest = new Kafka_FetchRequest($topic, $partition, $offset, $maxSize);

        // Fetch the message set and print each message in turn.
        $batchPosition = 0;
        $messages = $consumer->fetch($fetchRequest);
        foreach ($messages as $msg) {
            ++$consumedCount;
            printf(
                "\nconsumed[%s][%s][msg #%s]: %s",
                $offset,
                $batchPosition,
                $consumedCount,
                $msg->payload()
            );
            // NOTE(review): validBytes() is presumably the number of bytes
            // consumed from this message set so far - confirm against the library.
            $batchPosition = $messages->validBytes();
        }

        // Move the offset forward by whatever validBytes() reports for this
        // message set, then release the request object.
        $offset = $offset + $messages->validBytes();
        unset($fetchRequest);
    } catch (Exception $e) {
        // Most likely everything in the queue has been consumed; report the
        // error and pause briefly before polling again.
        printf(
            "\nERROR: %s: %s\n%s\n",
            get_class($e),
            $e->getMessage(),
            $e->getTraceAsString()
        );
        sleep(2);
    }
}

基于zookeeper的消费者zkconsumer示例代码

<?php
// ZooKeeper-based consumer example: reads a batch of messages from a topic as
// a member of a consumer group, hands the batch to a processing callback and
// commits the offsets only when processing succeeded.

// Put the library directory (../lib relative to this script) on the include
// path so that 'autoloader.php' can be resolved.
set_include_path(
    implode(PATH_SEPARATOR, array(
        realpath(__DIR__ . '/../lib'),
        get_include_path(),
    ))
);
require 'autoloader.php';

// zookeeper address (one or more, separated by commas).
// 2181 is the conventional ZooKeeper client port; adjust to match your setup.
$zkaddress = 'localhost:2181';
// kafka topic to consume from
$topic = 'testtopic';
// kafka consumer group
$group = 'testgroup';
// socket buffer size: must be greater than the largest message in the queue
$socketBufferSize = 10485760; //10 MB
// approximate max number of bytes to get in a batch
$maxBatchSize = 20971520; //20 MB

$zookeeper = new Zookeeper($zkaddress);
$zkconsumer = new Kafka_ZookeeperConsumer(
    new Kafka_Registry_Topic($zookeeper),
    new Kafka_Registry_Broker($zookeeper),
    new Kafka_Registry_Offset($zookeeper, $group),
    $topic,
    $socketBufferSize
);

$messages = array();
// Must be initialised: it is inspected after the try/catch even when no
// exception was thrown, which would otherwise read an undefined variable.
$exception = null;
try {
    foreach ($zkconsumer as $message) {
        // either process each message one by one, or collect them and process them in batches
        $messages[] = $message;
        if ($zkconsumer->getReadBytes() >= $maxBatchSize) {
            break;
        }
    }
} catch (Kafka_Exception_OffsetOutOfRange $exception) {
    // if we haven't received any messages, resync the offsets for the next time, then bomb out
    if ($zkconsumer->getReadBytes() == 0) {
        $zkconsumer->resyncOffsets();
        die($exception->getMessage());
    }
    // if we did receive some messages before the exception, carry on.
} catch (Kafka_Exception_Socket_Connection $exception) {
    // deal with it below
} catch (Kafka_Exception $exception) {
    // deal with it below
}

if (null !== $exception) {
    // if we haven't received any messages, bomb out
    if ($zkconsumer->getReadBytes() == 0) {
        die($exception->getMessage());
    }
    // otherwise log the error, commit the offsets for the messages read so far and return the data
}

// process the data in batches, wait for ACK
// (doSomethingWithTheMessages() is a placeholder for application code; it is
// expected to return a truthy value on success.)
$success = doSomethingWithTheMessages($messages);

// Once the data is processed successfully, commit the byte offsets.
if ($success) {
    $zkconsumer->commitOffsets();
}

// get an approximate figure on the size of the queue
// (queried on $zkconsumer: no other consumer object exists in this script)
try {
    echo "\nRemaining bytes in queue: " . $zkconsumer->getRemainingSize();
} catch (Kafka_Exception_Socket_Connection $exception) {
    die($exception->getMessage());
} catch (Kafka_Exception $exception) {
    die($exception->getMessage());
}