1 Star 0 Fork 17

fanpq/robotphp

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
JobDaemon.php 6.73 KB
一键复制 编辑 原始数据 按行查看 历史
黄超h 提交于 2023-06-06 14:44 . 提交代码
<?php
declare(strict_types=1);
//require_once __DIR__ . '/RedisChat.php';
require_once __DIR__ . '/MysqlChat.php';
use Swoole\Coroutine;
use Swoole\Database\RedisConfig;
use Swoole\Database\RedisPool;
use Swoole\Runtime;
class JobDaemon
{
private $maxProcesses = 800;
// private $child = 0;
private $redis_task_wing = 'cdr'; //待处理队列
private $works;
private $mysqlChat;
private $processPool;
public function __construct()
{
// pcntl_signal(SIGCHLD,array($this,"sig_handler"));
set_time_limit(0);
// ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection
// $this->mysqlChat = MysqlChat::getInstance();
// $this->processPool = new Swoole\Process\Pool($this->maxProcesses);
// $this->processPool->set(['enable_coroutine' => true]);
}
//process 处理第一个
function process(swoole_process $worker)
{
$this->works = $worker;
swoole_event_add($worker->pipe, function($pipe) {
$worker = $this->works;
//读出swoole_process进程中
$result = $worker->read(); //send data to master
// $result = json_decode($result,true);//redis pop的结果
// go(function()use ($result){
// $table = "autodialer_number_" . str_replace("task_name_queue", "", $result["source_name"]);
// $callId = $result["uuid"];
// $createdTime = $result["created_time"];
// $answerDate = $result["answered_time"];
// $hangupDate = $result["hungup_time"];
// $bill = $result["billsec"];
// $duration = $result["duration"];
// $hangupCause = $result["hangup_cause"];
// $number = $result["caller_id_number"];
// $detail = $result["details"];
// var_dump($detail);
// $hangupDisposition = $result["sip_hangup_disposition"];
// $querySql = "update from $table
// set callid = $callId
// ,calldate = $createdTime
// ,answerdate = $answerDate
// ,hangupdate = $hangupDate
// ,bill = $bill
// ,duration = $duration
// ,hangupcause = $hangupCause
// ,hangup_disposition = $hangupDisposition
// ,read_status=1
// where number = $number and read_status=0";
// $this->mysqlChat->query($querySql);
// usleep(200);
// }
// );
echo "From Master: $result\n";
$worker->exit(0);
});
}
//测试
public function testAction()
{
$rds = RedisChat::getInstance();
for($i=0;$i<10000;$i++){
$data = [
'abc'=>$i,
'timestamp'=>time().rand(100,199)
];
$rds->push($this->redis_task_wing, json_encode($data));
}
exit("结束");
}
//任务跑获取进程
public function runAction()
{
Runtime::enableCoroutine();
go(function () {
$redisPool = new RedisPool((new RedisConfig)
->withHost('172.17.0.6')
->withPort(6379)
->withAuth('')
->withDbIndex(0)
->withTimeout(1)
);
Coroutine::create(function () use ($redisPool) {
while (1) {
$redis = $redisPool->get();
$data_pop = $redis->blPop("cdr", 3);//无任务时,阻塞等待
echo '运行后内存:' . memory_get_usage() . PHP_EOL;
$redisPool->put($redis);
if (!$data_pop) {
continue;
}
if ($this->child < $this->maxProcesses) {
$this->child++;
$process = new swoole_process([$this, 'process']);
$process->write(json_encode($data_pop));//写入到swoole_process进程中
$pid = $process->start();//进程启动
$server = new Swoole\Server('127.0.0.1', 9501);
$server->addProcess($process);
}
}
});
});
// ini_set('swoole.enable_coroutine','Off');
// $chan = $this->redisChat->pool;
// echo '开始内存:'.memory_get_usage(), '';
// while(1) {
// var_dump("111111");
// $this->redisChat->get();
// $redis = $chan->pop();
// $data_pop = $redis->blPop($this->redis_task_wing, 3);//无任务时,阻塞等待
// echo '运行后内存:'.memory_get_usage(), '';
//// if (!$data_pop) {
//// continue;
//// }
// if ($this->child < $this->maxProcesses) {
// $this->child++;
// $process = new swoole_process([$this, 'process']);
// $process->write(json_encode($data_pop));//写入到swoole_process进程中
// $pid = $process->start();//进程启动
// }
// }
}
public function runProcessPool()
{
$this->processPool = new Swoole\Process\Pool(10);
$this->processPool->set(['enable_coroutine' => true]);
$this->processPool->on('WorkerStart', function (Swoole\Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStart, pid: " . posix_getpid() . "\n");
static $running = true;
Swoole\Process::signal(SIGTERM, function () use (&$running) {
$running = false;
echo "TERM\n";
});
$i = 0;
while ($running) {
Coroutine::sleep(1);
$i++;
if ($i == 5) {
$pool->detach();
} elseif ($i == 10) {
break;
}
}
});
$this->processPool->on('WorkerStop', function (Swoole\Process\Pool $pool, $workerId) {
echo("[Worker #{$workerId}] WorkerStop, pid: " . posix_getpid() . "\n");
});
$this->processPool->start();
}
// private function sig_handler($signo) {
// switch ($signo) {
// case SIGCHLD:
// while($ret = swoole_process::wait(false)) {
// $this->child--;
// }
// }
// }
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
PHP
1
https://gitee.com/fanpq/robotphp.git
git@gitee.com:fanpq/robotphp.git
fanpq
robotphp
robotphp
master

搜索帮助