1 Star 1 Fork 0

hotmocha/spider

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
coremanager.c 14.93 KB
一键复制 编辑 原始数据 按行查看 历史
hotmocha 提交于 2015-04-02 22:29 . spider init

#include "coremanager.h"
#include <dlfcn.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include "spiderpub.h"
/* Name of the entry-point symbol that is looked up with dlsym() in each task .so. */
#define SOSTARTMAIN "SoMain"
/* Manager-wide state: task counters, pending signal command and the attached
 * shared-memory table of TaskEnv slots (set up by InitCoreEnv). */
static struct Env gCoreEnv;
/* Signature of a task entry function: it receives its own TaskEnv slot; a
 * non-zero return value is logged as an abnormal exit by ForkEntry. */
typedef int (*SoMain)(struct TaskEnv*);
/* Zero-terminated list of signals handled by the manager; each is mapped to a
 * COREMANAGER_CMD_* command by SignalManagerHandler. */
static int ManagerSigals[] = { SIGQUIT, SIGHUP, SIGWINCH, SIGTERM, SIGUSR1, 0 };
/*
 * Signal handler of the manager process.
 *
 * Records the received signal as a pending command in gCoreEnv.cmd; the main
 * loop acts on it in ProcessSignal(). Any other signal number leaves the
 * pending command unchanged.
 *
 * NOTE(review): for async-signal safety gCoreEnv.cmd should be a
 * volatile sig_atomic_t; its declaration is not visible here — confirm.
 */
void SignalManagerHandler(int signo)
{
    if (signo == SIGQUIT)
        gCoreEnv.cmd = COREMANAGER_CMD_EXIT;        /* stop tasks, then exit */
    else if (signo == SIGHUP)
        gCoreEnv.cmd = COREMANAGER_CMD_RECONFIG;    /* reload the task list */
    else if (signo == SIGWINCH)
        gCoreEnv.cmd = COREMANAGER_CMD_SHOWSTATUS;
    else if (signo == SIGTERM)
        gCoreEnv.cmd = COREMANAGER_CMD_QUICKEXIT;   /* kill tasks, then exit */
    else if (signo == SIGUSR1)
        gCoreEnv.cmd = COREMANAGER_CMD_RESTART;
    /* otherwise: nothing to record */
}
/**** 初始化信号 ****/
int InitManagerSignal()
{
int sig;
struct sigaction sa;
int* pIndex = 0;
for (sig = 0; sig < 30; sig++) {
signal(sig, SIG_DFL);
}
memset(&sa, 0x00, sizeof(struct sigaction));
sa.sa_handler = SignalManagerHandler;
sigemptyset(&sa.sa_mask);
for (pIndex = ManagerSigals; *pIndex != 0; pIndex++) {
if (sigaction(*pIndex, &sa, NULL) == -1)
return -1;
}
return 0;
}
int MaskManagerSignal()
{
int sig;
struct sigaction sa;
int* pIndex = 0;
memset(&sa, 0x00, sizeof(struct sigaction));
sa.sa_handler = SIG_IGN;
sigemptyset(&sa.sa_mask);
for (pIndex = ManagerSigals; *pIndex != 0; pIndex++) {
if (sigaction(*pIndex, &sa, NULL) == -1)
return -1;
}
return 0;
}
/*
 * Reset signal numbers 0..29 to SIG_DFL. ForkEntry calls this in a freshly
 * forked task process so that the task does not keep the manager's handlers.
 * Failures for numbers that cannot be changed (0, SIGKILL, SIGSTOP) are
 * deliberately ignored. Always returns 0.
 */
int DefaultTaskSignal()
{
    int signum = 0;

    while (signum < 30) {
        (void)signal(signum, SIG_DFL);
        signum++;
    }
    return 0;
}
/*
 * Return the TaskEnv slot at position i of the shared-memory task table.
 *
 * Returns NULL when i is outside [0, gCoreEnv.maxtasknum) or when no table is
 * attached yet.
 */
