1 Star 0 Fork 1

shawhen/netlib

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
netlib.c 39.36 KB
一键复制 编辑 原始数据 按行查看 历史
shawhen 提交于 2015-06-11 18:40 . init
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685
// netlib.c
#include "netlib.h"
#ifdef _WIN32
#include <stdio.h>
#include <stdarg.h>
#include <assert.h>
#include <time.h>
#ifdef KS_DEBUG
/* Debug builds: printf-style diagnostic written to stderr. */
static void warning(const char* fmt, ...) {
    va_list ap;

    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
}
#else
/* Non-debug builds: the diagnostic is an empty function. */
static inline void warning(const char* fmt, ...) {
    (void)fmt;
}
#endif
/*
 * Fallback declarations, compiled only when the included headers do not
 * already define WSAID_CONNECTEX: the GUIDs handed to
 * WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER) below, the socket options
 * used after an accept/connect completes, and the function-pointer types of
 * the extension functions.
 */
#ifndef WSAID_CONNECTEX
#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
#define WSAID_CONNECTEX {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
#define WSAID_GETACCEPTEXSOCKADDRS {0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
/* setsockopt option names applied to sockets after AcceptEx / ConnectEx. */
#define SO_UPDATE_ACCEPT_CONTEXT 0x700B
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
/* Pointer type for ConnectEx. */
typedef
BOOL
(PASCAL FAR * LPFN_CONNECTEX) (
IN SOCKET s,
IN const struct sockaddr FAR *name,
IN int namelen,
IN PVOID lpSendBuffer OPTIONAL,
IN DWORD dwSendDataLength,
OUT LPDWORD lpdwBytesSent,
IN LPOVERLAPPED lpOverlapped
);
/* Pointer type for DisconnectEx (declared here but not used in the code shown). */
typedef BOOL (WINAPI *LPFN_DISCONNECTEX)(SOCKET, LPOVERLAPPED, DWORD, DWORD);
/* Pointer type for AcceptEx. */
typedef
BOOL
(PASCAL FAR * LPFN_ACCEPTEX)(
IN SOCKET sListenSocket,
IN SOCKET sAcceptSocket,
IN PVOID lpOutputBuffer,
IN DWORD dwReceiveDataLength,
IN DWORD dwLocalAddressLength,
IN DWORD dwRemoteAddressLength,
OUT LPDWORD lpdwBytesReceived,
IN LPOVERLAPPED lpOverlapped
);
/* Pointer type for GetAcceptExSockaddrs. */
typedef
VOID
(PASCAL FAR * LPFN_GETACCEPTEXSOCKADDRS)(
IN PVOID lpOutputBuffer,
IN DWORD dwReceiveDataLength,
IN DWORD dwLocalAddressLength,
IN DWORD dwRemoteAddressLength,
OUT struct sockaddr **LocalSockaddr,
OUT LPINT LocalSockaddrLength,
OUT struct sockaddr **RemoteSockaddr,
OUT LPINT RemoteSockaddrLength
);
#endif
/*
 * Returns the ConnectEx extension function pointer.
 *
 * The pointer is resolved through WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)
 * on first use and cached in a function-local static afterwards.
 *
 * fd: socket handed to WSAIoctl for the lookup.
 * Returns NULL when the lookup fails; nothing is cached in that case, so the
 * next call tries again.
 */
static LPFN_CONNECTEX get_connectex_fn(SOCKET fd) {
    static LPFN_CONNECTEX lpfnConnectEx = NULL; // cached ConnectEx pointer
    static GUID guidConnectEx = WSAID_CONNECTEX; // ConnectEx function GUID

    if( !lpfnConnectEx ) {
        DWORD dwBytes = 0;
        if( WSAIoctl( fd
                    , SIO_GET_EXTENSION_FUNCTION_POINTER
                    , &guidConnectEx
                    , sizeof(guidConnectEx)
                    , &lpfnConnectEx
                    , sizeof(lpfnConnectEx)
                    , &dwBytes
                    , NULL
                    , NULL )==SOCKET_ERROR )
        {
            // never cache whatever a failed call left in the output buffer
            lpfnConnectEx = NULL;
        }
    }
    return lpfnConnectEx;
}
/*
 * Returns the AcceptEx extension function pointer.
 *
 * The pointer is resolved through WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)
 * on first use and cached in a function-local static afterwards.
 *
 * fd: socket handed to WSAIoctl for the lookup.
 * Returns NULL when the lookup fails; nothing is cached in that case, so the
 * next call tries again.
 */
static LPFN_ACCEPTEX get_acceptex_fn(SOCKET fd) {
    static LPFN_ACCEPTEX lpfnAcceptEx = NULL; // cached AcceptEx pointer
    static GUID guidAcceptEx = WSAID_ACCEPTEX; // AcceptEx function GUID

    if( !lpfnAcceptEx ) {
        DWORD dwBytes = 0;
        if( WSAIoctl( fd
                    , SIO_GET_EXTENSION_FUNCTION_POINTER
                    , &guidAcceptEx
                    , sizeof(guidAcceptEx)
                    , &lpfnAcceptEx
                    , sizeof(lpfnAcceptEx)
                    , &dwBytes
                    , NULL
                    , NULL )==SOCKET_ERROR )
        {
            // never cache whatever a failed call left in the output buffer
            lpfnAcceptEx = NULL;
        }
    }
    return lpfnAcceptEx;
}
/*
 * Returns the GetAcceptExSockaddrs extension function pointer.
 *
 * The pointer is resolved through WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)
 * on first use and cached in a function-local static afterwards.
 *
 * fd: socket handed to WSAIoctl for the lookup.
 * Returns NULL when the lookup fails; nothing is cached in that case, so the
 * next call tries again.
 */
static LPFN_GETACCEPTEXSOCKADDRS get_getacceptexsockaddrs_fn(SOCKET fd) {
    static LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockAddrs = NULL; // cached pointer
    static GUID guidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS; // function GUID

    if( !lpfnGetAcceptExSockAddrs ) {
        DWORD dwBytes = 0;
        if( WSAIoctl( fd
                    , SIO_GET_EXTENSION_FUNCTION_POINTER
                    , &guidGetAcceptExSockAddrs
                    , sizeof(guidGetAcceptExSockAddrs)
                    , &lpfnGetAcceptExSockAddrs
                    , sizeof(lpfnGetAcceptExSockAddrs)
                    , &dwBytes
                    , NULL
                    , NULL )==SOCKET_ERROR )
        {
            // never cache whatever a failed call left in the output buffer
            lpfnGetAcceptExSockAddrs = NULL;
        }
    }
    return lpfnGetAcceptExSockAddrs;
}
/*
 * Allocates a zero-filled IOCP operator of struct_len bytes with GlobalAlloc
 * and stores notify as its completion callback.
 * Returns NULL when the allocation fails.
 */
static inline KSIOCPOperator* iocp_operator_new(int struct_len, KSIOCPNotify notify) {
    KSIOCPOperator* created = (KSIOCPOperator*)GlobalAlloc(GPTR, struct_len);

    if( !created )
        return NULL;
    ZeroMemory(created, struct_len);
    created->notify = notify;
    return created;
}
/* Releases an operator obtained from iocp_operator_new; NULL is ignored. */
static inline void iocp_operator_free(KSIOCPOperator* op) {
    if( !op )
        return;
    GlobalFree((HGLOBAL)op);
}
/*
 * Initializes the dispatcher: creates a fresh I/O completion port and records
 * the wait timeout (milliseconds) used by the dispatch call.
 * Returns non-zero when the completion port was created.
 */
BOOL ks_net_event_dispatcher_init(KSNetEventDispatcher* self, int timeout_ms) {
    self->timeout_ms = timeout_ms;
    self->__completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if( self->__completion_port != 0 )
        return 1;
    return 0;
}
/* Closes the dispatcher's completion port, if any, and clears the handle. */
void ks_net_event_dispatcher_uninit(KSNetEventDispatcher* self) {
    if( !self->__completion_port )
        return;
    CloseHandle( self->__completion_port );
    self->__completion_port = 0;
}
/*
 * Waits (up to self->timeout_ms) for one completion packet and forwards it to
 * the notify callback stored in the operator that owns the OVERLAPPED.
 *
 * When the wait fails, ERROR_IO_PENDING and WAIT_TIMEOUT return silently,
 * ERROR_NOACCESS sleeps for the timeout before returning, and any other error
 * is forwarded to the operator's callback provided an operator was dequeued.
 */
void ks_net_event_dispatcher_dispatch(KSNetEventDispatcher* self) {
    DWORD nbytes = 0;
    ULONG_PTR tag = 0;
    LPOVERLAPPED ov = 0;
    int code = 0;
    KSIOCPOperator* target;

    if( !GetQueuedCompletionStatus( self->__completion_port, &nbytes, &tag, &ov, self->timeout_ms ) ) {
        code = GetLastError();
        if( code==ERROR_IO_PENDING || code==WAIT_TIMEOUT )
            return;
        if( code==ERROR_NOACCESS ) {
            Sleep(self->timeout_ms);
            return;
        }
        // a failed wait with no dequeued operator has nobody to notify
        if( !ov )
            return;
    }
    target = (KSIOCPOperator*)ov;
    (*(target->notify))( target, nbytes, (void*)tag, code );
}
/*
 * Queues a completion packet for op on the dispatcher's completion port.
 * The operator's OVERLAPPED is cleared first; tag travels as the completion key.
 */
