60 Star 460 Fork 134

GVPwinshining/nginx-http-flv-module

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
ngx_rtmp_relay_module.c 51.98 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939
/*
* Copyright (C) Roman Arutyunyan
* Copyright (C) Winshining
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include "ngx_rtmp_relay_module.h"
#include "ngx_rtmp_cmd_module.h"
#include "ngx_rtmp_eval.h"
/*
 * Saved "next" handlers of the RTMP command chains.  In the part of the
 * file reviewed here only next_publish and next_play are invoked (at the
 * end of ngx_rtmp_relay_publish() and ngx_rtmp_relay_play()).
 * NOTE(review): presumably all four are captured by the postconfiguration
 * hook - confirm against ngx_rtmp_relay_postconfiguration().
 */
static ngx_rtmp_publish_pt next_publish;
static ngx_rtmp_play_pt next_play;
static ngx_rtmp_delete_stream_pt next_delete_stream;
static ngx_rtmp_close_stream_pt next_close_stream;
/* Forward declarations of functions that are referenced before definition. */
static ngx_int_t ngx_rtmp_relay_init_process(ngx_cycle_t *cycle);
static ngx_int_t ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf);
static void * ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf);
static char * ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf,
void *parent, void *child);
static char * ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s,
ngx_rtmp_publish_t *v);
static ngx_rtmp_relay_ctx_t * ngx_rtmp_relay_create_connection(
ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
ngx_rtmp_relay_target_t *target);
static void ngx_rtmp_relay_eval_ctx_str(void *ctx, ngx_rtmp_eval_t *e,
ngx_str_t *ret);
/*                _____
 * =push=        |     |---publish--->
 * ---publish--->|     |---publish--->
 *     (src)     |     |---publish--->
 *                -----  (next,relay)
 *                      need reconnect
 * =pull=         _____
 * -----play---->|     |
 * -----play---->|     |----play----->
 * -----play---->|     | (src,relay)
 *     (next)     -----
 */
/*
 * Payload of a static pull reconnect event.
 * ngx_rtmp_relay_static_pull_reconnect() reads it from ev->data and passes
 * both members to ngx_rtmp_relay_create_connection().
 */
typedef struct {
ngx_rtmp_conf_ctx_t cctx; /* conf context used to look up the relay app conf */
ngx_rtmp_relay_target_t *target; /* target to (re)connect to */
} ngx_rtmp_relay_static_t;
/*
 * AMF transaction ids placed into outgoing commands.  CONNECT and
 * CREATE_STREAM are used by the send_connect / send_create_stream
 * functions; RELEASE_STREAM and FCPUBLISH are referenced only by code that
 * is currently compiled out with #if 0.
 */
#define NGX_RTMP_RELAY_CONNECT_TRANS 1
#define NGX_RTMP_RELAY_RELEASE_STREAM_TRANS 2
#define NGX_RTMP_RELAY_FCPUBLISH_STREAM_TRANS 3
#define NGX_RTMP_RELAY_CREATE_STREAM_TRANS 4
/*
 * Chunk stream ids written to the message header: _INI for connect and
 * createStream, the plain one for publish.
 */
#define NGX_RTMP_RELAY_CSID_AMF_INI 3
#define NGX_RTMP_RELAY_CSID_AMF 5
/* message stream id written to the header of the publish command */
#define NGX_RTMP_RELAY_MSID 1
/* default flashVer, sent in connect when the relay context has none */
#define NGX_RTMP_RELAY_FLASHVER "LNX.11,1,102,55"
/*
 * Configuration directives of the relay module.  All of them store into
 * the application-level configuration (NGX_RTMP_APP_CONF_OFFSET).
 */
static ngx_command_t ngx_rtmp_relay_commands[] = {
/* push <args...>: application level only, parsed by ngx_rtmp_relay_push_pull */
{ ngx_string("push"),
NGX_RTMP_APP_CONF|NGX_CONF_1MORE,
ngx_rtmp_relay_push_pull,
NGX_RTMP_APP_CONF_OFFSET,
0,
NULL },
/* pull <args...>: application level only, shares the parser with "push" */
{ ngx_string("pull"),
NGX_RTMP_APP_CONF|NGX_CONF_1MORE,
ngx_rtmp_relay_push_pull,
NGX_RTMP_APP_CONF_OFFSET,
0,
NULL },
/*
 * relay_buffer <time>: stored in buflen (merge default 5000 ms).
 * NOTE(review): unlike the directives below it is not flagged with
 * NGX_RTMP_APP_CONF - confirm whether that is intentional.
 */
{ ngx_string("relay_buffer"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_relay_app_conf_t, buflen),
NULL },
/* push_reconnect <time>: delay of the push reconnect timer (default 3000 ms) */
{ ngx_string("push_reconnect"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_relay_app_conf_t, push_reconnect),
NULL },
/* pull_reconnect <time>: delay of the static pull reconnect timer (default 3000 ms) */
{ ngx_string("pull_reconnect"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_relay_app_conf_t, pull_reconnect),
NULL },
/* session_relay on|off: flag stored in session_relay (default off) */
{ ngx_string("session_relay"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_relay_app_conf_t, session_relay),
NULL },
ngx_null_command
};
/*
 * RTMP module context: the relay module only hooks postconfiguration and
 * the creation/merging of the application-level configuration.
 */
static ngx_rtmp_module_t ngx_rtmp_relay_module_ctx = {
NULL, /* preconfiguration */
ngx_rtmp_relay_postconfiguration, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
ngx_rtmp_relay_create_app_conf, /* create app configuration */
ngx_rtmp_relay_merge_app_conf /* merge app configuration */
};
/*
 * Module definition exported to nginx.  Besides the directives and the
 * RTMP context it registers a single lifecycle hook, the per-process
 * initializer ngx_rtmp_relay_init_process().
 */
ngx_module_t ngx_rtmp_relay_module = {
NGX_MODULE_V1,
&ngx_rtmp_relay_module_ctx, /* module context */
ngx_rtmp_relay_commands, /* module directives */
NGX_RTMP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
ngx_rtmp_relay_init_process, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
/*
 * Relay-specific evaluation variables.  Each entry maps a variable name to
 * an ngx_str_t member of ngx_rtmp_session_t, which
 * ngx_rtmp_relay_eval_ctx_str() reads through the recorded offset.
 */
static ngx_rtmp_eval_t ngx_rtmp_relay_specific_eval[] = {
/* "name" -> session stream member */
{ ngx_string("name"),
ngx_rtmp_relay_eval_ctx_str,
offsetof(ngx_rtmp_session_t, stream) },
/* "args" -> session args member */
{ ngx_string("args"),
ngx_rtmp_relay_eval_ctx_str,
offsetof(ngx_rtmp_session_t, args) },
ngx_rtmp_null_eval
};
/*
 * NULL-terminated list of variable tables handed to ngx_rtmp_eval() when a
 * target url is evaluated: the generic session table followed by the
 * relay-specific one.
 */
static ngx_rtmp_eval_t *ngx_rtmp_relay_eval[] = {
ngx_rtmp_eval_session,
ngx_rtmp_relay_specific_eval,
NULL
};
/*
 * Evaluation callback: yields the ngx_str_t located e->offset bytes into
 * the object pointed to by ctx.  The string is copied by value (length and
 * data pointer), the bytes themselves are not duplicated.
 */
static void
ngx_rtmp_relay_eval_ctx_str(void *ctx, ngx_rtmp_eval_t *e, ngx_str_t *ret)
{
    ngx_str_t  *field;

    field = (ngx_str_t *) ((u_char *) ctx + e->offset);

    *ret = *field;
}
/*
 * Allocate the application-level relay configuration.
 *
 * The four target/event arrays start with room for one pointer each, the
 * bucket count is fixed at 1024 and every mergeable setting is left
 * "unset" so that the merge step can apply inheritance and defaults.
 *
 * Returns the new configuration, or NULL when any allocation fails.
 */
static void *
ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf)
{
    ngx_rtmp_relay_app_conf_t  *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_relay_app_conf_t));
    if (conf == NULL) {
        return NULL;
    }

    /* short-circuit evaluation keeps the original initialization order */
    if (ngx_array_init(&conf->pushes, cf->pool, 1, sizeof(void *)) != NGX_OK
        || ngx_array_init(&conf->pulls, cf->pool, 1, sizeof(void *))
           != NGX_OK
        || ngx_array_init(&conf->static_pulls, cf->pool, 1, sizeof(void *))
           != NGX_OK
        || ngx_array_init(&conf->static_events, cf->pool, 1, sizeof(void *))
           != NGX_OK)
    {
        return NULL;
    }

    conf->log = &cf->cycle->new_log;
    conf->nbuckets = 1024;

    conf->session_relay = NGX_CONF_UNSET;
    conf->buflen = NGX_CONF_UNSET_MSEC;
    conf->push_reconnect = NGX_CONF_UNSET_MSEC;
    conf->pull_reconnect = NGX_CONF_UNSET_MSEC;

    return conf;
}
/*
 * Merge the application-level relay configuration with its parent.
 *
 * Allocates the zero-filled hash table of relay contexts (nbuckets
 * entries) and applies inheritance/defaults: session_relay off,
 * relay_buffer 5000 ms, push_reconnect and pull_reconnect 3000 ms.
 *
 * Returns NGX_CONF_OK, or NGX_CONF_ERROR when the table cannot be
 * allocated.
 */