struct TaskEnv* GetTaskEnvByIndex(int i)
{
    if (i < 0 || i >= gCoreEnv.maxtasknum)
        return NULL;
    if (gCoreEnv.ptrTaskEnvShm == NULL)
        return NULL;
    /* byte offset through char * keeps the arithmetic standard C even when
     * the attached address is stored as void * */
    return (struct TaskEnv *)((char *)gCoreEnv.ptrTaskEnvShm
                              + sizeof(struct TaskEnv) * (size_t)i);
}
/*
 * Find the task slot whose pid equals `pid`.
 *
 * Returns NULL when pid is not a valid process id (<= 0) or when no slot
 * holds that pid.
 */
struct TaskEnv *GetTaskEnvByPid(pid_t pid)
{
    int i;

    /* Slots without a process hold pid 0, so a lookup for 0 (or a negative
     * value) must not match them. */
    if (pid <= 0)
        return NULL;

    for (i = 0; i < gCoreEnv.maxtasknum; i++) {
        struct TaskEnv *tenv = GetTaskEnvByIndex(i);
        if (tenv != NULL && tenv->pid == pid)
            return tenv;
    }
    return NULL;
}
/*
 * dlopen() `filename` with lazy symbol binding and return the handle through
 * *ppHandle.
 *
 * On failure the dlerror() text is logged, *ppHandle is left untouched and
 * SPIDER_INTERNAL_ERR is returned. Returns 0 on success.
 */
int LoadLibrary(char *filename, void** ppHandle)
{
    void *lib = dlopen(filename, RTLD_LAZY);

    if (lib == NULL) {
        ErrorLog(__FILE__, __LINE__, "dlopen failed, filename[%s]-err[%s]", filename, dlerror());
        return SPIDER_INTERNAL_ERR;
    }
    *ppHandle = lib;
    return 0;
}
/*
 * Resolve the task entry point SOSTARTMAIN in the library `pHandle` and store
 * it in *func.
 *
 * Returns 0 on success. Returns SPIDER_INTERNAL_ERR when the symbol cannot be
 * resolved or resolves to NULL; the reason is logged and *func is left
 * untouched.
 */
int GetStartfunc(void *pHandle, SoMain *func)
{
    const char *error;
    void *sym;

    /* dlerror() reports the last error since its previous call; clear it
     * first so an unrelated earlier failure is not mistaken for ours. */
    (void)dlerror();
    sym = dlsym(pHandle, SOSTARTMAIN);
    error = dlerror();
    if (error != NULL) {
        ErrorLog(__FILE__, __LINE__, "dlsym failed,error[%s]", error);
        return SPIDER_INTERNAL_ERR;
    }
    /* a NULL address without an error is still unusable as an entry point */
    if (sym == NULL) {
        ErrorLog(__FILE__, __LINE__, "dlsym failed,error[%s]", "symbol resolved to NULL");
        return SPIDER_INTERNAL_ERR;
    }
    /* POSIX guarantees that a dlsym() result may be converted to a function pointer */
    *func = (SoMain)sym;
    return 0;
}
/*
 * Create, attach and zero the private SysV shared-memory segment that holds
 * the TaskEnv table (MAXTASKNUM slots).
 *
 * On success gCoreEnv.shmid and gCoreEnv.ptrTaskEnvShm describe the segment,
 * and 0 is returned.
 *
 * On failure SPIDER_INTERNAL_ERR is returned and no segment is left behind.
 * gCoreEnv.shmid is then -1 and gCoreEnv.ptrTaskEnvShm is NULL.
 */
int InitCoreEnv()
{
    int shmid;
    size_t shmsize;
    void *addr;

    gCoreEnv.maxtasknum = MAXTASKNUM;
    gCoreEnv.shmid = -1;
    gCoreEnv.ptrTaskEnvShm = NULL;

    /* create the shared memory segment for the task table */
    shmsize = (size_t)gCoreEnv.maxtasknum * sizeof(struct TaskEnv);
    shmid = shmget(IPC_PRIVATE, shmsize, 0600);
    if (shmid == -1) {
        ErrorLog(__FILE__, __LINE__, "create shm failed");
        return SPIDER_INTERNAL_ERR;
    }

    addr = shmat(shmid, NULL, 0);
    if (addr == (void *)-1) {
        ErrorLog(__FILE__, __LINE__, "shmat failed");
        /* SysV segments persist after the process exits; remove it now so a
         * failed start does not leak one. */
        shmctl(shmid, IPC_RMID, NULL);
        return SPIDER_INTERNAL_ERR;
    }

    gCoreEnv.shmid = shmid;
    gCoreEnv.ptrTaskEnvShm = addr;
    memset(addr, 0x00, shmsize);
    ErrorLog( __FILE__ , __LINE__ , "初始化共享内存[%d]" , shmid );
    return 0;
}
/*
 * Return the first slot of the task table whose status is TASKENV_UNUSED,
 * or NULL when every slot is taken.
 */