static BOOL ks_net_event_dispatcher_post(KSNetEventDispatcher* self, KSIOCPOperator* op, DWORD transferred, void* tag) {
    ULONG_PTR completion_key = (ULONG_PTR)(tag);

    ZeroMemory(&(op->ov), sizeof(OVERLAPPED));
    return PostQueuedCompletionStatus( self->__completion_port, transferred, completion_key, &(op->ov) );
}
/*
 * Arms one accept operation: creates a fresh overlapped TCP socket and hands
 * it to AcceptEx together with the operator's address buffer.
 *
 * self: listener whose socket receives the connection.
 * op:   accept slot; its OVERLAPPED is cleared and new_fd records the socket
 *       that will become the accepted connection.
 *
 * Returns TRUE when the accept completed or is pending. Returns FALSE when
 * the listener is not started, AcceptEx is unavailable, socket creation
 * failed or AcceptEx failed. After a failure at or past socket creation,
 * op->new_fd is INVALID_SOCKET, so the slot is never mistaken for an armed one
 * and the closed handle is never closed a second time.
 */
static BOOL ks_net_listener_post_accept(KSNetListener* self, KSListenOperator* op) {
    LPFN_ACCEPTEX accept_ex;
    SOCKET self_fd = self->fd;
    DWORD bytes = 0;
    SOCKET fd;

    if( self_fd==INVALID_SOCKET )
        return FALSE;
    assert( op );
    accept_ex = get_acceptex_fn(self_fd);
    if( !accept_ex )
        return FALSE;
    fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if( fd==INVALID_SOCKET ) {
        // could not create the socket for the incoming connection
        op->new_fd = INVALID_SOCKET;
        return FALSE;
    }
    ZeroMemory((LPOVERLAPPED)op, sizeof(OVERLAPPED));
    op->new_fd = fd;
    if( !(*accept_ex)( self_fd
                     , fd
                     , op->addr_buf
                     , 0
                     , sizeof(SOCKADDR_IN) + 16
                     , sizeof(SOCKADDR_IN) + 16
                     , &bytes
                     , (LPOVERLAPPED)op ) )
    {
        int err = GetLastError();
        if( err != ERROR_IO_PENDING ) {
            // AcceptEx failed outright: forget the socket before closing it so
            // the slot does not keep a stale handle
            op->new_fd = INVALID_SOCKET;
            closesocket(fd);
            return FALSE;
        }
    }
    return TRUE;
}
/* Arms every accept slot that currently has no socket attached. */
static void ks_net_listener_post_accepts(KSNetListener* self) {
    int slot;

    for( slot = 0; slot < self->__op_count; ++slot ) {
        KSListenOperator* candidate = self->__op_accepts[slot];
        if( candidate->new_fd != INVALID_SOCKET )
            continue;
        ks_net_listener_post_accept(self, candidate);
    }
}
/*
 * Completion callback for an accept operation.
 *
 * op:  the KSListenOperator whose AcceptEx call completed.
 * key: the owning KSNetListener (the completion key).
 * err: non-zero when the operation failed.
 *
 * On success the accepted socket is given the listener's context via
 * SO_UPDATE_ACCEPT_CONTEXT, the peer address is extracted from the operator's
 * address buffer and the listener's notify callback receives the new socket.
 * On any failure the accepted socket is closed. In every case the slot is
 * cleared and, if the listener socket was valid on entry, re-armed.
 */
static void ks_net_listener_on_notify(KSIOCPOperator* op, DWORD transferred, void* key, int err) {
    KSNetListener* self = (KSNetListener*)key;
    KSListenOperator* lop = (KSListenOperator*)op;
    SOCKET fd = self->fd;

    (void)transferred;
    if( err ) {
        closesocket( lop->new_fd );
    } else if( lop->new_fd != INVALID_SOCKET ) {
        if( setsockopt( lop->new_fd
                      , SOL_SOCKET
                      , SO_UPDATE_ACCEPT_CONTEXT
                      , (const char*)&fd
                      , sizeof(SOCKET) )==SOCKET_ERROR )
        {
            closesocket( lop->new_fd );
        } else {
            LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs = get_getacceptexsockaddrs_fn(fd);
            if( !lpfnGetAcceptExSockaddrs ) {
                // extension lookup failed: the peer address cannot be read, so
                // drop the connection instead of calling a NULL pointer
                closesocket( lop->new_fd );
            } else {
                struct sockaddr* local_addr = 0;
                struct sockaddr* remote_addr = 0;
                int local_len = 0;
                int remote_len = 0;
                (*lpfnGetAcceptExSockaddrs)( lop->addr_buf
                                           , 0
                                           , sizeof(SOCKADDR_IN) + 16
                                           , sizeof(SOCKADDR_IN) + 16
                                           , &local_addr, &local_len
                                           , &remote_addr, &remote_len
                                           );
                (*(self->notify))(self, lop->new_fd, remote_addr, remote_len);
            }
        }
    }
    // the slot no longer owns a socket; re-arm it while the listener is open
    lop->new_fd = INVALID_SOCKET;
    if( fd != INVALID_SOCKET )
        ks_net_listener_post_accept(self, lop);
}
/*
 * Prepares a listener: stores the bind address, backlog and notify callback
 * and allocates listen_count accept operators (none armed yet).
 *
 * self:         listener to initialize.
 * addr:         address to bind when the listener is started.
 * listen_count: used both as the listen() backlog and as the number of accept
 *               operators allocated.
 * notify:       callback invoked for every accepted connection.
 *
 * Returns TRUE on success. On allocation failure returns FALSE with
 * __op_accepts set to 0 and __op_count set to 0, so later loops over the
 * operator array see an empty listener instead of a NULL array with a
 * non-zero count.
 */
BOOL ks_net_listener_create( KSNetListener* self
                           , const SOCKADDR* addr
                           , int listen_count
                           , KSNetListenerNotify notify )
{
    int i;

    self->fd = -1;
    self->__op_count = listen_count;
    self->__op_accepts = (KSListenOperator**)malloc(sizeof(KSListenOperator*) * listen_count);
    if( !self->__op_accepts )
        goto error_finish;
    memset(&self->addr, 0, sizeof(struct sockaddr));
    memcpy(&(self->bind_addr), addr, sizeof(struct sockaddr));
    self->listen_count = listen_count;
    for( i=0; i<self->__op_count; ++i ) {
        self->__op_accepts[i] = (KSListenOperator*)iocp_operator_new(sizeof(KSListenOperator), ks_net_listener_on_notify);
        if( self->__op_accepts[i]==0 ) {
            // release the operators created so far
            while( --i >= 0 )
                iocp_operator_free((KSIOCPOperator*)(self->__op_accepts[i]));
            goto error_finish;
        }
        self->__op_accepts[i]->new_fd = INVALID_SOCKET;
    }
    self->notify = notify;
    return TRUE;

error_finish:
    free(self->__op_accepts);
    self->__op_accepts = 0;
    // keep count and array consistent for anything that loops over the slots
    self->__op_count = 0;
    return FALSE;
}
/*
 * Releases the accept operators of a listener: closes any socket still
 * attached to a slot, frees each operator and then the slot array.
 * A NULL listener or a listener without slots is ignored.
 */
void ks_net_listener_destroy(KSNetListener* self) {
    int slot;
    KSListenOperator* entry;

    if( !self || !self->__op_accepts )
        return;
    for( slot = 0; slot < self->__op_count; ++slot ) {
        entry = self->__op_accepts[slot];
        if( !entry )
            continue;
        if( entry->new_fd != INVALID_SOCKET )
            closesocket(entry->new_fd);
        iocp_operator_free((KSIOCPOperator*)entry);
    }
    free(self->__op_accepts);
    self->__op_accepts = 0;
}
/*
 * Reports whether the listener still holds resources in flight: TRUE while
 * the listening socket is open or any accept slot still has a socket attached.
 */
BOOL ks_net_listener_is_working(KSNetListener* self) {
    int slot = 0;

    if( self->fd!=INVALID_SOCKET )
        return TRUE;
    if( !self->__op_accepts )
        return FALSE;
    while( slot < self->__op_count ) {
        if( self->__op_accepts[slot]->new_fd!=INVALID_SOCKET )
            return TRUE;
        ++slot;
    }
    return FALSE;
}
/*
 * Opens the listening socket: creates an overlapped TCP socket, attaches it
 * to the dispatcher's completion port (the listener is the completion key),
 * applies SO_REUSEADDR when self->reuse_addr is set, sets SO_LINGER {1,0},
 * binds, listens, records the bound address in self->addr and arms every
 * accept slot.
 * Returns TRUE on success; on failure the socket is closed and FALSE returned.
 */