static char *
ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_rtmp_relay_app_conf_t  *prev = parent;
    ngx_rtmp_relay_app_conf_t  *conf = child;

    conf->ctx = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_relay_ctx_t *)
                                      * conf->nbuckets);
    if (conf->ctx == NULL) {
        /*
         * ngx_rtmp_relay_create() indexes this table unconditionally, so
         * a failed allocation must abort configuration loading.
         */
        return NGX_CONF_ERROR;
    }

    ngx_conf_merge_value(conf->session_relay, prev->session_relay, 0);
    ngx_conf_merge_msec_value(conf->buflen, prev->buflen, 5000);
    ngx_conf_merge_msec_value(conf->push_reconnect, prev->push_reconnect,
                              3000);
    ngx_conf_merge_msec_value(conf->pull_reconnect, prev->pull_reconnect,
                              3000);

    return NGX_CONF_OK;
}
/*
 * Timer handler that (re)establishes a static pull.
 *
 * ev->data carries an ngx_rtmp_relay_static_t.  When the connection cannot
 * be created the event is re-armed after pull_reconnect milliseconds;
 * otherwise the new session is flagged as a static relay and remembers the
 * event.
 */
static void
ngx_rtmp_relay_static_pull_reconnect(ngx_event_t *ev)
{
    ngx_rtmp_relay_static_t    *st;
    ngx_rtmp_relay_app_conf_t  *racf;
    ngx_rtmp_relay_ctx_t       *rctx;

    st = ev->data;

    racf = ngx_rtmp_get_module_app_conf(&st->cctx, ngx_rtmp_relay_module);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                   "relay: reconnecting static pull");

    rctx = ngx_rtmp_relay_create_connection(&st->cctx, &st->target->name,
                                            st->target);
    if (rctx == NULL) {
        /* try again later */
        ngx_add_timer(ev, racf->pull_reconnect);
        return;
    }

    rctx->session->static_relay = 1;
    rctx->static_evt = ev;
}
/*
 * Timer handler that re-creates missing push relays of a publisher.
 *
 * ev->data is the publishing session.  Every configured push target whose
 * name is empty or equal to the stream name is checked; targets that
 * already have a play context owned by this module are skipped, the
 * others are pushed again.  A failed push is logged and re-arms the
 * reconnect timer unless it is already set.
 */
static void
ngx_rtmp_relay_push_reconnect(ngx_event_t *ev)
{
    ngx_rtmp_session_t         *s = ev->data;
    ngx_rtmp_relay_app_conf_t  *racf;
    ngx_rtmp_relay_ctx_t       *ctx, *peer;
    ngx_rtmp_relay_target_t    *target, **targets;
    ngx_uint_t                  i;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "relay: push reconnect");

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL) {
        return;
    }

    targets = racf->pushes.elts;

    for (i = 0; i < racf->pushes.nelts; i++) {
        target = targets[i];

        /* a named target only applies to the stream of the same name */
        if (target->name.len != 0) {
            if (ctx->name.len != target->name.len) {
                continue;
            }

            if (ngx_memcmp(ctx->name.data, target->name.data, ctx->name.len)
                != 0)
            {
                continue;
            }
        }

        /* look for an existing relay to this target */
        peer = ctx->play;
        while (peer != NULL
               && !(peer->tag == &ngx_rtmp_relay_module
                    && peer->data == target))
        {
            peer = peer->next;
        }

        if (peer != NULL) {
            continue;
        }

        if (ngx_rtmp_relay_push(s, &ctx->name, target) == NGX_OK) {
            continue;
        }

        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                      "relay: push reconnect failed name='%V' app='%V' "
                      "playpath='%V' url='%V' args='%V'",
                      &ctx->name, &target->app, &target->play_path,
                      &target->url.url, &s->args);

        if (ctx->push_evt.timer_set) {
            continue;
        }

        ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
    }
}
/*
 * Peer "get" callback installed on relay peer connections.  It performs no
 * selection of its own and always reports success; the address is filled
 * in by ngx_rtmp_relay_create_connection() before connecting.
 */
static ngx_int_t
ngx_rtmp_relay_get_peer(ngx_peer_connection_t *pc, void *data)
{
return NGX_OK;
}
/*
 * Peer "free" callback installed on relay peer connections.  Intentionally
 * empty: there is nothing to release per peer.
 */
static void
ngx_rtmp_relay_free_peer(ngx_peer_connection_t *pc, void *data,
ngx_uint_t state)
{
}
/*
 * Factory signature shared by the local and remote context creators that
 * ngx_rtmp_relay_create() receives for the publish and the play side.
 * A factory returns the relay context or NULL on failure.
 */
typedef ngx_rtmp_relay_ctx_t * (* ngx_rtmp_relay_create_ctx_pt)
(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_rtmp_relay_target_t *target);
/*
 * Duplicate the bytes of src into memory taken from pool and describe the
 * copy in dst.
 *
 * An empty src leaves dst untouched and succeeds.  Returns NGX_OK, or
 * NGX_ERROR when the allocation fails (dst->len has been set and
 * dst->data is NULL in that case).
 */
static ngx_int_t
ngx_rtmp_relay_copy_str(ngx_pool_t *pool, ngx_str_t *dst, ngx_str_t *src)
{
    size_t  nbytes;

    nbytes = src->len;

    if (nbytes == 0) {
        return NGX_OK;
    }

    dst->len = nbytes;
    dst->data = ngx_palloc(pool, nbytes);

    if (dst->data == NULL) {
        return NGX_ERROR;
    }

    ngx_memcpy(dst->data, src->data, nbytes);

    return NGX_OK;
}
/*
 * Open an outgoing RTMP connection to `target` and start the client
 * handshake on it.
 *
 * A dedicated pool is created for the connection; the relay context, the
 * peer connection and a synthetic address configuration are allocated
 * from it.  The target strings are copied into the context, and app /
 * play_path are deduced from the url path when the target does not name
 * them.  Addresses of the target are used in round-robin order through
 * target->counter.
 *
 *   cctx    configuration context the new session runs in
 *   name    stream name to copy into the context, may be NULL
 *   target  relay target description
 *
 * Returns the relay context of the new session, or NULL on failure.  On
 * failure everything acquired so far is released, including the
 * connection once it has been opened.
 */
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_relay_app_conf_t  *racf;
    ngx_rtmp_relay_ctx_t       *rctx;
    ngx_rtmp_addr_conf_t       *addr_conf;
    ngx_rtmp_conf_ctx_t        *addr_ctx;
    ngx_rtmp_session_t         *rs;
    ngx_peer_connection_t      *pc;
    ngx_connection_t           *c;
    ngx_addr_t                 *addr;
    ngx_pool_t                 *pool;
    size_t                      len;
    ngx_int_t                   rc;
    ngx_str_t                   v, *uri;
    u_char                     *first, *last, *p;
    u_char                      buf[NGX_SOCKADDR_STRLEN];

    /* stays NULL until ngx_event_connect_peer() has handed us a connection */
    c = NULL;

    racf = ngx_rtmp_get_module_app_conf(cctx, ngx_rtmp_relay_module);

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                   "relay: create remote context");

    pool = ngx_create_pool(4096, racf->log);
    if (pool == NULL) {
        return NULL;
    }

    rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
    if (rctx == NULL) {
        goto clear;
    }

    if (name && ngx_rtmp_relay_copy_str(pool, &rctx->name, name) != NGX_OK) {
        goto clear;
    }

    if (ngx_rtmp_relay_copy_str(pool, &rctx->url, &target->url.url) != NGX_OK) {
        goto clear;
    }

    rctx->tag = target->tag;
    rctx->data = target->data;

#define NGX_RTMP_RELAY_STR_COPY(to, from)                                     \
    if (ngx_rtmp_relay_copy_str(pool, &rctx->to, &target->from) != NGX_OK) {  \
        goto clear;                                                           \
    }

    NGX_RTMP_RELAY_STR_COPY(app,        app);
    NGX_RTMP_RELAY_STR_COPY(tc_url,     tc_url);
    NGX_RTMP_RELAY_STR_COPY(page_url,   page_url);
    NGX_RTMP_RELAY_STR_COPY(swf_url,    swf_url);
    NGX_RTMP_RELAY_STR_COPY(flash_ver,  flash_ver);
    NGX_RTMP_RELAY_STR_COPY(play_path,  play_path);

    rctx->live = target->live;
    rctx->start = target->start;
    rctx->stop = target->stop;