struct TaskEnv* FindUnusedTaskEnv()
{
    int idx = 0;

    while (idx < gCoreEnv.maxtasknum) {
        struct TaskEnv *slot = GetTaskEnvByIndex(idx++);
        if (slot->status == TASKENV_UNUSED)
            return slot;
    }
    return NULL;
}
/*
 * Load the task list and register each task in the shared task table.
 *
 * Intended config file format: entries of the form "Taskname:SoFileName".
 *
 * TODO: the config file is not read yet. A single hard-coded task is
 * registered in slot 0. Its status is not touched here; InitTaskListProcess
 * starts the first gCoreEnv.tasknum slots.
 *
 * Returns 0 on success, SPIDER_INTERNAL_ERR when slot 0 is unavailable.
 */
int ReadAndInitTaskListConfig()
{
    struct TaskEnv *first = GetTaskEnvByIndex(0);

    if (!first) {
        ErrorLog(__FILE__, __LINE__, "too many task!");
        return SPIDER_INTERNAL_ERR;
    }

    strcpy(first->sofilename, "./renrendai/librenrendai.so");
    strcpy(first->taskname, "test task");
    gCoreEnv.tasknum++;
    return 0;
}
/*
 * Re-read the task list and reconcile it with the running tasks (find newly
 * added and removed tasks).
 *
 * TODO: not implemented; always reports success (0).
 */
int ReloadTaskListConfig()
{
    int result = 0;
    return result;
}
/*
 * Report the state of the manager and its tasks.
 *
 * TODO: not implemented; always reports success (0).
 */
int ShowStatus()
{
    int result = 0;
    return result;
}
/*
 * Start one task process for `env`, running `somainfunc` in a forked child.
 *
 * Child process:
 *  - resets its signals and logs to ./tasklog;
 *  - calls somainfunc(env);
 *  - when that returns, logs a non-zero result, marks the slot
 *    TASKENV_NEEDREINIT (so ScanNeedReinitTaskProcess restarts it) and exits.
 *
 * Parent process:
 *  - records the child's pid in the slot and marks it TASKENV_USED.
 *
 * Returns 0 in the parent on success. Returns SPIDER_ARG_ERR for a NULL env or
 * entry function, and SPIDER_INTERNAL_ERR if fork() fails. Never returns in
 * the child.
 */
int ForkEntry(struct TaskEnv *env, SoMain somainfunc)
{
    int ret;
    pid_t pid;

    if (env == NULL) {
        ErrorLog(__FILE__, __LINE__, "taskenv null");
        return SPIDER_ARG_ERR;
    }
    if (somainfunc == NULL) {
        ErrorLog(__FILE__, __LINE__, "task[%s] somainfunc null", env->taskname);
        return SPIDER_ARG_ERR;
    }

    pid = fork();
    if (pid < 0) {
        ErrorLog(__FILE__, __LINE__, "fork failed");
        return SPIDER_INTERNAL_ERR;
    }

    if (pid == 0) {
        /* in the child */
        env->status = TASKENV_USED;
        DefaultTaskSignal();
        SetLogFile("./tasklog");
        SetLogLevel( 0 );
        env->pid = getpid();

        /* the entry function was resolved by the parent before fork() */
        ret = somainfunc(env);

        /* Getting here means the task stopped on its own; it must be restarted. */
        if (ret) {
            ErrorLog(__FILE__, __LINE__, "task[%s] exit, ret[%d]!", env->taskname, ret);
        }
        /* env->pid is deliberately kept: the manager finds this slot by pid
         * when it reaps the child, and clears the pid there. */
        env->status = TASKENV_NEEDREINIT;
        exit(0);
    }

    /* in the parent */
    env->status = TASKENV_USED;
    env->pid = pid;
    ErrorLog(__FILE__, __LINE__, "task[%s]-pid[%d] start", env->taskname, pid);
    return 0;
}
/*
 * Ask every running task to stop by itself.
 *
 * For each slot that is TASKENV_USED with a positive pid, TASKMANAGER_CMD_SLOWABORT
 * is written into the slot's cmd field while holding its cmdlock semaphore.
 * The task processes are expected to pick the request up themselves, so
 * nothing stops immediately. Always returns 0.
 */
