Fetch the repository succeeded.
This action will force synchronization from winshining/nginx-http-flv-module, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
/*
* Copyright (C) Roman Arutyunyan
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include "ngx_rtmp_netcall_module.h"
static ngx_int_t ngx_rtmp_netcall_postconfiguration(ngx_conf_t *cf);
static void * ngx_rtmp_netcall_create_srv_conf(ngx_conf_t *cf);
static char * ngx_rtmp_netcall_merge_srv_conf(ngx_conf_t *cf,
void *parent, void *child);
static void ngx_rtmp_netcall_close(ngx_connection_t *cc);
static void ngx_rtmp_netcall_detach(ngx_connection_t *cc);
static void ngx_rtmp_netcall_recv(ngx_event_t *rev);
static void ngx_rtmp_netcall_send(ngx_event_t *wev);
typedef struct {
ngx_msec_t timeout;
size_t bufsize;
ngx_log_t *log;
} ngx_rtmp_netcall_srv_conf_t;
typedef struct ngx_rtmp_netcall_session_s {
ngx_rtmp_session_t *session;
ngx_peer_connection_t *pc;
ngx_url_t *url;
struct ngx_rtmp_netcall_session_s *next;
void *arg;
ngx_rtmp_netcall_handle_pt handle;
ngx_rtmp_netcall_filter_pt filter;
ngx_rtmp_netcall_sink_pt sink;
ngx_chain_t *in;
ngx_chain_t *inlast;
ngx_chain_t *out;
ngx_msec_t timeout;
unsigned detached:1;
size_t bufsize;
} ngx_rtmp_netcall_session_t;
typedef struct {
ngx_uint_t nb_cs;
ngx_rtmp_netcall_session_t *cs;
} ngx_rtmp_netcall_ctx_t;
static ngx_command_t ngx_rtmp_netcall_commands[] = {
{ ngx_string("netcall_timeout"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_msec_slot,
NGX_RTMP_SRV_CONF_OFFSET,
offsetof(ngx_rtmp_netcall_srv_conf_t, timeout),
NULL },
{ ngx_string("netcall_buffer"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
NGX_RTMP_SRV_CONF_OFFSET,
offsetof(ngx_rtmp_netcall_srv_conf_t, bufsize),
NULL },
ngx_null_command
};
static ngx_rtmp_module_t ngx_rtmp_netcall_module_ctx = {
NULL, /* preconfiguration */
ngx_rtmp_netcall_postconfiguration, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
ngx_rtmp_netcall_create_srv_conf, /* create server configuration */
ngx_rtmp_netcall_merge_srv_conf, /* merge server configuration */
NULL, /* create app configuration */
NULL /* merge app configuration */
};
ngx_module_t ngx_rtmp_netcall_module = {
NGX_MODULE_V1,
&ngx_rtmp_netcall_module_ctx, /* module context */
ngx_rtmp_netcall_commands, /* module directives */
NGX_RTMP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static void *
ngx_rtmp_netcall_create_srv_conf(ngx_conf_t *cf)
{
ngx_rtmp_netcall_srv_conf_t *nscf;
nscf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_netcall_srv_conf_t));
if (nscf == NULL) {
return NULL;
}
nscf->timeout = NGX_CONF_UNSET_MSEC;
nscf->bufsize = NGX_CONF_UNSET_SIZE;
nscf->log = &cf->cycle->new_log;
return nscf;
}
static char *
ngx_rtmp_netcall_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_rtmp_netcall_srv_conf_t *prev = parent;
ngx_rtmp_netcall_srv_conf_t *conf = child;
ngx_conf_merge_msec_value(conf->timeout, prev->timeout, 10000);
ngx_conf_merge_size_value(conf->bufsize, prev->bufsize, 1024);
return NGX_CONF_OK;
}
static ngx_int_t
ngx_rtmp_netcall_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_rtmp_netcall_ctx_t *ctx;
ngx_rtmp_netcall_session_t *cs;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_netcall_module);
if (ctx) {
for (cs = ctx->cs; cs; cs = cs->next) {
ngx_rtmp_netcall_detach(cs->pc->connection);
}
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_netcall_get_peer(ngx_peer_connection_t *pc, void *data)
{
ngx_rtmp_netcall_session_t *cs = data;
pc->sockaddr =(struct sockaddr *)&cs->url->sockaddr;
pc->socklen = cs->url->socklen;
pc->name = &cs->url->host;
return NGX_OK;
}
static void
ngx_rtmp_netcall_free_peer(ngx_peer_connection_t *pc, void *data,
ngx_uint_t state)
{
}
ngx_int_t
ngx_rtmp_netcall_create(ngx_rtmp_session_t *s, ngx_rtmp_netcall_init_t *ci)
{
ngx_rtmp_netcall_ctx_t *ctx;
ngx_peer_connection_t *pc;
ngx_rtmp_netcall_session_t *cs;
ngx_rtmp_netcall_srv_conf_t *nscf;
ngx_connection_t *c, *cc;
ngx_pool_t *pool;
ngx_int_t rc;
pool = NULL;
c = s->connection;
nscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_netcall_module);
if (nscf == NULL) {
goto error;
}
/* get module context */
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_netcall_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(c->pool,
sizeof(ngx_rtmp_netcall_ctx_t));
if (ctx == NULL) {
return NGX_ERROR;
}
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_netcall_module);
} else {
/* I don't know why? But it works! */
if (ctx->nb_cs == 0) {
ctx->cs = NULL;
}
}
/* Create netcall pool, connection, session.
* Note we use shared (app-wide) log because
* s->connection->log might be unavailable
* in detached netcall when it's being closed */
pool = ngx_create_pool(4096, nscf->log);
if (pool == NULL) {
goto error;
}
pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
if (pc == NULL) {
goto error;
}
cs = ngx_pcalloc(pool, sizeof(ngx_rtmp_netcall_session_t));
if (cs == NULL) {
goto error;
}
/* copy arg to connection pool */
if (ci->argsize) {
cs->arg = ngx_pcalloc(pool, ci->argsize);
if (cs->arg == NULL) {
goto error;
}
ngx_memcpy(cs->arg, ci->arg, ci->argsize);
}
cs->timeout = nscf->timeout;
cs->bufsize = nscf->bufsize;
cs->url = ci->url;
cs->session = s;
cs->filter = ci->filter;
cs->sink = ci->sink;
cs->handle = ci->handle;
if (cs->handle == NULL) {
cs->detached = 1;
}
pc->log = nscf->log;
pc->get = ngx_rtmp_netcall_get_peer;
pc->free = ngx_rtmp_netcall_free_peer;
pc->data = cs;
/* connect */
rc = ngx_event_connect_peer(pc);
if (rc != NGX_OK && rc != NGX_AGAIN ) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"netcall: connection failed");
goto error;
}
cc = pc->connection;
cc->data = cs;
cc->pool = pool;
cs->pc = pc;
cs->out = ci->create(s, ci->arg, pool);
if (cs->out == NULL) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"netcall: creation failed");
ngx_close_connection(pc->connection);
goto error;
}
cc->write->handler = ngx_rtmp_netcall_send;
cc->read->handler = ngx_rtmp_netcall_recv;
if (!cs->detached) {
cs->next = ctx->cs;
ctx->cs = cs;
ctx->nb_cs++;
}
ngx_rtmp_netcall_send(cc->write);
return c->destroyed ? NGX_ERROR : NGX_OK;
error:
if (pool) {
ngx_destroy_pool(pool);
}
return NGX_ERROR;
}
static void
ngx_rtmp_netcall_close(ngx_connection_t *cc)
{
ngx_rtmp_netcall_session_t *cs, **css;
ngx_pool_t *pool;
ngx_rtmp_session_t *s;
ngx_rtmp_netcall_ctx_t *ctx;
ngx_buf_t *b;
cs = cc->data;
if (cc->destroyed) {
return;
}
cc->destroyed = 1;
if (!cs->detached) {
s = cs->session;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_netcall_module);
if (cs->in && cs->sink) {
cs->sink(cs->session, cs->in);
b = cs->in->buf;
b->pos = b->last = b->start;
}
for(css = &ctx->cs; *css; css = &((*css)->next)) {
if (*css == cs) {
*css = cs->next;
ctx->nb_cs--;
break;
}
}
if (cs->handle && cs->handle(s, cs->arg, cs->in) != NGX_OK) {
ngx_rtmp_finalize_session(s);
}
}
pool = cc->pool;
ngx_close_connection(cc);
ngx_destroy_pool(pool);
}
static void
ngx_rtmp_netcall_detach(ngx_connection_t *cc)
{
ngx_rtmp_netcall_session_t *cs;
cs = cc->data;
cs->detached = 1;
}
static void
ngx_rtmp_netcall_recv(ngx_event_t *rev)
{
ngx_rtmp_netcall_session_t *cs;
ngx_connection_t *cc;
ngx_chain_t *cl;
ngx_int_t n;
ngx_buf_t *b;
cc = rev->data;
cs = cc->data;
if (cc->destroyed) {
return;
}
if (rev->timedout) {
cc->timedout = 1;
ngx_rtmp_netcall_close(cc);
return;
}
if (rev->timer_set) {
ngx_del_timer(rev);
}
for ( ;; ) {
if (cs->inlast == NULL ||
cs->inlast->buf->last == cs->inlast->buf->end)
{
if (cs->in && cs->sink) {
if (!cs->detached) {
if (cs->sink(cs->session, cs->in) != NGX_OK) {
ngx_rtmp_netcall_close(cc);
return;
}
}
b = cs->in->buf;
b->pos = b->last = b->start;
} else {
cl = ngx_alloc_chain_link(cc->pool);
if (cl == NULL) {
ngx_rtmp_netcall_close(cc);
return;
}
cl->next = NULL;
cl->buf = ngx_create_temp_buf(cc->pool, cs->bufsize);
if (cl->buf == NULL) {
ngx_rtmp_netcall_close(cc);
return;
}
if (cs->in == NULL) {
cs->in = cl;
} else {
cs->inlast->next = cl;
}
cs->inlast = cl;
}
}
b = cs->inlast->buf;
n = cc->recv(cc, b->last, b->end - b->last);
if (n == NGX_ERROR || n == 0) {
ngx_rtmp_netcall_close(cc);
return;
}
if (n == NGX_AGAIN) {
if (cs->filter && cs->in
&& cs->filter(cs->in) != NGX_AGAIN)
{
ngx_rtmp_netcall_close(cc);
return;
}
ngx_add_timer(rev, cs->timeout);
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
ngx_rtmp_netcall_close(cc);
}
return;
}
b->last += n;
}
}
static void
ngx_rtmp_netcall_send(ngx_event_t *wev)
{
ngx_rtmp_netcall_session_t *cs;
ngx_connection_t *cc;
ngx_chain_t *cl;
cc = wev->data;
cs = cc->data;
if (cc->destroyed) {
return;
}
if (wev->timedout) {
ngx_log_error(NGX_LOG_INFO, cc->log, NGX_ETIMEDOUT,
"netcall: client send timed out");
cc->timedout = 1;
ngx_rtmp_netcall_close(cc);
return;
}
if (wev->timer_set) {
ngx_del_timer(wev);
}
cl = cc->send_chain(cc, cs->out, 0);
if (cl == NGX_CHAIN_ERROR) {
ngx_rtmp_netcall_close(cc);
return;
}
cs->out = cl;
/* more data to send? */
if (cl) {
ngx_add_timer(wev, cs->timeout);
if (ngx_handle_write_event(wev, 0) != NGX_OK) {
ngx_rtmp_netcall_close(cc);
}
return;
}
/* we've sent everything we had.
* now receive reply */
ngx_del_event(wev, NGX_WRITE_EVENT, 0);
ngx_rtmp_netcall_recv(cc->read);
}
ngx_chain_t *
ngx_rtmp_netcall_http_format_request(ngx_int_t method, ngx_str_t *host,
ngx_str_t *uri, ngx_chain_t *args,
ngx_chain_t *body, ngx_pool_t *pool,
ngx_str_t *content_type)
{
ngx_chain_t *al, *bl, *ret;
ngx_buf_t *b;
size_t content_length;
static const char *methods[2] = { "GET", "POST" };
static const char rq_tmpl[] = " HTTP/1.0\r\n"
"Host: %V\r\n"
"Content-Type: %V\r\n"
"Connection: Close\r\n"
"Content-Length: %uz\r\n"
"\r\n";
content_length = 0;
for (al = body; al; al = al->next) {
b = al->buf;
content_length += (b->last - b->pos);
}
/* create first buffer */
al = ngx_alloc_chain_link(pool);
if (al == NULL) {
return NULL;
}
b = ngx_create_temp_buf(pool, sizeof("POST") + /* longest method + 1 */
uri->len);
if (b == NULL) {
return NULL;
}
b->last = ngx_snprintf(b->last, b->end - b->last, "%s %V",
methods[method], uri);
al->buf = b;
ret = al;
if (args) {
*b->last++ = '?';
al->next = args;
for (al = args; al->next; al = al->next);
}
/* create second buffer */
bl = ngx_alloc_chain_link(pool);
if (bl == NULL) {
return NULL;
}
b = ngx_create_temp_buf(pool, sizeof(rq_tmpl) + host->len +
content_type->len + NGX_SIZE_T_LEN);
if (b == NULL) {
return NULL;
}
bl->buf = b;
b->last = ngx_snprintf(b->last, b->end - b->last, rq_tmpl,
host, content_type, content_length);
al->next = bl;
bl->next = body;
return ret;
}
ngx_chain_t *
ngx_rtmp_netcall_http_format_session(ngx_rtmp_session_t *s, ngx_pool_t *pool)
{
ngx_chain_t *cl;
ngx_buf_t *b;
ngx_str_t *addr_text;
addr_text = &s->connection->addr_text;
cl = ngx_alloc_chain_link(pool);
if (cl == NULL) {
return NULL;
}
b = ngx_create_temp_buf(pool,
sizeof("app=") - 1 + s->app.len * 3 +
sizeof("&flashver=") - 1 + s->flashver.len * 3 +
sizeof("&swfurl=") - 1 + s->swf_url.len * 3 +
sizeof("&tcurl=") - 1 + s->tc_url.len * 3 +
sizeof("&pageurl=") - 1 + s->page_url.len * 3 +
sizeof("&addr=") - 1 + addr_text->len * 3 +
sizeof("&clientid=") - 1 + NGX_INT_T_LEN
);
if (b == NULL) {
return NULL;
}
cl->buf = b;
cl->next = NULL;
b->last = ngx_cpymem(b->last, (u_char*) "app=", sizeof("app=") - 1);
b->last = (u_char*) ngx_escape_uri(b->last, s->app.data, s->app.len,
NGX_ESCAPE_ARGS);
b->last = ngx_cpymem(b->last, (u_char*) "&flashver=",
sizeof("&flashver=") - 1);
b->last = (u_char*) ngx_escape_uri(b->last, s->flashver.data,
s->flashver.len, NGX_ESCAPE_ARGS);
b->last = ngx_cpymem(b->last, (u_char*) "&swfurl=",
sizeof("&swfurl=") - 1);
b->last = (u_char*) ngx_escape_uri(b->last, s->swf_url.data,
s->swf_url.len, NGX_ESCAPE_ARGS);
b->last = ngx_cpymem(b->last, (u_char*) "&tcurl=",
sizeof("&tcurl=") - 1);
b->last = (u_char*) ngx_escape_uri(b->last, s->tc_url.data,
s->tc_url.len, NGX_ESCAPE_ARGS);
b->last = ngx_cpymem(b->last, (u_char*) "&pageurl=",
sizeof("&pageurl=") - 1);
b->last = (u_char*) ngx_escape_uri(b->last, s->page_url.data,
s->page_url.len, NGX_ESCAPE_ARGS);
b->last = ngx_cpymem(b->last, (u_char*) "&addr=", sizeof("&addr=") - 1);
b->last = (u_char*) ngx_escape_uri(b->last, addr_text->data,
addr_text->len, NGX_ESCAPE_ARGS);
b->last = ngx_cpymem(b->last, (u_char*) "&clientid=",
sizeof("&clientid=") - 1);
b->last = ngx_sprintf(b->last, "%ui", (ngx_uint_t) s->connection->number);
return cl;
}
ngx_chain_t *
ngx_rtmp_netcall_http_skip_header(ngx_chain_t *in)
{
ngx_buf_t *b;
/* find \n[\r]\n */
enum {
normal,
lf,
lfcr
} state = normal;
if (in == NULL) {
return NULL;
}
b = in->buf;
for ( ;; ) {
while (b->pos == b->last) {
in = in->next;
if (in == NULL) {
return NULL;
}
b = in->buf;
}
switch (*b->pos++) {
case '\r':
state = (state == lf) ? lfcr : normal;
break;
case '\n':
if (state != normal) {
return in;
}
state = lf;
break;
default:
state = normal;
}
}
}
ngx_chain_t *
ngx_rtmp_netcall_memcache_set(ngx_rtmp_session_t *s, ngx_pool_t *pool,
ngx_str_t *key, ngx_str_t *value, ngx_uint_t flags, ngx_uint_t sec)
{
ngx_chain_t *cl;
ngx_buf_t *b;
cl = ngx_alloc_chain_link(pool);
if (cl == NULL) {
return NULL;
}
b = ngx_create_temp_buf(pool, sizeof("set ") - 1 + key->len +
(1 + NGX_INT_T_LEN) * 3 +
(sizeof("\r\n") - 1) * 2 + value->len);
if (b == NULL) {
return NULL;
}
cl->next = NULL;
cl->buf = b;
b->last = ngx_sprintf(b->pos, "set %V %ui %ui %ui\r\n%V\r\n",
key, flags, sec, (ngx_uint_t) value->len, value);
return cl;
}
static ngx_int_t
ngx_rtmp_netcall_postconfiguration(ngx_conf_t *cf)
{
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_handler_pt *h;
cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);
h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]);
*h = ngx_rtmp_netcall_disconnect;
return NGX_OK;
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。