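// NOTE: web-page residue captured with this file, not part of the program:
// "Code pull complete; the page will refresh automatically."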
#include "network/ConnectionManager.h"
#include "network/SessionManager.h"
#include "network/Listener.h"
#include "ServerMaster.h"
#include "ThreadPool.h"
#include "System.h"
#include "OS.h"
// Loopback endpoint shared by the listener (bind address/port) and the
// producer session (connect target), so the process talks to itself.
#define HOST_STRING "127.0.0.1"
#define PORT_STRING "9999"
// Convenience accessors: each expands to a dereference of the corresponding
// singleton's instance() pointer. The instance must already have been created
// (see ParallelServerMaster::InitSingleton / ParallelMain) before use.
#define sParallelServer (*ParallelServer::instance())
#define sParallelListener (*ParallelListener::instance())
#define sParallelServerMaster (*ParallelServerMaster::instance())
#define sParallelWorkerManager (*ParallelWorkerManager::instance())
#define sDataManager (*DataManager::instance())
#define sProducerSession (*ProducerSession::instance())
/// Shared bookkeeping for the loopback test.
///
/// Holds a pool of random bytes that outgoing payloads are sliced from, the
/// list of packets that have been sent but not yet seen by the receiver, and
/// the sent/received counters.
///
/// Ownership: every pointer handed to PushPacket() is owned by this object.
/// It is deleted either when HandlePacket() finds its match or, for anything
/// still pending, in the destructor.
class DataManager : public Singleton<DataManager> {
public:
    /// Builds the 65536 * 5 byte payload pool, one System::Rand(0, 256) draw
    /// per byte.
    DataManager() {
        packet_string_.resize(65536 * 5);
        for (char &byte : packet_string_) {
            byte = System::Rand(0, 256);
        }
    }

    /// Frees every packet that was pushed but never matched.
    virtual ~DataManager() {
        for (INetPacket *pck : pck_list_) {
            delete pck;
        }
    }

    /// Finds the first pending packet whose opcode and readable payload equal
    /// those of @p pck, removes it from the pending list and deletes it.
    ///
    /// The scan and the erase happen inside one critical section. Doing them
    /// as separate locked steps (look up index i, then erase index i) lets a
    /// concurrent caller shift the indices in between, which would erase the
    /// wrong entry and delete the matched packet twice.
    ///
    /// If no pending packet matches, DBGASSERT fires (same as before).
    ///
    /// @param pck  The packet that was just received.
    void HandlePacket(INetPacket &pck) {
        INetPacket *matched = nullptr;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            const auto opcode = pck.GetOpcode();
            const auto size = pck.GetReadableSize();
            for (auto it = pck_list_.begin(); it != pck_list_.end(); ++it) {
                INetPacket *candidate = *it;
                if (candidate != nullptr &&
                    candidate->GetOpcode() == opcode &&
                    candidate->GetReadableSize() == size &&
                    // Skip memcmp for empty payloads: the buffers may be null.
                    (size == 0 ||
                     memcmp(candidate->GetReadableBuffer(), pck.GetReadableBuffer(), size) == 0))
                {
                    matched = candidate;
                    pck_list_.erase(it);
                    break;
                }
            }
        }
        // Receiving something that was never recorded as sent is a test failure.
        DBGASSERT(matched != nullptr);
        // Deleted outside the lock; `delete nullptr` is a no-op.
        delete matched;
    }

    /// @return The pending packet at index @p i, or nullptr when @p i is out
    ///         of range. The pointer stays owned by this object.
    INetPacket *GetPacket(size_t i) const {
        std::lock_guard<std::mutex> lock(mutex_);
        return i < pck_list_.size() ? pck_list_[i] : nullptr;
    }

    /// Removes (without deleting) the pending packet at index @p i.
    /// Out-of-range indices are ignored instead of invoking undefined
    /// behaviour in vector::erase.
    void PopPacket(size_t i) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (i < pck_list_.size()) {
            pck_list_.erase(pck_list_.begin() + i);
        }
    }

    /// Appends @p pck to the pending list and takes ownership of it.
    void PushPacket(INetPacket *pck) {
        std::lock_guard<std::mutex> lock(mutex_);
        pck_list_.push_back(pck);
    }

    std::string packet_string_;                              // random payload pool, filled once in the constructor
    std::atomic<int> send_pck_count_{0}, recv_pck_count_{0}; // sent / received packet counters
    std::vector<INetPacket*> pck_list_;                      // sent-but-unmatched packets, guarded by mutex_
    mutable std::mutex mutex_;                               // protects pck_list_
};
/// Sending side of the loopback test: a singleton session that emits bursts
/// of random packets and records each one in the DataManager.
class ProducerSession : public Session, public Singleton<ProducerSession> {
public:
    ProducerSession() {}

    /// Incoming packets are not inspected on this session; always reports success.
    virtual int HandlePacket(INetPacket *pck) {
        return SessionHandleSuccess;
    }

    /// Queues one burst of packets unless the outgoing backlog is already
    /// larger than 65536 * 256 bytes.
    ///
    /// The burst size comes from System::Rand(0, 256). Each packet carries a
    /// random slice of the DataManager payload pool and a random opcode from
    /// System::Rand(0, 60000). Every packet is registered with the
    /// DataManager before it is handed to PushSendPacket().
    void SendSomePacket() {
        const bool backlog_full = GetSendDataSize() > 65536 * 256;
        if (backlog_full) {
            return;
        }

        int remaining = System::Rand(0, 256);
        while (remaining-- > 0) {
            const auto &pool = sDataManager.packet_string_;
            const int pool_size = static_cast<int>(pool.size());

            // Draw order matters for reproducibility: offset, length, opcode.
            const int start = System::Rand(0, pool_size);
            const int count = System::Rand(0, pool_size - start, true);
            const auto opcode = System::Rand(0, 60000);

            INetPacket *outgoing = INetPacket::New(opcode, count);
            outgoing->Append(pool.data() + start, count);

            // Record first, then send, so the receiver can always find a match.
            sDataManager.PushPacket(outgoing);
            PushSendPacket(*outgoing);
            ++sDataManager.send_pck_count_;
        }
    }
};
/// Receiving side of the loopback test: the session type the listener creates
/// for each accepted connection.
class ConsumerSession : public Session {
public:
    ConsumerSession() {}

    /// Checks the received packet off against the DataManager's pending list,
    /// bumps the received counter, and reports success.
    virtual int HandlePacket(INetPacket *pck) {
        auto &manager = sDataManager;
        manager.HandlePacket(*pck);
        ++manager.recv_pck_count_;
        return SessionHandleSuccess;
    }
};
/// Thread pool whose workers all drive the single ProducerSession.
class ParallelWorkerManager : public ThreadPool, public Singleton<ParallelWorkerManager> {
public:
    /// Worker thread body.
    class ParallelWorker : public Thread {
        /// Sends one burst if the producer session is in the Running state,
        /// then sleeps for 1 ms.
        /// NOTE(review): presumably called repeatedly by the Thread base — confirm.
        void Kernel() {
            auto &producer = sProducerSession;
            const bool ready = producer.IsStatus(Session::Running);
            if (ready) {
                producer.SendSomePacket();
            }
            OS::SleepMS(1);
        }
    };

    /// Registers six workers with the pool.
    /// @return Always true.
    virtual bool Prepare() {
        const int worker_count = 6;
        int spawned = 0;
        while (spawned != worker_count) {
            PushThread(new ParallelWorker());
            ++spawned;
        }
        return true;
    }
};
/// Listener for the loopback endpoint; every accepted connection becomes a
/// ConsumerSession with LZ4 send/receive pipes.
class ParallelListener : public Listener, public Singleton<ParallelListener> {
public:
    /// @return The address to bind to (HOST_STRING).
    virtual std::string GetBindAddress() {
        return HOST_STRING;
    }

    /// @return The port to bind to (PORT_STRING).
    virtual std::string GetBindPort() {
        return PORT_STRING;
    }

    /// @return A freshly allocated ConsumerSession.
    virtual Session *NewSessionObject() {
        return new ConsumerSession();
    }

    /// Attaches an LZ4 send/receive pipe pair to the session's connection,
    /// mirroring what ParallelServer::Initialize sets up on the producer side.
    virtual void AddDataPipes(Session *session) {
        auto &&connection = session->GetConnection();
        connection->AddDataPipe(new SendDataLz4Pipe, new RecvDataLz4Pipe);
    }
};
/// Main service thread: connects the producer session to the local listener
/// and then pumps the session manager while printing progress.
class ParallelServer : public Thread, public Singleton<ParallelServer> {
public:
    /// Gives the producer session a connection with LZ4 pipes, starts an
    /// asynchronous connect to HOST_STRING:PORT_STRING, and registers the
    /// session with the session manager.
    /// @return Always true.
    virtual bool Initialize() {
        auto &producer = sProducerSession;
        producer.SetConnection(sConnectionManager.NewConnection(producer));

        auto &&connection = producer.GetConnection();
        connection->AddDataPipe(new SendDataLz4Pipe, new RecvDataLz4Pipe);
        connection->AsyncConnect(HOST_STRING, PORT_STRING);

        sSessionManager.AddSession(&producer);
        return true;
    }

    /// Updates the session manager, prints "<sent> --> <received>" on the
    /// current console line (trailing '\r', no newline), updates System, and
    /// sleeps for 1 ms.
    virtual void Kernel() {
        sSessionManager.Update();

        const int sent = sDataManager.send_pck_count_.load();
        const int received = sDataManager.recv_pck_count_.load();
        printf("%d --> %d\r", sent, received);

        System::Update();
        OS::SleepMS(1);
    }
};
/// Server master for the loopback test: creates and destroys the test
/// singletons and starts/stops the three services.
class ParallelServerMaster : public IServerMaster, public Singleton<ParallelServerMaster> {
public:
    /// Creates every singleton used by the test.
    /// @return Always true.
    virtual bool InitSingleton() {
        ProducerSession::newInstance();
        DataManager::newInstance();
        ParallelServer::newInstance();
        ParallelListener::newInstance();
        ParallelWorkerManager::newInstance();
        return true;
    }

    /// Destroys the singletons in the exact reverse of their creation order.
    virtual void FinishSingleton() {
        ParallelWorkerManager::deleteInstance();
        ParallelListener::deleteInstance();
        ParallelServer::deleteInstance();
        DataManager::deleteInstance();
        ProducerSession::deleteInstance();
    }

protected:
    /// No database is used by this test.
    virtual bool InitDBPool() {
        return true;
    }

    /// No database data to load.
    virtual bool LoadDBData() {
        return true;
    }

    /// Starts the server thread, then the listener, then the worker pool.
    /// @return Always true.
    virtual bool StartServices() {
        sParallelServer.Start();
        sParallelListener.Start();
        sParallelWorkerManager.Start();
        return true;
    }

    /// Stops the services in reverse start order, then stops the session manager.
    virtual void StopServices() {
        sParallelWorkerManager.Stop();
        sParallelListener.Stop();
        sParallelServer.Stop();
        sSessionManager.Stop();
    }

    /// Nothing to do per tick.
    virtual void Tick() {
    }

    /// @return The configuration file name, "config".
    virtual std::string GetConfigFile() {
        return std::string("config");
    }

    /// @return 0 async services.
    virtual size_t GetAsyncServiceCount() {
        return static_cast<size_t>(0);
    }

    /// @return 3 I/O services.
    virtual size_t GetIOServiceCount() {
        return static_cast<size_t>(3);
    }
};
/// Entry point of the loopback test.
///
/// Creates the server master, lets it create its singletons, initializes and
/// runs it with the given command line, then tears everything down in
/// reverse order.
///
/// @param argc  Argument count, forwarded to Initialize() and Run().
/// @param argv  Argument vector, forwarded to Initialize() and Run().
void ParallelMain(int argc, char **argv)
{
    ParallelServerMaster::newInstance();
    auto &master = sParallelServerMaster;

    master.InitSingleton();
    master.Initialize(argc, argv);
    master.Run(argc, argv);
    master.FinishSingleton();

    ParallelServerMaster::deleteInstance();
}
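// NOTE: web-page residue captured with this file, not part of the program:
// "This section may contain content unsuitable for display, so the page does not show it.
// You can review and modify it using the relevant editing features."
// "If you confirm the content contains no inappropriate language, pure advertising, violence,
// vulgar or pornographic material, infringement, piracy, false or worthless content, and nothing
// that violates applicable national laws and regulations, you may click Submit to appeal and we
// will handle it as soon as possible."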