BOOL ks_net_listener_start(KSNetListener* self, KSNetEventDispatcher* dp) {
    struct linger linger_opt = {1,0};
    BOOL reuse_opt = self->reuse_addr;
    SOCKET sock;

    assert( self->fd == INVALID_SOCKET );
    sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if( sock==INVALID_SOCKET )
        return FALSE;
    // each step runs only if all previous ones succeeded
    if( !CreateIoCompletionPort((HANDLE)sock, dp->__completion_port, (ULONG_PTR)self, 0 )
     || ( reuse_opt && ks_sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse_opt, sizeof(reuse_opt))!=0 )
     || ks_sock_setsockopt(sock, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt))!=0
     || bind(sock, &(self->bind_addr), sizeof(struct sockaddr))!=0
     || listen(sock, self->listen_count)!=0
     || ks_sock_getsockname(&self->addr, sock)!=0 )
    {
        closesocket(sock);
        self->fd = INVALID_SOCKET;
        return FALSE;
    }
    self->fd = sock;
    ks_net_listener_post_accepts(self);
    return TRUE;
}
/* Closes the listening socket, if open, and marks the listener as stopped. */
void ks_net_listener_stop(KSNetListener* self) {
    SOCKET current = self->fd;

    if( current == INVALID_SOCKET )
        return;
    closesocket(current);
    self->fd = INVALID_SOCKET;
}
/*
 * Starts an overlapped send of everything between sbuf and spos.
 * The send operator must be idle and the buffer non-empty (asserted).
 * If WSASend fails with anything other than ERROR_IO_PENDING the channel is
 * closed with KS_NET_CHANNEL_ST_SEND_IO_FAILED; otherwise the send operator
 * is marked busy.
 */
static inline void _net_channel_post_send(KSNetChannel* self) {
    WSABUF out;
    DWORD sent_now = 0;
    u_long pending = (u_long)(self->spos - self->sbuf);

    assert( !self->__op_send->busy );
    assert( pending > 0 );
    ZeroMemory( self->__op_send, sizeof(OVERLAPPED));
    out.buf = self->sbuf;
    out.len = pending;
    if( WSASend(self->fd, &out, 1, &sent_now, 0, (LPOVERLAPPED)(self->__op_send), NULL) != NO_ERROR ) {
        int code = WSAGetLastError();
        if( code!=ERROR_IO_PENDING ) {
            ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, code);
            return;
        }
    }
    self->__op_send->busy = TRUE;
}
/*
 * Starts an overlapped receive into the free tail of the receive buffer
 * (from rpos up to rbuf + rlen). The receive operator must be idle (asserted).
 * If WSARecv fails with anything other than ERROR_IO_PENDING the channel is
 * closed with KS_NET_CHANNEL_ST_RECV_IO_FAILED; otherwise the receive
 * operator is marked busy.
 */
static inline void _net_channel_post_recv(KSNetChannel* self) {
    WSABUF in;
    DWORD got_now = 0;
    DWORD flags = 0;
    u_long room = (u_long)(self->rbuf + self->rlen - self->rpos);

    assert( self->rpos );
    assert( !self->__op_recv->busy );
    ZeroMemory( self->__op_recv, sizeof(OVERLAPPED));
    in.buf = self->rpos;
    in.len = room;
    if( WSARecv(self->fd, &in, 1, &got_now, &flags, (LPOVERLAPPED)(self->__op_recv), NULL) != NO_ERROR ) {
        int code = WSAGetLastError();
        if( code!=ERROR_IO_PENDING ) {
            ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_RECV_IO_FAILED, code);
            return;
        }
    }
    self->__op_recv->busy = TRUE;
}
/*
 * Handles a completed send of `transferred` bytes: drops them from the front
 * of the send buffer, moves the unsent remainder to the start and flushes.
 */
static inline void _net_channel_on_send(KSNetChannel* self, DWORD transferred) {
    char* unsent = self->sbuf + transferred;
    size_t remaining = self->spos - unsent;

    assert( self->spos >= unsent );
    memmove(self->sbuf, unsent, remaining);
    self->spos = self->sbuf + remaining;
    ks_net_channel_flush(self);
}
/*
 * Handles a completed receive of `transferred` bytes.
 *
 * Steps, in order:
 *  1. advance rpos past the newly received bytes;
 *  2. if the sink provides cb_rlimit_check, run the receive-rate check;
 *  3. while the channel is CONNECTED, split the buffered bytes into packets
 *     (sizes come from cb_parse_size, or the whole buffer when it is absent)
 *     and hand each one to cb_packet;
 *  4. if still CONNECTED, move the unconsumed tail to the start of rbuf and
 *     post the next receive.
 */
static inline void _net_channel_on_recv(KSNetChannel* self, DWORD transferred) {
KSNetChannelSink* sink = self->sink;
int len;
int sz;
char* p;
self->rpos += transferred;
if( sink->cb_rlimit_check ) {
size_t rlimit_speed = 0;
size_t rlimit_delay = 0;
DWORD now;
DWORD speed;
// the sink fills in the limit; a speed of 0 leaves the check disabled
(*(sink->cb_rlimit_check))(self, &rlimit_speed, &rlimit_delay);
if( rlimit_speed > 0 ) {
now = GetTickCount();
if( now < (DWORD)(self->__rlimit_time) ) {
// tick count is behind the window start (presumably GetTickCount wrapped): restart the window
self->__rlimit_time = now;
self->__rlimit_size = 0;
} else if( (now - self->__rlimit_time) > 1000 ) {
// window longer than 1000 ticks: compute the rate, then start a new window
speed = (DWORD)((self->__rlimit_size + transferred) * 1000 / (now - self->__rlimit_time + rlimit_delay));
self->__rlimit_time = now;
self->__rlimit_size = (uint32_t)(speed * rlimit_delay / 1000);
// g_debug("now count speed : %d %d !", now - self->__rlimit_time, speed);
if( speed > rlimit_speed ) {
// over the limit: let the sink decide, or close the channel by default
if( sink->cb_rlimit )
(*(sink->cb_rlimit))(self, speed, now);
else
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SPEED_LIMIT, (int)speed);
if( self->status!=KS_NET_CHANNEL_ST_CONNECTED )
return;
}
} else {
// still inside the current window: just accumulate
self->__rlimit_size += transferred;
}
}
}
assert( self->rpos >= self->rbuf );
assert( self->rpos <= self->rbuf + self->rlen );
p = self->rbuf;
// only ST_CONNECTED status need recv
//
while( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
sz = (int)(self->rpos - p);
if( sz==0 )
break;
if( sink->cb_parse_size==0 ) {
// no parser: everything buffered is delivered as one packet
len = (int)sz;
} else {
len = (*(sink->cb_parse_size))(self, p, sz);
if( len==0 ) {
ks_net_channel_close(self, KS_NET_CHANNEL_ST_BAD_PACKET_SIZE);
break;
}
// negative size: stop parsing (presumably "need more data" - confirm with the sink contract)
if( len < 0 )
break;
if( len > sz ) {
// packet not complete yet; one larger than the whole buffer can never fit
if( (size_t)len > self->rlen )
ks_net_channel_close(self, KS_NET_CHANNEL_ST_BAD_PACKET_SIZE);
break;
}
}
(*(sink->cb_packet))(self, p, len);
p += len;
}
if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
if( p > self->rbuf ) {
// compact: move the unconsumed tail to the front of the buffer
sz = (int)(self->rpos - p);
memmove(self->rbuf, p, sz);
self->rpos = self->rbuf + sz;
}
_net_channel_post_recv(self);
}
}
/*
 * Completion callback for a channel's send/receive operators.
 *
 * op:          the operator that completed; 0 when called directly by the
 *              connect functions in this file.
 * transferred: byte count reported for the completion.
 * key:         the owning KSNetChannel.
 * err:         non-zero when the operation failed.
 *
 * Marks the operator idle, then handles one of three cases: an error, the end
 * of a connect (status CONNECTING), or a finished send/receive. Once neither
 * operator is busy and the channel is no longer working, cb_closed is invoked
 * with the final status.
 */