#undef NGX_RTMP_RELAY_STR_COPY

    if (rctx->app.len == 0 || rctx->play_path.len == 0) {

        /* parse uri */

        uri = &target->url.uri;
        first = uri->data;
        last = uri->data + uri->len;

        if (first != last && *first == '/') {
            ++first;
        }

        if (first != last) {

            /* deduce app: everything up to the first '/' */

            p = ngx_strlchr(first, last, '/');
            if (p == NULL) {
                p = last;
            }

            if (rctx->app.len == 0 && first != p) {
                v.data = first;
                v.len = p - first;

                if (ngx_rtmp_relay_copy_str(pool, &rctx->app, &v) != NGX_OK) {
                    goto clear;
                }
            }

            /* deduce play_path: everything after that '/' */

            if (p != last) {
                ++p;
            }

            if (rctx->play_path.len == 0 && p != last) {
                v.data = p;
                v.len = last - p;

                if (ngx_rtmp_relay_copy_str(pool, &rctx->play_path, &v)
                    != NGX_OK)
                {
                    goto clear;
                }
            }
        }
    }

    pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
    if (pc == NULL) {
        goto clear;
    }

    if (target->url.naddrs == 0) {
        ngx_log_error(NGX_LOG_ERR, racf->log, 0,
                      "relay: no address");
        goto clear;
    }

    /* get address */
    addr = &target->url.addrs[target->counter % target->url.naddrs];
    target->counter++;

    /* copy log to keep shared log unchanged */
    rctx->log = *racf->log;
    pc->log = &rctx->log;
    pc->get = ngx_rtmp_relay_get_peer;
    pc->free = ngx_rtmp_relay_free_peer;

    /* the name header and its bytes share one allocation */
    pc->name = ngx_palloc(pool, sizeof(ngx_str_t) + addr->name.len);
    if (pc->name == NULL) {
        goto clear;
    }

    pc->name->len = addr->name.len;
    pc->name->data = (u_char *) pc->name + sizeof(ngx_str_t);
    ngx_memcpy(pc->name->data, addr->name.data, addr->name.len);

    pc->socklen = addr->socklen;
    pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen);
    if (pc->sockaddr == NULL) {
        goto clear;
    }

    ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen);

    rc = ngx_event_connect_peer(pc);
    if (rc != NGX_OK && rc != NGX_AGAIN ) {
        ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
                       "relay: connection failed");
        goto clear;
    }

    /* from here on a failure must also close the connection */
    c = pc->connection;
    c->pool = pool;
    ngx_str_set(&c->addr_text, "ngx-relay");

    addr_conf = ngx_pcalloc(pool, sizeof(ngx_rtmp_addr_conf_t));
    if (addr_conf == NULL) {
        goto clear;
    }

#if (NGX_HAVE_UNIX_DOMAIN)
    if (addr->sockaddr->sa_family == AF_UNIX) {
        addr_conf->addr_text.len = target->url.host.len;
        addr_conf->addr_text.data = ngx_pcalloc(pool,
                                                addr_conf->addr_text.len);
        if (addr_conf->addr_text.data == NULL) {
            ngx_log_error(NGX_LOG_ERR, racf->log, 0,
                          "relay: allocation for unix address failed");
            goto clear;
        }

        ngx_memcpy(addr_conf->addr_text.data, target->url.host.data,
                   addr_conf->addr_text.len);

    } else
#endif
    {
        len = ngx_sock_ntop(pc->sockaddr,
#if (nginx_version >= 1005003)
                            pc->socklen,
#endif
                            buf, NGX_SOCKADDR_STRLEN, 1);

        addr_conf->addr_text.data = ngx_pcalloc(pool, len);
        if (addr_conf->addr_text.data == NULL) {
            ngx_log_error(NGX_LOG_ERR, racf->log, 0,
                          "relay: allocation for address failed");
            goto clear;
        }

        addr_conf->addr_text.len = len;
        ngx_memcpy(addr_conf->addr_text.data, buf, len);
    }

    addr_conf->default_server = ngx_pcalloc(pool,
                                            sizeof(ngx_rtmp_core_srv_conf_t));
    if (addr_conf->default_server == NULL) {
        goto clear;
    }

    addr_ctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_conf_ctx_t));
    if (addr_ctx == NULL) {
        goto clear;
    }

    addr_conf->default_server->ctx = addr_ctx;
    addr_ctx->main_conf = cctx->main_conf;
    addr_ctx->srv_conf = cctx->srv_conf;

    rs = ngx_rtmp_init_session(c, addr_conf);
    if (rs == NULL) {
        /* no need to destroy pool */
        return NULL;
    }

    rs->app_conf = cctx->app_conf;
    rs->relay = 1;
    rctx->session = rs;
    ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
    ngx_str_set(&rs->flashver, "ngx-local-relay");

#if (NGX_STAT_STUB)
    (void) ngx_atomic_fetch_add(ngx_stat_active, 1);
#endif

    ngx_rtmp_client_handshake(rs, 1);

    return rctx;

clear:

    /*
     * Release the socket and the connection slot first: the connection
     * log points into the pool, so the pool has to outlive the close.
     */
    if (c) {
        ngx_close_connection(c);
    }

    ngx_destroy_pool(pool);

    return NULL;
}
/*
 * Create the remote side of a relay for session s.
 *
 * When the target url contains a '$' it is treated as a template: it is
 * evaluated against the session, re-parsed with default port 1935, and
 * the target is temporarily switched to the result.  The original url
 * description is always restored before returning and the evaluated
 * string is freed.
 *
 * On success the new context's server_name refers to the host range of
 * the session.  Returns the relay context or NULL on failure.
 */
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_remote_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_url_t              saved_url;
    ngx_str_t              evaluated;
    ngx_rtmp_conf_ctx_t    conf_ctx;
    ngx_rtmp_relay_ctx_t  *rctx;

    conf_ctx.main_conf = s->main_conf;
    conf_ctx.srv_conf = s->srv_conf;
    conf_ctx.app_conf = s->app_conf;

    rctx = NULL;
    saved_url = target->url;

    ngx_memzero(&evaluated, sizeof(ngx_str_t));

    if (ngx_strlchr(target->url.url.data,
                    target->url.url.data + target->url.url.len, '$')
        != NULL)
    {
        if (ngx_rtmp_eval(s, &target->url.url, ngx_rtmp_relay_eval,
                          &evaluated, s->connection->log)
            != NGX_OK)
        {
            ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                          "relay: failed to eval url='%V'", &target->url.url);
            goto done;
        }

        /* parse the evaluated url in place of the template */
        target->url.default_port = 1935;
        target->url.uri_part = 1;
        target->url.url = evaluated;
        target->url.addrs = NULL;
        target->url.naddrs = 0;

        if (ngx_parse_url(s->connection->pool, &target->url) != NGX_OK) {
            ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                          "relay: invalid url='%V'", &target->url.url);
            goto done;
        }
    }

    rctx = ngx_rtmp_relay_create_connection(&conf_ctx, name, target);
    if (rctx != NULL) {
        rctx->server_name.len = s->host_end - s->host_start;
        rctx->server_name.data = s->host_start;
    }

done:

    /* the target is shared configuration: undo the temporary switch */
    target->url = saved_url;

    if (evaluated.len) {
        ngx_free(evaluated.data);
    }

    return rctx;
}
/*
 * Obtain the relay context of the local session s, creating and attaching
 * it on first use, and prepare it for a relay of stream `name`.
 *
 * The push reconnect event of the context is (re)bound to the session.
 * Returns NULL when the context cannot be allocated, when it already
 * takes part in a relay (ctx->publish is set) or when the name cannot be
 * copied; otherwise returns the context.
 */
static ngx_rtmp_relay_ctx_t *
ngx_rtmp_relay_create_local_ctx(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_rtmp_relay_ctx_t  *lctx;

    ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                   "relay: create local context");

    lctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);

    if (lctx == NULL) {
        lctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
        if (lctx == NULL) {
            return NULL;
        }

        ngx_rtmp_set_ctx(s, lctx, ngx_rtmp_relay_module);
    }

    lctx->session = s;

    lctx->push_evt.handler = ngx_rtmp_relay_push_reconnect;
    lctx->push_evt.log = s->connection->log;
    lctx->push_evt.data = s;

    if (lctx->publish != NULL) {
        return NULL;
    }

    if (ngx_rtmp_relay_copy_str(s->connection->pool, &lctx->name, name)
        == NGX_OK)
    {
        return lctx;
    }

    return NULL;
}
/*
 * Common part of push and pull: create the play-side context and link it
 * into the per-application hash of relays keyed by stream name.
 *
 * If a relay for `name` already exists, the new play context is put at
 * the head of its play list.  Otherwise the publish-side context is
 * created too and becomes a new hash entry; should that fail, the
 * session of the freshly created play context is finalized.
 *
 * Returns NGX_OK or NGX_ERROR.
 */
