1 Star 1 Fork 15

lihuaib/sbalance

forked from hotmocha/sbalance 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
sbproxy.c 32.99 KB
一键复制 编辑 原始数据 按行查看 历史
hotmocha 提交于 2015-06-30 20:20 . a

/* TODO: add dns timer */
#include "sbalance.h"
#include "sbdns.h"
#include "sbconstant.h"
#include "sbproxy.h"
#include "sbproxy.h"
#include "sblist.h"
#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/types.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <errno.h>
/* Value stored into options.timeout by sb_proxy_dns_init().
   NOTE(review): the unit is not visible in this file - confirm. */
#define SB_DNS_TIMEOUT_TIME 2500
/* Bucket count used when hashing a host name (RSHash(host) % SB_DNS_HASH_SLOTS). */
#define SB_DNS_HASH_SLOTS 11
/* Bug note from the original author: when a client ends a keep-alive
   connection normally, the get-request handler treats that normal close
   as an error. */
/* Forward declarations of the event handlers and DNS helpers defined below. */
int sb_proxy_send_request_to_server_event(struct sb_cycle_env *env, struct sb_event *ev);
int sb_proxy_accept_event(struct sb_cycle_env *env, struct sb_event *ev);
int sb_proxy_get_request_event(struct sb_cycle_env *env, struct sb_event *ev);
int sb_proxy_test_connect_http_server_complete_event(struct sb_cycle_env *env, struct sb_event *ev);
int sb_proxy_send_to_client_connectserverok_event(struct sb_cycle_env *env, struct sb_event *ev);
int sb_proxy_dns_request(struct sb_cycle_env *env, struct sb_session *sess, int flag);
int sb_proxy_dns_recv_resp(struct sb_cycle_env *env);
/* NOTE(review): CRLF and line_buf are not referenced anywhere in the visible
   part of this file - confirm whether they are still needed. */
static char *CRLF = "\r\n";
static char line_buf[MAXIOBUFLEN];
/* @RETURN callback Handler */
void sb_dns_done_handler(struct sb_cycle_env *env, struct sb_session *sess)
{
int rc;
int ip_len;
int len;
if (sess->status == SB_DNS_STATUS_OK) { /* DESTORYED OR OK */
len = sizeof(sess->server.net_address.ip);
rc = sb_proxy_connect_http_server(env, sess);
if (rc == SB_ERROR) {
ErrorOutput("proxy connect failed, session finish");
(void)sb_finish_session(env, sess);
}
}
else {
if (sess->host_is_ip) {
(void)sb_finish_session(env, sess);
return;
}
/* try to update */
sb_proxy_dns_request(env, sess, 1);
ErrorOutput("proxy dns request failed, session finish");
(void)sb_finish_session(env, sess);
}
}
/* Return 1 when the line [line, line + line_len) starts with the literal prefix. */
static int sb_hdr_has_prefix(const char *line, size_t line_len, const char *prefix)
{
    size_t prefix_len = strlen(prefix);

    return line_len >= prefix_len && memcmp(line, prefix, prefix_len) == 0;
}

/* Parse [first, last) as an unsigned decimal number not larger than max.
   Returns 0 and stores the value in *out, or -1 when the range is empty,
   contains a non-digit, or the value would exceed max. */
static int sb_hdr_parse_decimal(const char *first, const char *last,
                                unsigned long max, unsigned long *out)
{
    unsigned long value = 0;

    if (first >= last) {
        return -1;
    }
    for (; first < last; first++) {
        unsigned long digit;

        if (*first < '0' || *first > '9') {
            return -1;
        }
        digit = (unsigned long)(*first - '0');
        /* checked before multiplying so the accumulator can never wrap */
        if (value > (max - digit) / 10) {
            return -1;
        }
        value = value * 10 + digit;
    }
    *out = value;
    return 0;
}

/* Split a trimmed, non-empty Host value "name[:port]" and store the result in
   the session: a value made only of digits, '.' and ':' goes to
   server.net_address.ip (host_is_ip = 1), anything else goes to sess->host
   (host_is_ip = 0). Without a port the default is 443 for CONNECT and 80
   otherwise. Returns SB_OK or SB_ERROR.
   NOTE(review): the length checks assume net_address.ip and sess->host are
   fixed-size char arrays - confirm against the struct definitions. */
static int sb_hdr_parse_host(struct sb_session *sess, const char *val, size_t val_len)
{
    struct sb_connection *server_conn = &sess->server;
    const char *colon_pos = NULL;
    unsigned long port = 0;
    size_t name_len;
    size_t i;
    int colon = 0;
    int is_origin_ip = 1;

    /* Scan the whole value: stopping at the first non-numeric character
       would miss the ':' of "name:port". */
    for (i = 0; i < val_len; i++) {
        if (val[i] == ':') {
            colon++;
            colon_pos = val + i;
        } else if (val[i] != '.' && !(val[i] >= '0' && val[i] <= '9')) {
            is_origin_ip = 0;
        }
    }
    if (colon >= 2) {
        ErrorOutput("request header not correct");
        return SB_ERROR;
    }

    name_len = colon_pos ? (size_t)(colon_pos - val) : val_len;
    if (name_len == 0) {
        ErrorOutput("request header not correct");
        return SB_ERROR;
    }

    if (colon_pos != NULL) {
        /* The port is parsed straight from the value; the request buffer is
           never modified because it is forwarded to the server later. */
        if (sb_hdr_parse_decimal(colon_pos + 1, val + val_len, 65535UL, &port) != 0 || port == 0) {
            ErrorOutput("request header not correct");
            return SB_ERROR;
        }
    } else {
        port = sess->http_connect_method ? 443 : 80;
    }

    if (is_origin_ip) {
        if (name_len >= sizeof(server_conn->net_address.ip)) {
            ErrorOutput("request header not correct");
            return SB_ERROR;
        }
        memcpy(server_conn->net_address.ip, val, name_len);
        server_conn->net_address.ip[name_len] = '\0';
        sess->host_is_ip = 1;
    } else {
        if (name_len >= sizeof(sess->host)) {
            ErrorOutput("request header not correct");
            return SB_ERROR;
        }
        memcpy(sess->host, val, name_len);
        sess->host[name_len] = '\0';
        sess->host_is_ip = 0;
    }
    server_conn->net_address.port = (int)port;
    return SB_OK;
}

