SRS流媒体服务器之HTTP-FLV框架分析( 四 )


SRS流媒体服务器之HTTP-FLV框架分析

文章插图
 
这个HTTP流程与前面分析的RTMP流程类似:
(1)st_thread_create,在sched.c:616行 。
(2)_st_thread_main,在sched.c:337行 。
(3)函数SrsSTCoroutine::pfn,在
src/app/srs_app_st.cpp:213行 。
(4)函数SrsSTCoroutine::cycle,在
src/app/srs_app_st.cpp:198行 。
(5)函数SrsTcpListener::cycle,在
src/app/srs_app_listener.cpp:202行 。
(6)函数
SrsBufferListener::on_tcp_client,在src/app/srs_app_server.cpp:167行 。
(7)函数SrsServer::accept_client,类型是SrsListenerHttpStream,在
src/app/srs_app_server.cpp:1400行 。
(8)函数SrsServer::fd2conn,类型是SrsListenerHttpStream,在
src/app/srs_app_server.cpp:1465行 。
 
不同的客户端都可以进入do_serve_http。拉流客户端拉取HTTP数据(其中包含真正的音视频数据)时就会走到这个函数,因此可以从这里开始分析,源码如下:
/**
 * Serve one HTTP live-stream client until the stream entry is disabled,
 * the receive thread reports an error, or sending fails.
 *
 * Steps, in order:
 *   1. Pick the stream encoder and Content-Type from the suffix of
 *      entry->pattern (.flv, .aac, .mp3, .ts); any other suffix is rejected.
 *   2. Write the HTTP 200 header and create a consumer on the source.
 *   3. Register the client with the statistic module.
 *   4. Initialize the encoder on a writer wrapping the response and, when the
 *      encoder has a cache, dump it to the consumer.
 *   5. Apply socket options, start the receive thread, then loop: dump packets
 *      from the consumer, write them out, and free them.
 *
 * @param w The HTTP response writer that receives the encoded stream.
 * @param r The HTTP request; it is dynamic_cast to SrsHttpMessage to reach
 *          the underlying connection.
 * @return An error wrapping the failing step; ERROR_HTTP_LIVE_STREAM_EXT for
 *         an unsupported pattern suffix; ERROR_HTTP_STREAM_EOF once the entry
 *         is disabled. This function has no srs_success return path.
 */