int SlowKillAllTasks()
{
    int idx;

    for (idx = 0; idx < gCoreEnv.maxtasknum; ++idx) {
        struct TaskEnv *slot = GetTaskEnvByIndex(idx);

        if (slot->status != TASKENV_USED || slot->pid <= 0)
            continue;

        sem_P(slot->cmdlock);
        slot->cmd = TASKMANAGER_CMD_SLOWABORT;
        sem_V(slot->cmdlock);
    }
    return 0;
}
/*
 * Send SIGKILL to every task process still recorded in the table.
 *
 * A slot qualifies when it is not TASKENV_UNUSED and holds a positive pid.
 * That covers TASKENV_USED slots and TASKENV_NEEDREINIT slots whose process
 * has not been reaped yet.
 *
 * Slot state is not changed here; it is updated when the children are reaped.
 * Always returns 0.
 */
int QuickKillAllTasks()
{
    int i;

    for (i = 0; i < gCoreEnv.maxtasknum; i++) {
        struct TaskEnv *tenv = GetTaskEnvByIndex(i);

        /* pid must be > 0: kill(0, ...) would signal the manager's own
         * process group */
        if (tenv != NULL && tenv->status != TASKENV_UNUSED && tenv->pid > 0) {
            kill(tenv->pid, SIGKILL);
        }
    }
    return 0;
}
/*
 * Reap, without blocking, every task process that has already exited.
 *
 * For each reaped child, the matching slot's pid is cleared (when the slot can
 * be found) and gCoreEnv.tasknum is decremented. The function stops when no
 * further exited child is pending, or when no child exists at all (ECHILD).
 *
 * Returns 0, or -1 if waitpid() failed with an unexpected error.
 */
int ProcessDieTask()
{
    int r = 0;

    for (;;) {
        int status;
        struct TaskEnv *tenv;
        pid_t pid = waitpid(-1, &status, WNOHANG);

        /* no child has exited yet */
        if (pid == 0)
            break;

        if (pid < 0) {
            /* interrupted by a signal: just retry */
            if (errno == EINTR)
                continue;
            if (errno == ECHILD) {
                /* no child processes are left to wait for */
                ErrorLog( __FILE__ , __LINE__ , "已经没有任务可以等待了" );
            }
            else {
                /* any other error would repeat on every retry: report and stop */
                ErrorLog( __FILE__ , __LINE__ , "监控子进程结束信号出错" );
                r = -1;
            }
            break;
        }

        /* a child was reaped */
        tenv = GetTaskEnvByPid(pid);
        if (!tenv) {
            ErrorLog( __FILE__ , __LINE__ , "在主环境中居然没有找到[%d]", pid );
        }
        else {
            ErrorLog( __FILE__ , __LINE__ , "任务[%s]-pid[%d]退出", tenv->taskname, pid );
            tenv->pid = 0;
        }
        /* the only fork() in this file creates task processes, so a reaped
         * child is a task even when its slot cannot be found */
        gCoreEnv.tasknum--;
    }
    return r;
}
/*
 * Wait until every task process has exited, reaping children as they go.
 * Reaped slots are reset to TASKENV_UNUSED with pid 0.
 *
 * Timing:
 *  - polls waitpid() every 200 ms;
 *  - after 25 polls (~5 s) with tasks still alive, sends SIGKILL to the rest
 *    (QuickKillAllTasks) and allows another 15 polls (~3 s);
 *  - if tasks remain after that, gives up.
 *
 * Returns 0 when all tasks are gone, SPIDER_INTERNAL_ERR when some survived
 * the SIGKILL grace period.
 */
