2 Star 0 Fork 0

CageQ/syncredis

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
sync_redis.hpp 3.07 KB
一键复制 编辑 原始数据 按行查看 历史
cageq 提交于 2018-03-15 12:13 . updat
#pragma once
#include <memory>
#include <vector>
#include <functional>
#include "fmt/format.h"
#include "redis_conn.hpp"
class SyncRedis
{
public:
typedef std::shared_ptr<RedisConn> RedisConnPtr;
SyncRedis()
{
m_cur_index = 0;
}
int start(const std::string & host = "127.0.0.1", int port = 6379 ,
const std::string& passwd = "", int thrds = 8)
{
for(int i =0; i< thrds ;i ++)
{
RedisConnPtr conn = RedisConnPtr(new RedisConn(i));
if (conn->init(host,port,passwd) == 0) {
m_connections.push_back(conn);
}
}
return 0;
}
int terminate() {
for (auto & conn : m_connections) {
conn->disconnect();
}
m_connections.clear();
return 0;
}
template <typename ... Args>
SyncRedis * execute(RedisHandle handle , const std::string & cmd , const Args & ... args )
{
fmt::MemoryWriter command;
command << cmd ;
int argSize = sizeof ...(Args);
fmt::MemoryWriter params;
for (int i = 0; i < argSize ; i++)
{
params<< " {} ";
}
command.write(params.c_str(),args...);
//DLog("command is : %s" , command.c_str());
RedisConnPtr conn = this->get_next_conn();
if (conn)
{
RedisCommand* pResult = new RedisCommand();
pResult->handle = handle;
pResult->command = command.c_str();
//TODO
conn->execute(command.c_str(),pResult);
}
return this;
}
template <typename ... Args>
SyncRedis * execute(const std::string & cmd , const Args & ... args )
{
fmt::MemoryWriter command;
command << cmd ;
int argSize = sizeof ...(Args);
fmt::MemoryWriter params;
for (int i = 0; i < argSize ; i++)
{
params<< " {} ";
}
command.write(params.c_str(),args...);
DLog("command is : %s" , command.c_str());
RedisConnPtr conn = this->get_next_conn();
if (conn)
{
RedisCommand* pResult = new RedisCommand();
pResult->command = command.c_str();
//TODO
conn->execute(command.c_str(),pResult);
}
return this;
}
int execute(const std::string & cmd)
{
RedisConnPtr conn = this->get_next_conn();
if (conn)
{
RedisCommand* pResult = new RedisCommand();
pResult->command = cmd.c_str();
conn->execute(cmd.c_str(),pResult);
}
return 0;
}
RedisConnPtr get_next_conn()
{
int total = m_connections.size();
int curIdx = m_cur_index++ %total;
while (!m_connections[curIdx]->is_connected() && total > 0 )
{
DLog("tring to reconnect to server");
m_connections[curIdx]->connect();
total -- ;
curIdx = m_cur_index++ %m_connections.size();
}
if (total > 0)
{
RedisConnPtr conn = m_connections[curIdx];
if (conn->is_connected())
{
return conn;
}
}
return RedisConnPtr();
}
int process_result(ResultHandler handle)
{
for(auto conn : m_connections)
{
conn->result_queue.process([&](RedisCommand* pResult) {
if (handle)
{
handle(pResult);
}
delete pResult;
});
}
return 0 ;
}
private:
std::vector<RedisConnPtr> m_connections;
int m_cur_index;
};
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/cageq/syncredis.git
git@gitee.com:cageq/syncredis.git
cageq
syncredis
syncredis
master

搜索帮助