1 Star 0 Fork 15

yuanzy/sbalance

forked from lihuaib/sbalance 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
sbdns.c 18.70 KB
一键复制 编辑 原始数据 按行查看 历史

#include <ares.h>
#include <sys/types.h>
#include <string.h>
#include <signal.h>
#include <arpa/nameser.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include "sblist.h"
#include "sbdns.h"
#include "sbmain.h"
#include "sbalance.h"
/* Number of buckets in the host -> sb_dns hash table. */
#define SB_DNS_HASH_SLOTS 111
/*
 * Per-try c-ares timeout in milliseconds (used with ARES_OPT_TIMEOUTMS).
 * sb_dns_init_env() also sets options.tries = 2; the original note here said
 * "5000ms", presumably meaning 2 tries x 2500ms.
 */
#define SB_DNS_TIMEOUT_TIME 2500
/* Parenthesized so the flag set stays a single operand wherever it expands. */
#define SB_DNS_ARES_OPT (ARES_OPT_TIMEOUTMS | ARES_OPT_TRIES)
/*
 * Exit command written by sb_dns_signal_handler() and polled by
 * sb_dns_main_loop(); volatile sig_atomic_t is the type the C standard
 * guarantees can be safely shared with a signal handler.
 */
static volatile sig_atomic_t sig_cmd;
static int exiting = 0;
/* Hash table of known hosts (array of SB_DNS_HASH_SLOTS bucket heads). */
static struct hlist_head *hash_slots;
/* Entries queued by the request path, not yet picked up by sb_dns_process(). */
static struct list_head *wait_parse_list_head;
/* Entries whose c-ares channel is polled by sb_dns_process(). */
static struct list_head *parsing_list_head;
/* Shared c-ares options (timeout / tries), filled in by sb_dns_init_env(). */
static struct ares_options options;
/* Number of entries currently on the waiting or parsing lists. */
static int waiting_parsing_count = 0;
static void callback_gethost(void *arg, int status, int timeouts, struct hostent *h);
void sb_debug_show_dns_waiting_info();
void sb_debug_show_dns_parsing_info();
void sb_debug_show_dns_hash_info();
/*
 * Multiplicative string hash.
 *
 * Each character is folded in as digest = digest * multiplier + c, and the
 * multiplier itself is scaled by a fixed step after every character.
 * Arithmetic is done in unsigned long and wraps on overflow.
 *
 * @param str  NUL-terminated string to hash.
 * @return     hash value (0 for the empty string).
 */
static unsigned long RSHash(const char *str)
{
    const unsigned long step = 378551;
    unsigned long multiplier = 63689;
    unsigned long digest = 0;
    const char *cursor;

    for (cursor = str; *cursor != '\0'; cursor++) {
        digest = digest * multiplier + (*cursor);
        multiplier *= step;
    }
    return digest;
}
/*
 * Write the "#<status>#" separator right after the host name in msg->m.mtext
 * and set msg->mtext_len to cover host, separator and IP.
 *
 * @param msg       message whose text starts with the host name.
 * @param host_len  length of the host name already stored in the text.
 * @param ip_len    length of the IP text stored after the separator.
 * @param status    single status character placed between the two '#'.
 */
static void sb_dns_msg_tag_status(struct dns_msg *msg, int host_len, int ip_len, char status)
{
    char *tag = msg->m.mtext + host_len;

    tag[0] = '#';
    tag[1] = status;
    tag[2] = '#';
    msg->mtext_len = host_len + 3 + ip_len;
}
/*
 * Build the reply text "<host>#<status>#<ip>" for a DNS entry.
 *
 * The text is not NUL-terminated here; msg->mtext_len (set through
 * sb_dns_msg_tag_status) carries its length.
 *
 * @param msg     message to fill.
 * @param dns     entry providing the host and ip strings.
 * @param status  status character to embed.
 */