static void ks_net_channel_on_notify(KSIOCPOperator* op, DWORD transferred, void* key, int err) {
KSNetChannel* self = (KSNetChannel*)key;
KSChannelOperator* cop = (KSChannelOperator*)op;
int st;
if( cop )
cop->busy = FALSE;
if( err ) {
if( self->status==KS_NET_CHANNEL_ST_CONNECTING ) {
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_CONNECT_FAILED, err);
} else if( ks_net_channel_is_working(self) ) {
// attribute the failure to whichever operator completed
ks_net_channel_close_full(self, (cop==self->__op_send) ? KS_NET_CHANNEL_ST_SEND_IO_FAILED : KS_NET_CHANNEL_ST_RECV_IO_FAILED, err);
}
} else if( self->status==KS_NET_CHANNEL_ST_CONNECTING ) {
// connect finished: record addresses, reset buffers, tell the sink
self->status = KS_NET_CHANNEL_ST_CONNECTED;
ks_sock_getsockname(&self->sock_addr, self->fd);
ks_sock_getpeername(&self->peer_addr, self->fd);
self->__op_send->busy = FALSE;
self->__op_recv->busy = FALSE;
self->spos = self->sbuf;
self->rpos = self->rbuf;
(*(self->sink->cb_connected))(self);
// start receiving only if the channel is still working after cb_connected
if( ks_net_channel_is_working(self) ) {
self->__rlimit_time = GetTickCount();
self->__rlimit_size = 0;
_net_channel_post_recv(self);
}
} else if( ks_net_channel_is_working(self) ) {
if( transferred==0 ) {
// a zero-byte completion is treated as the peer shutting down
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN, err);
} else if( cop==self->__op_send ) {
_net_channel_on_send(self, transferred);
} else {
_net_channel_on_recv(self, transferred);
}
}
// while an operation is still in flight, leave final handling to its completion
if( self->__op_recv->busy || self->__op_send->busy )
return;
if( self->status==KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE ) {
// recv is known to be idle here because of the early return above
if( !self->__op_recv->busy )
_net_channel_post_recv(self);
}
if( !ks_net_channel_is_working(self) ) {
// NOTICE : must set status before cb_closed, channel maybe deleted after cb_close
st = self->status;
self->status = KS_NET_CHANNEL_ST_FREE;
(*(self->sink->cb_closed))(self, st);
}
}
/*
 * Initializes a channel: zeroes the structure, marks it free with no socket,
 * stores the sink and allocates the send and receive operators.
 *
 * Returns TRUE on success. If either operator cannot be allocated, both are
 * released, both pointers are reset to 0 and FALSE is returned, so a later
 * ks_net_channel_uninit on the same channel does not free them again.
 */
BOOL ks_net_channel_init(KSNetChannel* self, KSNetChannelSink* sink) {
    memset(self, 0, sizeof(KSNetChannel));
    self->fd = INVALID_SOCKET;
    self->status = KS_NET_CHANNEL_ST_FREE;
    self->sink = sink;
    self->__op_send = (KSChannelOperator*)iocp_operator_new(sizeof(KSChannelOperator), ks_net_channel_on_notify);
    self->__op_recv = (KSChannelOperator*)iocp_operator_new(sizeof(KSChannelOperator), ks_net_channel_on_notify);
    if( self->__op_send==0 || self->__op_recv==0 ) {
        iocp_operator_free( (KSIOCPOperator*)(self->__op_send) );
        iocp_operator_free( (KSIOCPOperator*)(self->__op_recv) );
        // do not leave dangling pointers behind
        self->__op_send = 0;
        self->__op_recv = 0;
        return FALSE;
    }
    return TRUE;
}
/* Releases the channel's send and receive operators and clears both pointers. */
void ks_net_channel_uninit(KSNetChannel* self) {
    // iocp_operator_free ignores NULL, so no separate checks are needed
    iocp_operator_free( (KSIOCPOperator*)(self->__op_send) );
    iocp_operator_free( (KSIOCPOperator*)(self->__op_recv) );
    self->__op_recv = 0;
    self->__op_send = 0;
}
/*
 * Connects the channel with a blocking connect(), attaches the socket to the
 * dispatcher's completion port and then runs the connect-completed handling
 * directly through ks_net_channel_on_notify.
 * Returns FALSE if any step fails, otherwise whether the channel still owns a
 * socket after that handling ran.
 */
BOOL ks_net_channel_connect_sync(KSNetChannel* self, KSNetEventDispatcher* dp, const SOCKADDR* addr) {
    SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

    if( sock==INVALID_SOCKET )
        return FALSE;
    if( connect(sock, addr, sizeof(struct sockaddr))!=0
     || CreateIoCompletionPort((HANDLE)sock, dp->__completion_port, (ULONG_PTR)self, 0)==NULL )
    {
        closesocket(sock);
        return FALSE;
    }
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    self->fd = sock;
    ks_net_channel_on_notify(0, 0, self, 0);
    return self->fd != INVALID_SOCKET;
}
/*
 * Starts an overlapped connect through ConnectEx.
 *
 * The socket is bound to INADDR_ANY:0 first, switched to non-blocking mode
 * and attached to the dispatcher's completion port with the channel as
 * completion key. The channel's send operator carries the connect, so its
 * completion arrives in ks_net_channel_on_notify.
 *
 * Returns TRUE when the connect completed or is pending. Returns FALSE on any
 * failure; if ConnectEx itself fails, the channel is reset to
 * KS_NET_CHANNEL_ST_FREE with self->err holding the error code.
 */
BOOL ks_net_channel_connect_async(KSNetChannel* self, KSNetEventDispatcher* dp, const SOCKADDR* addr) {
    static char nouse[64];
    struct sockaddr_in must_bind_addr;
    LPFN_CONNECTEX connect_ex;
    DWORD bytes = 0;
    SOCKET fd;

    fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if( fd==INVALID_SOCKET )
        return FALSE;
    // clear the whole structure so sin_zero is not passed to bind() uninitialised
    ZeroMemory(&must_bind_addr, sizeof(must_bind_addr));
    must_bind_addr.sin_family = AF_INET;
    must_bind_addr.sin_addr.s_addr = INADDR_ANY;
    must_bind_addr.sin_port = 0;
    if( bind(fd, (const struct sockaddr*)&must_bind_addr, sizeof(must_bind_addr))!=0 ) {
        closesocket(fd);
        return FALSE;
    }
    if( ks_sock_set_blocking(fd, FALSE)!=0 ) {
        closesocket(fd);
        return FALSE;
    }
    connect_ex = get_connectex_fn(fd);
    if( !connect_ex ) {
        closesocket(fd);
        return FALSE;
    }
    if( CreateIoCompletionPort((HANDLE)fd, dp->__completion_port, (ULONG_PTR)self, 0)==NULL ) {
        closesocket(fd);
        return FALSE;
    }
    self->fd = fd;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    assert( !self->__op_send->busy );
    ZeroMemory( self->__op_send, sizeof(OVERLAPPED));
    if( !(*connect_ex)(fd
                      , addr
                      , sizeof(struct sockaddr)
                      , nouse
                      , 0
                      , &bytes
                      , (LPOVERLAPPED)(self->__op_send)) )
    {
        int err = WSAGetLastError();
        if( err!=ERROR_IO_PENDING ) {
            // immediate failure: undo the state changes made above
            self->fd = INVALID_SOCKET;
            self->status = KS_NET_CHANNEL_ST_FREE;
            self->err = err;
            closesocket(fd);
            return FALSE;
        }
    }
    self->__op_send->busy = TRUE;
    return TRUE;
}
/*
 * Adopts an already-connected socket: applies SO_UPDATE_CONNECT_CONTEXT,
 * attaches the socket to the dispatcher's completion port with the channel as
 * completion key and runs the connect-completed handling.
 * Returns FALSE if either setup step fails, TRUE otherwise.
 */
BOOL ks_net_channel_connect_finish(KSNetChannel* self, KSNetEventDispatcher* dp, KSSockFD fd) {
    if( setsockopt(fd, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)==SOCKET_ERROR )
        return FALSE;
    if( CreateIoCompletionPort((HANDLE)fd, dp->__completion_port, (ULONG_PTR)self, 0)==NULL )
        return FALSE;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    self->fd = fd;
    ks_net_channel_on_notify(0, 0, self, 0);
    // status may not be connected at this point, but cb_closed was invoked then
    return TRUE;
}
/*
 * Appends header_buf followed by buf to the send buffer, all or nothing.
 *
 * buf / len:               payload bytes.
 * header_buf / header_len: bytes placed in front of the payload.
 * Either part may be empty; an empty part is skipped entirely, so its pointer
 * may be NULL.
 *
 * Returns FALSE without writing anything when the combined length overflows,
 * exceeds the send buffer size (slen) or the space currently free in it, or
 * when the channel is not working. Returns TRUE after both parts were copied.
 * The data is only buffered here; sending is started by ks_net_channel_flush.
 */
BOOL ks_net_channel_write_all_full(KSNetChannel* self, const void* buf, size_t len, const void* header_buf, size_t header_len) {
    size_t all_len = len + header_len;

    // reject a wrapped-around total before it is compared with anything
    if( all_len < len )
        return FALSE;
    if( all_len > self->slen )
        return FALSE;
    if( !ks_net_channel_is_working(self) )
        return FALSE;
    // NOTICE : check outside if need, not in here
    //
    //if( ks_net_channel_sending_busy(self) )
    //	return FALSE;
    if( all_len > ks_net_channel_sbuf_remain(self) )
        return FALSE;
    // memcpy must not be given a NULL pointer, even for a length of zero
    if( header_len > 0 ) {
        memcpy(self->spos, header_buf, header_len);
        self->spos += header_len;
    }
    if( len > 0 ) {
        memcpy(self->spos, buf, len);
        self->spos += len;
    }
    return TRUE;
}
/*
 * Appends as much of buf as fits into the send buffer.
 * Returns the number of bytes copied (possibly fewer than len), or 0 when the
 * channel is not working.
 */