static ngx_int_t
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target,
    ngx_rtmp_relay_create_ctx_pt create_publish_ctx,
    ngx_rtmp_relay_create_ctx_pt create_play_ctx)
{
    ngx_rtmp_relay_app_conf_t  *racf;
    ngx_rtmp_relay_ctx_t       *pub, *play, **slot;
    ngx_uint_t                  key;

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (racf == NULL) {
        return NGX_ERROR;
    }

    play = create_play_ctx(s, name, target);
    if (play == NULL) {
        return NGX_ERROR;
    }

    key = ngx_hash_key(name->data, name->len);
    slot = &racf->ctx[key % racf->nbuckets];

    while (*slot != NULL) {

        if ((*slot)->name.len == name->len
            && ngx_memcmp(name->data, (*slot)->name.data, name->len) == 0)
        {
            /* relay for this stream exists: join its play list */
            play->publish = (*slot)->publish;
            play->next = (*slot)->play;
            (*slot)->play = play;

            return NGX_OK;
        }

        slot = &(*slot)->next;
    }

    /* slot now addresses the terminating link of the bucket chain */

    pub = create_publish_ctx(s, name, target);
    if (pub == NULL) {
        ngx_rtmp_finalize_session(play->session);
        return NGX_ERROR;
    }

    pub->publish = pub;
    pub->play = play;
    play->publish = pub;

    *slot = pub;

    return NGX_OK;
}
/*
 * Start pulling stream `name` from `target` on behalf of session s: the
 * publish side is the remote connection, the play side is the local
 * session.  Returns the result of ngx_rtmp_relay_create().
 */
ngx_int_t
ngx_rtmp_relay_pull(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_int_t  rc;

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "relay: create pull name='%V' app='%V' playpath='%V' "
                  "url='%V' args='%V'",
                  name, &target->app, &target->play_path,
                  &target->url.url, &s->args);

    rc = ngx_rtmp_relay_create(s, name, target,
                               ngx_rtmp_relay_create_remote_ctx,
                               ngx_rtmp_relay_create_local_ctx);

    return rc;
}
/*
 * Start pushing stream `name` of session s to `target`: the publish side
 * is the local session, the play side is the remote connection.  Returns
 * the result of ngx_rtmp_relay_create().
 */
ngx_int_t
ngx_rtmp_relay_push(ngx_rtmp_session_t *s, ngx_str_t *name,
    ngx_rtmp_relay_target_t *target)
{
    ngx_int_t  rc;

    ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
                  "relay: create push name='%V' app='%V' playpath='%V' "
                  "url='%V' args='%V'",
                  name, &target->app, &target->play_path,
                  &target->url.url, &s->args);

    rc = ngx_rtmp_relay_create(s, name, target,
                               ngx_rtmp_relay_create_local_ctx,
                               ngx_rtmp_relay_create_remote_ctx);

    return rc;
}
/*
 * Publish handler of the relay module.
 *
 * For an ordinary publisher every configured push target whose name is
 * empty or equal to the published stream name is started.  A failed push
 * is logged and, if the session has a relay context by then, arms the
 * push reconnect timer.  Auto-pushed sessions, relay sessions that
 * already carry a relay context and applications without push targets
 * are passed through untouched.
 *
 * Always continues with the next publish handler and returns its result.
 */
static ngx_int_t
ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
{
    ngx_rtmp_relay_app_conf_t  *racf;
    ngx_rtmp_relay_target_t    *target, **targets;
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_str_t                   name;
    size_t                      i;

    if (s->auto_pushed) {
        return next_publish(s, v);
    }

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx != NULL && s->relay) {
        return next_publish(s, v);
    }

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (racf == NULL || racf->pushes.nelts == 0) {
        return next_publish(s, v);
    }

    name.data = v->name;
    name.len = ngx_strlen(v->name);

    targets = racf->pushes.elts;

    for (i = 0; i < racf->pushes.nelts; i++) {
        target = targets[i];

        /* a named target only applies to the stream of the same name */
        if (target->name.len != 0) {
            if (name.len != target->name.len) {
                continue;
            }

            if (ngx_memcmp(name.data, target->name.data, name.len) != 0) {
                continue;
            }
        }

        if (ngx_rtmp_relay_push(s, &name, target) == NGX_OK) {
            continue;
        }

        ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                      "relay: push failed name='%V' app='%V' "
                      "playpath='%V' url='%V' args='%V'",
                      &name, &target->app, &target->play_path,
                      &target->url.url, &s->args);

        /*
         * The context fetched above may have been NULL; the failed push
         * attempt can have created it, so look it up again before
         * touching the reconnect event.
         */
        ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
        if (ctx == NULL || ctx->push_evt.timer_set) {
            continue;
        }

        ngx_add_timer(&ctx->push_evt, racf->push_reconnect);
    }

    return next_publish(s, v);
}
/*
 * Play handler of the relay module.
 *
 * For an ordinary player every configured pull target whose name is empty
 * or equal to the requested stream name is started; failures are logged.
 * Relay sessions that already carry a relay context and applications
 * without pull targets are passed through untouched.
 *
 * Always continues with the next play handler and returns its result.
 */
static ngx_int_t
ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
{
    ngx_rtmp_relay_app_conf_t  *racf;
    ngx_rtmp_relay_target_t    *target, **targets;
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_str_t                   name;
    size_t                      i;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx != NULL && s->relay) {
        return next_play(s, v);
    }

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    if (racf == NULL || racf->pulls.nelts == 0) {
        return next_play(s, v);
    }

    name.data = v->name;
    name.len = ngx_strlen(v->name);

    targets = racf->pulls.elts;

    for (i = 0; i < racf->pulls.nelts; i++) {
        target = targets[i];

        /* a named target only applies to the stream of the same name */
        if (target->name.len != 0) {
            if (name.len != target->name.len) {
                continue;
            }

            if (ngx_memcmp(name.data, target->name.data, name.len) != 0) {
                continue;
            }
        }

        if (ngx_rtmp_relay_pull(s, &name, target) != NGX_OK) {
            ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                          "relay: pull failed name='%V' app='%V' "
                          "playpath='%V' url='%V' args='%V'",
                          &name, &target->app, &target->play_path,
                          &target->url.url, &s->args);
        }
    }

    return next_play(s, v);
}
/*
 * Issue a silent local "play" for the stream named in the relay context
 * of session s.  The name is truncated to fit v.name and is always
 * NUL-terminated.  Returns NGX_ERROR without a relay context, otherwise
 * the result of ngx_rtmp_play().
 */
static ngx_int_t
ngx_rtmp_relay_play_local(ngx_rtmp_session_t *s)
{
    ngx_rtmp_play_t        v;
    ngx_rtmp_relay_ctx_t  *ctx;
    size_t                 n;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    ngx_memzero(&v, sizeof(ngx_rtmp_play_t));
    v.silent = 1;

    /* leave room for the terminator */
    n = ngx_min(sizeof(v.name) - 1, ctx->name.len);

    ngx_memcpy(v.name, ctx->name.data, n);
    v.name[n] = 0;

    return ngx_rtmp_play(s, &v);
}
/*
 * Issue a silent local "publish" for the stream named in the relay
 * context of session s.  The name is truncated to fit v.name and is
 * always NUL-terminated.  Returns NGX_ERROR without a relay context,
 * otherwise the result of ngx_rtmp_publish().
 */
static ngx_int_t
ngx_rtmp_relay_publish_local(ngx_rtmp_session_t *s)
{
    ngx_rtmp_publish_t     v;
    ngx_rtmp_relay_ctx_t  *ctx;
    size_t                 n;

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    ngx_memzero(&v, sizeof(ngx_rtmp_publish_t));
    v.silent = 1;

    /* leave room for the terminator */
    n = ngx_min(sizeof(v.name) - 1, ctx->name.len);

    ngx_memcpy(v.name, ctx->name.data, n);
    v.name[n] = 0;

    return ngx_rtmp_publish(s, &v);
}
/*
 * Send the RTMP "connect" command on the relay session s, preceded by the
 * chunk size and acknowledgement window size messages.
 *
 * The command object is a static template whose string slots are filled
 * from the relay context on every call:
 *   app        context app, else the name of the core application
 *   tcUrl      context tc_url, else "rtmp://" + host part of url + "/" + app
 *   pageUrl    context page_url
 *   swfUrl     context swf_url
 *   flashVer   context flash_ver, else NGX_RTMP_RELAY_FLASHVER
 *   serverName context server_name
 *
 * Returns NGX_OK when all three messages were sent, NGX_ERROR otherwise.
 */
