1 Star 1 Fork 0

yejinlei-mirror/janus-gateway

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
sctp.c 49.27 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436
/*! \file sctp.c
* \author Lorenzo Miniero <lorenzo@meetecho.com>
* \copyright GNU General Public License v3
* \brief SCTP processing for data channels
* \details Implementation (based on libusrsctp) of the SCTP Data Channels.
* The code takes care of the SCTP association between peers and the server,
* and allows for sending and receiving text messages (binary stuff yet to
* be implemented) after that.
*
* \note Right now, the code is heavily based on the rtcweb.c sample code
* provided in the \c usrsctp library code, and as such the copyright notice
* that appears at the beginning of that code is ideally present here as
* well: http://code.google.com/p/sctp-refimpl/source/browse/trunk/KERN/usrsctp/programs/rtcweb.c
*
* \note If you want/need to debug SCTP messages for any reason, you can
* do so by uncommenting the definition of \c DEBUG_SCTP in sctp.h. This
* will force this code to save all the SCTP messages being exchanged to
* a separate file for each session. You must choose what folder to save
* these files in by modifying the \c debug_folder variable. Once a file
* has been saved, you need to process it using the \c text2pcap tool
* that is usually shipped with Wireshark, e.g.:
*
\verbatim
cd /path/to/sctp
/usr/sbin/text2pcap -n -l 248 -D -t '%H:%M:%S.' sctp-debug-XYZ.txt sctp-debug-XYZ.pcapng
/usr/sbin/wireshark sctp-debug-XYZ.pcapng
\endverbatim
*
* \ingroup protocols
* \ref protocols
*/
#ifdef HAVE_SCTP
#include "sctp.h"
#include "dtls.h"
#include "janus.h"
#include "ice.h"
#include "debug.h"
#ifdef DEBUG_SCTP
/* If we're debugging the SCTP messaging, save the files here (edit path) */
const char *debug_folder = "/path/to/sctp";
#endif
static const char *default_label = "JanusDataChannel";
#define SCTP_MAX_PACKET_SIZE (1<<16)
/* Events we're interested in */
static uint16_t event_types[] = {
SCTP_ASSOC_CHANGE,
SCTP_PEER_ADDR_CHANGE,
SCTP_REMOTE_ERROR,
SCTP_SHUTDOWN_EVENT,
SCTP_ADAPTATION_INDICATION,
SCTP_SEND_FAILED_EVENT,
SCTP_SENDER_DRY_EVENT,
SCTP_STREAM_RESET_EVENT,
SCTP_STREAM_CHANGE_EVENT
};
/* Buffered message (in case we can't send right away) */
typedef struct janus_sctp_pending_message {
uint16_t id;
gboolean textdata;
char *buf;
size_t len;
} janus_sctp_pending_message;
static janus_sctp_pending_message *janus_sctp_pending_message_create(uint16_t id, gboolean textdata, char *buf, size_t len) {
janus_sctp_pending_message *m = g_malloc(sizeof(janus_sctp_pending_message));
m->id = id;
m->textdata = textdata;
if(buf != NULL && len > 0) {
m->buf = g_malloc(len);
memcpy(m->buf, buf, len);
m->len = len;
} else {
m->buf = NULL;
m->len = 0;
}
return m;
}
static void janus_sctp_pending_message_free(janus_sctp_pending_message *m) {
if(m != NULL) {
g_free(m->buf);
g_free(m);
}
}
/* usrsctp callbacks and methods */
int janus_sctp_data_to_dtls(void *instance, void *buffer, size_t length, uint8_t tos, uint8_t set_df);
static int janus_sctp_incoming_data(struct socket *sock, union sctp_sockstore addr, void *data, size_t datalen, struct sctp_rcvinfo rcv, int flags, void *ulp_info);
janus_sctp_channel *janus_sctp_find_channel_by_stream(janus_sctp_association *sctp, uint16_t stream);
janus_sctp_channel *janus_sctp_find_free_channel(janus_sctp_association *sctp);
uint16_t janus_sctp_find_free_stream(janus_sctp_association *sctp);
void janus_sctp_request_more_streams(janus_sctp_association *sctp);
int janus_sctp_send_open_request_message(struct socket *sock, uint16_t stream, char *label, char *protocol, uint8_t unordered, uint16_t pr_policy, uint32_t pr_value);
int janus_sctp_send_open_response_message(struct socket *sock, uint16_t stream);
int janus_sctp_send_open_ack_message(struct socket *sock, uint16_t stream);
void janus_sctp_send_deferred_messages(janus_sctp_association *sctp);
int janus_sctp_open_channel(janus_sctp_association *sctp, char *label, char *protocol, uint8_t unordered, uint16_t pr_policy, uint32_t pr_value);
int janus_sctp_send_text_or_binary(janus_sctp_association *sctp, uint16_t id, gboolean textdata, char *text, size_t length);
void janus_sctp_reset_outgoing_stream(janus_sctp_association *sctp, uint16_t stream);
void janus_sctp_send_outgoing_stream_reset(janus_sctp_association *sctp);
int janus_sctp_close_channel(janus_sctp_association *sctp, uint16_t id);
void janus_sctp_data_ready(janus_sctp_association *sctp);
void janus_sctp_handle_open_request_message(janus_sctp_association *sctp, janus_datachannel_open_request *req, size_t length, uint16_t stream);
void janus_sctp_handle_open_response_message(janus_sctp_association *sctp, janus_datachannel_open_response *rsp, size_t length, uint16_t stream);
void janus_sctp_handle_open_ack_message(janus_sctp_association *sctp, janus_datachannel_ack *ack, size_t length, uint16_t stream);
void janus_sctp_handle_unknown_message(char *msg, size_t length, uint16_t stream);
void janus_sctp_handle_data_message(janus_sctp_association *sctp, gboolean textdata, char *buffer, size_t length, uint16_t stream);
void janus_sctp_handle_message(janus_sctp_association *sctp, char *buffer, size_t length, uint32_t ppid, uint16_t stream, int flags);
void janus_sctp_handle_association_change_event(struct sctp_assoc_change *sac);
void janus_sctp_handle_peer_address_change_event(struct sctp_paddr_change *spc);
void janus_sctp_handle_adaptation_indication(struct sctp_adaptation_event *sai);
void janus_sctp_handle_shutdown_event(struct sctp_shutdown_event *sse);
void janus_sctp_handle_stream_reset_event(janus_sctp_association *sctp, struct sctp_stream_reset_event *strrst);
void janus_sctp_handle_remote_error_event(struct sctp_remote_error *sre);
void janus_sctp_handle_send_failed_event(struct sctp_send_failed_event *ssfe);
void janus_sctp_handle_notification(janus_sctp_association *sctp, union sctp_notification *notif, size_t n);
/* We need to keep a map of associations with random IDs, as usrsctp will
* use the pointer to our structures in the actual messages instead */
static janus_mutex sctp_mutex;
static GHashTable *sctp_ids = NULL;
static void janus_sctp_association_unref(janus_sctp_association *sctp);
/* SCTP management code */
static gboolean sctp_running;
int janus_sctp_init(void) {
/* Initialize the SCTP stack */
usrsctp_init(0, janus_sctp_data_to_dtls, NULL);
sctp_running = TRUE;
#ifdef DEBUG_SCTP
JANUS_LOG(LOG_WARN, "SCTP debugging to files enabled: going to save them in %s\n", debug_folder);
if(janus_mkdir(debug_folder, 0755) < 0) {
JANUS_LOG(LOG_ERR, "Error creating folder %s, expect problems...\n", debug_folder);
}
#endif
/* Create a map of local IDs too, to map them to our SCTP associations */
janus_mutex_init(&sctp_mutex);
sctp_ids = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_sctp_association_unref);
return 0;
}
void janus_sctp_deinit(void) {
usrsctp_finish();
sctp_running = FALSE;
janus_mutex_lock(&sctp_mutex);
g_hash_table_unref(sctp_ids);
janus_mutex_unlock(&sctp_mutex);
}
static void janus_sctp_association_unref(janus_sctp_association *sctp) {
if(sctp)
janus_refcount_decrease(&sctp->ref);
}
static void janus_sctp_association_free(const janus_refcount *sctp_ref) {
janus_sctp_association *sctp = janus_refcount_containerof(sctp_ref, janus_sctp_association, ref);
/* This association can be destroyed, free all the resources */
janus_refcount_decrease(&sctp->handle->ref);
janus_refcount_decrease(&sctp->dtls->ref);
if(sctp->pending_messages != NULL)
g_queue_free_full(sctp->pending_messages, (GDestroyNotify)janus_sctp_pending_message_free);
#ifdef DEBUG_SCTP
if(sctp->debug_dump != NULL)
fclose(sctp->debug_dump);
sctp->debug_dump = NULL;
#endif
g_free(sctp->buffer);
g_free(sctp);
sctp = NULL;
}
janus_sctp_association *janus_sctp_association_create(janus_dtls_srtp *dtls, janus_ice_handle *handle, uint16_t udp_port) {
if(dtls == NULL || handle == NULL || udp_port == 0)
return NULL;
/* usrsctp provides UDP encapsulation of SCTP, but we need these messages to
* be encapsulated in DTLS and actually sent/received by libnice, and not by
* usrsctp itself... as such, we make use of the AF_CONN approach */
janus_sctp_association *sctp = g_malloc0(sizeof(janus_sctp_association));
janus_refcount_init(&sctp->ref, janus_sctp_association_free);
g_atomic_int_set(&sctp->destroyed, 0);
sctp->dtls = dtls;
janus_refcount_increase(&dtls->ref);
sctp->handle = handle;
janus_refcount_increase(&handle->ref);
sctp->handle_id = handle->handle_id;
sctp->local_port = 5000; /* FIXME We always use this one */
sctp->remote_port = udp_port;
sctp->buffer = NULL;
sctp->buflen = 0;
sctp->offset = 0;
sctp->pending_messages = NULL;
#ifdef DEBUG_SCTP
sctp->debug_dump = NULL;
#endif
struct socket *sock = NULL;
unsigned int i = 0;
struct sockaddr_conn sconn = { 0 };
/* Now go on with SCTP */
janus_sctp_channel *channel = NULL;
for(i = 0; i < NUMBER_OF_CHANNELS; i++) {
channel = &(sctp->channels[i]);
channel->id = i;
channel->label[0] = '\0';
channel->state = DATA_CHANNEL_CLOSED;
channel->pr_policy = SCTP_PR_SCTP_NONE;
channel->pr_value = 0;
channel->stream = 0;
channel->unordered = 0;
channel->flags = 0;
}
for(i = 0; i < NUMBER_OF_STREAMS; i++) {
sctp->stream_channel[i] = NULL;
sctp->stream_buffer[i] = 0;
}
sctp->stream_buffer_counter = 0;
/* Create a unique ID to map locally: this is what we'll pass to
* usrsctp_socket, which means that's what we'll get in callbacks
* too: we can then use the map to retrieve the actual struct */
janus_mutex_lock(&sctp_mutex);
while(sctp->map_id == 0) {
sctp->map_id = janus_random_uint32();
if(g_hash_table_lookup(sctp_ids, GUINT_TO_POINTER(sctp->map_id)) != NULL) {
/* ID already taken, try another one */
sctp->map_id = 0;
}
}
janus_refcount_increase(&sctp->ref);
g_hash_table_insert(sctp_ids, GUINT_TO_POINTER(sctp->map_id), sctp);
janus_mutex_unlock(&sctp_mutex);
usrsctp_register_address(GUINT_TO_POINTER(sctp->map_id));
usrsctp_sysctl_set_sctp_ecn_enable(0);
if((sock = usrsctp_socket(AF_CONN, SOCK_STREAM, IPPROTO_SCTP, janus_sctp_incoming_data, NULL, 0,
GUINT_TO_POINTER(sctp->map_id))) == NULL) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error creating usrsctp socket... (%d)\n", sctp->handle_id, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
/* Store the socket handle to make sure it is closed in any case if the association creation fails */
sctp->sock = sock;
/* Make the socket non-blocking. Connect, close, shutdown etc will not block
* the thread waiting for the socket operation to complete. */
if (usrsctp_set_non_blocking(sock, 1) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error setting socket to non-blocking... (%d)\n", sctp->handle_id, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
/* Set SO_LINGER */
struct linger linger_opt;
linger_opt.l_onoff = 1;
linger_opt.l_linger = 0;
if(usrsctp_setsockopt(sock, SOL_SOCKET, SO_LINGER, &linger_opt, sizeof(linger_opt))) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] setsockopt error: SO_LINGER (%d)\n", sctp->handle_id, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
/* Allow resetting streams */
struct sctp_assoc_value av;
av.assoc_id = SCTP_ALL_ASSOC;
av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
if(usrsctp_setsockopt(sock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av, sizeof(struct sctp_assoc_value)) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] setsockopt error: SCTP_ENABLE_STREAM_RESET (%d)\n", sctp->handle_id, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
/* Disable Nagle */
uint32_t nodelay = 1;
if(usrsctp_setsockopt(sock, IPPROTO_SCTP, SCTP_NODELAY, &nodelay, sizeof(nodelay))) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] setsockopt error: SCTP_NODELAY (%d)\n", sctp->handle_id, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
/* Enable the events of interest */
struct sctp_event event;
memset(&event, 0, sizeof(event));
event.se_assoc_id = SCTP_ALL_ASSOC;
event.se_on = 1;
for(i = 0; i < sizeof(event_types)/sizeof(uint16_t); i++) {
event.se_type = event_types[i];
if(usrsctp_setsockopt(sock, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] setsockopt error: SCTP_EVENT (%d)\n", sctp->handle_id, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
}
/* Configure our INIT message */
struct sctp_initmsg initmsg;
memset(&initmsg, 0, sizeof(struct sctp_initmsg));
initmsg.sinit_num_ostreams = NUMBER_OF_STREAMS;
initmsg.sinit_max_instreams = NUMBER_OF_STREAMS;
if(usrsctp_setsockopt(sock, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, sizeof(struct sctp_initmsg)) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] setsockopt error: SCTP_INITMSG (%d)\n", sctp->handle_id, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
/* Bind our side of the communication, using AF_CONN as we're doing the actual delivery ourselves */
memset(&sconn, 0, sizeof(struct sockaddr_conn));
sconn.sconn_family = AF_CONN;
sconn.sconn_port = htons(sctp->local_port);
sconn.sconn_addr = GUINT_TO_POINTER(sctp->map_id);
if(usrsctp_bind(sock, (struct sockaddr *)&sconn, sizeof(struct sockaddr_conn)) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error binding client on port %"SCNu16" (%d)\n", sctp->handle_id, sctp->local_port, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
#ifdef DEBUG_SCTP
char debug_file[1024];
g_snprintf(debug_file, 1024, "%s/sctp-debug-%"SCNu64".txt", debug_folder, sctp->handle_id);
sctp->debug_dump = fopen(debug_file, "wt");
#endif
/* Operating as client */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Connecting the SCTP association\n", sctp->handle_id);
struct sockaddr_conn rconn = { 0 };
memset(&rconn, 0, sizeof(struct sockaddr_conn));
rconn.sconn_family = AF_CONN;
rconn.sconn_port = htons(sctp->remote_port);
rconn.sconn_addr = GUINT_TO_POINTER(sctp->map_id);
#ifdef HAVE_SCONN_LEN
rconn.sconn_len = sizeof(struct sockaddr_conn);
#endif
int res = usrsctp_connect(sock, (struct sockaddr *)&rconn, sizeof(struct sockaddr_conn));
if(res < 0 && errno != EINPROGRESS) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Error connecting to SCTP server at port %"SCNu16" (%d)\n", sctp->handle_id, sctp->remote_port, errno);
janus_sctp_association_destroy(sctp);
return NULL;
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Connected to the DataChannel peer\n", sctp->handle_id);
return sctp;
}
void janus_sctp_association_destroy(janus_sctp_association *sctp) {
if(sctp == NULL || !g_atomic_int_compare_and_exchange(&sctp->destroyed, 0, 1))
return;
if(sctp->map_id != 0) {
usrsctp_deregister_address(GUINT_TO_POINTER(sctp->map_id));
janus_mutex_lock(&sctp_mutex);
g_hash_table_remove(sctp_ids, GUINT_TO_POINTER(sctp->map_id));
janus_mutex_unlock(&sctp_mutex);
}
if(sctp->sock != NULL) {
usrsctp_shutdown(sctp->sock, SHUT_RDWR);
usrsctp_close(sctp->sock);
}
janus_refcount_decrease(&sctp->ref);
}
void janus_sctp_data_from_dtls(janus_sctp_association *sctp, char *buf, int len) {
if(sctp == NULL || sctp->handle == NULL || buf == NULL || len <= 0)
return;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Data from DTLS to SCTP stack: %d bytes\n", sctp->handle_id, len);
#ifdef DEBUG_SCTP
if(sctp->debug_dump != NULL) {
/* Dump incoming message */
char *dump = usrsctp_dumppacket(buf, len, SCTP_DUMP_INBOUND);
if(dump != NULL) {
fwrite(dump, sizeof(char), strlen(dump), sctp->debug_dump);
fflush(sctp->debug_dump);
usrsctp_freedumpbuffer(dump);
}
}
#endif
usrsctp_conninput(GUINT_TO_POINTER(sctp->map_id), buf, len, 0);
}
int janus_sctp_data_to_dtls(void *instance, void *buffer, size_t length, uint8_t tos, uint8_t set_df) {
janus_mutex_lock(&sctp_mutex);
janus_sctp_association *sctp = (janus_sctp_association *)g_hash_table_lookup(sctp_ids, instance);
janus_mutex_unlock(&sctp_mutex);
if(sctp == NULL || sctp->handle == NULL)
return -1;
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Data from SCTP to DTLS stack: %zu bytes\n", sctp->handle_id, length);
#ifdef DEBUG_SCTP
if(sctp->debug_dump != NULL) {
/* Dump outgoing message */
char *dump = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND);
if(dump != NULL) {
fwrite(dump, sizeof(char), strlen(dump), sctp->debug_dump);
fflush(sctp->debug_dump);
usrsctp_freedumpbuffer(dump);
}
}
#endif
janus_ice_relay_sctp(sctp->handle, buffer, length);
return 0;
}
static int janus_sctp_incoming_data(struct socket *sock, union sctp_sockstore addr, void *data, size_t datalen, struct sctp_rcvinfo rcv, int flags, void *ulp_info) {
janus_mutex_lock(&sctp_mutex);
janus_sctp_association *sctp = (janus_sctp_association *)g_hash_table_lookup(sctp_ids, ulp_info);
janus_mutex_unlock(&sctp_mutex);
if(sctp == NULL || sctp->dtls == NULL) {
free(data);
return 0;
}
if(data) {
if(flags & MSG_NOTIFICATION) {
janus_sctp_handle_notification(sctp, (union sctp_notification *)data, datalen);
} else {
janus_sctp_handle_message(sctp, data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, flags);
}
free(data);
}
return 1;
}
void janus_sctp_send_data(janus_sctp_association *sctp, char *label, char *protocol, gboolean textdata, char *buf, int len) {
if(sctp == NULL)
return;
if(buf == NULL || len <= 0)
return;
if(label == NULL)
label = (char *)default_label;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] SCTP data to send (label=%s, %d bytes) coming from a plugin.\n",
sctp->handle_id, label, len);
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Outgoing SCTP contents: %.*s\n",
sctp->handle_id, len, buf);
/* FIXME Is there any open channel we can use? */
int i = 0, found = 0;
for(i = 0; i < NUMBER_OF_CHANNELS; i++) {
if(sctp->channels[i].state != DATA_CHANNEL_CLOSED && !strcmp(sctp->channels[i].label, label)) {
found = 1;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] -- Using open channel %i\n", sctp->handle_id, i);
break;
}
}
if(!found) {
/* There's no open channel, try opening one now */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Creating channel '%s'...\n", sctp->handle_id, label);
if(janus_sctp_open_channel(sctp, label, protocol, 0, 0, 0) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Couldn't open channel...\n", sctp->handle_id);
return;
}
for(i = 0; i < NUMBER_OF_CHANNELS; i++) {
if(sctp->channels[i].state != DATA_CHANNEL_CLOSED && !strcmp(sctp->channels[i].label, label)) {
found = 1;
JANUS_LOG(LOG_VERB, "[%"SCNu64"] -- Using open channel %i\n", sctp->handle_id, i);
break;
}
}
if(!found) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Channel opened but not found?? giving up...\n", sctp->handle_id);
return;
}
}
/* Send the data, whether it's text or binary */
if(sctp->pending_messages != NULL && !g_queue_is_empty(sctp->pending_messages)) {
/* We couldn't send all pending messages, queue the new one as well */
if(buf != NULL && len > 0) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Couldn't send all pending messages, queueing new message\n",
sctp->handle_id);
janus_sctp_pending_message *m = janus_sctp_pending_message_create(i, textdata, buf, len);
if(sctp->pending_messages == NULL)
sctp->pending_messages = g_queue_new();
g_queue_push_tail(sctp->pending_messages, m);
}
return;
}
int res = janus_sctp_send_text_or_binary(sctp, i, textdata, buf, len);
if(res == -2) {
/* Delivery failed with an EAGAIN, queue and retry later */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Got EAGAIN when trying to send message on channel %"SCNu16", retrying later\n",
sctp->handle_id, i);
janus_sctp_pending_message *m = janus_sctp_pending_message_create(i, textdata, buf, len);
if(sctp->pending_messages == NULL)
sctp->pending_messages = g_queue_new();
g_queue_push_tail(sctp->pending_messages, m);
}
}
/* From now on, it's SCTP stuff */
janus_sctp_channel *janus_sctp_find_channel_by_stream(janus_sctp_association *sctp, uint16_t stream) {
if(sctp == NULL)
return NULL;
if(stream < NUMBER_OF_STREAMS) {
return (sctp->stream_channel[stream]);
} else {
return NULL;
}
}
janus_sctp_channel *janus_sctp_find_free_channel(janus_sctp_association *sctp) {
uint32_t i;
for(i = 0; i < NUMBER_OF_CHANNELS; i++) {
if(sctp->channels[i].state == DATA_CHANNEL_CLOSED) {
break;
}
}
if(i == NUMBER_OF_CHANNELS) {
return NULL;
} else {
return (&(sctp->channels[i]));
}
}
uint16_t janus_sctp_find_free_stream(janus_sctp_association *sctp) {
struct sctp_status status;
uint32_t i, limit;
socklen_t len;
len = (socklen_t)sizeof(struct sctp_status);
if(usrsctp_getsockopt(sctp->sock, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] getsockopt error: SCTP_STATUS\n", sctp->handle_id);
return 0;
}
if(status.sstat_outstrms < NUMBER_OF_STREAMS) {
limit = status.sstat_outstrms;
} else {
limit = NUMBER_OF_STREAMS;
}
/* stream id 0 is reserved */
for(i = 1; i < limit; i++) {
if(sctp->stream_channel[i] == NULL) {
break;
}
}
if(i == limit) {
return 0;
} else {
return ((uint16_t)i);
}
}
void janus_sctp_request_more_streams(janus_sctp_association *sctp) {
struct sctp_status status;
struct sctp_add_streams sas;
uint32_t i, streams_needed;
socklen_t len;
streams_needed = 0;
for(i = 0; i < NUMBER_OF_CHANNELS; i++) {
if((sctp->channels[i].state == DATA_CHANNEL_CONNECTING) &&
(sctp->channels[i].stream == 0)) {
streams_needed++;
}
}
len = (socklen_t)sizeof(struct sctp_status);
if(usrsctp_getsockopt(sctp->sock, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] getsockopt error: SCTP_STATUS\n", sctp->handle_id);
return;
}
if(status.sstat_outstrms + streams_needed > NUMBER_OF_STREAMS) {
streams_needed = NUMBER_OF_STREAMS - status.sstat_outstrms;
}
if(streams_needed == 0) {
return;
}
memset(&sas, 0, sizeof(struct sctp_add_streams));
sas.sas_instrms = 0;
sas.sas_outstrms = (uint16_t)streams_needed; /* XXX eror handling */
if(usrsctp_setsockopt(sctp->sock, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas, (socklen_t)sizeof(struct sctp_add_streams)) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] setsockopt error: SCTP_ADD_STREAMS\n", sctp->handle_id);
}
return;
}
int janus_sctp_send_open_request_message(struct socket *sock, uint16_t stream, char *label, char *protocol, uint8_t unordered, uint16_t pr_policy, uint32_t pr_value) {
/* XXX: This should be encoded in a better way */
janus_datachannel_open_request *req = NULL;
struct sctp_sndinfo sndinfo;
/* Use the default label, if none was provided */
if(label == NULL)
label = (char *)default_label;
size_t label_size = strlen(label);
size_t protocol_size = protocol ? strlen(protocol) : 0;
JANUS_LOG(LOG_VERB, "Opening channel with label '%s' (%zu, protocol %s)\n",
label, label_size, (protocol ? protocol : "unknown"));
req = g_malloc0(sizeof(janus_datachannel_open_request) + label_size + protocol_size);
req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
switch (pr_policy) {
case SCTP_PR_SCTP_NONE:
/* XXX: What about DATA_CHANNEL_RELIABLE_STREAM */
req->channel_type = DATA_CHANNEL_RELIABLE;
break;
case SCTP_PR_SCTP_TTL:
/* XXX: What about DATA_CHANNEL_UNRELIABLE */
req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
break;
case SCTP_PR_SCTP_RTX:
req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
break;
default:
return 0;
}
req->priority = htons(0); /* XXX: add support */
req->reliability_params = htonl((uint32_t)pr_value);
req->label_length = htons(label_size);
req->protocol_length = htons(protocol_size);
memcpy(req->label, label, label_size);
if(protocol != NULL)
memcpy(req->label + label_size, protocol, protocol_size);
memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
sndinfo.snd_sid = stream;
sndinfo.snd_flags = SCTP_EOR;
sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
if(usrsctp_sendv(sock,
req, sizeof(janus_datachannel_open_request) + label_size + protocol_size,
NULL, 0,
&sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
SCTP_SENDV_SNDINFO, 0) < 0) {
JANUS_LOG(LOG_ERR, "usrsctp_sendv error (%d)\n", errno);
g_free(req);
req = NULL;
return 0;
} else {
g_free(req);
req = NULL;
return 1;
}
}
int janus_sctp_send_open_response_message(struct socket *sock, uint16_t stream) {
/* XXX: This should be encoded in a better way */
janus_datachannel_open_response rsp;
struct sctp_sndinfo sndinfo;
memset(&rsp, 0, sizeof(janus_datachannel_open_response));
rsp.msg_type = DATA_CHANNEL_OPEN_RESPONSE;
rsp.error = 0;
rsp.flags = htons(0);
rsp.reverse_stream = htons(stream);
memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
sndinfo.snd_sid = stream;
sndinfo.snd_flags = SCTP_EOR;
sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
if(usrsctp_sendv(sock,
&rsp, sizeof(janus_datachannel_open_response),
NULL, 0,
&sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
SCTP_SENDV_SNDINFO, 0) < 0) {
JANUS_LOG(LOG_ERR, "usrsctp_sendv error (%d)\n", errno);
return 0;
} else {
return 1;
}
}
int janus_sctp_send_open_ack_message(struct socket *sock, uint16_t stream) {
/* XXX: This should be encoded in a better way */
janus_datachannel_ack ack;
struct sctp_sndinfo sndinfo;
memset(&ack, 0, sizeof(janus_datachannel_ack));
ack.msg_type = DATA_CHANNEL_ACK;
memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
sndinfo.snd_sid = stream;
sndinfo.snd_flags = SCTP_EOR;
sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
if(usrsctp_sendv(sock,
&ack, sizeof(janus_datachannel_ack),
NULL, 0,
&sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
SCTP_SENDV_SNDINFO, 0) < 0) {
JANUS_LOG(LOG_ERR, "usrsctp_sendv error (%d)\n", errno);
return 0;
} else {
return 1;
}
}
void janus_sctp_send_deferred_messages(janus_sctp_association *sctp) {
uint32_t i;
janus_sctp_channel *channel;
for(i = 0; i < NUMBER_OF_CHANNELS; i++) {
channel = &(sctp->channels[i]);
if(channel->flags & DATA_CHANNEL_FLAGS_SEND_REQ) {
if(janus_sctp_send_open_request_message(sctp->sock, channel->stream,
channel->label, channel->protocol, channel->unordered, channel->pr_policy, channel->pr_value)) {
channel->flags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
} else {
if(errno != EAGAIN) {
/* XXX: error handling */
}
}
}
if(channel->flags & DATA_CHANNEL_FLAGS_SEND_RSP) {
if(janus_sctp_send_open_response_message(sctp->sock, channel->stream)) {
channel->flags &= ~DATA_CHANNEL_FLAGS_SEND_RSP;
} else {
if(errno != EAGAIN) {
/* XXX: error handling */
}
}
}
if(channel->flags & DATA_CHANNEL_FLAGS_SEND_ACK) {
if(janus_sctp_send_open_ack_message(sctp->sock, channel->stream)) {
channel->flags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
} else {
if(errno != EAGAIN) {
/* XXX: error handling */
}
}
}
}
return;
}
int janus_sctp_open_channel(janus_sctp_association *sctp, char *label, char *protocol, uint8_t unordered, uint16_t pr_policy, uint32_t pr_value) {
if(sctp == NULL)
return -1;
janus_sctp_channel *channel;
uint16_t stream;
if((pr_policy != SCTP_PR_SCTP_NONE) &&
(pr_policy != SCTP_PR_SCTP_TTL) &&
(pr_policy != SCTP_PR_SCTP_RTX)) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Invalid pr_policy %"SCNu32"\n", sctp->handle_id, pr_policy);
return -1;
}
if((pr_policy == SCTP_PR_SCTP_NONE) && (pr_value != 0)) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Invalid pr_value %"SCNu32" for SCTP_PR_SCTP_NONE\n", sctp->handle_id, pr_value);
return -1;
}
if((channel = janus_sctp_find_free_channel(sctp)) == NULL) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No more free channels available\n", sctp->handle_id);
return -1;
}
stream = janus_sctp_find_free_stream(sctp);
channel->state = DATA_CHANNEL_CONNECTING;
channel->unordered = unordered ? 1 : 0;
channel->pr_policy = pr_policy;
channel->pr_value = pr_value;
channel->stream = stream;
channel->flags = 0;
g_snprintf(channel->label, sizeof(channel->label), "%s", (label ? label : default_label));
channel->protocol[0] = '\0';
if(protocol != NULL)
g_snprintf(channel->protocol, sizeof(channel->protocol), "%s", protocol);
if(stream == 0) {
janus_sctp_request_more_streams(sctp);
} else {
if(janus_sctp_send_open_request_message(sctp->sock, stream, channel->label, channel->protocol, unordered, pr_policy, pr_value)) {
sctp->stream_channel[stream] = channel;
} else {
if(errno == EAGAIN) {
sctp->stream_channel[stream] = channel;
channel->flags |= DATA_CHANNEL_FLAGS_SEND_REQ;
} else {
channel->label[0] = '\0';
channel->protocol[0] = '\0';
channel->state = DATA_CHANNEL_CLOSED;
channel->unordered = 0;
channel->pr_policy = 0;
channel->pr_value = 0;
channel->stream = 0;
channel->flags = 0;
channel = NULL;
}
}
}
return 0;
}
int janus_sctp_send_text_or_binary(janus_sctp_association *sctp, uint16_t id, gboolean textdata, char *text, size_t length) {
if(id >= NUMBER_OF_CHANNELS || text == NULL)
return -1;
struct sctp_sendv_spa spa;
janus_sctp_channel *channel = &sctp->channels[id];
if(channel == NULL) {
/* No such channel */
JANUS_LOG(LOG_ERR, "[%"SCNu64"] No such channel %"SCNu16"...\n", sctp->handle_id, id);
return -1;
}
if((channel->state != DATA_CHANNEL_OPEN) && (channel->state != DATA_CHANNEL_CONNECTING)) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Channel %"SCNu16" is neither open nor connecting (state=%d)...\n", sctp->handle_id, id, channel->state);
return -1;
}
memset(&spa, 0, sizeof(struct sctp_sendv_spa));
spa.sendv_sndinfo.snd_sid = channel->stream;
if((channel->state == DATA_CHANNEL_OPEN) && (channel->unordered)) {
spa.sendv_sndinfo.snd_flags = SCTP_EOR | SCTP_UNORDERED;
} else {
spa.sendv_sndinfo.snd_flags = SCTP_EOR;
}
spa.sendv_sndinfo.snd_ppid = htonl(textdata ? DATA_CHANNEL_PPID_DOMSTRING : DATA_CHANNEL_PPID_BINARY);
spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
if((channel->pr_policy == SCTP_PR_SCTP_TTL) || (channel->pr_policy == SCTP_PR_SCTP_RTX)) {
spa.sendv_prinfo.pr_policy = channel->pr_policy;
spa.sendv_prinfo.pr_value = channel->pr_value;
spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
}
if(usrsctp_sendv(sctp->sock, text, length, NULL, 0,
&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
SCTP_SENDV_SPA, 0) < 0) {
int res = errno;
if(res == EAGAIN) {
/* Couldn't send the message right away, add to the queue and retry later */
return -2;
}
JANUS_LOG(LOG_ERR, "[%"SCNu64"] sctp_sendv error (%d)\n", sctp->handle_id, res);
return -1;
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Message sent on channel %"SCNu16"\n", sctp->handle_id, id);
return 0;
}
void janus_sctp_reset_outgoing_stream(janus_sctp_association *sctp, uint16_t stream) {
uint32_t i;
for(i = 0; i < sctp->stream_buffer_counter; i++) {
if(sctp->stream_buffer[i] == stream) {
return;
}
}
sctp->stream_buffer[sctp->stream_buffer_counter++] = stream;
return;
}
void janus_sctp_send_outgoing_stream_reset(janus_sctp_association *sctp) {
struct sctp_reset_streams *srs;
uint32_t i;
size_t len;
if(sctp->stream_buffer_counter == 0) {
return;
}
len = sizeof(sctp_assoc_t) + (2 + sctp->stream_buffer_counter) * sizeof(uint16_t);
srs = g_malloc0(len);
srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
srs->srs_number_streams = sctp->stream_buffer_counter;
for(i = 0; i < sctp->stream_buffer_counter; i++) {
srs->srs_stream_list[i] = sctp->stream_buffer[i];
}
if(usrsctp_setsockopt(sctp->sock, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] setsockopt error: SCTP_RESET_STREAMS (%d)\n", sctp->handle_id, errno);
} else {
for(i = 0; i < sctp->stream_buffer_counter; i++) {
srs->srs_stream_list[i] = 0;
}
sctp->stream_buffer_counter = 0;
}
g_free(srs);
return;
}
int janus_sctp_close_channel(janus_sctp_association *sctp, uint16_t id) {
if(id >= NUMBER_OF_CHANNELS)
return -1;
janus_sctp_channel *channel = &sctp->channels[id];
if(channel == NULL) {
return -1;
}
if(channel->state != DATA_CHANNEL_OPEN) {
return -1;
}
janus_sctp_reset_outgoing_stream(sctp, channel->stream);
janus_sctp_send_outgoing_stream_reset(sctp);
channel->state = DATA_CHANNEL_CLOSING;
return 0;
}
void janus_sctp_data_ready(janus_sctp_association *sctp) {
if(sctp == NULL || g_atomic_int_get(&sctp->destroyed))
return;
if(sctp->pending_messages != NULL && !g_queue_is_empty(sctp->pending_messages)) {
/* Messages waiting in the queue, send those first */
janus_sctp_pending_message *m = g_queue_peek_head(sctp->pending_messages);
while(m != NULL) {
int res = janus_sctp_send_text_or_binary(sctp, m->id, m->textdata, m->buf, m->len);
if(res == -2) {
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Got EAGAIN when trying to resend pending message on channel %"SCNu16"\n",
sctp->handle_id, m->id);
break;
}
(void)g_queue_pop_head(sctp->pending_messages);
janus_sctp_pending_message_free(m);
m = g_queue_peek_head(sctp->pending_messages);
}
}
janus_dtls_sctp_data_ready(sctp->dtls);
}
void janus_sctp_handle_open_request_message(janus_sctp_association *sctp, janus_datachannel_open_request *req, size_t length, uint16_t stream) {
janus_sctp_channel *channel;
uint32_t pr_value;
uint16_t pr_policy;
uint8_t unordered;
if(stream >= NUMBER_OF_STREAMS) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Exceeded number of allowed streams (%u > %u).\n", sctp->handle_id, (stream+1), NUMBER_OF_STREAMS);
/* XXX: some error handling */
return;
}
if((channel = janus_sctp_find_channel_by_stream(sctp, stream))) {
JANUS_LOG(LOG_ERR, "[%"SCNu64"] channel %d is in state %d instead of CLOSED.\n", sctp->handle_id, channel->id, channel->state);
JANUS_LOG(LOG_ERR, "%.*s\n", req->label_length, req->label);
/* XXX: some error handling */
return;
}
if((channel = janus_sctp_find_free_channel(sctp)) == NULL) {
/* XXX: some error handling */
return;
}
switch (req->channel_type) {
case DATA_CHANNEL_RELIABLE:
pr_policy = SCTP_PR_SCTP_NONE;
unordered = 0;
break;
case DATA_CHANNEL_RELIABLE_UNORDERED:
pr_policy = SCTP_PR_SCTP_NONE;
unordered = 1;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
pr_policy = SCTP_PR_SCTP_TTL;
unordered = 0;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
pr_policy = SCTP_PR_SCTP_TTL;
unordered = 1;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
pr_policy = SCTP_PR_SCTP_RTX;
unordered = 1;
break;
case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
pr_policy = SCTP_PR_SCTP_RTX;
unordered = 1;
break;
default:
pr_policy = SCTP_PR_SCTP_NONE;
unordered = 0;
/* FIXME Should we handle some error, here? */
break;
}
pr_value = ntohs(req->reliability_params);
channel->state = DATA_CHANNEL_CONNECTING;
channel->unordered = unordered;
channel->pr_policy = pr_policy;
channel->pr_value = pr_value;
channel->stream = stream;
channel->flags = 0;
sctp->stream_channel[stream] = channel;
if(janus_sctp_send_open_ack_message(sctp->sock, stream)) {
sctp->stream_channel[stream] = channel;
} else {
if(errno == EAGAIN) {
channel->flags |= DATA_CHANNEL_FLAGS_SEND_ACK;
sctp->stream_channel[stream] = channel;
} else {
/* XXX: Signal error to the other end */
sctp->stream_channel[stream] = NULL;
channel->label[0] = '\0';
channel->protocol[0] = '\0';
channel->state = DATA_CHANNEL_CLOSED;
channel->unordered = 0;
channel->pr_policy = 0;
channel->pr_value = 0;
channel->stream = 0;
channel->flags = 0;
}
}
/* Read label, if available */
char *label = NULL;
guint len = ntohs(req->label_length);
if(len > 0 && len < length) {
label = g_malloc(len+1);
memcpy(label, req->label, len);
label[len] = '\0';
g_snprintf(channel->label, sizeof(channel->label), "%s", label);
}
char *protocol = NULL;
guint plen = ntohs(req->protocol_length);
if(plen > 0 && plen < length) {
protocol = g_malloc(plen+1);
memcpy(protocol, req->label+len, plen);
protocol[plen] = '\0';
g_snprintf(channel->protocol, sizeof(channel->protocol), "%s", protocol);
}
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Opened channel '%s' (protocol=%s, id=%"SCNu16") (%d/%d/%d)\n",
sctp->handle_id, label ? label : "??", protocol ? protocol : "??",
channel->stream, channel->unordered, channel->pr_policy, channel->pr_value);
g_free(label);
g_free(protocol);
}
void janus_sctp_handle_open_response_message(janus_sctp_association *sctp, janus_datachannel_open_response *rsp, size_t length, uint16_t stream) {
janus_sctp_channel *channel;
channel = janus_sctp_find_channel_by_stream(sctp, stream);
if(channel == NULL) {
/* XXX: improve error handling */
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Can't find channel for outgoing steam %d.\n", sctp->handle_id, stream);
return;
}
if(channel->state != DATA_CHANNEL_CONNECTING) {
/* XXX: improve error handling */
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Channel with id %d for outgoing steam %d is in state %d.\n", sctp->handle_id, channel->id, stream, channel->state);
return;
}
if(janus_sctp_find_channel_by_stream(sctp, stream)) {
/* XXX: improve error handling */
JANUS_LOG(LOG_ERR, "[%"SCNu64"] Channel collision for channel with id %d and streams (in/out) = (%d/%d).\n", sctp->handle_id, channel->id, stream, stream);
return;
}
channel->stream = stream;
channel->state = DATA_CHANNEL_OPEN;
sctp->stream_channel[stream] = channel;
if(janus_sctp_send_open_ack_message(sctp->sock, stream)) {
channel->flags = 0;
} else {
channel->flags |= DATA_CHANNEL_FLAGS_SEND_ACK;
}
return;
}
void janus_sctp_handle_open_ack_message(janus_sctp_association *sctp, janus_datachannel_ack *ack, size_t length, uint16_t stream) {
janus_sctp_channel *channel;
channel = janus_sctp_find_channel_by_stream(sctp, stream);
if(channel == NULL) {
/* XXX: some error handling */
JANUS_LOG(LOG_ERR, "Ops, no channel with stream %"SCNu16"?\n", stream);
return;
}
if(channel->state == DATA_CHANNEL_OPEN) {
return;
}
if(channel->state != DATA_CHANNEL_CONNECTING) {
/* XXX: error handling */
return;
}
channel->state = DATA_CHANNEL_OPEN;
return;
}
void janus_sctp_handle_unknown_message(char *msg, size_t length, uint16_t stream) {
/* XXX: Send an error message */
return;
}
void janus_sctp_handle_data_message(janus_sctp_association *sctp, gboolean textdata, char *buffer, size_t length, uint16_t stream) {
janus_sctp_channel *channel;
channel = janus_sctp_find_channel_by_stream(sctp, stream);
if(channel == NULL) {
/* XXX: Some error handling */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Got data from this SCTP association but there is no channel with stream %"SCNu16"...\n", sctp->handle_id, stream);
return;
}
if(channel->state == DATA_CHANNEL_CONNECTING) {
/* Implicit ACK */
channel->state = DATA_CHANNEL_OPEN;
}
if(channel->state != DATA_CHANNEL_OPEN) {
/* XXX: What about other states? */
/* XXX: Some error handling */
JANUS_LOG(LOG_WARN, "[%"SCNu64"] Got data from this SCTP association but channel isn't open yet...\n", sctp->handle_id);
return;
} else {
/* XXX: Protect for non 0 terminated buffer */
JANUS_LOG(LOG_VERB, "[%"SCNu64"] SCTP data received of length %zu on channel with id %d.\n",
sctp->handle_id, length, channel->id);
JANUS_LOG(LOG_HUGE, "[%"SCNu64"] Incoming SCTP contents: %.*s\n",
sctp->handle_id, (int)length, buffer);
/* Pass this to the core */
janus_dtls_notify_sctp_data(sctp->dtls, channel->label,
strlen(channel->protocol) ? channel->protocol : NULL,
textdata, buffer, (int)length);
}
return;
}
void janus_sctp_handle_message(janus_sctp_association *sctp, char *buffer, size_t length, uint32_t ppid, uint16_t stream, int flags) {
janus_datachannel_open_request *req;
janus_datachannel_open_response *rsp;
janus_datachannel_ack *ack, *msg;
switch (ppid) {
case DATA_CHANNEL_PPID_CONTROL:
if(length < sizeof(janus_datachannel_ack)) {
return;
}
msg = (janus_datachannel_ack *)buffer;
switch (msg->msg_type) {
case DATA_CHANNEL_OPEN_REQUEST:
if(length < sizeof(janus_datachannel_open_request)) {
/* XXX: error handling? */
return;
}
req = (janus_datachannel_open_request *)buffer;
janus_sctp_handle_open_request_message(sctp, req, length, stream);
break;
case DATA_CHANNEL_OPEN_RESPONSE:
if(length < sizeof(janus_datachannel_open_response)) {
/* XXX: error handling? */
return;
}
rsp = (janus_datachannel_open_response *)buffer;
janus_sctp_handle_open_response_message(sctp, rsp, length, stream);
break;
case DATA_CHANNEL_ACK:
if(length < sizeof(janus_datachannel_ack)) {
/* XXX: error handling? */
return;
}
ack = (janus_datachannel_ack *)buffer;
janus_sctp_handle_open_ack_message(sctp, ack, length, stream);
break;
default:
janus_sctp_handle_unknown_message(buffer, length, stream);
break;
}
break;
case DATA_CHANNEL_PPID_DOMSTRING:
case DATA_CHANNEL_PPID_BINARY:
case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
case DATA_CHANNEL_PPID_BINARY_PARTIAL:
if((flags & MSG_EOR) &&
ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL &&
ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL) {
/* Message is complete, send it */
gboolean textdata = (ppid == DATA_CHANNEL_PPID_DOMSTRING || ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL);
if(sctp->offset > 0) {
/* We buffered multiple partial messages */
janus_sctp_handle_data_message(sctp, textdata, sctp->buffer, sctp->offset, stream);
sctp->offset = 0;
} else {
/* No buffering done, send this message as it is */
janus_sctp_handle_data_message(sctp, textdata, buffer, length, stream);
}
} else {
/* Partial message, buffer only for now */
if(length > (sctp->buflen - sctp->offset)) {
/* (re)Allocate the buffer */
int newlen = sctp->buflen + (length - (sctp->buflen - sctp->offset));
sctp->buffer = g_realloc(sctp->buffer, newlen);
sctp->buflen = newlen;
}
memcpy(sctp->buffer + sctp->offset, buffer, length);
sctp->offset += length;
}
break;
default:
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Message of length %zu, PPID %u on stream %u received.\n",
sctp->handle_id, length, ppid, stream);
break;
}
}
void janus_sctp_handle_association_change_event(struct sctp_assoc_change *sac) {
unsigned int i, n;
JANUS_LOG(LOG_VERB, "Association change ");
switch (sac->sac_state) {
case SCTP_COMM_UP:
JANUS_LOG(LOG_VERB, "SCTP_COMM_UP");
break;
case SCTP_COMM_LOST:
JANUS_LOG(LOG_VERB, "SCTP_COMM_LOST");
break;
case SCTP_RESTART:
JANUS_LOG(LOG_VERB, "SCTP_RESTART");
break;
case SCTP_SHUTDOWN_COMP:
JANUS_LOG(LOG_VERB, "SCTP_SHUTDOWN_COMP");
break;
case SCTP_CANT_STR_ASSOC:
JANUS_LOG(LOG_VERB, "SCTP_CANT_STR_ASSOC");
break;
default:
JANUS_LOG(LOG_VERB, "UNKNOWN");
break;
}
JANUS_LOG(LOG_VERB, ", streams (in/out) = (%u/%u)",
sac->sac_inbound_streams, sac->sac_outbound_streams);
n = sac->sac_length - sizeof(struct sctp_assoc_change);
if(((sac->sac_state == SCTP_COMM_UP) ||
(sac->sac_state == SCTP_RESTART)) && (n > 0)) {
JANUS_LOG(LOG_VERB, ", supports");
for(i = 0; i < n; i++) {
switch (sac->sac_info[i]) {
case SCTP_ASSOC_SUPPORTS_PR:
JANUS_LOG(LOG_VERB, " PR");
break;
case SCTP_ASSOC_SUPPORTS_AUTH:
JANUS_LOG(LOG_VERB, " AUTH");
break;
case SCTP_ASSOC_SUPPORTS_ASCONF:
JANUS_LOG(LOG_VERB, " ASCONF");
break;
case SCTP_ASSOC_SUPPORTS_MULTIBUF:
JANUS_LOG(LOG_VERB, " MULTIBUF");
break;
case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
JANUS_LOG(LOG_VERB, " RE-CONFIG");
break;
default:
JANUS_LOG(LOG_VERB, " UNKNOWN(0x%02x)", sac->sac_info[i]);
break;
}
}
} else if(((sac->sac_state == SCTP_COMM_LOST) ||
(sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
JANUS_LOG(LOG_VERB, ", ABORT =");
for(i = 0; i < n; i++) {
JANUS_LOG(LOG_VERB, " 0x%02x", sac->sac_info[i]);
}
}
JANUS_LOG(LOG_VERB, ".\n");
if((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
(sac->sac_state == SCTP_SHUTDOWN_COMP) ||
(sac->sac_state == SCTP_COMM_LOST)) {
/* FIXME Should we notify the application that data channels were lost? */
}
return;
}
void janus_sctp_handle_peer_address_change_event(struct sctp_paddr_change *spc) {
char addr_buf[INET6_ADDRSTRLEN];
const char *addr;
struct sockaddr_in *sin;
struct sockaddr_in6 *sin6;
switch (spc->spc_aaddr.ss_family) {
case AF_INET:
sin = (struct sockaddr_in *)&spc->spc_aaddr;
addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET_ADDRSTRLEN);
break;
case AF_INET6:
sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
break;
default:
snprintf(addr_buf, INET6_ADDRSTRLEN, "Unknown family %d", spc->spc_aaddr.ss_family);
addr = addr_buf;
break;
}
JANUS_LOG(LOG_VERB, "Peer address %s is now ", addr);
switch (spc->spc_state) {
case SCTP_ADDR_AVAILABLE:
JANUS_LOG(LOG_VERB, "SCTP_ADDR_AVAILABLE");
break;
case SCTP_ADDR_UNREACHABLE:
JANUS_LOG(LOG_VERB, "SCTP_ADDR_UNREACHABLE");
break;
case SCTP_ADDR_REMOVED:
JANUS_LOG(LOG_VERB, "SCTP_ADDR_REMOVED");
break;
case SCTP_ADDR_ADDED:
JANUS_LOG(LOG_VERB, "SCTP_ADDR_ADDED");
break;
case SCTP_ADDR_MADE_PRIM:
JANUS_LOG(LOG_VERB, "SCTP_ADDR_MADE_PRIM");
break;
case SCTP_ADDR_CONFIRMED:
JANUS_LOG(LOG_VERB, "SCTP_ADDR_CONFIRMED");
break;
default:
JANUS_LOG(LOG_VERB, "UNKNOWN");
break;
}
JANUS_LOG(LOG_VERB, " (error = 0x%08x).\n", spc->spc_error);
return;
}
void janus_sctp_handle_adaptation_indication(struct sctp_adaptation_event *sai) {
JANUS_LOG(LOG_VERB, "Adaptation indication: %x.\n", sai-> sai_adaptation_ind);
return;
}
void janus_sctp_handle_shutdown_event(struct sctp_shutdown_event *sse) {
JANUS_LOG(LOG_VERB, "Shutdown event.\n");
/* XXX: notify all channels */
return;
}
void janus_sctp_handle_stream_reset_event(janus_sctp_association *sctp, struct sctp_stream_reset_event *strrst) {
uint32_t n, i;
janus_sctp_channel *channel;
n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Stream reset event: flags = %x, ", sctp->handle_id, strrst->strreset_flags);
if(strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
if(strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
JANUS_LOG(LOG_VERB, "incoming/");
}
JANUS_LOG(LOG_VERB, "incoming ");
}
if(strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
JANUS_LOG(LOG_VERB, "outgoing ");
}
JANUS_LOG(LOG_VERB, "stream ids = ");
for(i = 0; i < n; i++) {
if(i > 0) {
JANUS_LOG(LOG_VERB, ", ");
}
JANUS_LOG(LOG_VERB, "%d", strrst->strreset_stream_list[i]);
}
JANUS_LOG(LOG_VERB, ".\n");
if(!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
!(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
for(i = 0; i < n; i++) {
if(strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN ||
strrst->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
channel = janus_sctp_find_channel_by_stream(sctp, strrst->strreset_stream_list[i]);
if(channel != NULL) {
sctp->stream_channel[channel->stream] = NULL;
if(channel->stream == 0) {
channel->pr_policy = SCTP_PR_SCTP_NONE;
channel->pr_value = 0;
channel->unordered = 0;
channel->flags = 0;
channel->state = DATA_CHANNEL_CLOSED;
channel->label[0] = '\0';
} else {
if(channel->state == DATA_CHANNEL_OPEN) {
janus_sctp_reset_outgoing_stream(sctp, channel->stream);
channel->state = DATA_CHANNEL_CLOSING;
} else {
/* XXX: What to do? */
}
}
}
}
}
}
return;
}
void janus_sctp_handle_remote_error_event(struct sctp_remote_error *sre) {
size_t i, n;
n = sre->sre_length - sizeof(struct sctp_remote_error);
JANUS_LOG(LOG_VERB, "Remote Error (error = 0x%04x): ", sre->sre_error);
for(i = 0; i < n; i++) {
JANUS_LOG(LOG_VERB, " 0x%02x", sre-> sre_data[i]);
}
JANUS_LOG(LOG_VERB, ".\n");
return;
}
void janus_sctp_handle_send_failed_event(struct sctp_send_failed_event *ssfe) {
size_t i, n;
if(ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
JANUS_LOG(LOG_VERB, "Unsent ");
}
if(ssfe->ssfe_flags & SCTP_DATA_SENT) {
JANUS_LOG(LOG_VERB, "Sent ");
}
if(ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
JANUS_LOG(LOG_VERB, "(flags = %x) ", ssfe->ssfe_flags);
}
JANUS_LOG(LOG_VERB, "message with PPID = %d, SID = %d, flags: 0x%04x due to error = 0x%08x",
ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
ssfe->ssfe_info.snd_flags, ssfe->ssfe_error);
n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
for(i = 0; i < n; i++) {
JANUS_LOG(LOG_VERB, " 0x%02x", ssfe->ssfe_data[i]);
}
JANUS_LOG(LOG_VERB, ".\n");
return;
}
void janus_sctp_handle_notification(janus_sctp_association *sctp, union sctp_notification *notif, size_t n) {
if(notif->sn_header.sn_length != (uint32_t)n) {
return;
}
switch (notif->sn_header.sn_type) {
case SCTP_ASSOC_CHANGE:
janus_sctp_handle_association_change_event(&(notif->sn_assoc_change));
break;
case SCTP_PEER_ADDR_CHANGE:
janus_sctp_handle_peer_address_change_event(&(notif->sn_paddr_change));
break;
case SCTP_REMOTE_ERROR:
janus_sctp_handle_remote_error_event(&(notif->sn_remote_error));
break;
case SCTP_SHUTDOWN_EVENT:
janus_sctp_handle_shutdown_event(&(notif->sn_shutdown_event));
break;
case SCTP_ADAPTATION_INDICATION:
janus_sctp_handle_adaptation_indication(&(notif->sn_adaptation_event));
break;
case SCTP_PARTIAL_DELIVERY_EVENT:
break;
case SCTP_AUTHENTICATION_EVENT:
break;
case SCTP_SENDER_DRY_EVENT: {
/* Internal buffers empty, notify the application they can send again */
janus_sctp_data_ready(sctp);
break;
}
case SCTP_NOTIFICATIONS_STOPPED_EVENT:
break;
case SCTP_SEND_FAILED_EVENT:
janus_sctp_handle_send_failed_event(&(notif->sn_send_failed_event));
break;
case SCTP_STREAM_RESET_EVENT:
janus_sctp_handle_stream_reset_event(sctp, &(notif->sn_strreset_event));
janus_sctp_send_deferred_messages(sctp);
janus_sctp_send_outgoing_stream_reset(sctp);
janus_sctp_request_more_streams(sctp);
break;
case SCTP_ASSOC_RESET_EVENT:
break;
case SCTP_STREAM_CHANGE_EVENT:
JANUS_LOG(LOG_VERB, "[%"SCNu64"] Stream change (in/out) = (%u/%u)\n", sctp ? sctp->handle_id : 0,
notif->sn_strchange_event.strchange_instrms, notif->sn_strchange_event.strchange_outstrms);
break;
default:
break;
}
}
#endif
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/yejinlei-mirror/janus-gateway.git
git@gitee.com:yejinlei-mirror/janus-gateway.git
yejinlei-mirror
janus-gateway
janus-gateway
master

搜索帮助