static inline void sb_dns_gen_msg(struct dns_msg *msg, struct sb_dns *dns, char status)
{
    const int host_len = strlen(dns->host);
    const int ip_len = strlen(dns->ip);
    char *out = msg->m.mtext;

    memcpy(out, dns->host, host_len);
    sb_dns_msg_tag_status(msg, host_len, ip_len, status);
    /* The IP goes right after the three separator bytes. */
    memcpy(out + host_len + 3, dns->ip, ip_len);
}
/*
 * Allocate and initialise the module state: the hash bucket array, the
 * waiting list head, the parsing list head, and the shared c-ares options.
 *
 * @return SB_OK on success; SB_ERROR if any allocation fails, in which case
 *         everything allocated so far is released and the pointers are reset.
 */
int sb_dns_init_env()
{
    int i;

    hash_slots = malloc(sizeof(*hash_slots) * SB_DNS_HASH_SLOTS);
    wait_parse_list_head = malloc(sizeof(*wait_parse_list_head));
    parsing_list_head = malloc(sizeof(*parsing_list_head));

    /* Check all three results; free(NULL) is a no-op, so one path suffices. */
    if (hash_slots == NULL || wait_parse_list_head == NULL || parsing_list_head == NULL) {
        free(hash_slots);
        free(wait_parse_list_head);
        free(parsing_list_head);
        hash_slots = NULL;
        wait_parse_list_head = NULL;
        parsing_list_head = NULL;
        return SB_ERROR;
    }

    for (i = 0; i < SB_DNS_HASH_SLOTS; i++) {
        INIT_HLIST_HEAD(&hash_slots[i]);
    }
    INIT_LIST_HEAD(wait_parse_list_head);
    INIT_LIST_HEAD(parsing_list_head);

    options.timeout = SB_DNS_TIMEOUT_TIME;
    options.tries = 2;
    return SB_OK;
}
/*
 * Set the status of the hash entry whose host equals `host`.
 *
 * The scan stops as soon as an entry compares greater than `host`, which
 * relies on each bucket being kept in ascending strcmp order.
 *
 * @param host        NUL-terminated host name to look up.
 * @param new_status  value stored into the entry's status field.
 * @return SB_OK if the entry was found and updated, SB_NOTFOUND otherwise.
 */
int sb_dns_modify_status(char *host, int new_status)
{
    struct hlist_head *bucket = &hash_slots[RSHash(host) % SB_DNS_HASH_SLOTS];
    struct hlist_node *node;
    struct sb_dns *entry;

    if (bucket->first == NULL) {
        return SB_NOTFOUND;
    }

    hlist_for_each_entry(entry, node, bucket, hash_node) {
        int order = strcmp(entry->host, host);

        if (order == 0) {
            entry->status = new_status;
            return SB_OK;
        }
        if (order > 0) {
            break;
        }
    }
    return SB_NOTFOUND;
}
/*
 * Set or clear one worker's bit in a byte-array bitmap.
 *
 * The worker number is decremented before use, so worker 1 maps to bit 0 of
 * byte 0. The byte is logged before and after the change.
 *
 * @param bitmap    bitmap bytes; the caller must make it large enough.
 * @param workerno  worker number (decremented to form the bit position).
 * @param flag      non-zero sets the bit, zero clears it.
 */
void sb_dns_set_worker_bitmap(char *bitmap, int workerno, int flag)
{
    const int bit_no = workerno - 1;
    const int byte_idx = bit_no >> 3;
    const int bit_in_byte = bit_no - (byte_idx << 3);
    const int mask = 0x1 << bit_in_byte;

    DebugOutput("BEFORE SET workerno[%d]-flag[%d]-index[%d]-char[%x]", bit_no, flag, byte_idx, bitmap[byte_idx]);
    if (flag) {
        bitmap[byte_idx] |= mask;
    } else {
        bitmap[byte_idx] &= ~mask;
    }
    DebugOutput("AFTER SET workerno[%d]-flag[%d]-index[%d]-char[%x]", bit_no, flag, byte_idx, bitmap[byte_idx]);
}
/*
 * Append a worker number to the entry's list of waiting workers.
 *
 * When the array already holds SB_DNS_ARRAY_LEN workers the request is
 * silently dropped.
 *
 * @param type      DNS entry to update.
 * @param workerno  worker number to record.
 * @param flag      only used in the debug log line.
 */