static ngx_int_t
ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
{
    static double               trans = NGX_RTMP_RELAY_CONNECT_TRANS;
    static double               acodecs = 3575;
    static double               vcodecs = 252;

    static ngx_rtmp_amf_elt_t   out_cmd[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_string("app"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("tcUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("pageUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("swfUrl"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("flashVer"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_STRING,
          ngx_string("serverName"),
          NULL, 0 }, /* <-- fill */

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("audioCodecs"),
          &acodecs, 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_string("videoCodecs"),
          &vcodecs, 0 }
    };

    static ngx_rtmp_amf_elt_t   out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "connect", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_OBJECT,
          ngx_null_string,
          out_cmd, sizeof(out_cmd) }
    };

    ngx_rtmp_core_app_conf_t   *cacf;
    ngx_rtmp_core_srv_conf_t   *cscf;
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_rtmp_header_t           h;
    size_t                      len, host_len;
    u_char                     *p, *slash;

    cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);

    if (ctx == NULL || cacf == NULL) {
        return NGX_ERROR;
    }

    /* app */
    if (ctx->app.len == 0) {
        out_cmd[0].data = cacf->name.data;
        out_cmd[0].len = cacf->name.len;

    } else {
        out_cmd[0].data = ctx->app.data;
        out_cmd[0].len = ctx->app.len;
    }

    /* tcUrl */
    if (ctx->tc_url.len == 0) {

        /* only the part of the url before the first '/' is used */
        host_len = ctx->url.len;
        slash = ngx_strlchr(ctx->url.data, ctx->url.data + ctx->url.len, '/');
        if (slash != NULL) {
            host_len = (size_t) (slash - ctx->url.data);
        }

        /* sized for the whole url, an upper bound of what is written */
        len = sizeof("rtmp://") - 1 + ctx->url.len +
              sizeof("/") - 1 + ctx->app.len;

        p = ngx_palloc(s->connection->pool, len);
        if (p == NULL) {
            return NGX_ERROR;
        }

        out_cmd[1].data = p;

        p = ngx_cpymem(p, "rtmp://", sizeof("rtmp://") - 1);
        p = ngx_cpymem(p, ctx->url.data, host_len);
        *p++ = '/';
        p = ngx_cpymem(p, ctx->app.data, ctx->app.len);

        out_cmd[1].len = p - (u_char *)out_cmd[1].data;

    } else {
        out_cmd[1].data = ctx->tc_url.data;
        out_cmd[1].len = ctx->tc_url.len;
    }

    /* pageUrl */
    out_cmd[2].data = ctx->page_url.data;
    out_cmd[2].len = ctx->page_url.len;

    /* swfUrl */
    out_cmd[3].data = ctx->swf_url.data;
    out_cmd[3].len = ctx->swf_url.len;

    /* flashVer */
    if (ctx->flash_ver.len == 0) {
        out_cmd[4].data = NGX_RTMP_RELAY_FLASHVER;
        out_cmd[4].len = sizeof(NGX_RTMP_RELAY_FLASHVER) - 1;

    } else {
        out_cmd[4].data = ctx->flash_ver.data;
        out_cmd[4].len = ctx->flash_ver.len;
    }

    /* used in ngx_rtmp_set_virtual_server when auto_pushed */
    out_cmd[5].data = ctx->server_name.data;
    out_cmd[5].len = ctx->server_name.len;

    ngx_memzero(&h, sizeof(h));
    h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
    h.type = NGX_RTMP_MSG_AMF_CMD;

    /* stop at the first message that cannot be sent */

    if (ngx_rtmp_send_chunk_size(s, cscf->chunk_size) != NGX_OK) {
        return NGX_ERROR;
    }

    if (ngx_rtmp_send_ack_size(s, cscf->ack_window) != NGX_OK) {
        return NGX_ERROR;
    }

    if (ngx_rtmp_send_amf(s, &h, out_elts,
                          sizeof(out_elts) / sizeof(out_elts[0]))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    return NGX_OK;
}
/*
 * Disabled code: senders for the "releaseStream" and "FCPublish" commands.
 * The whole region is compiled out; its only caller is the equally
 * disabled branch inside ngx_rtmp_relay_send_create_stream().
 */
#if 0
/*
 * Send "releaseStream" with the stream name from the relay context.
 * The name slot stays NULL/0 when the context has no name.
 */
static ngx_int_t
ngx_rtmp_relay_send_release_stream(ngx_rtmp_session_t *s)
{
static double trans = NGX_RTMP_RELAY_RELEASE_STREAM_TRANS;
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"releaseStream", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("stream"),
NULL, 0 }
};
ngx_rtmp_header_t h;
ngx_rtmp_core_app_conf_t *cacf;
ngx_rtmp_relay_ctx_t *ctx;
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (cacf == NULL || ctx == NULL) {
return NGX_ERROR;
}
if (ctx->name.len) {
out_elts[3].data = ctx->name.data;
out_elts[3].len = ctx->name.len;
}
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
/*
 * Send "FCPublish" with the stream name from the relay context; same
 * structure as the releaseStream sender above.
 */
static ngx_int_t
ngx_rtmp_relay_send_fcpublish(ngx_rtmp_session_t *s)
{
static double trans = NGX_RTMP_RELAY_FCPUBLISH_STREAM_TRANS;
static ngx_rtmp_amf_elt_t out_elts[] = {
{ NGX_RTMP_AMF_STRING,
ngx_null_string,
"FCPublish", 0 },
{ NGX_RTMP_AMF_NUMBER,
ngx_null_string,
&trans, 0 },
{ NGX_RTMP_AMF_NULL,
ngx_null_string,
NULL, 0 },
{ NGX_RTMP_AMF_STRING,
ngx_string("stream"),
NULL, 0 }
};
ngx_rtmp_header_t h;
ngx_rtmp_core_app_conf_t *cacf;
ngx_rtmp_relay_ctx_t *ctx;
cacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_core_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (cacf == NULL || ctx == NULL) {
return NGX_ERROR;
}
if (ctx->name.len) {
out_elts[3].data = ctx->name.data;
out_elts[3].len = ctx->name.len;
}
ngx_memzero(&h, sizeof(h));
h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
h.type = NGX_RTMP_MSG_AMF_CMD;
return ngx_rtmp_send_amf(s, &h, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]));
}
#endif
/*
 * Send the RTMP "createStream" command on session s.  The message is a
 * fixed template: command name, transaction id and a null argument.
 * Returns the result of ngx_rtmp_send_amf().
 */
static ngx_int_t
ngx_rtmp_relay_send_create_stream(ngx_rtmp_session_t *s)
{
    static double               trans = NGX_RTMP_RELAY_CREATE_STREAM_TRANS;

    static ngx_rtmp_amf_elt_t   out_elts[] = {

        { NGX_RTMP_AMF_STRING,
          ngx_null_string,
          "createStream", 0 },

        { NGX_RTMP_AMF_NUMBER,
          ngx_null_string,
          &trans, 0 },

        { NGX_RTMP_AMF_NULL,
          ngx_null_string,
          NULL, 0 }
    };

    ngx_rtmp_header_t           hdr;
    ngx_int_t                   rc;

    ngx_memzero(&hdr, sizeof(hdr));
    hdr.type = NGX_RTMP_MSG_AMF_CMD;
    hdr.csid = NGX_RTMP_RELAY_CSID_AMF_INI;

    /* disabled variant that announced the stream before creating it */
#if 0
    return ngx_rtmp_relay_send_release_stream(s) != NGX_OK
           || ngx_rtmp_relay_send_fcpublish(s) != NGX_OK
           || ngx_rtmp_send_amf(s, &hdr, out_elts,
                            sizeof(out_elts) / sizeof(out_elts[0])) != NGX_OK
           ? NGX_ERROR
           : NGX_OK;
#endif

    rc = ngx_rtmp_send_amf(s, &hdr, out_elts,
                           sizeof(out_elts) / sizeof(out_elts[0]));

    return rc;
}
/*
 * Send the AMF "publish" command for the relayed stream.
 *
 * The stream name sent is the relay context's play_path when it is
 * non-empty, otherwise the context's name.  The publishing type is the
 * literal "live".  The message goes out on chunk stream
 * NGX_RTMP_RELAY_CSID_AMF with message stream id NGX_RTMP_RELAY_MSID.
 *
 * Returns NGX_ERROR when the session has no relay context, otherwise the
 * result of ngx_rtmp_send_amf().
 */
