1 Star 0 Fork 17

fanpq/robotphp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
EasySwooleEvent.php 29.35 KB
一键复制 编辑 原始数据 按行查看 历史
黄超h 提交于 2023-06-06 14:44 . 提交代码

<?php
namespace EasySwoole\EasySwoole;
use App\Crontab\DayCron;
use App\Crontab\MonthCron;
use App\Crontab\NumberAllCron;
use App\Crontab\TaskNumberCdrCron;
use App\Crontab\TaskNumberRecordCron;
use App\Crontab\YearCron;
use App\Release\Hot;
use EasySwoole\EasySwoole\AbstractInterface\Event;
use EasySwoole\EasySwoole\Crontab\Crontab;
use EasySwoole\EasySwoole\Swoole\EventRegister;
use EasySwoole\ORM\DbManager;
use EasySwoole\ORM\Db\Connection;
use EasySwoole\Http\Request;
use EasySwoole\Http\Response;
use App\WebSocket\Controller\socket\WebSocketEvent;
use App\WebSocket\Controller\socket\WebSocketParser;
use EasySwoole\Component\Di;
use EasySwoole\Http\Message\Status;
use Swoole\Coroutine;
use Swoole\Http\Request as SwooleRequest;
use Swoole\Http\Response as SwooleResponse;
use EasySwoole\EasySwoole\Http\Dispatcher;
use EasySwoole\AtomicLimit\AtomicLimit;
use App\Net\util\IpList;
use EasySwoole\Component\Process\AbstractProcess;
use EasySwoole\Redis\Config\RedisConfig;
use EasySwoole\RedisPool\Pool;
use EasySwoole\RedisPool\RedisPool;
use App\Rpc\NodeManager\RedisManager;
class EasySwooleEvent implements Event
{
/**
 * Framework bootstrap hook.
 *
 * Sets the default timezone, installs the global HTTP on-request hook,
 * loads every configuration file under EASYSWOOLE_ROOT/Conf, registers the
 * "main_mysql" ORM connection pool from the `MYSQL` config section and then
 * registers the Redis pools via setRedisOperator().
 *
 * A failure while configuring the MySQL pool does not abort start-up, but it
 * is now written to the PHP error log instead of being silently dropped.
 *
 * Previously sketched (and still disabled) alternatives: a second ORM pool
 * named "number_mysql" read from `MYSQL_NUMBER`, and an inline registration
 * of the "main_redis" pool (superseded by setRedisOperator()).
 */
public static function initialize()
{
    date_default_timezone_set('Asia/Shanghai');
    self::setHttpGlobalOnRequest();
    Config::getInstance()->loadDir(EASYSWOOLE_ROOT.'/Conf');

    // MySQL ORM connection pool.
    $mysqlConfig = Config::getInstance()->getConf('MYSQL');
    $config = new \EasySwoole\ORM\Db\Config($mysqlConfig);
    try {
        $config->setMinObjectNum($mysqlConfig['minObjectNum']);
        $config->setMaxObjectNum($mysqlConfig['maxObjectNum']);
        $config->setIntervalCheckTime($mysqlConfig['intervalCheckTime']);
        $config->setGetObjectTimeout($mysqlConfig['getObjectTimeout']);
        $config->setMaxIdleTime($mysqlConfig['maxIdleTime']);
        DbManager::getInstance()->addConnection(new Connection($config), "main_mysql");
    } catch (\Throwable $e) {
        // NOTE(review): error_log() is used because the framework logger may
        // not be initialised this early in the boot sequence — confirm and
        // switch to the project logger if it is available here.
        error_log("系统启动时注入MySQL ORM异常 Msg:{$e->getMessage()}");
    }

    self::setRedisOperator();
}
/**
 * Registers the Redis connection pools used by the application.
 *
 * Two pools are created:
 *  - "main_redis" from the `REDIS` config section
 *  - "web_redis"  from the `WEB_REDIS` config section
 *
 * NOTE(review): the original comments also mention an "fs-redis" instance;
 * no pool of that name is registered here — confirm whether one is expected.
 * A Redis-cluster pool named "redisCluster" was previously sketched in
 * commented-out code and remains disabled.
 */
public static function setRedisOperator()
{
    self::registerRedisPool('REDIS', 'main_redis');
    self::registerRedisPool('WEB_REDIS', 'web_redis');
}

/**
 * Registers one Redis pool and applies its min/max connection counts.
 *
 * @param string $confKey  Config section holding the Redis settings
 *                         (must contain `minObjectNum` and `maxObjectNum`).
 * @param string $poolName Name under which the pool is registered.
 */
private static function registerRedisPool(string $confKey, string $poolName): void
{
    $settings = Config::getInstance()->getConf($confKey);
    $pool = RedisPool::getInstance()->register(new RedisConfig($settings), $poolName);
    try {
        // Pool size limits.
        $pool->setMinObjectNum($settings['minObjectNum']);
        $pool->setMaxObjectNum($settings['maxObjectNum']);
    } catch (\Exception $e) {
        // Start-up continues, but the failure is recorded instead of being
        // assigned to an unused variable.
        error_log("系统启动时注入REDIS异常 [{$poolName}] Msg:{$e->getMessage()}");
    }
}
//添加主服务
/**
 * Main-server creation hook.
 *
 * Wires, in order: the WebSocket handlers on the main server, the HTTP
 * sub-listener, the RPC services, the QPS limiter and the per-IP access
 * counter process; then registers the crontab tasks.
 *
 * createProcess() (the "Es-release" custom process) is intentionally not
 * called here — it was disabled in the original code.
 *
 * @param EventRegister $register Event register of the main Swoole server.
 */
public static function mainServerCreate(EventRegister $register)
{
    self::mainWebSocket($register); // WebSocket on the main server
    self::subHttp();                // HTTP sub-service
    self::rpcServer();              // RPC micro-services
    self::setAtomicLimit();         // global rate limiter
    self::ipAccessCount();          // per-IP access counter

    // Scheduled tasks, registered in this order.
    $cronTasks = [
        TaskNumberCdrCron::class,
        TaskNumberRecordCron::class,
        DayCron::class,
        MonthCron::class,
        YearCron::class,
        NumberAllCron::class,
    ];
    $crontab = Crontab::getInstance();
    foreach ($cronTasks as $taskClass) {
        $crontab->addTask($taskClass);
    }
}
//主服务的webSocket
/**
 * Registers the WebSocket handlers (message, handshake, close) on the main
 * server.
 *
 * Incoming frames are forwarded to an EasySwoole Socket dispatcher that
 * uses WebSocketParser; handshake and close are delegated to
 * WebSocketEvent.
 *
 * The original `catch (Exception $e)` was unqualified inside this
 * namespace, so it could never match, and its empty body would have left
 * `$dispatch` undefined. Failures are now reported and abort registration.
 *
 * @param EventRegister $register Event register of the main Swoole server.
 * @return bool true when all handlers were registered, false when the
 *              dispatcher could not be created.
 */
public static function mainWebSocket(EventRegister $register):bool
{
    // Dispatcher configuration in WebSocket mode.
    $conf = new \EasySwoole\Socket\Config();
    $conf->setType(\EasySwoole\Socket\Config::WEB_SOCKET);
    try {
        $conf->setParser(new WebSocketParser());
        $dispatch = new \EasySwoole\Socket\Dispatcher($conf);
    } catch (\Throwable $e) {
        Trigger::getInstance()->throwable($e);
        return false;
    }

    // onMessage must be registered in WebSocket mode; frames go to the dispatcher.
    // Namespaced Swoole class names are used instead of the legacy snake_case aliases.
    $register->set(EventRegister::onMessage, function (\Swoole\WebSocket\Server $server, \Swoole\WebSocket\Frame $frame) use ($dispatch) {
        $dispatch->dispatch($server, $frame->data, $frame);
    });

    $websocketEvent = new WebSocketEvent();
    // Custom handshake handling.
    $register->set(EventRegister::onHandShake, function (\Swoole\Http\Request $request, \Swoole\Http\Response $response) use ($websocketEvent) {
        $websocketEvent->onHandShake($request, $response);
    });
    // Custom close handling.
    $register->set(EventRegister::onClose, function (\Swoole\Server $server, int $fd, int $reactorId) use ($websocketEvent) {
        $websocketEvent->onClose($server, $fd, $reactorId);
    });
    return true;
}
//子服务的http
/**
 * Adds an HTTP listener on 0.0.0.0:9501 to the main server and routes its
 * requests through the EasySwoole HTTP dispatcher.
 *
 * Controller namespace, max depth, controller pool size and pool wait time
 * are read from Di with fallbacks ('App\HttpController\', at least 5, 500
 * and 50). If no callable HTTP exception handler is registered in Di, a
 * default one is installed that answers 500 with the message and trace.
 *
 * NOTE(review): `document_root` is the project root with the static
 * handler enabled, which looks like it can expose configuration and source
 * files over HTTP — confirm and narrow to a dedicated public directory.
 * NOTE(review): the default exception handler writes stack traces to the
 * client; consider disabling that outside development.
 *
 * @return bool true when the listener was set up, false when the port
 *              could not be opened.
 */
public static function subHttp():bool
{
    $port = ServerManager::getInstance()->getSwooleServer()->addlistener('0.0.0.0', 9501, SWOOLE_SOCK_TCP);
    if (!is_object($port)) {
        // Listening failed (e.g. port already in use); calling set()/on()
        // on a non-object would be a fatal error.
        Trigger::getInstance()->error('subHttp: unable to listen on 0.0.0.0:9501');
        return false;
    }
    $port->set([
        'open_http_protocol' => true,
        'document_root' => EASYSWOOLE_ROOT ,
        'enable_static_handler' => true,
    ]);

    $di = Di::getInstance();

    $namespace = $di->get(SysConst::HTTP_CONTROLLER_NAMESPACE);
    if (empty($namespace)) {
        $namespace = 'App\\HttpController\\';
    }
    // Never go below a depth of 5.
    $depth = max(intval($di->get(SysConst::HTTP_CONTROLLER_MAX_DEPTH)), 5);
    $max = intval($di->get(SysConst::HTTP_CONTROLLER_POOL_MAX_NUM));
    if ($max == 0) {
        $max = 500;
    }
    $waitTime = intval($di->get(SysConst::HTTP_CONTROLLER_POOL_WAIT_TIME));
    if ($waitTime == 0) {
        $waitTime = 50;
    }
    $dispatcher = Dispatcher::getInstance()
        ->setNamespacePrefix($namespace)
        ->setMaxDepth($depth)
        ->setControllerMaxPoolNum($max)
        ->setControllerPoolWaitTime($waitTime);

    // Provide a default HTTP_EXCEPTION_HANDLER when none is registered.
    $httpExceptionHandler = $di->get(SysConst::HTTP_EXCEPTION_HANDLER);
    if (!is_callable($httpExceptionHandler)) {
        $httpExceptionHandler = function ($throwable, $request, $response) {
            $response->withStatus(Status::CODE_INTERNAL_SERVER_ERROR);
            $response->write(nl2br($throwable->getMessage() . "\n" . $throwable->getTraceAsString()));
            Trigger::getInstance()->throwable($throwable);
        };
        $di->set(SysConst::HTTP_EXCEPTION_HANDLER, $httpExceptionHandler);
    }
    $dispatcher->setHttpExceptionHandler($httpExceptionHandler);

    $requestHook = $di->get(SysConst::HTTP_GLOBAL_ON_REQUEST);
    $afterRequestHook = $di->get(SysConst::HTTP_GLOBAL_AFTER_REQUEST);

    $port->on(EventRegister::onRequest, function (SwooleRequest $request, SwooleResponse $response) use ($dispatcher, $requestHook, $afterRequestHook) {
        $request_psr = new Request($request);
        $response_psr = new Response($response);
        try {
            $ret = null;
            if (is_callable($requestHook)) {
                $ret = call_user_func($requestHook, $request_psr, $response_psr);
            }
            // A hook returning exactly false vetoes dispatching.
            if ($ret !== false) {
                $dispatcher->dispatch($request_psr, $response_psr);
            }
        } catch (\Throwable $throwable) {
            call_user_func(Di::getInstance()->get(SysConst::HTTP_EXCEPTION_HANDLER), $throwable, $request_psr, $response_psr);
        } finally {
            // The after-request hook always runs; its own failures go to the same handler.
            try {
                if (is_callable($afterRequestHook)) {
                    call_user_func($afterRequestHook, $request_psr, $response_psr);
                }
            } catch (\Throwable $throwable) {
                call_user_func(Di::getInstance()->get(SysConst::HTTP_EXCEPTION_HANDLER), $throwable, $request_psr, $response_psr);
            }
        }
        $response_psr->__response();
    });
    return true;
}
//添加微服务
/**
 * Builds the RPC node (server name "speech"), registers every RPC service
 * with its modules, stores the Rpc instance in Di under
 * 'rpc_memory_object' and attaches it to the main Swoole server.
 *
 * Worker exceptions and failed client calls are reported through Trigger
 * instead of being discarded / dumped with var_dump().
 *
 * NOTE(review): the advertised server IP is '0.0.0.0' although the original
 * comment says the local machine IP must be set here, and the broadcast
 * targets (172.17.0.3:9600 / :9601) and the finder encryption key are
 * hard-coded — confirm these against the deployment and move them to
 * configuration.
 */
public static function rpcServer(){
    // The constructor accepts a node manager implementation; none is passed here.
    $config = new \EasySwoole\Rpc\Config();
    $config->setServerName('speech');
    // Must be set so that uncaught exceptions in the RPC workers do not go unnoticed.
    $config->setOnException(function (\Throwable $throwable) {
        Trigger::getInstance()->throwable($throwable);
    });

    // Server side.
    $serverConfig = $config->getServer();
    $serverConfig->setServerIp('0.0.0.0');
    $serverConfig->setWorkerNum(4);
    $serverConfig->setListenAddress('0.0.0.0');
    $serverConfig->setListenPort(9600);
    $serverConfig->setMaxPackageSize(1024 * 1024 * 20); // largest accepted package
    $serverConfig->setNetworkReadTimeout(30);

    // Assist / service discovery.
    $assistConfig = $config->getAssist();
    $assistConfig->setAliveInterval(5000); // self-refresh to the node manager
    $serviceFinderConfig = $assistConfig->getUdpServiceFinder();
    $serviceFinderConfig->setEnableListen(true);
    $serviceFinderConfig->setListenAddress('0.0.0.0');
    $serviceFinderConfig->setListenPort(9600);
    $serviceFinderConfig->setEnableBroadcast(true);
    $serviceFinderConfig->setBroadcastAddress(['172.17.0.3:9600', '172.17.0.3:9601']);
    $serviceFinderConfig->setBroadcastInterval(5000); // broadcast every 5s
    $serviceFinderConfig->setEncryptKey('EasySwoole');

    // Client side (only needed when this node also calls other RPC services).
    $clientConfig = $config->getClient();
    $clientConfig->setMaxPackageSize(1024 * 1024 * 40);
    // Global callbacks; any response whose status is not 0 counts as a failure.
    $clientConfig->setOnGlobalSuccess(function (\EasySwoole\Rpc\Protocol\Response $response){
    });
    $clientConfig->setOnGlobalFail(function (\EasySwoole\Rpc\Protocol\Response $response){
        Trigger::getInstance()->error(
            'RPC call failed, status=' . json_encode($response->getStatus())
            . ' msg=' . json_encode($response->getMsg())
        );
    });

    $rpc = new \EasySwoole\Rpc\Rpc($config);

    // Instantiate each service, add its modules in order, then register it.
    foreach (self::rpcServiceModules() as $serviceClass => $moduleClasses) {
        $service = new $serviceClass();
        foreach ($moduleClasses as $moduleClass) {
            $service->addModule(new $moduleClass());
        }
        $rpc->serviceManager()->addService($service);
    }

    // Keep the Rpc instance reachable for later client calls.
    Di::getInstance()->set('rpc_memory_object', $rpc);
    // Attach the RPC server to the main Swoole server.
    $rpc->attachServer(ServerManager::getInstance()->getSwooleServer());
}

/**
 * Service class => ordered list of module classes registered by rpcServer().
 *
 * @return array<string, list<string>>
 */
private static function rpcServiceModules(): array
{
    return [
        // Speech (dialogue script) service.
        \App\Rpc\Service\SpeechService::class => [
            \App\Rpc\SpeechModule\SpeechExamineModule::class,   // script review
            \App\Rpc\SpeechModule\SpeechClassifyModule::class,
            \App\Rpc\SpeechModule\SpeechConditionModule::class,
            \App\Rpc\SpeechModule\SpeechFlowModule::class,
            \App\Rpc\SpeechModule\SpeechFlowMulModule::class,
            \App\Rpc\SpeechModule\SpeechHotModule::class,
            \App\Rpc\SpeechModule\SpeechLabelModule::class,
            \App\Rpc\SpeechModule\SpeechLogyModule::class,
            \App\Rpc\SpeechModule\SpeechNodeModule::class,
            \App\Rpc\SpeechModule\SpeechMulNodeModule::class,
            \App\Rpc\SpeechModule\SpeechRuleModule::class,
            \App\Rpc\SpeechModule\SpeechSpeechModule::class,
            \App\Rpc\SpeechModule\SpeechTchModule::class,       // knowledge-base groups (front end)
            \App\Rpc\SpeechModule\SpeechTechModule::class,      // knowledge-base list
            \App\Rpc\SpeechModule\SpeechTechGroupModule::class, // knowledge-base group list
            \App\Rpc\SpeechModule\SpeechBrainModule::class,     // "brain" settings
            \App\Rpc\SpeechModule\SpeechAsrModule::class,       // ASR settings
            \App\Rpc\SpeechModule\SpeechSynonymModule::class,   // synonym settings
            \App\Rpc\SpeechModule\SpeechBasicModule::class,
        ],
        // Common service.
        \App\Rpc\Service\CommonService::class => [
            \App\Rpc\SpeechModule\CommonModule::class,
        ],
        // Task service.
        \App\Rpc\Service\TaskService::class => [
            \App\Rpc\TaskModule\TaskTempModule::class,
            \App\Rpc\TaskModule\TaskModule::class,
            \App\Rpc\TaskModule\NumberModule::class,
        ],
        // Push service (Kd and Wx).
        \App\Rpc\Service\PushService::class => [
            \App\Rpc\PushModule\KdPushModule::class,
            \App\Rpc\PushModule\WxPushModule::class,
        ],
        // Agent service.
        \App\Rpc\Service\AgentService::class => [
            \App\Rpc\AgentModule\AgentModule::class,
        ],
        // Info service.
        \App\Rpc\Service\InfoService::class => [
            \App\Rpc\InfoModule\InfoModule::class,
        ],
        // Admin service.
        \App\Rpc\Service\AdminService::class => [
            \App\Rpc\AdminModule\AdminModule::class,
            \App\Rpc\AdminModule\MenuModule::class,
            \App\Rpc\AdminModule\RoleModule::class,
            \App\Rpc\AdminModule\AgentModule::class,
            \App\Rpc\AdminModule\UserModule::class,
            \App\Rpc\AdminModule\ChargeModule::class,
            \App\Rpc\AdminModule\PbxLinesModule::class,
            \App\Rpc\AdminModule\PbxLineModule::class,
            \App\Rpc\AdminModule\PbxSipModule::class,
            \App\Rpc\AdminModule\PbxGatewayModule::class,
            \App\Rpc\AdminModule\PbxExtModule::class,
            \App\Rpc\AdminModule\ConditionModule::class,
            \App\Rpc\AdminModule\ClassifyModule::class,
            \App\Rpc\AdminModule\SynonymModule::class,
            \App\Rpc\AdminModule\SystemModule::class,
            \App\Rpc\AdminModule\AdminActionLogModule::class,
            \App\Rpc\AdminModule\SpeechExaminedModule::class,   // back-office script review
            \App\Rpc\AdminModule\TechnologyGroupModule::class,
            \App\Rpc\AdminModule\LabelModule::class,
            \App\Rpc\AdminModule\InfoModule::class,
            \App\Rpc\AdminModule\BasicModule::class,
        ],
        // Department service.
        \App\Rpc\Service\DeptService::class => [
            \App\Rpc\DeptModule\DeptModule::class,
            \App\Rpc\DeptModule\SeatModule::class,
        ],
        // Statistics / report service.
        \App\Rpc\Service\ReportService::class => [
            \App\Rpc\ReportModule\AiOutModule::class,
        ],
        // CRM service.
        \App\Rpc\Service\CrmService::class => [
            \App\Rpc\CrmModule\PrivateCustomerModule::class,
            \App\Rpc\CrmModule\RuleModule::class,
            \App\Rpc\CrmModule\CustomTypeModule::class,
            \App\Rpc\CrmModule\BlackNumberModule::class,
            \App\Rpc\CrmModule\CustomFieldModule::class,
        ],
        // Data service.
        \App\Rpc\Service\DataService::class => [
            \App\Rpc\DataModule\DataAModule::class,
            \App\Rpc\DataModule\DataBModule::class,
        ],
        // Finance service.
        \App\Rpc\Service\FinanceService::class => [
            \App\Rpc\FinanceModule\ChargeModule::class,
        ],
        // Basic-information service.
        \App\Rpc\Service\BasicService::class => [
            \App\Rpc\BasicModule\ActionLogModule::class,
            \App\Rpc\BasicModule\BasicModule::class,
            \App\Rpc\BasicModule\MenuModule::class,
            \App\Rpc\BasicModule\RoleModule::class,
            \App\Rpc\BasicModule\UsersModule::class,
        ],
    ];
}
//注册自定义进程
/**
 * Registers the Hot custom process (described in the original comments as
 * the dual-machine hot-standby service) under the process name and group
 * "Es-release".
 */
public static function createProcess(){
    $hotProcessConfig = new \EasySwoole\Component\Process\Config([
        'processName' => 'Es-release',  // custom process name
        'processGroup' => 'Es-release', // custom process group
    ]);
    $processManager = \EasySwoole\Component\Process\Manager::getInstance();
    $processManager->addProcess(new Hot($hotProcessConfig));
}
//配置限流
/**
 * Creates the AtomicLimit rate limiter with a QPS limit of 200, attaches it
 * to the main Swoole server and stores it in Di under 'auto_limiter'.
 */
public static function setAtomicLimit()
{
    $limiter = new AtomicLimit();
    $limiter->setLimitQps(200);

    $swooleServer = ServerManager::getInstance()->getSwooleServer();
    $limiter->attachServer($swooleServer);

    Di::getInstance()->set('auto_limiter', $limiter);
}
//开启IP 限流
/**
 * Enables per-IP access counting.
 *
 * Initialises the IpList singleton and registers a custom process
 * ("IP_LIST") that, once per second, calls IpList::accessList(30) and then
 * IpList::clear().
 *
 * The process object is now created once, directly from its configuration;
 * previously a throwaway instance was built with a string argument only so
 * that `new $class(...)` could create the real one.
 */
public static function ipAccessCount(){
    // Initialise the IP list before the process is registered.
    IpList::getInstance();

    $processConfig = new \EasySwoole\Component\Process\Config();
    $processConfig->setProcessName('IP_LIST');      // process name
    $processConfig->setProcessGroup('IP_LIST');     // process group
    $processConfig->setArg([]);                     // arguments passed to run()
    $processConfig->setRedirectStdinStdout(false);  // keep standard I/O
    $processConfig->setPipeType(\EasySwoole\Component\Process\Config::PIPE_TYPE_SOCK_DGRAM);
    $processConfig->setEnableCoroutine(true);       // enable coroutines in the process
    $processConfig->setMaxExitWaitTime(30);         // max wait on exit

    $process = new class($processConfig) extends AbstractProcess
    {
        protected function run($arg)
        {
            // Every second: record the access list, then reset the counters.
            // NOTE(review): the original comment spoke of 6 requests per
            // second while the code passes 30 — confirm the intended value.
            $this->addTick(1 * 1000, function () {
                IpList::getInstance()->accessList(30);
                IpList::getInstance()->clear();
            });
        }
    };

    \EasySwoole\Component\Process\Manager::getInstance()->addProcess($process);
}
//注入 HTTP_GLOBAL_ON_REQUEST 全局事件,判断和统计 IP 的访问
/**
 * Installs the global HTTP on-request hook in Di.
 *
 * The hook adds CORS headers to every response, answers OPTIONS requests
 * with 200 without dispatching, and closes the connection of any client
 * whose IpList access count for the current period exceeds 30.
 *
 * The hook returns false to stop dispatching and true to continue.
 *
 * NOTE(review): `Access-Control-Allow-Origin: *` is sent together with
 * `Access-Control-Allow-Credentials: true`; browsers reject credentialed
 * requests for that combination. The original code computed the request's
 * Origin header but never used it — decide whether the origin should be
 * echoed (ideally against an allow-list) or the credentials header dropped.
 */
public static function setHttpGlobalOnRequest()
{
    date_default_timezone_set("Asia/Shanghai");
    Di::getInstance()->set('HTTP_GLOBAL_ON_REQUEST', function (Request $request, Response $response) {
        $response->withHeader('Access-Control-Allow-Origin', "*");
        $response->withHeader('Access-Control-Allow-Methods', '*');
        $response->withHeader('Access-Control-Allow-Credentials', 'true');
        $response->withHeader('Access-Control-Allow-Headers', '*');
        if ($request->getMethod() === 'OPTIONS') {
            $response->withStatus(\EasySwoole\Http\Message\Status::CODE_OK);
            return false;
        }

        $server = ServerManager::getInstance()->getSwooleServer();
        $fd = $request->getSwooleRequest()->fd;
        // getClientInfo() may not return an array (e.g. the connection is
        // already gone); indexing that result directly would yield null.
        $clientInfo = $server->getClientInfo($fd);
        $ip = is_array($clientInfo) ? ($clientInfo['remote_ip'] ?? null) : null;
        if (!is_string($ip) || $ip === '') {
            // Without a client address there is nothing to count; let the
            // request proceed without rate limiting.
            return true;
        }

        // Intercept when the access frequency of the current period exceeds
        // the threshold (lower 30, e.g. to 3, when testing).
        if (IpList::getInstance()->access($ip) > 30) {
            // Force-close the connection.
            $server->close($fd);
            // Debug output; further handling could be added here.
            echo '被拦截' . PHP_EOL;
            return false;
        }
        return true;
    });
}
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
PHP
1
https://gitee.com/fanpq/robotphp.git
git@gitee.com:fanpq/robotphp.git
fanpq
robotphp
robotphp
master

搜索帮助