void sb_dns_set_worker_array(struct sb_dns *type, int workerno, int flag)
{
    if (type->waiting_dns_worker_num != SB_DNS_ARRAY_LEN) {
        type->waiting_dns_worker_array[type->waiting_dns_worker_num] = workerno;
        type->waiting_dns_worker_num += 1;
        DebugOutput("SET workerno[%d]-flag[%d]", workerno, flag);
    }
}
/*
 * Convert a bitmap position back into a worker number
 * (inverse of the mapping used by sb_dns_set_worker_bitmap).
 *
 * Defined as an ordinary external function: a plain `inline` definition with
 * no extern declaration provides no external definition in C99/C11, so any
 * call the compiler chose not to inline failed at link time.
 *
 * @param index  byte index inside the bitmap.
 * @param pos    bit position inside that byte (0..7).
 * @return       worker number, i.e. index * 8 + pos + 1.
 */
int sb_dns_get_workerno(int index, int pos)
{
    return (index * 8) + pos + 1;
}
/* Mark an entry as waiting and queue it for the next sb_dns_process() pass. */
static void sb_dns_enqueue_waiting(struct sb_dns *entry)
{
    entry->status = SB_DNS_STATUS_WAITING;
    list_add(&entry->list_node, wait_parse_list_head);
    waiting_parsing_count++;
}

/*
 * Allocate an entry for `host`, create its c-ares channel, record the
 * requesting worker and start the IPv4 lookup.
 * Returns NULL (with nothing left allocated) on any failure.
 */
static struct sb_dns *sb_dns_create_entry(const char *host, int workerno)
{
    struct sb_dns *entry;

    /* The host field is a fixed array inside struct sb_dns; never overrun it. */
    if (strlen(host) >= sizeof(entry->host)) {
        ErrorOutput("host name too long, host[%s]", host);
        return NULL;
    }
    entry = calloc(1, sizeof(*entry));
    if (entry == NULL) {
        ErrorOutput("malloc new sb_dns failed\n");
        return NULL;
    }
    if (ares_init_options(&entry->channel, &options, SB_DNS_ARES_OPT)) {
        free(entry); /* previously leaked on this path */
        return NULL;
    }
    strcpy(entry->host, host);
    sb_dns_set_worker_array(entry, workerno, 1);
    /* NOTE(review): c-ares may run the callback before this call returns;
       the status it sets is then overwritten by the WAITING state assigned
       when the entry is queued - confirm whether that case needs handling. */
    ares_gethostbyname(entry->channel, entry->host, AF_INET, callback_gethost, entry);
    return entry;
}

/*
 * Look up the cached IP of `host`, or start / join a resolution.
 *
 * @param host      NUL-terminated host name.
 * @param ip        output buffer for the IP text (NUL-terminated on SB_OK).
 * @param len       in: capacity of `ip` in bytes, terminator included;
 *                  out (SB_OK only): length of the IP text.
 * @param workerno  worker to notify once a pending resolution completes.
 * @return
 *   SB_OK     the IP was cached and copied into `ip`.
 *   SB_AGAIN  a resolution was started, or one is already pending; the worker
 *             has been recorded on the entry.
 *   SB_ERROR  allocation / channel creation failed, the host name does not
 *             fit, `ip` is too small, or the entry is in an unexpected state.
 *
 * Buckets are kept sorted in ascending strcmp order; a new entry is inserted
 * in front of the first entry that compares greater.
 */
