1 Star 0 Fork 2

doublespring/rocketmq-client4php

forked from 永恒/rocketmq-client4php 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
main.cpp 9.60 KB
一键复制 编辑 原始数据 按行查看 历史
002197 提交于 2016-12-22 10:13 . transproducer
#include <iostream>
#include <phpcpp.h>
#include <rocketmq/DefaultMQPushConsumer.h>
#include <rocketmq/MessageListener.h>
#include <rocketmq/Message.h>
#include <rocketmq/MessageExt.h>
#include <rocketmq/MessageQueue.h>
#include <rocketmq/PullResult.h>
#include <rocketmq/MQClientException.h>
#include <PhpSendResult.h>
#include <producer.h>
#include <transactionProducer.h>
class PhpMessage : public Php::Base {
public:
PhpMessage(MessageExt* messageExt) {
pMessageExt = messageExt;
}
Php::Value getMsgId() {
return pMessageExt->getMsgId();
}
Php::Value getTopic() {
return pMessageExt->getTopic();
}
Php::Value getTags() {
return pMessageExt->getTags();
}
Php::Value getKeys() {
return pMessageExt->getKeys();
}
Php::Value getBodyLen() {
return pMessageExt->getBodyLen();
}
Php::Value getQueueId() {
return pMessageExt->getQueueId();
}
Php::Value getReconsumeTimes() {
return pMessageExt->getReconsumeTimes();
}
Php::Value getCommitLogOffset() {
return (int64_t)pMessageExt->getCommitLogOffset();
}
Php::Value getBornTimestamp() {
return (int64_t)pMessageExt->getBornTimestamp();
}
Php::Value getStoreTimestamp() {
return (int64_t)pMessageExt->getStoreTimestamp();
}
Php::Value getPreparedTransactionOffset() {
return (int64_t)pMessageExt->getPreparedTransactionOffset();
}
Php::Value getProperty(Php::Parameters& params) {
if (params.size() < 1) {
throw Php::Exception("Incorrect number of parameters assigned.");
}
return pMessageExt->getProperty(params[0].stringValue());
}
/**
* return value type: char*
*/
Php::Value getBody() {
std::string body(pMessageExt->getBody(), pMessageExt->getBodyLen());
return body;
}
private:
MessageExt* pMessageExt;
};
class PhpMessageListener : public MessageListenerConcurrently {
private:
const Php::Value& _callback;
public:
PhpMessageListener(const Php::Value& callback) : _callback(callback) {
}
virtual ConsumeConcurrentlyStatus consumeMessage(std::list<MessageExt *> &msgs,
ConsumeConcurrentlyContext &context);
};
class PushConsumer : public Php::Base {
public:
PushConsumer() {
consumer = new DefaultMQPushConsumer();
consumer->setConsumeThreadMax(1);
consumer->setConsumeThreadMin(1);
}
virtual ~PushConsumer() {
delete(messageListener);
delete(consumer);
}
void __destruct() {
consumer->shutdown();
}
void start() {
consumer->start();
}
void shutdown() {
consumer->shutdown();
}
void setConsumerGroup(Php::Parameters& params) {
if (params.size() < 1) {
throw Php::Exception("Incorrect number of parameters assigned.");
}
consumer->setConsumerGroup(params[0].stringValue());
}
void subscribe(Php::Parameters& params) {
if (params.size() != 3) {
throw Php::Exception("Incorrect number of parameters passed in");
}
consumer->subscribe(params[0].stringValue(), params[1].stringValue());
Php::Value* value = new Php::Value(params[2]);
messageListener = new PhpMessageListener(*value);
//MsgListener* listener = new MsgListener();
consumer->registerMessageListener(messageListener);
}
private:
DefaultMQPushConsumer* consumer;
MessageListener* messageListener;
Php::Value* callback;
};
/**
* tell the compiler that the get_module is a pure C function
*/
extern "C" {
/**
* Function that is called by PHP right after the PHP process
* has started, and that returns an address of an internal PHP
* strucure with all the details and features of your extension
*
* @return void* a pointer to an address that is understood by PHP
*/
PHPCPP_EXPORT void *get_module()
{
// static(!) Php::Extension object that should stay in memory
// for the entire duration of the process (that's why it's static)
static Php::Extension extension("rocketmqclient4php", "1.0");
// Message class
Php::Class<PhpMessage> phpMessage("PhpMessage");
phpMessage.method("getMsgId", &PhpMessage::getMsgId, {});
phpMessage.method("getTopic", &PhpMessage::getTopic, {});
phpMessage.method("getTags", &PhpMessage::getTags, {});
phpMessage.method("getKeys", &PhpMessage::getKeys, {});
phpMessage.method("getBodyLen", &PhpMessage::getBodyLen, {});
phpMessage.method("getBody", &PhpMessage::getBody, {});
phpMessage.method("getQueueId", &PhpMessage::getQueueId, {});
phpMessage.method("getReconsumeTimes", &PhpMessage::getReconsumeTimes, {});
phpMessage.method("getCommitLogOffset", &PhpMessage::getCommitLogOffset, {});
phpMessage.method("getBornTimestamp", &PhpMessage::getBornTimestamp, {});
phpMessage.method("getStoreTimestamp", &PhpMessage::getStoreTimestamp, {});
phpMessage.method("getPreparedTransactionOffset", &PhpMessage::getPreparedTransactionOffset, {});
phpMessage.method("getProperty", &PhpMessage::getProperty,
{Php::ByVal("name", Php::Type::String)});
extension.add(std::move(phpMessage));
// SendResult
Php::Class<PhpSendResult> phpSendResult("PhpSendResult");
phpSendResult.method("getMsgId", &PhpSendResult::getMsgId, {});
phpSendResult.method("getUniqKey", &PhpSendResult::getUniqKey, {});
extension.add(std::move(phpSendResult));
// Consumer class
Php::Class<PushConsumer> pushConsumer("PushConsumer");
pushConsumer.method("start", &PushConsumer::start, {});
pushConsumer.method("shutdown", &PushConsumer::shutdown, {});
pushConsumer.method("__destruct", &PushConsumer::__destruct, {});
pushConsumer.method("setConsumerGroup", &PushConsumer::setConsumerGroup,
{Php::ByVal("consumerGroup", Php::Type::String)});
pushConsumer.method("subscribe", &PushConsumer::subscribe,
{Php::ByVal("topic", Php::Type::String),
Php::ByVal("tags", Php::Type::String),
Php::ByVal("consumeFunction", Php::Type::Callable)});
// Producer class
Php::Class<Producer> producer("Producer");
producer.method("__construct", &Producer::__construct, {});
producer.method("__destruct", &Producer::__destruct, {});
producer.method("send", &Producer::send, {
Php::ByVal("topic", Php::Type::String),
Php::ByVal("tag", Php::Type::String),
Php::ByVal("key", Php::Type::String),
Php::ByVal("content", Php::Type::String)});
producer.method("start", &Producer::start, {});
producer.method("setProducerGroup", &Producer::setProducerGroup, {
Php::ByVal("groupName", Php::Type::String)});
// TransactionProducer
Php::Class<TransactionProducer> transactionProducer("TransactionProducer");
transactionProducer.method("__construct", &TransactionProducer::__construct, {});
transactionProducer.method("__destruct", &TransactionProducer::__destruct, {});
transactionProducer.method("sendMessageInTransaction", &TransactionProducer::sendMessageInTransaction, {
Php::ByVal("topic", Php::Type::String),
Php::ByVal("tag", Php::Type::String),
Php::ByVal("key", Php::Type::String),
Php::ByVal("content", Php::Type::String)});
transactionProducer.method("endTransaction", &TransactionProducer::endTransaction, {
Php::ByVal("msgId", Php::Type::String),
Php::ByVal("transState", Php::Type::String)});
transactionProducer.method("start", &TransactionProducer::start, {});
transactionProducer.method("setProducerGroup", &TransactionProducer::setProducerGroup, {
Php::ByVal("groupName", Php::Type::String)});
// use namespace
Php::Namespace Rocketmq("Rocketmq");
Rocketmq.add(std::move(producer));
Rocketmq.add(std::move(transactionProducer));
Rocketmq.add(std::move(pushConsumer));
extension.add(std::move(Rocketmq));
// return the extension
return extension;
}
}
ConsumeConcurrentlyStatus PhpMessageListener::consumeMessage(std::list<MessageExt *> &msgs,
ConsumeConcurrentlyContext &context) {
//std::cout << "batch size: " << msgs.size() << std::endl;
if(msgs.empty()) {
throw Php::Exception("batch consuming message list is empty");
}
MessageExt* messageExt = msgs.front();
//std::cout << "getBody-0: " << messageExt->getBody() << std::endl;
//std::cout << "Begin to consume message. msgId: " << messageExt->getMsgId() << std::endl;
//std::cout << "Test if callable: " << (_callback.isCallable() ? " true" : "false") << std::endl;
if (!_callback.isCallable()) {
throw Php::Exception("Callback PHP function is expected");
}
Php::Value param = Php::Object("PhpMessage", new PhpMessage(messageExt));
Php::Value value;
try {
value = _callback(param);
} catch (...) {
std::cout << "Yuck! Bussiness code is buggy!" << std::endl;
}
if (!value.isNull() && value.isNumeric() && value.numericValue() > 0) {
std::cout << "Message Consumption Failed! Retry Later." << std::endl;
context.ackIndex = 0;
return RECONSUME_LATER;
}
//std::cout << "Message Consumed OK" << std::endl;
context.ackIndex = 1;
return CONSUME_SUCCESS;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/doublespring168/rocketmq-client4php.git
git@gitee.com:doublespring168/rocketmq-client4php.git
doublespring168
rocketmq-client4php
rocketmq-client4php
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385