size_t ks_net_channel_write(KSNetChannel* self, const void* buf, size_t len) {
    size_t space;
    size_t copied;

    if( !ks_net_channel_is_working(self) )
        return 0;
    // NOTICE : check outside, not in here
    //
    //if( ks_net_channel_sending_busy(self) )
    //	return 0;
    space = ks_net_channel_sbuf_remain(self);
    copied = (len > space) ? space : len;
    memcpy(self->spos, buf, copied);
    self->spos += copied;
    return copied;
}
/*
 * Gives the sink a chance to queue data (cb_flush) and then starts a send if
 * the send operator is idle and the send buffer is not empty.
 * Does nothing when the channel is not working.
 */
void ks_net_channel_flush(KSNetChannel* self) {
    if( !ks_net_channel_is_working(self) )
        return;
    (*(self->sink->cb_flush))(self);
    if( self->__op_send->busy )
        return;
    if( self->sbuf < self->spos )
        _net_channel_post_send(self);
}
/*
 * Closes the channel with the given final status and error code.
 *
 * A request for KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE only changes the status when
 * the channel is CONNECTED, is ignored when it is already waiting, and is
 * otherwise treated as KS_NET_CHANNEL_ST_CLOSED. For every other case, a
 * working channel gets its status/err recorded and its socket closed.
 */
void ks_net_channel_close_full(KSNetChannel* self, int status, int err) {
    if( status==KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE ) {
        if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
            self->status = status;
            return;
        }
        if( self->status==KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE )
            return;
        status = KS_NET_CHANNEL_ST_CLOSED;
    }
    assert( status > 0 );
    if( !ks_net_channel_is_working(self) )
        return;
    self->status = status;
    self->err = err;
    closesocket( self->fd );
    self->fd = INVALID_SOCKET;
}
/* Completion callback for a posted signal: invokes the user's notify function. */
static void net_signal_on_notify(KSIOCPOperator* op, DWORD transferred, void* key, int err) {
    KSNetSingal* sig = (KSNetSingal*)key;

    (void)op;
    (void)transferred;
    (void)err;
    (*(sig->notify_fun))(sig->notify_tag);
}
/*
 * Initializes a signal bound to dispatcher dp: allocates its operator and
 * stores the callback and its tag.
 * Returns FALSE when the operator cannot be allocated.
 */
BOOL ks_net_signal_init(KSNetSingal* self, KSNetEventDispatcher* dp, KSNetSingalNotify notify_fun, void* notify_tag) {
    self->__notify_op = iocp_operator_new(sizeof(KSIOCPOperator), net_signal_on_notify);
    if( !self->__notify_op ) {
        iocp_operator_free(self->__notify_op);
        return FALSE;
    }
    self->notify_tag = notify_tag;
    self->notify_fun = notify_fun;
    self->dp = dp;
    return TRUE;
}
/* Releases the signal's operator, if any, and clears the pointer. */
void ks_net_signal_uninit(KSNetSingal* self) {
    if( !self->__notify_op )
        return;
    iocp_operator_free(self->__notify_op);
    self->__notify_op = NULL;
}
/*
 * Raises the signal by posting its operator to the dispatcher; the callback
 * runs from the dispatch call. A failed post only produces a warning.
 */
void ks_net_signal(KSNetSingal* self) {
    BOOL posted = ks_net_event_dispatcher_post(self->dp, self->__notify_op, 0, self);

    if( posted )
        return;
    warning("warning : post singal failed!\n");
}
#else//LINUX
#include <sys/time.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>
/*
 * Initializes the dispatcher: creates the epoll instance, allocates a zeroed
 * array of 2048 epoll events, empties the async-event list and records the
 * wait timeout (milliseconds).
 * Returns FALSE (after perror) when epoll creation or allocation fails.
 */
BOOL ks_net_event_dispatcher_init(KSNetEventDispatcher* self, int timeout_ms) {
    int capacity = 2048;
    int efd;
    struct epoll_event* slots;

    efd = epoll_create(capacity);
    if( efd == -1 ) {
        perror("create epoll fd failed!");
        return FALSE;
    }
    slots = (struct epoll_event*)malloc(sizeof(struct epoll_event) * capacity);
    if( slots == 0 ) {
        perror("new epoll events array failed!");
        close(efd);
        return FALSE;
    }
    memset(slots, 0, (sizeof(struct epoll_event) * capacity));
    self->timeout_ms = timeout_ms;
    // the list is empty when it points at its own end node
    self->__async_events = &(self->__async_events_end_node);
    self->__events = slots;
    self->__fd_num = capacity;
    self->__epoll_fd = efd;
    return TRUE;
}
/* Closes the epoll descriptor (unless it is -1) and frees the event array. */
void ks_net_event_dispatcher_uninit(KSNetEventDispatcher* self) {
    if( self->__epoll_fd != -1 )
        close(self->__epoll_fd);
    // free() accepts a null pointer, so no separate check is needed
    free(self->__events);
}
/*
 * Runs one dispatch round:
 *  1. detaches the pending async-event list and invokes each notifier with
 *     events == 0;
 *  2. waits on epoll (up to self->timeout_ms) and invokes the notifier of
 *     every ready descriptor with its event mask.
 * An epoll_wait failure other than EINTR is reported with perror.
 */
void ks_net_event_dispatcher_dispatch(KSNetEventDispatcher* self) {
    KSEpollNotifer* sentinel;
    KSEpollNotifer* cursor;
    int ready;
    int k;

    assert( self->__epoll_fd!=-1 );
    assert( self->__events != 0 );

    // take the whole list first so callbacks can queue new async events
    sentinel = &(self->__async_events_end_node);
    cursor = self->__async_events;
    self->__async_events = sentinel;
    while( cursor != sentinel ) {
        KSEpollNotifer* current = cursor;
        cursor = cursor->async_event_next;
        current->async_event_next = 0;
        (*(current->cb_notify))(current, 0);
    }

    ready = epoll_wait(self->__epoll_fd, self->__events, self->__fd_num, self->timeout_ms);
    if( ready < 0 && errno!=0 && errno!=EINTR )
        perror("epoll wait error!");

    for( k = 0; k < ready; ++k ) {
        struct epoll_event* fired = &self->__events[k];
        KSEpollNotifer* target = (KSEpollNotifer*)fired->data.ptr;
        assert( target );
        (*(target->cb_notify))(target, fired->events);
    }
}
/*
 * Epoll callback of a listener: accepts connections in a loop and passes each
 * one to the listener's notify callback. The loop ends when the listener is
 * stopped or accept() fails with anything other than EINTR.
 */
static void ks_net_listener_on_notify(KSEpollNotifer* notifier, uint32_t events) {
    KSNetListener* listener = (KSNetListener*)notifier;
    struct sockaddr peer;
    socklen_t peer_len;
    int conn;

    (void)events;
    for( ;; ) {
        if( listener->fd == -1 )
            break;
        peer_len = sizeof(peer);
        conn = accept(listener->fd, &peer, &peer_len);
        if( conn != -1 ) {
            (*(listener->notify))(listener, conn, &peer, (int)peer_len);
            continue;
        }
        // interrupted: retry; anything else (including EAGAIN) ends the round
        if( errno==EINTR )
            continue;
        return;
    }
}
/*
 * Prepares a listener: installs the epoll callback, stores the bind address,
 * backlog and notify callback, clears the bound address and reuse_addr.
 * No socket is opened here. Always returns TRUE.
 */
BOOL ks_net_listener_create( KSNetListener* self
                           , const struct sockaddr* addr
                           , int listen_count
                           , KSNetListenerNotify notify )
{
    KSEpollNotifer* base = (KSEpollNotifer*)self;

    base->cb_notify = ks_net_listener_on_notify;
    self->notify = notify;
    self->fd = -1;
    self->reuse_addr = FALSE;
    self->listen_count = listen_count;
    memcpy(&(self->bind_addr), addr, sizeof(struct sockaddr));
    memset(&self->addr, 0, sizeof(struct sockaddr));
    return TRUE;
}
/* Nothing to release in this backend; the function body is empty. */
void ks_net_listener_destroy(KSNetListener* self) {
    (void)self;
}
/* Reports whether the listening socket is currently open. */
BOOL ks_net_listener_is_working(KSNetListener* self) {
    return !(self->fd == -1);
}
/*
 * Opens the listening socket: creates a non-blocking TCP socket, sets
 * SO_REUSEADDR and SO_LINGER {1,0}, binds to self->bind_addr, listens with
 * self->listen_count as backlog, records the bound address in self->addr and
 * registers the socket with the dispatcher's epoll instance (edge-triggered,
 * the listener itself is the event payload).
 *
 * Returns TRUE on success. On failure returns FALSE and closes the socket if
 * one was created; self->fd is left untouched.
 */