int sb_dns_query_host_ip(char *host, char *ip, int *len, int workerno)
{
    struct hlist_head *hash_head = &hash_slots[RSHash(host) % SB_DNS_HASH_SLOTS];
    struct hlist_node *pos = NULL;
    struct hlist_node *last_smaller = NULL;  /* last node ordered before host */
    struct hlist_node *first_greater = NULL; /* first node ordered after host */
    struct sb_dns *type;
    struct sb_dns *new_sb_dns;

    hlist_for_each_entry(type, pos, hash_head, hash_node) {
        int cmp = strcmp(type->host, host);

        if (cmp < 0) {
            last_smaller = pos;
            continue;
        }
        if (cmp > 0) {
            first_greater = pos;
            break;
        }

        /* Exact match: act on the entry's current state. */
        if (type->status == SB_DNS_STATUS_OK) {
            int ip_len = strlen(type->ip);

            /* ip_len + 1 bytes are written, so ip_len must be below *len. */
            if (ip_len >= *len) {
                return SB_ERROR;
            }
            memcpy(ip, type->ip, ip_len + 1);
            *len = ip_len;
            DebugOutput("HIT host[%s]-ip[%s]", host, type->ip);
            return SB_OK;
        }
        if (type->status == SB_DNS_STATUS_WAITING || type->status == SB_DNS_STATUS_PARSING) {
            sb_dns_set_worker_array(type, workerno, 1);
            return SB_AGAIN;
        }
        if (type->status == SB_DNS_STATUS_DESTORYED) {
            /* Create the channel first so a failure leaves the entry untouched. */
            if (ares_init_options(&type->channel, &options, SB_DNS_ARES_OPT)) {
                return SB_ERROR;
            }
            memset(type->waiting_dns_worker_bitmap, 0x00, sizeof(type->waiting_dns_worker_bitmap));
            sb_dns_set_worker_array(type, workerno, 1);
            /* A fresh channel has no queries; without this call the entry
               would never leave the PARSING state. */
            ares_gethostbyname(type->channel, type->host, AF_INET, callback_gethost, type);
            sb_dns_enqueue_waiting(type);
            return SB_AGAIN;
        }
        FatalOutput("not possible status for sb_dns, bug need fix");
        return SB_ERROR;
    }

    /* Host not known yet: create it and insert at its sorted position. */
    new_sb_dns = sb_dns_create_entry(host, workerno);
    if (new_sb_dns == NULL) {
        return SB_ERROR;
    }
    if (first_greater != NULL) {
        hlist_add_before(&new_sb_dns->hash_node, first_greater);
    } else if (last_smaller != NULL) {
        hlist_add_after(last_smaller, &new_sb_dns->hash_node);
    } else {
        hlist_add_head(&new_sb_dns->hash_node, hash_head);
    }
    sb_dns_enqueue_waiting(new_sb_dns);
    return SB_AGAIN;
}
/*
 * Request a fresh resolution of a host that is already resolved.
 *
 * Only an entry that matches `host` exactly and is in SB_DNS_STATUS_OK is
 * re-queued; any other case (unknown host, resolution already pending,
 * destroyed entry) is a no-op.
 *
 * @param host      NUL-terminated host name.
 * @param workerno  worker to notify when the new resolution finishes.
 * @return SB_OK, or SB_ERROR if the c-ares channel could not be created.
 */