int WaitAllTaskDie()
{
    int r = 0;
    int waitloop = 0;
    int cannotwait = 0;

    while (gCoreEnv.tasknum > 0) {
        /* reap every child that has exited so far */
        for (;;) {
            int status;
            struct TaskEnv *tenv;
            pid_t pid = waitpid(-1, &status, WNOHANG);

            /* no child has exited right now */
            if (pid == 0)
                break;

            if (pid < 0) {
                if (errno == EINTR)
                    continue;
                if (errno == ECHILD) {
                    /* no child processes left at all */
                    ErrorLog( __FILE__ , __LINE__ , "已经没有任务可以等待了" );
                    gCoreEnv.tasknum = 0;
                }
                else {
                    ErrorLog( __FILE__ , __LINE__ , "监控任务进程结束信号出错" );
                }
                /* waitpid() would fail the same way again: leave the reap loop */
                break;
            }

            tenv = GetTaskEnvByPid(pid);
            if (!tenv) {
                ErrorLog( __FILE__ , __LINE__ , "在主环境中居然没有找到[%d]", pid );
                continue;
            }
            tenv->status = TASKENV_UNUSED;
            tenv->pid = 0;
            gCoreEnv.tasknum--;
        }

        if (gCoreEnv.tasknum <= 0)
            break;

        /* usleep() takes microseconds: 200 ms between polls */
        usleep(200 * 1000);
        waitloop++;

        /* at most 25 polls before escalating */
        if (waitloop >= 25) {
            if (cannotwait == 1) {
                ErrorLog( __FILE__ , __LINE__ , "已经强制杀死任务进程但还是存在残余, num[%d]", gCoreEnv.tasknum );
                r = SPIDER_INTERNAL_ERR;
                break;
            }
            /* still alive after this long: send SIGKILL */
            ErrorLog( __FILE__ , __LINE__ , "强制杀死任务进程,num[%d]", gCoreEnv.tasknum );
            QuickKillAllTasks();
            waitloop = 10;
            cannotwait = 1;
        }
    }
    return r;
}
/*
 * Load env->sofilename and resolve its SOSTARTMAIN entry point into
 * *somainfunc.
 *
 * On success the library stays loaded (the returned function lives in it) and
 * 0 is returned.
 *
 * On failure the error from LoadLibrary/GetStartfunc is returned and
 * *somainfunc is set to NULL. A library that was opened is closed again.
 */
int GetSoMainFunc(struct TaskEnv *env, SoMain *somainfunc)
{
    int ret;
    void *pHandle = NULL;

    ret = LoadLibrary(env->sofilename, &pHandle);
    if (ret) {
        ErrorLog(__FILE__, __LINE__, "task[%s]加载路径[%s]下的so文件[%s]出错", env->taskname, env->basepath, env->sofilename);
        return ret;
    }

    ret = GetStartfunc(pHandle, somainfunc);
    if (ret) {
        ErrorLog(__FILE__, __LINE__, "task[%s]获得入口函数[%s]出错", env->taskname, SOSTARTMAIN);
        /* the library is useless without its entry point: do not leak the handle */
        *somainfunc = NULL;
        dlclose(pHandle);
        return ret;
    }
    return 0;
}
/*
 * Start a process for each of the first gCoreEnv.tasknum slots.
 *
 * A slot counts as failed when:
 *  - it is out of range;
 *  - its .so is missing or not executable;
 *  - its entry point cannot be resolved;
 *  - or its fork fails (the slot is then reset to TASKENV_UNUSED, pid 0).
 *
 * gCoreEnv.factinitedtasknum receives the number of tasks actually started;
 * main checks it. Always returns 0.
 */
