From 80f68801c21a46892cd762578b5d22bfd8103772 Mon Sep 17 00:00:00 2001 From: oskwon Date: Tue, 15 Dec 2015 15:58:14 +0900 Subject: support solo4k and fix play bug. --- .gitignore | 1 + build/Makefile | 9 +-- build/Makefile.arm | 80 +++++++++++++++++++++++ configure.ac | 12 ++++ src/Demuxer.cpp | 76 +++++++++++++++------- src/Demuxer.h | 2 + src/Encoder.cpp | 180 ++++++++------------------------------------------- src/Encoder.h | 35 ++++------ src/Http.cpp | 11 +++- src/Http.h | 4 +- src/Logger.cpp | 16 +++++ src/Logger.h | 2 + src/Mpeg.cpp | 6 +- src/Mpeg.h | 4 ++ src/SessionMap.cpp | 184 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/SessionMap.h | 69 ++++++++++++++++++++ src/Source.h | 11 ++++ src/Util.cpp | 24 ++++++- src/Util.h | 2 + src/main.cpp | 35 ++++++++-- 20 files changed, 549 insertions(+), 214 deletions(-) create mode 100644 build/Makefile.arm create mode 100644 src/SessionMap.cpp create mode 100644 src/SessionMap.h diff --git a/.gitignore b/.gitignore index eaf2fa2..8a0906e 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,4 @@ transtreamproxy.PR transtreamproxy.PS transtreamproxy.PRI transtreamproxy.WK3 +siproject/ diff --git a/build/Makefile b/build/Makefile index a07c88b..1f369ad 100644 --- a/build/Makefile +++ b/build/Makefile @@ -21,12 +21,12 @@ OBJ=./obj/ ifeq ($(CROSS),YES) SYSROOT=$(OETOP)/build/$(MODEL)/tmp/sysroots/$(MODEL) -TOOLCHAIN=$(OETOP)/build/$(MODEL)/tmp/sysroots/i686-linux/usr/bin/mips32el-oe-linux/mipsel-oe-linux- +TOOLCHAIN=$(OETOP)/build/$(MODEL)/tmp/sysroots/i686-linux/usr/bin/mipsel-oe-linux/mipsel-oe-linux- endif RM=rm -Rf -CXX=$(TOOLCHAIN)g++ -LD=$(TOOLCHAIN)ld +CXX=$(TOOLCHAIN)g++ --sysroot=$(OETOP)/build/$(MODEL)/tmp/sysroots/$(MODEL) +LD=$(TOOLCHAIN)ld --sysroot=$(OETOP)/build/$(MODEL)/tmp/sysroots/$(MODEL) STRIP=$(TOOLCHAIN)strip UPLOAD=./script.upload @@ -36,7 +36,7 @@ else CFLAGS += -O2 endif CFLAGS += -D_MAJOR=$(MAJOR) -D_MINOR=$(MINOR) -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 -CFLAGS += -I$(SYSROOT)/usr/include +CFLAGS += -I$(SYSROOT)/usr/include -I$(TOP)/build LDFLAGS += -L$(SYSROOT)/usr/lib -lpthread -lrt SRCS = $(shell find ../src/ -name "*.cpp") @@ -62,6 +62,7 @@ clean: $(RM) $(PROJECT) obj *.o *.a *.d *.log .prepare: + echo "" > config.h @if [ ! -e obj ]; then mkdir obj; fi .showinfo: diff --git a/build/Makefile.arm b/build/Makefile.arm new file mode 100644 index 0000000..c871428 --- /dev/null +++ b/build/Makefile.arm @@ -0,0 +1,80 @@ +#============================================================================ +# Name : Makefile (transtreamproxy) +# Author : oskwon(oskwon@dev3) +# Version : +# Copyright : Copyright(c)2014 Vu+ Team. All right reserved. +# Description : +#============================================================================ + +-include config.mk + +ifeq ($(MODEL),) +$(error config.mk is not set. please run script.config before make.) +endif + +MAJOR = 3 +MINOR = 0 +PROJECT = transtreamproxy + +TOP=$(PWD)/.. +OBJ=./obj/ + +ifeq ($(CROSS),YES) +SYSROOT=$(OETOP)/build/$(MODEL)/tmp/sysroots/$(MODEL) +TOOLCHAIN=$(OETOP)/build/$(MODEL)/tmp/sysroots/i686-linux/usr/bin/arm-oe-linux-gnueabi/arm-oe-linux-gnueabi- +endif + +RM=rm -Rf +CXX=$(TOOLCHAIN)g++ -march=armv7-a -mfloat-abi=hard -mfpu=neon --sysroot=$(SYSROOT) +LD=$(TOOLCHAIN)ld --sysroot=$(SYSROOT) +STRIP=$(TOOLCHAIN)strip +UPLOAD=./script.upload + +ifeq ($(MODE),DEBUG) +CFLAGS += -g +else +CFLAGS += -O2 +endif +CFLAGS += -D_MAJOR=$(MAJOR) -D_MINOR=$(MINOR) -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 +CFLAGS += -I$(SYSROOT)/usr/include -I$(TOP)/build +LDFLAGS += -L$(SYSROOT)/usr/lib -lpthread -lrt + +SRCS = $(shell find ../src/ -name "*.cpp") +OBJS = $(SRCS:.cpp=.o) +CFLAGS += $(addprefix -I, $(shell find ../src/ -type d)) + +.SUFFIXES : .cpp .o +.PHONY : all clean install .showinfo .prepare $(PROJECT) + +.cpp.o: + $(CXX) -c $(CFLAGS) -o $(OBJ)$(notdir $@) $< + +all: .showinfo .prepare $(PROJECT) + +$(PROJECT):$(OBJS) + $(CXX) -o $@ $(addprefix $(OBJ), $(notdir $(OBJS))) $(LDFLAGS) + $(STRIP) $@ + +install: + $(UPLOAD) $(UPIP) . $(PROJECT) $(UPDIR) + +clean: + $(RM) $(PROJECT) obj *.o *.a *.d *.log + +.prepare: + @echo "#define HAVE_EXT_PID 1" > $(TOP)/build/config.h + @if [ ! -e obj ]; then mkdir obj; fi + +.showinfo: + @echo "-----------------------------------------------------" + @echo " [ BUILD ENVIRONMENT ] " + @echo "-----------------------------------------------------" + @echo "PROJECT : "$(PROJECT)" (v"$(MAJOR)"."$(MINOR)")" + @echo "" + @echo "CXX : "$(CXX) + @echo "LD : "$(LD) + @echo "STRIP : "$(STRIP) + @echo "CFLAGS : "$(CFLAGS) + @echo "LDFLAGS : "$(LDFLAGS) + @echo "-----------------------------------------------------" + @echo diff --git a/configure.ac b/configure.ac index dfe149a..4a9f975 100644 --- a/configure.ac +++ b/configure.ac @@ -32,6 +32,18 @@ AC_SUBST(PLATFORM) AC_SUBST(INCPATH) # Checks for typedefs, structures, and compiler characteristics. +AC_SUBST(USE_EXT_PID) + +AC_ARG_ENABLE([ext-pid], + [AS_HELP_STRING([--enable-ext-pid], + [enable extend pid (default is no)])], + [use_ext_pid=$enableval], + [use_ext_pid=no]) + +if test "$use_ext_pid" = "yes"; then + AC_DEFINE([HAVE_EXT_PID], [1], [Define to 1 if support extend pid.]) + USE_EXT_PID=1 +fi # Checks for library functions. diff --git a/src/Demuxer.cpp b/src/Demuxer.cpp index 345c830..e8be7bd 100644 --- a/src/Demuxer.cpp +++ b/src/Demuxer.cpp @@ -29,6 +29,9 @@ std::string Demuxer::webif_reauest(std::string request) throw(http_trap) if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) throw(trap("webif create socket fail.")); + int nostream_count = 0; + std::string response = ""; + struct sockaddr_in sock_addr; sock_addr.sin_family = AF_INET; sock_addr.sin_port = htons(80); @@ -36,41 +39,68 @@ std::string Demuxer::webif_reauest(std::string request) throw(http_trap) if (connect(sock, (struct sockaddr*)&sock_addr, sizeof(struct sockaddr_in))) throw(http_trap("webif connect fail.", 502, "Bad Gateway, webif connect fail")); - if (write(sock, request.c_str(), request.length()) != request.length()) - throw(http_trap("webif request fail.", 502, "Bad Gateway, webif request error")); - DEBUG("webif request :\n", request.c_str()); + for (nostream_count = 0; nostream_count < 3; nostream_count++) { + if (write(sock, request.c_str(), request.length()) != request.length()) + throw(http_trap("webif request fail.", 502, "Bad Gateway, webif request error")); + DEBUG("webif request : %s\n", request.c_str()); - std::string response = ""; - struct pollfd pollevt[2]; - pollevt[0].fd = sock; - pollevt[0].events = POLLIN; - for (;;) { - char buffer[1024] = {0}; - - pollevt[0].revents = 0; - int poll_state = poll(pollevt, 1, 1000); - if (poll_state == 0) { - break; - } - else if (poll_state < 0) { - ERROR("webif receive poll error : %s (%d)", strerror(errno), errno); - throw(http_trap("webif response fail.", 502, "Bad Gateway, webif response error")); - } - if (pollevt[0].revents & POLLIN) { - if (read(sock, buffer, 1024) <= 0) { + struct pollfd pollevt[2]; + pollevt[0].fd = sock; + pollevt[0].events = POLLIN; + for (;;) { + char buffer[1024] = {0}; + + pollevt[0].revents = 0; + int poll_state = poll(pollevt, 1, 1000); + DEBUG("poll state : %d", poll_state); + + if (poll_state == 0) { + break; + } + else if (poll_state < 0) { + ERROR("webif receive poll error : %s (%d)", strerror(errno), errno); + throw(http_trap("webif response fail.", 502, "Bad Gateway, webif response error")); + } + if (pollevt[0].revents & POLLIN) { + int rc = read(sock, buffer, 1024); + if (rc <= 0) { + WARNING("read error : %d", rc); + break; + } + response += buffer; + } + if (terminated()) { + DEBUG("already terminated!!"); + response = ""; break; } - response += buffer; } + ERROR("++nostream count : %d", nostream_count); if (terminated()) { + DEBUG("already terminated!!"); response = ""; break; } + if (response.find("= NO STREAM") != std::string::npos && response.find(":pat,") == std::string::npos) { + response = ""; + usleep(1000*500); + continue; + } } + return response; } //------------------------------------------------------------------------------- +void Demuxer::disconnect_webif_socket() +{ + if (sock != -1) { + ::close(sock); + } + sock = -1; +} +//------------------------------------------------------------------------------- + bool Demuxer::already_exist(std::vector &pidlist, int pid) { for(int i = 0; i < pidlist.size(); ++i) { @@ -209,6 +239,8 @@ void Demuxer::open() throw(http_trap) Demuxer::Demuxer(HttpHeader *header) throw(http_trap) { + source_type = Source::SOURCE_TYPE_LIVE; + SingleLock lock(&demux_mutex); demux_id = pat_pid = fd = sock = -1; diff --git a/src/Demuxer.h b/src/Demuxer.h index 5370f44..63f98e4 100644 --- a/src/Demuxer.h +++ b/src/Demuxer.h @@ -45,6 +45,8 @@ public: int get_fd() const throw(); bool is_initialized() { return true; } + + void disconnect_webif_socket(); }; //---------------------------------------------------------------------- diff --git a/src/Encoder.cpp b/src/Encoder.cpp index 10142a1..ce87b30 100644 --- a/src/Encoder.cpp +++ b/src/Encoder.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include #include @@ -18,6 +17,8 @@ #include "Logger.h" #include "Encoder.h" +#include "SessionMap.h" + bool terminated(); using namespace std; @@ -28,56 +29,30 @@ Encoder::Encoder() throw(trap) SingleLock lock(&encoder_mutex); encoder_id = fd = -1; - max_encodr_count = state = ENCODER_STAT_INIT; + state = ENCODER_STAT_INIT; - DIR* d = opendir("/dev"); - if (d != 0) { - struct dirent* de; - while ((de = readdir(d)) != 0) { - if (strncmp("bcm_enc", de->d_name, 7) == 0) { - max_encodr_count++; - } + try { + SessionMap* session_map = SessionMap::get(); + if (session_map == 0) { + throw(trap("create session map fail.")); } - closedir(d); - } - - mSemId = 0; - mShmFd = 0; - mShmData = 0; - - mSemName = "/tsp_session_sem"; - mShmName = "/tsp_session_shm"; - mShmSize = sizeof(Session) * max_encodr_count; - if (Open() == false) - throw(trap("session ctrl init fail.")); - DEBUG("shm-info : fd [%d], name [%s], size [%d], data [%p]", mShmFd, mShmName.c_str(), mShmSize, mShmData); - DEBUG("sem-info : id [%p], name [%s]", mSemId, mSemName.c_str()); + session_map->dump("before init."); + session_map->cleanup(); - std::vector pidlist = Util::find_process_by_name("transtreamproxy", 0); - - session_dump("before init."); - - Wait(); - for (int i = 0; i < max_encodr_count; i++) { - if (mShmData[i].pid != 0) { - int pid = mShmData[i].pid; - if(session_terminated(pidlist, pid)) { - session_erase(pid); - } + int mypid = getpid(); + std::string ipaddr = Util::host_addr(); + if (session_map->already_exist(ipaddr) > 0) { + encoder_id = session_map->update(ipaddr, mypid); } + else { + encoder_id = session_map->add(ipaddr, mypid); + } + DEBUG("encoder_device_id : %d", encoder_id); } - Post(); - - int mypid = getpid(); - std::string ipaddr = Util::host_addr(); - if (session_already_exist(ipaddr) > 0) { - encoder_id = session_update(ipaddr, mypid); - } - else { - encoder_id = session_register(ipaddr, mypid); + catch (const trap &e) { + throw(e); } - DEBUG("encoder_device_id : %d", encoder_id); } //---------------------------------------------------------------------- @@ -85,7 +60,14 @@ Encoder::~Encoder() { SingleLock lock(&encoder_mutex); - Post(); + try { + SessionMap* session_map = SessionMap::get(); + if (session_map) { + session_map->post(); + } + } + catch (const trap &e) { + } encoder_close(); } //---------------------------------------------------------------------- @@ -156,111 +138,3 @@ int Encoder::get_fd() } //---------------------------------------------------------------------- -void Encoder::session_dump(const char* aMessage) -{ - if (Logger::instance()->get_level() >= Logger::INFO) { - DUMMY(" >> %s", aMessage); - DUMMY("-------- [ DUMP HOST INFO ] ---------"); - for (int i = 0; i < max_encodr_count; i++) { - DUMMY("%d : ip [%s], pid [%d]", i, mShmData[i].ip, mShmData[i].pid); - } - DUMMY("-------------------------------------"); - } -} -//---------------------------------------------------------------------- - -bool Encoder::session_terminated(std::vector& aList, int aPid) -{ - for (int i = 0; i < aList.size(); ++i) { - if (aList[i] == aPid) { - return false; - } - } - return true; -} -//---------------------------------------------------------------------- - -int Encoder::session_register(std::string aIpAddr, int aPid) -{ - int i = 0; - bool result = false; - - Wait(); - for (; i < max_encodr_count; i++) { - if (mShmData[i].pid == 0) { - result = true; - mShmData[i].pid = aPid; - strcpy(mShmData[i].ip, aIpAddr.c_str()); - break; - } - } - Post(); - session_dump("after register."); - - return result ? i : -1; -} -//---------------------------------------------------------------------- - -void Encoder::session_unregister(std::string aIpAddr) -{ - Wait(); - for (int i = 0; i < max_encodr_count; i++) { - if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { - memset(mShmData[i].ip, 0, 16); - mShmData[i].pid = 0; - break; - } - } - Post(); - session_dump("after unregister."); -} -//---------------------------------------------------------------------- - -void Encoder::session_erase(int aPid) -{ - for (int i = 0; i < max_encodr_count; i++) { - if (mShmData[i].pid == aPid) { - DEBUG("erase.. %s : %d", mShmData[i].ip, mShmData[i].pid); - memset(mShmData[i].ip, 0, 16); - mShmData[i].pid = 0; - break; - } - } -} -//---------------------------------------------------------------------- - -int Encoder::session_update(std::string aIpAddr, int aPid) -{ - int i = 0; - bool result = false; - - session_dump("before update."); - Wait(); - for (; i < max_encodr_count; i++) { - if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { - result = true; - Util::kill_process(mShmData[i].pid); - memset(mShmData[i].ip, 0, 16); - mShmData[i].pid = 0; - break; - } - } - Post(); - session_register(aIpAddr, aPid); - return result ? i : -1; -} -//---------------------------------------------------------------------- - -int Encoder::session_already_exist(std::string aIpAddr) -{ - int existCount = 0; - Wait(); - for (int i = 0; i < max_encodr_count; i++) { - if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { - existCount++; - } - } - Post(); - return existCount; -} -//---------------------------------------------------------------------- diff --git a/src/Encoder.h b/src/Encoder.h index 5831482..05f353c 100644 --- a/src/Encoder.h +++ b/src/Encoder.h @@ -8,21 +8,15 @@ #ifndef ENCODER_H_ #define ENCODER_H_ +#include "config.h" + #include #include "3rdparty/trap.h" #include "Mutex.h" -#include "SharedMemory.h" -//---------------------------------------------------------------------- -typedef struct _session_t { - int pid; - char ip[16]; -} Session; -//---------------------------------------------------------------------- - -class Encoder : public SharedMemory +class Encoder { private: int fd; @@ -30,9 +24,16 @@ private: public: enum { - IOCTL_SET_VPID = 1, - IOCTL_SET_APID = 2, +#ifdef HAVE_EXT_PID + IOCTL_SET_VPID = 11, + IOCTL_SET_APID = 12, + IOCTL_SET_PMTPID = 13, +#else + IOCTL_SET_VPID = 1, + IOCTL_SET_APID = 2, IOCTL_SET_PMTPID = 3, +#endif + IOCTL_START_TRANSCODING = 100, IOCTL_STOP_TRANSCODING = 200 }; @@ -46,18 +47,6 @@ public: int state; int encoder_id; - int max_encodr_count; - -protected: - void session_dump(const char* aMessage); - - void session_erase(int aPid); - int session_register(std::string aIpAddr, int aPid); - void session_unregister(std::string aIpAddr); - - int session_update(std::string aIpAddr, int aPid); - bool session_terminated(std::vector& aList, int aPid); - int session_already_exist(std::string aIpAddr); protected: bool encoder_open(); diff --git a/src/Http.cpp b/src/Http.cpp index 26e1e99..3583811 100644 --- a/src/Http.cpp +++ b/src/Http.cpp @@ -6,7 +6,7 @@ */ #include - +#include #include #include "Util.h" @@ -73,6 +73,11 @@ bool HttpHeader::parse_request(std::string header) else if (page == "/m3u") { type = HttpHeader::M3U; } + else if (page == "/live") { + if (page_params["cmd"] == "stop") { + type = HttpHeader::TRANSCODING_LIVE_STOP; + } + } } // live else { @@ -177,7 +182,7 @@ std::string HttpHeader::read_request() std::string request = ""; while (true) { char buffer[128] = {0}; - if (!read (0, buffer, 127)) { + if (!::read (0, buffer, 127)) { break; } request += buffer; @@ -188,7 +193,7 @@ std::string HttpHeader::read_request() request = ""; break; } - usleep(0); + ::usleep(0); } return request; } diff --git a/src/Http.h b/src/Http.h index 6c54d95..f996579 100644 --- a/src/Http.h +++ b/src/Http.h @@ -22,7 +22,9 @@ public: TRANSCODING_LIVE, TRANSCODING_FILE, M3U, - TRANSCODING_FILE_CHECK + TRANSCODING_FILE_CHECK, + TRANSCODING_LIVE_STOP, + TRANSCODING_MAX }; int type; diff --git a/src/Logger.cpp b/src/Logger.cpp index b591bd6..c7bfe35 100644 --- a/src/Logger.cpp +++ b/src/Logger.cpp @@ -37,6 +37,22 @@ static const char* LOG_LV_STR[] = { #endif //---------------------------------------------------------------------- +char* get_timestamp() +{ + time_t rawtime; + struct tm *timeinfo; + static char buffer[80]; + + memset(buffer, 0, 80); + + time(&rawtime); + timeinfo = localtime(&rawtime); + strftime(buffer, 80, "%Y%m%d-%H%M%S", timeinfo); + + return buffer; +} +//---------------------------------------------------------------------- + Logger::Logger() : mLogLevel(0), mLogHandle(0) { diff --git a/src/Logger.h b/src/Logger.h index e9bcb23..7dcb342 100644 --- a/src/Logger.h +++ b/src/Logger.h @@ -38,6 +38,8 @@ #endif /* USE_DEBUG */ //---------------------------------------------------------------------- +char* get_timestamp(); + class Logger { private: diff --git a/src/Mpeg.cpp b/src/Mpeg.cpp index c2e7781..dafc1c2 100644 --- a/src/Mpeg.cpp +++ b/src/Mpeg.cpp @@ -10,6 +10,8 @@ #include "Util.h" #include "Logger.h" +#include +#include #include #include @@ -649,6 +651,8 @@ bool Mpeg::read_ts_meta(std::string media_file_name, int &vpid, int &apid) Mpeg::Mpeg(std::string filename, bool request_time_seek) throw (trap) // : MpegTS(filename) { + source_type = Source::SOURCE_TYPE_FILE; + m_current_offset = m_base_offset = m_last_offset = 0; m_splitsize = m_nrfiles = m_current_file = m_totallength = 0; @@ -660,7 +664,7 @@ Mpeg::Mpeg(std::string filename, bool request_time_seek) throw (trap) pmt_pid = video_pid = audio_pid = -1; - fd = open(filename.c_str(), O_RDONLY | O_LARGEFILE, 0); + fd = ::open(filename.c_str(), O_RDONLY | O_LARGEFILE, 0); if (fd < 0) { throw(trap("cannot open file")); } diff --git a/src/Mpeg.h b/src/Mpeg.h index 4c30e07..a6f3a23 100644 --- a/src/Mpeg.h +++ b/src/Mpeg.h @@ -8,6 +8,10 @@ #ifndef MPEG_H_ #define MPEG_H_ +#include +#include +#include + #include #include diff --git a/src/SessionMap.cpp b/src/SessionMap.cpp new file mode 100644 index 0000000..55b4f5d --- /dev/null +++ b/src/SessionMap.cpp @@ -0,0 +1,184 @@ +/* + * SessionMap.cpp + * + * Created on: 2015. 12. 10. + * Author: oskwon + */ + +#include +#include +#include +#include +#include +#include + +#include "Logger.h" +#include "Util.h" +#include "SessionMap.h" + +using namespace std; + +SessionMap* SessionMap::instance = 0; + +//------------------------------------------------------------------------------- +SessionMap::SessionMap() throw(trap) +{ + mSemId = 0; + mShmFd = 0; + mShmData = 0; + max_encoder_count = Util::get_encoder_count(); + + mSemName = "/tsp_session_sem"; + mShmName = "/tsp_session_shm"; + mShmSize = sizeof(SessionInfo) * max_encoder_count; + + if (Open() == false) + throw(trap("session ctrl init fail.")); + DEBUG("shm-info : fd [%d], name [%s], size [%d], data [%p]", mShmFd, mShmName.c_str(), mShmSize, mShmData); + DEBUG("sem-info : id [%p], name [%s]", mSemId, mSemName.c_str()); + +} +//------------------------------------------------------------------------------- + +int SessionMap::get_pid_by_ip(std::string aIpAddr) +{ + for (int i = 0; i < max_encoder_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) + return mShmData[i].pid; + } + return 0; +} +//------------------------------------------------------------------------------- + +void SessionMap::cleanup() +{ + std::vector pidlist = Util::find_process_by_name("transtreamproxy", 0); + + Wait(); + for (int i = 0; i < max_encoder_count; i++) { + if (mShmData[i].pid != 0) { + int pid = mShmData[i].pid; + if(terminated(pidlist, pid)) { + erase(pid); + } + } + } + Post(); +} +//------------------------------------------------------------------------------- + +void SessionMap::dump(const char* aMessage) +{ + if (Logger::instance()->get_level() >= Logger::INFO) { + DUMMY(" >> %s", aMessage); + DUMMY("-------- [ DUMP HOST INFO ] ---------"); + for (int i = 0; i < max_encoder_count; i++) { + DUMMY("%d : ip [%s], pid [%d]", i, mShmData[i].ip, mShmData[i].pid); + } + DUMMY("-------------------------------------"); + } +} +//---------------------------------------------------------------------- + +bool SessionMap::terminated(std::vector& aList, int aPid) +{ + for (int i = 0; i < aList.size(); ++i) { + if (aList[i] == aPid) { + return false; + } + } + return true; +} +//---------------------------------------------------------------------- + +int SessionMap::add(std::string aIpAddr, int aPid) +{ + int i = 0; + bool result = false; + + Wait(); + for (; i < max_encoder_count; i++) { + if (mShmData[i].pid == 0) { + result = true; + mShmData[i].pid = aPid; + strcpy(mShmData[i].ip, aIpAddr.c_str()); + break; + } + } + Post(); + dump("after register."); + + return result ? i : -1; +} +//---------------------------------------------------------------------- + +void SessionMap::remove(std::string aIpAddr) +{ + Wait(); + for (int i = 0; i < max_encoder_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } + Post(); + dump("after unregister."); +} +//---------------------------------------------------------------------- + +void SessionMap::erase(int aPid) +{ + for (int i = 0; i < max_encoder_count; i++) { + if (mShmData[i].pid == aPid) { + DEBUG("erase.. %s : %d", mShmData[i].ip, mShmData[i].pid); + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } +} +//---------------------------------------------------------------------- + +int SessionMap::update(std::string aIpAddr, int aPid) +{ + int i = 0; + bool result = false; + + dump("before update."); + Wait(); + for (; i < max_encoder_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + result = true; + Util::kill_process(mShmData[i].pid); + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } + Post(); + add(aIpAddr, aPid); + return result ? i : -1; +} +//---------------------------------------------------------------------- + +int SessionMap::already_exist(std::string aIpAddr) +{ + int existCount = 0; + Wait(); + for (int i = 0; i < max_encoder_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + existCount++; + } + } + Post(); + return existCount; +} +//---------------------------------------------------------------------- + +void SessionMap::post() +{ + Post(); +} +//---------------------------------------------------------------------- + diff --git a/src/SessionMap.h b/src/SessionMap.h new file mode 100644 index 0000000..3f4c643 --- /dev/null +++ b/src/SessionMap.h @@ -0,0 +1,69 @@ +/* + * SessionMap.h + * + * Created on: 2015. 12. 10. + * Author: oskwon + */ + +#ifndef SESSION_MAP_H_ +#define SESSION_MAP_H_ + +#include +#include + + +#include "config.h" + +#include "SharedMemory.h" +#include "3rdparty/trap.h" +//------------------------------------------------------------------------------- + +typedef struct _session_t { + int pid; + char ip[16]; +} SessionInfo; +//------------------------------------------------------------------------------- + +class SessionMap : public SharedMemory +{ +protected: + int max_encoder_count; + static SessionMap* instance; + + virtual ~SessionMap() {} + +public: + SessionMap() throw(trap); + + void dump(const char* aMessage); + + void cleanup(); + + void erase(int aPid); + int add(std::string aIpAddr, int aPid); + void remove(std::string aIpAddr); + + int update(std::string aIpAddr, int aPid); + bool terminated(std::vector& aList, int aPid); + int already_exist(std::string aIpAddr); + int get_pid_by_ip(std::string aIpAddr); + void post(); + + static SessionMap* get() throw(trap) + { + if (instance == 0) { + try { + instance = new SessionMap(); + } + catch (const trap &e) { + throw(e); + } + } + return instance; + } + +}; +//---------------------------------------------------------------------- + +#endif /*SESSION_MAP_H_*/ + diff --git a/src/Source.h b/src/Source.h index 2b90fa4..b81ba6c 100644 --- a/src/Source.h +++ b/src/Source.h @@ -14,15 +14,26 @@ class Source { public: + enum source_type_t { + SOURCE_TYPE_NONE=0, + SOURCE_TYPE_FILE, + SOURCE_TYPE_LIVE, + SOURCE_TYPE_MAX + }; + Source(){} virtual ~Source(){} virtual int get_fd() const throw() = 0; virtual bool is_initialized() = 0; + + Source::source_type_t get_source_type() { return source_type; } public: int pmt_pid; int video_pid; int audio_pid; + + Source::source_type_t source_type; }; //---------------------------------------------------------------------- diff --git a/src/Util.cpp b/src/Util.cpp index ad54464..5822bb8 100644 --- a/src/Util.cpp +++ b/src/Util.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -100,7 +101,7 @@ std::string Util::host_addr() return ss.str(); } -//------------------------------------------------------------------------------- +//---------------------------------------------------------------------- std::vector Util::find_process_by_name(std::string name, int mypid) { @@ -134,7 +135,8 @@ std::vector Util::find_process_by_name(std::string name, int mypid) } return pidlist; } -//------------------------------------------------------------------------------- +//---------------------------------------------------------------------- + void Util::kill_process(int pid) { @@ -144,3 +146,21 @@ void Util::kill_process(int pid) DEBUG("SEND SIGINT to %d, result : %d", pid, result); } //---------------------------------------------------------------------- + +int Util::get_encoder_count() +{ + int max_encodr_count = 0; + DIR* d = opendir("/dev"); + if (d != 0) { + struct dirent* de; + while ((de = readdir(d)) != 0) { + if (strncmp("bcm_enc", de->d_name, 7) == 0) { + max_encodr_count++; + } + } + closedir(d); + } + return max_encodr_count; +} +//---------------------------------------------------------------------- + diff --git a/src/Util.h b/src/Util.h index 089ffb5..19a274c 100644 --- a/src/Util.h +++ b/src/Util.h @@ -36,6 +36,8 @@ public: static std::string host_addr(); static std::vector find_process_by_name(std::string name, int mypid); + + static int get_encoder_count(); }; //---------------------------------------------------------------------- diff --git a/src/main.cpp b/src/main.cpp index b00c33e..9996ac2 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -27,6 +27,7 @@ #include "Demuxer.h" #include "Encoder.h" #include "UriDecoder.h" +#include "SessionMap.h" using namespace std; //---------------------------------------------------------------------- @@ -52,6 +53,19 @@ pid_t tsp_pid = 0, checker_pid = 0; unsigned long last_updated_time = 0; //---------------------------------------------------------------------- +void release_webif_record() +{ + if (source) { + Source::source_type_t type = source->get_source_type(); + if (type == Source::SOURCE_TYPE_LIVE) { + Demuxer *demux = (Demuxer*)source; + demux->disconnect_webif_socket(); + DEBUG("---->> disconnected at %s <<----", get_timestamp()); + } + } +} +//---------------------------------------------------------------------- + bool terminated() { return is_terminated; @@ -127,6 +141,7 @@ inline int send_signal(pid_t pid, int signal) void signal_handler_checker(int sig_no) { + release_webif_record(); is_terminated = true; } //---------------------------------------------------------------------- @@ -202,7 +217,7 @@ int main(int argc, char **argv) exit(0); } tsp_pid = ::getpid(); - + signal(SIGUSR1, signal_handler_checker); Logger::instance()->init("/tmp/transtreamproxy", Logger::ERROR); @@ -278,6 +293,13 @@ int main(int argc, char **argv) catch (...) { } exit(0); + case HttpHeader::TRANSCODING_LIVE_STOP: { + char command[32] = {0}; + sprintf(command, "kill -16 %d", SessionMap::get()->get_pid_by_ip(Util::host_addr())); + system(command); /* sending SIGUSR1 signal to specific transtreamproxy process before zapping another tp. */ + DEBUG("---->> live stop starting at %s :: %s <<----", get_timestamp(), command); + throw(http_trap(std::string("transcoding live stop : ") + Util::ultostr(header.type), 200, "OK")); + } default: throw(http_trap(std::string("not support source type : ") + Util::ultostr(header.type), 400, "Bad Request")); } @@ -355,6 +377,7 @@ int main(int argc, char **argv) send_signal(checker_pid, SIGUSR1); pthread_join(stream_thread_handle, 0); + is_terminated = true; if (source != 0) { @@ -383,6 +406,8 @@ int main(int argc, char **argv) exit(-1); } send_signal(checker_pid, SIGUSR1); + + DEBUG("---->> done at %s <<----", get_timestamp()); return 0; } //---------------------------------------------------------------------- @@ -402,7 +427,7 @@ void *streaming_thread_main(void *params) poll_fd[0].fd = encoder->get_fd(); poll_fd[0].events = POLLIN | POLLHUP; - int poll_state = ::poll(poll_fd, 1, 1000); + int poll_state = ::poll(poll_fd, 1, 500); if (poll_state == -1) { throw(trap("poll error.")); } @@ -470,7 +495,7 @@ void *source_thread_main(void* params) poll_fd_src[0].events = POLLIN; while(!is_terminated) { - poll_state = poll(poll_fd_src, 1, 1000); + poll_state = poll(poll_fd_src, 1, 500); if (poll_state == -1) { throw(trap("poll error.")); } @@ -481,7 +506,7 @@ void *source_thread_main(void* params) } else if (rc > 0) { poll_fd_enc[0].revents = 0; - poll_state = poll(poll_fd_enc, 1, 1000); + poll_state = poll(poll_fd_enc, 1, 500); if (poll_fd_enc[0].revents & POLLOUT) { wc = write(encoder->get_fd(), buffer, rc); if (wc < rc) { @@ -491,7 +516,7 @@ void *source_thread_main(void* params) throw(trap("terminated")); } poll_fd_enc[0].revents = 0; - poll_state = poll(poll_fd_enc, 1, 1000); + poll_state = poll(poll_fd_enc, 1, 500); if (poll_state == -1) { throw(trap("poll error.")); } -- cgit