int sb_dns_reparse_host (char *host, int workerno)
{
    struct hlist_head *hash_head = &hash_slots[RSHash (host) % SB_DNS_HASH_SLOTS];
    struct hlist_node *pos = NULL;
    struct sb_dns *type;

    hlist_for_each_entry (type, pos, hash_head, hash_node) {
        int cmp = strcmp (type->host, host);

        if (cmp < 0) {
            continue;
        }
        if (cmp > 0 || type->status != SB_DNS_STATUS_OK) {
            /* Bucket is sorted: nothing further can match. */
            break;
        }

        if (ares_init_options (&type->channel, &options, SB_DNS_ARES_OPT)) {
            return SB_ERROR;
        }
        sb_dns_set_worker_array (type, workerno, 1);
        /* Issue the lookup on the new channel; without it the entry would
           stay in the PARSING state forever. */
        ares_gethostbyname (type->channel, type->host, AF_INET, callback_gethost, type);
        type->status = SB_DNS_STATUS_WAITING;
        list_add (&type->list_node, wait_parse_list_head);
        waiting_parsing_count++;
        break;
    }
    return SB_OK;
}
/*
 * c-ares completion callback for ares_gethostbyname().
 *
 * Stores the first address that converts to IPv4 text in dns->ip and records
 * the outcome in dns->status:
 *   SB_DNS_STATUS_OK       an address was stored,
 *   SB_DNS_STATUS_TIMEOUT  the lookup failed with ARES_ETIMEOUT,
 *   SB_DNS_STATUS_ERROR    any other failure, including a result from which
 *                          no address could be converted.
 *
 * @param arg       the struct sb_dns passed to ares_gethostbyname().
 * @param status    c-ares status code.
 * @param timeouts  unused.
 * @param h         lookup result, NULL on failure.
 */
static void callback_gethost (void *arg, int status, int timeouts, struct hostent *h)
{
    struct sb_dns *dns = (struct sb_dns *) arg;
    char **pptr;

    (void) timeouts;

    if (h == NULL) {
        dns->status = (status == ARES_ETIMEOUT) ? SB_DNS_STATUS_TIMEOUT : SB_DNS_STATUS_ERROR;
        return;
    }

    for (pptr = h->h_addr_list; pptr != NULL && *pptr != NULL; pptr++) {
        /* just use ipv4 */
        if (inet_ntop (AF_INET, *pptr, dns->ip, sizeof (dns->ip) - 1)) {
            dns->status = SB_DNS_STATUS_OK;
            return;
        }
    }

    /* No usable address: report a failure rather than OK with a stale or
       empty ip, and log the host (the list cursor is NULL at this point). */
    dns->status = SB_DNS_STATUS_ERROR;
    FatalOutput ("inet_ntop not right %s", dns->host);
}
/* Debug helper: log the host of every entry on the waiting list. */
void sb_debug_show_dns_waiting_info()
{
    struct sb_dns *entry;

    DebugOutput("waiting_list info start...");
    list_for_each_entry (entry, wait_parse_list_head, list_node) {
        DebugOutput("host[%s]", entry->host);
    }
    DebugOutput("waiting_list info end...");
}
/* Debug helper: log host, ip and status of every entry on the parsing list. */
void sb_debug_show_dns_parsing_info()
{
    struct sb_dns *entry;

    DebugOutput("parsing_list info start...");
    list_for_each_entry (entry, parsing_list_head, list_node) {
        DebugOutput("host[%s]-ip[%s]-status[%d]", entry->host, entry->ip, entry->status);
    }
    DebugOutput("parsing_list info end...");
}
/* Debug helper: walk every hash bucket and log each entry's host, ip and status. */
void sb_debug_show_dns_hash_info()
{
    int slot;

    DebugOutput("hlist info start...");
    for (slot = 0; slot < SB_DNS_HASH_SLOTS; slot++) {
        struct hlist_head *bucket = &hash_slots[slot];
        struct hlist_node *node = NULL;
        struct sb_dns *entry;

        DebugOutput("slot[%d] start...", slot);
        hlist_for_each_entry(entry, node, bucket, hash_node) {
            DebugOutput("host[%s]-ip[%s]-status[%d]", entry->host, entry->ip, entry->status);
        }
        DebugOutput("slot[%d] end...", slot);
    }
    DebugOutput("hlist info end...");
}
/*
 * Run one polling pass over all in-flight resolutions.
 *
 *  1. Collect the sockets of every channel on the parsing list and wait on
 *     them with select() for a short time.
 *  2. Let c-ares process each channel; for every entry whose status left
 *     SB_DNS_STATUS_PARSING, send "<host>#<status>#<ip>" to each waiting
 *     worker, destroy the channel, and mark non-OK entries as destroyed.
 *  3. Move everything from the waiting list onto the parsing list.
 *
 * @param msgqueueid  message queue used for the replies.
 * @return SB_OK; SB_ERROR if select() fails (other than EINTR); or the
 *         non-zero msg_send() result if a reply could not be sent.
 */