int InitTaskListProcess()
{
    int r;
    int i;
    int failednum = 0;

    for (i = 0; i < gCoreEnv.tasknum; i++) {
        SoMain somainfunc = NULL;
        struct TaskEnv *tenv = GetTaskEnvByIndex(i);

        /* tasknum larger than the table: nothing to start for this index */
        if (tenv == NULL) {
            ErrorLog( __FILE__ , __LINE__ , "task index[%d] out of range" , i );
            failednum++;
            continue;
        }

        /* the .so must exist and be executable */
        r = access(tenv->sofilename, X_OK | F_OK);
        if (r) {
            ErrorLog( __FILE__ , __LINE__ , "task[%s]没有动态库或者没有可执行权限" , tenv->taskname );
            failednum++;
            continue;
        }

        /* resolve the entry function */
        r = GetSoMainFunc(tenv, &somainfunc);
        if (r) {
            ErrorLog( __FILE__ , __LINE__ , "task[%s]获得入口函数出错", tenv->taskname );
            failednum++;
            continue;
        }

        /* start the task process */
        r = ForkEntry(tenv, somainfunc);
        if (r) {
            ErrorLog(__FILE__, __LINE__, "task[%s] forkentry 失败,ret[%d]", tenv->taskname, r);
            tenv->status = TASKENV_UNUSED;
            tenv->pid = 0;
            failednum++;
        }
        else {
            ErrorLog(__FILE__, __LINE__, "task[%s] forkentry 成功,ret[%d]", tenv->taskname, r);
            tenv->status = TASKENV_USED;
        }
    }

    gCoreEnv.factinitedtasknum = gCoreEnv.tasknum - failednum;
    ErrorLog( __FILE__ , __LINE__ , "共[%d]task,实际成功初始化[%d]" , gCoreEnv.tasknum, gCoreEnv.factinitedtasknum);
    return 0;
}
/*
 * Restart the task held in `tenv`.
 *
 * Whatever the slot's previous state, it is set to TASKENV_UNUSED if the
 * restart fails. On success the slot becomes TASKENV_USED and
 * gCoreEnv.tasknum is incremented.
 *
 * Returns 0 on success, otherwise the non-zero error of the failing step.
 */
int ReinitTaskProcess(struct TaskEnv *tenv)
{
    int r;
    SoMain somainfunc = NULL;

    /* the .so must exist and be executable */
    r = access(tenv->sofilename, X_OK | F_OK);
    if (r) {
        ErrorLog( __FILE__ , __LINE__ , "task[%s]没有动态库或者没有可执行权限" , tenv->taskname );
        tenv->status = TASKENV_UNUSED;
        return r;
    }

    /* resolve the entry function */
    r = GetSoMainFunc(tenv, &somainfunc);
    if (r) {
        ErrorLog( __FILE__ , __LINE__ , "task[%s]获得入口函数出错", tenv->taskname );
        tenv->status = TASKENV_UNUSED;
        return r;
    }

    r = ForkEntry(tenv, somainfunc);
    if (r) {
        ErrorLog(__FILE__, __LINE__, "task[%s] reinit failed,ret[%d]", tenv->taskname, r);
        tenv->status = TASKENV_UNUSED;
        tenv->pid = 0;
    }
    else {
        ErrorLog(__FILE__, __LINE__, "task[%s] reinit success,ret[%d]", tenv->taskname, r);
        gCoreEnv.tasknum++;
        tenv->status = TASKENV_USED;
    }
    return r;
}
/*
 * Restart every task whose slot is TASKENV_NEEDREINIT, for example a task that
 * exited unexpectedly or was killed on timeout.
 *
 * ReinitTaskProcess resets a slot whose restart fails to TASKENV_UNUSED, so it
 * is not retried on later scans.
 *
 * Returns 0 when every restart succeeded, otherwise the error of the last
 * failed restart.
 */
int ScanNeedReinitTaskProcess()
{
    int r = 0;
    int i;

    for (i = 0; i < gCoreEnv.maxtasknum; i++) {
        struct TaskEnv *tenv = GetTaskEnvByIndex(i);
        int rr;

        /* only slots flagged TASKENV_NEEDREINIT are considered */
        if (tenv == NULL || tenv->status != TASKENV_NEEDREINIT)
            continue;

        rr = ReinitTaskProcess(tenv);
        if (rr) {
            ErrorLog(__FILE__, __LINE__, "task[%s] 重新初始化失败,ret[%d]", tenv->taskname, rr);
            r = rr;
        }
        else {
            ErrorLog(__FILE__, __LINE__, "task[%s] 重新初始化成功", tenv->taskname);
        }
    }
    return r;
}
/*
 * Forcefully terminate a timed-out task with SIGKILL.
 *
 * Only the signal is sent. The slot's state is not changed here; it is updated
 * once the child has been reaped (or by the caller).
 *
 * Returns 0 when the signal was sent, -1 when the slot has no valid pid or
 * kill() failed.
 */