/*
 * Parse the CRLF-terminated lines buffered in sess->cs_buf, starting at
 * sess->current_offset and never reading beyond sess->cs_used bytes.
 *
 * Per complete line:
 *   - the first line (current_offset == 0) must start with a known method;
 *   - an empty line sets sess->crlf = 1 and ends parsing;
 *   - a line starting with "CONNECT" sets sess->http_connect_method;
 *   - "Content-Length:" stores the value in sess->content_length;
 *   - the leading "Proxy-" of a header name is removed from the buffer;
 *   - "Host:" fills the server address/host name, unless an address is
 *     already stored in sess->server.net_address.ip.
 * sess->current_offset is advanced past every consumed line.
 *
 * @RETURN SB_OK (also when the header is still incomplete - test
 *         sess->crlf) or SB_ERROR for a malformed request.
 */
int sb_parse_http_request_header(struct sb_session *sess)
{
    char *buf = sess->cs_buf;
    char *start_ptr = buf + sess->current_offset;

    while (start_ptr < buf + sess->cs_used) {
        char *data_end = buf + sess->cs_used;
        char *end_ptr = NULL;
        char *c;
        unsigned int line_len;
        int del_len = 0;

        /* look for the CRLF that ends this line, inside the valid data only */
        for (c = start_ptr; c + 1 < data_end; c++) {
            if (c[0] == '\r' && c[1] == '\n') {
                end_ptr = c;
                break;
            }
        }
        if (end_ptr == NULL) {
            break; /* line not complete yet */
        }
        line_len = (unsigned int)(end_ptr - start_ptr);

        /* first line */
        if (sess->current_offset == 0) {
            if (!(sb_hdr_has_prefix(start_ptr, line_len, "GET") ||
                  sb_hdr_has_prefix(start_ptr, line_len, "POST") ||
                  sb_hdr_has_prefix(start_ptr, line_len, "PUT") ||
                  sb_hdr_has_prefix(start_ptr, line_len, "HEAD") ||
                  sb_hdr_has_prefix(start_ptr, line_len, "TRACE") ||
                  sb_hdr_has_prefix(start_ptr, line_len, "CONNECT") ||
                  sb_hdr_has_prefix(start_ptr, line_len, "DELETE"))) {
                ErrorOutput("http request method error");
                return SB_ERROR;
            }
        }

        if (line_len == 0) {
            sess->crlf = 1;
        }
        else if (sb_hdr_has_prefix(start_ptr, line_len, "CONNECT")) {
            sess->http_connect_method = 1;
        }
        else if (sb_hdr_has_prefix(start_ptr, line_len, "Content-Length:")) {
            const char *val = start_ptr + 15; /* 15: strlen("Content-Length:") */
            const char *val_end = end_ptr;
            unsigned long length = 0;

            while (val < val_end && (*val == ' ' || *val == '\t')) {
                val++;
            }
            while (val_end > val && (val_end[-1] == ' ' || val_end[-1] == '\t')) {
                val_end--;
            }
            if (val < val_end) {
                /* capped so that the value also fits the int arithmetic the
                   callers do on it */
                if (sb_hdr_parse_decimal(val, val_end, 0x7fffffffUL, &length) != 0) {
                    ErrorOutput("http request content length error");
                    return SB_ERROR;
                }
                sess->content_length = (unsigned int)length;
            }
        }
        else if (sb_hdr_has_prefix(start_ptr, line_len, "Proxy-")) {
            /* 6: strlen("Proxy-"); shift the rest of the buffered data left */
            del_len = 6;
            memmove(start_ptr, start_ptr + del_len, (size_t)(data_end - (start_ptr + del_len)));
        }
        else if (sb_hdr_has_prefix(start_ptr, line_len, "Host:")) {
            const char *val = start_ptr + 5; /* 5: strlen("Host:") */
            const char *val_end = end_ptr;

            while (val < val_end && (*val == ' ' || *val == '\t')) {
                val++;
            }
            while (val_end > val && (val_end[-1] == ' ' || val_end[-1] == '\t')) {
                val_end--;
            }
            /* skipped when a server address is already stored */
            if (sess->server.net_address.ip[0] == '\0') {
                if (val >= val_end) {
                    return SB_ERROR;
                }
                if (sb_hdr_parse_host(sess, val, (size_t)(val_end - val)) != SB_OK) {
                    return SB_ERROR;
                }
            }
        }

        sess->current_offset += (line_len + 2 - del_len);
        sess->cs_used -= del_len;
        start_ptr = sess->cs_buf + sess->current_offset;
        if (sess->crlf == 1)
            break;
    }
    return SB_OK;
}
/*
 * Called once the TCP connection to the server is established.
 * For a CONNECT request the "200 Connection Established" reply is appended
 * to sess->sc_buf and the client write event is armed to send it; for any
 * other request the server read event is armed and the buffered request is
 * scheduled for sending. sess->request_times is incremented in both cases.
 *
 * @RETURN SB_OK, or SB_ERROR when the pending server write event cannot be
 *         removed.
 */
