summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoroskwon <oskwon@dev3>2014-10-27 10:49:06 (GMT)
committeroskwon <oskwon@dev3>2014-10-27 10:49:06 (GMT)
commit280ea227fe29cc8fc1d9ef23419a499d72f29d00 (patch)
tree3a84ae560521408f8cd717d1140b1d6a8d0696f9
parentb55a219498eeceb63a98dcfdc7597b7c40d6977c (diff)
Fix hanup issue and reduce cpu usage.
-rw-r--r--.gitignore2
-rw-r--r--build/Makefile4
-rw-r--r--example/Makefile93
-rwxr-xr-xexample/scripts/ck.sh8
-rwxr-xr-xexample/scripts/dbg.sh8
-rwxr-xr-xexample/scripts/ps.sh8
-rw-r--r--example/stress_full.cpp104
-rw-r--r--src/3rdparty/mpegts.cpp735
-rw-r--r--src/3rdparty/mpegts.h216
-rw-r--r--src/Demuxer.cpp61
-rw-r--r--src/Demuxer.h4
-rw-r--r--src/Encoder.cpp14
-rw-r--r--src/Encoder.h2
-rw-r--r--src/Http.cpp12
-rw-r--r--src/Logger.cpp19
-rw-r--r--src/Makefile.am1
-rw-r--r--src/Mpeg.cpp185
-rw-r--r--src/Mpeg.h33
-rw-r--r--src/Source.h1
-rw-r--r--src/Util.cpp5
-rw-r--r--src/main.cpp332
21 files changed, 752 insertions, 1095 deletions
diff --git a/.gitignore b/.gitignore
index 44ddef9..9fbd4b0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,3 +24,5 @@ stamp-h1
config.h.in~
config.mk
config.mk.bak
+example/bin
+example/obj
diff --git a/build/Makefile b/build/Makefile
index 2408020..a07c88b 100644
--- a/build/Makefile
+++ b/build/Makefile
@@ -20,8 +20,8 @@ TOP=$(PWD)/..
OBJ=./obj/
ifeq ($(CROSS),YES)
-SYSROOT=$(OETOP)/$(MODEL)/build/tmp/staging/mipsel-oe-linux
-TOOLCHAIN=$(OETOP)/$(MODEL)/build/tmp/cross/mipsel/bin/mipsel-oe-linux-
+SYSROOT=$(OETOP)/build/$(MODEL)/tmp/sysroots/$(MODEL)
+TOOLCHAIN=$(OETOP)/build/$(MODEL)/tmp/sysroots/i686-linux/usr/bin/mips32el-oe-linux/mipsel-oe-linux-
endif
RM=rm -Rf
diff --git a/example/Makefile b/example/Makefile
new file mode 100644
index 0000000..5d5dc57
--- /dev/null
+++ b/example/Makefile
@@ -0,0 +1,93 @@
+#============================================================================
+# Name : Makefile (transtreamproxy)
+# Author : oskwon(oskwon@dev3)
+# Version :
+# Copyright : Copyright(c)2014 Vu+ Team. All right reserved.
+# Description :
+#============================================================================
+
+-include ../build/config.mk
+
+ifeq ($(MODEL),)
+$(error config.mk is not set. please run script.config before make.)
+endif
+
+MAJOR = 3
+MINOR = 0
+
+TOP=$(PWD)/..
+OBJ=./obj/
+
+ifeq ($(CROSS),YES)
+SYSROOT=$(OETOP)/build/$(MODEL)/tmp/sysroots/$(MODEL)
+TOOLCHAIN=$(OETOP)/build/$(MODEL)/tmp/sysroots/i686-linux/usr/bin/mips32el-oe-linux/mipsel-oe-linux-
+endif
+
+RM=rm -Rf
+CXX=$(TOOLCHAIN)g++
+LD=$(TOOLCHAIN)ld
+STRIP=$(TOOLCHAIN)strip
+UPLOAD=./script.upload
+
+ifeq ($(MODE),DEBUG)
+CFLAGS += -g
+else
+CFLAGS += -O2
+endif
+CFLAGS += -D_MAJOR=$(MAJOR) -D_MINOR=$(MINOR) -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64
+CFLAGS += -I$(SYSROOT)/usr/include
+LDFLAGS += -L$(SYSROOT)/usr/lib -lpthread -lrt
+
+SRCS = ../src/3rdparty/trap.cpp
+SRCS+= ../src/Demuxer.cpp
+SRCS+= ../src/Encoder.cpp
+SRCS+= ../src/Http.cpp
+SRCS+= ../src/Logger.cpp
+SRCS+= ../src/Mpeg.cpp
+SRCS+= ../src/UriDecoder.cpp
+SRCS+= ../src/Util.cpp
+
+OBJS = $(SRCS:.cpp=.o)
+CFLAGS += $(addprefix -I, $(shell find ../src/ -type d))
+
+.SUFFIXES : .cpp .o
+.PHONY : all clean install .showinfo .prepare $(PROJECT)
+
+.cpp.o:
+ @echo "Compile... ["$<"]"
+ @$(CXX) -c $(CFLAGS) -o $(OBJ)$(notdir $@) $<
+
+all: .showinfo .prepare $(OBJS) stress_full
+
+demux_loop: .prepare
+ @echo "Link... ["$@"]"
+ @$(CXX) $(CFLAGS) -o bin/$@ $@.cpp $(addprefix $(OBJ), $(notdir $(OBJS))) $(LDFLAGS)
+
+stress_full: .prepare
+ @echo "Link... ["$@"]"
+ @g++ -o bin/$@ $@.cpp
+
+transcoding_file: .prepare
+ @echo "Link... ["$@"]"
+ @$(CXX) $(CFLAGS) -o bin/$@ $@.cpp $(LDFLAGS)
+
+clean:
+ $(RM) $(PROJECT) obj bin *.o *.a *.d *.log
+
+.prepare:
+ @if [ ! -e obj ]; then mkdir obj; fi
+ @if [ ! -e bin ]; then mkdir bin; fi
+
+.showinfo:
+ @echo "-----------------------------------------------------"
+ @echo " [ BUILD ENVIRONMENT ] "
+ @echo "-----------------------------------------------------"
+ @echo "PROJECT : "$(PROJECT)" (v"$(MAJOR)"."$(MINOR)")"
+ @echo ""
+ @echo "CXX : "$(CXX)
+ @echo "LD : "$(LD)
+ @echo "STRIP : "$(STRIP)
+ @echo "CFLAGS : "$(CFLAGS)
+ @echo "LDFLAGS : "$(LDFLAGS)
+ @echo "-----------------------------------------------------"
+ @echo
diff --git a/example/scripts/ck.sh b/example/scripts/ck.sh
new file mode 100755
index 0000000..8086068
--- /dev/null
+++ b/example/scripts/ck.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+while [ 1 ]; do
+ clear
+ ls -la /tmp/tsp_status.*
+ sleep 1
+done
+
diff --git a/example/scripts/dbg.sh b/example/scripts/dbg.sh
new file mode 100755
index 0000000..babce17
--- /dev/null
+++ b/example/scripts/dbg.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+ehco 5 > /tmp/.debug_on
+
+touch /tmp/transtreamproxy.log
+
+tail -f /tmp/transtreamproxy.log
+
diff --git a/example/scripts/ps.sh b/example/scripts/ps.sh
new file mode 100755
index 0000000..1180b28
--- /dev/null
+++ b/example/scripts/ps.sh
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+while [ 1 ]; do
+ clear
+ ps -ef | grep transtreamproxy | grep -v grep | grep -v tail
+ sleep 1
+done
+
diff --git a/example/stress_full.cpp b/example/stress_full.cpp
new file mode 100644
index 0000000..f5ee3aa
--- /dev/null
+++ b/example/stress_full.cpp
@@ -0,0 +1,104 @@
+/*
+ * stress.cpp
+ *
+ * Created on: 2014. 10. 17.
+ * Author: oskwon
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <signal.h>
+#include <string.h>
+
+#define MAX_INTERVAL_LENGTH (32)
+#define DD_LOG(X,...) { printf(X" (%s:%d)\n", ##__VA_ARGS__, __FUNCTION__, __LINE__); }
+//-------------------------------------------------------------------------------
+
+#define PORT 8002
+#define IP "192.168.100.240"
+
+int intervals[MAX_INTERVAL_LENGTH+1] = {
+ 2,4,6,1,4,2,15,9,
+ 2,1,2,2,2,2,15,3,
+ 2,2,2,2,2,2,2,2,
+ 2,2,2,2,2,2,2,2,
+ 0
+};
+const char* services[] = {
+ "1:0:19:2B66:3F3:1:C00000:0:0:0:",
+ "1:0:19:2B7A:3F3:1:C00000:0:0:0:",
+ ""
+};
+
+pid_t child_pid = 0;
+//-------------------------------------------------------------------------------
+
+void child_sigint_handler( int signo)
+{
+ exit(0);
+}
+//-------------------------------------------------------------------------------
+
+void parent_sigint_handler( int signo)
+{
+ kill(child_pid, SIGINT);
+ exit(0);
+}
+//-------------------------------------------------------------------------------
+
+void command_execute(char* command)
+{
+ DD_LOG("excute... [%s]", command);
+ system(command);
+}
+//-------------------------------------------------------------------------------
+
+void child_main(const char* ip, int port, const char* service)
+{
+ char command[2048];
+
+ signal(SIGUSR2, child_sigint_handler);
+
+ sprintf(command, "curl http://%s/web/zap?sRef=%s", ip, service);
+ command_execute(command);
+
+ sprintf(command, "curl http://%s:%d/%s > /dev/null", ip, port, service);
+ command_execute(command);
+}
+//-------------------------------------------------------------------------------
+
+int main(int argc, char** argv)
+{
+ signal(SIGINT, parent_sigint_handler);
+
+ for (int idx = 0; idx < 500; idx++) {
+ DD_LOG("\e[1;32m==========================================>> try : %d\e[00m", idx);
+
+ child_pid = fork();
+ if (child_pid < 0) {
+ DD_LOG("fork fail");
+ return 0;
+ }
+
+ if (child_pid == 0) {
+ if (argc == 1) {
+ child_main(IP, PORT, services[idx%2]);
+ }
+ else {
+ child_main(argv[1], PORT, services[idx%2]);
+ }
+ return 0;
+ }
+
+ usleep(intervals[idx % MAX_INTERVAL_LENGTH] * 1000*1000);
+ kill(child_pid, SIGINT);
+
+ if (idx == 499) {
+ idx = 0;
+ }
+ }
+ return 0;
+}
+//-------------------------------------------------------------------------------
+
diff --git a/src/3rdparty/mpegts.cpp b/src/3rdparty/mpegts.cpp
deleted file mode 100644
index cc90959..0000000
--- a/src/3rdparty/mpegts.cpp
+++ /dev/null
@@ -1,735 +0,0 @@
-//#include "config.h" // oskwon
-#include "trap.h"
-
-#include "mpegts.h"
-//#include "util.h" // oskwon
-#include "../Util.h" // oskwon
-
-#include <unistd.h>
-#include <stddef.h>
-#include <fcntl.h>
-#include <sys/stat.h>
-
-#include <boost/crc.hpp>
-
-#include <string>
-using std::string;
-
-#include <boost/algorithm/string.hpp>
-
-MpegTS::MpegTS(int fd_in, bool request_time_seek_in) throw(trap)
- : private_fd(false),
- fd(fd_in),
- request_time_seek(request_time_seek_in)
-{
- init();
-}
-
-MpegTS::MpegTS(string filename, bool request_time_seek_in) throw(trap)
- : private_fd(true),
- request_time_seek(request_time_seek_in)
-{
- // oskwon : add option O_LARGEFILE.
- if((fd = open(filename.c_str(), O_RDONLY | O_LARGEFILE, 0)) < 0)
- throw(trap("MpegTS::MpegTS: cannot open file"));
-
- init();
-}
-
-MpegTS::~MpegTS() throw()
-{
- if(private_fd)
- close(fd);
-}
-
-void MpegTS::init() throw(trap)
-{
- mpegts_pat_t::const_iterator it;
- struct stat filestat;
-
- if(fstat(fd, &filestat))
- throw(trap("MpegTS::init: cannot stat"));
-
- Util::vlog("MpegTS::init: file length: %lld Mb", filestat.st_size >> 20);
-
- eof_offset = stream_length = filestat.st_size;
- eof_offset = (eof_offset / sizeof(ts_packet_t)) * sizeof(ts_packet_t);
-
- if(!read_pat())
- throw(trap("MpegTS::init: invalid transport stream (no suitable pat)"));
-
- for(it = pat.begin(); it != pat.end(); it++)
- if(read_pmt(it->second))
- break;
-
- if(it == pat.end())
- throw(trap("MpegTS::init: invalid transport stream (no suitable pmt)"));
-
- pmt_pid = it->second;
- is_time_seekable = false;
-
- if(request_time_seek)
- {
- Util::vlog("MpegTS: start find pcr");
-
- if(lseek(fd, 0, SEEK_SET) == (off_t)-1)
- Util::vlog("MpegTS::init seek to sof fails");
- else
- {
- Util::vlog("MpegTS: start find first pcr");
-
- first_pcr_ms = find_pcr_ms(direction_forward);
-
- if(lseek(fd, eof_offset, SEEK_SET) == (off_t)-1)
- Util::vlog("MpegTS::init: seek to eof fails");
- else
- {
- Util::vlog("MpegTS: start find last pcr");
- last_pcr_ms = find_pcr_ms(direction_backward);
-
- if(last_pcr_ms < first_pcr_ms)
- Util::vlog("MpegTS::init: pcr wraparound, cannot seek this stream, first pcr: %d, last pcr: %d", first_pcr_ms / 1000, last_pcr_ms / 1000);
- else
- if((first_pcr_ms >= 0) && (last_pcr_ms >= 0))
- is_time_seekable = true;
- }
- }
-
- Util::vlog("MpegTS: find pcr done");
- }
-
- if(!is_time_seekable)
- {
- first_pcr_ms = -1;
- last_pcr_ms = -1;
- }
-
- if(lseek(fd, 0, SEEK_SET) == (off_t)-1)
- Util::vlog("MpegTS::init: seek to sof fails");
-
- //Util::vlog("first_pcr_ms = %d", first_pcr_ms);
- //Util::vlog("last_pcr_ms = %d", last_pcr_ms);
- //Util::vlog("eof_offset is at %lld", eof_offset);
-}
-
-bool MpegTS::read_table(int filter_pid, int filter_table) throw(trap)
-{
- typedef boost::crc_optimal<32, 0x04c11db7, 0xffffffff, 0x0, false, false> boost_mpeg_crc_t;
-
- boost_mpeg_crc_t my_crc;
- mpeg_crc_t mpeg_crc;
- uint32_t their_crc;
-
- ts_packet_t packet;
- const section_table_header_t *table;
- const section_table_syntax_t *syntax;
- const uint8_t *payload_data;
-
- int timeout;
- int section_length = -1;
- int section_length_remaining = -1;
- int pid;
- int cc = -1;
- int packet_payload_offset;
- int payload_length;
-
- raw_table_data.clear();
- table_data.clear();
-
- for(timeout = 0; timeout < 2000; timeout++)
- {
- if(read(fd, (void *)&packet, sizeof(packet)) != sizeof(packet))
- throw(trap("MpegTS::read_table: read error"));
-
- if(packet.header.sync_byte != MpegTS::sync_byte_value)
- throw(trap("MpegTS::read_table: no sync byte found"));
-
- pid = (packet.header.pid_high << 8) | (packet.header.pid_low);
-
- if(pid != filter_pid)
- continue;
-
- if(packet.header.tei)
- continue;
-
- if(!packet.header.payload_present)
- continue;
-
- if((cc != -1) && (cc != packet.header.cc))
- {
- //Util::vlog("MpegTS::read_table discontinuity: %d/%d", cc, packet.header.cc);
- goto retry;
- }
-
- cc = (packet.header.cc + 1) & 0x0f;
-
- if(packet.header.pusi)
- {
- if(table_data.length() > 0)
- {
- //Util::vlog("MpegTS::read_table: start payload upexpected");
- goto retry;
- }
- }
- else
- {
- if(table_data.length() <= 0)
- {
- //Util::vlog("MpegTS::read_table: continue payload unexpected");
- goto retry;
- }
- }
-
- //Util::vlog("MpegTS::read_table: correct packet with pid: %x, %s", pid, packet.header.pusi ? "start" : "continuation");
-
- packet_payload_offset = offsetof(ts_packet_t, header.payload);
-
- //Util::vlog("MpegTS::read_table: payload offset: %d", packet_payload_offset);
-
- if(packet.header.af)
- packet_payload_offset = offsetof(ts_packet_t, header.afield) + packet_payload_offset;
-
- //Util::vlog("MpegTS::read_table: payload offset after adaptation field: %d", packet_payload_offset);
-
- if(packet.header.pusi)
- packet_payload_offset += packet.byte[packet_payload_offset] + 1; // psi payload pointer byte
-
- //Util::vlog("MpegTS::read_table: payload offset after section pointer field: %d", packet_payload_offset);
-
- if(table_data.length() == 0)
- {
- table = (const section_table_header_t *)&packet.byte[packet_payload_offset];
-
- raw_table_data.assign((const uint8_t *)table, offsetof(section_table_header_t, payload));
-
- //Util::vlog("MpegTS::read_table: table id: %d", table->table_id);
-
- if(table->table_id != filter_table)
- {
- Util::vlog("MpegTS::read_table: table %d != %d", table->table_id, filter_table);
- goto retry;
- }
-
- if(table->private_bit)
- {
- Util::vlog("MpegTS::read_table: private != 0: %d", table->private_bit);
- goto retry;
- }
-
- if(!table->section_syntax)
- {
- Util::vlog("MpegTS::read_table: section_syntax == 0: %d", table->section_syntax);
- goto retry;
- }
-
- if(table->reserved != 0x03)
- {
- Util::vlog("MpegTS::read_table: reserved != 0x03: %x", table->reserved);
- goto retry;
- }
-
- if(table->section_length_unused != 0x00)
- {
- Util::vlog("MpegTS::read_table: section length unused != 0x00: %x", table->section_length_unused);
- goto retry;
- }
-
- section_length = ((table->section_length_high << 8) | (table->section_length_low)) - offsetof(section_table_syntax_t, data);
- //Util::vlog("MpegTS::read_table: section length: %d", section_length);
-
- if(section_length < 0)
- {
- Util::vlog("MpegTS::read_table: section length < 0: %d", section_length);
- goto retry;
- }
-
- if(section_length_remaining < 0)
- section_length_remaining = section_length;
-
- syntax = (const section_table_syntax_t *)&table->payload;
-
- raw_table_data.append((const uint8_t *)syntax, offsetof(section_table_syntax_t, data));
-
- //Util::vlog("MpegTS::read_table: tide: 0x%x", (syntax->tide_high << 8) | syntax->tide_low);
-
- if(syntax->reserved != 0x03)
- {
- Util::vlog("MpegTS::read_table: syntax reserved != 0x03: %d", syntax->reserved);
- goto retry;
- }
-
- //Util::vlog("MpegTS::read_table: currnext: %d", syntax->currnext);
- //Util::vlog("MpegTS::read_table: version: %d", syntax->version);
- //Util::vlog("MpegTS::read_table: ordinal: %d", syntax->ordinal);
- //Util::vlog("MpegTS::read_table: last: %d", syntax->last);
-
- payload_length = sizeof(ts_packet_t) - ((const uint8_t *)&syntax->data - &packet.byte[0]);
- //Util::vlog("MpegTS::read_table: payload length: %d", payload_length);
-
- if(payload_length > section_length_remaining)
- payload_length = section_length_remaining;
-
- //Util::vlog("MpegTS::read_table: payload length after trimming: %d", payload_length);
-
- raw_table_data.append((const uint8_t *)&syntax->data, payload_length);
- table_data.append((const uint8_t *)&syntax->data, payload_length);
- }
- else
- {
- payload_data = (const uint8_t *)&packet.byte[packet_payload_offset];
- payload_length = sizeof(ts_packet_t) - packet_payload_offset;
-
- if(payload_length > section_length_remaining)
- payload_length = section_length_remaining;
-
- raw_table_data.append((const uint8_t *)payload_data, payload_length);
- table_data.append((const uint8_t *)payload_data, payload_length);
- }
-
- section_length_remaining -= payload_length;
-
- if((section_length > 0) && (section_length_remaining == 0))
- break;
-
- continue;
-
-retry:
- section_length = -1;
- section_length_remaining = -1;
- raw_table_data.clear();
- table_data.clear();
- }
-
- if(table_data.length() == 0)
- {
- //Util::vlog("MpegTS::read_table: timeout");
- return(false);
- }
-
- if(section_length < (int)sizeof(mpeg_crc_t))
- {
- Util::vlog("MpegTS::read_table: table too small");
- return(false);
- }
-
- my_crc.process_bytes(raw_table_data.data(), raw_table_data.length() - sizeof(mpeg_crc_t));
-
- mpeg_crc = *(const mpeg_crc_t *)(&raw_table_data.data()[raw_table_data.length() - sizeof(mpeg_crc_t)]);
- their_crc = (mpeg_crc.byte[0] << 24) | (mpeg_crc.byte[1] << 16) | (mpeg_crc.byte[2] << 8) | mpeg_crc.byte[3];
-
- if(my_crc.checksum() != their_crc)
- {
- Util::vlog("MpegTS::read_table: crc mismatch: my crc: %x, their crc: %x", my_crc.checksum(), their_crc);
- return(false);
- }
-
- return(true);
-}
-
-bool MpegTS::read_pat() throw(trap)
-{
- int attempt;
- int current, entries, program, pid;
- const pat_entry_t *entry;
-
- for(attempt = 0; attempt < 16; attempt++)
- {
- if(!read_table(0, 0))
- continue;
-
- entries = (table_data.length() - sizeof(mpeg_crc_t)) / sizeof(*entry);
- entry = (const pat_entry_t *)table_data.data();
-
- for(current = 0; current < entries; current++)
- {
- program = (entry[current].program_high << 8) | (entry[current].program_low);
- pid = (entry[current].pmt_pid_high << 8) | (entry[current].pmt_pid_low);
- //Util::vlog("MpegTS::read_pat > program: %d -> pid %x", program, pid);
-
- if(entry[current].reserved != 0x07)
- {
- Util::vlog("MpegTS::read_pat > reserved != 0x07: 0x%x", entry[current].reserved);
- goto next_pat_entry;
- }
-
- pat[program] = pid;
- }
-
- return(true);
-
-next_pat_entry:
- (void)0;
- }
-
- return(false);
-}
-
-bool MpegTS::read_pmt(int filter_pid) throw(trap)
-{
- int attempt, programinfo_length, esinfo_length;
- int es_pid, es_data_length, es_data_skip, es_data_offset;
- int ds_data_skip, ds_data_offset;
- bool private_stream_is_ac3;
- string stream_language;
-
- const uint8_t *es_data;
- const pmt_header_t *pmt_header;
- const pmt_es_entry_t *es_entry;
- const pmt_ds_entry_t *ds_entry;
- const pmt_ds_a_t *ds_a;
-
- pcr_pid = video_pid = audio_pid = -1;
-
- for(attempt = 0; attempt < 16; attempt++)
- {
- if(!read_table(filter_pid, table_pmt))
- break;
-
- pmt_header = (const pmt_header_t *)table_data.data();
- pcr_pid = (pmt_header->pcrpid_high << 8) | pmt_header->pcrpid_low;
- programinfo_length = (pmt_header->programinfo_length_high << 8) |
- pmt_header->programinfo_length_low;
-
- if(pmt_header->reserved_1 != 0x07)
- {
- Util::vlog("MpegTS::read_pmt > reserved_1: %x", pmt_header->reserved_1);
- continue;
- }
-
- //Util::vlog("MpegTS::read_pmt: > pcr_pid: %x", pcr_pid);
- //Util::vlog("MpegTS::read_pmt: > program info length: %d", programinfo_length);
-
- if(pmt_header->unused != 0x00)
- {
- Util::vlog("MpegTS::read_pmt: > unused: %x", pmt_header->unused);
- continue;
- }
-
- if(pmt_header->reserved_2 != 0x0f)
- {
- Util::vlog("MpegTS::read_pmt: > reserved_2: %x", pmt_header->reserved_2);
- continue;
- }
-
- es_data = &(table_data.data()[offsetof(pmt_header_t, data) + programinfo_length]);
- es_data_length = table_data.length() - offsetof(pmt_header_t, data);
- es_data_skip = offsetof(pmt_es_entry_t, descriptors);
-
- for(es_data_offset = 0; (es_data_offset + es_data_skip + (int)sizeof(uint32_t)) < es_data_length; )
- {
- es_entry = (const pmt_es_entry_t *)&es_data[es_data_offset];
- esinfo_length = (es_entry->es_length_high << 8) | es_entry->es_length_low;
- es_pid = (es_entry->es_pid_high << 8) | es_entry->es_pid_low;
-
- if(es_entry->reserved_1 != 0x07)
- {
- Util::vlog("MpegTS::read_pmt: reserved 1: %x", es_entry->reserved_1);
- goto next_descriptor_entry;
- }
-
- //Util::vlog("MpegTS::read_pmt: >> pid: %x", es_pid);
-
- if(es_entry->reserved_2 != 0x0f)
- {
- Util::vlog("MpegTS::read_pmt: reserved 2: %x", es_entry->reserved_2);
- goto next_descriptor_entry;
- }
-
- if(es_entry->unused != 0x00)
- {
- Util::vlog("MpegTS::read_pmt: unused: %x", es_entry->unused);
- goto next_descriptor_entry;
- }
-
- //Util::vlog("MpegTS::read_pmt: esinfo_length: %d", esinfo_length);
-
- switch(es_entry->stream_type)
- {
- case(mpeg_streamtype_video_mpeg1):
- case(mpeg_streamtype_video_mpeg2):
- case(mpeg_streamtype_video_h264):
- {
- if(video_pid < 0)
- video_pid = es_pid;
- break;
- }
-
- case(mpeg_streamtype_audio_mpeg1):
- case(mpeg_streamtype_audio_mpeg2):
- case(mpeg_streamtype_private_pes): // ac3
- {
- private_stream_is_ac3 = false;
-
- ds_data_skip = es_data_offset + offsetof(pmt_es_entry_t, descriptors);
-
- for(ds_data_offset = 0; (ds_data_offset + 2) < esinfo_length; )
- {
- ds_entry = (const pmt_ds_entry_t *)&es_data[ds_data_skip + ds_data_offset];
-
- //Util::vlog("MpegTS::read_pmt: >>> offset: %d", ds_data_offset);
- //Util::vlog("MpegTS::read_pmt: >>> descriptor id: %x", ds_entry->id);
- //Util::vlog("MpegTS::read_pmt: >>> length: %d", ds_entry->length);
-
- switch(ds_entry->id)
- {
- case(pmt_desc_language):
- {
- ds_a = (const pmt_ds_a_t *)&ds_entry->data;
- //Util::vlog("MpegTS::read_pmt: >>>> lang: %c%c%c [%d]", ds_a->lang[0],
- // ds_a->lang[1], ds_a->lang[2], ds_a->code);
-
- stream_language.assign((const char *)&ds_a->lang, offsetof(pmt_ds_a_t, code));
-
- break;
- }
-
- case(pmt_desc_ac3):
- {
- private_stream_is_ac3 = true;
- break;
- }
- }
-
- ds_data_offset += ds_entry->length + offsetof(pmt_ds_entry_t, data);
- }
-
- if(!boost::iequals(stream_language, "nar"))
- {
- if(private_stream_is_ac3 || (audio_pid < 0)) // ac3 stream has preference
- audio_pid = es_pid;
- }
- }
- }
-
-next_descriptor_entry:
- es_data_offset += es_data_skip + esinfo_length;
- }
-
- return(true);
- }
-
- return(false);
-}
-
-int MpegTS::get_fd() const throw()
-{
- return(fd);
-}
-
-void MpegTS::parse_pts_ms(int pts_ms, int &h, int &m, int &s, int &ms) throw()
-{
- h = pts_ms / (60 * 60 * 1000);
- pts_ms -= h * (60 * 60 * 1000);
- m = pts_ms / (60 * 1000);
- pts_ms -= m * (60 * 1000);
- s = pts_ms / (1000);
- pts_ms -= s * (1000);
- ms = pts_ms;
-}
-
-int MpegTS::find_pcr_ms(seek_direction_t direction) const throw()
-{
- ts_packet_t packet;
- ts_adaptation_field_t *afield;
- int attempt, pcr_ms = -1, pid;
- int ms, h, m, s;
- off_t offset;
-
- for(attempt = 0; (pcr_ms < 0) && (attempt < find_pcr_max_probe); attempt++)
- {
- if(direction == direction_backward)
- {
- offset = (off_t)sizeof(ts_packet_t) * 2 * -1;
-
- if(lseek(fd, offset, SEEK_CUR) == (off_t)-1)
- {
- Util::vlog("MpegTS::find_pcr_ms: lseek failed");
- return(-1);
- }
- }
-
- if(read(fd, &packet, sizeof(packet)) != sizeof(packet))
- {
- Util::vlog("MpegTS::find_pcr_ms: read error");
- return(-1);
- }
-
- if(packet.header.sync_byte != sync_byte_value)
- {
- Util::vlog("MpegTS::find_pcr_ms: no sync byte");
- return(-1);
- }
-
- pid = (packet.header.pid_high << 8) | packet.header.pid_low;
-
- //Util::vlog("MpegTS::find_pcr_ms: pid: %d", pid);
-
- if(pid != pcr_pid)
- continue;
-
- if(!packet.header.af)
- {
- //Util::vlog("MpegTS::find_pcr_ms: no adaptation field");
- continue;
- }
-
- afield = (ts_adaptation_field_t *)&packet.header.afield;
-
- if(!afield->contains_pcr)
- {
- //Util::vlog("MpegTS::find_pcr_ms: attempt: %d, adaptation field does not have pcr field", attempt);
- continue;
- }
-
- // read 32 bits of the total 42 bits of clock precision, which is
- // enough for seeking, one tick = 1/45th second
-
- pcr_ms = uint32_t(afield->pcr_0 << 24) | uint32_t(afield->pcr_1 << 16) |
- uint32_t(afield->pcr_2 << 8) | uint32_t(afield->pcr_3 << 0);
- pcr_ms /= 90;
- pcr_ms <<= 1;
- }
-
- if(attempt >= find_pcr_max_probe)
- {
- Util::vlog("MpegTS::find_pcr_ms: no pcr found");
- return(-1);
- }
-
- parse_pts_ms(pcr_ms, h, m, s, ms);
-
- //Util::vlog("PCR found after %d packets", attempt);
- //Util::vlog("PCR = %d ms (%02d:%02d:%02d:%03d)", pcr_ms, h, m, s, ms);
-
- return(pcr_ms);
-}
-
-off_t MpegTS::seek(int whence, off_t offset) const throw(trap)
-{
- ts_packet_t packet;
- off_t actual_offset;
- off_t new_offset;
-
- offset = ((offset / (off_t)sizeof(ts_packet_t)) * (off_t)sizeof(ts_packet_t));
-
- if(lseek(fd, offset, whence) < 0)
- throw(trap("MpegTS::seek: lseek (1)"));
-
- if(read(fd, &packet, sizeof(packet)) != sizeof(packet))
- throw(trap("MpegTS::seek: read error"));
-
- if(packet.header.sync_byte != sync_byte_value)
- throw(trap("MpegTS::seek: no sync byte"));
-
- new_offset = (off_t)0 - (off_t)sizeof(packet);
-
- if((actual_offset = lseek(fd, new_offset, SEEK_CUR)) < 0)
- throw(trap("MpegTS::seek: lseek (2)"));
-
- return(actual_offset);
-}
-
-off_t MpegTS::seek_absolute(off_t offset) const throw(trap)
-{
- return(seek(SEEK_SET, offset));
-}
-
-off_t MpegTS::seek_relative(off_t roffset, off_t limit) const throw(trap)
-{
- off_t offset;
-
- if((roffset < 0) || (roffset > limit))
- throw(trap("MpegTS::seek_relative: value out of range"));
-
- offset = (eof_offset / limit) * roffset;
-
- return(seek(SEEK_SET, offset));
-}
-
-off_t MpegTS::seek_time(int pts_ms) const throw(trap)
-{
- int h, m, s, ms;
- int attempt;
- off_t lower_bound_offset = 0;
- int lower_bound_pts_ms = first_pcr_ms;
- off_t upper_bound_offset = eof_offset;
- int upper_bound_pts_ms = last_pcr_ms;
- off_t disect_offset;
- int disect_pts_ms;
- off_t current_offset;
-
- if(!is_time_seekable)
- throw(trap("MpegTS::seek: stream is not seekable"));
-
- if(pts_ms < first_pcr_ms)
- {
- Util::vlog("MpegTS::seek: seek pts beyond start of file");
- return(-1);
- }
-
- if(pts_ms > last_pcr_ms)
- {
- Util::vlog("MpegTS::seek: pts beyond end of file");
- return(-1);
- }
-
- for(attempt = 0; attempt < seek_max_attempts; attempt++)
- {
- disect_offset = (lower_bound_offset + upper_bound_offset) / 2;
-
- parse_pts_ms(pts_ms, h, m, s, ms);
- //Util::vlog("MpegTS::seek: seek for [%02d:%02d:%02d.%03d] between ", h, m, s, ms);
- parse_pts_ms(lower_bound_pts_ms, h, m, s, ms);
- //Util::vlog("MpegTS::seek: [%02d:%02d:%02d.%03d", h, m, s, ms);
- parse_pts_ms(upper_bound_pts_ms, h, m, s, ms);
- //Util::vlog("MpegTS::seek: -%02d:%02d:%02d.%03d], ", h, m, s, ms);
- //Util::vlog("MpegTS::seek: offset = %lld [%lld-%lld], ", disect_offset, lower_bound_offset, upper_bound_offset);
-
- if((current_offset = seek(SEEK_SET, disect_offset)) == 1)
- {
- Util::vlog("MpegTS::seek: seek fails");
- return(-1);
- }
-
- //Util::vlog("MpegTS::seek: current offset = %lld (%lld%%)", current_offset, (current_offset * 100) / eof_offset);
-
- if((disect_pts_ms = find_pcr_ms(direction_forward)) < 0)
- {
- Util::vlog("MpegTS::seek: eof");
- return(-1);
- }
-
- parse_pts_ms(disect_pts_ms, h, m, s, ms);
- //Util::vlog("MpegTS::seek: disect=[%02d:%02d:%02d.%03d]", h, m, s, ms);
-
- if(disect_pts_ms < 0)
- {
- Util::vlog("MpegTS::seek failed to find pts");
- return(-1);
- }
-
- if(((disect_pts_ms > pts_ms) && ((disect_pts_ms - pts_ms) < 8000)) ||
- ((pts_ms >= disect_pts_ms) && ((pts_ms - disect_pts_ms) < 8000)))
- {
- //Util::vlog("MpegTS::seek: found");
- return(current_offset);
- }
-
- if(disect_pts_ms < pts_ms) // seek higher
- {
- lower_bound_offset = disect_offset;
- lower_bound_pts_ms = disect_pts_ms;
-
- //Util::vlog("MpegTS::seek: not found: change lower bound");
- }
- else
- {
- upper_bound_offset = disect_offset;
- upper_bound_pts_ms = disect_pts_ms;
-
- //Util::vlog("MpegTS::seek: not found: change upper bound");
- }
- }
-
- return(-1);
-}
diff --git a/src/3rdparty/mpegts.h b/src/3rdparty/mpegts.h
deleted file mode 100644
index 6402dca..0000000
--- a/src/3rdparty/mpegts.h
+++ /dev/null
@@ -1,216 +0,0 @@
-#ifndef _mpegts_h_
-#define _mpegts_h_
-
-//#include "config.h" // oskwon
-#include "trap.h"
-
-#include <map>
-#include <string>
-
-#include <stdint.h>
-#include <sys/types.h>
-
-#include "../Source.h" // oskwon
-class MpegTS : public Source // oskwon : inherit Source class.
-{
- private:
-
- typedef std::basic_string<uint8_t> u8string;
- typedef unsigned int bf;
-
- enum
- {
- sync_byte_value = 0x47,
- };
-
- enum
- {
- table_pmt = 0x02,
- };
-
- enum
- {
- find_pcr_max_probe = 1000000,
- seek_max_attempts = 32,
- };
-
- enum
- {
- pmt_desc_language = 0x0a,
- pmt_desc_ac3 = 0x6a,
- };
-
- enum
- {
- mpeg_streamtype_video_mpeg1 = 0x01,
- mpeg_streamtype_video_mpeg2 = 0x02,
- mpeg_streamtype_video_h264 = 0x1b,
- mpeg_streamtype_audio_mpeg1 = 0x03,
- mpeg_streamtype_audio_mpeg2 = 0x04,
- mpeg_streamtype_private_pes = 0x06,
- };
-
- typedef struct
- {
- uint8_t byte[4];
- } mpeg_crc_t;
-
- typedef union
- {
- struct
- {
- uint8_t sync_byte;
- bf pid_high:5;
- bf tp:1;
- bf pusi:1;
- bf tei:1;
- uint8_t pid_low;
- bf cc:4;
- bf payload_present:1;
- bf af:1;
- bf sc:2;
- uint8_t payload[0];
- uint8_t afield_length;
- uint8_t afield[0];
- } header;
- uint8_t byte[188];
- } ts_packet_t;
-
- typedef struct
- {
- bf field_ext:1;
- bf private_data:1;
- bf splice_point:1;
- bf contains_opcr:1;
- bf contains_pcr:1;
- bf priority:1;
- bf gop_start:1;
- bf discont:1;
- uint8_t pcr_0; // 32 bits of the total 42 bits
- uint8_t pcr_1; // of clock precision, which is
- uint8_t pcr_2; // enough for seeking
- uint8_t pcr_3; // tick = 1/45th second
- } ts_adaptation_field_t;
-
- typedef struct
- {
- uint8_t table_id;
- bf section_length_high:2;
- bf section_length_unused:2;
- bf reserved:2;
- bf private_bit:1;
- bf section_syntax:1;
- uint8_t section_length_low;
- uint8_t payload[0];
- } section_table_header_t;
-
- typedef struct
- {
- uint8_t tide_high;
- uint8_t tide_low;
- bf currnext:1;
- bf version:5;
- bf reserved:2;
- uint8_t ordinal;
- uint8_t last;
- uint8_t data[0];
- } section_table_syntax_t;
-
- typedef struct
- {
- uint8_t program_high;
- uint8_t program_low;
- bf pmt_pid_high:5;
- bf reserved:3;
- uint8_t pmt_pid_low;
- } pat_entry_t;
-
- typedef struct
- {
- bf pcrpid_high:5;
- bf reserved_1:3;
- uint8_t pcrpid_low;
- uint programinfo_length_high:2;
- bf unused:2;
- bf reserved_2:4;
- uint8_t programinfo_length_low;
- uint8_t data[0];
- } pmt_header_t;
-
- typedef struct
- {
- uint8_t stream_type;
- bf es_pid_high:5;
- bf reserved_1:3;
- uint8_t es_pid_low;
- bf es_length_high:2;
- bf unused:2;
- bf reserved_2:4;
- uint8_t es_length_low;
- uint8_t descriptors[0];
- } pmt_es_entry_t;
-
- typedef struct
- {
- uint8_t id;
- uint8_t length;
- uint8_t data[0];
- } pmt_ds_entry_t;
-
- typedef struct
- {
- uint8_t lang[3];
- uint8_t code;
- } pmt_ds_a_t;
-
- typedef enum
- {
- direction_forward,
- direction_backward,
- } seek_direction_t;
-
- typedef std::map<int, int> mpegts_pat_t;
-
- MpegTS();
- MpegTS(const MpegTS &);
-
- bool private_fd;
- int fd;
- mpegts_pat_t pat;
- u8string raw_table_data;
- u8string table_data;
-
- static void parse_pts_ms(int pts_ms, int &h, int &m, int &s, int &ms) throw();
-
- void init() throw(trap);
- bool read_table(int filter_pid, int filter_table) throw(trap);
- bool read_pat() throw(trap);
- bool read_pmt(int filter_pid) throw(trap);
- int find_pcr_ms(seek_direction_t direction) const throw();
- off_t seek(int whence, off_t offset) const throw(trap);
-
- public:
-
- int pmt_pid;
- int pcr_pid;
- int video_pid;
- int audio_pid;
-
- bool request_time_seek;
- bool is_time_seekable;
-
- int first_pcr_ms;
- int last_pcr_ms;
- off_t eof_offset;
- off_t stream_length;
-
- MpegTS(int fd, bool request_time_seek) throw(trap);
- MpegTS(std::string file, bool request_time_seek) throw(trap);
- ~MpegTS() throw();
-
- int get_fd() const throw();
- off_t seek_absolute(off_t offset) const throw(trap);
- off_t seek_relative(off_t offset, off_t limit) const throw(trap);
- off_t seek_time(int pts_ms) const throw(trap);
-};
-#endif
diff --git a/src/Demuxer.cpp b/src/Demuxer.cpp
index 83290c6..d504b4e 100644
--- a/src/Demuxer.cpp
+++ b/src/Demuxer.cpp
@@ -22,6 +22,7 @@
using namespace std;
//-------------------------------------------------------------------------------
+bool terminated();
std::string Demuxer::webif_reauest(std::string request) throw(http_trap)
{
if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0)
@@ -59,6 +60,10 @@ std::string Demuxer::webif_reauest(std::string request) throw(http_trap)
}
response += buffer;
}
+ if (terminated()) {
+ response = "";
+ break;
+ }
}
return response;
}
@@ -180,6 +185,26 @@ bool Demuxer::parse_webif_response(std::string& response, std::vector<unsigned l
}
//-------------------------------------------------------------------------------
+void Demuxer::open() throw(http_trap)
+{
+ if (demux_id < 0) {
+ throw(http_trap("demux id is not set!!", 503, "Service Unavailable"));
+ }
+ std::string demuxpath = "/dev/dvb/adapter0/demux" + Util::ultostr(demux_id);
+ if ((fd = ::open(demuxpath.c_str(), O_RDWR | O_NONBLOCK)) < 0) {
+ throw(http_trap(std::string("demux open fail : ") + demuxpath, 503, "Service Unavailable"));
+ }
+ INFO("demux open success : %s", demuxpath.c_str());
+
+ try {
+ set_filter(new_pids);
+ }
+ catch (const trap &e) {
+ throw(http_trap(e.what(), 503, "Service Unavailable"));
+ }
+}
+//-------------------------------------------------------------------------------
+
Demuxer::Demuxer(HttpHeader *header) throw(http_trap)
{
demux_id = pat_pid = fd = sock = -1;
@@ -205,27 +230,35 @@ Demuxer::Demuxer(HttpHeader *header) throw(http_trap)
throw(http_trap("webif whthentication fail.", 401, "Unauthorized"));
}
- std::vector<unsigned long> new_pids;
+ new_pids.clear();
if (!parse_webif_response(webif_response, new_pids))
throw(http_trap("webif response parsing fail.", 503, "Service Unavailable"));
-
- std::string demuxpath = "/dev/dvb/adapter0/demux" + Util::ultostr(demux_id);
- if ((fd = open(demuxpath.c_str(), O_RDWR | O_NONBLOCK)) < 0) {
- throw(http_trap(std::string("demux open fail : ") + demuxpath, 503, "Service Unavailable"));
- }
- INFO("demux open success : %s", demuxpath.c_str());
-
- try {
- set_filter(new_pids);
- }
- catch (const trap &e) {
- throw(http_trap(e.what(), 503, "Service Unavailable"));
- }
}
//-------------------------------------------------------------------------------
Demuxer::~Demuxer() throw()
{
+ std::vector<unsigned long>::iterator iter = pids.begin();
+ for (; iter != pids.end(); ++iter) {
+ unsigned long pid = *iter;
+
+ while(pid != -1) {
+#if HAVE_DVB_API_VERSION > 3
+ __u16 p = pid;
+ if (::ioctl(fd, DMX_REMOVE_PID, &p) < 0) {
+#else
+ if (::ioctl(fd, DMX_REMOVE_PID, pid) < 0) {
+#endif
+ WARNING("DMX_REMOVE_PID");
+ if (errno == EAGAIN || errno == EINTR) {
+ DEBUG("retry DMX_REMOVE_PID");
+ continue;
+ }
+ }
+ break;
+ }
+ }
+
if (fd != -1) close(fd);
if (sock != -1) close(sock);
diff --git a/src/Demuxer.h b/src/Demuxer.h
index 4b8732a..11c0797 100644
--- a/src/Demuxer.h
+++ b/src/Demuxer.h
@@ -32,6 +32,7 @@ private:
int demux_id;
int pat_pid;
std::vector<unsigned long> pids;
+ std::vector<unsigned long> new_pids;
protected:
std::string webif_reauest(std::string request) throw(http_trap);
@@ -42,7 +43,10 @@ protected:
public:
Demuxer(HttpHeader *header) throw(http_trap);
virtual ~Demuxer() throw();
+ void open() throw(http_trap);
+
int get_fd() const throw();
+ bool is_initialized() { return true; }
};
//----------------------------------------------------------------------
diff --git a/src/Encoder.cpp b/src/Encoder.cpp
index c9d09f2..044e130 100644
--- a/src/Encoder.cpp
+++ b/src/Encoder.cpp
@@ -80,6 +80,12 @@ Encoder::Encoder() throw(trap)
Encoder::~Encoder()
{
Post();
+ encoder_close();
+}
+//----------------------------------------------------------------------
+
+void Encoder::encoder_close()
+{
if (fd != -1) {
if (state == ENCODER_STAT_STARTED) {
DEBUG("stop transcoding..");
@@ -103,15 +109,21 @@ bool Encoder::encoder_open()
}
//----------------------------------------------------------------------
+bool terminated();
bool Encoder::retry_open(int retry_count, int sleep_time)
{
for (int i = 0; i < retry_count; ++i) {
+ if (terminated()) {
+ break;
+ }
if (encoder_open()) {
DEBUG("encoder-%d open success..", encoder_id);
return true;
}
WARNING("encoder%d open fail, retry count : %d/%d", encoder_id, i, retry_count);
- sleep(sleep_time);
+ if (retry_count > 1) {
+ sleep(sleep_time);
+ }
}
ERROR("encoder open fail : %s (%d)", strerror(errno), errno);
return false;
diff --git a/src/Encoder.h b/src/Encoder.h
index e44cfb9..3a69327 100644
--- a/src/Encoder.h
+++ b/src/Encoder.h
@@ -67,6 +67,8 @@ public:
int get_fd();
bool ioctl(int cmd, int value);
bool retry_open(int retry_count, int sleep_time);
+
+ void encoder_close();
};
//----------------------------------------------------------------------
diff --git a/src/Http.cpp b/src/Http.cpp
index 161b288..26e1e99 100644
--- a/src/Http.cpp
+++ b/src/Http.cpp
@@ -171,16 +171,24 @@ std::string HttpHeader::build_response(Mpeg *source)
}
//----------------------------------------------------------------------
+bool terminated();
std::string HttpHeader::read_request()
{
std::string request = "";
while (true) {
char buffer[128] = {0};
- fgets(buffer, 127, stdin);
-
+ if (!read (0, buffer, 127)) {
+ break;
+ }
request += buffer;
if(request.find("\r\n\r\n") != string::npos)
break;
+
+ if (terminated()) {
+ request = "";
+ break;
+ }
+ usleep(0);
}
return request;
}
diff --git a/src/Logger.cpp b/src/Logger.cpp
index dc05d3a..b591bd6 100644
--- a/src/Logger.cpp
+++ b/src/Logger.cpp
@@ -37,18 +37,6 @@ static const char* LOG_LV_STR[] = {
#endif
//----------------------------------------------------------------------
-char* timestamp(const char* aFormat)
-{
- time_t t;
- time(&t);
-
- struct tm* m = localtime(&t);
- static char sz_timestamp[32] = {0};
- strftime(sz_timestamp, sizeof(sz_timestamp), aFormat, m);
- return sz_timestamp;
-}
-//----------------------------------------------------------------------
-
Logger::Logger()
: mLogLevel(0), mLogHandle(0)
{
@@ -106,9 +94,7 @@ bool Logger::init(const char* aName, int aLogLevel, bool aWithTimestamp)
return true;
}
char path[256] = {0};
- if (aWithTimestamp)
- sprintf(path, "%s_%s.log", aName, timestamp("%Y%m%d"));
- else sprintf(path, "%s.log", aName);
+ sprintf(path, "%s.log", aName);
if (!(mLogHandle = fopen(path, "a+"))) {
mLogHandle = 0;
// printf("fail to open logger [%s].", path);
@@ -197,8 +183,7 @@ void Logger::log(int aLogLevel, const char* aFormat, ...)
va_start(args, aFormat);
vsnprintf(log_data_buffer, MAX_PRINT_LEN-1, aFormat, args);
va_end(args);
-
- fprintf(mLogHandle, "[%s]%s[%d] %s\n", timestamp(DEFAULT_TIMESTAMP_FORMAT), LOG_LV_STR[aLogLevel], mPid, log_data_buffer);
+ fprintf(mLogHandle, "%s[%d] %s\n", LOG_LV_STR[aLogLevel], mPid, log_data_buffer);
fflush(mLogHandle);
#endif
}
diff --git a/src/Makefile.am b/src/Makefile.am
index 5f6df1a..75a67e5 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -10,7 +10,6 @@ transtreamproxy_SOURCES = \
Mpeg.cpp \
UriDecoder.cpp \
Util.cpp \
- 3rdparty/mpegts.cpp \
3rdparty/trap.cpp
transtreamproxy_CXXFLAGS = $(LIBSDL_CFLAGS) -Wno-unused-result -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64
diff --git a/src/Mpeg.cpp b/src/Mpeg.cpp
index 0cac649..c2e7781 100644
--- a/src/Mpeg.cpp
+++ b/src/Mpeg.cpp
@@ -9,6 +9,15 @@
#include "Http.h"
#include "Util.h"
#include "Logger.h"
+
+#include <sys/stat.h>
+
+#include <vector>
+#include <string>
+#include <fstream>
+#include <iterator>
+
+using namespace std;
//----------------------------------------------------------------------
void Mpeg::seek(HttpHeader &header)
@@ -278,6 +287,58 @@ int Mpeg::calc_bitrate()
}
//----------------------------------------------------------------------
+int Mpeg::find_pmt()
+{
+ off_t position=0;
+
+ int left = 5*1024*1024;
+
+ while (left >= 188) {
+ unsigned char packet[188];
+ int ret = read_internal(position, packet, 188);
+ if (ret != 188) {
+ DEBUG("read error");
+ break;
+ }
+ left -= 188;
+ position += 188;
+
+ if (packet[0] != 0x47) {
+ int i = 0;
+ while (i < 188) {
+ if (packet[i] == 0x47)
+ break;
+ --position; ++i;
+ }
+ continue;
+ }
+ int pid = ((packet[1] << 8) | packet[2]) & 0x1FFF;
+ int pusi = !!(packet[1] & 0x40);
+
+ if (!pusi) continue;
+
+ /* ok, now we have a PES header or section header*/
+ unsigned char *sec;
+
+ /* check for adaption field */
+ if (packet[3] & 0x20) {
+ if (packet[4] >= 183)
+ continue;
+ sec = packet + packet[4] + 4 + 1;
+ }
+ else {
+ sec = packet + 4;
+ }
+ if (sec[0]) continue; /* table pointer, assumed to be 0 */
+ if (sec[1] == 0x02) { /* program map section */
+ pmt_pid = pid;
+ return 0;
+ }
+ }
+ return -1;
+}
+//----------------------------------------------------------------------
+
int Mpeg::get_offset(off_t &offset, pts_t &pts, int marg)
{
calc_length();
@@ -508,3 +569,127 @@ int Mpeg::calc_length()
return m_duration;
}
//----------------------------------------------------------------------
+
+namespace eCacheID {
+ enum {
+ cVPID = 0,
+ cAPID,
+ cTPID,
+ cPCRPID,
+ cAC3PID,
+ cVTYPE,
+ cACHANNEL,
+ cAC3DELAY,
+ cPCMDELAY,
+ cSUBTITLE,
+ cacheMax
+ };
+};
+
+/* f:40,c:00007b,c:01008f,c:03007b */
+bool Mpeg::read_ts_meta(std::string media_file_name, int &vpid, int &apid)
+{
+ std::string metafilename = media_file_name;
+ metafilename += ".meta";
+
+ std::ifstream ifs(metafilename.c_str());
+
+ if (!ifs.is_open()) {
+ DEBUG("metadata is not exists..");
+ return false;
+ }
+
+ size_t rc = 0, i = 0;
+ char buffer[1024] = {0};
+ while (!ifs.eof()) {
+ ifs.getline(buffer, 1024);
+ if (i++ == 7) {
+ DEBUG("%d [%s]", i, buffer);
+ std::vector<string> tokens;
+ Util::split(buffer, ',', tokens);
+ if(tokens.size() < 3) {
+ DEBUG("pid count size error : %d", tokens.size());
+ return false;
+ }
+
+ int setting_done = false;
+ for (int ii = 0; ii < tokens.size(); ++ii) {
+ std::string token = tokens[ii];
+ if(token.length() <= 0) continue;
+ if(token.at(0) != 'c') continue;
+
+ int cache_id = atoi(token.substr(2,2).c_str());
+ DEBUG("token : %d [%s], chcke_id : [%d]", ii, token.c_str(), cache_id);
+ switch(cache_id) {
+ case(eCacheID::cVPID):
+ vpid = strtol(token.substr(4,4).c_str(), NULL, 16);
+ DEBUG("video pid : %d", vpid);
+ setting_done = (vpid != -1 && apid != -1) ? true : false;
+ break;
+ case(eCacheID::cAC3PID):
+ apid = strtol(token.substr(4,4).c_str(), NULL, 16);
+ DEBUG("audio pid : %d", apid);
+ break;
+ case(eCacheID::cAPID):
+ apid = strtol(token.substr(4,4).c_str(), NULL, 16);
+ DEBUG("audio pid : %d", apid);
+ setting_done = (vpid != -1 && apid != -1) ? true : false;
+ break;
+ }
+ if(setting_done) break;
+ }
+ break;
+ }
+ }
+ ifs.close();
+ return true;
+}
+//-------------------------------------------------------------------------------
+
+Mpeg::Mpeg(std::string filename, bool request_time_seek) throw (trap)
+// : MpegTS(filename)
+{
+ m_current_offset = m_base_offset = m_last_offset = 0;
+ m_splitsize = m_nrfiles = m_current_file = m_totallength = 0;
+
+ m_pts_begin = m_pts_end = m_offset_begin = m_offset_end = 0;
+ m_last_filelength = m_begin_valid = m_end_valid = m_futile =0;
+
+ m_duration = m_samples_taken = 0;
+ m_is_initialized = false;
+
+ pmt_pid = video_pid = audio_pid = -1;
+
+ fd = open(filename.c_str(), O_RDONLY | O_LARGEFILE, 0);
+ if (fd < 0) {
+ throw(trap("cannot open file"));
+ }
+
+ struct stat filestat;
+ if (fstat(fd, &filestat)) {
+ throw(trap("MpegTS::init: cannot stat"));
+ }
+ INFO("file length: %lld Mb", filestat.st_size >> 20);
+
+ stream_length = filestat.st_size;
+
+ int apid = -1, vpid = -1;
+ if (read_ts_meta(filename, vpid, apid)) {
+ if (vpid != -1 && apid != -1) {
+ video_pid = vpid;
+ audio_pid = apid;
+ m_is_initialized = true;
+
+ /*find_pmt();*/
+ return;
+ }
+ }
+}
+//----------------------------------------------------------------------
+
+off_t Mpeg::seek_absolute(off_t offset2)
+{
+ off_t result = seek_internal(offset2, SEEK_SET);
+ return result;
+}
+//----------------------------------------------------------------------
diff --git a/src/Mpeg.h b/src/Mpeg.h
index 88db459..f3e643f 100644
--- a/src/Mpeg.h
+++ b/src/Mpeg.h
@@ -8,15 +8,19 @@
#ifndef MPEG_H_
#define MPEG_H_
+#include <map>
+#include <string>
+
+#include "Source.h"
+#include "Logger.h"
#include "3rdparty/trap.h"
-#include "3rdparty/mpegts.h"
//----------------------------------------------------------------------
class HttpHeader;
typedef long long pts_t;
-class Mpeg : public MpegTS
+class Mpeg : public Source
{
private:
off_t m_splitsize, m_totallength, m_current_offset, m_base_offset, m_last_offset;
@@ -34,6 +38,9 @@ private:
int m_duration;
+ int fd;
+ bool m_is_initialized;
+
void scan();
int switch_offset(off_t off);
@@ -49,25 +56,25 @@ private:
void take_samples();
int take_sample(off_t off, pts_t &p);
- off_t seek_internal(off_t offset, int whence);
ssize_t read_internal(off_t offset, void *buf, size_t count);
-public:
- Mpeg(std::string filename, bool request_time_seek) throw (trap)
- : MpegTS(filename, request_time_seek)
- {
- m_current_offset = m_base_offset = m_last_offset = 0;
- m_splitsize = m_nrfiles = m_current_file = m_totallength = 0;
+ off_t seek_absolute(off_t offset);
+ off_t seek_internal(off_t offset, int whence);
+
+ bool read_ts_meta(std::string media_file_name, int &vpid, int &apid);
- m_pts_begin = m_pts_end = m_offset_begin = m_offset_end = 0;
- m_last_filelength = m_begin_valid = m_end_valid = m_futile =0;
+ int find_pmt();
- m_duration = m_samples_taken = 0;
- }
+public:
+ off_t stream_length;
+ int pmt_pid, video_pid, audio_pid;
+ Mpeg(std::string filename, bool request_time_seek) throw (trap);
virtual ~Mpeg() throw () {}
void seek(HttpHeader &header);
+ bool is_initialized() { return m_is_initialized; }
+ int get_fd() const throw() { return fd; }
};
//----------------------------------------------------------------------
diff --git a/src/Source.h b/src/Source.h
index 399fa54..81dbb13 100644
--- a/src/Source.h
+++ b/src/Source.h
@@ -17,6 +17,7 @@ public:
Source(){}
virtual ~Source(){}
virtual int get_fd() const throw() = 0;
+ virtual bool is_initialized() = 0;
};
//----------------------------------------------------------------------
diff --git a/src/Util.cpp b/src/Util.cpp
index 639cdfb..ad54464 100644
--- a/src/Util.cpp
+++ b/src/Util.cpp
@@ -138,8 +138,9 @@ std::vector<int> Util::find_process_by_name(std::string name, int mypid)
void Util::kill_process(int pid)
{
- int result = kill(pid, SIGINT);
+ int result = 0;
+
+ result = kill(pid, SIGINT);
DEBUG("SEND SIGINT to %d, result : %d", pid, result);
- //sleep(1);
}
//----------------------------------------------------------------------
diff --git a/src/main.cpp b/src/main.cpp
index 93197bd..fdf2263 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -15,6 +15,9 @@
#include <string>
+#include <sys/types.h>
+#include <sys/stat.h>
+
#include "Util.h"
#include "Logger.h"
@@ -36,12 +39,123 @@ void signal_handler(int sig_no);
void *source_thread_main(void *params);
void *streaming_thread_main(void *params);
-int streaming_write(const char *buffer, size_t buffer_len, bool enable_log = false);
-//----------------------------------------------------------------------
+static Source *source = 0;
+static Encoder *encoder = 0;
static bool is_terminated = true;
static int source_thread_id, stream_thread_id;
static pthread_t source_thread_handle, stream_thread_handle;
+
+#define TSP_CHECKER_TEMPLETE "/tmp/tsp_status.%d"
+
+pid_t tsp_pid = 0, checker_pid = 0;
+unsigned long last_updated_time = 0;
+//----------------------------------------------------------------------
+
+bool terminated()
+{
+ return is_terminated;
+}
+//----------------------------------------------------------------------
+
+void cbexit()
+{
+ INFO("release resource start");
+ if (encoder) { delete encoder; encoder = 0; }
+ if (source) { delete source; source = 0; }
+
+ char checker_filename[255] = {0};
+ if (tsp_pid) {
+ ::sprintf(checker_filename, TSP_CHECKER_TEMPLETE, tsp_pid);
+ if (::access(checker_filename, F_OK) == 0) {
+ ::unlink(checker_filename);
+ }
+ }
+ INFO("release resource finish");
+}
+//----------------------------------------------------------------------
+
+inline int streaming_write(const char *buffer, size_t buffer_len, bool enable_log = false)
+{
+ if (enable_log) {
+ DEBUG("response data :\n%s", buffer);
+ }
+ return write(1, buffer, buffer_len);
+}
+//----------------------------------------------------------------------
+
+#define DD_LOG(X,...) { \
+ char log_message[128] = {0};\
+ sprintf(log_message, "echo \""X"\" > /tmp/tsp_checker.log", ##__VA_ARGS__);\
+ system(log_message);\
+ }
+//----------------------------------------------------------------------
+
+int send_signal(pid_t pid, int signal)
+{
+ char process_path[255] = {0};
+ sprintf(process_path, "/proc/%d", pid);
+
+ if (access(process_path, F_OK) == 0) {
+ kill(pid, signal);
+ DD_LOG(" >> run kill-pid : %ld -> %ld (%d)", getpid(), pid, signal);
+ }
+ return 0;
+}
+//----------------------------------------------------------------------
+
+void signal_handler_checker(int sig_no)
+{
+ is_terminated = true;
+}
+//----------------------------------------------------------------------
+
+int tsp_checker(pid_t pid)
+{
+ char check_filename[255] = {0};
+ sleep(1);
+ sprintf(check_filename, TSP_CHECKER_TEMPLETE, ::getppid());
+
+ int timebase_count = 0, exit_count = 0;
+ while(!is_terminated) {
+ if (timebase_count != 10) {
+ timebase_count++;
+ }
+ else {
+ if (access(check_filename, F_OK) != 0) {
+ send_signal(tsp_pid, SIGUSR2);
+ DD_LOG("no found %s, %d", check_filename, timebase_count);
+ break;
+ }
+ }
+
+ struct stat sb;
+ stat(check_filename, &sb);
+
+ if (last_updated_time == sb.st_ctime && timebase_count == 10) {
+ if (exit_count > 2) {
+ send_signal(tsp_pid, SIGUSR2);
+ DD_LOG("%ld == %ld", last_updated_time, sb.st_ctime);
+ break;
+ }
+ exit_count++;
+ sleep(1);
+ continue;
+ }
+ exit_count = 0;
+ last_updated_time = sb.st_ctime;
+ sleep(1);
+ }
+ unlink(check_filename);
+
+ DD_LOG("kill (%ld)", tsp_pid);
+
+ sleep(3);
+ send_signal(tsp_pid, SIGKILL);
+ sleep(2);
+
+ return 0;
+}
//----------------------------------------------------------------------
int main(int argc, char **argv)
@@ -51,9 +165,21 @@ int main(int argc, char **argv)
show_help();
exit(0);
}
- Logger::instance()->init("/tmp/transtreamproxy", Logger::WARNING);
+ tsp_pid = ::getpid();
+
+ signal(SIGUSR1, signal_handler_checker);
+
+ Logger::instance()->init("/tmp/transtreamproxy", Logger::ERROR);
+ signal(SIGINT, signal_handler);
+ signal(SIGSEGV, signal_handler);
+ signal(SIGUSR2, signal_handler);
+
+ atexit(cbexit);
+
+ is_terminated = false;
+
+ char update_status_command[255] = {0};
- signal(SIGINT, signal_handler);
HttpHeader header;
std::string req = HttpHeader::read_request();
@@ -73,10 +199,6 @@ int main(int argc, char **argv)
throw(http_trap("not support request type.", 400, "Bad Request, not support request"));
}
- Encoder encoder;
- Source *source = 0;
- ThreadParams thread_params = { 0, &encoder, &header };
-
int video_pid = 0, audio_pid = 0, pmt_pid = 0;
switch(header.type) {
@@ -95,6 +217,15 @@ int main(int argc, char **argv)
break;
case HttpHeader::TRANSCODING_LIVE:
try {
+ checker_pid = ::fork();
+ if (checker_pid == 0) {
+ tsp_checker(checker_pid);
+ exit(0);
+ }
+
+ sprintf(update_status_command, "touch "TSP_CHECKER_TEMPLETE, tsp_pid);
+ system(update_status_command);
+
Demuxer *dmx = new Demuxer(&header);
pmt_pid = dmx->pmt_pid;
video_pid = dmx->video_pid;
@@ -119,13 +250,17 @@ int main(int argc, char **argv)
default:
throw(http_trap(std::string("not support source type : ") + Util::ultostr(header.type), 400, "Bad Request"));
}
- thread_params.source = source;
- if (!encoder.retry_open(2, 3)) {
+ encoder = new Encoder();
+ int encoder_retry_max_count = 1;
+ if (header.type == HttpHeader::TRANSCODING_FILE) {
+ encoder_retry_max_count = 2;
+ }
+ if (!encoder->retry_open(encoder_retry_max_count, 3)) {
throw(http_trap("encoder open fail.", 503, "Service Unavailable"));
}
- if (encoder.state == Encoder::ENCODER_STAT_OPENED) {
+ if (encoder->state == Encoder::ENCODER_STAT_OPENED) {
std::string response = header.build_response((Mpeg*) source);
if (response == "") {
throw(http_trap("response build fail.", 503, "Service Unavailable"));
@@ -137,37 +272,58 @@ int main(int argc, char **argv)
((Mpeg*) source)->seek(header);
}
- if (!encoder.ioctl(Encoder::IOCTL_SET_VPID, video_pid)) {
- throw(http_trap("video pid setting fail.", 503, "Service Unavailable"));
- }
- if (!encoder.ioctl(Encoder::IOCTL_SET_APID, audio_pid)) {
- throw(http_trap("audio pid setting fail.", 503, "Service Unavailable"));
- }
- if (!encoder.ioctl(Encoder::IOCTL_SET_PMTPID, pmt_pid)) {
- throw(http_trap("pmt pid setting fail.", 503, "Service Unavailable"));
+ if (source->is_initialized()) {
+ if (!encoder->ioctl(Encoder::IOCTL_SET_VPID, video_pid)) {
+ throw(http_trap("video pid setting fail.", 503, "Service Unavailable"));
+ }
+ if (!encoder->ioctl(Encoder::IOCTL_SET_APID, audio_pid)) {
+ throw(http_trap("audio pid setting fail.", 503, "Service Unavailable"));
+ }
+
+ if (pmt_pid != -1) {
+ if (!encoder->ioctl(Encoder::IOCTL_SET_PMTPID, pmt_pid)) {
+ throw(http_trap("pmt pid setting fail.", 503, "Service Unavailable"));
+ }
+ }
}
}
- is_terminated = false;
- source_thread_id = pthread_create(&source_thread_handle, 0, source_thread_main, (void *)&thread_params);
+ if (header.type == HttpHeader::TRANSCODING_LIVE) {
+ ((Demuxer*)source)->open();
+ if (((Demuxer*)source)->get_fd() < 0) {
+ throw(http_trap("demux open fail!!", 503, "Service Unavailable"));
+ }
+ }
+ source_thread_id = pthread_create(&source_thread_handle, 0, source_thread_main, 0);
if (source_thread_id < 0) {
is_terminated = true;
throw(http_trap("souce thread create fail.", 503, "Service Unavailable"));
}
else {
pthread_detach(source_thread_handle);
- if (!encoder.ioctl(Encoder::IOCTL_START_TRANSCODING, 0)) {
+ if (!source->is_initialized()) {
+ sleep(1);
+ }
+
+ if (!encoder->ioctl(Encoder::IOCTL_START_TRANSCODING, 0)) {
is_terminated = true;
throw(http_trap("start transcoding fail.", 503, "Service Unavailable"));
}
else {
- stream_thread_id = pthread_create(&stream_thread_handle, 0, streaming_thread_main, (void *)&thread_params);
+ stream_thread_id = pthread_create(&stream_thread_handle, 0, streaming_thread_main, 0);
if (stream_thread_id < 0) {
is_terminated = true;
throw(http_trap("stream thread create fail.", 503, "Service Unavailable"));
}
}
}
+
+ while(!is_terminated) {
+ system(update_status_command);
+ sleep(1);
+ }
+
+ send_signal(checker_pid, SIGUSR1);
pthread_join(stream_thread_handle, 0);
is_terminated = true;
@@ -186,14 +342,17 @@ int main(int argc, char **argv)
error = HttpUtil::http_error(e.http_error, e.http_header);
}
streaming_write(error.c_str(), error.length(), true);
+ send_signal(checker_pid, SIGUSR1);
exit(-1);
}
catch (...) {
ERROR("unknown exception...");
std::string error = HttpUtil::http_error(400, "Bad request");
streaming_write(error.c_str(), error.length(), true);
+ send_signal(checker_pid, SIGUSR1);
exit(-1);
}
+ send_signal(checker_pid, SIGUSR1);
return 0;
}
//----------------------------------------------------------------------
@@ -203,39 +362,38 @@ void *streaming_thread_main(void *params)
if (is_terminated) return 0;
INFO("streaming thread start.");
- Encoder *encoder = ((ThreadParams*) params)->encoder;
- HttpHeader *header = ((ThreadParams*) params)->request;
try {
- int poll_state, rc, wc;
- struct pollfd poll_fd[2];
unsigned char buffer[BUFFFER_SIZE];
- poll_fd[0].fd = encoder->get_fd();
- poll_fd[0].events = POLLIN | POLLHUP;
-
while(!is_terminated) {
- poll_state = poll(poll_fd, 1, 1000);
+ int rc = 0, wc = 0;
+ struct pollfd poll_fd[2];
+ poll_fd[0].fd = encoder->get_fd();
+ poll_fd[0].events = POLLIN | POLLHUP;
+
+ int poll_state = ::poll(poll_fd, 1, 1000);
if (poll_state == -1) {
throw(trap("poll error."));
}
- else if (poll_state == 0) {
- continue;
- }
if (poll_fd[0].revents & POLLIN) {
rc = wc = 0;
- rc = read(encoder->get_fd(), buffer, BUFFFER_SIZE - 1);
+ rc = ::read(encoder->get_fd(), buffer, BUFFFER_SIZE - 1);
+
+ //DEBUG("%d bytes read..", rc);
+
if (rc <= 0) {
- break;
+ continue;
}
- else if (rc > 0) {
+ else {
wc = streaming_write((const char*) buffer, rc);
if (wc < rc) {
//DEBUG("need rewrite.. remain (%d)", rc - wc);
int retry_wc = 0;
- for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) {
- poll_fd[0].revents = 0;
-
+ for (int remain_len = rc - wc; (rc != wc) && (!is_terminated); remain_len -= retry_wc) {
+ if (is_terminated) {
+ throw(trap("terminated"));
+ }
retry_wc = streaming_write((const char*) (buffer + rc - remain_len), remain_len);
wc += retry_wc;
}
@@ -243,115 +401,104 @@ void *streaming_thread_main(void *params)
}
}
}
- else if (poll_fd[0].revents & POLLHUP)
- {
+ else if (poll_fd[0].revents & POLLHUP) {
if (encoder->state == Encoder::ENCODER_STAT_STARTED) {
DEBUG("stop transcoding..");
encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0);
}
break;
}
+ usleep(0);
}
}
catch (const trap &e) {
- ERROR("%s %s (%d)", e.what(), strerror(errno), errno);
+ ERROR("%s %s (%d)", e.what(), ::strerror(errno), errno);
}
is_terminated = true;
INFO("streaming thread stop.");
- if (encoder->state == Encoder::ENCODER_STAT_STARTED) {
- DEBUG("stop transcoding..");
- encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0);
- }
-
pthread_exit(0);
return 0;
}
//----------------------------------------------------------------------
-void *source_thread_main(void *params)
+void *source_thread_main(void* params)
{
- Source *source = ((ThreadParams*) params)->source;
- Encoder *encoder = ((ThreadParams*) params)->encoder;
- HttpHeader *header = ((ThreadParams*) params)->request;
INFO("source thread start.");
try {
int poll_state, rc, wc;
- struct pollfd poll_fd[2];
+ struct pollfd poll_fd_enc[1];
+ struct pollfd poll_fd_src[1];
unsigned char buffer[BUFFFER_SIZE];
- poll_fd[0].fd = encoder->get_fd();
- poll_fd[0].events = POLLOUT;
-
- poll_fd[1].fd = source->get_fd();
- poll_fd[1].events = POLLIN;
+ poll_fd_enc[0].fd = encoder->get_fd();
+ poll_fd_enc[0].events = POLLOUT;
+ poll_fd_src[0].fd = source->get_fd();
+ poll_fd_src[0].events = POLLIN;
while(!is_terminated) {
- poll_state = poll(poll_fd, 2, 1000);
+ poll_state = poll(poll_fd_src, 1, 1000);
if (poll_state == -1) {
throw(trap("poll error."));
}
- else if (poll_state == 0) {
- continue;
- }
-
- if (poll_fd[0].revents & POLLOUT) {
- rc = wc = 0;
- if (poll_fd[1].revents & POLLIN) {
- rc = read(source->get_fd(), buffer, BUFFFER_SIZE - 1);
- if (rc == 0) {
- break;
- }
- else if (rc > 0) {
+ if (poll_fd_src[0].revents & POLLIN) {
+ rc =::read(source->get_fd(), buffer, BUFFFER_SIZE - 1);
+ if (rc <= 0) {
+ continue;
+ }
+ else if (rc > 0) {
+ poll_fd_enc[0].revents = 0;
+ poll_state = poll(poll_fd_enc, 1, 1000);
+ if (poll_fd_enc[0].revents & POLLOUT) {
wc = write(encoder->get_fd(), buffer, rc);
- //DEBUG("write : %d", wc);
if (wc < rc) {
- //DEBUG("need rewrite.. remain (%d)", rc - wc);
int retry_wc = 0;
- for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) {
- poll_fd[0].revents = 0;
+ for (int remain_len = rc - wc; (rc != wc) && (!is_terminated); remain_len -= retry_wc) {
+ if (is_terminated) {
+ throw(trap("terminated"));
+ }
+ poll_fd_enc[0].revents = 0;
+ poll_state = poll(poll_fd_enc, 1, 1000);
+ if (poll_state == -1) {
+ throw(trap("poll error."));
+ }
- poll_state = poll(poll_fd, 1, 1000);
- if (poll_fd[0].revents & POLLOUT) {
- retry_wc = write(encoder->get_fd(), (buffer + rc - remain_len), remain_len);
+ if (poll_fd_enc[0].revents & POLLOUT) {
+ retry_wc = ::write(encoder->get_fd(), (buffer + rc - remain_len), remain_len);
wc += retry_wc;
}
+ LOG("re-write result : %d - %d", wc, rc);
}
- LOG("re-write result : %d - %d", wc, rc);
- usleep(500000);
}
}
}
}
+ usleep(0);
}
}
catch (const trap &e) {
- ERROR("%s %s (%d)", e.what(), strerror(errno), errno);
+ ERROR("%s %s (%d)", e.what(), ::strerror(errno), errno);
}
INFO("source thread stop.");
- pthread_exit(0);
+ pthread_exit(0);
return 0;
}
//----------------------------------------------------------------------
-int streaming_write(const char *buffer, size_t buffer_len, bool enable_log)
-{
- if (enable_log) {
- DEBUG("response data :\n%s", buffer);
- }
- return write(1, buffer, buffer_len);
-}
-//----------------------------------------------------------------------
-
void signal_handler(int sig_no)
{
- INFO("signal no : %d", sig_no);
+ ERROR("signal no : %s (%d)", strsignal(sig_no), sig_no);
is_terminated = true;
+ cbexit();
+
+ if (sig_no == SIGSEGV) {
+ exit(0);
+ }
}
//----------------------------------------------------------------------
@@ -365,3 +512,4 @@ void show_help()
printf(" ex > echo \"4\" > /tmp/.debug_on\n");
}
//----------------------------------------------------------------------
+