2 Star 0 Fork 5

zjzdy/tmsg

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
tmsg.cpp 9.86 KB
一键复制 编辑 原始数据 按行查看 历史
zjzdy 提交于 2018-08-11 13:07 . first version
/*
tmsg.cpp
Licensed under the MIT License <http://opensource.org/licenses/MIT>.
SPDX-License-Identifier: MIT
Copyright (c) 2018 zjzengdongyang <http://zjzdy.cn>.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include "tmsg.hpp"
#include <algorithm>
#include <utility>
#include <sstream>
#include <mqtt/client.h>
void tmsg::msg::parse() {
//循环解包所有数据
size_t offset = 0;
while(offset != sbuf.size()) {
size_t offset_t = offset;
auto obj = msgpack::unpack(sbuf.data(), sbuf.size(), offset);
if(offset_t == offset) {
throw std::runtime_error("unpack data fail in "+std::to_string(offset));
}
parsedData.emplace_back(std::move(obj));
}
parsed = true;
}
/// Returns the field name stored at position `sequence`.
///
/// @param sequence Zero-based index into the list of names.
/// @return A copy of the name, or an empty string when `sequence` is out of
///         range.
std::string tmsg::msg::getName(unsigned long sequence) {
    // Guard clause: out-of-range indices yield an empty name.
    if (sequence >= names.size()) {
        return std::string();
    }
    return names.at(sequence);
}
/// Resets the message to an empty state: drops the decoded objects, the
/// names and the packed buffer, clears the parsed flag and refreshes the
/// timestamp.
void tmsg::msg::clear() {
    parsedData.clear();
    parsed = false;
    names.clear();
    sbuf.clear();
    // Record the moment of the reset as the message's new timestamp.
    updateTime();
}
std::string tmsg::msg::toJson(bool is_names) {
std::ostringstream oss;
oss << '[';
if(is_names) {
//输出名称定义
for(auto &i : names)
oss << '"' << i << "\",";
}
else {
//输出数据
if(!parsed)
parse();
for(auto &i : parsedData)
oss << i.get() << ',';
}
oss.seekp(-1, std::ios_base::cur);
oss << ']';
return oss.str();
}
/// Returns how many decoded objects the message holds, parsing the packed
/// buffer first when that has not happened yet.
size_t tmsg::msg::dataCount() {
    const bool needsParse = !parsed;
    if (needsParse) {
        parse();
    }
    return parsedData.size();
}
std::chrono::steady_clock::time_point tmsg::msg::getTime() {
return time;
}
/// Stamps the message with the current steady-clock time.
void tmsg::msg::updateTime() {
    const auto now = std::chrono::steady_clock::now();
    time = now;
}
/// Reports a lost connection on stdout when verbose output is enabled.
///
/// @param cause Reason for the loss; appended after ": " when non-empty.
void tmsg::callback::connection_lost(const std::string& cause) {
    // Silent unless some verbosity level is set.
    if (!tm_.verbose) {
        return;
    }
    std::cout << "Connection lost";
    if (!cause.empty()) {
        std::cout << ": " << cause;
    }
    std::cout << std::endl;
}
/// Handles an incoming MQTT message.
///
/// Topics under "/tmsg/define/" carry a data definition (a version id plus
/// the list of field names) that is stored in `tm_.defines`. Topics under
/// "/tmsg/data/" carry a payload that starts with the definition's version id
/// followed by the packed values; the payload is stored in `tm_.msgs` and
/// forwarded to the user callback only when a non-empty definition with the
/// same version id is known locally. Anything else is just logged at high
/// verbosity.
///
/// @param msg The message delivered by the MQTT client.
///
/// Changes compared to the previous implementation:
///  - The "Arrived data is" dump is printed only when this message was
///    actually stored. Before, it dereferenced `tm_.msgs[dataTopic]`
///    unconditionally, which is a null pointer when no message was ever
///    stored for the topic (and a stale message otherwise).
///  - All std::exception-derived errors are caught, not just
///    msgpack::type_error, so that a truncated or malformed payload cannot
///    throw out of the MQTT callback.
void tmsg::callback::message_arrived(mqtt::const_message_ptr msg) {
    const std::string def_b("/tmsg/define/");
    const std::string data_b("/tmsg/data/");
    try {
        const auto &topic = msg->get_topic_ref().str();
        const auto &payload = msg->get_payload_ref();

        if (topic.compare(0, def_b.length(), def_b) == 0) {
            // --- Definition update ---
            const std::string dataTopic = topic.substr(def_b.length());
            if (dataTopic.empty())
                return;
            // Unpack before touching the map so a bad payload creates no entry.
            auto data = msgpack::unpack(payload.data(), payload.size());
            auto &define = tm_.defines[dataTopic];
            data.get().convert(define);
            if (tm_.verbose > 1)
                std::cout << "Update data define for " << dataTopic << " (vid=" << static_cast<int>(define.first) << ", in subscribe)" << std::endl;
            if (tm_.verbose > 2)
                std::cout << "New data define is " << data.get() << std::endl;
        }
        else if (topic.compare(0, data_b.length(), data_b) == 0) {
            // --- Data message ---
            const std::string dataTopic = topic.substr(data_b.length());
            if (dataTopic.empty())
                return;
            size_t offset = 0;
            auto &define = tm_.defines[dataTopic];
            bool stored = false;
            // Validate against the local definition, then store. The leading
            // version id is only unpacked when a definition is known; `offset`
            // then points at the first byte after it.
            if (!define.second.empty() &&
                define.first == msgpack::unpack(payload.data(), payload.size(), offset).get().as<unsigned char>()) {
                auto &slot = tm_.msgs[dataTopic];
                slot = std::make_shared<tmsg::msg>(payload.data() + offset,
                                                   payload.size() - offset,
                                                   define.second);
                stored = true;
                if (tm_.userCallback)
                    tm_.userCallback->message_arrived(dataTopic, slot);
            }
            if (tm_.verbose > 3) {
                int vid = msgpack::unpack(payload.data(), payload.size()).get().as<unsigned char>();
                std::cout << "Arrived data: topic " << dataTopic
                          << ", define vid " << vid
                          << ", size " << payload.size()
                          << ", qos " << msg->get_qos()
                          << ", retained " << msg->is_retained()
                          << ", update local data " << (define.first == vid && !define.second.empty() ? "true" : "false")
                          << std::endl;
                // Only dump the message if it is the one that just arrived.
                if (stored) {
                    const auto it = tm_.msgs.find(dataTopic);
                    if (it != tm_.msgs.end() && it->second)
                        std::cout << "Arrived data is " << it->second->toJson() << std::endl;
                }
            }
        }
        else {
            if (tm_.verbose > 3) {
                std::cout << "Arrived unknown data: mqtt topic" << msg->get_topic()
                          << ", size " << payload.size()
                          << ", qos " << msg->get_qos() << ", retained " << msg->is_retained() << std::endl;
            }
        }
    }
    catch (const std::exception &e) {
        // Covers msgpack::type_error (wrong value type) as well as unpack
        // failures on malformed payloads; the message is dropped.
        std::cerr << e.what() << std::endl;
    }
}
/// Binds the callback to the tmsg instance whose state it updates.
///
/// @param tm Owning tmsg object; only a reference is kept.
tmsg::callback::callback(tmsg& tm)
    : tm_(tm)
{
}
/// Creates the MQTT client, starts with verbose output disabled and
/// registers the internal callback with the client.
///
/// @param serverURI      Passed through to the MQTT client.
/// @param clientId       Passed through to the MQTT client.
/// @param maxBufferedMsg Passed through to the MQTT client.
tmsg::tmsg(const std::string &serverURI, const std::string &clientId, int maxBufferedMsg)
    : cli(serverURI, clientId, maxBufferedMsg),
      cb(*this),
      verbose(0)
{
    cli.set_callback(cb);
}
/// Disconnects from the server on destruction. This is best effort only:
/// success is not guaranteed.
///
/// Destructors are implicitly noexcept, so an exception escaping from
/// disconnect() would terminate the program. Failures are therefore reported
/// on stderr (matching connect()) and otherwise ignored.
tmsg::~tmsg() {
    try {
        disconnect();
    }
    catch (const std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    catch (...) {
        // Nothing sensible can be done here; never throw from a destructor.
    }
}
bool tmsg::connect() {
mqtt::connect_options connOpts;
connOpts.set_keep_alive_interval(20);
connOpts.set_clean_session(true);
connOpts.set_automatic_reconnect(true);
return connect(connOpts);
}
/// Connects to the server using the supplied options.
///
/// @param connOpts Connection options handed to the MQTT client.
/// @return false if the client threw mqtt::exception (its message is written
///         to stderr), true otherwise.
bool tmsg::connect(mqtt::connect_options &connOpts) {
    if (verbose) {
        std::cout << "Connecting... " << cli.get_server_uri() << std::endl;
    }
    bool succeeded = true;
    try {
        cli.connect(connOpts);
        if (verbose && cli.is_connected()) {
            std::cout << "Connected " << cli.get_server_uri() << std::endl;
        }
    }
    catch (const mqtt::exception& exc) {
        std::cerr << exc.what() << std::endl;
        succeeded = false;
    }
    return succeeded;
}
/// Publishes the message held by a shared pointer; forwards to the
/// reference overload.
///
/// @param dataTopic Topic name (without the "/tmsg/data/" prefix).
/// @param m         Message to publish. A null pointer is ignored, in line
///                  with the reference overload silently ignoring empty
///                  topics and empty messages; previously it was
///                  dereferenced unconditionally.
/// @param qos       Forwarded to the reference overload.
/// @param retained  Forwarded to the reference overload.
void tmsg::publish(const std::string &dataTopic, const std::shared_ptr<tmsg::msg> &m, int qos, bool retained) {
    if (!m) {
        return;
    }
    publish(dataTopic, *m, qos, retained);
}
/// Publishes a message on "/tmsg/data/<dataTopic>", preceded — when the
/// message's names differ from the locally stored definition — by a retained
/// definition update on "/tmsg/define/<dataTopic>" (QoS 1).
///
/// Nothing happens when the topic is empty or the message has no packed data.
///
/// @param dataTopic Topic name without prefix.
/// @param m         Message whose packed buffer and names are sent.
/// @param qos       QoS used for the data message.
/// @param retained  Retained flag used for the data message.
void tmsg::publish(const std::string &dataTopic, const tmsg::msg &m, int qos, bool retained) {
    if (dataTopic.empty() || m.sbuf.size() == 0) {
        return;
    }

    // One lookup; the entry is created on first use of the topic.
    auto &define = defines[dataTopic];

    // Decide whether a new data definition has to be stored and published.
    if (m.names != define.second) {
        // Definition version marker. Being a function-local static, the
        // counter is shared by every topic.
        static unsigned char vid = 0;
        if (verbose > 1) {
            std::cout << "Update data define for " << dataTopic << " (vid=" << (int)vid << ", in publish)" << std::endl;
        }
        // Update the local definition.
        define.first = vid++;
        define.second = m.names;
        // Pack the definition.
        msgpack::sbuffer defBuf;
        msgpack::pack(defBuf, define);
        // Publish the new definition.
        cli.publish("/tmsg/define/"+dataTopic, defBuf.data(), defBuf.size(), 1, true);
        if (verbose > 2) {
            std::cout << "New data define is " << msgpack::unpack(defBuf.data(), defBuf.size()).get() << ", data size "
                      << defBuf.size() << std::endl;
        }
    }

    // Prefix the data with the definition's vid so that a receiver does not
    // decode it against the wrong definition.
    msgpack::sbuffer out(m.sbuf.size()+4);
    msgpack::pack(out, define.first);
    out.write(m.sbuf.data(), m.sbuf.size()); // copy of the message's packed data
    cli.publish("/tmsg/data/"+dataTopic, out.data(), out.size(), qos, retained);
    if (verbose > 3) {
        std::cout << "Publish data: topic " << dataTopic
                  << ", define vid " << (int)define.first
                  << ", size " << out.size()
                  << ", qos " << qos << ", retained " << retained << std::endl;
    }
}
/// Subscribes to a topic's data channel with the requested QoS and to its
/// definition channel with QoS 1. An empty topic is ignored.
///
/// @param dataTopic Topic name without prefix.
/// @param qos       QoS for the data subscription.
void tmsg::subscribe(const std::string &dataTopic, int qos) {
    if (!dataTopic.empty()) {
        if (verbose > 1) {
            std::cout << "Subscribe topic " << dataTopic << " (qos=" << qos << ")" << std::endl;
        }
        const std::string dataChannel = "/tmsg/data/"+dataTopic;
        const std::string defineChannel = "/tmsg/define/"+dataTopic;
        cli.subscribe(dataChannel, qos);   // data
        cli.subscribe(defineChannel, 1);   // definitions
    }
}
/// Reports whether the underlying MQTT client is currently connected.
bool tmsg::is_connected() {
    const bool connected = cli.is_connected();
    return connected;
}
/// Disconnects the client if it is connected.
///
/// @return true when the client reports being disconnected afterwards.
bool tmsg::disconnect() {
    const bool wasConnected = cli.is_connected();
    if (wasConnected) {
        cli.disconnect();
    }
    // Re-query the client rather than assuming the disconnect succeeded.
    const bool stillConnected = cli.is_connected();
    return !stillConnected;
}
void tmsg::setVerbose(unsigned int level) {
verbose = level;
}
std::shared_ptr<tmsg::msg> tmsg::getLatestMsg(const std::string &dataTopic) {
return msgs[dataTopic];
}
/// Registers the object notified when a data message is stored.
///
/// @param cb Callback object; only its address is kept, so it must outlive
///           its use by this instance.
void tmsg::setNewMsgCallback(tmsg::newMsgCallback &cb) {
    tmsg::newMsgCallback *const target = &cb;
    userCallback = target;
}
/// Returns the keys of the stored-message map, i.e. every topic that has an
/// entry in `msgs`.
///
/// @return A new vector of topic names in the map's iteration order.
///
/// The local is returned by name: wrapping it in std::move() would prevent
/// copy elision without gaining anything.
std::vector<std::string> tmsg::getAllTopic() {
    std::vector<std::string> topics;
    topics.reserve(msgs.size());
    for (const auto &entry : msgs) {
        topics.emplace_back(entry.first);
    }
    return topics;
}
const std::map<std::string, std::pair<unsigned char, std::vector<std::string>>> &tmsg::getAllDefines() {
return defines;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/zjzdy/tmsg.git
git@gitee.com:zjzdy/tmsg.git
zjzdy
tmsg
tmsg
master

搜索帮助