int sb_proxy_connect_http_server_ok(struct sb_cycle_env *env, struct sb_session *sess)
{
    static const char established_reply[] = "HTTP/1.1 200 Connection Established\r\n\r\n";
    enum { ESTABLISHED_REPLY_LEN = sizeof(established_reply) - 1 };
    struct sb_connection *server = &sess->server;
    struct sb_connection *client = &sess->client;

    /* the write event that waited for connect() to complete is no longer needed */
    if (sb_event_is_active(&server->write) && sb_del_write_event(&server->write)) {
        ErrorOutput("connect server ok, but delete write event failed\n");
        return SB_ERROR;
    }

    if (!sess->http_connect_method) {
        /* plain request: read the server's answer and send the request */
        sb_add_read_event(&server->read, sb_transfer_read_event );
        (void)sb_proxy_start_request(env, sess);
    }
    else {
        /* the client asked for a tunnel: queue the confirmation for it */
        memcpy(sess->sc_buf + sess->sc_used, established_reply, ESTABLISHED_REPLY_LEN);
        sess->sc_used += ESTABLISHED_REPLY_LEN;
        if (!sb_event_is_active(&client->write)) {
            sb_add_write_event(&client->write, sb_proxy_send_to_client_connectserverok_event);
        }
    }

    sess->request_times++;
    return SB_OK;
}
/* EVENT HANDLER (server write): push the buffered client request to the server.
 *
 * - sb_send() returns SB_AGAIN: nothing else to do now, SB_OK is returned.
 * - sb_send() returns SB_OK: the request has been sent. If the client already
 *   shut down its sending side the session is finished; otherwise the bytes
 *   kept for the next request are moved to the front of cs_buf, the parse
 *   state is reset and the client read event is armed again.
 * - anything else: the session is finished and SB_ERROR is returned.
 */
int sb_proxy_send_request_to_server_event(struct sb_cycle_env *env, struct sb_event *ev)
{
    struct sb_connection *server = (struct sb_connection*)ev->data;
    struct sb_session *sess = server->session;
    struct sb_connection *client = &sess->client;
    int send_rc = sb_send(server);

    if (send_rc == SB_AGAIN) {
        return SB_OK;
    }

    if (send_rc != SB_OK) {
        ErrorOutput("client[%s]:[%d]--server[%s]:[%d] send error", sess->client.net_address.ip,
            sess->client.net_address.port, sess->server.net_address.ip, sess->server.net_address.port);
        (void)sb_finish_session(env, sess);
        return SB_ERROR;
    }

    /* everything was sent */
    if (client->read.shutdown) {
        InfoOutput("client[%s]:[%d]--server[%s]:[%d] finish", sess->client.net_address.ip,
            sess->client.net_address.port, sess->server.net_address.ip, sess->server.net_address.port);
        (void)sb_finish_session(env, sess);
        return SB_OK;
    }

    /* keep what belongs to the next request and restart header parsing */
    memmove(sess->cs_buf, sess->cs_buf + sess->current_offset, sess->cs_used_remain);
    sess->current_offset = 0;
    sess->content_length = 0;
    sess->cs_used_remain = 0;
    sess->crlf = 0;

    /* wait for the next client request */
    sb_add_read_event(&client->read, sb_proxy_get_request_event);
    return SB_OK;
}
/* EVENT HANDLER (client write): deliver the "Established" reply of a CONNECT
 * request to the client.
 * The actual sending is delegated to sb_transfer_write_event(). When that
 * handler returns SB_OK, cs_used is reset and the read events of both
 * connections are armed with sb_transfer_read_event. The result of
 * sb_transfer_write_event() is returned unchanged; this handler does not
 * finish the session itself.
 */
int sb_proxy_send_to_client_connectserverok_event(struct sb_cycle_env *env, struct sb_event *ev)
{
    struct sb_connection *client = (struct sb_connection*)ev->data;
    struct sb_session *sess = client->session;
    struct sb_connection *server = &sess->server;
    int rc;

    /* reuse the generic write handler */
    rc = sb_transfer_write_event(env, ev);
    if (rc != SB_OK) {
        return rc;
    }

    /* reply delivered: from now on just relay data in both directions */
    sess->cs_used = 0;
    sb_add_read_event(&server->read, sb_transfer_read_event );
    sb_add_read_event(&client->read, sb_transfer_read_event );
    return rc;
}
/* EVENT HANDLER (server write): fired when a non-blocking connect() to the
 * server has completed. The outcome is read with getsockopt(SO_ERROR).
 *
 * @RETURN SB_OK when the connection is up and the session was switched to
 *         request processing; SB_ERROR otherwise (the session is finished).
 */
