diff options
author | oskwon <oskwon@dev3> | 2014-10-27 10:49:06 (GMT) |
---|---|---|
committer | oskwon <oskwon@dev3> | 2014-10-27 10:49:06 (GMT) |
commit | 280ea227fe29cc8fc1d9ef23419a499d72f29d00 (patch) | |
tree | 3a84ae560521408f8cd717d1140b1d6a8d0696f9 | |
parent | b55a219498eeceb63a98dcfdc7597b7c40d6977c (diff) |
Fix hanup issue and reduce cpu usage.
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | build/Makefile | 4 | ||||
-rw-r--r-- | example/Makefile | 93 | ||||
-rwxr-xr-x | example/scripts/ck.sh | 8 | ||||
-rwxr-xr-x | example/scripts/dbg.sh | 8 | ||||
-rwxr-xr-x | example/scripts/ps.sh | 8 | ||||
-rw-r--r-- | example/stress_full.cpp | 104 | ||||
-rw-r--r-- | src/3rdparty/mpegts.cpp | 735 | ||||
-rw-r--r-- | src/3rdparty/mpegts.h | 216 | ||||
-rw-r--r-- | src/Demuxer.cpp | 61 | ||||
-rw-r--r-- | src/Demuxer.h | 4 | ||||
-rw-r--r-- | src/Encoder.cpp | 14 | ||||
-rw-r--r-- | src/Encoder.h | 2 | ||||
-rw-r--r-- | src/Http.cpp | 12 | ||||
-rw-r--r-- | src/Logger.cpp | 19 | ||||
-rw-r--r-- | src/Makefile.am | 1 | ||||
-rw-r--r-- | src/Mpeg.cpp | 185 | ||||
-rw-r--r-- | src/Mpeg.h | 33 | ||||
-rw-r--r-- | src/Source.h | 1 | ||||
-rw-r--r-- | src/Util.cpp | 5 | ||||
-rw-r--r-- | src/main.cpp | 332 |
21 files changed, 752 insertions, 1095 deletions
@@ -24,3 +24,5 @@ stamp-h1 config.h.in~ config.mk config.mk.bak +example/bin +example/obj diff --git a/build/Makefile b/build/Makefile index 2408020..a07c88b 100644 --- a/build/Makefile +++ b/build/Makefile @@ -20,8 +20,8 @@ TOP=$(PWD)/.. OBJ=./obj/ ifeq ($(CROSS),YES) -SYSROOT=$(OETOP)/$(MODEL)/build/tmp/staging/mipsel-oe-linux -TOOLCHAIN=$(OETOP)/$(MODEL)/build/tmp/cross/mipsel/bin/mipsel-oe-linux- +SYSROOT=$(OETOP)/build/$(MODEL)/tmp/sysroots/$(MODEL) +TOOLCHAIN=$(OETOP)/build/$(MODEL)/tmp/sysroots/i686-linux/usr/bin/mips32el-oe-linux/mipsel-oe-linux- endif RM=rm -Rf diff --git a/example/Makefile b/example/Makefile new file mode 100644 index 0000000..5d5dc57 --- /dev/null +++ b/example/Makefile @@ -0,0 +1,93 @@ +#============================================================================ +# Name : Makefile (transtreamproxy) +# Author : oskwon(oskwon@dev3) +# Version : +# Copyright : Copyright(c)2014 Vu+ Team. All right reserved. +# Description : +#============================================================================ + +-include ../build/config.mk + +ifeq ($(MODEL),) +$(error config.mk is not set. please run script.config before make.) +endif + +MAJOR = 3 +MINOR = 0 + +TOP=$(PWD)/.. +OBJ=./obj/ + +ifeq ($(CROSS),YES) +SYSROOT=$(OETOP)/build/$(MODEL)/tmp/sysroots/$(MODEL) +TOOLCHAIN=$(OETOP)/build/$(MODEL)/tmp/sysroots/i686-linux/usr/bin/mips32el-oe-linux/mipsel-oe-linux- +endif + +RM=rm -Rf +CXX=$(TOOLCHAIN)g++ +LD=$(TOOLCHAIN)ld +STRIP=$(TOOLCHAIN)strip +UPLOAD=./script.upload + +ifeq ($(MODE),DEBUG) +CFLAGS += -g +else +CFLAGS += -O2 +endif +CFLAGS += -D_MAJOR=$(MAJOR) -D_MINOR=$(MINOR) -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 +CFLAGS += -I$(SYSROOT)/usr/include +LDFLAGS += -L$(SYSROOT)/usr/lib -lpthread -lrt + +SRCS = ../src/3rdparty/trap.cpp +SRCS+= ../src/Demuxer.cpp +SRCS+= ../src/Encoder.cpp +SRCS+= ../src/Http.cpp +SRCS+= ../src/Logger.cpp +SRCS+= ../src/Mpeg.cpp +SRCS+= ../src/UriDecoder.cpp +SRCS+= ../src/Util.cpp + +OBJS = $(SRCS:.cpp=.o) +CFLAGS += $(addprefix -I, $(shell find ../src/ -type d)) + +.SUFFIXES : .cpp .o +.PHONY : all clean install .showinfo .prepare $(PROJECT) + +.cpp.o: + @echo "Compile... ["$<"]" + @$(CXX) -c $(CFLAGS) -o $(OBJ)$(notdir $@) $< + +all: .showinfo .prepare $(OBJS) stress_full + +demux_loop: .prepare + @echo "Link... ["$@"]" + @$(CXX) $(CFLAGS) -o bin/$@ $@.cpp $(addprefix $(OBJ), $(notdir $(OBJS))) $(LDFLAGS) + +stress_full: .prepare + @echo "Link... ["$@"]" + @g++ -o bin/$@ $@.cpp + +transcoding_file: .prepare + @echo "Link... ["$@"]" + @$(CXX) $(CFLAGS) -o bin/$@ $@.cpp $(LDFLAGS) + +clean: + $(RM) $(PROJECT) obj bin *.o *.a *.d *.log + +.prepare: + @if [ ! -e obj ]; then mkdir obj; fi + @if [ ! -e bin ]; then mkdir bin; fi + +.showinfo: + @echo "-----------------------------------------------------" + @echo " [ BUILD ENVIRONMENT ] " + @echo "-----------------------------------------------------" + @echo "PROJECT : "$(PROJECT)" (v"$(MAJOR)"."$(MINOR)")" + @echo "" + @echo "CXX : "$(CXX) + @echo "LD : "$(LD) + @echo "STRIP : "$(STRIP) + @echo "CFLAGS : "$(CFLAGS) + @echo "LDFLAGS : "$(LDFLAGS) + @echo "-----------------------------------------------------" + @echo diff --git a/example/scripts/ck.sh b/example/scripts/ck.sh new file mode 100755 index 0000000..8086068 --- /dev/null +++ b/example/scripts/ck.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +while [ 1 ]; do + clear + ls -la /tmp/tsp_status.* + sleep 1 +done + diff --git a/example/scripts/dbg.sh b/example/scripts/dbg.sh new file mode 100755 index 0000000..babce17 --- /dev/null +++ b/example/scripts/dbg.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +ehco 5 > /tmp/.debug_on + +touch /tmp/transtreamproxy.log + +tail -f /tmp/transtreamproxy.log + diff --git a/example/scripts/ps.sh b/example/scripts/ps.sh new file mode 100755 index 0000000..1180b28 --- /dev/null +++ b/example/scripts/ps.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +while [ 1 ]; do + clear + ps -ef | grep transtreamproxy | grep -v grep | grep -v tail + sleep 1 +done + diff --git a/example/stress_full.cpp b/example/stress_full.cpp new file mode 100644 index 0000000..f5ee3aa --- /dev/null +++ b/example/stress_full.cpp @@ -0,0 +1,104 @@ +/* + * stress.cpp + * + * Created on: 2014. 10. 17. + * Author: oskwon + */ + +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <signal.h> +#include <string.h> + +#define MAX_INTERVAL_LENGTH (32) +#define DD_LOG(X,...) { printf(X" (%s:%d)\n", ##__VA_ARGS__, __FUNCTION__, __LINE__); } +//------------------------------------------------------------------------------- + +#define PORT 8002 +#define IP "192.168.100.240" + +int intervals[MAX_INTERVAL_LENGTH+1] = { + 2,4,6,1,4,2,15,9, + 2,1,2,2,2,2,15,3, + 2,2,2,2,2,2,2,2, + 2,2,2,2,2,2,2,2, + 0 +}; +const char* services[] = { + "1:0:19:2B66:3F3:1:C00000:0:0:0:", + "1:0:19:2B7A:3F3:1:C00000:0:0:0:", + "" +}; + +pid_t child_pid = 0; +//------------------------------------------------------------------------------- + +void child_sigint_handler( int signo) +{ + exit(0); +} +//------------------------------------------------------------------------------- + +void parent_sigint_handler( int signo) +{ + kill(child_pid, SIGINT); + exit(0); +} +//------------------------------------------------------------------------------- + +void command_execute(char* command) +{ + DD_LOG("excute... [%s]", command); + system(command); +} +//------------------------------------------------------------------------------- + +void child_main(const char* ip, int port, const char* service) +{ + char command[2048]; + + signal(SIGUSR2, child_sigint_handler); + + sprintf(command, "curl http://%s/web/zap?sRef=%s", ip, service); + command_execute(command); + + sprintf(command, "curl http://%s:%d/%s > /dev/null", ip, port, service); + command_execute(command); +} +//------------------------------------------------------------------------------- + +int main(int argc, char** argv) +{ + signal(SIGINT, parent_sigint_handler); + + for (int idx = 0; idx < 500; idx++) { + DD_LOG("\e[1;32m==========================================>> try : %d\e[00m", idx); + + child_pid = fork(); + if (child_pid < 0) { + DD_LOG("fork fail"); + return 0; + } + + if (child_pid == 0) { + if (argc == 1) { + child_main(IP, PORT, services[idx%2]); + } + else { + child_main(argv[1], PORT, services[idx%2]); + } + return 0; + } + + usleep(intervals[idx % MAX_INTERVAL_LENGTH] * 1000*1000); + kill(child_pid, SIGINT); + + if (idx == 499) { + idx = 0; + } + } + return 0; +} +//------------------------------------------------------------------------------- + diff --git a/src/3rdparty/mpegts.cpp b/src/3rdparty/mpegts.cpp deleted file mode 100644 index cc90959..0000000 --- a/src/3rdparty/mpegts.cpp +++ /dev/null @@ -1,735 +0,0 @@ -//#include "config.h" // oskwon -#include "trap.h" - -#include "mpegts.h" -//#include "util.h" // oskwon -#include "../Util.h" // oskwon - -#include <unistd.h> -#include <stddef.h> -#include <fcntl.h> -#include <sys/stat.h> - -#include <boost/crc.hpp> - -#include <string> -using std::string; - -#include <boost/algorithm/string.hpp> - -MpegTS::MpegTS(int fd_in, bool request_time_seek_in) throw(trap) - : private_fd(false), - fd(fd_in), - request_time_seek(request_time_seek_in) -{ - init(); -} - -MpegTS::MpegTS(string filename, bool request_time_seek_in) throw(trap) - : private_fd(true), - request_time_seek(request_time_seek_in) -{ - // oskwon : add option O_LARGEFILE. - if((fd = open(filename.c_str(), O_RDONLY | O_LARGEFILE, 0)) < 0) - throw(trap("MpegTS::MpegTS: cannot open file")); - - init(); -} - -MpegTS::~MpegTS() throw() -{ - if(private_fd) - close(fd); -} - -void MpegTS::init() throw(trap) -{ - mpegts_pat_t::const_iterator it; - struct stat filestat; - - if(fstat(fd, &filestat)) - throw(trap("MpegTS::init: cannot stat")); - - Util::vlog("MpegTS::init: file length: %lld Mb", filestat.st_size >> 20); - - eof_offset = stream_length = filestat.st_size; - eof_offset = (eof_offset / sizeof(ts_packet_t)) * sizeof(ts_packet_t); - - if(!read_pat()) - throw(trap("MpegTS::init: invalid transport stream (no suitable pat)")); - - for(it = pat.begin(); it != pat.end(); it++) - if(read_pmt(it->second)) - break; - - if(it == pat.end()) - throw(trap("MpegTS::init: invalid transport stream (no suitable pmt)")); - - pmt_pid = it->second; - is_time_seekable = false; - - if(request_time_seek) - { - Util::vlog("MpegTS: start find pcr"); - - if(lseek(fd, 0, SEEK_SET) == (off_t)-1) - Util::vlog("MpegTS::init seek to sof fails"); - else - { - Util::vlog("MpegTS: start find first pcr"); - - first_pcr_ms = find_pcr_ms(direction_forward); - - if(lseek(fd, eof_offset, SEEK_SET) == (off_t)-1) - Util::vlog("MpegTS::init: seek to eof fails"); - else - { - Util::vlog("MpegTS: start find last pcr"); - last_pcr_ms = find_pcr_ms(direction_backward); - - if(last_pcr_ms < first_pcr_ms) - Util::vlog("MpegTS::init: pcr wraparound, cannot seek this stream, first pcr: %d, last pcr: %d", first_pcr_ms / 1000, last_pcr_ms / 1000); - else - if((first_pcr_ms >= 0) && (last_pcr_ms >= 0)) - is_time_seekable = true; - } - } - - Util::vlog("MpegTS: find pcr done"); - } - - if(!is_time_seekable) - { - first_pcr_ms = -1; - last_pcr_ms = -1; - } - - if(lseek(fd, 0, SEEK_SET) == (off_t)-1) - Util::vlog("MpegTS::init: seek to sof fails"); - - //Util::vlog("first_pcr_ms = %d", first_pcr_ms); - //Util::vlog("last_pcr_ms = %d", last_pcr_ms); - //Util::vlog("eof_offset is at %lld", eof_offset); -} - -bool MpegTS::read_table(int filter_pid, int filter_table) throw(trap) -{ - typedef boost::crc_optimal<32, 0x04c11db7, 0xffffffff, 0x0, false, false> boost_mpeg_crc_t; - - boost_mpeg_crc_t my_crc; - mpeg_crc_t mpeg_crc; - uint32_t their_crc; - - ts_packet_t packet; - const section_table_header_t *table; - const section_table_syntax_t *syntax; - const uint8_t *payload_data; - - int timeout; - int section_length = -1; - int section_length_remaining = -1; - int pid; - int cc = -1; - int packet_payload_offset; - int payload_length; - - raw_table_data.clear(); - table_data.clear(); - - for(timeout = 0; timeout < 2000; timeout++) - { - if(read(fd, (void *)&packet, sizeof(packet)) != sizeof(packet)) - throw(trap("MpegTS::read_table: read error")); - - if(packet.header.sync_byte != MpegTS::sync_byte_value) - throw(trap("MpegTS::read_table: no sync byte found")); - - pid = (packet.header.pid_high << 8) | (packet.header.pid_low); - - if(pid != filter_pid) - continue; - - if(packet.header.tei) - continue; - - if(!packet.header.payload_present) - continue; - - if((cc != -1) && (cc != packet.header.cc)) - { - //Util::vlog("MpegTS::read_table discontinuity: %d/%d", cc, packet.header.cc); - goto retry; - } - - cc = (packet.header.cc + 1) & 0x0f; - - if(packet.header.pusi) - { - if(table_data.length() > 0) - { - //Util::vlog("MpegTS::read_table: start payload upexpected"); - goto retry; - } - } - else - { - if(table_data.length() <= 0) - { - //Util::vlog("MpegTS::read_table: continue payload unexpected"); - goto retry; - } - } - - //Util::vlog("MpegTS::read_table: correct packet with pid: %x, %s", pid, packet.header.pusi ? "start" : "continuation"); - - packet_payload_offset = offsetof(ts_packet_t, header.payload); - - //Util::vlog("MpegTS::read_table: payload offset: %d", packet_payload_offset); - - if(packet.header.af) - packet_payload_offset = offsetof(ts_packet_t, header.afield) + packet_payload_offset; - - //Util::vlog("MpegTS::read_table: payload offset after adaptation field: %d", packet_payload_offset); - - if(packet.header.pusi) - packet_payload_offset += packet.byte[packet_payload_offset] + 1; // psi payload pointer byte - - //Util::vlog("MpegTS::read_table: payload offset after section pointer field: %d", packet_payload_offset); - - if(table_data.length() == 0) - { - table = (const section_table_header_t *)&packet.byte[packet_payload_offset]; - - raw_table_data.assign((const uint8_t *)table, offsetof(section_table_header_t, payload)); - - //Util::vlog("MpegTS::read_table: table id: %d", table->table_id); - - if(table->table_id != filter_table) - { - Util::vlog("MpegTS::read_table: table %d != %d", table->table_id, filter_table); - goto retry; - } - - if(table->private_bit) - { - Util::vlog("MpegTS::read_table: private != 0: %d", table->private_bit); - goto retry; - } - - if(!table->section_syntax) - { - Util::vlog("MpegTS::read_table: section_syntax == 0: %d", table->section_syntax); - goto retry; - } - - if(table->reserved != 0x03) - { - Util::vlog("MpegTS::read_table: reserved != 0x03: %x", table->reserved); - goto retry; - } - - if(table->section_length_unused != 0x00) - { - Util::vlog("MpegTS::read_table: section length unused != 0x00: %x", table->section_length_unused); - goto retry; - } - - section_length = ((table->section_length_high << 8) | (table->section_length_low)) - offsetof(section_table_syntax_t, data); - //Util::vlog("MpegTS::read_table: section length: %d", section_length); - - if(section_length < 0) - { - Util::vlog("MpegTS::read_table: section length < 0: %d", section_length); - goto retry; - } - - if(section_length_remaining < 0) - section_length_remaining = section_length; - - syntax = (const section_table_syntax_t *)&table->payload; - - raw_table_data.append((const uint8_t *)syntax, offsetof(section_table_syntax_t, data)); - - //Util::vlog("MpegTS::read_table: tide: 0x%x", (syntax->tide_high << 8) | syntax->tide_low); - - if(syntax->reserved != 0x03) - { - Util::vlog("MpegTS::read_table: syntax reserved != 0x03: %d", syntax->reserved); - goto retry; - } - - //Util::vlog("MpegTS::read_table: currnext: %d", syntax->currnext); - //Util::vlog("MpegTS::read_table: version: %d", syntax->version); - //Util::vlog("MpegTS::read_table: ordinal: %d", syntax->ordinal); - //Util::vlog("MpegTS::read_table: last: %d", syntax->last); - - payload_length = sizeof(ts_packet_t) - ((const uint8_t *)&syntax->data - &packet.byte[0]); - //Util::vlog("MpegTS::read_table: payload length: %d", payload_length); - - if(payload_length > section_length_remaining) - payload_length = section_length_remaining; - - //Util::vlog("MpegTS::read_table: payload length after trimming: %d", payload_length); - - raw_table_data.append((const uint8_t *)&syntax->data, payload_length); - table_data.append((const uint8_t *)&syntax->data, payload_length); - } - else - { - payload_data = (const uint8_t *)&packet.byte[packet_payload_offset]; - payload_length = sizeof(ts_packet_t) - packet_payload_offset; - - if(payload_length > section_length_remaining) - payload_length = section_length_remaining; - - raw_table_data.append((const uint8_t *)payload_data, payload_length); - table_data.append((const uint8_t *)payload_data, payload_length); - } - - section_length_remaining -= payload_length; - - if((section_length > 0) && (section_length_remaining == 0)) - break; - - continue; - -retry: - section_length = -1; - section_length_remaining = -1; - raw_table_data.clear(); - table_data.clear(); - } - - if(table_data.length() == 0) - { - //Util::vlog("MpegTS::read_table: timeout"); - return(false); - } - - if(section_length < (int)sizeof(mpeg_crc_t)) - { - Util::vlog("MpegTS::read_table: table too small"); - return(false); - } - - my_crc.process_bytes(raw_table_data.data(), raw_table_data.length() - sizeof(mpeg_crc_t)); - - mpeg_crc = *(const mpeg_crc_t *)(&raw_table_data.data()[raw_table_data.length() - sizeof(mpeg_crc_t)]); - their_crc = (mpeg_crc.byte[0] << 24) | (mpeg_crc.byte[1] << 16) | (mpeg_crc.byte[2] << 8) | mpeg_crc.byte[3]; - - if(my_crc.checksum() != their_crc) - { - Util::vlog("MpegTS::read_table: crc mismatch: my crc: %x, their crc: %x", my_crc.checksum(), their_crc); - return(false); - } - - return(true); -} - -bool MpegTS::read_pat() throw(trap) -{ - int attempt; - int current, entries, program, pid; - const pat_entry_t *entry; - - for(attempt = 0; attempt < 16; attempt++) - { - if(!read_table(0, 0)) - continue; - - entries = (table_data.length() - sizeof(mpeg_crc_t)) / sizeof(*entry); - entry = (const pat_entry_t *)table_data.data(); - - for(current = 0; current < entries; current++) - { - program = (entry[current].program_high << 8) | (entry[current].program_low); - pid = (entry[current].pmt_pid_high << 8) | (entry[current].pmt_pid_low); - //Util::vlog("MpegTS::read_pat > program: %d -> pid %x", program, pid); - - if(entry[current].reserved != 0x07) - { - Util::vlog("MpegTS::read_pat > reserved != 0x07: 0x%x", entry[current].reserved); - goto next_pat_entry; - } - - pat[program] = pid; - } - - return(true); - -next_pat_entry: - (void)0; - } - - return(false); -} - -bool MpegTS::read_pmt(int filter_pid) throw(trap) -{ - int attempt, programinfo_length, esinfo_length; - int es_pid, es_data_length, es_data_skip, es_data_offset; - int ds_data_skip, ds_data_offset; - bool private_stream_is_ac3; - string stream_language; - - const uint8_t *es_data; - const pmt_header_t *pmt_header; - const pmt_es_entry_t *es_entry; - const pmt_ds_entry_t *ds_entry; - const pmt_ds_a_t *ds_a; - - pcr_pid = video_pid = audio_pid = -1; - - for(attempt = 0; attempt < 16; attempt++) - { - if(!read_table(filter_pid, table_pmt)) - break; - - pmt_header = (const pmt_header_t *)table_data.data(); - pcr_pid = (pmt_header->pcrpid_high << 8) | pmt_header->pcrpid_low; - programinfo_length = (pmt_header->programinfo_length_high << 8) | - pmt_header->programinfo_length_low; - - if(pmt_header->reserved_1 != 0x07) - { - Util::vlog("MpegTS::read_pmt > reserved_1: %x", pmt_header->reserved_1); - continue; - } - - //Util::vlog("MpegTS::read_pmt: > pcr_pid: %x", pcr_pid); - //Util::vlog("MpegTS::read_pmt: > program info length: %d", programinfo_length); - - if(pmt_header->unused != 0x00) - { - Util::vlog("MpegTS::read_pmt: > unused: %x", pmt_header->unused); - continue; - } - - if(pmt_header->reserved_2 != 0x0f) - { - Util::vlog("MpegTS::read_pmt: > reserved_2: %x", pmt_header->reserved_2); - continue; - } - - es_data = &(table_data.data()[offsetof(pmt_header_t, data) + programinfo_length]); - es_data_length = table_data.length() - offsetof(pmt_header_t, data); - es_data_skip = offsetof(pmt_es_entry_t, descriptors); - - for(es_data_offset = 0; (es_data_offset + es_data_skip + (int)sizeof(uint32_t)) < es_data_length; ) - { - es_entry = (const pmt_es_entry_t *)&es_data[es_data_offset]; - esinfo_length = (es_entry->es_length_high << 8) | es_entry->es_length_low; - es_pid = (es_entry->es_pid_high << 8) | es_entry->es_pid_low; - - if(es_entry->reserved_1 != 0x07) - { - Util::vlog("MpegTS::read_pmt: reserved 1: %x", es_entry->reserved_1); - goto next_descriptor_entry; - } - - //Util::vlog("MpegTS::read_pmt: >> pid: %x", es_pid); - - if(es_entry->reserved_2 != 0x0f) - { - Util::vlog("MpegTS::read_pmt: reserved 2: %x", es_entry->reserved_2); - goto next_descriptor_entry; - } - - if(es_entry->unused != 0x00) - { - Util::vlog("MpegTS::read_pmt: unused: %x", es_entry->unused); - goto next_descriptor_entry; - } - - //Util::vlog("MpegTS::read_pmt: esinfo_length: %d", esinfo_length); - - switch(es_entry->stream_type) - { - case(mpeg_streamtype_video_mpeg1): - case(mpeg_streamtype_video_mpeg2): - case(mpeg_streamtype_video_h264): - { - if(video_pid < 0) - video_pid = es_pid; - break; - } - - case(mpeg_streamtype_audio_mpeg1): - case(mpeg_streamtype_audio_mpeg2): - case(mpeg_streamtype_private_pes): // ac3 - { - private_stream_is_ac3 = false; - - ds_data_skip = es_data_offset + offsetof(pmt_es_entry_t, descriptors); - - for(ds_data_offset = 0; (ds_data_offset + 2) < esinfo_length; ) - { - ds_entry = (const pmt_ds_entry_t *)&es_data[ds_data_skip + ds_data_offset]; - - //Util::vlog("MpegTS::read_pmt: >>> offset: %d", ds_data_offset); - //Util::vlog("MpegTS::read_pmt: >>> descriptor id: %x", ds_entry->id); - //Util::vlog("MpegTS::read_pmt: >>> length: %d", ds_entry->length); - - switch(ds_entry->id) - { - case(pmt_desc_language): - { - ds_a = (const pmt_ds_a_t *)&ds_entry->data; - //Util::vlog("MpegTS::read_pmt: >>>> lang: %c%c%c [%d]", ds_a->lang[0], - // ds_a->lang[1], ds_a->lang[2], ds_a->code); - - stream_language.assign((const char *)&ds_a->lang, offsetof(pmt_ds_a_t, code)); - - break; - } - - case(pmt_desc_ac3): - { - private_stream_is_ac3 = true; - break; - } - } - - ds_data_offset += ds_entry->length + offsetof(pmt_ds_entry_t, data); - } - - if(!boost::iequals(stream_language, "nar")) - { - if(private_stream_is_ac3 || (audio_pid < 0)) // ac3 stream has preference - audio_pid = es_pid; - } - } - } - -next_descriptor_entry: - es_data_offset += es_data_skip + esinfo_length; - } - - return(true); - } - - return(false); -} - -int MpegTS::get_fd() const throw() -{ - return(fd); -} - -void MpegTS::parse_pts_ms(int pts_ms, int &h, int &m, int &s, int &ms) throw() -{ - h = pts_ms / (60 * 60 * 1000); - pts_ms -= h * (60 * 60 * 1000); - m = pts_ms / (60 * 1000); - pts_ms -= m * (60 * 1000); - s = pts_ms / (1000); - pts_ms -= s * (1000); - ms = pts_ms; -} - -int MpegTS::find_pcr_ms(seek_direction_t direction) const throw() -{ - ts_packet_t packet; - ts_adaptation_field_t *afield; - int attempt, pcr_ms = -1, pid; - int ms, h, m, s; - off_t offset; - - for(attempt = 0; (pcr_ms < 0) && (attempt < find_pcr_max_probe); attempt++) - { - if(direction == direction_backward) - { - offset = (off_t)sizeof(ts_packet_t) * 2 * -1; - - if(lseek(fd, offset, SEEK_CUR) == (off_t)-1) - { - Util::vlog("MpegTS::find_pcr_ms: lseek failed"); - return(-1); - } - } - - if(read(fd, &packet, sizeof(packet)) != sizeof(packet)) - { - Util::vlog("MpegTS::find_pcr_ms: read error"); - return(-1); - } - - if(packet.header.sync_byte != sync_byte_value) - { - Util::vlog("MpegTS::find_pcr_ms: no sync byte"); - return(-1); - } - - pid = (packet.header.pid_high << 8) | packet.header.pid_low; - - //Util::vlog("MpegTS::find_pcr_ms: pid: %d", pid); - - if(pid != pcr_pid) - continue; - - if(!packet.header.af) - { - //Util::vlog("MpegTS::find_pcr_ms: no adaptation field"); - continue; - } - - afield = (ts_adaptation_field_t *)&packet.header.afield; - - if(!afield->contains_pcr) - { - //Util::vlog("MpegTS::find_pcr_ms: attempt: %d, adaptation field does not have pcr field", attempt); - continue; - } - - // read 32 bits of the total 42 bits of clock precision, which is - // enough for seeking, one tick = 1/45th second - - pcr_ms = uint32_t(afield->pcr_0 << 24) | uint32_t(afield->pcr_1 << 16) | - uint32_t(afield->pcr_2 << 8) | uint32_t(afield->pcr_3 << 0); - pcr_ms /= 90; - pcr_ms <<= 1; - } - - if(attempt >= find_pcr_max_probe) - { - Util::vlog("MpegTS::find_pcr_ms: no pcr found"); - return(-1); - } - - parse_pts_ms(pcr_ms, h, m, s, ms); - - //Util::vlog("PCR found after %d packets", attempt); - //Util::vlog("PCR = %d ms (%02d:%02d:%02d:%03d)", pcr_ms, h, m, s, ms); - - return(pcr_ms); -} - -off_t MpegTS::seek(int whence, off_t offset) const throw(trap) -{ - ts_packet_t packet; - off_t actual_offset; - off_t new_offset; - - offset = ((offset / (off_t)sizeof(ts_packet_t)) * (off_t)sizeof(ts_packet_t)); - - if(lseek(fd, offset, whence) < 0) - throw(trap("MpegTS::seek: lseek (1)")); - - if(read(fd, &packet, sizeof(packet)) != sizeof(packet)) - throw(trap("MpegTS::seek: read error")); - - if(packet.header.sync_byte != sync_byte_value) - throw(trap("MpegTS::seek: no sync byte")); - - new_offset = (off_t)0 - (off_t)sizeof(packet); - - if((actual_offset = lseek(fd, new_offset, SEEK_CUR)) < 0) - throw(trap("MpegTS::seek: lseek (2)")); - - return(actual_offset); -} - -off_t MpegTS::seek_absolute(off_t offset) const throw(trap) -{ - return(seek(SEEK_SET, offset)); -} - -off_t MpegTS::seek_relative(off_t roffset, off_t limit) const throw(trap) -{ - off_t offset; - - if((roffset < 0) || (roffset > limit)) - throw(trap("MpegTS::seek_relative: value out of range")); - - offset = (eof_offset / limit) * roffset; - - return(seek(SEEK_SET, offset)); -} - -off_t MpegTS::seek_time(int pts_ms) const throw(trap) -{ - int h, m, s, ms; - int attempt; - off_t lower_bound_offset = 0; - int lower_bound_pts_ms = first_pcr_ms; - off_t upper_bound_offset = eof_offset; - int upper_bound_pts_ms = last_pcr_ms; - off_t disect_offset; - int disect_pts_ms; - off_t current_offset; - - if(!is_time_seekable) - throw(trap("MpegTS::seek: stream is not seekable")); - - if(pts_ms < first_pcr_ms) - { - Util::vlog("MpegTS::seek: seek pts beyond start of file"); - return(-1); - } - - if(pts_ms > last_pcr_ms) - { - Util::vlog("MpegTS::seek: pts beyond end of file"); - return(-1); - } - - for(attempt = 0; attempt < seek_max_attempts; attempt++) - { - disect_offset = (lower_bound_offset + upper_bound_offset) / 2; - - parse_pts_ms(pts_ms, h, m, s, ms); - //Util::vlog("MpegTS::seek: seek for [%02d:%02d:%02d.%03d] between ", h, m, s, ms); - parse_pts_ms(lower_bound_pts_ms, h, m, s, ms); - //Util::vlog("MpegTS::seek: [%02d:%02d:%02d.%03d", h, m, s, ms); - parse_pts_ms(upper_bound_pts_ms, h, m, s, ms); - //Util::vlog("MpegTS::seek: -%02d:%02d:%02d.%03d], ", h, m, s, ms); - //Util::vlog("MpegTS::seek: offset = %lld [%lld-%lld], ", disect_offset, lower_bound_offset, upper_bound_offset); - - if((current_offset = seek(SEEK_SET, disect_offset)) == 1) - { - Util::vlog("MpegTS::seek: seek fails"); - return(-1); - } - - //Util::vlog("MpegTS::seek: current offset = %lld (%lld%%)", current_offset, (current_offset * 100) / eof_offset); - - if((disect_pts_ms = find_pcr_ms(direction_forward)) < 0) - { - Util::vlog("MpegTS::seek: eof"); - return(-1); - } - - parse_pts_ms(disect_pts_ms, h, m, s, ms); - //Util::vlog("MpegTS::seek: disect=[%02d:%02d:%02d.%03d]", h, m, s, ms); - - if(disect_pts_ms < 0) - { - Util::vlog("MpegTS::seek failed to find pts"); - return(-1); - } - - if(((disect_pts_ms > pts_ms) && ((disect_pts_ms - pts_ms) < 8000)) || - ((pts_ms >= disect_pts_ms) && ((pts_ms - disect_pts_ms) < 8000))) - { - //Util::vlog("MpegTS::seek: found"); - return(current_offset); - } - - if(disect_pts_ms < pts_ms) // seek higher - { - lower_bound_offset = disect_offset; - lower_bound_pts_ms = disect_pts_ms; - - //Util::vlog("MpegTS::seek: not found: change lower bound"); - } - else - { - upper_bound_offset = disect_offset; - upper_bound_pts_ms = disect_pts_ms; - - //Util::vlog("MpegTS::seek: not found: change upper bound"); - } - } - - return(-1); -} diff --git a/src/3rdparty/mpegts.h b/src/3rdparty/mpegts.h deleted file mode 100644 index 6402dca..0000000 --- a/src/3rdparty/mpegts.h +++ /dev/null @@ -1,216 +0,0 @@ -#ifndef _mpegts_h_ -#define _mpegts_h_ - -//#include "config.h" // oskwon -#include "trap.h" - -#include <map> -#include <string> - -#include <stdint.h> -#include <sys/types.h> - -#include "../Source.h" // oskwon -class MpegTS : public Source // oskwon : inherit Source class. -{ - private: - - typedef std::basic_string<uint8_t> u8string; - typedef unsigned int bf; - - enum - { - sync_byte_value = 0x47, - }; - - enum - { - table_pmt = 0x02, - }; - - enum - { - find_pcr_max_probe = 1000000, - seek_max_attempts = 32, - }; - - enum - { - pmt_desc_language = 0x0a, - pmt_desc_ac3 = 0x6a, - }; - - enum - { - mpeg_streamtype_video_mpeg1 = 0x01, - mpeg_streamtype_video_mpeg2 = 0x02, - mpeg_streamtype_video_h264 = 0x1b, - mpeg_streamtype_audio_mpeg1 = 0x03, - mpeg_streamtype_audio_mpeg2 = 0x04, - mpeg_streamtype_private_pes = 0x06, - }; - - typedef struct - { - uint8_t byte[4]; - } mpeg_crc_t; - - typedef union - { - struct - { - uint8_t sync_byte; - bf pid_high:5; - bf tp:1; - bf pusi:1; - bf tei:1; - uint8_t pid_low; - bf cc:4; - bf payload_present:1; - bf af:1; - bf sc:2; - uint8_t payload[0]; - uint8_t afield_length; - uint8_t afield[0]; - } header; - uint8_t byte[188]; - } ts_packet_t; - - typedef struct - { - bf field_ext:1; - bf private_data:1; - bf splice_point:1; - bf contains_opcr:1; - bf contains_pcr:1; - bf priority:1; - bf gop_start:1; - bf discont:1; - uint8_t pcr_0; // 32 bits of the total 42 bits - uint8_t pcr_1; // of clock precision, which is - uint8_t pcr_2; // enough for seeking - uint8_t pcr_3; // tick = 1/45th second - } ts_adaptation_field_t; - - typedef struct - { - uint8_t table_id; - bf section_length_high:2; - bf section_length_unused:2; - bf reserved:2; - bf private_bit:1; - bf section_syntax:1; - uint8_t section_length_low; - uint8_t payload[0]; - } section_table_header_t; - - typedef struct - { - uint8_t tide_high; - uint8_t tide_low; - bf currnext:1; - bf version:5; - bf reserved:2; - uint8_t ordinal; - uint8_t last; - uint8_t data[0]; - } section_table_syntax_t; - - typedef struct - { - uint8_t program_high; - uint8_t program_low; - bf pmt_pid_high:5; - bf reserved:3; - uint8_t pmt_pid_low; - } pat_entry_t; - - typedef struct - { - bf pcrpid_high:5; - bf reserved_1:3; - uint8_t pcrpid_low; - uint programinfo_length_high:2; - bf unused:2; - bf reserved_2:4; - uint8_t programinfo_length_low; - uint8_t data[0]; - } pmt_header_t; - - typedef struct - { - uint8_t stream_type; - bf es_pid_high:5; - bf reserved_1:3; - uint8_t es_pid_low; - bf es_length_high:2; - bf unused:2; - bf reserved_2:4; - uint8_t es_length_low; - uint8_t descriptors[0]; - } pmt_es_entry_t; - - typedef struct - { - uint8_t id; - uint8_t length; - uint8_t data[0]; - } pmt_ds_entry_t; - - typedef struct - { - uint8_t lang[3]; - uint8_t code; - } pmt_ds_a_t; - - typedef enum - { - direction_forward, - direction_backward, - } seek_direction_t; - - typedef std::map<int, int> mpegts_pat_t; - - MpegTS(); - MpegTS(const MpegTS &); - - bool private_fd; - int fd; - mpegts_pat_t pat; - u8string raw_table_data; - u8string table_data; - - static void parse_pts_ms(int pts_ms, int &h, int &m, int &s, int &ms) throw(); - - void init() throw(trap); - bool read_table(int filter_pid, int filter_table) throw(trap); - bool read_pat() throw(trap); - bool read_pmt(int filter_pid) throw(trap); - int find_pcr_ms(seek_direction_t direction) const throw(); - off_t seek(int whence, off_t offset) const throw(trap); - - public: - - int pmt_pid; - int pcr_pid; - int video_pid; - int audio_pid; - - bool request_time_seek; - bool is_time_seekable; - - int first_pcr_ms; - int last_pcr_ms; - off_t eof_offset; - off_t stream_length; - - MpegTS(int fd, bool request_time_seek) throw(trap); - MpegTS(std::string file, bool request_time_seek) throw(trap); - ~MpegTS() throw(); - - int get_fd() const throw(); - off_t seek_absolute(off_t offset) const throw(trap); - off_t seek_relative(off_t offset, off_t limit) const throw(trap); - off_t seek_time(int pts_ms) const throw(trap); -}; -#endif diff --git a/src/Demuxer.cpp b/src/Demuxer.cpp index 83290c6..d504b4e 100644 --- a/src/Demuxer.cpp +++ b/src/Demuxer.cpp @@ -22,6 +22,7 @@ using namespace std; //------------------------------------------------------------------------------- +bool terminated(); std::string Demuxer::webif_reauest(std::string request) throw(http_trap) { if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) @@ -59,6 +60,10 @@ std::string Demuxer::webif_reauest(std::string request) throw(http_trap) } response += buffer; } + if (terminated()) { + response = ""; + break; + } } return response; } @@ -180,6 +185,26 @@ bool Demuxer::parse_webif_response(std::string& response, std::vector<unsigned l } //------------------------------------------------------------------------------- +void Demuxer::open() throw(http_trap) +{ + if (demux_id < 0) { + throw(http_trap("demux id is not set!!", 503, "Service Unavailable")); + } + std::string demuxpath = "/dev/dvb/adapter0/demux" + Util::ultostr(demux_id); + if ((fd = ::open(demuxpath.c_str(), O_RDWR | O_NONBLOCK)) < 0) { + throw(http_trap(std::string("demux open fail : ") + demuxpath, 503, "Service Unavailable")); + } + INFO("demux open success : %s", demuxpath.c_str()); + + try { + set_filter(new_pids); + } + catch (const trap &e) { + throw(http_trap(e.what(), 503, "Service Unavailable")); + } +} +//------------------------------------------------------------------------------- + Demuxer::Demuxer(HttpHeader *header) throw(http_trap) { demux_id = pat_pid = fd = sock = -1; @@ -205,27 +230,35 @@ Demuxer::Demuxer(HttpHeader *header) throw(http_trap) throw(http_trap("webif whthentication fail.", 401, "Unauthorized")); } - std::vector<unsigned long> new_pids; + new_pids.clear(); if (!parse_webif_response(webif_response, new_pids)) throw(http_trap("webif response parsing fail.", 503, "Service Unavailable")); - - std::string demuxpath = "/dev/dvb/adapter0/demux" + Util::ultostr(demux_id); - if ((fd = open(demuxpath.c_str(), O_RDWR | O_NONBLOCK)) < 0) { - throw(http_trap(std::string("demux open fail : ") + demuxpath, 503, "Service Unavailable")); - } - INFO("demux open success : %s", demuxpath.c_str()); - - try { - set_filter(new_pids); - } - catch (const trap &e) { - throw(http_trap(e.what(), 503, "Service Unavailable")); - } } //------------------------------------------------------------------------------- Demuxer::~Demuxer() throw() { + std::vector<unsigned long>::iterator iter = pids.begin(); + for (; iter != pids.end(); ++iter) { + unsigned long pid = *iter; + + while(pid != -1) { +#if HAVE_DVB_API_VERSION > 3 + __u16 p = pid; + if (::ioctl(fd, DMX_REMOVE_PID, &p) < 0) { +#else + if (::ioctl(fd, DMX_REMOVE_PID, pid) < 0) { +#endif + WARNING("DMX_REMOVE_PID"); + if (errno == EAGAIN || errno == EINTR) { + DEBUG("retry DMX_REMOVE_PID"); + continue; + } + } + break; + } + } + if (fd != -1) close(fd); if (sock != -1) close(sock); diff --git a/src/Demuxer.h b/src/Demuxer.h index 4b8732a..11c0797 100644 --- a/src/Demuxer.h +++ b/src/Demuxer.h @@ -32,6 +32,7 @@ private: int demux_id; int pat_pid; std::vector<unsigned long> pids; + std::vector<unsigned long> new_pids; protected: std::string webif_reauest(std::string request) throw(http_trap); @@ -42,7 +43,10 @@ protected: public: Demuxer(HttpHeader *header) throw(http_trap); virtual ~Demuxer() throw(); + void open() throw(http_trap); + int get_fd() const throw(); + bool is_initialized() { return true; } }; //---------------------------------------------------------------------- diff --git a/src/Encoder.cpp b/src/Encoder.cpp index c9d09f2..044e130 100644 --- a/src/Encoder.cpp +++ b/src/Encoder.cpp @@ -80,6 +80,12 @@ Encoder::Encoder() throw(trap) Encoder::~Encoder() { Post(); + encoder_close(); +} +//---------------------------------------------------------------------- + +void Encoder::encoder_close() +{ if (fd != -1) { if (state == ENCODER_STAT_STARTED) { DEBUG("stop transcoding.."); @@ -103,15 +109,21 @@ bool Encoder::encoder_open() } //---------------------------------------------------------------------- +bool terminated(); bool Encoder::retry_open(int retry_count, int sleep_time) { for (int i = 0; i < retry_count; ++i) { + if (terminated()) { + break; + } if (encoder_open()) { DEBUG("encoder-%d open success..", encoder_id); return true; } WARNING("encoder%d open fail, retry count : %d/%d", encoder_id, i, retry_count); - sleep(sleep_time); + if (retry_count > 1) { + sleep(sleep_time); + } } ERROR("encoder open fail : %s (%d)", strerror(errno), errno); return false; diff --git a/src/Encoder.h b/src/Encoder.h index e44cfb9..3a69327 100644 --- a/src/Encoder.h +++ b/src/Encoder.h @@ -67,6 +67,8 @@ public: int get_fd(); bool ioctl(int cmd, int value); bool retry_open(int retry_count, int sleep_time); + + void encoder_close(); }; //---------------------------------------------------------------------- diff --git a/src/Http.cpp b/src/Http.cpp index 161b288..26e1e99 100644 --- a/src/Http.cpp +++ b/src/Http.cpp @@ -171,16 +171,24 @@ std::string HttpHeader::build_response(Mpeg *source) } //---------------------------------------------------------------------- +bool terminated(); std::string HttpHeader::read_request() { std::string request = ""; while (true) { char buffer[128] = {0}; - fgets(buffer, 127, stdin); - + if (!read (0, buffer, 127)) { + break; + } request += buffer; if(request.find("\r\n\r\n") != string::npos) break; + + if (terminated()) { + request = ""; + break; + } + usleep(0); } return request; } diff --git a/src/Logger.cpp b/src/Logger.cpp index dc05d3a..b591bd6 100644 --- a/src/Logger.cpp +++ b/src/Logger.cpp @@ -37,18 +37,6 @@ static const char* LOG_LV_STR[] = { #endif //---------------------------------------------------------------------- -char* timestamp(const char* aFormat) -{ - time_t t; - time(&t); - - struct tm* m = localtime(&t); - static char sz_timestamp[32] = {0}; - strftime(sz_timestamp, sizeof(sz_timestamp), aFormat, m); - return sz_timestamp; -} -//---------------------------------------------------------------------- - Logger::Logger() : mLogLevel(0), mLogHandle(0) { @@ -106,9 +94,7 @@ bool Logger::init(const char* aName, int aLogLevel, bool aWithTimestamp) return true; } char path[256] = {0}; - if (aWithTimestamp) - sprintf(path, "%s_%s.log", aName, timestamp("%Y%m%d")); - else sprintf(path, "%s.log", aName); + sprintf(path, "%s.log", aName); if (!(mLogHandle = fopen(path, "a+"))) { mLogHandle = 0; // printf("fail to open logger [%s].", path); @@ -197,8 +183,7 @@ void Logger::log(int aLogLevel, const char* aFormat, ...) va_start(args, aFormat); vsnprintf(log_data_buffer, MAX_PRINT_LEN-1, aFormat, args); va_end(args); - - fprintf(mLogHandle, "[%s]%s[%d] %s\n", timestamp(DEFAULT_TIMESTAMP_FORMAT), LOG_LV_STR[aLogLevel], mPid, log_data_buffer); + fprintf(mLogHandle, "%s[%d] %s\n", LOG_LV_STR[aLogLevel], mPid, log_data_buffer); fflush(mLogHandle); #endif } diff --git a/src/Makefile.am b/src/Makefile.am index 5f6df1a..75a67e5 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -10,7 +10,6 @@ transtreamproxy_SOURCES = \ Mpeg.cpp \ UriDecoder.cpp \ Util.cpp \ - 3rdparty/mpegts.cpp \ 3rdparty/trap.cpp transtreamproxy_CXXFLAGS = $(LIBSDL_CFLAGS) -Wno-unused-result -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 diff --git a/src/Mpeg.cpp b/src/Mpeg.cpp index 0cac649..c2e7781 100644 --- a/src/Mpeg.cpp +++ b/src/Mpeg.cpp @@ -9,6 +9,15 @@ #include "Http.h" #include "Util.h" #include "Logger.h" + +#include <sys/stat.h> + +#include <vector> +#include <string> +#include <fstream> +#include <iterator> + +using namespace std; //---------------------------------------------------------------------- void Mpeg::seek(HttpHeader &header) @@ -278,6 +287,58 @@ int Mpeg::calc_bitrate() } //---------------------------------------------------------------------- +int Mpeg::find_pmt() +{ + off_t position=0; + + int left = 5*1024*1024; + + while (left >= 188) { + unsigned char packet[188]; + int ret = read_internal(position, packet, 188); + if (ret != 188) { + DEBUG("read error"); + break; + } + left -= 188; + position += 188; + + if (packet[0] != 0x47) { + int i = 0; + while (i < 188) { + if (packet[i] == 0x47) + break; + --position; ++i; + } + continue; + } + int pid = ((packet[1] << 8) | packet[2]) & 0x1FFF; + int pusi = !!(packet[1] & 0x40); + + if (!pusi) continue; + + /* ok, now we have a PES header or section header*/ + unsigned char *sec; + + /* check for adaption field */ + if (packet[3] & 0x20) { + if (packet[4] >= 183) + continue; + sec = packet + packet[4] + 4 + 1; + } + else { + sec = packet + 4; + } + if (sec[0]) continue; /* table pointer, assumed to be 0 */ + if (sec[1] == 0x02) { /* program map section */ + pmt_pid = pid; + return 0; + } + } + return -1; +} +//---------------------------------------------------------------------- + int Mpeg::get_offset(off_t &offset, pts_t &pts, int marg) { calc_length(); @@ -508,3 +569,127 @@ int Mpeg::calc_length() return m_duration; } //---------------------------------------------------------------------- + +namespace eCacheID { + enum { + cVPID = 0, + cAPID, + cTPID, + cPCRPID, + cAC3PID, + cVTYPE, + cACHANNEL, + cAC3DELAY, + cPCMDELAY, + cSUBTITLE, + cacheMax + }; +}; + +/* f:40,c:00007b,c:01008f,c:03007b */ +bool Mpeg::read_ts_meta(std::string media_file_name, int &vpid, int &apid) +{ + std::string metafilename = media_file_name; + metafilename += ".meta"; + + std::ifstream ifs(metafilename.c_str()); + + if (!ifs.is_open()) { + DEBUG("metadata is not exists.."); + return false; + } + + size_t rc = 0, i = 0; + char buffer[1024] = {0}; + while (!ifs.eof()) { + ifs.getline(buffer, 1024); + if (i++ == 7) { + DEBUG("%d [%s]", i, buffer); + std::vector<string> tokens; + Util::split(buffer, ',', tokens); + if(tokens.size() < 3) { + DEBUG("pid count size error : %d", tokens.size()); + return false; + } + + int setting_done = false; + for (int ii = 0; ii < tokens.size(); ++ii) { + std::string token = tokens[ii]; + if(token.length() <= 0) continue; + if(token.at(0) != 'c') continue; + + int cache_id = atoi(token.substr(2,2).c_str()); + DEBUG("token : %d [%s], chcke_id : [%d]", ii, token.c_str(), cache_id); + switch(cache_id) { + case(eCacheID::cVPID): + vpid = strtol(token.substr(4,4).c_str(), NULL, 16); + DEBUG("video pid : %d", vpid); + setting_done = (vpid != -1 && apid != -1) ? true : false; + break; + case(eCacheID::cAC3PID): + apid = strtol(token.substr(4,4).c_str(), NULL, 16); + DEBUG("audio pid : %d", apid); + break; + case(eCacheID::cAPID): + apid = strtol(token.substr(4,4).c_str(), NULL, 16); + DEBUG("audio pid : %d", apid); + setting_done = (vpid != -1 && apid != -1) ? true : false; + break; + } + if(setting_done) break; + } + break; + } + } + ifs.close(); + return true; +} +//------------------------------------------------------------------------------- + +Mpeg::Mpeg(std::string filename, bool request_time_seek) throw (trap) +// : MpegTS(filename) +{ + m_current_offset = m_base_offset = m_last_offset = 0; + m_splitsize = m_nrfiles = m_current_file = m_totallength = 0; + + m_pts_begin = m_pts_end = m_offset_begin = m_offset_end = 0; + m_last_filelength = m_begin_valid = m_end_valid = m_futile =0; + + m_duration = m_samples_taken = 0; + m_is_initialized = false; + + pmt_pid = video_pid = audio_pid = -1; + + fd = open(filename.c_str(), O_RDONLY | O_LARGEFILE, 0); + if (fd < 0) { + throw(trap("cannot open file")); + } + + struct stat filestat; + if (fstat(fd, &filestat)) { + throw(trap("MpegTS::init: cannot stat")); + } + INFO("file length: %lld Mb", filestat.st_size >> 20); + + stream_length = filestat.st_size; + + int apid = -1, vpid = -1; + if (read_ts_meta(filename, vpid, apid)) { + if (vpid != -1 && apid != -1) { + video_pid = vpid; + audio_pid = apid; + m_is_initialized = true; + + /*find_pmt();*/ + return; + } + } +} +//---------------------------------------------------------------------- + +off_t Mpeg::seek_absolute(off_t offset2) +{ + off_t result = seek_internal(offset2, SEEK_SET); + return result; +} +//---------------------------------------------------------------------- @@ -8,15 +8,19 @@ #ifndef MPEG_H_ #define MPEG_H_ +#include <map> +#include <string> + +#include "Source.h" +#include "Logger.h" #include "3rdparty/trap.h" -#include "3rdparty/mpegts.h" //---------------------------------------------------------------------- class HttpHeader; typedef long long pts_t; -class Mpeg : public MpegTS +class Mpeg : public Source { private: off_t m_splitsize, m_totallength, m_current_offset, m_base_offset, m_last_offset; @@ -34,6 +38,9 @@ private: int m_duration; + int fd; + bool m_is_initialized; + void scan(); int switch_offset(off_t off); @@ -49,25 +56,25 @@ private: void take_samples(); int take_sample(off_t off, pts_t &p); - off_t seek_internal(off_t offset, int whence); ssize_t read_internal(off_t offset, void *buf, size_t count); -public: - Mpeg(std::string filename, bool request_time_seek) throw (trap) - : MpegTS(filename, request_time_seek) - { - m_current_offset = m_base_offset = m_last_offset = 0; - m_splitsize = m_nrfiles = m_current_file = m_totallength = 0; + off_t seek_absolute(off_t offset); + off_t seek_internal(off_t offset, int whence); + + bool read_ts_meta(std::string media_file_name, int &vpid, int &apid); - m_pts_begin = m_pts_end = m_offset_begin = m_offset_end = 0; - m_last_filelength = m_begin_valid = m_end_valid = m_futile =0; + int find_pmt(); - m_duration = m_samples_taken = 0; - } +public: + off_t stream_length; + int pmt_pid, video_pid, audio_pid; + Mpeg(std::string filename, bool request_time_seek) throw (trap); virtual ~Mpeg() throw () {} void seek(HttpHeader &header); + bool is_initialized() { return m_is_initialized; } + int get_fd() const throw() { return fd; } }; //---------------------------------------------------------------------- diff --git a/src/Source.h b/src/Source.h index 399fa54..81dbb13 100644 --- a/src/Source.h +++ b/src/Source.h @@ -17,6 +17,7 @@ public: Source(){} virtual ~Source(){} virtual int get_fd() const throw() = 0; + virtual bool is_initialized() = 0; }; //---------------------------------------------------------------------- diff --git a/src/Util.cpp b/src/Util.cpp index 639cdfb..ad54464 100644 --- a/src/Util.cpp +++ b/src/Util.cpp @@ -138,8 +138,9 @@ std::vector<int> Util::find_process_by_name(std::string name, int mypid) void Util::kill_process(int pid) { - int result = kill(pid, SIGINT); + int result = 0; + + result = kill(pid, SIGINT); DEBUG("SEND SIGINT to %d, result : %d", pid, result); - //sleep(1); } //---------------------------------------------------------------------- diff --git a/src/main.cpp b/src/main.cpp index 93197bd..fdf2263 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -15,6 +15,9 @@ #include <string> +#include <sys/types.h> +#include <sys/stat.h> + #include "Util.h" #include "Logger.h" @@ -36,12 +39,123 @@ void signal_handler(int sig_no); void *source_thread_main(void *params); void *streaming_thread_main(void *params); -int streaming_write(const char *buffer, size_t buffer_len, bool enable_log = false); -//---------------------------------------------------------------------- +static Source *source = 0; +static Encoder *encoder = 0; static bool is_terminated = true; static int source_thread_id, stream_thread_id; static pthread_t source_thread_handle, stream_thread_handle; + +#define TSP_CHECKER_TEMPLETE "/tmp/tsp_status.%d" + +pid_t tsp_pid = 0, checker_pid = 0; +unsigned long last_updated_time = 0; +//---------------------------------------------------------------------- + +bool terminated() +{ + return is_terminated; +} +//---------------------------------------------------------------------- + +void cbexit() +{ + INFO("release resource start"); + if (encoder) { delete encoder; encoder = 0; } + if (source) { delete source; source = 0; } + + char checker_filename[255] = {0}; + if (tsp_pid) { + ::sprintf(checker_filename, TSP_CHECKER_TEMPLETE, tsp_pid); + if (::access(checker_filename, F_OK) == 0) { + ::unlink(checker_filename); + } + } + INFO("release resource finish"); +} +//---------------------------------------------------------------------- + +inline int streaming_write(const char *buffer, size_t buffer_len, bool enable_log = false) +{ + if (enable_log) { + DEBUG("response data :\n%s", buffer); + } + return write(1, buffer, buffer_len); +} +//---------------------------------------------------------------------- + +#define DD_LOG(X,...) { \ + char log_message[128] = {0};\ + sprintf(log_message, "echo \""X"\" > /tmp/tsp_checker.log", ##__VA_ARGS__);\ + system(log_message);\ + } +//---------------------------------------------------------------------- + +int send_signal(pid_t pid, int signal) +{ + char process_path[255] = {0}; + sprintf(process_path, "/proc/%d", pid); + + if (access(process_path, F_OK) == 0) { + kill(pid, signal); + DD_LOG(" >> run kill-pid : %ld -> %ld (%d)", getpid(), pid, signal); + } + return 0; +} +//---------------------------------------------------------------------- + +void signal_handler_checker(int sig_no) +{ + is_terminated = true; +} +//---------------------------------------------------------------------- + +int tsp_checker(pid_t pid) +{ + char check_filename[255] = {0}; + sleep(1); + sprintf(check_filename, TSP_CHECKER_TEMPLETE, ::getppid()); + + int timebase_count = 0, exit_count = 0; + while(!is_terminated) { + if (timebase_count != 10) { + timebase_count++; + } + else { + if (access(check_filename, F_OK) != 0) { + send_signal(tsp_pid, SIGUSR2); + DD_LOG("no found %s, %d", check_filename, timebase_count); + break; + } + } + + struct stat sb; + stat(check_filename, &sb); + + if (last_updated_time == sb.st_ctime && timebase_count == 10) { + if (exit_count > 2) { + send_signal(tsp_pid, SIGUSR2); + DD_LOG("%ld == %ld", last_updated_time, sb.st_ctime); + break; + } + exit_count++; + sleep(1); + continue; + } + exit_count = 0; + last_updated_time = sb.st_ctime; + sleep(1); + } + unlink(check_filename); + + DD_LOG("kill (%ld)", tsp_pid); + + sleep(3); + send_signal(tsp_pid, SIGKILL); + sleep(2); + + return 0; +} //---------------------------------------------------------------------- int main(int argc, char **argv) @@ -51,9 +165,21 @@ int main(int argc, char **argv) show_help(); exit(0); } - Logger::instance()->init("/tmp/transtreamproxy", Logger::WARNING); + tsp_pid = ::getpid(); + + signal(SIGUSR1, signal_handler_checker); + + Logger::instance()->init("/tmp/transtreamproxy", Logger::ERROR); + signal(SIGINT, signal_handler); + signal(SIGSEGV, signal_handler); + signal(SIGUSR2, signal_handler); + + atexit(cbexit); + + is_terminated = false; + + char update_status_command[255] = {0}; - signal(SIGINT, signal_handler); HttpHeader header; std::string req = HttpHeader::read_request(); @@ -73,10 +199,6 @@ int main(int argc, char **argv) throw(http_trap("not support request type.", 400, "Bad Request, not support request")); } - Encoder encoder; - Source *source = 0; - ThreadParams thread_params = { 0, &encoder, &header }; - int video_pid = 0, audio_pid = 0, pmt_pid = 0; switch(header.type) { @@ -95,6 +217,15 @@ int main(int argc, char **argv) break; case HttpHeader::TRANSCODING_LIVE: try { + checker_pid = ::fork(); + if (checker_pid == 0) { + tsp_checker(checker_pid); + exit(0); + } + + sprintf(update_status_command, "touch "TSP_CHECKER_TEMPLETE, tsp_pid); + system(update_status_command); + Demuxer *dmx = new Demuxer(&header); pmt_pid = dmx->pmt_pid; video_pid = dmx->video_pid; @@ -119,13 +250,17 @@ int main(int argc, char **argv) default: throw(http_trap(std::string("not support source type : ") + Util::ultostr(header.type), 400, "Bad Request")); } - thread_params.source = source; - if (!encoder.retry_open(2, 3)) { + encoder = new Encoder(); + int encoder_retry_max_count = 1; + if (header.type == HttpHeader::TRANSCODING_FILE) { + encoder_retry_max_count = 2; + } + if (!encoder->retry_open(encoder_retry_max_count, 3)) { throw(http_trap("encoder open fail.", 503, "Service Unavailable")); } - if (encoder.state == Encoder::ENCODER_STAT_OPENED) { + if (encoder->state == Encoder::ENCODER_STAT_OPENED) { std::string response = header.build_response((Mpeg*) source); if (response == "") { throw(http_trap("response build fail.", 503, "Service Unavailable")); @@ -137,37 +272,58 @@ int main(int argc, char **argv) ((Mpeg*) source)->seek(header); } - if (!encoder.ioctl(Encoder::IOCTL_SET_VPID, video_pid)) { - throw(http_trap("video pid setting fail.", 503, "Service Unavailable")); - } - if (!encoder.ioctl(Encoder::IOCTL_SET_APID, audio_pid)) { - throw(http_trap("audio pid setting fail.", 503, "Service Unavailable")); - } - if (!encoder.ioctl(Encoder::IOCTL_SET_PMTPID, pmt_pid)) { - throw(http_trap("pmt pid setting fail.", 503, "Service Unavailable")); + if (source->is_initialized()) { + if (!encoder->ioctl(Encoder::IOCTL_SET_VPID, video_pid)) { + throw(http_trap("video pid setting fail.", 503, "Service Unavailable")); + } + if (!encoder->ioctl(Encoder::IOCTL_SET_APID, audio_pid)) { + throw(http_trap("audio pid setting fail.", 503, "Service Unavailable")); + } + + if (pmt_pid != -1) { + if (!encoder->ioctl(Encoder::IOCTL_SET_PMTPID, pmt_pid)) { + throw(http_trap("pmt pid setting fail.", 503, "Service Unavailable")); + } + } } } - is_terminated = false; - source_thread_id = pthread_create(&source_thread_handle, 0, source_thread_main, (void *)&thread_params); + if (header.type == HttpHeader::TRANSCODING_LIVE) { + ((Demuxer*)source)->open(); + if (((Demuxer*)source)->get_fd() < 0) { + throw(http_trap("demux open fail!!", 503, "Service Unavailable")); + } + } + source_thread_id = pthread_create(&source_thread_handle, 0, source_thread_main, 0); if (source_thread_id < 0) { is_terminated = true; throw(http_trap("souce thread create fail.", 503, "Service Unavailable")); } else { pthread_detach(source_thread_handle); - if (!encoder.ioctl(Encoder::IOCTL_START_TRANSCODING, 0)) { + if (!source->is_initialized()) { + sleep(1); + } + + if (!encoder->ioctl(Encoder::IOCTL_START_TRANSCODING, 0)) { is_terminated = true; throw(http_trap("start transcoding fail.", 503, "Service Unavailable")); } else { - stream_thread_id = pthread_create(&stream_thread_handle, 0, streaming_thread_main, (void *)&thread_params); + stream_thread_id = pthread_create(&stream_thread_handle, 0, streaming_thread_main, 0); if (stream_thread_id < 0) { is_terminated = true; throw(http_trap("stream thread create fail.", 503, "Service Unavailable")); } } } + + while(!is_terminated) { + system(update_status_command); + sleep(1); + } + + send_signal(checker_pid, SIGUSR1); pthread_join(stream_thread_handle, 0); is_terminated = true; @@ -186,14 +342,17 @@ int main(int argc, char **argv) error = HttpUtil::http_error(e.http_error, e.http_header); } streaming_write(error.c_str(), error.length(), true); + send_signal(checker_pid, SIGUSR1); exit(-1); } catch (...) { ERROR("unknown exception..."); std::string error = HttpUtil::http_error(400, "Bad request"); streaming_write(error.c_str(), error.length(), true); + send_signal(checker_pid, SIGUSR1); exit(-1); } + send_signal(checker_pid, SIGUSR1); return 0; } //---------------------------------------------------------------------- @@ -203,39 +362,38 @@ void *streaming_thread_main(void *params) if (is_terminated) return 0; INFO("streaming thread start."); - Encoder *encoder = ((ThreadParams*) params)->encoder; - HttpHeader *header = ((ThreadParams*) params)->request; try { - int poll_state, rc, wc; - struct pollfd poll_fd[2]; unsigned char buffer[BUFFFER_SIZE]; - poll_fd[0].fd = encoder->get_fd(); - poll_fd[0].events = POLLIN | POLLHUP; - while(!is_terminated) { - poll_state = poll(poll_fd, 1, 1000); + int rc = 0, wc = 0; + struct pollfd poll_fd[2]; + poll_fd[0].fd = encoder->get_fd(); + poll_fd[0].events = POLLIN | POLLHUP; + + int poll_state = ::poll(poll_fd, 1, 1000); if (poll_state == -1) { throw(trap("poll error.")); } - else if (poll_state == 0) { - continue; - } if (poll_fd[0].revents & POLLIN) { rc = wc = 0; - rc = read(encoder->get_fd(), buffer, BUFFFER_SIZE - 1); + rc = ::read(encoder->get_fd(), buffer, BUFFFER_SIZE - 1); + + //DEBUG("%d bytes read..", rc); + if (rc <= 0) { - break; + continue; } - else if (rc > 0) { + else { wc = streaming_write((const char*) buffer, rc); if (wc < rc) { //DEBUG("need rewrite.. remain (%d)", rc - wc); int retry_wc = 0; - for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) { - poll_fd[0].revents = 0; - + for (int remain_len = rc - wc; (rc != wc) && (!is_terminated); remain_len -= retry_wc) { + if (is_terminated) { + throw(trap("terminated")); + } retry_wc = streaming_write((const char*) (buffer + rc - remain_len), remain_len); wc += retry_wc; } @@ -243,115 +401,104 @@ void *streaming_thread_main(void *params) } } } - else if (poll_fd[0].revents & POLLHUP) - { + else if (poll_fd[0].revents & POLLHUP) { if (encoder->state == Encoder::ENCODER_STAT_STARTED) { DEBUG("stop transcoding.."); encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0); } break; } + usleep(0); } } catch (const trap &e) { - ERROR("%s %s (%d)", e.what(), strerror(errno), errno); + ERROR("%s %s (%d)", e.what(), ::strerror(errno), errno); } is_terminated = true; INFO("streaming thread stop."); - if (encoder->state == Encoder::ENCODER_STAT_STARTED) { - DEBUG("stop transcoding.."); - encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0); - } - pthread_exit(0); return 0; } //---------------------------------------------------------------------- -void *source_thread_main(void *params) +void *source_thread_main(void* params) { - Source *source = ((ThreadParams*) params)->source; - Encoder *encoder = ((ThreadParams*) params)->encoder; - HttpHeader *header = ((ThreadParams*) params)->request; INFO("source thread start."); try { int poll_state, rc, wc; - struct pollfd poll_fd[2]; + struct pollfd poll_fd_enc[1]; + struct pollfd poll_fd_src[1]; unsigned char buffer[BUFFFER_SIZE]; - poll_fd[0].fd = encoder->get_fd(); - poll_fd[0].events = POLLOUT; - - poll_fd[1].fd = source->get_fd(); - poll_fd[1].events = POLLIN; + poll_fd_enc[0].fd = encoder->get_fd(); + poll_fd_enc[0].events = POLLOUT; + poll_fd_src[0].fd = source->get_fd(); + poll_fd_src[0].events = POLLIN; while(!is_terminated) { - poll_state = poll(poll_fd, 2, 1000); + poll_state = poll(poll_fd_src, 1, 1000); if (poll_state == -1) { throw(trap("poll error.")); } - else if (poll_state == 0) { - continue; - } - - if (poll_fd[0].revents & POLLOUT) { - rc = wc = 0; - if (poll_fd[1].revents & POLLIN) { - rc = read(source->get_fd(), buffer, BUFFFER_SIZE - 1); - if (rc == 0) { - break; - } - else if (rc > 0) { + if (poll_fd_src[0].revents & POLLIN) { + rc =::read(source->get_fd(), buffer, BUFFFER_SIZE - 1); + if (rc <= 0) { + continue; + } + else if (rc > 0) { + poll_fd_enc[0].revents = 0; + poll_state = poll(poll_fd_enc, 1, 1000); + if (poll_fd_enc[0].revents & POLLOUT) { wc = write(encoder->get_fd(), buffer, rc); - //DEBUG("write : %d", wc); if (wc < rc) { - //DEBUG("need rewrite.. remain (%d)", rc - wc); int retry_wc = 0; - for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) { - poll_fd[0].revents = 0; + for (int remain_len = rc - wc; (rc != wc) && (!is_terminated); remain_len -= retry_wc) { + if (is_terminated) { + throw(trap("terminated")); + } + poll_fd_enc[0].revents = 0; + poll_state = poll(poll_fd_enc, 1, 1000); + if (poll_state == -1) { + throw(trap("poll error.")); + } - poll_state = poll(poll_fd, 1, 1000); - if (poll_fd[0].revents & POLLOUT) { - retry_wc = write(encoder->get_fd(), (buffer + rc - remain_len), remain_len); + if (poll_fd_enc[0].revents & POLLOUT) { + retry_wc = ::write(encoder->get_fd(), (buffer + rc - remain_len), remain_len); wc += retry_wc; } + LOG("re-write result : %d - %d", wc, rc); } - LOG("re-write result : %d - %d", wc, rc); - usleep(500000); } } } } + usleep(0); } } catch (const trap &e) { - ERROR("%s %s (%d)", e.what(), strerror(errno), errno); + ERROR("%s %s (%d)", e.what(), ::strerror(errno), errno); } INFO("source thread stop."); - pthread_exit(0); + pthread_exit(0); return 0; } //---------------------------------------------------------------------- -int streaming_write(const char *buffer, size_t buffer_len, bool enable_log) -{ - if (enable_log) { - DEBUG("response data :\n%s", buffer); - } - return write(1, buffer, buffer_len); -} -//---------------------------------------------------------------------- - void signal_handler(int sig_no) { - INFO("signal no : %d", sig_no); + ERROR("signal no : %s (%d)", strsignal(sig_no), sig_no); is_terminated = true; + cbexit(); + + if (sig_no == SIGSEGV) { + exit(0); + } } //---------------------------------------------------------------------- @@ -365,3 +512,4 @@ void show_help() printf(" ex > echo \"4\" > /tmp/.debug_on\n"); } //---------------------------------------------------------------------- + |