From f884a9b22985e52dfebcd63eab521d2b6f318017 Mon Sep 17 00:00:00 2001 From: jiaoyue Date: Mon, 4 Sep 2023 22:13:45 +0800 Subject: [PATCH 1/2] raw_msg --- src/util/python/python_helper.cpp | 34 +++++++++++++++++++++++++++++-- src/util/python/python_helper.hh | 7 +++++++ src/util/python/python_module.cpp | 17 ++++++++++++++++ 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/util/python/python_helper.cpp b/src/util/python/python_helper.cpp index 0b81803d..edef7613 100644 --- a/src/util/python/python_helper.cpp +++ b/src/util/python/python_helper.cpp @@ -1500,8 +1500,11 @@ auto CppRuntime::pop(kratos::service::BoxChannelPtr &channel) -> PyObject * { auto header_ptr = (rpc::RpcIdentityNotifyHeader*)(buf_ptr); header_ptr->ntoh(); return on_notify_identity(channel, header_ptr, buf_ptr + sizeof(rpc::RpcIdentityNotifyHeader)); - } - break; + } break; + case rpc::RpcMsgType::NO_RPC: { + return on_raw_msg(channel->get_id(), buf_ptr + sizeof(rpc::RpcMsgHeader), + int(header.length) - int(sizeof(rpc::RpcMsgHeader))); + } break; default: break; } @@ -2285,6 +2288,22 @@ auto CppRuntime::heartbeat() -> void { proxy_channel_->send(buf_ptr, (int)header_length); } +auto CppRuntime::send_raw_msg(char *msg) -> bool { + if (!proxy_channel_) { + return false; + } + auto header_length = sizeof(rpc::RpcMsgHeader); + auto total_length = header_length + sizeof(msg); + auto *buf_ptr = get_buffer_ptr(total_length); + auto *msg_header = reinterpret_cast(buf_ptr); + msg_header->type = decltype(msg_header->type)(rpc::RpcMsgType::NO_RPC); + msg_header->length = decltype(msg_header->length)(total_length); + msg_header->hton(); + strcpy(buf_ptr + header_length, msg); + proxy_channel_->send(buf_ptr, (int)total_length); + return true; +} + #endif // PYTHON_SDK auto CppRuntime::deinit(bool py_clean) -> void { @@ -2586,6 +2605,17 @@ auto CppRuntime::on_notify_identity(kratos::service::BoxChannelPtr &channel, rpc return obj_ptr; } +auto CppRuntime::on_raw_msg(std::uint64_t channel_id, const char *data, int length) -> PyObject * { + auto *obj_ptr = PyDict_New(); + SET_ATTR_STRING_XDECREF(this, obj_ptr, "MSGID", + PyLong_FromLong((long)rpc::RpcMsgType::NO_RPC)); + SET_ATTR_STRING_XDECREF(this, obj_ptr, "CHANNELID", + PyLong_FromUnsignedLongLong(channel_id)); + auto *data_obj_ptr = PyUnicode_FromStringAndSize(data, length); + SET_ATTR_STRING_XDECREF(this, obj_ptr, "DATA", data_obj_ptr); + return obj_ptr; +} + auto CppRuntime::deserialize_arg(const char *data, int size, rpc::ServiceUUID uuid, rpc::MethodID method_id) -> PyObject * { diff --git a/src/util/python/python_helper.hh b/src/util/python/python_helper.hh index 328721e2..020b696b 100644 --- a/src/util/python/python_helper.hh +++ b/src/util/python/python_helper.hh @@ -372,6 +372,11 @@ public: * 发送心跳 */ auto heartbeat() -> void; + + /** + * 发送NO_RPC消息 + */ + auto send_raw_msg(char *msg) -> bool; #endif // PYTHON_SDK /** * 清理 @@ -582,6 +587,8 @@ private: auto on_notify_identity(kratos::service::BoxChannelPtr &channel, rpc::RpcIdentityNotifyHeader* header, char* identity_tag) ->PyObject*; + auto on_raw_msg(std::uint64_t channel_id, const char *data, int length) -> PyObject *; + auto deserialize_arg(const char *data, int size, rpc::ServiceUUID uuid, rpc::MethodID method_id) -> PyObject *; auto deserialize_ret(const char *data, int size, rpc::ServiceUUID uuid, diff --git a/src/util/python/python_module.cpp b/src/util/python/python_module.cpp index fd16a7b8..1fd8b289 100644 --- a/src/util/python/python_module.cpp +++ b/src/util/python/python_module.cpp @@ -619,6 +619,22 @@ static auto heartbeat(PyObject * /*self*/, PyObject *args) -> PyObject* { Py_RETURN_NONE; } +static auto send_raw_msg(PyObject * /*self*/, PyObject *args) -> PyObject* { + char *msg{nullptr}; + if (!PyArg_ParseTuple(args, "s", &msg)) { + Py_RETURN_FALSE; + } + if (!msg) { + Py_RETURN_FALSE; + } + auto retval = get_buffer_ptr()->send_raw_msg(msg); + if (retval) { + Py_RETURN_TRUE; + } else { + Py_RETURN_FALSE; + } +} + #endif // PYTHON_SDK static PyMethodDef buffer_methods[] = { @@ -628,6 +644,7 @@ static PyMethodDef buffer_methods[] = { {"Connect", connect, METH_VARARGS, "Connect to cluster"}, {"Disconnect", disconnect, METH_VARARGS, "Disconnect from cluster"}, {"Heartbeat", heartbeat, METH_NOARGS, "Send HeartBeat Message"}, + {"SendRawMsg", send_raw_msg, METH_VARARGS, "Send raw message"}, #endif // PYTHON_SDK {"Pop", pop, METH_NOARGS, "Pop a RPC protocol object from buffer"}, {"Call", call, METH_VARARGS, "rpc::RpcCallRequestHeader"}, -- Gitee From 724e3fb79f8a68823d5d7c6305a0df6cefee378c Mon Sep 17 00:00:00 2001 From: jiaoyue Date: Tue, 5 Sep 2023 21:24:53 +0800 Subject: [PATCH 2/2] fix msg length --- src/util/python/python_helper.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/util/python/python_helper.cpp b/src/util/python/python_helper.cpp index edef7613..ffb21067 100644 --- a/src/util/python/python_helper.cpp +++ b/src/util/python/python_helper.cpp @@ -2293,14 +2293,14 @@ auto CppRuntime::send_raw_msg(char *msg) -> bool { return false; } auto header_length = sizeof(rpc::RpcMsgHeader); - auto total_length = header_length + sizeof(msg); + auto total_length = header_length + strlen(msg); auto *buf_ptr = get_buffer_ptr(total_length); auto *msg_header = reinterpret_cast(buf_ptr); msg_header->type = decltype(msg_header->type)(rpc::RpcMsgType::NO_RPC); msg_header->length = decltype(msg_header->length)(total_length); msg_header->hton(); strcpy(buf_ptr + header_length, msg); - proxy_channel_->send(buf_ptr, (int)total_length); + proxy_channel_->send(buf_ptr, (int)total_length); return true; } @@ -2606,7 +2606,7 @@ auto CppRuntime::on_notify_identity(kratos::service::BoxChannelPtr &channel, rpc } auto CppRuntime::on_raw_msg(std::uint64_t channel_id, const char *data, int length) -> PyObject * { - auto *obj_ptr = PyDict_New(); + auto *obj_ptr = PyDict_New(); SET_ATTR_STRING_XDECREF(this, obj_ptr, "MSGID", PyLong_FromLong((long)rpc::RpcMsgType::NO_RPC)); SET_ATTR_STRING_XDECREF(this, obj_ptr, "CHANNELID", -- Gitee