BOOL ks_net_listener_start(KSNetListener* self, KSNetEventDispatcher* dp) {
    int fd;
    struct epoll_event ev;
    BOOL reuse_opt = TRUE;
    struct linger linger_opt = {1,0};

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if( fd==-1 )
        goto error_finish;
    if( ks_sock_set_blocking(fd, 0)!=0 )
        goto error_finish;
    if( ks_sock_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_opt, sizeof(reuse_opt))!=0 )
        goto error_finish;
    if( ks_sock_setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt))!=0 )
        goto error_finish;
    if( bind(fd, &(self->bind_addr), sizeof(struct sockaddr))!=0 )
        goto error_finish;
    if( listen(fd, self->listen_count)!=0 )
        goto error_finish;
    if( ks_sock_getsockname(&self->addr, fd)!=0 )
        goto error_finish;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
    ev.data.ptr = (void*)self;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, fd, &ev)!=0 )
        goto error_finish;
    self->fd = fd;
    return TRUE;

error_finish:
    // -1 means socket() failed and there is nothing to close; descriptor 0 is
    // a valid socket and must be closed like any other
    if( fd != -1 )
        close(fd);
    return FALSE;
}
/* Closes the listening socket, if open, and marks the listener as stopped. */
void ks_net_listener_stop(KSNetListener* self) {
    if( self->fd == -1 )
        return;
    close(self->fd);
    self->fd = -1;
}
/*
 * Writes the buffered send data to the socket.
 *
 * - everything sent (or nothing buffered): the buffer is reset and the
 *   channel is flushed;
 * - partial send: the unsent remainder is moved to the buffer start, return;
 * - send() returned 0: channel closed with KS_NET_CHANNEL_ST_PEER_SHUTDOWN;
 * - EINTR: retried; EAGAIN: return and wait for the next notification;
 * - other errors: channel closed with KS_NET_CHANNEL_ST_SEND_IO_FAILED.
 */
static inline void _net_channel_on_writeable(KSNetChannel* self) {
    int pending;
    int written;

    // g_debug("on writeable %d/%d", sz, main_service->messages_memory_size);
    pending = self->spos - self->sbuf;
    while( pending > 0 ) {
        written = send(self->fd, self->sbuf, pending, 0);
        if( written >= pending ) {
            // whole buffer went out
            self->spos = self->sbuf;
            break;
        }
        if( written > 0 ) {
            // keep the unsent tail at the front of the buffer
            pending -= written;
            memmove(self->sbuf, self->sbuf + written, pending);
            self->spos = self->sbuf + pending;
            return;
        }
        if( written == 0 ) {
            ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
            return;
        }
        if( errno==EAGAIN )
            return;
        if( errno==EINTR )
            continue;
        ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, errno);
        return;
    }
    ks_net_channel_flush(self);
}
/*
 * Drains the socket while the channel is CONNECTED.
 *
 * Each loop iteration:
 *  1. reads into the free tail of the receive buffer (rpos .. rbuf + rlen);
 *  2. if the sink provides cb_rlimit_check, runs the receive-rate check;
 *  3. splits the buffered bytes into packets (sizes come from cb_parse_size,
 *     or the whole buffer when it is absent) and passes each to cb_packet;
 *  4. moves the unconsumed tail to the start of rbuf.
 * The loop ends on EAGAIN, on an error or peer shutdown (channel closed), or
 * once the channel leaves the CONNECTED state.
 */
static inline void _net_channel_on_readable(KSNetChannel* self) {
KSNetChannelSink* sink = self->sink;
size_t sz;
int len;
char* p;
while( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
// free space left behind rpos
sz = self->rlen - (self->rpos - self->rbuf);
len = recv(self->fd, self->rpos, sz, 0);
if( len < 0 ) {
// EAGAIN: nothing more to read now; EINTR: retry the recv
if( errno==EAGAIN )
return;
if( errno==EINTR )
continue;
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_RECV_IO_FAILED, errno);
return;
} else if( len==0 ) {
// recv() returning 0 is treated as the peer shutting down
ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
return;
}
self->rpos += len;
assert( self->rpos >= self->rbuf );
assert( self->rpos <= self->rbuf + self->rlen );
if( sink->cb_rlimit_check ) {
size_t rlimit_speed = 0;
size_t rlimit_delay = 0;
// the sink fills in the limit; a speed of 0 leaves the check disabled
(*(sink->cb_rlimit_check))(self, &rlimit_speed, &rlimit_delay);
if( rlimit_speed > 0 ) {
struct timeval tv;
uint32_t speed;
uint32_t now;
gettimeofday(&tv, 0);
// wall-clock milliseconds, truncated to 32 bits
now = tv.tv_sec*1000 + tv.tv_usec/1000;
if( now <= self->__rlimit_time ) {
// clock did not advance past the window start: restart the window
self->__rlimit_time = now;
self->__rlimit_size = 0;
} else if( (now - self->__rlimit_time) > 1000 ) {
// window longer than 1000 ms: compute the rate, then start a new window
speed = (self->__rlimit_size + len) * 1000 / (now - self->__rlimit_time + rlimit_delay);
self->__rlimit_time = now;
self->__rlimit_size = speed * rlimit_delay / 1000;
// g_debug("now count speed : %d %d !", now - self->__rlimit_time, speed);
if( speed > rlimit_speed ) {
// over the limit: let the sink decide, or close the channel by default
if( sink->cb_rlimit )
(*(sink->cb_rlimit))(self, speed, now);
else
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SPEED_LIMIT, speed);
// NOTE(review): this returns while the channel IS still working, leaving the
// received bytes unparsed until the next readable event; the Windows path
// returns only when the channel is no longer CONNECTED - confirm the intent.
if( ks_net_channel_is_working(self) )
return;
}
} else {
// still inside the current window: just accumulate
self->__rlimit_size += len;
}
}
}
if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
p = self->rbuf;
while( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
sz = self->rpos - p;
if( sz==0 )
break;
if( sink->cb_parse_size==0 ) {
// no parser: everything buffered is delivered as one packet
len = (int)sz;
} else {
len = (*(sink->cb_parse_size))(self, p, sz);
if( len==0 ) {
ks_net_channel_close(self, KS_NET_CHANNEL_ST_BAD_PACKET_SIZE);
return;
}
// negative size: stop parsing (presumably "need more data" - confirm with the sink contract)
if( len < 0 )
break;
if( (size_t)len > sz ) {
// packet not complete yet; one larger than the whole buffer can never fit
if( (size_t)len > self->rlen )
ks_net_channel_close(self, KS_NET_CHANNEL_ST_BAD_PACKET_SIZE);
break;
}
}
(*(sink->cb_packet))(self, p, len);
p += len;
}
if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
if( p > self->rbuf ) {
// compact: move the unconsumed tail to the front of the buffer
sz = self->rpos - p;
memmove(self->rbuf, p, sz);
self->rpos = self->rbuf + sz;
}
}
} else {
// wait peer close
self->rpos = self->rbuf;
}
}
}
// EPOLLRDHUP is only available since Linux 2.6.17; on older headers define it as 0 so the flag becomes a no-op
#ifndef EPOLLRDHUP
#define EPOLLRDHUP 0
#endif
/*
 * Fetch the pending socket-level error of fd through SO_ERROR.
 * Returns that error when one is pending, otherwise `fallback`.
 */
static int _net_channel_take_sock_error(int fd, int fallback) {
    int so_err = 0;
    socklen_t so_len = sizeof(so_err);
    if( getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_err, &so_len)==0 && so_err != 0 )
        return so_err;
    return fallback;
}

/*
 * Event entry point of a channel (installed as cb_notify by
 * ks_net_channel_init).
 *
 *  events == 0
 *      Deferred close notification: the status is reset to FREE and the
 *      previous status is passed to sink->cb_closed.
 *  EPOLLRDHUP
 *      Remaining input is read (when EPOLLIN is also set) and the channel is
 *      closed with PEER_SHUTDOWN.
 *  EPOLLHUP / EPOLLERR (without EPOLLRDHUP)
 *      The channel is closed with CONNECT_FAILED while connecting, otherwise
 *      with IO_ERROR. The reported error is taken from SO_ERROR, because errno
 *      at this point is left over from an unrelated earlier call; errno is
 *      only used as a fallback when the socket has no pending error.
 *  otherwise
 *      EPOLLIN runs the read path. EPOLLOUT either completes a pending connect
 *      (re-registers the fd, records both addresses, resets the buffers and
 *      calls sink->cb_connected) or runs the write path.
 */
