代码拉取完成,页面将自动刷新
<?php
try {
$rk = new \RdKafka\Consumer();
//$rk->setLogLevel(LOG_DEBUG); // 设置日志级别
$rk->addBrokers("49.234.210.15:9092"); // 添加经纪人,就是ip地址
$topic = $rk->newTopic("topic"); // 这里的$rk和生产者是不同的类哦
// 第一个参数分区ID
// 第二个参数是开始消费的偏移量,有效值
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
$true = true;
while ($true) {
// 第一个参数是要消耗的分区
// 第二个参数是等待收到消息的最长时间,1000是一秒
$msg = $topic->consume(0, 1000);
if ($msg->err) {
echo __LINE__.$msg->errstr(), "\n";
break;
} else {
echo __LINE__.$msg->payload, "\n";
if (!$msg->payload){
$true = false;
}
}
}
} catch (\Exception $e) {
var_dump($e->getLine() . ' | ' . $e->getMessage());
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。