int sb_proxy_test_connect_http_server_complete_event(struct sb_cycle_env *env, struct sb_event *ev)
{
    struct sb_connection *c = (struct sb_connection*)ev->data;
    struct sb_session *sess = c->session;
    int so_error = 0;
    socklen_t len = sizeof(so_error); /* getsockopt() takes a socklen_t *, not an int * */
    int saved_errno;
    int r;

    r = getsockopt(c->sockfd, SOL_SOCKET, SO_ERROR, (void *) &so_error, &len);
    if (r || so_error) {
        /* report before finishing: finishing may overwrite errno and hands
           the session back for reuse */
        saved_errno = errno;
        ErrorOutput("cli[%s]-ser[%s] make transfer link failed, r[%d] errno[%d] err[%d]", sess->client.net_address.ip, sess->server.net_address.ip, r, saved_errno, so_error);
        (void)sb_finish_session(env, sess);
        return SB_ERROR;
    }

    /* ok, finally success */
    if (sb_proxy_connect_http_server_ok(env, sess)) {
        ErrorOutput("client[%s]-server[%s] connect ok,but init env failed", sess->client.net_address.ip, sess->server.net_address.ip);
        (void)sb_finish_session(env, sess);
        return SB_ERROR;
    }

    DebugOutput("client[%s]-server[%s]:[%d] make transfer link success", sess->client.net_address.ip, sess->server.net_address.ip, sess->server.net_address.port);
    return SB_OK;
}
int sb_proxy_start_request(struct sb_cycle_env *env, struct sb_session *sess)
{
/* 1. stop recv browser's data (delte client's read event)
2. try connect http server
3. copy cs_buf_tmp to cs_buf(completely request) and clear completely request from cs_buf
*/
struct sb_connection *in;
struct sb_connection *out;
struct sockaddr_in addr;
struct sb_event *we;
in = &sess->client;
out = &sess->server;
/* activate server's write, because keepalive allow one tcp channel can exchange mutiple request and response */
if (sb_event_is_active(&out->write))
sb_del_write_event(&out->write);
sb_add_write_event(&out->write, sb_proxy_send_request_to_server_event );
return SB_OK;
}
/* init socket, try connect server */
int sb_proxy_connect_http_server(struct sb_cycle_env *env, struct sb_session *sess)
{
int err, n, fd;
struct sb_connection *in;
struct sb_connection *out;
struct sockaddr_in addr;
struct sb_event *we;
int addr_len;
in = &sess->client;
out = &sess->server;
we = &out->write;
/* let's connect server */
if ((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) {
if (errno == EMFILE || errno == ENFILE) {
/* delay accept */
ErrorOutput("too many fd, access system limit, ip[%s]'s request is cut", sess->client.net_address.ip);
}
else {
ErrorOutput("connect server, get socket fd failed, ip[%s]'s request is cut", sess->client.net_address.ip);
}
return SB_ERROR;
}
out->sockfd = fd;
memset(&addr, 0x00, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(out->net_address.port);
if (inet_pton(AF_INET, out->net_address.ip, &addr.sin_addr.s_addr) != 1) {
ErrorOutput("inet_pton failed ip[%s],errno[%d]", out->net_address.ip, errno);
sess->why_finish = FINISH_REASON_SYSTEMCALLERR;
return SB_ERROR;
}
addr_len = sizeof(struct sockaddr_in);
memcpy(&out->net_address.sockaddr, &addr, addr_len);
sb_set_nonblocking(out->sockfd);
sb_set_reuseaddr(out->sockfd, 1);
sb_set_tcpkeepalive(out->sockfd);
if (connect(out->sockfd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
if (errno != EINPROGRESS) {
ErrorOutput("connect server[%s] failed, errno[%d]", sess->server.net_address.ip, errno);
sess->why_finish = FINISH_REASON_CONNECTSERVERERR;
return SB_CONNECTERR;
}
/* can't know the connect result
so register the server's read event and set handler */
else if (errno == EINPROGRESS) {
err = sb_add_write_event(we, sb_proxy_test_connect_http_server_complete_event );
if (err) {
ErrorOutput("connect ip[%s]:port[%d] add write event failed, errno[%d]", out->net_address.ip, out->net_address.port, errno);
sess->why_finish = FINISH_REASON_PROCESSEVENTERR;
return SB_ERROR;
}
return SB_AGAIN;
}
} else { /* connect success */
err = sb_proxy_connect_http_server_ok(env, sess);
if (err) {
ErrorOutput("client[%s]-server[%s] connect ok,but init env failed", sess->client.net_address.ip, sess->server.net_address.ip);
return SB_ERROR;
}
else {
InfoOutput("client[%s]:[%d]--[%d]local[%d]--server[%s]:[%d] link success", sess->client.net_address.ip,
sess->client.net_address.port, sess->client.sockfd,sess->server.sockfd,
sess->server.net_address.ip, sess->server.net_address.port);
return SB_OK;
}
}
}
/*
 * Duplicate a session: a session obtained from sb_get_session() (tried
 * twice) receives a byte copy of *sess, and the back pointers of the copy
 * (connection -> session, event -> connection) are redirected to the copy.
 * The events and timers of the source are not registered again for the copy.
 *
 * @RETURN the new session, or NULL when no session could be obtained.
 */
struct sb_session* sb_proxy_fork_session(struct sb_cycle_env *env, struct sb_session *sess)
{
    struct sb_session *copy = NULL;
    struct sb_connection *client;
    struct sb_connection *server;
    int attempt;

    InfoOutput("client[%s]:[%d]--[%d]local[%d]--server[%s]:[%d] fork session", sess->client.net_address.ip,
        sess->client.net_address.port, sess->client.sockfd,sess->server.sockfd,
        sess->server.net_address.ip, sess->server.net_address.port);

    /* try two times */
    for (attempt = 0; attempt < 2 && copy == NULL; attempt++) {
        copy = sb_get_session(env);
    }
    if (copy == NULL) {
        return NULL;
    }

    memcpy(copy, sess, sizeof(struct sb_session));

    /* the byte copy still points into the source session: fix that */
    client = &copy->client;
    server = &copy->server;
    client->session = copy;
    server->session = copy;
    client->read.data = client;
    client->write.data = client;
    server->read.data = server;
    server->write.data = server;

    return copy;
}
/*
 * Finish a session but keep the client socket open (used for the HTTP
 * CONNECT method, where a forked session takes over the client side).
 * All four events are unregistered, the server socket is closed, pending
 * timers are removed and the session is moved from the active list to the
 * reuse list. Calling it again on a session that is already finished does
 * nothing.
 *
 * @RETURN always SB_OK
 */
int sb_finish_session_with_opening_client(struct sb_cycle_env *env, struct sb_session *sess)
{
    struct sb_connection *client = &sess->client;
    struct sb_connection *server = &sess->server;

    if (sess->had_finish) {
        return SB_OK;
    }
    sess->had_finish = 1;

    /* unregister event */
    if (sb_event_is_active(&client->read))
        sb_del_read_event(&client->read);
    if (sb_event_is_active(&client->write))
        sb_del_write_event(&client->write);
    if (sb_event_is_active(&server->read))
        sb_del_read_event(&server->read);
    if (sb_event_is_active(&server->write))
        sb_del_write_event(&server->write);

    /* only the server side is closed; the client's sockfd must stay open */
    if (sess->server.sockfd > 0) {
        close(sess->server.sockfd);
        sess->server.sockfd = 0;
    }

    /* delete timer */
    if (client->read.in_timer_set)
        sb_timer_pop_event(&client->read);
    if (client->write.in_timer_set)
        sb_timer_pop_event(&client->write);
    if (server->read.in_timer_set)
        sb_timer_pop_event(&server->read);
    if (server->write.in_timer_set)
        sb_timer_pop_event(&server->write);

    /* active list -> reuse list */
    sb_list_delete_session(&env->active_session_list, sess);
    env->active_session_num--;
    sb_list_push_session(&env->new_reuse_session_list, sess);
    env->reuse_sessoion_num++;

    return SB_OK;
}
/* EVENT HANDLER: client read */
int sb_proxy_get_request_event(struct sb_cycle_env *env, struct sb_event *ev)
{
int err;
struct sb_connection *in;
struct sb_connection *out;
struct sb_session *sess;
struct sb_session *new_sess;
int r, rc;
int content_len;
in = ev->data;
sess = in->session;
out = &sess->server;
/* @RETURN: 1. SB_BUFNOTENOUGH 2. SB_AGAIN 3. SB_ERROR(errno) 4. SB_DONE */
rc = sb_recv(in);
if (rc == SB_DONE) {
/* finish the session */
/* fixbug: when client use shutdown(SHUT_WR), client will not send data,
but want to recv data from server, so we can't finish whole session. Try send data to client */
/* (void)sb_finish_session(env, sess); */
in->read.shutdown = 1;
InfoOutput("[%s] request want to finish", in->net_address.ip);
}
if(rc == SB_BUFNOTENOUGH) {
/* buffer not enough */
(void)sb_finish_session(env, sess);
ErrorOutput("[%s] request too long", in->net_address.ip);
return SB_ERROR;
}
else if (rc == SB_ERROR) {
(void)sb_finish_session(env, sess);
ErrorOutput("[%s] request error!", in->net_address.ip);
return SB_ERROR;
}
else { /* SB_AGAIN or SB_DONE */
if (sess->cs_used - sess->current_offset >= 2 && sess->crlf == 0) {
if ((rc = sb_parse_http_request_header(sess)) == SB_ERROR) { /* maybe SB_AGAIN */
ErrorOutput("sessionid[%lu] sb_parse_http_request_header failed", sess->sessionid);
(void)sb_finish_session(env, sess);
return SB_ERROR;
}
}
if (sess->crlf == 1) {
//memcpy(sess->cs_buf+sess->cs_used, "Connection:Close", sizeof("Connection:Close") -1);
//sess->cs_used += sizeof("Connection:Close") -1;
/* delete client's read, and start server request */
sb_del_read_event(&in->read);
if (sess->http_connect_method) {
FatalOutput("http connect method request can't use previous tcp channel");
(void)sb_finish_session(env, sess);
ErrorOutput("[%s] request [%s] error!", in->net_address.ip, out->net_address.ip);
return SB_ERROR;
/* 1. fork session
2. discard origin http connection with server's side
*/
new_sess = sb_proxy_fork_session(env, sess);
if (new_sess == NULL) {
(void)sb_finish_session(env, sess);
ErrorOutput("get session failed!");
return SB_ERROR;
}
(void)sb_finish_session_with_opening_client(env, sess);
/* wow!!! now we are in new session, say bye bye with old http session */
rc = sb_proxy_connect_http_server(env, new_sess);
if (rc == SB_ERROR) {
(void)sb_finish_session(env, new_sess);
return SB_ERROR;
}
}
if (sess->http_connect_method && sess->content_length) {
ErrorOutput("connect method can't use contect length");
(void)sb_finish_session(env, new_sess);
return SB_ERROR;
}
if (sess->content_length) {
content_len = sess->cs_used - sess->current_offset;
if (content_len < sess->content_length) {
return SB_AGAIN;
} else {
sess->cs_used_remain = content_len - sess->content_length;
sess->cs_used -= sess->cs_used_remain;
}
}
if (sb_proxy_start_request(env, sess)) {
(void)sb_finish_session(env, sess);
return SB_ERROR;
}
sess->request_times++;
}
else {
if (in->read.shutdown) {
if (sess->cs_used == 0) {
ErrorOutput("[%s] request finish!", in->net_address.ip);
rc = SB_OK;
}
else {
ErrorOutput("[%s] request finish before connect return and http request not correct", in->net_address.ip);
rc = SB_ERROR;
}
(void)sb_finish_session(env, sess);
return rc;
}
return SB_AGAIN;
}
}
return SB_OK;
}
/* EVENT HANDLER(client read) */
int sb_proxy_get_new_request_event(struct sb_cycle_env *env, struct sb_event *ev)
{
int err;
struct sb_connection *in;
struct sb_connection *out;
struct sb_session *sess;
int r, rc, rc2 = 0;
int content_len;
in = ev->data;
sess = in->session;
out = &sess->server;
/* @RETURN: 1. SB_BUFNOTENOUGH 2. SB_AGAIN 3. SB_ERROR(errno) 4. SB_DONE */
rc = sb_recv(in);
if (rc == SB_DONE) {
/* finish the session */
/* fixbug: when client use shutdown(SHUT_WR), client will not send data,
but want to recv data from server, so we can't finish whole session. Try send data to client */
/* (void)sb_finish_session(env, sess); */
in->read.shutdown = 1;
InfoOutput("[%s] request finish before connect return", in->net_address.ip);
}
if(rc == SB_BUFNOTENOUGH) {
/* buffer not enough */
(void)sb_finish_session(env, sess);
ErrorOutput("[%s] request too long", in->net_address.ip);
return SB_ERROR;
}
else if (rc == SB_ERROR) {
(void)sb_finish_session(env, sess);
ErrorOutput("[%s] request error!", in->net_address.ip);
return SB_ERROR;
}
else { /* SB_DONE or SB_AGAIN */
/* parse request */
if (sess->cs_used - sess->current_offset >=2 && sess->crlf == 0) {
if ((rc = sb_parse_http_request_header(sess)) == SB_ERROR) { /* maybe SB_AGAIN */
ErrorOutput("sessionid[%lu] sb_parse_http_request_header failed", sess->sessionid);
(void)sb_finish_session(env, sess);
return SB_ERROR;
}
}
if (sess->crlf == 1) {
//memcpy(sess->cs_buf+sess->cs_used, "Connection:Close", sizeof("Connection:Close") -1);
//sess->cs_used += sizeof("Connection:Close") -1;
/* delete client's read, and start server request */
sb_del_read_event(&in->read);
if (sess->http_connect_method && sess->content_length) {
ErrorOutput("connect method can't use content length");
(void)sb_finish_session(env, sess);
return SB_ERROR;
}
if (sess->content_length) {
content_len = sess->cs_used - sess->current_offset;
if (content_len < sess->content_length) {
return SB_AGAIN;
} else if (content_len >= sess->content_length) {
sess->cs_used_remain = content_len - sess->content_length;
sess->cs_used -= sess->cs_used_remain;
}
}
/* has server ip in host filed */
if (sess->host_is_ip) {
rc = sb_proxy_connect_http_server(env, sess);
if (rc == SB_ERROR) {
(void)sb_finish_session(env, sess);
ErrorOutput("sessionid[%lu] sb_proxy_connect_http_server error", sess->sessionid);
return SB_ERROR;
}
}
else {
/* need use dns */
rc = sb_proxy_dns_request(env, sess, 0);
if (rc == SB_ERROR) {
ErrorOutput("sessionid[%lu] sb_proxy_dns_request error", sess->sessionid);
(void)sb_finish_session(env, sess);
return SB_ERROR;
}
}
}
else {
if (in->read.shutdown) {
ErrorOutput("[%s] request finish before connect return and http request not correct", in->net_address.ip);
(void)sb_finish_session(env, sess);
return SB_ERROR;
}
}
}
return SB_OK;
}
int sb_proxy_accept_event(struct sb_cycle_env *env, struct sb_event *ev)
{
int err;
int new_fd;
struct sockaddr_in new_addr;
int new_addr_len;
struct sb_session *new_session;
struct sb_listen *li;
li = ev->data;
while(1) {
/* eat too many resource need wait */
/**
if (env->accept_dalay_flag == 1)
break;
**/
new_addr_len = sizeof(struct sockaddr_in);
new_fd = accept(li->listen_fd, (struct sockaddr*)&new_addr, &new_addr_len);
if (new_fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
DebugOutput("accept EAGAIN");
}
else if (errno == EMFILE || errno == ENFILE) {
/* need wait for a moment */
/* delay accept */
if (env->accept_delay_timeout > 0) {
sb_push_accept_delay_timer(env);
}
ErrorOutput("too many fd, access the system limit");
}
else {
ErrorOutput("listen ip[%s]-port[%d]-listen_fd[%d] accept failed",
li->listen_address.ip,
li->listen_address.port,
li->listen_fd );
}
break;
}
else {
struct sb_connection *cli_c;
struct sb_connection *ser_c;
struct sb_event *cli_re;
struct sb_event *cli_we;
struct sb_event *ser_re;
struct sb_event *ser_we;
/* try two times */
new_session = sb_get_session(env);
if (new_session == NULL) {
new_session = sb_get_session(env);
}
if (new_session == NULL) {
ErrorOutput("get session failed");
/* close the new socket */
/* can't get session, need wait for a moment */
close(new_fd);
break;
}
cli_c = &new_session->client;
cli_c->connect_type = CONNECT_TYPE_CLIENT;
ser_c = &new_session->server;
ser_c->connect_type = CONNECT_TYPE_SERVER;
cli_c->session = new_session;
ser_c->session = new_session;
cli_re = &cli_c->read;
cli_re->data = cli_c;
cli_we = &cli_c->write;
cli_we->data = cli_c;
ser_re = &ser_c->read;
ser_re->data = ser_c;
ser_we = &ser_c->write;
ser_we->data = ser_c;
/* set cli sockpt */
sb_set_nonblocking(new_fd);
sb_set_reuseaddr(new_fd, 1);
sb_set_tcpkeepalive(new_fd);
/* get client address info */
cli_c->sockfd = new_fd;
memcpy(&cli_c->net_address.sockaddr, &new_addr, new_addr_len);
strcpy(cli_c->net_address.ip, inet_ntoa(new_addr.sin_addr));
cli_c->net_address.port = ntohs(new_addr.sin_port );
InfoOutput("listen ip[%s]-port[%d] accept connect ip[%s]-port[%d]", li->listen_address.ip, li->listen_address.port,
cli_c->net_address.ip, cli_c->net_address.port);
sb_add_read_event(cli_re, sb_proxy_get_new_request_event );
}
}
return SB_OK;
}
/* Size of the hash_slots array below. NOTE(review): the code in this file
   indexes and initialises the array with SB_DNS_HASH_SLOTS instead; both
   macros are 11 here and have to stay equal. */
#define SB_DNS_WAITING_HASH_SLOTS 11
/* Hash table of pending DNS requests; buckets are selected with RSHash(host). */
static struct hlist_head hash_slots[SB_DNS_WAITING_HASH_SLOTS];
/* NOTE(review): not referenced by any code visible in this file - confirm. */
static struct list_head wait_parse_list_head;
/* Initialised by sb_proxy_dns_init(); no other use is visible in this file. */
static struct list_head parsing_list_head;
/* Lookup settings; timeout and tries are filled in by sb_proxy_dns_init(). */
static struct ares_options options;
/* Hash a NUL-terminated string: every character is added to the running hash
   after the hash has been multiplied by a factor that itself changes (is
   multiplied by a fixed step) after each character. The empty string
   hashes to 0. */
static unsigned long RSHash(const char *str)
{
    const unsigned long step = 378551;
    unsigned long factor = 63689;
    unsigned long result = 0;
    const char *p;

    for (p = str; *p != '\0'; p++) {
        result = result * factor + (*p);
        factor *= step;
    }
    return result;
}
int sb_proxy_dns_init()
{
int i;
for (i = 0; i < SB_DNS_HASH_SLOTS; i++) {
INIT_HLIST_HEAD(&hash_slots[i]);
}
INIT_LIST_HEAD(&parsing_list_head);
options.timeout = SB_DNS_TIMEOUT_TIME;
options.tries = 2;
return SB_OK;
}
/* Allocate a pending-request entry for host (host_len characters, NUL
   terminated). Returns NULL when out of memory or when the name does not fit
   into the entry. */
static struct sb_dns_request *sb_dns_request_create(const char *host, size_t host_len)
{
    struct sb_dns_request *req = calloc(1, sizeof(*req));

    if (req == NULL) {
        ErrorOutput("malloc new sb_dns_request failed\n");
        return NULL;
    }
    if (host_len >= sizeof(req->host)) {
        ErrorOutput("dns request host name too long\n");
        free(req);
        return NULL;
    }
    memcpy(req->host, host, host_len + 1);
    INIT_LIST_HEAD(&req->sess_list_head);
    return req;
}

/*
 * Queue sess as waiting for the resolution of sess->host and send the lookup
 * message for it to the message queue env->msgqueueid.
 *
 * Pending requests live in hash_slots; every bucket is kept sorted by host
 * name. A request entry for the host is created when none exists, and the
 * session is linked onto that entry's session list.
 *
 * force_update == 1: force dns server update host.
 * NOTE(review): in that case the message text consists of the two characters
 * "#1" only, i.e. it does not carry the host name. This is kept exactly as
 * in the original code - confirm against the DNS process what it expects.
 * NOTE(review): the session must not already be linked on a wait list when
 * this is called (list_add on a linked node corrupts the list).
 *
 * @RETURN SB_OK, or SB_ERROR (allocation failed, name too long, send failed)
 */
int sb_proxy_dns_request(struct sb_cycle_env *env, struct sb_session *sess, int force_update)
{
    struct dns_msg msg;
    struct hlist_head *hash_head;
    struct hlist_node *pos = NULL;
    struct hlist_node *insert_before = NULL;
    struct hlist_node *last = NULL;
    struct sb_dns_request *cur;
    struct sb_dns_request *request = NULL;
    char *host = sess->host;
    size_t host_len = strlen(host);
    size_t text_len = (force_update == 1) ? 2 : host_len;
    int saved_errno;
    int rc;

    /* the text is NUL-terminated inside mtext further down, so one spare
       byte is needed after both the host name and the final text */
    if (host_len >= sizeof(msg.m.mtext) || text_len >= sizeof(msg.m.mtext)) {
        ErrorOutput("dns request host name too long\n");
        return SB_ERROR;
    }

    hash_head = &hash_slots[RSHash(host) % SB_DNS_HASH_SLOTS];

    /* find the entry of this host, or the place where it has to be inserted */
    hlist_for_each_entry(cur, pos, hash_head, hash_node) {
        int cmp = strcmp(cur->host, host);

        if (cmp == 0) {
            request = cur;
            break;
        }
        if (cmp > 0) {
            insert_before = pos;
            break;
        }
        last = pos;
    }

    if (request == NULL) {
        request = sb_dns_request_create(host, host_len);
        if (request == NULL) {
            return SB_ERROR;
        }
        if (insert_before != NULL) {
            hlist_add_before(&request->hash_node, insert_before);
        } else if (last != NULL) {
            hlist_add_after(last, &request->hash_node);
        } else {
            hlist_add_head(&request->hash_node, hash_head);
        }
    }
    list_add(&sess->dns_list_node, &request->sess_list_head);

    /* build the message; zeroed first so no stale stack bytes are sent */
    memset(&msg, 0x00, sizeof(msg));
    msg.m.mtype = env->workerno;
    memcpy(msg.m.mtext, host, host_len);
    msg.mtext_len = (int)host_len;
    if (force_update == 1) { /* force flag */
        memcpy(msg.m.mtext, "#1", 2);
        msg.mtext_len = 2;
    }

    rc = msg_send (env->msgqueueid, &msg, SB_NO_WAIT);
    saved_errno = errno; /* the logging below may change errno */
    msg.m.mtext[msg.mtext_len] = '\0';
    DebugOutput("msgqueueid[%d], msg.m.mtext[%s], type[%ld], len[%d]", env->msgqueueid, msg.m.mtext, msg.m.mtype, msg.mtext_len);
    if (rc)
    {
        ErrorOutput ("send dns_msg failed, info[%s], errno[%d]-error[%s]", msg.m.mtext, saved_errno, strerror(saved_errno));
        list_del_init(&sess->dns_list_node);
        return SB_ERROR;
    }
    DebugOutput("dns_msg[%s]", msg.m.mtext);
    return SB_OK;
}
int sb_proxy_dns_recv_resp(struct sb_cycle_env *env)
{
int rc;
struct dns_msg msg;
int ip_len;
struct sb_session *sess, *next_sess;
char *host, *ip, status;
char *resp_text;
int slot_index;
struct hlist_head *hash_head;
while (1) {
memset(&msg, 0x00, sizeof(struct dns_msg));
rc = msg_recv(env->msgqueueid , env->workerno + SB_REQREPMSG_TYPESPLIT, &msg, IPC_NOWAIT);
if (rc) {
if (errno != EAGAIN && errno != ENOMSG) {
ErrorOutput ("recv dns_msg errno[%d]-error[%s] [%d]", errno, strerror(errno), env->msgqueueid);
return SB_ERROR;
} else {
//DebugOutput("no recv dns_msg");
break;
}
}
resp_text = msg.m.mtext;
resp_text[msg.mtext_len] = '\0';
DebugOutput("RECV dns_msg[%s]-len[%d]", resp_text, msg.mtext_len);
host = index(resp_text, '#');
if (host == NULL) {
status = SB_DNS_STATUS_ERROR;
ip_len = 0;
}
else {
*host = '\0';
status = *(host + 1);
ip = host + 3;
ip_len = resp_text + msg.mtext_len - ip + 1;
}
host = resp_text;
slot_index = RSHash(host) % SB_DNS_HASH_SLOTS;
hash_head = &hash_slots[slot_index];
if (hash_head->first == NULL) {
/* no hash value */
return;
} else {
struct hlist_node *pos = NULL;
struct hlist_node *pre_pos = NULL;
struct sb_dns_request *type;
int cmp;
/* can't find do nothing */
hlist_for_each_entry(type, pos, hash_head, hash_node) {
if ((cmp = strcmp(type->host, host)) == 0) {
list_for_each_entry_safe (sess, next_sess, &type->sess_list_head, dns_list_node) {
if (status == SB_DNS_STATUS_CHAR_OK) {
memcpy(sess->server.net_address.ip, ip, ip_len);
sess->status = SB_DNS_STATUS_OK;
} else if (status == SB_DNS_STATUS_CHAR_FAIL || status == SB_DNS_STATUS_CHAR_TIMEOUT) {
sess->status = SB_DNS_STATUS_ERROR;
} else {
ErrorOutput("host[%s] dns request get unknown status [%c]", host, status);
sess->status = SB_DNS_STATUS_ERROR;
}
sb_dns_done_handler(env, sess);
list_del_init(&sess->dns_list_node);
}
} else if (cmp > 0) {
break;
}
}
}
}
return SB_OK;
}
/*
 * Throw away every DNS answer queued for message type
 * msgtype + SB_REQREPMSG_TYPESPLIT.
 *
 * @RETURN SB_OK once no message is left, SB_ERROR when receiving failed for
 *         a reason other than "no message".
 */
int sb_proxy_discard_resp(struct sb_cycle_env *env, int msgtype)
{
    struct dns_msg msg;
    int rc;

    while (1) {
        memset(&msg, 0x00, sizeof(struct dns_msg));
        rc = msg_recv(env->msgqueueid , msgtype + SB_REQREPMSG_TYPESPLIT, &msg, IPC_NOWAIT);
        if (rc == 0) {
            continue; /* one answer dropped, look for the next */
        }
        if (errno != EAGAIN && errno != ENOMSG) {
            ErrorOutput ("recv dns_msg errno[%d]-error[%s] [%d]", errno, strerror(errno), env->msgqueueid);
            return SB_ERROR;
        }
        /* nothing left: stop here instead of polling the empty queue forever */
        break;
    }
    return SB_OK;
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/lihuaib/sbalance.git
git@gitee.com:lihuaib/sbalance.git
lihuaib
sbalance
sbalance
master

搜索帮助