static ngx_int_t
ngx_rtmp_relay_send_publish(ngx_rtmp_session_t *s)
{
    static double               txn_id;

    static ngx_rtmp_amf_elt_t   cmd[] = {

        { NGX_RTMP_AMF_STRING, ngx_null_string, "publish", 0 },

        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &txn_id, 0 },

        { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },

        /* slot 3: stream name, rewritten on every call */
        { NGX_RTMP_AMF_STRING, ngx_null_string, NULL, 0 },

        { NGX_RTMP_AMF_STRING, ngx_null_string, "live", 0 }
    };

    ngx_rtmp_header_t           hdr;
    ngx_rtmp_relay_ctx_t       *relay;

    relay = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (relay == NULL) {
        return NGX_ERROR;
    }

    /* an explicit play path overrides the stream name */
    cmd[3].data = relay->play_path.len ? relay->play_path.data
                                       : relay->name.data;
    cmd[3].len = relay->play_path.len ? relay->play_path.len
                                      : relay->name.len;

    ngx_memzero(&hdr, sizeof(hdr));

    hdr.type = NGX_RTMP_MSG_AMF_CMD;
    hdr.msid = NGX_RTMP_RELAY_MSID;
    hdr.csid = NGX_RTMP_RELAY_CSID_AMF;

    return ngx_rtmp_send_amf(s, &hdr, cmd, sizeof(cmd) / sizeof(cmd[0]));
}
/*
 * Send the AMF "play" command for the relayed stream, followed by a
 * set-buffer-length message using the relay configuration's buflen.
 *
 * The stream name sent is the relay context's play_path when it is
 * non-empty, otherwise the context's name.  For a live context both start
 * and duration are -1000; otherwise start is ctx->start (or -2000 when that
 * is zero) and duration is ctx->stop - ctx->start (or -1000 when ctx->stop
 * is zero).
 *
 * Returns NGX_ERROR when the relay configuration or context is missing or
 * when either message fails to send, NGX_OK otherwise.
 */
static ngx_int_t
ngx_rtmp_relay_send_play(ngx_rtmp_session_t *s)
{
    static double               txn_id;
    static double               start, duration;

    static ngx_rtmp_amf_elt_t   cmd[] = {

        { NGX_RTMP_AMF_STRING, ngx_null_string, "play", 0 },

        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &txn_id, 0 },

        { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },

        /* slot 3: stream name, rewritten on every call */
        { NGX_RTMP_AMF_STRING, ngx_null_string, NULL, 0 },

        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &start, 0 },

        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &duration, 0 },
    };

    ngx_rtmp_header_t           hdr;
    ngx_rtmp_relay_ctx_t       *ctx;
    ngx_rtmp_relay_app_conf_t  *racf;

    racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);

    if (racf == NULL || ctx == NULL) {
        return NGX_ERROR;
    }

    /* an explicit play path overrides the stream name */
    cmd[3].data = ctx->play_path.len ? ctx->play_path.data : ctx->name.data;
    cmd[3].len = ctx->play_path.len ? ctx->play_path.len : ctx->name.len;

    if (!ctx->live) {
        start = (ctx->start ? ctx->start : -2000);
        duration = (ctx->stop ? ctx->stop - ctx->start : -1000);

    } else {
        start = -1000;
        duration = -1000;
    }

    ngx_memzero(&hdr, sizeof(hdr));

    hdr.type = NGX_RTMP_MSG_AMF_CMD;
    hdr.msid = NGX_RTMP_RELAY_MSID;
    hdr.csid = NGX_RTMP_RELAY_CSID_AMF;

    if (ngx_rtmp_send_amf(s, &hdr, cmd, sizeof(cmd) / sizeof(cmd[0]))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    /* the buffer length is only announced once "play" has been sent */
    if (ngx_rtmp_send_set_buflen(s, NGX_RTMP_RELAY_MSID, racf->buflen)
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    return NGX_OK;
}
/*
 * AMF "_result" handler for relay sessions.
 *
 * Parses the transaction id and the level/code/description fields of the
 * reply, logs them at debug level, and advances the relay exchange based on
 * the transaction id:
 *
 *   NGX_RTMP_RELAY_CONNECT_TRANS        - send "createStream";
 *   NGX_RTMP_RELAY_CREATE_STREAM_TRANS  - send "play" and publish locally
 *                                         when this context is its own
 *                                         publisher or the session is a
 *                                         static relay; otherwise send
 *                                         "publish" and play locally;
 *   anything else                       - ignored.
 *
 * Sessions without a relay context, or not flagged as relay, are left alone
 * (NGX_OK).  Returns NGX_ERROR when the reply cannot be parsed or a command
 * fails to send.
 */
static ngx_int_t
ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *ctx;

    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    } reply;

    static ngx_rtmp_amf_elt_t   info_fields[] = {

        { NGX_RTMP_AMF_STRING, ngx_string("level"),
          &reply.level, sizeof(reply.level) },

        { NGX_RTMP_AMF_STRING, ngx_string("code"),
          &reply.code, sizeof(reply.code) },

        { NGX_RTMP_AMF_STRING, ngx_string("description"),
          &reply.desc, sizeof(reply.desc) },
    };

    static ngx_rtmp_amf_elt_t   reply_elts[] = {

        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &reply.trans, 0 },

        { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },

        { NGX_RTMP_AMF_OBJECT, ngx_null_string,
          info_fields, sizeof(info_fields) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        return NGX_OK;
    }

    /* the receive buffer is static: clear what the last reply left in it */
    ngx_memzero(&reply, sizeof(reply));

    if (ngx_rtmp_receive_amf(s, in, reply_elts,
                             sizeof(reply_elts) / sizeof(reply_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: _result: level='%s' code='%s' description='%s'",
            reply.level, reply.code, reply.desc);

    switch ((ngx_int_t) reply.trans) {

    case NGX_RTMP_RELAY_CONNECT_TRANS:
        return ngx_rtmp_relay_send_create_stream(s);

    case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
        break;

    default:
        return NGX_OK;
    }

    /* reply to "createStream": pick the direction of the relay */

    if (ctx->publish == ctx || s->static_relay) {

        if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
            return NGX_ERROR;
        }

        return ngx_rtmp_relay_publish_local(s);
    }

    if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
        return NGX_ERROR;
    }

    return ngx_rtmp_relay_play_local(s);
}
/*
 * AMF "_error" handler for relay sessions.
 *
 * Parses the transaction id and the level/code/description fields of the
 * message and writes them to the debug log; no further action is taken.
 *
 * Sessions without a relay context, or not flagged as relay, are left alone
 * (NGX_OK).  Returns NGX_ERROR when the message cannot be parsed.
 */
static ngx_int_t
ngx_rtmp_relay_on_error(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *ctx;

    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    } reply;

    static ngx_rtmp_amf_elt_t   info_fields[] = {

        { NGX_RTMP_AMF_STRING, ngx_string("level"),
          &reply.level, sizeof(reply.level) },

        { NGX_RTMP_AMF_STRING, ngx_string("code"),
          &reply.code, sizeof(reply.code) },

        { NGX_RTMP_AMF_STRING, ngx_string("description"),
          &reply.desc, sizeof(reply.desc) },
    };

    static ngx_rtmp_amf_elt_t   reply_elts[] = {

        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &reply.trans, 0 },

        { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },

        { NGX_RTMP_AMF_OBJECT, ngx_null_string,
          info_fields, sizeof(info_fields) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        return NGX_OK;
    }

    /* the receive buffer is static: clear what the last message left in it */
    ngx_memzero(&reply, sizeof(reply));

    if (ngx_rtmp_receive_amf(s, in, reply_elts,
                             sizeof(reply_elts) / sizeof(reply_elts[0])))
    {
        return NGX_ERROR;
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: _error: level='%s' code='%s' description='%s'",
            reply.level, reply.code, reply.desc);

    return NGX_OK;
}
/*
 * AMF "onStatus" handler for relay sessions.
 *
 * Reads the level/code/description fields of the status object and writes
 * them to the debug log.  When the message type is NGX_RTMP_MSG_AMF_META
 * only the info object is expected; otherwise a transaction id and a null
 * element precede it.
 *
 * Always returns NGX_OK.
 */