int KillTask(struct TaskEnv *tenv)
{
    ErrorLog( __FILE__ , __LINE__ , "task[%s] timeout, 开始清理" , tenv->taskname );

    /* Never signal pid <= 0: kill(0, ...) hits the manager's own process
     * group and kill(-1, ...) every process we are allowed to signal. */
    if (tenv->pid <= 0) {
        ErrorLog( __FILE__ , __LINE__ , "task[%s] invalid pid[%d], skip kill" , tenv->taskname, (int)tenv->pid );
        return -1;
    }
    if (kill(tenv->pid, SIGKILL) == -1) {
        ErrorLog( __FILE__ , __LINE__ , "task[%s] kill pid[%d] failed" , tenv->taskname, (int)tenv->pid );
        return -1;
    }

    ErrorLog( __FILE__ , __LINE__ , "task[%s] timeout, 清理成功" , tenv->taskname );
    return 0;
}
/*
 * Detach the task-table shared memory and mark the segment for removal.
 *
 * Stops at the first failure: if detaching fails, the segment is not removed.
 * When InitCoreEnv never attached a segment, ptrTaskEnvShm is not a valid
 * attach address, so shmdt() fails and shmctl() is not reached.
 *
 * Returns 0 on success, -1 on failure.
 */
int CleanIPCResource()
{
    if (shmdt(gCoreEnv.ptrTaskEnvShm) == -1)
        return -1;
    return shmctl(gCoreEnv.shmid, IPC_RMID, (struct shmid_ds *)0) == -1 ? -1 : 0;
}
/*
 * Kill timed-out tasks and flag them for restart.
 *
 * A TASKENV_USED slot with a positive pid times out when
 * time(NULL) - lasttime >= timeout. Its process is sent SIGKILL (KillTask)
 * and the slot is set to TASKENV_NEEDREINIT, so ScanNeedReinitTaskProcess
 * restarts it.
 *
 * A TASKENV_USED slot whose pid is already 0 has had its process reaped
 * without being re-flagged (e.g. after a crash). It is flagged
 * TASKENV_NEEDREINIT as well.
 *
 * Other slots are left alone. Always returns 0.
 */
int ProcessTimeout()
{
    int i;
    time_t now = time(NULL);

    for (i = 0; i < gCoreEnv.maxtasknum; i++) {
        struct TaskEnv *tenv = GetTaskEnvByIndex(i);

        if (tenv == NULL || tenv->status != TASKENV_USED)
            continue;

        /* process already gone: there is nothing to kill, only restart it */
        if (tenv->pid <= 0) {
            tenv->status = TASKENV_NEEDREINIT;
            continue;
        }

        /* NOTE(review): lasttime/timeout are not assigned anywhere in this
         * file and the table starts zeroed; confirm the task sets them,
         * otherwise a fresh slot times out on the first check. */
        if (now - tenv->lasttime >= tenv->timeout) {
            KillTask(tenv);
            /* so that the killed task still gets a chance to be restarted */
            tenv->status = TASKENV_NEEDREINIT;
        }
    }
    return 0;
}
int ProcessSignal()
{
int r = 0;
if (gCoreEnv.cmd == COREMANAGER_CMD_EXIT) {
/* 收到退出通知先屏蔽信号 */
MaskManagerSignal();
/* 先发出通知到任务让任务退出-但是不会马上执行 */
SlowKillAllTasks();
r = WaitAllTaskDie();
if (r) {
ErrorLog( __FILE__ , __LINE__ , "等待所有任务退出过程中出错");
}
ErrorLog(__FILE__, __LINE__, "销毁IPC资源");
r = CleanIPCResource();
if (r) {
ErrorLog( __FILE__ , __LINE__ , "清理共享内存出错" );
}
else {
ErrorLog( __FILE__ , __LINE__ , "清理共享内存成功" );
}
exit(0);
}
else if (gCoreEnv.cmd == COREMANAGER_CMD_QUICKEXIT) {
/* 收到退出通知先屏蔽信号 */
MaskManagerSignal();
/* 直接kill任务 */
QuickKillAllTasks();
r = WaitAllTaskDie();
if (r) {
ErrorLog( __FILE__ , __LINE__ , "等待所有任务退出过程中出错");
}
ErrorLog(__FILE__, __LINE__, "销毁IPC资源");
CleanIPCResource();
exit(0);
}
else if (gCoreEnv.cmd == COREMANAGER_CMD_RECONFIG) {
r = ReloadTaskListConfig();
if (r) {
}
}
return 0;
}
/*
 * Entry point of the spider core manager.
 *
 * Startup:
 *  - log to ./log;
 *  - create the shared task table;
 *  - install signal handling;
 *  - load the task list and start the task processes.
 *
 * Main loop, every 500 ms:
 *  - act on pending signal commands;
 *  - reap exited tasks;
 *  - kill timed-out tasks;
 *  - restart tasks flagged for re-initialisation.
 * The loop ends only through ProcessSignal(), which exits the process.
 *
 * Exit status:
 *  - 1 on startup failures. Once the shared memory exists it is released
 *    first, because SysV segments otherwise outlive the process.
 *  - 0 when there are no tasks or every task failed to start.
 */
