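// NOTE: stray web-UI text captured with this file ("Code pull complete; the page will refresh automatically") - not part of the source.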
// netlib.c
#include "netlib.h"
#ifdef _WIN32
#include <stdio.h>
#include <stdarg.h>
#include <assert.h>
#include <time.h>
// warning() - printf-style diagnostic helper.
// With KS_DEBUG defined the formatted message is written to stderr;
// otherwise the function body is empty and the arguments are ignored.
#ifdef KS_DEBUG
static void warning(const char* fmt, ...) {
    va_list ap;

    va_start(ap, fmt);
    vfprintf(stderr, fmt, ap);
    va_end(ap);
}
#else
static inline void warning(const char* fmt, ...) {
    (void)fmt;
}
#endif
// Fallback declarations for the Winsock extension functions used by this
// file. They are compiled only when WSAID_CONNECTEX is not already defined.
#ifndef WSAID_CONNECTEX
// GUID initializers for the extension functions; the get_*_fn helpers in this
// file pass them to WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER).
#define WSAID_ACCEPTEX {0xb5367df1,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
#define WSAID_CONNECTEX {0x25a207b9,0xddf3,0x4660,{0x8e,0xe9,0x76,0xe5,0x8c,0x74,0x06,0x3e}}
#define WSAID_GETACCEPTEXSOCKADDRS {0xb5367df2,0xcbac,0x11cf,{0x95,0xca,0x00,0x80,0x5f,0x48,0xa1,0x92}}
// Socket option names this file passes to setsockopt() on accepted / connected sockets.
#define SO_UPDATE_ACCEPT_CONTEXT 0x700B
#define SO_UPDATE_CONNECT_CONTEXT 0x7010
// Function-pointer type for ConnectEx.
typedef
BOOL
(PASCAL FAR * LPFN_CONNECTEX) (
IN SOCKET s,
IN const struct sockaddr FAR *name,
IN int namelen,
IN PVOID lpSendBuffer OPTIONAL,
IN DWORD dwSendDataLength,
OUT LPDWORD lpdwBytesSent,
IN LPOVERLAPPED lpOverlapped
);
// Function-pointer type for DisconnectEx (declared here; not called in the code visible in this file section).
typedef BOOL (WINAPI *LPFN_DISCONNECTEX)(SOCKET, LPOVERLAPPED, DWORD, DWORD);
// Function-pointer type for AcceptEx.
typedef
BOOL
(PASCAL FAR * LPFN_ACCEPTEX)(
IN SOCKET sListenSocket,
IN SOCKET sAcceptSocket,
IN PVOID lpOutputBuffer,
IN DWORD dwReceiveDataLength,
IN DWORD dwLocalAddressLength,
IN DWORD dwRemoteAddressLength,
OUT LPDWORD lpdwBytesReceived,
IN LPOVERLAPPED lpOverlapped
);
// Function-pointer type for GetAcceptExSockaddrs.
typedef
VOID
(PASCAL FAR * LPFN_GETACCEPTEXSOCKADDRS)(
IN PVOID lpOutputBuffer,
IN DWORD dwReceiveDataLength,
IN DWORD dwLocalAddressLength,
IN DWORD dwRemoteAddressLength,
OUT struct sockaddr **LocalSockaddr,
OUT LPINT LocalSockaddrLength,
OUT struct sockaddr **RemoteSockaddr,
OUT LPINT RemoteSockaddrLength
);
#endif
// Returns the ConnectEx extension function pointer.
// The pointer is looked up through WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)
// while the cached value is still NULL and is kept in a function-local static.
//   fd : socket handed to WSAIoctl for the lookup.
// The WSAIoctl result is not checked; the returned pointer is whatever the
// cache holds afterwards (NULL if it was never filled in).
static LPFN_CONNECTEX get_connectex_fn(SOCKET fd) {
    static LPFN_CONNECTEX cached_fn = NULL;
    static GUID fn_guid = WSAID_CONNECTEX;
    DWORD returned = 0;

    if( cached_fn )
        return cached_fn;

    WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER,
             &fn_guid, sizeof(fn_guid),
             &cached_fn, sizeof(cached_fn),
             &returned, NULL, NULL);
    return cached_fn;
}
// Returns the AcceptEx extension function pointer.
// The pointer is looked up through WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)
// while the cached value is still NULL and is kept in a function-local static.
//   fd : socket handed to WSAIoctl for the lookup.
// The WSAIoctl result is not checked; the returned pointer is whatever the
// cache holds afterwards (NULL if it was never filled in).
static LPFN_ACCEPTEX get_acceptex_fn(SOCKET fd) {
    static LPFN_ACCEPTEX cached_fn = NULL;
    static GUID fn_guid = WSAID_ACCEPTEX;
    DWORD returned = 0;

    if( cached_fn )
        return cached_fn;

    WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER,
             &fn_guid, sizeof(fn_guid),
             &cached_fn, sizeof(cached_fn),
             &returned, NULL, NULL);
    return cached_fn;
}
// Returns the GetAcceptExSockaddrs extension function pointer.
// The pointer is looked up through WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)
// while the cached value is still NULL and is kept in a function-local static.
//   fd : socket handed to WSAIoctl for the lookup.
// The WSAIoctl result is not checked; the returned pointer is whatever the
// cache holds afterwards (NULL if it was never filled in).
static LPFN_GETACCEPTEXSOCKADDRS get_getacceptexsockaddrs_fn(SOCKET fd) {
    static LPFN_GETACCEPTEXSOCKADDRS cached_fn = NULL;
    static GUID fn_guid = WSAID_GETACCEPTEXSOCKADDRS;
    DWORD returned = 0;

    if( cached_fn )
        return cached_fn;

    WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER,
             &fn_guid, sizeof(fn_guid),
             &cached_fn, sizeof(cached_fn),
             &returned, NULL, NULL);
    return cached_fn;
}
// Allocates a zero-filled IOCP operator of struct_len bytes and stores the
// completion callback in it.
//   struct_len : size in bytes of the concrete operator structure.
//   notify     : callback stored in the operator's notify field.
// Returns the new operator, or NULL when GlobalAlloc fails.
// Release with iocp_operator_free().
static inline KSIOCPOperator* iocp_operator_new(int struct_len, KSIOCPNotify notify) {
    KSIOCPOperator* created = (KSIOCPOperator*)GlobalAlloc(GPTR, struct_len);

    if( !created )
        return created;

    ZeroMemory(created, struct_len);
    created->notify = notify;
    return created;
}
// Releases an operator obtained from iocp_operator_new(); NULL is ignored.
static inline void iocp_operator_free(KSIOCPOperator* op) {
    if( !op )
        return;
    GlobalFree((HGLOBAL)op);
}
// Initializes the dispatcher: creates a new I/O completion port and records
// the wait timeout used by ks_net_event_dispatcher_dispatch().
//   timeout_ms : value later passed to GetQueuedCompletionStatus.
// Returns TRUE when the completion port was created, FALSE otherwise.
BOOL ks_net_event_dispatcher_init(KSNetEventDispatcher* self, int timeout_ms) {
    self->timeout_ms = timeout_ms;
    self->__completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if( self->__completion_port != 0 )
        return TRUE;
    return FALSE;
}
// Closes the dispatcher's completion port (if any) and clears the handle so a
// repeated call does nothing.
void ks_net_event_dispatcher_uninit(KSNetEventDispatcher* self) {
    if( !self->__completion_port )
        return;

    CloseHandle( self->__completion_port );
    self->__completion_port = 0;
}
// Waits (up to self->timeout_ms) for one completion packet and forwards it to
// the notify callback stored in the completed operator.
//
// When GetQueuedCompletionStatus fails:
//   - ERROR_IO_PENDING / WAIT_TIMEOUT : return without dispatching;
//   - ERROR_NOACCESS                  : sleep for timeout_ms, then return;
//   - any other error                 : dispatch with that error code, but
//                                       only if an OVERLAPPED was dequeued.
void ks_net_event_dispatcher_dispatch(KSNetEventDispatcher* self) {
    DWORD nbytes = 0;
    ULONG_PTR completion_key = 0;
    LPOVERLAPPED overlapped = 0;
    int err = 0;
    KSIOCPOperator* completed;

    if( !GetQueuedCompletionStatus( self->__completion_port
                                  , &nbytes
                                  , &completion_key
                                  , &overlapped
                                  , self->timeout_ms ) )
    {
        err = GetLastError();
        if( err==ERROR_IO_PENDING || err==WAIT_TIMEOUT )
            return;
        if( err==ERROR_NOACCESS ) {
            Sleep(self->timeout_ms);
            return;
        }
        if( !overlapped )
            return;
    }

    // The operator is addressed through its OVERLAPPED pointer.
    completed = (KSIOCPOperator*)overlapped;
    (*(completed->notify))( completed, nbytes, (void*)completion_key, err );
}
// Queues a completion packet for `op` on the dispatcher's completion port.
//   transferred : byte count reported with the packet.
//   tag         : value delivered as the completion key.
// The operator's OVERLAPPED is zeroed before posting.
// Returns the result of PostQueuedCompletionStatus.
static BOOL ks_net_event_dispatcher_post(KSNetEventDispatcher* self, KSIOCPOperator* op, DWORD transferred, void* tag) {
    LPOVERLAPPED ov = &(op->ov);

    ZeroMemory(ov, sizeof(OVERLAPPED));
    return PostQueuedCompletionStatus( self->__completion_port, transferred, (ULONG_PTR)tag, ov );
}
// Arms one accept slot: creates a fresh overlapped TCP socket and starts an
// AcceptEx on the listener with `op` as the OVERLAPPED.
//   self : listener; must have a valid listening socket.
//   op   : accept operator to (re)use; its new_fd receives the new socket.
// Returns TRUE when AcceptEx succeeded or is pending, FALSE otherwise.
//
// On AcceptEx failure the socket is closed AND op->new_fd is reset to
// INVALID_SOCKET. Leaving the closed handle in the slot would keep
// ks_net_listener_post_accepts() from ever re-arming it and would make
// ks_net_listener_destroy() close the same handle a second time.
static BOOL ks_net_listener_post_accept(KSNetListener* self, KSListenOperator* op) {
    LPFN_ACCEPTEX accept_ex;
    SOCKET self_fd = self->fd;
    DWORD bytes = 0;
    SOCKET fd;

    if( self_fd==INVALID_SOCKET )
        return FALSE;
    assert( op );

    accept_ex = get_acceptex_fn(self_fd);
    if( !accept_ex )
        return FALSE;

    fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if( fd==INVALID_SOCKET )
        return FALSE;

    // The operator is passed to AcceptEx as the OVERLAPPED, so clear that part first.
    ZeroMemory((LPOVERLAPPED)op, sizeof(OVERLAPPED));
    op->new_fd = fd;

    // Receive length 0: no payload is read as part of the accept; addr_buf
    // only receives the local and remote addresses.
    if( !(*accept_ex)( self_fd
                     , fd
                     , op->addr_buf
                     , 0
                     , sizeof(SOCKADDR_IN) + 16
                     , sizeof(SOCKADDR_IN) + 16
                     , &bytes
                     , (LPOVERLAPPED)op ) )
    {
        int err = GetLastError();
        if( err != ERROR_IO_PENDING ) {
            // Real failure: drop the socket and mark the slot as free again.
            closesocket(fd);
            op->new_fd = INVALID_SOCKET;
            return FALSE;
        }
    }
    return TRUE;
}
// Starts an accept on every operator slot whose new_fd is INVALID_SOCKET.
static void ks_net_listener_post_accepts(KSNetListener* self) {
    int idx = 0;

    while( idx < self->__op_count ) {
        if( self->__op_accepts[idx]->new_fd==INVALID_SOCKET )
            ks_net_listener_post_accept(self, self->__op_accepts[idx]);
        ++idx;
    }
}
// Completion callback for an accept operator.
//   op          : the KSListenOperator whose AcceptEx completed.
//   transferred : not used here.
//   key         : completion key, the owning KSNetListener.
//   err         : 0 on success, otherwise the error reported by the dispatcher.
//
// On error the pending socket is closed. On success the accepted socket gets
// SO_UPDATE_ACCEPT_CONTEXT, the remote address is extracted from addr_buf and
// the listener's notify callback is called with the new socket. In every case
// the slot is cleared and, if the listener socket was valid when the callback
// started, a new accept is posted on the same operator.
static void ks_net_listener_on_notify(KSIOCPOperator* op, DWORD transferred, void* key, int err) {
    KSNetListener* listener = (KSNetListener*)key;
    KSListenOperator* accept_op = (KSListenOperator*)op;
    SOCKET listen_fd = listener->fd;   // snapshot taken before any callback runs

    if( err ) {
        closesocket( accept_op->new_fd );
    } else if( accept_op->new_fd != INVALID_SOCKET ) {
        int rc = setsockopt( accept_op->new_fd
                           , SOL_SOCKET
                           , SO_UPDATE_ACCEPT_CONTEXT
                           , (const char*)&listen_fd
                           , sizeof(SOCKET) );
        if( rc==SOCKET_ERROR ) {
            closesocket( accept_op->new_fd );
        } else {
            LPFN_GETACCEPTEXSOCKADDRS get_addrs = get_getacceptexsockaddrs_fn(listen_fd);
            struct sockaddr* local_sa;
            struct sockaddr* remote_sa;
            int local_sa_len;
            int remote_sa_len;

            // Same lengths as used when the AcceptEx was posted.
            (*get_addrs)( accept_op->addr_buf
                        , 0
                        , sizeof(SOCKADDR_IN) + 16
                        , sizeof(SOCKADDR_IN) + 16
                        , &local_sa, &local_sa_len
                        , &remote_sa, &remote_sa_len );
            (*(listener->notify))(listener, accept_op->new_fd, remote_sa, remote_sa_len);
        }
    }

    // Free the slot, then re-arm it while the listener was still open.
    accept_op->new_fd = INVALID_SOCKET;
    if( listen_fd != INVALID_SOCKET )
        ks_net_listener_post_accept(listener, accept_op);
}
// Prepares a listener: stores the bind address, backlog and notify callback
// and allocates `listen_count` accept operators. No socket is opened here;
// see ks_net_listener_start().
//   addr         : address to bind to (sizeof(struct sockaddr) bytes are copied).
//   listen_count : listen backlog and number of accept operators.
//   notify       : callback invoked for each accepted connection.
// Returns TRUE on success. On allocation failure everything allocated so far
// is released, __op_accepts/__op_count are reset and FALSE is returned.
//
// reuse_addr is initialized to FALSE here because ks_net_listener_start()
// reads it; the Linux implementation of this function does the same.
BOOL ks_net_listener_create( KSNetListener* self
                           , const SOCKADDR* addr
                           , int listen_count
                           , KSNetListenerNotify notify )
{
    int i;

    self->fd = INVALID_SOCKET;
    self->reuse_addr = FALSE;
    self->__op_count = listen_count;
    self->__op_accepts = (KSListenOperator**)malloc(sizeof(KSListenOperator*) * listen_count);
    if( !self->__op_accepts )
        goto error_finish;

    memset(&self->addr, 0, sizeof(struct sockaddr));
    memcpy(&(self->bind_addr), addr, sizeof(struct sockaddr));
    self->listen_count = listen_count;

    for( i=0; i<self->__op_count; ++i ) {
        self->__op_accepts[i] = (KSListenOperator*)iocp_operator_new(sizeof(KSListenOperator), ks_net_listener_on_notify);
        if( self->__op_accepts[i]==0 ) {
            // Slot i failed; release the operators created before it.
            while( --i >= 0 )
                iocp_operator_free((KSIOCPOperator*)(self->__op_accepts[i]));
            goto error_finish;
        }
        self->__op_accepts[i]->new_fd = INVALID_SOCKET;
    }

    self->notify = notify;
    return TRUE;

error_finish:
    if( self->__op_accepts ) {
        free(self->__op_accepts);
        self->__op_accepts = 0;
    }
    self->__op_count = 0;
    return FALSE;
}
// Releases the accept operators created by ks_net_listener_create().
// Any socket still parked in an operator slot is closed first. The listening
// socket itself is not touched here. NULL `self` is ignored.
void ks_net_listener_destroy(KSNetListener* self) {
    int idx;

    if( !self || !self->__op_accepts )
        return;

    for( idx=0; idx<self->__op_count; ++idx ) {
        KSListenOperator* slot = self->__op_accepts[idx];
        if( !slot )
            continue;
        if( slot->new_fd != INVALID_SOCKET )
            closesocket(slot->new_fd);
        iocp_operator_free((KSIOCPOperator*)slot);
    }

    free(self->__op_accepts);
    self->__op_accepts = 0;
}
// Reports whether the listener still has anything open: TRUE when the
// listening socket is valid or when any accept operator still holds a socket.
BOOL ks_net_listener_is_working(KSNetListener* self) {
    int idx;

    if( self->fd!=INVALID_SOCKET )
        return TRUE;

    if( self->__op_accepts ) {
        for( idx=0; idx<self->__op_count; ++idx ) {
            if( self->__op_accepts[idx]->new_fd!=INVALID_SOCKET )
                return TRUE;
        }
    }
    return FALSE;
}
// Opens the listening socket, attaches it to the dispatcher's completion port
// (completion key = the listener), binds, listens and posts the initial
// accepts.
//   dp : dispatcher whose completion port receives the accept completions.
// SO_REUSEADDR is applied only when self->reuse_addr is set; SO_LINGER is
// always set to {1,0}.
// Returns TRUE on success; on failure the socket is closed and FALSE returned.
BOOL ks_net_listener_start(KSNetListener* self, KSNetEventDispatcher* dp) {
    SOCKET sock;
    BOOL want_reuse = self->reuse_addr;
    struct linger linger_cfg = {1,0};

    assert( self->fd == INVALID_SOCKET );

    sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if( sock==INVALID_SOCKET )
        return FALSE;

    if( !CreateIoCompletionPort((HANDLE)sock, dp->__completion_port, (ULONG_PTR)self, 0 ) )
        goto fail;
    if( want_reuse && ks_sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &want_reuse, sizeof(want_reuse))!=0 )
        goto fail;
    if( ks_sock_setsockopt(sock, SOL_SOCKET, SO_LINGER, &linger_cfg, sizeof(linger_cfg))!=0 )
        goto fail;
    if( bind(sock, &(self->bind_addr), sizeof(struct sockaddr))!=0 )
        goto fail;
    if( listen(sock, self->listen_count)!=0 )
        goto fail;
    // Record the address actually bound.
    if( ks_sock_getsockname(&self->addr, sock)!=0 )
        goto fail;

    self->fd = sock;
    ks_net_listener_post_accepts(self);
    return TRUE;

fail:
    closesocket(sock);
    self->fd = INVALID_SOCKET;
    return FALSE;
}
// Closes the listening socket, if open, and marks the listener as stopped.
void ks_net_listener_stop(KSNetListener* self) {
    SOCKET listen_fd = self->fd;

    if( listen_fd == INVALID_SOCKET )
        return;

    closesocket(listen_fd);
    self->fd = INVALID_SOCKET;
}
// Starts an overlapped WSASend for everything currently buffered in
// [sbuf, spos). Requires that no send is already outstanding and that the
// buffer is not empty (both asserted).
// On a failure other than "pending" the channel is closed with
// KS_NET_CHANNEL_ST_SEND_IO_FAILED; otherwise the send operator is marked busy.
static inline void _net_channel_post_send(KSNetChannel* self) {
    WSABUF out;
    DWORD sent_now = 0;
    u_long pending = (u_long)(self->spos - self->sbuf);
    int rc;

    assert( !self->__op_send->busy );
    assert( pending > 0 );

    // The send operator is handed to WSASend as the OVERLAPPED.
    ZeroMemory( self->__op_send, sizeof(OVERLAPPED));
    out.buf = self->sbuf;
    out.len = pending;

    rc = WSASend( self->fd
                , &out
                , 1
                , &sent_now
                , 0
                , (LPOVERLAPPED)(self->__op_send)
                , NULL );
    if( rc != NO_ERROR ) {
        int err = WSAGetLastError();
        if( err!=ERROR_IO_PENDING ) {
            ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, err);
            return;
        }
    }
    self->__op_send->busy = TRUE;
}
// Starts an overlapped WSARecv into the free tail of the receive buffer,
// i.e. [rpos, rbuf + rlen). Requires that no receive is already outstanding
// (asserted).
// On a failure other than "pending" the channel is closed with
// KS_NET_CHANNEL_ST_RECV_IO_FAILED; otherwise the recv operator is marked busy.
static inline void _net_channel_post_recv(KSNetChannel* self) {
    WSABUF in;
    DWORD received_now = 0;
    DWORD recv_flags = 0;
    u_long room = (u_long)(self->rbuf + self->rlen - self->rpos);
    int rc;

    assert( self->rpos );
    assert( !self->__op_recv->busy );

    // The recv operator is handed to WSARecv as the OVERLAPPED.
    ZeroMemory( self->__op_recv, sizeof(OVERLAPPED));
    in.buf = self->rpos;
    in.len = room;

    rc = WSARecv( self->fd
                , &in
                , 1
                , &received_now
                , &recv_flags
                , (LPOVERLAPPED)(self->__op_recv)
                , NULL );
    if( rc != NO_ERROR ) {
        int err = WSAGetLastError();
        if( err!=ERROR_IO_PENDING ) {
            ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_RECV_IO_FAILED, err);
            return;
        }
    }
    self->__op_recv->busy = TRUE;
}
// Handles a completed send of `transferred` bytes: drops those bytes from the
// front of the send buffer, moves the unsent remainder to the start and asks
// ks_net_channel_flush() to continue sending.
static inline void _net_channel_on_send(KSNetChannel* self, DWORD transferred) {
    char* unsent = self->sbuf + transferred;
    size_t remaining = self->spos - unsent;

    assert( self->spos >= unsent );
    memmove(self->sbuf, unsent, remaining);
    self->spos = self->sbuf + remaining;
    ks_net_channel_flush(self);
}
// Handles a completed receive of `transferred` bytes.
// Steps: advance rpos over the new data, apply the optional receive-rate
// limit, hand complete packets to the sink, move any unconsumed tail to the
// start of the buffer and post the next receive (only while the channel is
// still KS_NET_CHANNEL_ST_CONNECTED).
static inline void _net_channel_on_recv(KSNetChannel* self, DWORD transferred) {
KSNetChannelSink* sink = self->sink;
int len;
int sz;
char* p;
self->rpos += transferred;
// Optional rate limiting: the sink supplies the limit through cb_rlimit_check.
if( sink->cb_rlimit_check ) {
size_t rlimit_speed = 0;
size_t rlimit_delay = 0;
DWORD now;
DWORD speed;
(*(sink->cb_rlimit_check))(self, &rlimit_speed, &rlimit_delay);
// A speed of 0 leaves the limit disabled.
if( rlimit_speed > 0 ) {
now = GetTickCount();
if( now < (DWORD)(self->__rlimit_time) ) {
// Tick value is below the stored start time (presumably a tick-counter wrap): restart the measurement.
self->__rlimit_time = now;
self->__rlimit_size = 0;
} else if( (now - self->__rlimit_time) > 1000 ) {
// More than 1000 ms since the window started: compute speed as bytes * 1000 / (elapsed + rlimit_delay).
speed = (DWORD)((self->__rlimit_size + transferred) * 1000 / (now - self->__rlimit_time + rlimit_delay));
self->__rlimit_time = now;
// Seed the next window with speed * rlimit_delay / 1000 bytes.
self->__rlimit_size = (uint32_t)(speed * rlimit_delay / 1000);
// g_debug("now count speed : %d %d !", now - self->__rlimit_time, speed);
if( speed > rlimit_speed ) {
// Over the limit: let the sink react if it has cb_rlimit, otherwise close the channel.
if( sink->cb_rlimit )
(*(sink->cb_rlimit))(self, speed, now);
else
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SPEED_LIMIT, (int)speed);
// Stop here (no parsing, no new receive) once the channel left the connected state.
if( self->status!=KS_NET_CHANNEL_ST_CONNECTED )
return;
}
} else {
// Still inside the current window: just accumulate the byte count.
self->__rlimit_size += transferred;
}
}
}
assert( self->rpos >= self->rbuf );
assert( self->rpos <= self->rbuf + self->rlen );
p = self->rbuf;
// Packets are only parsed and delivered while the channel is in the
// KS_NET_CHANNEL_ST_CONNECTED state; callbacks may change the state mid-loop.
while( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
sz = (int)(self->rpos - p);
if( sz==0 )
break;
if( sink->cb_parse_size==0 ) {
// No size parser: deliver everything buffered as one packet.
len = (int)sz;
} else {
len = (*(sink->cb_parse_size))(self, p, sz);
// 0 is treated as an invalid packet size: close the channel.
if( len==0 ) {
ks_net_channel_close(self, KS_NET_CHANNEL_ST_BAD_PACKET_SIZE);
break;
}
// Negative result: stop parsing for now (presumably "size not known yet" - confirm against the sink contract).
if( len < 0 )
break;
// Packet not complete yet; if it could never fit into the receive buffer, close the channel.
if( len > sz ) {
if( (size_t)len > self->rlen )
ks_net_channel_close(self, KS_NET_CHANNEL_ST_BAD_PACKET_SIZE);
break;
}
}
(*(sink->cb_packet))(self, p, len);
p += len;
}
if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
// Move the unconsumed tail to the start of the buffer, then keep receiving.
if( p > self->rbuf ) {
sz = (int)(self->rpos - p);
memmove(self->rbuf, p, sz);
self->rpos = self->rbuf + sz;
}
_net_channel_post_recv(self);
}
}
// Completion callback shared by a channel's send and recv operators.
//   op          : completed operator; may be 0, because connect_sync and
//                 connect_finish call this function directly with op == 0.
//   transferred : byte count of the completed operation.
//   key         : completion key, the owning KSNetChannel.
//   err         : 0 on success, otherwise the error code from the dispatcher.
static void ks_net_channel_on_notify(KSIOCPOperator* op, DWORD transferred, void* key, int err) {
KSNetChannel* self = (KSNetChannel*)key;
KSChannelOperator* cop = (KSChannelOperator*)op;
int st;
// The operation behind this operator has finished.
if( cop )
cop->busy = FALSE;
if( err ) {
// Failure: report a connect failure while connecting, otherwise a send or
// recv failure depending on which operator completed.
if( self->status==KS_NET_CHANNEL_ST_CONNECTING ) {
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_CONNECT_FAILED, err);
} else if( ks_net_channel_is_working(self) ) {
ks_net_channel_close_full(self, (cop==self->__op_send) ? KS_NET_CHANNEL_ST_SEND_IO_FAILED : KS_NET_CHANNEL_ST_RECV_IO_FAILED, err);
}
} else if( self->status==KS_NET_CHANNEL_ST_CONNECTING ) {
// Connection established: record both addresses, reset both buffers and
// tell the sink.
self->status = KS_NET_CHANNEL_ST_CONNECTED;
ks_sock_getsockname(&self->sock_addr, self->fd);
ks_sock_getpeername(&self->peer_addr, self->fd);
self->__op_send->busy = FALSE;
self->__op_recv->busy = FALSE;
self->spos = self->sbuf;
self->rpos = self->rbuf;
(*(self->sink->cb_connected))(self);
// cb_connected may have closed the channel; only start receiving if it is still working.
if( ks_net_channel_is_working(self) ) {
self->__rlimit_time = GetTickCount();
self->__rlimit_size = 0;
_net_channel_post_recv(self);
}
} else if( ks_net_channel_is_working(self) ) {
// Normal completion: 0 bytes is treated as the peer shutting down.
if( transferred==0 ) {
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN, err);
} else if( cop==self->__op_send ) {
_net_channel_on_send(self, transferred);
} else {
_net_channel_on_recv(self, transferred);
}
}
// While either operation is still outstanding, the steps below are skipped.
if( self->__op_recv->busy || self->__op_send->busy )
return;
// In the wait-peer-close state a receive is posted (presumably to observe the peer's close - confirm).
if( self->status==KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE ) {
if( !self->__op_recv->busy )
_net_channel_post_recv(self);
}
if( !ks_net_channel_is_working(self) ) {
// NOTICE : the status must be set before cb_closed, because the channel may be deleted inside cb_closed
st = self->status;
self->status = KS_NET_CHANNEL_ST_FREE;
(*(self->sink->cb_closed))(self, st);
}
}
// Initializes a channel: zeroes the structure, marks it free with no socket,
// stores the sink and allocates the send and recv operators.
//   sink : callback table used by the channel.
// Returns TRUE on success, FALSE if either operator could not be allocated.
//
// On failure both operator pointers are reset to 0 after being released, so a
// later ks_net_channel_uninit() on the same channel does not free them again.
BOOL ks_net_channel_init(KSNetChannel* self, KSNetChannelSink* sink) {
    memset(self, 0, sizeof(KSNetChannel));
    self->fd = INVALID_SOCKET;
    self->status = KS_NET_CHANNEL_ST_FREE;
    self->sink = sink;
    self->__op_send = (KSChannelOperator*)iocp_operator_new(sizeof(KSChannelOperator), ks_net_channel_on_notify);
    self->__op_recv = (KSChannelOperator*)iocp_operator_new(sizeof(KSChannelOperator), ks_net_channel_on_notify);
    if( self->__op_send==0 || self->__op_recv==0 ) {
        iocp_operator_free( (KSIOCPOperator*)(self->__op_send) );
        iocp_operator_free( (KSIOCPOperator*)(self->__op_recv) );
        self->__op_send = 0;
        self->__op_recv = 0;
        return FALSE;
    }
    return TRUE;
}
// Releases the channel's send and recv operators and clears both pointers.
void ks_net_channel_uninit(KSNetChannel* self) {
    KSChannelOperator* send_op = self->__op_send;
    KSChannelOperator* recv_op = self->__op_recv;

    self->__op_send = 0;
    self->__op_recv = 0;

    if( send_op )
        iocp_operator_free( (KSIOCPOperator*)send_op );
    if( recv_op )
        iocp_operator_free( (KSIOCPOperator*)recv_op );
}
// Connects to `addr` with a blocking connect(), then attaches the socket to
// the dispatcher's completion port (completion key = the channel) and runs the
// "connected" transition by calling ks_net_channel_on_notify() directly.
// Returns FALSE if socket creation, connect or port association fails (the
// socket is closed); otherwise TRUE when the channel still has a valid socket
// after the notification ran.
BOOL ks_net_channel_connect_sync(KSNetChannel* self, KSNetEventDispatcher* dp, const SOCKADDR* addr) {
    SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

    if( sock==INVALID_SOCKET )
        return FALSE;

    if( connect(sock, addr, sizeof(struct sockaddr))!=0
     || CreateIoCompletionPort((HANDLE)sock, dp->__completion_port, (ULONG_PTR)self, 0)==NULL )
    {
        closesocket(sock);
        return FALSE;
    }

    self->fd = sock;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    // No operator is involved here, hence op == 0.
    ks_net_channel_on_notify(0, 0, self, 0);
    return self->fd != INVALID_SOCKET;
}
// Starts an asynchronous connect to `addr` using ConnectEx; completion is
// delivered through the dispatcher to ks_net_channel_on_notify() via the
// channel's send operator.
//   dp   : dispatcher whose completion port the socket is attached to.
//   addr : remote address (sizeof(struct sockaddr) bytes are used).
// Returns TRUE when the connect completed immediately or is pending. On
// failure the socket is closed and FALSE is returned; if ConnectEx itself
// fails, the channel is also reset to KS_NET_CHANNEL_ST_FREE and self->err is
// set.
//
// The local bind address and the byte counter are zero-initialized so that no
// indeterminate bytes (sin_zero padding) are passed to bind()/ConnectEx.
BOOL ks_net_channel_connect_async(KSNetChannel* self, KSNetEventDispatcher* dp, const SOCKADDR* addr) {
    static char nouse[64];
    struct sockaddr_in must_bind_addr;
    LPFN_CONNECTEX connect_ex;
    DWORD bytes = 0;
    SOCKET fd;

    fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if( fd==INVALID_SOCKET )
        return FALSE;

    // The socket is bound to any local address and port before ConnectEx is
    // called (the original variable name marks this bind as mandatory).
    ZeroMemory(&must_bind_addr, sizeof(must_bind_addr));
    must_bind_addr.sin_family = AF_INET;
    must_bind_addr.sin_addr.s_addr = INADDR_ANY;
    must_bind_addr.sin_port = 0;
    if( bind(fd, (const struct sockaddr*)&must_bind_addr, sizeof(must_bind_addr))!=0 ) {
        closesocket(fd);
        return FALSE;
    }
    if( ks_sock_set_blocking(fd, FALSE)!=0 ) {
        closesocket(fd);
        return FALSE;
    }
    connect_ex = get_connectex_fn(fd);
    if( !connect_ex ) {
        closesocket(fd);
        return FALSE;
    }
    if( CreateIoCompletionPort((HANDLE)fd, dp->__completion_port, (ULONG_PTR)self, 0)==NULL ) {
        closesocket(fd);
        return FALSE;
    }

    self->fd = fd;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    assert( !self->__op_send->busy );
    // The send operator doubles as the OVERLAPPED for the connect.
    ZeroMemory( self->__op_send, sizeof(OVERLAPPED));
    if( !(*connect_ex)( fd
                      , addr
                      , sizeof(struct sockaddr)
                      , nouse
                      , 0
                      , &bytes
                      , (LPOVERLAPPED)(self->__op_send) ) )
    {
        int err = WSAGetLastError();
        if( err!=ERROR_IO_PENDING ) {
            self->fd = INVALID_SOCKET;
            self->status = KS_NET_CHANNEL_ST_FREE;
            self->err = err;
            closesocket(fd);
            return FALSE;
        }
    }
    self->__op_send->busy = TRUE;
    return TRUE;
}
// Adopts an already connected socket `fd` into the channel: applies
// SO_UPDATE_CONNECT_CONTEXT, attaches the socket to the dispatcher's
// completion port (completion key = the channel) and runs the "connected"
// transition through ks_net_channel_on_notify().
// Returns FALSE if either setup step fails (the socket is left open for the
// caller). Returns TRUE otherwise, even if the channel was closed again
// during the notification; in that case the closed callback has been invoked.
BOOL ks_net_channel_connect_finish(KSNetChannel* self, KSNetEventDispatcher* dp, KSSockFD fd) {
    if( setsockopt(fd, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)==SOCKET_ERROR
     || CreateIoCompletionPort((HANDLE)fd, dp->__completion_port, (ULONG_PTR)self, 0)==NULL )
    {
        return FALSE;
    }

    self->fd = fd;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    ks_net_channel_on_notify(0, 0, self, 0);
    return TRUE;
}
// Appends a header followed by a body to the send buffer, all or nothing.
//   buf / len               : body bytes.
//   header_buf / header_len : header bytes, copied before the body.
// Returns FALSE (buffer untouched) when the combined size exceeds the send
// buffer capacity (slen), when the channel is not working, or when the free
// space in the send buffer is too small. Nothing is transmitted here; the
// data goes out on a later flush.
// Whether a send is already in progress is deliberately not checked here;
// callers that care must check it themselves.
BOOL ks_net_channel_write_all_full(KSNetChannel* self, const void* buf, size_t len, const void* header_buf, size_t header_len) {
    size_t total = len + header_len;

    if( total > self->slen )
        return FALSE;
    if( !ks_net_channel_is_working(self) )
        return FALSE;
    if( total > ks_net_channel_sbuf_remain(self) )
        return FALSE;

    memcpy(self->spos, header_buf, header_len);
    memcpy(self->spos + header_len, buf, len);
    self->spos += total;
    return TRUE;
}
// Appends up to `len` bytes from `buf` to the send buffer.
// Returns the number of bytes actually buffered: 0 when the channel is not
// working, otherwise min(len, free space in the send buffer).
// Whether a send is already in progress is deliberately not checked here;
// callers that care must check it themselves.
size_t ks_net_channel_write(KSNetChannel* self, const void* buf, size_t len) {
    size_t room;
    size_t accepted;

    if( !ks_net_channel_is_working(self) )
        return 0;

    room = ks_net_channel_sbuf_remain(self);
    accepted = (len > room) ? room : len;
    memcpy(self->spos, buf, accepted);
    self->spos += accepted;
    return accepted;
}
// Gives the sink a chance to queue more data (cb_flush) and then, if no send
// is outstanding and the send buffer is not empty, posts a send.
// Does nothing when the channel is not working.
void ks_net_channel_flush(KSNetChannel* self) {
    if( !ks_net_channel_is_working(self) )
        return;

    (*(self->sink->cb_flush))(self);
    if( self->__op_send->busy )
        return;
    if( self->sbuf < self->spos )
        _net_channel_post_send(self);
}
// Closes the channel with the given final status and error code.
//   status : new status; must be > 0 (asserted) after the special case below.
//   err    : error code stored in self->err.
//
// Special case KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE:
//   - channel connected            : only switch to that status, keep the socket;
//   - already waiting for the peer : nothing to do;
//   - any other state              : treated as KS_NET_CHANNEL_ST_CLOSED.
// For every other request the socket is closed and the status/err recorded,
// but only while the channel is still working.
void ks_net_channel_close_full(KSNetChannel* self, int status, int err) {
    if( status==KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE ) {
        if( self->status==KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE )
            return;
        if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
            self->status = status;
            return;
        }
        status = KS_NET_CHANNEL_ST_CLOSED;
    }

    assert( status > 0 );
    if( !ks_net_channel_is_working(self) )
        return;

    self->status = status;
    self->err = err;
    closesocket( self->fd );
    self->fd = INVALID_SOCKET;
}
// Completion callback for a posted signal: the completion key is the
// KSNetSingal, whose user callback is invoked with its stored tag.
// op, transferred and err are not used.
static void net_signal_on_notify(KSIOCPOperator* op, DWORD transferred, void* key, int err) {
    KSNetSingal* sig = (KSNetSingal*)key;

    (*(sig->notify_fun))(sig->notify_tag);
}
// Initializes a signal object bound to dispatcher `dp`.
//   notify_fun / notify_tag : callback and argument invoked when the signal
//                             is dispatched.
// Returns TRUE on success, FALSE when the operator cannot be allocated (the
// other fields are then left untouched).
BOOL ks_net_signal_init(KSNetSingal* self, KSNetEventDispatcher* dp, KSNetSingalNotify notify_fun, void* notify_tag) {
    self->__notify_op = iocp_operator_new(sizeof(KSIOCPOperator), net_signal_on_notify);
    if( !self->__notify_op )
        return FALSE;   // nothing was allocated, so there is nothing to release

    self->dp = dp;
    self->notify_fun = notify_fun;
    self->notify_tag = notify_tag;
    return TRUE;
}
// Releases the signal's operator, if any, and clears the pointer.
void ks_net_signal_uninit(KSNetSingal* self) {
    if( !self->__notify_op )
        return;

    iocp_operator_free(self->__notify_op);
    self->__notify_op = NULL;
}
// Raises the signal by posting its operator to the dispatcher with the signal
// itself as the completion key. A failed post is only reported through
// warning().
void ks_net_signal(KSNetSingal* self) {
    BOOL posted = ks_net_event_dispatcher_post(self->dp, self->__notify_op, 0, self);

    if( !posted )
        warning("warning : post singal failed!\n");
}
#else//LINUX
#include <sys/time.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>
// Initializes the dispatcher: creates the epoll instance, allocates a zeroed
// event array of 2048 entries, empties the async-event list (it points at its
// end node) and records the wait timeout.
//   timeout_ms : timeout later passed to epoll_wait().
// Returns TRUE on success; on failure a message is printed with perror() and
// FALSE is returned with nothing left allocated.
BOOL ks_net_event_dispatcher_init(KSNetEventDispatcher* self, int timeout_ms) {
    const int capacity = 2048;
    const size_t table_bytes = sizeof(struct epoll_event) * capacity;
    struct epoll_event* table;
    int efd = epoll_create(capacity);

    if( efd == -1 ) {
        perror("create epoll fd failed!");
        return FALSE;
    }

    table = (struct epoll_event*)malloc(table_bytes);
    if( !table ) {
        perror("new epoll events array failed!");
        close(efd);
        return FALSE;
    }
    memset(table, 0, table_bytes);

    self->timeout_ms = timeout_ms;
    self->__async_events = &(self->__async_events_end_node);
    self->__events = table;
    self->__fd_num = capacity;
    self->__epoll_fd = efd;
    return TRUE;
}
// Releases the dispatcher's epoll descriptor and event array.
// Both fields are reset afterwards (-1 / NULL) so that calling this function
// again neither closes an unrelated descriptor nor frees the array twice.
void ks_net_event_dispatcher_uninit(KSNetEventDispatcher* self) {
    if( self->__epoll_fd != -1 ) {
        close(self->__epoll_fd);
        self->__epoll_fd = -1;
    }
    if( self->__events ) {
        free(self->__events);
        self->__events = NULL;
    }
}
// Runs one dispatch round:
//   1. detaches the queued async-event list and calls each notifier with
//      events == 0;
//   2. waits in epoll_wait() for up to timeout_ms;
//   3. calls the notifier stored in each returned event with its event mask.
// An epoll_wait() failure other than EINTR is reported with perror().
void ks_net_event_dispatcher_dispatch(KSNetEventDispatcher* self) {
    KSEpollNotifer* sentinel;
    KSEpollNotifer* pending;
    KSEpollNotifer* current;
    int ready;
    int idx;

    assert( self->__epoll_fd!=-1 );
    assert( self->__events != 0 );

    // Take the whole list first, so notifiers queued by the callbacks below
    // land in the fresh list and are handled on the next round.
    sentinel = &(self->__async_events_end_node);
    pending = self->__async_events;
    self->__async_events = sentinel;
    while( pending != sentinel ) {
        current = pending;
        pending = pending->async_event_next;
        current->async_event_next = 0;
        (*(current->cb_notify))(current, 0);
    }

    ready = epoll_wait(self->__epoll_fd, self->__events, self->__fd_num, self->timeout_ms);
    if( ready < 0 && errno!=0 && errno!=EINTR )
        perror("epoll wait error!");

    for( idx=0; idx<ready; ++idx ) {
        struct epoll_event* fired = self->__events + idx;
        current = (KSEpollNotifer*)fired->data.ptr;
        assert( current );
        (*(current->cb_notify))(current, fired->events);
    }
}
// epoll callback for the listening socket: accepts connections in a loop and
// hands each new descriptor, with the peer address, to the listener's notify
// callback. The loop ends when accept() fails with anything other than EINTR
// (including EAGAIN) or when the listener's fd becomes -1.
// `events` is not inspected.
static void ks_net_listener_on_notify(KSEpollNotifer* notifier, uint32_t events) {
    KSNetListener* listener = (KSNetListener*)notifier;
    struct sockaddr peer;
    socklen_t peer_len;
    int accepted;

    while( listener->fd != -1 ) {
        peer_len = sizeof(peer);
        accepted = accept(listener->fd, &peer, &peer_len);
        if( accepted == -1 ) {
            if( errno==EINTR )
                continue;   // interrupted: retry
            return;         // EAGAIN or a real error: stop for this round
        }
        (*(listener->notify))(listener, accepted, &peer, (int)peer_len);
    }
}
// Prepares a listener: installs the epoll callback, stores the bind address,
// backlog and notify callback, and clears the bound address and reuse_addr.
// No socket is opened here; see ks_net_listener_start().
//   addr         : address to bind to (sizeof(struct sockaddr) bytes are copied).
//   listen_count : backlog later passed to listen().
//   notify       : callback invoked for each accepted connection.
// Always returns TRUE.
BOOL ks_net_listener_create( KSNetListener* self
                           , const struct sockaddr* addr
                           , int listen_count
                           , KSNetListenerNotify notify )
{
    KSEpollNotifer* as_notifier = (KSEpollNotifer*)self;

    as_notifier->cb_notify = ks_net_listener_on_notify;
    self->notify = notify;
    self->listen_count = listen_count;
    self->reuse_addr = FALSE;
    self->fd = -1;
    memcpy(&(self->bind_addr), addr, sizeof(struct sockaddr));
    memset(&self->addr, 0, sizeof(struct sockaddr));
    return TRUE;
}
// Counterpart of ks_net_listener_create(). The epoll implementation has
// nothing to release here, so the body is intentionally empty.
void ks_net_listener_destroy(KSNetListener* self) {
}
// Reports whether the listener currently owns a listening socket (fd != -1).
BOOL ks_net_listener_is_working(KSNetListener* self) {
    BOOL has_socket = (self->fd != -1);

    return has_socket;
}
// Opens the listening socket: non-blocking, SO_REUSEADDR and SO_LINGER {1,0}
// set, bound to bind_addr, listening with listen_count as backlog and
// registered edge-triggered with the dispatcher's epoll instance.
//   dp : dispatcher whose epoll instance watches the socket.
// Returns TRUE on success (self->fd and self->addr are then set), otherwise
// FALSE with the socket closed.
//
// Note: SO_REUSEADDR is always enabled here; self->reuse_addr is not consulted.
//
// The error path tests `fd != -1` so that a failed socket() does not lead to
// close(-1) and a valid descriptor 0 is still closed.
BOOL ks_net_listener_start(KSNetListener* self, KSNetEventDispatcher* dp) {
    int fd;
    struct epoll_event ev;
    BOOL reuse_opt = TRUE;
    struct linger linger_opt = {1,0};

    fd = socket(AF_INET, SOCK_STREAM, 0);
    if( fd==-1 )
        goto error_finish;
    if( ks_sock_set_blocking(fd, 0)!=0 )
        goto error_finish;
    if( ks_sock_setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse_opt, sizeof(reuse_opt))!=0 )
        goto error_finish;
    if( ks_sock_setsockopt(fd, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt))!=0 )
        goto error_finish;
    if( bind(fd, &(self->bind_addr), sizeof(struct sockaddr))!=0 )
        goto error_finish;
    if( listen(fd, self->listen_count)!=0 )
        goto error_finish;
    // Record the address actually bound.
    if( ks_sock_getsockname(&self->addr, fd)!=0 )
        goto error_finish;

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET;
    ev.data.ptr = (void*)self;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, fd, &ev)!=0 )
        goto error_finish;

    self->fd = fd;
    return TRUE;

error_finish:
    if( fd != -1 )
        close(fd);
    return FALSE;
}
// Closes the listening socket, if open, and marks the listener as stopped.
void ks_net_listener_stop(KSNetListener* self) {
    if( self->fd == -1 )
        return;

    close(self->fd);
    self->fd = -1;
}
// Called when the socket is writable: tries to send everything buffered in
// [sbuf, spos).
//   - EAGAIN                  : return, data stays buffered;
//   - EINTR                   : retry;
//   - other error             : close with KS_NET_CHANNEL_ST_SEND_IO_FAILED;
//   - send() returned 0       : close with KS_NET_CHANNEL_ST_PEER_SHUTDOWN;
//   - partial send            : keep the unsent tail at the buffer start, return;
//   - all sent / nothing to do: reset the buffer and call ks_net_channel_flush().
static inline void _net_channel_on_writeable(KSNetChannel* self) {
    int pending = self->spos - self->sbuf;
    int n;

    for( ;; ) {
        if( pending <= 0 )
            break;

        n = send(self->fd, self->sbuf, pending, 0);
        if( n < 0 ) {
            if( errno==EAGAIN )
                return;
            if( errno==EINTR )
                continue;
            ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, errno);
            return;
        }
        if( n==0 ) {
            ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
            return;
        }
        if( n < pending ) {
            pending -= n;
            memmove(self->sbuf, self->sbuf + n, pending);
            self->spos = self->sbuf + pending;
            return;
        }

        // Everything went out.
        self->spos = self->sbuf;
        break;
    }
    ks_net_channel_flush(self);
}
// Called when the socket is readable: keeps calling recv() while the channel
// is KS_NET_CHANNEL_ST_CONNECTED, applies the optional receive-rate limit,
// hands complete packets to the sink and moves any unconsumed tail to the
// start of the receive buffer. The loop ends on EAGAIN, on an error, on peer
// shutdown, or when the channel leaves the connected state.
static inline void _net_channel_on_readable(KSNetChannel* self) {
KSNetChannelSink* sink = self->sink;
size_t sz;
int len;
char* p;
while( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
// Free space left behind rpos in the receive buffer.
sz = self->rlen - (self->rpos - self->rbuf);
len = recv(self->fd, self->rpos, sz, 0);
if( len < 0 ) {
// EAGAIN: nothing more to read right now; EINTR: retry; anything else is a receive failure.
if( errno==EAGAIN )
return;
if( errno==EINTR )
continue;
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_RECV_IO_FAILED, errno);
return;
} else if( len==0 ) {
// recv() returning 0 is treated as the peer shutting down.
ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
return;
}
self->rpos += len;
assert( self->rpos >= self->rbuf );
assert( self->rpos <= self->rbuf + self->rlen );
// Optional rate limiting: the sink supplies the limit through cb_rlimit_check.
if( sink->cb_rlimit_check ) {
size_t rlimit_speed = 0;
size_t rlimit_delay = 0;
(*(sink->cb_rlimit_check))(self, &rlimit_speed, &rlimit_delay);
// A speed of 0 leaves the limit disabled.
if( rlimit_speed > 0 ) {
struct timeval tv;
uint32_t speed;
uint32_t now;
gettimeofday(&tv, 0);
// Current time in milliseconds, stored in 32 bits.
now = tv.tv_sec*1000 + tv.tv_usec/1000;
if( now <= self->__rlimit_time ) {
// Time did not advance past the stored start: restart the measurement.
self->__rlimit_time = now;
self->__rlimit_size = 0;
} else if( (now - self->__rlimit_time) > 1000 ) {
// More than 1000 ms since the window started: compute speed as bytes * 1000 / (elapsed + rlimit_delay).
speed = (self->__rlimit_size + len) * 1000 / (now - self->__rlimit_time + rlimit_delay);
self->__rlimit_time = now;
// Seed the next window with speed * rlimit_delay / 1000 bytes.
self->__rlimit_size = speed * rlimit_delay / 1000;
// g_debug("now count speed : %d %d !", now - self->__rlimit_time, speed);
if( speed > rlimit_speed ) {
// Over the limit: let the sink react if it has cb_rlimit, otherwise close the channel.
if( sink->cb_rlimit )
(*(sink->cb_rlimit))(self, speed, now);
else
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SPEED_LIMIT, speed);
// NOTE(review): returns while the channel is still working, leaving the just-received data unparsed until the next readable event - confirm this is intended.
if( ks_net_channel_is_working(self) )
return;
}
} else {
// Still inside the current window: just accumulate the byte count.
self->__rlimit_size += len;
}
}
}
if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
p = self->rbuf;
// Deliver every complete packet currently in the buffer; callbacks may change the state mid-loop.
while( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
sz = self->rpos - p;
if( sz==0 )
break;
if( sink->cb_parse_size==0 ) {
// No size parser: deliver everything buffered as one packet.
len = (int)sz;
} else {
len = (*(sink->cb_parse_size))(self, p, sz);
// 0 is treated as an invalid packet size: close the channel.
if( len==0 ) {
ks_net_channel_close(self, KS_NET_CHANNEL_ST_BAD_PACKET_SIZE);
return;
}
// Negative result: stop parsing for now (presumably "size not known yet" - confirm against the sink contract).
if( len < 0 )
break;
// Packet not complete yet; if it could never fit into the receive buffer, close the channel.
if( (size_t)len > sz ) {
if( (size_t)len > self->rlen )
ks_net_channel_close(self, KS_NET_CHANNEL_ST_BAD_PACKET_SIZE);
break;
}
}
(*(sink->cb_packet))(self, p, len);
p += len;
}
if( self->status==KS_NET_CHANNEL_ST_CONNECTED ) {
// Move the unconsumed tail to the start of the buffer.
if( p > self->rbuf ) {
sz = self->rpos - p;
memmove(self->rbuf, p, sz);
self->rpos = self->rbuf + sz;
}
}
} else {
// No longer connected (e.g. waiting for the peer to close): discard what was received.
self->rpos = self->rbuf;
}
}
}
// EPOLLRDHUP is only available since Linux 2.6.17; with older headers define it as 0 so it adds no bits to event masks.
#ifndef EPOLLRDHUP
#define EPOLLRDHUP 0
#endif
// epoll callback for a channel.
//   notifier : the channel (KSNetChannel is addressed through its notifier).
//   events   : epoll event mask, or 0 when called from the dispatcher's
//              async-event list.
static void ks_net_channel_on_notify(KSEpollNotifer* notifier, uint32_t events) {
KSNetChannel* self = (KSNetChannel*)notifier;
int st;
assert( self->sink );
if( events==0 ) {
// Async notification (events == 0): the channel is expected to be closed already; report the closure to the sink.
assert( !ks_net_channel_is_working(self) );
// NOTICE : the status must be set before cb_closed, because the channel may be deleted inside cb_closed
st = self->status;
self->status = KS_NET_CHANNEL_ST_FREE;
(*(self->sink->cb_closed))(self, st);
} else if( events & (EPOLLRDHUP|EPOLLHUP|EPOLLERR) ) {
if( events & EPOLLRDHUP ) {
// Peer closed its end: read whatever data is still pending, then close.
if( events & EPOLLIN )
_net_channel_on_readable(self);
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN, errno);
} else {
// Hang-up or error. The close is unconditional: per the original note,
// errno can also be EAGAIN when the client exits forcibly, so the
// errno check below stays disabled.
//
// if( errno!=EAGAIN ) {
if( self->status==KS_NET_CHANNEL_ST_CONNECTING )
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_CONNECT_FAILED, errno);
else
ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_IO_ERROR, errno);
// }
}
} else {
if( events & EPOLLIN )
_net_channel_on_readable(self);
if( events & EPOLLOUT ) {
if( self->status==KS_NET_CHANNEL_ST_CONNECTING ) {
// First writable event while connecting: the connection is up. Re-register the
// socket with this event mask, record both addresses, reset the buffers and tell the sink.
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP; // | EPOLLHUP | EPOLLERR;
ev.data.ptr = notifier;
epoll_ctl(self->__dp->__epoll_fd, EPOLL_CTL_MOD, self->fd, &ev);
self->status = KS_NET_CHANNEL_ST_CONNECTED;
ks_sock_getsockname(&self->sock_addr, self->fd);
ks_sock_getpeername(&self->peer_addr, self->fd);
self->spos = self->sbuf;
self->rpos = self->rbuf;
(*(self->sink->cb_connected))(self);
} else {
// Already connected: continue sending buffered data.
_net_channel_on_writeable(self);
}
}
}
}
// Initializes a channel: zeroes the structure, marks it free with no socket,
// stores the sink and installs the epoll callback.
//   sink : callback table used by the channel.
// Always returns TRUE.
BOOL ks_net_channel_init(KSNetChannel* self, KSNetChannelSink* sink) {
    KSEpollNotifer* as_notifier = (KSEpollNotifer*)self;

    memset(self, 0, sizeof(KSNetChannel));
    as_notifier->cb_notify = ks_net_channel_on_notify;
    self->sink = sink;
    self->status = KS_NET_CHANNEL_ST_FREE;
    self->fd = -1;
    return TRUE;
}
// Counterpart of ks_net_channel_init(). The epoll implementation allocates
// nothing in init, so the body is intentionally empty.
void ks_net_channel_uninit(KSNetChannel* self) {
}
// Connects to `addr` with a blocking connect(), then switches the socket to
// non-blocking, enables TCP_NODELAY, registers it edge-triggered with the
// dispatcher and runs the "connected" transition by calling
// ks_net_channel_on_notify() with EPOLLOUT.
// Returns FALSE if any setup step fails (the socket is closed); otherwise
// TRUE when the channel still has a socket after the notification ran.
BOOL ks_net_channel_connect_sync(KSNetChannel* self, KSNetEventDispatcher* dp, const struct sockaddr* addr) {
    struct epoll_event ev;
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    if( sock==-1 )
        return FALSE;

    if( connect(sock, addr, sizeof(struct sockaddr))!=0
     || ks_sock_set_blocking(sock, 0)!=0 )
    {
        close(sock);
        return FALSE;
    }

    // nodelay is on by default; the result is not checked
    ks_sock_set_tcp_nodelay(sock, TRUE);

    memset(&ev, 0, sizeof(ev));
    ev.data.ptr = (void*)self;
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, sock, &ev)!=0 ) {
        close(sock);
        return FALSE;
    }

    self->__dp = dp;
    self->fd = sock;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    ks_net_channel_on_notify((KSEpollNotifer*)self, EPOLLOUT);
    return self->fd != -1;
}
// Starts a non-blocking connect to `addr`. The socket is registered with the
// dispatcher before connect() is called; completion is reported later through
// ks_net_channel_on_notify().
// Returns TRUE when the connect succeeded immediately or is in progress
// (EINPROGRESS); FALSE, with the socket closed, on any other failure.
BOOL ks_net_channel_connect_async(KSNetChannel* self, KSNetEventDispatcher* dp, const struct sockaddr* addr) {
    struct epoll_event ev;
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    if( sock==-1 )
        return FALSE;

    if( ks_sock_set_blocking(sock, 0)!=0 ) {
        close(sock);
        return FALSE;
    }

    // nodelay is on by default; the result is not checked
    ks_sock_set_tcp_nodelay(sock, TRUE);

    memset(&ev, 0, sizeof(ev));
    ev.data.ptr = (void*)self;
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLHUP | EPOLLERR | EPOLLRDHUP;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, sock, &ev)!=0 ) {
        close(sock);
        return FALSE;
    }

    if( connect(sock, addr, sizeof(struct sockaddr))!=0 && errno != EINPROGRESS ) {
        ks_sock_close(sock);
        return FALSE;
    }

    self->__dp = dp;
    self->fd = sock;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    return TRUE;
}
// Adopts an already connected socket `fd` into the channel: makes it
// non-blocking, enables TCP_NODELAY, registers it edge-triggered with the
// dispatcher and runs the "connected" transition through
// ks_net_channel_on_notify() with EPOLLOUT.
// Returns FALSE if making the socket non-blocking or registering it fails
// (the socket is left open for the caller); TRUE otherwise.
BOOL ks_net_channel_connect_finish(KSNetChannel* self, KSNetEventDispatcher* dp, KSSockFD fd) {
    struct epoll_event ev;

    if( ks_sock_set_blocking(fd, 0)!=0 )
        return FALSE;

    // nodelay is on by default; the result is not checked
    ks_sock_set_tcp_nodelay(fd, TRUE);

    memset(&ev, 0, sizeof(ev));
    ev.data.ptr = (void*)self;
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, fd, &ev)!=0 )
        return FALSE;

    self->__dp = dp;
    self->fd = fd;
    self->status = KS_NET_CHANNEL_ST_CONNECTING;
    ks_net_channel_on_notify((KSEpollNotifer*)self, EPOLLOUT);
    return TRUE;
}
// Writes a header followed by a body, all or nothing.
//   buf / len               : body bytes.
//   header_buf / header_len : header bytes, written before the body.
//
// If a send is already in progress (ks_net_channel_sending_busy), the whole
// message is appended to the send buffer, provided it fits into the remaining
// space. Otherwise the data is sent directly with send(); whatever the socket
// does not accept (EAGAIN or a partial send) is copied into the send buffer.
//
// Returns TRUE when the message was sent and/or buffered completely. Returns
// FALSE when it exceeds the send buffer capacity (slen), the channel is not
// working, the buffer lacks room, or send() failed (the channel is then
// closed).
//
// Fix: once the header has been sent completely, header_len is cleared and a
// message with an empty body returns right away. Before, that case fell
// through to the copy_to_sbuf label and queued the already-sent header a
// second time.
BOOL ks_net_channel_write_all_full(KSNetChannel* self, const void* buf, size_t len, const void* header_buf, size_t header_len) {
    int sent;
    size_t all_len = len + header_len;

    if( all_len > self->slen )
        return FALSE;
    if( !ks_net_channel_is_working(self) )
        return FALSE;

    if( ks_net_channel_sending_busy(self) ) {
        size_t remain = ks_net_channel_sbuf_remain(self);
        if( all_len > remain )
            return FALSE;
        goto copy_to_sbuf;
    }

    // Direct path, part 1: the header.
    while( header_len ) {
        sent = send(self->fd, header_buf, header_len, 0);
        if( sent < 0 ) {
            if( errno==EAGAIN ) {
                sent = 0;       // nothing accepted: buffer everything
            } else if( errno==EINTR ) {
                continue;
            } else {
                ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, errno);
                return FALSE;
            }
        } else if( sent==0 ) {
            ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
            return FALSE;
        }
        if( (size_t)sent < header_len ) {
            // Partial header: buffer the rest of it plus the whole body.
            header_buf = ((const char*)header_buf) + sent;
            header_len -= (size_t)sent;
            goto copy_to_sbuf;
        }
        header_len = 0;         // header is completely on the wire
    }

    // Direct path, part 2: the body.
    while( len ) {
        sent = send(self->fd, buf, len, 0);
        if( sent < 0 ) {
            if( errno==EAGAIN ) {
                sent = 0;       // nothing accepted: buffer the whole body
            } else if( errno==EINTR ) {
                continue;
            } else {
                ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, errno);
                return FALSE;
            }
        } else if( sent==0 ) {
            ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
            return FALSE;
        }
        if( (size_t)sent < len ) {
            buf = ((const char*)buf) + sent;
            len -= (size_t)sent;
            goto copy_buf_to_sbuf;
        }
        return TRUE;
    }

    // Empty body and the header (if any) was sent completely.
    return TRUE;

copy_to_sbuf:
    memcpy(self->spos, header_buf, header_len);
    self->spos += header_len;
copy_buf_to_sbuf:
    memcpy(self->spos, buf, len);
    self->spos += len;
    return TRUE;
}
// Write up to `len` bytes from `buf` on the channel, best effort.
//
// When no send is pending the bytes are offered to the socket first; the
// part the socket did not accept (or everything, if a send is already
// pending) is appended to the send buffer, truncated to
// ks_net_channel_sbuf_remain().
//
// Returns the number of bytes accepted (sent + queued), which may be less
// than `len`. Returns 0 if the channel is not working or if send() failed /
// returned 0, in which case the channel has been closed.
size_t ks_net_channel_write(KSNetChannel* self, const void* buf, size_t len) {
    const char* src = (const char*)buf;
    size_t room;
    size_t pending;
    int n = 0;

    if( !ks_net_channel_is_working(self) )
        return 0;

    if( !ks_net_channel_sending_busy(self) ) {
        while( len ) {
            n = send(self->fd, buf, len, 0);
            if( n > 0 )
                break;
            if( n == 0 ) {
                ks_net_channel_close(self, KS_NET_CHANNEL_ST_PEER_SHUTDOWN);
                return 0;
            }
            if( errno == EINTR )
                continue;               // interrupted: simply retry
            if( errno != EAGAIN ) {
                ks_net_channel_close_full(self, KS_NET_CHANNEL_ST_SEND_IO_FAILED, errno);
                return 0;
            }
            n = 0;                      // socket full: queue everything
            break;
        }
    }

    // the socket took it all (also covers len == 0)
    if( (size_t)n >= len )
        return (size_t)n;

    // queue as much of the unsent tail as the send buffer can hold
    pending = len - (size_t)n;
    room = ks_net_channel_sbuf_remain(self);
    if( pending > room )
        pending = room;
    memcpy(self->spos, src + n, pending);
    self->spos += pending;

    return pending + (size_t)n;
}
// Invoke the sink's cb_flush callback for the channel.
// Does nothing when the channel is not in a working state.
void ks_net_channel_flush(KSNetChannel* self) {
    if( !ks_net_channel_is_working(self) )
        return;

    self->sink->cb_flush(self);
}
// Close the channel, recording `status` as the reason and `err` as the
// associated error code.
//
// KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE is handled specially: on a connected
// channel it only changes the status (the socket stays open); if the channel
// is already waiting it is a no-op; in any other state it is downgraded to
// KS_NET_CHANNEL_ST_CLOSED and processed as a normal close.
//
// For a working channel the socket is closed, fd is set to -1 and the channel
// is pushed onto the dispatcher's __async_events list unless its
// async_event_next link is already set.
void ks_net_channel_close_full(KSNetChannel* self, int status, int err) {
    KSEpollNotifer* node = (KSEpollNotifer*)self;

    if( status == KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE ) {
        if( self->status == KS_NET_CHANNEL_ST_WAIT_PEER_CLOSE )
            return;
        if( self->status == KS_NET_CHANNEL_ST_CONNECTED ) {
            // TODO : the linux version has not tested the WAIT_PEER_CLOSE path yet
            // shutdown(self->fd, SHUT_RD);
            self->status = status;
            return;
        }
        status = KS_NET_CHANNEL_ST_CLOSED;
    }

    assert( status > 0 );

    if( !ks_net_channel_is_working(self) )
        return;

    self->status = status;
    self->err = err;
    ks_sock_close( self->fd );
    self->fd = -1;

    // a non-NULL link means the channel is already queued
    if( node->async_event_next )
        return;
    node->async_event_next = self->__dp->__async_events;
    self->__dp->__async_events = node;
}
// epoll callback for a KSNetSingal: drains every byte queued on the
// receiving end of the socket pair, then calls the user's notify function
// once.
//
// HUP / ERR events are only reported and otherwise ignored (the notify
// function is not called for them).
static void net_signal_on_notify(KSEpollNotifer* notifier, uint32_t events) {
    KSNetSingal* sig = (KSNetSingal*)notifier;
    char scratch[1024];
    int got;

    if( events & (EPOLLHUP | EPOLLERR) ) {
        perror("why? i can't believe it\n");
        return;
    }
    assert( events & EPOLLIN );

    // keep reading until recv() stops returning data
    for(;;) {
        got = recv(sig->__notify_fds[1], scratch, 1024, 0);
        if( got > 0 )
            continue;               // more wake-up bytes may follow
        if( got == 0 ) {
            perror("why? ignore peer shutdown\n");
            break;
        }
        if( errno == EINTR )
            continue;               // interrupted: retry
        if( errno != EAGAIN )
            perror("why? ignore errno\n");
        break;                      // drained (EAGAIN) or gave up on error
    }

    (*(sig->notify_fun))(sig->notify_tag);
}
// Initialise a wake-up signal backed by an AF_UNIX socket pair and register
// its receiving end (__notify_fds[1]) with the dispatcher's epoll set in
// edge-triggered mode. ks_net_signal() writes to __notify_fds[0];
// net_signal_on_notify() drains __notify_fds[1] and calls
// `notify_fun(notify_tag)`.
//
// Returns TRUE on success. On failure returns FALSE with both descriptors
// closed and reset to -1, so calling ks_net_signal_uninit() afterwards is
// safe and closes nothing.
BOOL ks_net_signal_init(KSNetSingal* self, KSNetEventDispatcher* dp, KSNetSingalNotify notify_fun, void* notify_tag) {
    struct epoll_event ev;

    memset(self, 0, sizeof(KSNetSingal));
    self->__notify_fds[0] = -1;
    self->__notify_fds[1] = -1;

    if( socketpair(AF_UNIX, SOCK_STREAM, 0, self->__notify_fds)!=0 ) {
        // make sure a failed call cannot leave garbage descriptors behind
        self->__notify_fds[0] = -1;
        self->__notify_fds[1] = -1;
        return FALSE;
    }

    // The drain loop in net_signal_on_notify() only terminates on EAGAIN, so
    // a descriptor that failed this call must not be used.
    if( ks_sock_set_blocking(self->__notify_fds[0], 0)!=0
     || ks_sock_set_blocking(self->__notify_fds[1], 0)!=0 )
        goto failed;

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = (void*)self;
    if( epoll_ctl(dp->__epoll_fd, EPOLL_CTL_ADD, self->__notify_fds[1], &ev)!=0 )
        goto failed;

    // NOTE(review): nodelay looks like a TCP-level option and these are
    // AF_UNIX sockets - the calls are kept as in the original, results ignored.
    ks_sock_set_tcp_nodelay(self->__notify_fds[0], TRUE);
    ks_sock_set_tcp_nodelay(self->__notify_fds[1], TRUE);

    ((KSEpollNotifer*)self)->cb_notify = net_signal_on_notify;
    self->dp = dp;
    self->notify_fun = notify_fun;
    self->notify_tag = notify_tag;
    return TRUE;

failed:
    ks_sock_close(self->__notify_fds[0]);
    ks_sock_close(self->__notify_fds[1]);
    // forget the closed descriptors so uninit cannot close them twice
    self->__notify_fds[0] = -1;
    self->__notify_fds[1] = -1;
    return FALSE;
}
// Close both ends of the signal's socket pair.
// Each descriptor is set to -1 after closing, and descriptors that are
// already -1 are skipped, so calling this twice is harmless.
void ks_net_signal_uninit(KSNetSingal* self) {
    int end;

    for( end = 0; end < 2; ++end ) {
        if( self->__notify_fds[end] == -1 )
            continue;
        ks_sock_close(self->__notify_fds[end]);
        self->__notify_fds[end] = -1;
    }
}
// Raise the signal by writing a single byte to the sending end of the socket
// pair (__notify_fds[0]). A failed or empty send is only reported through
// perror().
void ks_net_signal(KSNetSingal* self) {
    int rc = send(self->__notify_fds[0], "!", 1, 0);

    if( rc > 0 )
        return;

    perror("warning : post singal error!\n");
}
#endif//_WIN32
// Map a channel status code to a short, static, human-readable description.
// Codes that are not listed below are reported as "user defined".
// The returned string must not be modified or freed.
const char* ks_net_channel_fetch_status_desc(int status) {
    const char* desc = "user defined";

    switch( status ) {
    case KS_NET_CHANNEL_ST_CLOSED:
        desc = "normal closed";
        break;
    case KS_NET_CHANNEL_ST_CONNECT_FAILED:
        desc = "connect failed";
        break;
    case KS_NET_CHANNEL_ST_CONNECT_FAILED_TIMEOUT:
        desc = "connect timeout";
        break;
    case KS_NET_CHANNEL_ST_PEER_SHUTDOWN:
        desc = "peer shutdown";
        break;
    case KS_NET_CHANNEL_ST_SEND_IO_FAILED:
        desc = "send() return -1";
        break;
    case KS_NET_CHANNEL_ST_RECV_IO_FAILED:
        desc = "recv() return -1";
        break;
    case KS_NET_CHANNEL_ST_IO_ERROR:
        desc = "io error";
        break;
    case KS_NET_CHANNEL_ST_SPEED_LIMIT:
        desc = "speed limit";
        break;
    case KS_NET_CHANNEL_ST_BAD_PACKET_SIZE:
        desc = "parse size error";
        break;
    // the six *_REVERSED_n codes share one description
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_0:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_1:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_2:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_3:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_4:
    case KS_NET_CHANNEL_ST_ERROR_REVERSED_5:
        desc = "reversed error";
        break;
    case KS_NET_CHANNEL_ST_ERROR:
        desc = "logic error";
        break;
    default:
        break;
    }

    return desc;
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。