static ngx_int_t
ngx_rtmp_relay_on_status(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t       *ctx;

    static struct {
        double                  trans;
        u_char                  level[32];
        u_char                  code[128];
        u_char                  desc[1024];
    } status;

    static ngx_rtmp_amf_elt_t   info_fields[] = {

        { NGX_RTMP_AMF_STRING, ngx_string("level"),
          &status.level, sizeof(status.level) },

        { NGX_RTMP_AMF_STRING, ngx_string("code"),
          &status.code, sizeof(status.code) },

        { NGX_RTMP_AMF_STRING, ngx_string("description"),
          &status.desc, sizeof(status.desc) },
    };

    /* layout of a regular command message */
    static ngx_rtmp_amf_elt_t   cmd_elts[] = {

        { NGX_RTMP_AMF_NUMBER, ngx_null_string, &status.trans, 0 },

        { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL, 0 },

        { NGX_RTMP_AMF_OBJECT, ngx_null_string,
          info_fields, sizeof(info_fields) },
    };

    /* layout of a metadata message: the info object only */
    static ngx_rtmp_amf_elt_t   meta_elts[] = {

        { NGX_RTMP_AMF_OBJECT, ngx_null_string,
          info_fields, sizeof(info_fields) },
    };

    ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
    if (ctx == NULL || !s->relay) {
        return NGX_OK;
    }

    /* the receive buffer is static: clear what the last message left in it */
    ngx_memzero(&status, sizeof(status));

    /*
     * NOTE(review): the parse result is not checked; presumably best-effort,
     * since the fields are only logged -- confirm before making it fatal.
     */
    if (h->type != NGX_RTMP_MSG_AMF_META) {
        ngx_rtmp_receive_amf(s, in, cmd_elts,
                             sizeof(cmd_elts) / sizeof(cmd_elts[0]));

    } else {
        ngx_rtmp_receive_amf(s, in, meta_elts,
                             sizeof(meta_elts) / sizeof(meta_elts[0]));
    }

    ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
            "relay: onStatus: level='%s' code='%s' description='%s'",
            status.level, status.code, status.desc);

    return NGX_OK;
}
/*
 * Handshake-done event handler: on a relay session that has a relay
 * context, start the command exchange by sending "connect".  Any other
 * session is passed over with NGX_OK.
 */
static ngx_int_t
ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
        ngx_chain_t *in)
{
    ngx_rtmp_relay_ctx_t  *relay_ctx;

    relay_ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);

    if (relay_ctx != NULL && s->relay) {
        return ngx_rtmp_relay_send_connect(s);
    }

    return NGX_OK;
}
/*
 * Detach a session's relay context from the publish/play relationships it
 * takes part in.
 *
 * - No relay context: nothing to do.
 * - Static relay session: the context's static event timer is armed with
 *   the configured pull_reconnect interval.
 * - Context on the play side (ctx->publish != ctx): it is unlinked from its
 *   publisher's play list; the publisher's push timer may be re-armed; and a
 *   relay publisher left without players is finalized.
 * - Context on the publish side (ctx->publish == ctx): its push timer is
 *   cancelled, every player attached to it is orphaned and finalized, and
 *   the context is removed from the relay configuration's hash bucket.
 */
static void
ngx_rtmp_relay_close(ngx_rtmp_session_t *s)
{
ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *ctx, **cctx, **next;
ngx_uint_t hash;
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
if (ctx == NULL) {
return;
}
/* static relay: schedule the static event after pull_reconnect */
if (s->static_relay) {
ngx_add_timer(ctx->static_evt, racf->pull_reconnect);
}
/* not attached to any publisher: nothing to unlink */
if (ctx->publish == NULL) {
return;
}
/* play end disconnect? */
if (ctx->publish != ctx) {
/* unlink this context from the publisher's singly linked play list */
for (cctx = &ctx->publish->play; *cctx; cctx = &(*cctx)->next) {
if (*cctx == ctx) {
*cctx = ctx->next;
break;
}
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ctx->session->connection->log, 0,
"relay: play disconnect app='%V' name='%V'",
&ctx->app, &ctx->name);
/* push reconnect */
/* only for relay sessions tagged by this module, and only if the */
/* publisher's push timer is not already pending */
if (s->relay && ctx->tag == &ngx_rtmp_relay_module &&
!ctx->publish->push_evt.timer_set)
{
ngx_add_timer(&ctx->publish->push_evt, racf->push_reconnect);
}
#ifdef NGX_DEBUG
{
/* debug builds only: count and log the players still attached */
ngx_uint_t n = 0;
for (cctx = &ctx->publish->play; *cctx; cctx = &(*cctx)->next, ++n);
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ctx->session->connection->log, 0,
"relay: play left after disconnect app='%V' name='%V': %ui",
&ctx->app, &ctx->name, n);
}
#endif
/* last player gone and the publisher is itself a relay: close it too */
if (ctx->publish->play == NULL && ctx->publish->session->relay) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP,
ctx->publish->session->connection->log, 0,
"relay: publish disconnect empty app='%V' name='%V'",
&ctx->app, &ctx->name);
ngx_rtmp_finalize_session(ctx->publish->session);
}
ctx->publish = NULL;
return;
}
/* publish end disconnect */
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, ctx->session->connection->log, 0,
"relay: publish disconnect app='%V' name='%V'",
&ctx->app, &ctx->name);
/* cancel a pending push timer for this publisher */
if (ctx->push_evt.timer_set) {
ngx_del_timer(&ctx->push_evt);
}
/*
 * Orphan and finalize every player of this publisher.  The address of the
 * next link is taken before the session is finalized and used to advance
 * afterwards (the loop header deliberately has no increment).
 */
for (cctx = &ctx->play; *cctx; /* cctx = &(*cctx)->next */) {
(*cctx)->publish = NULL;
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, (*cctx)->session->connection->log,
0, "relay: play disconnect orphan app='%V' name='%V'",
&(*cctx)->app, &(*cctx)->name);
next = &(*cctx)->next;
ngx_rtmp_finalize_session((*cctx)->session);
cctx = next;
}
ctx->publish = NULL;
/* remove this context from its hash bucket, which is keyed by stream name */
hash = ngx_hash_key(ctx->name.data, ctx->name.len);
cctx = &racf->ctx[hash % racf->nbuckets];
for (; *cctx && *cctx != ctx; cctx = &(*cctx)->next);
if (*cctx) {
*cctx = ctx->next;
}
}
/*
 * close_stream chain link: tear down the relay state of the session unless
 * the relay configuration is unavailable or has session_relay enabled, then
 * hand over to the next close_stream handler.
 */
static ngx_int_t
ngx_rtmp_relay_close_stream(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v)
{
    ngx_rtmp_relay_app_conf_t  *relay_conf;

    relay_conf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);

    if (relay_conf == NULL || relay_conf->session_relay) {
        return next_close_stream(s, v);
    }

    ngx_rtmp_relay_close(s);

    return next_close_stream(s, v);
}
/*
 * delete_stream chain link: unconditionally tear down the relay state of
 * the session, then hand over to the next delete_stream handler.
 */
static ngx_int_t
ngx_rtmp_relay_delete_stream(ngx_rtmp_session_t *s,
        ngx_rtmp_delete_stream_t *v)
{
    ngx_rtmp_relay_close(s);

    return next_delete_stream(s, v);
}
/*
 * Configuration handler for the relay push/pull directives.
 *
 * The first argument is the target URL (an optional "rtmp://" prefix is
 * stripped, default port 1935).  Every further argument is "name=value" or
 * a bare "name" (treated as "name=1"); names are matched case-insensitively:
 *
 *   app, name, tcUrl, pageUrl, swfUrl, flashVer, playPath   - strings
 *   live, start, stop                                       - numbers
 *   static                                                  - flag
 *
 * "static" accepts any value: a non-zero ngx_atoi() result enables static
 * mode, zero disables it (previously "static=0" was rejected as an unknown
 * parameter).  A static target must be a pull, must not contain '$' in its
 * URL and must carry a stream name; it additionally gets a reconnect event
 * registered in racf->static_events.
 *
 * The target is appended to racf->static_pulls, racf->pulls or racf->pushes.
 *
 * Returns NGX_CONF_OK on success, NGX_CONF_ERROR on failure, or a message
 * string for an unknown parameter.
 */
