2 Star 14 Fork 4

gavingqf/anet

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
share_ptr_session.hpp 7.06 KB
一键复制 编辑 原始数据 按行查看 历史
gaoqingfeng 提交于 2024-11-18 09:11 . update
#pragma once
/*
* session which runs in the same thread.
*/
#include <atomic>
#include <cstring>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include "anet.hpp"
#include "connection.hpp"
#include "main_service.h"
#include "rpc/rpc_handle.hpp"
#include "default_interface.hpp"
#include "allocator.hpp"
namespace anet {
namespace tcp {
class IReleaser;
// proxy session with all net events run in the same thread.
template <typename proxySession>
class CSharePtrSession : public ISession,
public std::enable_shared_from_this<CSharePtrSession<proxySession>> {
public:
CSharePtrSession() {
initProxySession(proxySession::Create());
}
explicit CSharePtrSession(proxySession* pSession) {
initProxySession(pSession);
}
virtual ~CSharePtrSession() = default;
virtual void onRecv(const char* msg, int len) override {
auto self = this->shared_from_this();
// std::vector<char> vecMsg(msg, msg + len);
anet::utils::PoolVector<char> vecMsg(msg, msg + len);
CMainService::instance().post([self, this, vecMsg = std::move(vecMsg)]{
this->OnMessage(vecMsg.data(), vecMsg.size());
});
}
virtual void onConnected(connSharePtr conn) override {
m_closed.store(false);
m_conn = conn;
auto self = this->shared_from_this();
CMainService::instance().post([self, this]() {
this->OnConnected();
});
}
virtual void onTerminate() override {
m_closed.store(true);
auto self = this->shared_from_this();
CMainService::instance().post([self, this]() {
this->OnTerminate();
});
}
virtual void release() override {
auto self = this->shared_from_this();
CMainService::instance().post([self, this]() {
this->OnRelease();
this->Release();
});
}
void setReleaser(IReleaser* releaser) {
if (releaser == nullptr) {
throw std::invalid_argument("releaser cannot be null");
return;
}
m_releaser = releaser;
}
unsigned int getId() const {
return m_id;
}
void setId(unsigned int id) {
m_id = id;
// proxySession must have a SetId() method.
m_proxySession->SetId(id);
}
void Release() {
if (m_releaser != nullptr) {
auto self = this->shared_from_this();
m_releaser->releaseSession((void*)&self);
} else {
LogCrit("Cannot find releaser object");
}
}
void OnConnected() {
m_proxySession->OnConnected();
}
void OnMessage(const char* msg, int len) {
m_proxySession->OnMessage(msg, len);
}
void OnTerminate() {
m_proxySession->OnTerminate();
}
void OnRelease() {
m_proxySession->OnRelease();
}
void Send(const std::string& msg) {
this->Send(msg.c_str(), int(msg.size()));
}
void Send(const char* msg, int len) {
if (msg == nullptr || len == 0) {
return;
}
if (IsConnected()) {
m_conn->Send(msg, len);
}
}
void Send(unsigned short msgId, const char* msg, int len) {
anet::utils::PoolVector<char> vec;
int size = buildProto(msgId, msg, len, vec);
this->Send(vec.data(), size);
}
template <typename... Args>
void remote_call(const std::string& method, Args&&... args) {
anet::rpc_codec::rpc_stream stream;
anet::rpc_codec::pack_remote_call(stream, method, std::forward<Args>(args)...);
this->Send(stream.c_str(), static_cast<int>(stream.buf().size()));
}
bool IsConnected() const {
return !m_closed.load();
}
void Close() {
if (IsConnected()) {
m_conn->close();
}
}
const std::string getRemoteIP() const {
return m_conn->getRemoteAddr();
}
unsigned short getRemotePort() const {
return m_conn->getRemotePort();
}
proxySession* getProxySession() const {
return m_proxySession;
}
int buildProto(unsigned short msgId, const char* msg, int len,
anet::utils::PoolVector<char>& vecBuf) {
vecBuf.resize(sizeof(SCommonHead) + sizeof(unsigned short) + len);
char* pBuf = vecBuf.data();
SCommonHead& head = *reinterpret_cast<SCommonHead*>(pBuf);
head.len = uint32(htonl(uint32(len) + sizeof(msgId)));
*reinterpret_cast<uint16*>(pBuf + gProto_head_size) = htons(msgId);
std::memcpy(pBuf + gProto_head_size + sizeof(msgId), msg, len);
return static_cast<int>(gProto_head_size + sizeof(msgId) + len);
}
private:
void initProxySession(proxySession* pSession) {
if (pSession == nullptr) {
throw std::runtime_error("proxySession cannot be null");
return;
}
m_proxySession = pSession;
m_proxySession->SetSession(this);
}
private:
connSharePtr m_conn{ nullptr };
unsigned int m_id{ 0 };
std::atomic_bool m_closed{ true };
IReleaser* m_releaser{ nullptr };
proxySession* m_proxySession{ nullptr };
};
class IReleaser {
public:
virtual ~IReleaser() = default;
virtual void releaseSession(void* pSession) = 0;
};
// proxySession session factory.
template <typename proxySession>
class CSharePtrSessionFactory : public ISessionFactory, public IReleaser {
public:
CSharePtrSessionFactory() = default;
virtual ~CSharePtrSessionFactory() = default;
CSharePtrSessionFactory(const CSharePtrSessionFactory&) = delete;
CSharePtrSessionFactory& operator=(const CSharePtrSessionFactory&) = delete;
using sharePtrSession = CSharePtrSession<proxySession>;
using sessionSharePtr = std::shared_ptr<sharePtrSession>;
using Id2SessionPtr = std::map<unsigned int, sessionSharePtr>;
virtual ISession* createSession() override {
return _createSession(nullptr);
}
// releaseSession releases session.
virtual void releaseSession(void* session) override {
if (session == nullptr) {
return;
}
auto& realSession = *reinterpret_cast<sessionSharePtr*>(session);
const auto id = realSession->getId();
// lock guard.
std::lock_guard<std::mutex> lock(m_mutex);
m_sessions.erase(id);
}
// createSession creates session.
ISession* createSession(proxySession* pSession) {
return _createSession(pSession);
}
// findSession finds session with id.
std::pair<bool, sessionSharePtr> findSession(unsigned int id) const {
std::lock_guard<std::mutex> lock(m_mutex);
auto it = m_sessions.find(id);
if (it != m_sessions.end()) {
return { true, it->second };
} else {
return { false, sessionSharePtr{} };
}
}
proxySession* GetSession(unsigned int id) const {
auto result = this->findSession(id);
if (result.first && result.second) {
return result.second->getProxySession();
} else {
return nullptr;
}
}
private:
ISession* _createSession(proxySession* pSession) {
std::lock_guard<std::mutex> lock(m_mutex);
const unsigned int sessionId = m_nextId++;
sessionSharePtr sessSharePtr = (pSession != nullptr) ?
std::make_shared<sharePtrSession>(pSession) :
std::make_shared<sharePtrSession>();
sessSharePtr->setId(sessionId);
sessSharePtr->setReleaser(this);
m_sessions[sessionId] = sessSharePtr;
return sessSharePtr.get();
}
private:
mutable std::mutex m_mutex;
Id2SessionPtr m_sessions;
unsigned int m_nextId{ 1 };
};
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/gavingqf/anet.git
git@gitee.com:gavingqf/anet.git
gavingqf
anet
anet
master

搜索帮助