srs_error_t SrsLiveStream::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
    srs_error_t err = srs_success;

    string enc_desc;
    ISrsBufferEncoder* enc = NULL;

    // Select the encoder and content type by the suffix of the mount pattern.
    srs_assert(entry);
    if (srs_string_ends_with(entry->pattern, ".flv")) {
        w->header()->set_content_type("video/x-flv");
        enc_desc = "FLV";
        enc = new SrsFlvStreamEncoder();
    } else if (srs_string_ends_with(entry->pattern, ".aac")) {
        w->header()->set_content_type("audio/x-aac");
        enc_desc = "AAC";
        enc = new SrsAacStreamEncoder();
    } else if (srs_string_ends_with(entry->pattern, ".mp3")) {
        w->header()->set_content_type("audio/mpeg");
        enc_desc = "MP3";
        enc = new SrsMp3StreamEncoder();
    } else if (srs_string_ends_with(entry->pattern, ".ts")) {
        w->header()->set_content_type("video/MP2T");
        enc_desc = "TS";
        enc = new SrsTsStreamEncoder();
    } else {
        return srs_error_new(ERROR_HTTP_LIVE_STREAM_EXT, "invalid pattern=%s", entry->pattern.c_str());
    }
    SrsAutoFree(ISrsBufferEncoder, enc);

    // Enter chunked mode, because we didn't set the content-length.
    w->write_header(SRS_CONSTS_HTTP_OK);

    // Create consumer of source, ignore gop cache, use the audio gop cache.
    SrsConsumer* consumer = NULL;
    if ((err = source->create_consumer(NULL, consumer, true, true, !enc->has_cache())) != srs_success) {
        return srs_error_wrap(err, "create consumer");
    }
    SrsAutoFree(SrsConsumer, consumer);
    srs_verbose("http: consumer created success.");

    SrsPithyPrint* pprint = SrsPithyPrint::create_http_stream();
    SrsAutoFree(SrsPithyPrint, pprint);

    SrsMessageArray msgs(SRS_PERF_MW_MSGS);

    // Use receive thread to accept the close event to avoid FD leak.
    // @see https://github.com/ossrs/srs/issues/636#issuecomment-298208427
    // NOTE(review): neither dynamic_cast result is checked before use; this
    // presumably relies on callers always passing these concrete types - confirm.
    SrsHttpMessage* hr = dynamic_cast<SrsHttpMessage*>(r);
    SrsResponseOnlyHttpConn* hc = dynamic_cast<SrsResponseOnlyHttpConn*>(hr->connection());

    // Update the statistic when source discovered.
    SrsStatistic* stat = SrsStatistic::instance();
    if ((err = stat->on_client(_srs_context->get_id(), req, hc, SrsRtmpConnPlay)) != srs_success) {
        return srs_error_wrap(err, "stat on client");
    }

    // The memory writer.
    SrsBufferWriter writer(w);
    if ((err = enc->initialize(&writer, cache)) != srs_success) {
        return srs_error_wrap(err, "init encoder");
    }

    // If gop cache enabled for encoder, dump to consumer.
    if (enc->has_cache()) {
        if ((err = enc->dump_cache(consumer, source->jitter())) != srs_success) {
            return srs_error_wrap(err, "encoder dump cache");
        }
    }

    // Non-NULL only when the encoder is the FLV one; selects the send path below.
    SrsFlvStreamEncoder* ffe = dynamic_cast<SrsFlvStreamEncoder*>(enc);

    // Set the socket options for transport.
    bool tcp_nodelay = _srs_config->get_tcp_nodelay(req->vhost);
    if (tcp_nodelay) {
        if ((err = hc->set_tcp_nodelay(tcp_nodelay)) != srs_success) {
            return srs_error_wrap(err, "set tcp nodelay");
        }
    }

    srs_utime_t mw_sleep = _srs_config->get_mw_sleep(req->vhost);
    if ((err = hc->set_socket_buffer(mw_sleep)) != srs_success) {
        return srs_error_wrap(err, "set mw_sleep %" PRId64, mw_sleep);
    }

    SrsHttpRecvThread* trd = new SrsHttpRecvThread(hc);
    SrsAutoFree(SrsHttpRecvThread, trd);

    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "start recv thread");
    }

    srs_trace("FLV %s, encoder=%s, nodelay=%d, mw_sleep=%dms, cache=%d, msgs=%d",
        entry->pattern.c_str(), enc_desc.c_str(), tcp_nodelay, srsu2msi(mw_sleep),
        enc->has_cache(), msgs.max);

    // TODO: free and erase the disabled entry after all related connections is closed.
    // TODO: FIXME: Support timeout for player, quit infinite-loop.
    while (entry->enabled) {
        // Whether client closed the FD.
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "recv thread");
        }

        pprint->elapse();

        // Get messages from consumer.
        // Each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        int count = 0;
        if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {
            return srs_error_wrap(err, "consumer dump packets");
        }

        if (count <= 0) {
            // Directly use sleep, donot use consumer wait, because we couldn't awake consumer.
            srs_usleep(mw_sleep);
            // Ignore when nothing got.
            continue;
        }

        if (pprint->can_print()) {
            srs_trace("-> " SRS_CONSTS_LOG_HTTP_STREAM " http: got %d msgs, age=%d, min=%d, mw=%d",
                count, pprint->age(), SRS_PERF_MW_MIN_MSGS, srsu2msi(mw_sleep));
        }

        // Send out all messages.
        if (ffe) {
            err = ffe->write_tags(msgs.msgs, count);
        } else {
            err = streaming_send_messages(enc, msgs.msgs, count);
        }

        // Free the messages before checking the send result, so they are
        // released even when sending failed.
        for (int i = 0; i < count; i++) {
            SrsSharedPtrMessage* msg = msgs.msgs[i];
            srs_freep(msg);
        }

        // Check send error code.
        if (err != srs_success) {
            return srs_error_wrap(err, "send messages");
        }
    }

    // Here, the entry is disabled by encoder un-publishing or reloading,
    // so we must return a io.EOF error to disconnect the client, or the client will never quit.
    return srs_error_new(ERROR_HTTP_STREAM_EOF, "Stream EOF");
}


推荐阅读