1 Star 0 Fork 3

xuenguang/fusion

forked from Jally/fusion 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
ParallelTest.h 6.43 KB
一键复制 编辑 原始数据 按行查看 历史
Jally 提交于 2018-09-14 10:34 . * 编码风格统一
#include "network/ConnectionManager.h"
#include "network/SessionManager.h"
#include "network/Listener.h"
#include "ServerMaster.h"
#include "ThreadPool.h"
#include "System.h"
#include "OS.h"
#define HOST_STRING "127.0.0.1"
#define PORT_STRING "9999"
#define sParallelServer (*ParallelServer::instance())
#define sParallelListener (*ParallelListener::instance())
#define sParallelServerMaster (*ParallelServerMaster::instance())
#define sParallelWorkerManager (*ParallelWorkerManager::instance())
#define sDataManager (*DataManager::instance())
#define sProducerSession (*ProducerSession::instance())
class DataManager : public Singleton<DataManager> {
public:
DataManager() {
packet_string_.resize(65536 * 5);
for (size_t i = 0, total = packet_string_.size(); i < total; ++i) {
packet_string_[i] = System::Rand(0, 256);
}
}
virtual ~DataManager() {
for (INetPacket *pck : pck_list_) {
delete pck;
}
}
void HandlePacket(INetPacket &pck) {
for (size_t i = 0;;++i) {
INetPacket *packet = GetPacket(i);
if (packet != nullptr) {
if (packet->GetOpcode() == pck.GetOpcode() &&
packet->GetReadableSize() == pck.GetReadableSize() &&
memcmp(packet->GetReadableBuffer(), pck.GetReadableBuffer(), pck.GetReadableSize()) == 0)
{
PopPacket(i);
delete packet;
break;
}
} else {
DBGASSERT(packet != nullptr);
break;
}
}
}
INetPacket *GetPacket(size_t i) const {
std::lock_guard<std::mutex> lock(mutex_);
return i < pck_list_.size() ? pck_list_[i] : nullptr;
}
void PopPacket(size_t i) {
std::lock_guard<std::mutex> lock(mutex_);
pck_list_.erase(pck_list_.begin() + i);
}
void PushPacket(INetPacket *pck) {
std::lock_guard<std::mutex> lock(mutex_);
pck_list_.push_back(pck);
}
std::string packet_string_;
std::atomic<int> send_pck_count_{0}, recv_pck_count_{0};
std::vector<INetPacket*> pck_list_;
mutable std::mutex mutex_;
};
class ProducerSession : public Session, public Singleton<ProducerSession> {
public:
ProducerSession() {}
virtual int HandlePacket(INetPacket *pck) {
return SessionHandleSuccess;
}
void SendSomePacket() {
if (GetSendDataSize() > 65536 * 256) {
return;
}
for (int i = 0, total = System::Rand(0, 256); i < total; ++i) {
int offset = System::Rand(0, (int)sDataManager.packet_string_.size());
int length = System::Rand(0, (int)sDataManager.packet_string_.size() - offset, true);
INetPacket *pck = INetPacket::New(System::Rand(0, 60000), length);
pck->Append(sDataManager.packet_string_.data() + offset, length);
sDataManager.PushPacket(pck);
PushSendPacket(*pck);
sDataManager.send_pck_count_.fetch_add(1);
}
}
};
class ConsumerSession : public Session {
public:
ConsumerSession() {}
virtual int HandlePacket(INetPacket *pck) {
sDataManager.HandlePacket(*pck);
sDataManager.recv_pck_count_.fetch_add(1);
return SessionHandleSuccess;
}
};
class ParallelWorkerManager : public ThreadPool, public Singleton<ParallelWorkerManager> {
public:
class ParallelWorker : public Thread {
void Kernel() {
if (sProducerSession.IsStatus(Session::Running)) {
sProducerSession.SendSomePacket();
}
OS::SleepMS(1);
}
};
virtual bool Prepare() {
for (int i = 0; i < 6; ++i) {
PushThread(new ParallelWorker());
}
return true;
}
};
class ParallelListener : public Listener, public Singleton<ParallelListener> {
public:
virtual std::string GetBindAddress() { return HOST_STRING; }
virtual std::string GetBindPort() { return PORT_STRING; }
virtual Session *NewSessionObject() { return new ConsumerSession(); }
virtual void AddDataPipes(Session *session) {
session->GetConnection()->AddDataPipe(new SendDataLz4Pipe, new RecvDataLz4Pipe);
}
};
class ParallelServer : public Thread, public Singleton<ParallelServer> {
public:
virtual bool Initialize() {
sProducerSession.SetConnection(sConnectionManager.NewConnection(sProducerSession));
sProducerSession.GetConnection()->AddDataPipe(new SendDataLz4Pipe, new RecvDataLz4Pipe);
sProducerSession.GetConnection()->AsyncConnect(HOST_STRING, PORT_STRING);
sSessionManager.AddSession(&sProducerSession);
return true;
}
virtual void Kernel() {
sSessionManager.Update();
printf("%d --> %d\r", sDataManager.send_pck_count_.load(),
sDataManager.recv_pck_count_.load());
System::Update();
OS::SleepMS(1);
}
};
class ParallelServerMaster : public IServerMaster, public Singleton<ParallelServerMaster> {
public:
virtual bool InitSingleton() {
ProducerSession::newInstance();
DataManager::newInstance();
ParallelServer::newInstance();
ParallelListener::newInstance();
ParallelWorkerManager::newInstance();
return true;
}
virtual void FinishSingleton() {
ParallelWorkerManager::deleteInstance();
ParallelListener::deleteInstance();
ParallelServer::deleteInstance();
DataManager::deleteInstance();
ProducerSession::deleteInstance();
}
protected:
virtual bool InitDBPool() { return true; }
virtual bool LoadDBData() { return true; }
virtual bool StartServices() {
sParallelServer.Start();
sParallelListener.Start();
sParallelWorkerManager.Start();
return true;
}
virtual void StopServices() {
sParallelWorkerManager.Stop();
sParallelListener.Stop();
sParallelServer.Stop();
sSessionManager.Stop();
}
virtual void Tick() {}
virtual std::string GetConfigFile() { return "config"; }
virtual size_t GetAsyncServiceCount() { return 0; }
virtual size_t GetIOServiceCount() { return 3; }
};
void ParallelMain(int argc, char **argv)
{
ParallelServerMaster::newInstance();
sParallelServerMaster.InitSingleton();
sParallelServerMaster.Initialize(argc, argv);
sParallelServerMaster.Run(argc, argv);
sParallelServerMaster.FinishSingleton();
ParallelServerMaster::deleteInstance();
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C++
1
https://gitee.com/a13173091052/fusion.git
git@gitee.com:a13173091052/fusion.git
a13173091052
fusion
fusion
master

搜索帮助