int sb_dns_process (int msgqueueid)
{
    fd_set readers, writers;
    /* 500 microseconds. NOTE(review): the original comment said "50ms" -
       confirm which was intended. */
    struct timeval maxtv = { 0, 500 };
    struct sb_dns *pos, *n;
    struct dns_msg msg;
    int nfds = 0;
    int count;
    int rc, i;
    int workerno;

    FD_ZERO (&readers);
    FD_ZERO (&writers);

    /* select() needs the highest descriptor across ALL channels, so keep the
       maximum instead of the value returned for the last channel. */
    list_for_each_entry (pos, parsing_list_head, list_node) {
        int channel_nfds = ares_fds (pos->channel, &readers, &writers);

        if (channel_nfds > nfds) {
            nfds = channel_nfds;
        }
    }

    count = select (nfds, &readers, &writers, NULL, &maxtv);
    if (count == -1) {
        if (errno != EINTR) {
            ErrorOutput("select failed, errno[%d]", errno);
            return SB_ERROR;
        }
        /* Interrupted by a signal (e.g. the exit request): the sets are
           unspecified now, so clear them and still let c-ares run timeouts. */
        FD_ZERO (&readers);
        FD_ZERO (&writers);
    }

    list_for_each_entry_safe (pos, n, parsing_list_head, list_node) {
        char status_char;
        int send_rc = 0;

        ares_process (pos->channel, &readers, &writers);
        if (pos->status == SB_DNS_STATUS_PARSING) {
            continue; /* still in flight */
        }

        list_del_init (&pos->list_node);
        waiting_parsing_count--;

        /* Compare against the status codes set by callback_gethost(), not
           against the wire characters. */
        if (pos->status == SB_DNS_STATUS_OK) {
            status_char = SB_DNS_STATUS_CHAR_OK;
        } else if (pos->status == SB_DNS_STATUS_TIMEOUT) {
            status_char = SB_DNS_STATUS_CHAR_TIMEOUT;
        } else {
            status_char = SB_DNS_STATUS_CHAR_FAIL;
        }

        for (i = 0; i < pos->waiting_dns_worker_num; i++) {
            sb_dns_gen_msg (&msg, pos, status_char);
            /* Terminate the text up front so both log lines print a proper string. */
            msg.m.mtext[msg.mtext_len] = '\0';
            workerno = pos->waiting_dns_worker_array[i];
            msg.m.mtype = workerno + SB_REQREPMSG_TYPESPLIT;
            send_rc = msg_send (msgqueueid, &msg, SB_WAIT);
            if (send_rc) {
                ErrorOutput ("send dns_msg failed, info[%s]", msg.m.mtext);
                break;
            }
            DebugOutput("mtype[%ld] dns-send msg [%s] len[%d]", msg.m.mtype, msg.m.mtext, msg.mtext_len);
        }

        /* after send, reset */
        pos->waiting_dns_worker_num = 0;
        /* destroy channel (also on a send failure, so it is not leaked) */
        ares_destroy (pos->channel);
        pos->channel = NULL;
        if (pos->status != SB_DNS_STATUS_OK) {
            pos->status = SB_DNS_STATUS_DESTORYED;
        }

        if (send_rc) {
            return send_rc;
        }
    }

    rc = SB_OK;
    /* add wait_parse_list_head to parsing_list_head */
    list_for_each_entry (pos, wait_parse_list_head, list_node) {
        pos->status = SB_DNS_STATUS_PARSING;
    }
    list_splice_init (wait_parse_list_head, parsing_list_head);
    return rc;
}
/*
 * Signal handler: translate a signal into an exit command for the main loop.
 *
 * Only the first command is kept; once sig_cmd is non-zero later signals are
 * ignored. SIGUSR1 requests a quick exit, SIGUSR2 a slow exit; anything else
 * is only logged.
 */