int main(int argc, char *args[])
{
    int r;
    int rc = 0;

    (void)argc;
    (void)args;

    SetLogFile("./log");
    SetLogLevel( 0 );

    r = InitCoreEnv();
    if (r) {
        ErrorLog(__FILE__, __LINE__, "初始化core环境failed");
        r = CleanIPCResource();
        if (r) {
            ErrorLog( __FILE__ , __LINE__ , "清理共享内存出错" );
        }
        else {
            ErrorLog( __FILE__ , __LINE__ , "清理共享内存成功" );
        }
        exit(1);
    }

    r = InitManagerSignal();
    if (r) {
        ErrorLog(__FILE__, __LINE__, "初始化core信号failed");
        rc = 1;
        goto cleanup;
    }

    r = ReadAndInitTaskListConfig();
    if (r) {
        ErrorLog(__FILE__, __LINE__, "读取任务参数表failed");
        rc = 1;
        goto cleanup;
    }

    ErrorLog( __FILE__ , __LINE__ , "任务数量[%d]" , gCoreEnv.tasknum);
    if (gCoreEnv.tasknum == 0) {
        ErrorLog( __FILE__ , __LINE__ , "没有找到可执行的任务");
        goto cleanup;
    }

    r = InitTaskListProcess();
    if (r) {
        /* not fatal: the tasks that did start keep running */
        ErrorLog(__FILE__, __LINE__, "初始化任务有错误");
    }
    ErrorLog( __FILE__ , __LINE__ , "r=%d" , r );
    if (gCoreEnv.factinitedtasknum == 0) {
        ErrorLog(__FILE__, __LINE__, "所有任务初始化都失败");
        goto cleanup;
    }

    ErrorLog( __FILE__ , __LINE__ , "管理进程开始进入主循环" );
    for (;;) {
        /* act on external signals (exits the process on EXIT/QUICKEXIT) */
        ProcessSignal();
        /* reap exited task processes */
        ProcessDieTask();
        /* kill timed-out tasks */
        ProcessTimeout();
        /* restart tasks flagged for re-initialisation */
        ScanNeedReinitTaskProcess();
        usleep(500 * 1000);
    }

cleanup:
    /* release the IPC resources */
    ErrorLog(__FILE__, __LINE__, "销毁IPC资源");
    r = CleanIPCResource();
    if (r) {
        ErrorLog( __FILE__ , __LINE__ , "清理共享内存出错" );
    }
    else {
        ErrorLog( __FILE__ , __LINE__ , "清理共享内存成功" );
    }
    ErrorLog(__FILE__, __LINE__, "管理进程退出");
    return rc;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/hotmocha/spider.git
git@gitee.com:hotmocha/spider.git
hotmocha
spider
spider
master

搜索帮助

23e8dbc6 1850385 7e0993f3 1850385