static void ks_net_channel_on_notify(KSEpollNotifer* notifier, uint32_t events) {
    KSNetChannel* self = (KSNetChannel*)notifier;
    int st;
    assert( self->sink );
    if( events==0 ) {
        assert( !ks_net_channel_is_working(self) );
        // NOTICE : must set status before cb_closed, channel maybe deleted after cb_close
        st = self->status;
        self->status = KS_NET_CHANNEL_ST_FREE;
        (*(self->sink->cb_closed))(self, st);
    } else if( events & (EPOLLRDHUP|EPOLLHUP|EPOLLERR) ) {
        if( events & EPOLLRDHUP ) {
            // read all remaining data before closing
            if( events & EPOLLIN )
                _net_channel_on_readable(self);
            ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN, errno);
        } else {
            // errno is evaluated before getsockopt() can overwrite it
            int err = _net_channel_take_sock_error(self->fd, errno);
            if( self->status==KS_NET_CHANNEL_ST_CONNECTING )
                ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_CONNECT_FAILED, err);
            else
                ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_IO_ERROR, err);
        }
    } else {
        if( events & EPOLLIN )
            _net_channel_on_readable(self);
        if( events & EPOLLOUT ) {
            if( self->status==KS_NET_CHANNEL_ST_CONNECTING ) {
                struct epoll_event ev;
                memset(&ev, 0, sizeof(ev));
                ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
                ev.data.ptr = notifier;
                epoll_ctl(self->__dp->__epoll_fd, EPOLL_CTL_MOD, self->fd, &ev);
                self->status = KS_NET_CHANNEL_ST_CONNECTED;
                ks_sock_getsockname(&self->sock_addr, self->fd);
                ks_sock_getpeername(&self->peer_addr, self->fd);
                self->spos = self->sbuf;
                self->rpos = self->rbuf;
                (*(self->sink->cb_connected))(self);
            } else if( self->fd != -1 ) {
                // the read path above may have closed the channel (fd == -1);
                // do not call send() on a descriptor that is already gone
                _net_channel_on_writeable(self);
            }
        }
    }
}
/*
 * Put a channel into its initial state: all fields zeroed, no descriptor
 * (fd == -1), status FREE, the given sink attached and the channel's event
 * handler installed as its notifier callback. Always returns TRUE.
 */
BOOL ks_net_channel_init(KSNetChannel* self, KSNetChannelSink* sink) {
    KSEpollNotifer* notifier = (KSEpollNotifer*)self;

    memset(self, 0, sizeof(*self));
    notifier->cb_notify = ks_net_channel_on_notify;
    self->sink = sink;
    self->status = KS_NET_CHANNEL_ST_FREE;
    self->fd = -1;
    return TRUE;
}
/* Counterpart of ks_net_channel_init(); currently there is nothing to release. */
void ks_net_channel_uninit(KSNetChannel* self) {
    (void)self;
}
/*
 * Connect to `addr` with a plain connect() call made before the socket mode is
 * changed, then attach the socket to the dispatcher's epoll instance.
 *
 * After a successful connect the socket mode is changed with
 * ks_sock_set_blocking(fd, 0), TCP_NODELAY is requested, the fd is registered
 * for edge-triggered IN/OUT events and the "connected" transition is driven
 * immediately by feeding EPOLLOUT to the channel's event handler.
 *
 * Returns FALSE if any step fails (the socket is closed in that case).
 * Otherwise returns whether the channel still owns a descriptor after the
 * event handler ran.
 */
BOOL ks_net_channel_connect_sync(KSNetChannel* self, KSNetEventDispatcher* dp, const struct sockaddr* addr) {
    struct epoll_event reg;
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    if( sock==-1 )
        return FALSE;
    if( connect(sock, addr, sizeof(struct sockaddr))!=0 )
        goto drop_socket;
    if( ks_sock_set_blocking(sock, 0)!=0 )
        goto drop_socket;

    // nodelay is enabled by default
    ks_sock_set_tcp_nodelay(sock, TRUE);

    memset(&reg, 0, sizeof(reg));
    reg.data.ptr = (void*)self;
    reg.events = EPOLLIN | EPOLLOUT | EPOLLET;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, sock, &reg)!=0 )
        goto drop_socket;

    self->__dp = dp;
    self->fd = sock;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    ks_net_channel_on_notify((KSEpollNotifer*)self, EPOLLOUT);
    return self->fd != -1;

drop_socket:
    close(sock);
    return FALSE;
}
/*
 * Start a connect to `addr` without waiting for it to finish.
 *
 * The socket mode is changed with ks_sock_set_blocking(fd, 0), TCP_NODELAY is
 * requested and the fd is registered with the dispatcher's epoll instance
 * before connect() is issued. A connect() failure other than EINPROGRESS
 * aborts the attempt. On success the channel is left in CONNECTING state; the
 * transition to CONNECTED happens later in the channel's event handler.
 *
 * Returns TRUE when the attempt was started, FALSE otherwise (the socket is
 * closed on failure).
 */
BOOL ks_net_channel_connect_async(KSNetChannel* self, KSNetEventDispatcher* dp, const struct sockaddr* addr) {
    struct epoll_event reg;
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    if( sock==-1 )
        return FALSE;
    if( ks_sock_set_blocking(sock, 0)!=0 )
        goto drop_socket;

    // nodelay is enabled by default
    ks_sock_set_tcp_nodelay(sock, TRUE);

    memset(&reg, 0, sizeof(reg));
    reg.data.ptr = (void*)self;
    reg.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, sock, &reg)!=0 )
        goto drop_socket;

    // EINPROGRESS just means the connect is still under way
    if( connect(sock, addr, sizeof(struct sockaddr))!=0 && errno != EINPROGRESS ) {
        ks_sock_close(sock);
        return FALSE;
    }

    self->__dp = dp;
    self->fd = sock;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    return TRUE;

drop_socket:
    close(sock);
    return FALSE;
}
/*
 * Attach a socket that was obtained elsewhere to this channel.
 *
 * The socket mode is changed with ks_sock_set_blocking(fd, 0), TCP_NODELAY is
 * requested, the fd is registered with the dispatcher's epoll instance and the
 * "connected" transition is driven immediately by feeding EPOLLOUT to the
 * channel's event handler.
 *
 * Returns FALSE when the mode change or the epoll registration fails; in that
 * case `fd` is NOT closed here. Returns TRUE otherwise.
 */
BOOL ks_net_channel_connect_finish(KSNetChannel* self, KSNetEventDispatcher* dp, KSSockFD fd) {
    struct epoll_event reg;

    if( ks_sock_set_blocking(fd, 0)!=0 )
        return FALSE;

    // nodelay is enabled by default
    ks_sock_set_tcp_nodelay(fd, TRUE);

    memset(&reg, 0, sizeof(reg));
    reg.data.ptr = (void*)self;
    reg.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, fd, &reg)==0 ) {
        self->__dp = dp;
        self->fd = fd;
        self->status = KS_NET_CHANNEL_ST_CONNECTING;
        ks_net_channel_on_notify((KSEpollNotifer*)self, EPOLLOUT);
        return TRUE;
    }
    return FALSE;
}
/*
 * All-or-nothing write of an optional header followed by a payload.
 *
 * Either every byte of header_buf[0..header_len) and buf[0..len) is accepted
 * (sent directly and/or queued in the channel's send buffer, in order) and
 * TRUE is returned, or nothing is accepted and FALSE is returned.
 *
 * FALSE is returned when
 *   - header_len + len exceeds the send buffer size (self->slen),
 *   - the channel is not working,
 *   - the channel is already busy sending and the free space left in the send
 *     buffer is too small,
 *   - send() fails with an error other than EAGAIN/EINTR or returns 0; the
 *     channel is closed in that case.
 *
 * When the channel is not busy the data is first offered to send(); whatever
 * the kernel does not take is copied to the send buffer. This relies on the
 * send buffer being empty whenever the channel is not busy (assumption about
 * ks_net_channel_sending_busy - confirm).
 */
BOOL ks_net_channel_write_all_full(KSNetChannel* self, const void* buf, size_t len, const void* header_buf, size_t header_len) {
    int sent;

    // written so that len + header_len cannot wrap around
    if( len > self->slen || header_len > self->slen - len )
        return FALSE;
    if( !ks_net_channel_is_working(self) )
        return FALSE;
    if( ks_net_channel_sending_busy(self) ) {
        // earlier data is still queued: append behind it to keep the order
        size_t remain = ks_net_channel_sbuf_remain(self);
        if( len + header_len > remain )
            return FALSE;
        goto copy_to_sbuf;
    }

    while( header_len ) {
        sent = send(self->fd, header_buf, header_len, 0);
        if( sent < 0 ) {
            if( errno==EINTR )
                continue;
            if( errno!=EAGAIN ) {
                ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, errno);
                return FALSE;
            }
            sent = 0;
        } else if( sent==0 ) {
            ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
            return FALSE;
        }
        // always account for what was sent, so a fully sent header can never
        // be queued a second time further down
        header_buf = ((const char*)header_buf) + sent;
        header_len -= (size_t)sent;
        if( header_len )
            goto copy_to_sbuf;
    }

    while( len ) {
        sent = send(self->fd, buf, len, 0);
        if( sent < 0 ) {
            if( errno==EINTR )
                continue;
            if( errno!=EAGAIN ) {
                ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, errno);
                return FALSE;
            }
            sent = 0;
        } else if( sent==0 ) {
            ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
            return FALSE;
        }
        buf = ((const char*)buf) + sent;
        len -= (size_t)sent;
        if( len )
            goto copy_buf_to_sbuf;
    }

    // everything went out directly, nothing left to queue
    return TRUE;

copy_to_sbuf:
    if( header_len ) {
        memcpy(self->spos, header_buf, header_len);
        self->spos += header_len;
    }