void sb_dns_signal_handler(int signo)
{
    if (sig_cmd != 0) {
        return;
    }

    SystemInfo("catch signo[%d]", signo);
    switch (signo) {
    case SIGUSR1:
        sig_cmd = SB_SYS_CMD_QUICKEXIT;
        break;
    case SIGUSR2:
        sig_cmd = SB_SYS_CMD_SLOWEXIT;
        break;
    default:
        SystemInfo("unknow signal");
        break;
    }
}
/*
struct mymsgbuf
{
long mtype;
char mtext[SB_MSG_QUEUE_MAX_MSG_LEN];
};
struct dns_msg
{
int mtext_len;
struct mymsgbuf m;
};
*/
/*
 * Install the signal dispositions of the DNS process.
 *
 * Every signal from 1 to 64 except SIGKILL/SIGSTOP is ignored, SIGINT is put
 * back to its default action, and sb_dns_signal_handler() is installed for
 * both exit signals it understands: SIGUSR1 (quick exit) and SIGUSR2 (slow
 * exit). Previously only SIGUSR2 was hooked, leaving SIGUSR1 ignored and the
 * quick-exit path in the handler and main loop unreachable.
 */
void sb_dns_signal_init()
{
    int signo;
    struct sigaction act;

    for (signo = 1; signo <= 64; signo++) {
        if (signo == SIGKILL || signo == SIGSTOP) {
            continue;
        }
        signal(signo, SIG_IGN);
    }
    /* TODO */
    signal(SIGINT, SIG_DFL);

    memset(&act, 0x00, sizeof(struct sigaction));
    sigemptyset(&act.sa_mask);
    act.sa_handler = sb_dns_signal_handler;
    sigaction(SIGUSR1, &act, NULL);
    sigaction(SIGUSR2, &act, NULL);
}
/*
 * Initialise the DNS process: allocate the module state, then install the
 * signal dispositions.
 *
 * @param msgqueueid  not used by this function.
 * @return SB_OK on success, SB_ERROR if sb_dns_init_env() failed.
 */
int sb_dns_init(int msgqueueid)
{
    int rc = sb_dns_init_env();

    if (rc != 0) {
        ErrorOutput("init dns env failed, rc=%d\n", rc);
        return SB_ERROR;
    }

    sb_dns_signal_init();
    return SB_OK;
}
/*
 * Main loop of the DNS process.
 *
 * Each iteration:
 *   - refreshes the cached time and checks for a quick-exit request;
 *   - drains the request queue without blocking (skipped once a slow exit
 *     has been requested): a text containing "#1" is a refresh request, any
 *     other text is a host to resolve. Cached answers and failures are
 *     replied to immediately as "<host>#<status>#<ip>"; pending ones are
 *     answered later by sb_dns_process();
 *   - runs one sb_dns_process() pass;
 *   - on slow exit, leaves once no request is waiting or being parsed;
 *   - sleeps 100ms when nothing is in flight.
 *
 * @param msgqueueid  message queue used for requests and replies.
 * @return SB_OK when the loop ends. An unexpected msg_recv() error
 *         terminates the process with exit(1).
 */