static char *
ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_str_t                          *value, v, n;
    ngx_rtmp_relay_app_conf_t          *racf;
    ngx_rtmp_relay_target_t            *target, **t;
    ngx_url_t                          *u;
    ngx_uint_t                          i;
    ngx_int_t                           is_pull, is_static;
    ngx_event_t                       **ee, *e;
    ngx_rtmp_relay_static_t            *rs;
    u_char                             *p;

    value = cf->args->elts;

    racf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_relay_module);

    /*
     * value[0] is the directive name; presumably "push" or "pull", which
     * differ in their fourth character.
     */
    is_pull = (value[0].data[3] == 'l');
    is_static = 0;

    target = ngx_pcalloc(cf->pool, sizeof(*target));
    if (target == NULL) {
        return NGX_CONF_ERROR;
    }

    target->tag = &ngx_rtmp_relay_module;
    target->data = target;

    u = &target->url;
    u->default_port = 1935;
    u->uri_part = 1;
    u->url = value[1];

    if (ngx_strncasecmp(u->url.data, (u_char *) "rtmp://", 7) == 0) {
        u->url.data += 7;
        u->url.len -= 7;
    }

    if (ngx_parse_url(cf->pool, u) != NGX_OK) {
        if (u->err) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "%s in url \"%V\"", u->err, &u->url);
        }
        return NGX_CONF_ERROR;
    }

    value += 2;

    for (i = 2; i < cf->args->nelts; ++i, ++value) {

        /* split "name=value"; a bare "name" means "name=1" */
        p = ngx_strlchr(value->data, value->data + value->len, '=');

        if (p == NULL) {
            n = *value;
            ngx_str_set(&v, "1");

        } else {
            n.data = value->data;
            n.len = p - value->data;

            v.data = p + 1;
            v.len = value->data + value->len - p - 1;
        }

#define NGX_RTMP_RELAY_STR_PAR(name, var)                                     \
        if (n.len == sizeof(name) - 1                                         \
            && ngx_strncasecmp(n.data, (u_char *) name, n.len) == 0)          \
        {                                                                     \
            target->var = v;                                                  \
            continue;                                                         \
        }

#define NGX_RTMP_RELAY_NUM_PAR(name, var)                                     \
        if (n.len == sizeof(name) - 1                                         \
            && ngx_strncasecmp(n.data, (u_char *) name, n.len) == 0)          \
        {                                                                     \
            target->var = ngx_atoi(v.data, v.len);                            \
            continue;                                                         \
        }

        NGX_RTMP_RELAY_STR_PAR("app",         app);
        NGX_RTMP_RELAY_STR_PAR("name",        name);
        NGX_RTMP_RELAY_STR_PAR("tcUrl",       tc_url);
        NGX_RTMP_RELAY_STR_PAR("pageUrl",     page_url);
        NGX_RTMP_RELAY_STR_PAR("swfUrl",      swf_url);
        NGX_RTMP_RELAY_STR_PAR("flashVer",    flash_ver);
        NGX_RTMP_RELAY_STR_PAR("playPath",    play_path);
        NGX_RTMP_RELAY_NUM_PAR("live",        live);
        NGX_RTMP_RELAY_NUM_PAR("start",       start);
        NGX_RTMP_RELAY_NUM_PAR("stop",        stop);

#undef NGX_RTMP_RELAY_STR_PAR
#undef NGX_RTMP_RELAY_NUM_PAR

        if (n.len == sizeof("static") - 1
            && ngx_strncasecmp(n.data, (u_char *) "static", n.len) == 0)
        {
            /*
             * The name alone identifies the parameter; the value only
             * selects on/off.  Any non-zero ngx_atoi() result still means
             * "static", zero is an explicit "not static".
             */
            is_static = (ngx_atoi(v.data, v.len) != 0);
            continue;
        }

        return "unsupported parameter";
    }

    if (is_static) {

        if (!is_pull) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "static push is not allowed");
            return NGX_CONF_ERROR;
        }

        if (ngx_strlchr(target->url.url.data,
                        target->url.url.data + target->url.url.len, '$'))
        {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "variable is not allowed");
            return NGX_CONF_ERROR;
        }

        if (target->name.len == 0) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "stream name missing in static pull "
                               "declaration");
            return NGX_CONF_ERROR;
        }

        /* register the reconnect event that drives this static pull */

        ee = ngx_array_push(&racf->static_events);
        if (ee == NULL) {
            return NGX_CONF_ERROR;
        }

        e = ngx_pcalloc(cf->pool, sizeof(ngx_event_t));
        if (e == NULL) {
            return NGX_CONF_ERROR;
        }

        *ee = e;

        rs = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_relay_static_t));
        if (rs == NULL) {
            return NGX_CONF_ERROR;
        }

        rs->target = target;

        e->data = rs;
        e->log = &cf->cycle->new_log;
        e->handler = ngx_rtmp_relay_static_pull_reconnect;

        t = ngx_array_push(&racf->static_pulls);

    } else if (is_pull) {
        t = ngx_array_push(&racf->pulls);

    } else {
        t = ngx_array_push(&racf->pushes);
    }

    if (t == NULL) {
        return NGX_CONF_ERROR;
    }

    *t = target;

    return NGX_CONF_OK;
}
/*
 * Per-process initialization: queue every configured static relay event.
 *
 * On non-Windows builds, and only in the process whose ngx_process_slot is
 * zero, this walks every server and every application, fills each static
 * event's relay data with the server's configuration context (with app_conf
 * pointing at the application's), and posts the event to
 * ngx_rtmp_init_queue.
 *
 * Always returns NGX_OK.
 */
static ngx_int_t
ngx_rtmp_relay_init_process(ngx_cycle_t *cycle)
{
#if !(NGX_WIN32)
    ngx_rtmp_core_main_conf_t   *cmcf = ngx_rtmp_core_main_conf;
    ngx_rtmp_core_srv_conf_t   **servers, *cscf;
    ngx_rtmp_core_app_conf_t   **apps, *cacf;
    ngx_rtmp_relay_app_conf_t   *racf;
    ngx_uint_t                   si, ai, ei;
    ngx_rtmp_relay_static_t     *rs;
    ngx_event_t                **events, *ev;

    if (cmcf == NULL || cmcf->servers.nelts == 0) {
        return NGX_OK;
    }

    /* only first worker does static pulling */
    if (ngx_process_slot != 0) {
        return NGX_OK;
    }

    servers = cmcf->servers.elts;

    for (si = 0; si < cmcf->servers.nelts; si++) {

        cscf = servers[si];
        apps = cscf->applications.elts;

        for (ai = 0; ai < cscf->applications.nelts; ai++) {

            cacf = apps[ai];
            racf = cacf->app_conf[ngx_rtmp_relay_module.ctx_index];
            events = racf->static_events.elts;

            for (ei = 0; ei < racf->static_events.nelts; ei++) {

                ev = events[ei];
                rs = ev->data;

                /* server-level context, narrowed to this application */
                rs->cctx = *cscf->ctx;
                rs->cctx.app_conf = cacf->app_conf;

                ngx_post_event(ev, &ngx_rtmp_init_queue);
            }
        }
    }
#endif

    return NGX_OK;
}
/*
 * Postconfiguration hook of the relay module.
 *
 * Registers the handshake-done event handler, inserts the relay handlers at
 * the head of the publish / play / delete_stream / close_stream chains
 * (saving the previous heads in next_*), and registers the AMF handlers for
 * "_result", "_error" and "onStatus".
 *
 * Returns NGX_OK on success, or NGX_ERROR when a handler slot cannot be
 * obtained from ngx_array_push().
 */
static ngx_int_t
ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf)
{
    ngx_rtmp_core_main_conf_t          *cmcf;
    ngx_rtmp_handler_pt                *h;
    ngx_rtmp_amf_handler_t             *ch;

    cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);

    h = ngx_array_push(&cmcf->events[NGX_RTMP_HANDSHAKE_DONE]);
    if (h == NULL) {
        return NGX_ERROR;
    }
    *h = ngx_rtmp_relay_handshake_done;

    /* put the relay handlers in front of the existing chains */

    next_publish = ngx_rtmp_publish;
    ngx_rtmp_publish = ngx_rtmp_relay_publish;

    next_play = ngx_rtmp_play;
    ngx_rtmp_play = ngx_rtmp_relay_play;

    next_delete_stream = ngx_rtmp_delete_stream;
    ngx_rtmp_delete_stream = ngx_rtmp_relay_delete_stream;

    next_close_stream = ngx_rtmp_close_stream;
    ngx_rtmp_close_stream = ngx_rtmp_relay_close_stream;

    /* AMF command handlers */

    ch = ngx_array_push(&cmcf->amf);
    if (ch == NULL) {
        return NGX_ERROR;
    }
    ngx_str_set(&ch->name, "_result");
    ch->handler = ngx_rtmp_relay_on_result;

    ch = ngx_array_push(&cmcf->amf);
    if (ch == NULL) {
        return NGX_ERROR;
    }
    ngx_str_set(&ch->name, "_error");
    ch->handler = ngx_rtmp_relay_on_error;

    ch = ngx_array_push(&cmcf->amf);
    if (ch == NULL) {
        return NGX_ERROR;
    }
    ngx_str_set(&ch->name, "onStatus");
    ch->handler = ngx_rtmp_relay_on_status;

    return NGX_OK;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/winshining/nginx-http-flv-module.git
git@gitee.com:winshining/nginx-http-flv-module.git
winshining
nginx-http-flv-module
nginx-http-flv-module
master

搜索帮助