copy_buf_to_sbuf:
    if( len ) {
        memcpy(self->spos, buf, len);
        self->spos += len;
    }
    return TRUE;
}
/*
 * Best-effort write: accept as many bytes of buf[0..len) as possible and
 * return how many were accepted.
 *
 * If the channel is not already busy sending, one send() is attempted first
 * (retried on EINTR, EAGAIN counts as "0 bytes sent"). The part that was not
 * sent is then copied to the send buffer, limited to the free space reported
 * by ks_net_channel_sbuf_remain().
 *
 * Returns 0 when the channel is not working, or when send() fails / returns 0
 * (the channel is closed in those two cases).
 */
size_t ks_net_channel_write(KSNetChannel* self, const void* buf, size_t len) {
    const char* src = (const char*)buf;
    size_t pushed = 0;      /* bytes taken by send() */
    size_t queued;          /* bytes copied to the send buffer */
    size_t room;

    if( !ks_net_channel_is_working(self) )
        return 0;

    if( !ks_net_channel_sending_busy(self) && len > 0 ) {
        int rc;
        do {
            rc = send(self->fd, src, len, 0);
        } while( rc < 0 && errno==EINTR );

        if( rc < 0 ) {
            if( errno!=EAGAIN ) {
                ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, errno);
                return 0;
            }
            /* EAGAIN: nothing sent, fall through and queue */
        } else if( rc==0 ) {
            ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
            return 0;
        } else {
            pushed = (size_t)rc;
        }
    }

    if( pushed >= len )
        return pushed;

    queued = len - pushed;
    room = ks_net_channel_sbuf_remain(self);
    if( queued > room )
        queued = room;
    memcpy(self->spos, src + pushed, queued);
    self->spos += queued;
    return pushed + queued;
}
/* Invoke the sink's cb_flush callback, but only while the channel is working. */
void ks_net_channel_flush(KSNetChannel* self) {
    if( !ks_net_channel_is_working(self) )
        return;
    (*(self->sink->cb_flush))(self);
}
/*
 * Close a channel, recording why.
 *
 * status == WAIT_PEER_CLOSE is a soft close request:
 *   - on a CONNECTED channel only the status is switched, the socket stays open;
 *   - on a channel already in WAIT_PEER_CLOSE nothing happens;
 *   - in any other state the request is turned into a normal CLOSED close.
 *
 * For a real close on a working channel, the status and `err` are stored, the
 * socket is closed, fd is set to -1, and the channel is pushed onto the
 * dispatcher's __async_events list unless its async_event_next link is already
 * set. A channel that is not working is left untouched.
 */
void ks_net_channel_close_full(KSNetChannel* self, int status, int err) {
    KSEpollNotifer* notifier = (KSEpollNotifer*)self;

    if( status==KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE ) {
        if( self->status==KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE )
            return;
        if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
            // TODO : now linux version net test WAIT_PEER_CLOSE function!!!
            self->status = status;
            return;
        }
        status = KS_NET_CHANNEL_ST_CLOSED;
    }

    assert( status > 0 );

    if( !ks_net_channel_is_working(self) )
        return;

    self->status = status;
    self->err = err;
    ks_sock_close( self->fd );
    self->fd = -1;

    // link into the dispatcher's pending list only once
    if( notifier->async_event_next )
        return;
    notifier->async_event_next = self->__dp->__async_events;
    self->__dp->__async_events = notifier;
}
/*
 * Event handler of a KSNetSingal: drain everything pending on the receiving
 * end of the notify socket pair, then call the user's notify function once.
 *
 * HUP/ERR events are only reported and otherwise ignored. Read errors other
 * than EAGAIN/EINTR and an end-of-stream are reported too, but the notify
 * function is still called afterwards.
 */
static void net_signal_on_notify(KSEpollNotifer* notifier, uint32_t events) {
    KSNetSingal* sig = (KSNetSingal*)notifier;
    char scratch[1024];
    int got;

    if( (events & (EPOLLHUP | EPOLLERR)) != 0 ) {
        perror("why? i can't believe it\n");
        return;
    }
    assert( events & EPOLLIN );

    for(;;) {
        got = recv(sig->__notify_fds[1], scratch, sizeof(scratch), 0);
        if( got > 0 )
            continue;               /* keep draining */
        if( got==0 ) {
            perror("why? ignore peer shutdown\n");
            break;
        }
        if( errno==EINTR )
            continue;
        if( errno!=EAGAIN )
            perror("why? ignore errno\n");
        break;
    }

    (*(sig->notify_fun))(sig->notify_tag);
}
/*
 * Initialise a signal object: create an AF_UNIX stream socket pair and
 * register its receiving end (__notify_fds[1]) with the dispatcher's epoll
 * instance for edge-triggered input events.
 *
 * notify_fun(notify_tag) is what the signal's event handler calls when the
 * receiving end becomes readable.
 *
 * Returns TRUE on success. On failure FALSE is returned and both
 * __notify_fds entries are -1, so ks_net_signal_uninit() can be called safely
 * afterwards without closing descriptors a second time.
 */
BOOL ks_net_signal_init(KSNetSingal* self, KSNetEventDispatcher* dp, KSNetSingalNotify notify_fun, void* notify_tag) {
    struct epoll_event ev;

    memset(self, 0, sizeof(KSNetSingal));
    self->__notify_fds[0] = -1;
    self->__notify_fds[1] = -1;

    if( socketpair(AF_UNIX, SOCK_STREAM, 0, self->__notify_fds)!=0 )
        return FALSE;

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = (void*)self;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, self->__notify_fds[1], &ev)!=0 ) {
        ks_sock_close(self->__notify_fds[0]);
        ks_sock_close(self->__notify_fds[1]);
        // forget the closed descriptors so a later uninit cannot close them again
        self->__notify_fds[0] = -1;
        self->__notify_fds[1] = -1;
        return FALSE;
    }

    ks_sock_set_blocking(self->__notify_fds[0], 0);
    ks_sock_set_blocking(self->__notify_fds[1], 0);
    ks_sock_set_tcp_nodelay(self->__notify_fds[0], TRUE);
    ks_sock_set_tcp_nodelay(self->__notify_fds[1], TRUE);
    ((KSEpollNotifer*)self)->cb_notify = net_signal_on_notify;
    self->dp = dp;
    self->notify_fun = notify_fun;
    self->notify_tag = notify_tag;
    return TRUE;
}
/*
 * Release a signal object: close whichever ends of the notify socket pair are
 * still open and mark them as closed (-1).
 */
void ks_net_signal_uninit(KSNetSingal* self) {
    int end;
    for( end = 0; end < 2; ++end ) {
        if( self->__notify_fds[end] == -1 )
            continue;
        ks_sock_close(self->__notify_fds[end]);
        self->__notify_fds[end] = -1;
    }
}
/*
 * Raise the signal: write one byte to the sending end of the notify socket
 * pair (__notify_fds[0]).
 *
 * The write is retried when interrupted (EINTR) so a wake-up is not lost.
 * EAGAIN/EWOULDBLOCK is not reported: on a non-blocking socket it means the
 * pair is full of bytes that have not been read yet, so a wake-up is already
 * pending. Any other failure is reported on stderr.
 */
void ks_net_signal(KSNetSingal* self) {
    int rc;

    do {
        rc = send(self->__notify_fds[0], "!", 1, 0);
    } while( rc < 0 && errno==EINTR );

    if( rc > 0 )
        return;
    if( rc < 0 && (errno==EAGAIN || errno==EWOULDBLOCK) )
        return;
    perror("warning : post singal error!\n");
}
#endif//_WIN32
/*
 * Map a channel status code to a short, static, human-readable description.
 * Codes without an entry of their own are described as "user defined".
 */
const char* ks_net_channel_fetch_status_desc(int status) {
    const char* desc = "user defined";

    switch( status ) {
    case KS_NET_CHANNEL_ST_CLOSED:
        desc = "normal closed";
        break;
    case KS_NET_CHANNEL_ST_CONNECT_FAILED:
        desc = "connect failed";
        break;
    case KS_NET_CHANNEL_ST_CONNECT_FAILED_TIMEOUT:
        desc = "connect timeout";
        break;
    case KS_NET_CHANNEL_ST_PEER_SHUTDOWN:
        desc = "peer shutdown";
        break;
    case KS_NET_CHANNEL_ST_SEND_IO_FAILED:
        desc = "send() return -1";
        break;
    case KS_NET_CHANNEL_ST_RECV_IO_FAILED:
        desc = "recv() return -1";
        break;
    case KS_NET_CHANNEL_ST_IO_ERROR:
        desc = "io error";
        break;
    case KS_NET_CHANNEL_ST_SPEED_LIMIT:
        desc = "speed limit";
        break;
    case KS_NET_CHANNEL_ST_BAD_PACKET_SIZE:
        desc = "parse size error";
        break;
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_0:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_1:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_2:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_3:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_4:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_5:
        desc = "reversed error";
        break;
    case KS_NET_CHANNEL_ST_ERROR:
        desc = "logic error";
        break;
    default:
        /* keep the "user defined" fallback */
        break;
    }
    return desc;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/shawhen/netlib.git
git@gitee.com:shawhen/netlib.git
shawhen
netlib
netlib
master

搜索帮助

0d507c66 1850385 C8b1a773 1850385