int sb_dns_main_loop (int msgqueueid)
{
    static long msgtype = -(SB_REQREPMSG_TYPESPLIT - 1);
    struct dns_msg msg;
    char *host;
    char *ip;
    int rc = 0;
    int len;
    int workerno;

    SystemInfo("dns process enter loop...");
    while (1) {
        sb_update_time(&cache_time, cache_str_time);
        if (sig_cmd == SB_SYS_CMD_QUICKEXIT) {
            SystemInfo("catch quck exit signal, break dns main loop");
            break;
        }

        /* recv request */
        while (sig_cmd != SB_SYS_CMD_SLOWEXIT) {
            rc = msg_recv (msgqueueid, msgtype, &msg, SB_NO_WAIT);
            if (rc != SB_OK) {
                if (errno != EAGAIN && errno != ENOMSG) {
                    ErrorOutput ("recv dns_msg failed, errno[%d]-error[%s]", errno, strerror(errno));
                    /* TODO */
                    exit(1);
                }
                break;
            }

            /* mtext_len comes from the queue and is used as an index below:
               there must be room for the terminator and for "#<status>#". */
            if (msg.mtext_len < 0 || msg.mtext_len >= SB_MSG_QUEUE_MAX_MSG_LEN - 3) {
                ErrorOutput ("drop dns_msg with bad length[%d]", msg.mtext_len);
                continue;
            }

            workerno = msg.m.mtype;
            msg.m.mtext[msg.mtext_len] = '\0';
            DebugOutput("dns-recv msg [%s]", msg.m.mtext);

            if (strstr (msg.m.mtext, "#1") != NULL) {
                /* update request */
                /* NOTE(review): sb_dns_reparse_host() compares this whole
                   text with stored host names using strcmp; if the text still
                   carries the "#1" marker it cannot match - confirm the
                   worker-side format. */
                if (sb_dns_reparse_host (msg.m.mtext, workerno)) {
                    ErrorOutput ("reparse host failed, host[%s]", msg.m.mtext);
                }
                continue;
            }

            /* Layout: <host> '#' <status> '#' <ip>; the ip starts 3 bytes
               after the host and may use the rest of the buffer. */
            host = msg.m.mtext;
            ip = msg.m.mtext + msg.mtext_len + 3;
            len = SB_MSG_QUEUE_MAX_MSG_LEN - msg.mtext_len - 3;
            rc = sb_dns_query_host_ip (host, ip, &len, workerno);
            if (rc == SB_AGAIN) {
                /* do nothing, just wait */
                continue;
            }

            msg.m.mtype += SB_REQREPMSG_TYPESPLIT;
            if (rc == SB_OK) {
                sb_dns_msg_tag_status (&msg, msg.mtext_len, len, SB_DNS_STATUS_CHAR_OK);
            } else {
                /* No ip was produced: `len` still holds the buffer capacity
                   here, so the reply must carry an empty ip instead. */
                sb_dns_msg_tag_status (&msg, msg.mtext_len, 0, SB_DNS_STATUS_CHAR_FAIL);
            }
            /* Terminate the text so the log lines below print a proper string. */
            msg.m.mtext[msg.mtext_len] = '\0';
            rc = msg_send (msgqueueid, &msg, SB_WAIT);
            if (rc) {
                ErrorOutput ("send dns_msg failed, info[%s]", msg.m.mtext);
            }
            DebugOutput("dns-send msg [%s]", msg.m.mtext);
        }

        if (sb_dns_process(msgqueueid)) {
            ErrorOutput("sb_dns_process , errno[%d]", errno);
            break;
        }
        if (sig_cmd == SB_SYS_CMD_SLOWEXIT) {
            if (list_empty(wait_parse_list_head) && list_empty(parsing_list_head)) {
                SystemInfo("slow exit,no waiting and parsing request, break dns main loop");
                break;
            }
        }
        /* using sleep not good idea */
        if (waiting_parsing_count <= 0) {
            usleep(100000); /* 100ms */
        }
    }
    return SB_OK;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/postgresqlPlus/sbalance.git
git@gitee.com:postgresqlPlus/sbalance.git
postgresqlPlus
sbalance
sbalance
master

搜索帮助