diff options
author | oskwon <oskwon@dev3> | 2014-06-28 03:47:48 (GMT) |
---|---|---|
committer | oskwon <oskwon@dev3> | 2014-06-28 03:47:48 (GMT) |
commit | e61557805d53db07643a60d97cea0cf64afcfb79 (patch) | |
tree | 9b2f19306b50a30f6289b44c149d1eeb02d68b94 | |
parent | 7302a1cf53b36472514c668de7ea9b201c1ba6fb (diff) | |
parent | 41562f0f6c1e1b658e31e033ba0ce7e78fe0aa86 (diff) |
Merge branch 'tsp_new' into transtreamproxy
Conflicts:
.gitignore
src/main.cpp
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | script/Makefile | 71 | ||||
-rwxr-xr-x | script/script.config | 48 | ||||
-rwxr-xr-x | script/script.upload | 33 | ||||
-rw-r--r-- | src/Demuxer.cpp | 240 | ||||
-rw-r--r-- | src/Demuxer.h | 49 | ||||
-rw-r--r-- | src/Encoder.cpp | 250 | ||||
-rw-r--r-- | src/Encoder.h | 73 | ||||
-rw-r--r-- | src/Http.cpp | 188 | ||||
-rw-r--r-- | src/Http.h | 53 | ||||
-rw-r--r-- | src/Logger.cpp | 208 | ||||
-rw-r--r-- | src/Logger.h | 81 | ||||
-rw-r--r-- | src/Mpeg.cpp | 510 | ||||
-rw-r--r-- | src/Mpeg.h | 74 | ||||
-rw-r--r-- | src/SharedMemory.h | 92 | ||||
-rw-r--r-- | src/Source.h | 23 | ||||
-rw-r--r-- | src/UriDecoder.cpp | 277 | ||||
-rw-r--r-- | src/UriDecoder.h | 43 | ||||
-rw-r--r-- | src/Util.cpp | 145 | ||||
-rw-r--r-- | src/Util.h | 49 | ||||
-rw-r--r-- | src/external/mpegts.cpp | 735 | ||||
-rw-r--r-- | src/external/mpegts.h | 216 | ||||
-rw-r--r-- | src/external/trap.cpp | 42 | ||||
-rw-r--r-- | src/external/trap.h | 33 | ||||
-rw-r--r-- | src/main.cpp | 545 |
25 files changed, 3844 insertions, 237 deletions
@@ -24,4 +24,5 @@ install-sh missing stamp-h1 config.h.in~ - +config.mk +config.mk.bak diff --git a/script/Makefile b/script/Makefile new file mode 100644 index 0000000..e67f04c --- /dev/null +++ b/script/Makefile @@ -0,0 +1,71 @@ +#============================================================================ +# Name : Makefile (transtreamproxy) +# Author : oskwon(kos@dev3) +# Version : +# Copyright : Copyright(c)2013 Vu+ Team. All right reserved. +# Description : +#============================================================================ + +-include config.mk + +ifeq ($(MODEL),) +$(error config.mk is not set. please run script.config before make.) +endif + +MAJOR = 3 +MINOR = 0 +PROJECT = transtreamproxy + +TOP=$(PWD)/.. +OBJ=./obj/ + +CROSS=$(OETOP)/$(MODEL)/build/tmp/cross/mipsel/bin/mipsel-oe-linux- +SYSROOT=$(OETOP)/$(MODEL)/build/tmp/staging/mipsel-oe-linux + +RM=rm -Rf +CXX=$(CROSS)g++ +LD=$(CROSS)ld +STRIP=$(CROSS)strip +UPLOAD=$(TOP)/script/script.upload + +SRCS = $(shell find ../src/ -name "*.cpp") +OBJS=$(SRCS:.cpp=.o) + +CFLAGS += -D_MAJOR=$(MAJOR) -D_MINOR=$(MINOR) +CFLAGS += -O2 -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64 -I../src -I../src/external -I$(SYSROOT)/usr/include +LDFLAGS += -L$(SYSROOT)/usr/lib -lpthread -lrt + +.SUFFIXES : .cpp .o +.PHONY : all clean install .showinfo .prepare $(PROJECT) + +.cpp.o: + $(CXX) -c $(CFLAGS) -o $(OBJ)$(notdir $@) $< + +all: .showinfo .prepare $(PROJECT) + +$(PROJECT):$(OBJS) + $(CXX) -o $@ $(addprefix $(OBJ), $(notdir $(OBJS))) $(LDFLAGS) + $(STRIP) $@ + +install: + @$(UPLOAD) $(IP) . $(PROJECT) $(UPDIR) + +clean: + $(RM) $(PROJECT) obj *.log *.o + +.prepare: + @if [ ! -e obj ]; then mkdir obj; fi + +.showinfo: + @echo "-----------------------------------------------------" + @echo " [ BUILD ENVIRONMENT ] " + @echo "-----------------------------------------------------" + @echo "PROJECT : "$(PROJECT)" (v"$(MAJOR)"."$(MINOR)")" + @echo "" + @echo "CXX : "$(CXX) + @echo "LD : "$(LD) + @echo "STRIP : "$(STRIP) + @echo "CFLAGS : "$(CFLAGS) + @echo "LDFLAGS : "$(LDFLAGS) + @echo "-----------------------------------------------------" + @echo diff --git a/script/script.config b/script/script.config new file mode 100755 index 0000000..4ef3bc1 --- /dev/null +++ b/script/script.config @@ -0,0 +1,48 @@ +#!/bin/sh + +IP=192.168.0.1 +MODEL=vusolo2 +OETOP=/openembedded/path/here +UPDIR=/home/root + +CONFIG_PATH=$PWD/config.mk + +function getValue() { + IN=$1 + set -- "$IN" + IFS="="; declare -a Array=($*) + echo "${Array[1]}" +} + +# set default config value from old config. +if [ -e $CONFIG_PATH ]; then + IP="$(getValue `cat $CONFIG_PATH | grep 'IP='`)" + MODEL="$(getValue `cat $CONFIG_PATH | grep 'MODEL='`)" + OETOP="$(getValue `cat $CONFIG_PATH | grep 'OETOP='`)" + UPDIR="$(getValue `cat $CONFIG_PATH | grep 'UPDIR='`)" +fi + +# input new config. +read -p "Please, input model name [$MODEL] : " NEW_MODEL +read -p "Please, input openembeded root path [$OETOP] : " NEW_OETOP +read -p "Please, input target ip [$IP] : " NEW_IP +read -p "Please, input upload path [$UPDIR] : " NEW_UPDIR + +# check new config value. +if [ ! -z $NEW_IP ]; then IP=$NEW_IP; fi +if [ ! -z $NEW_MODEL ]; then MODEL=$NEW_MODEL; fi +if [ ! -z $NEW_OETOP ]; then OETOP=$NEW_OETOP; fi +if [ ! -z $NEW_UPDIR ]; then UPDIR=$NEW_UPDIR; fi + +# backup config file. +if [ -e $CONFIG_PATH ]; then + cp -a $CONFIG_PATH $CONFIG_PATH.bak +fi + +# write config file. +echo "MODEL=$MODEL" > $CONFIG_PATH +echo "OETOP=$OETOP" >> $CONFIG_PATH +echo "" >> $CONFIG_PATH +echo "IP=$IP" >> $CONFIG_PATH +echo "" >> $CONFIG_PATH +echo "UPDIR=$UPDIR" >> $CONFIG_PATH diff --git a/script/script.upload b/script/script.upload new file mode 100755 index 0000000..841a5e1 --- /dev/null +++ b/script/script.upload @@ -0,0 +1,33 @@ +#!/bin/sh + +IP=$1 +LOC=$2 +BIN=$3 +TAR=$4 + +usage() { + echo "" + echo "usage : $0 [IP] [LOCAL] [BIN] [TARGET]" + echo " - IP : target ip" + echo " - BIN : binary name" + echo " - LOCAL : binary path" + echo " - TARGET : target path to install" + exit +} + +if [ -z $IP ]; then echo "[!] IP is not set."; usage; fi +if [ -z $BIN ]; then echo "[!] BIN is not set."; usage; fi +if [ -z $LOC ]; then echo "[!] LOCAL is not set."; usage; fi +if [ -z $TAR ]; then echo "[!] TARGET is not set."; usage; fi + +echo "[*] upload... $LOC/$BIN to $IP:$TAR" +cd $LOC +ftp -n $IP << + +user root a +prompt off +bi +cd $TAR +put $BIN +bye ++ + diff --git a/src/Demuxer.cpp b/src/Demuxer.cpp new file mode 100644 index 0000000..83290c6 --- /dev/null +++ b/src/Demuxer.cpp @@ -0,0 +1,240 @@ +/* + * Demux.h + * + * Created on: 2014. 6. 11. + * Author: oskwon + */ + +#include <poll.h> +#include <errno.h> +#include <fcntl.h> +#include <string.h> +#include <sys/ioctl.h> +#include <arpa/inet.h> +#include <netinet/ip.h> +#include <linux/dvb/dmx.h> +#include <linux/dvb/version.h> + +#include "Util.h" +#include "Logger.h" +#include "Demuxer.h" + +using namespace std; +//------------------------------------------------------------------------------- + +std::string Demuxer::webif_reauest(std::string request) throw(http_trap) +{ + if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) + throw(trap("webif create socket fail.")); + + struct sockaddr_in sock_addr; + sock_addr.sin_family = AF_INET; + sock_addr.sin_port = htons(80); + sock_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + if (connect(sock, (struct sockaddr*)&sock_addr, sizeof(struct sockaddr_in))) + throw(http_trap("webif connect fail.", 502, "Bad Gateway, webif connect fail")); + + if (write(sock, request.c_str(), request.length()) != request.length()) + throw(http_trap("webif request fail.", 502, "Bad Gateway, webif request error")); + DEBUG("webif request :\n", request.c_str()); + + std::string response = ""; + struct pollfd pollevt[2]; + pollevt[0].fd = sock; + pollevt[0].events = POLLIN; + for (;;) { + char buffer[1024] = {0}; + + pollevt[0].revents = 0; + int poll_state = poll(pollevt, 1, 1000); + if (poll_state == 0) + break; + else if (poll_state < 0) { + ERROR("webif receive poll error : %s (%d)", strerror(errno), errno); + throw(http_trap("webif response fail.", 502, "Bad Gateway, webif response error")); + } + if (pollevt[0].revents & POLLIN) { + if (read(sock, buffer, 1024) <= 0) { + break; + } + response += buffer; + } + } + return response; +} +//------------------------------------------------------------------------------- + +bool Demuxer::already_exist(std::vector<unsigned long> &pidlist, int pid) +{ + for(int i = 0; i < pidlist.size(); ++i) { + if(pidlist[i] == pid) + return true; + } + return false; +} +//------------------------------------------------------------------------------- + +void Demuxer::set_filter(std::vector<unsigned long> &new_pids) throw(trap) +{ + struct dmx_pes_filter_params filter; + ioctl(fd, DMX_SET_BUFFER_SIZE, 1024*1024); + + filter.pid = new_pids[0]; + filter.input = DMX_IN_FRONTEND; +#if DVB_API_VERSION > 3 + filter.output = DMX_OUT_TSDEMUX_TAP; + filter.pes_type = DMX_PES_OTHER; +#else + filter.output = DMX_OUT_TAP; + filter.pes_type = DMX_TAP_TS; +#endif + filter.flags = DMX_IMMEDIATE_START; + + if (::ioctl(fd, DMX_SET_PES_FILTER, &filter) < 0) + throw(trap("demux filter setting failed.")); + DEBUG("demux filter setting ok."); + + for(int i = 1; i < new_pids.size(); ++i) { + uint16_t pid = new_pids[i]; + if (pid == 0) { + continue; + } + if(already_exist(pids, pid)) + continue; + LOG("demux add pid (%x).", pid); + +#if DVB_API_VERSION > 3 + if (::ioctl(fd, DMX_ADD_PID, &pid) < 0) + throw(trap("demux add pid failed.")); +#else + if (::ioctl(fd, DMX_ADD_PID, pid) < 0) + throw(trap("demux add pid failed.")); +#endif + } + + for(int i = 0; i < pids.size(); ++i) { + uint16_t pid = pids[i]; + if(already_exist(new_pids, pid)) + continue; + if(i == 4) break; + + LOG("demux remove pid (%x).", pid); +#if DVB_API_VERSION > 3 + ::ioctl(fd, DMX_REMOVE_PID, &pid); +#else + ::ioctl(fd, DMX_REMOVE_PID, pid); +#endif + } + DEBUG("demux setting PID ok."); + pids = new_pids; +} +//------------------------------------------------------------------------------- + +bool Demuxer::parse_webif_response(std::string& response, std::vector<unsigned long> &new_pids) +{ + int start_idx, end_idx; + if ((start_idx = response.rfind('+')) == string::npos) + return false; + if ((end_idx = response.find('\n', start_idx)) == string::npos) + return false; + + std::string line = response.substr(start_idx, end_idx - start_idx); + if (line.length() < 3 || line.at(0) != '+') + return false; + + /*+0:0:pat,17d4:pmt,17de:video,17e8:audio,17e9:audio,17eb:audio,17ea:audio,17f3:subtitle,17de:pcr,17f2:text*/ + demux_id = atoi(line.substr(1,1).c_str()); + + std::vector<std::string> pidtokens; + if (Util::split(line.c_str() + 3, ',', pidtokens)) { + for (int i = 0; i < pidtokens.size(); ++i) { + std::string pidstr, pidtype; + std::string toekn = pidtokens[i]; + if (!Util::split_key_value(toekn, ":", pidstr, pidtype)) + continue; + + unsigned long pid = strtoul(pidstr.c_str(), 0, 0x10); + if (pid == -1) continue; + + if (!video_pid || !audio_pid || !pmt_pid) { + if (pidtype == "pat") { + pat_pid = pid; + } + else if (pidtype == "pmt") { + pmt_pid = pid; + } + else if (pidtype == "video") { + video_pid = pid; + } + else if (pidtype == "audio") { + audio_pid = pid; + } + } + if (!already_exist(new_pids, pid)) { + new_pids.push_back(pid); + } + DEBUG("find pid : %s - %04X", toekn.c_str(), pid); + } + } + return true; +} +//------------------------------------------------------------------------------- + +Demuxer::Demuxer(HttpHeader *header) throw(http_trap) +{ + demux_id = pat_pid = fd = sock = -1; + pmt_pid = audio_pid = video_pid = 0; + + std::string webif_request = string("GET /web/stream?StreamService=") + header->path.substr(1) + " HTTP/1.0\r\n"; + if (header->params.find("Authorization") != header->params.end()) { + if (header->params["Authorization"].length() < 5) { + throw(http_trap("no authorization data.", 401, "Unauthorized")); + } + webif_request += "Authorization: " + header->params["Authorization"] + "\r\n"; + if (header->params["Cookie"].length() > 0) { + webif_request += "Cookie: " + header->params["Cookie"] + "\r\n"; + } + } + webif_request += "\r\n"; + + std::string webif_response = webif_reauest(webif_request); + DEBUG("webif response :\n%s", webif_response.c_str()); + + if (webif_response.find("WWW-Authenticate") != std::string::npos) { + header->authorization = webif_response; + throw(http_trap("webif whthentication fail.", 401, "Unauthorized")); + } + + std::vector<unsigned long> new_pids; + if (!parse_webif_response(webif_response, new_pids)) + throw(http_trap("webif response parsing fail.", 503, "Service Unavailable")); + + std::string demuxpath = "/dev/dvb/adapter0/demux" + Util::ultostr(demux_id); + if ((fd = open(demuxpath.c_str(), O_RDWR | O_NONBLOCK)) < 0) { + throw(http_trap(std::string("demux open fail : ") + demuxpath, 503, "Service Unavailable")); + } + INFO("demux open success : %s", demuxpath.c_str()); + + try { + set_filter(new_pids); + } + catch (const trap &e) { + throw(http_trap(e.what(), 503, "Service Unavailable")); + } +} +//------------------------------------------------------------------------------- + +Demuxer::~Demuxer() throw() +{ + if (fd != -1) close(fd); + if (sock != -1) close(sock); + + fd = sock = -1; +} +//------------------------------------------------------------------------------- + +int Demuxer::get_fd() const throw() +{ + return fd; +} +//------------------------------------------------------------------------------- diff --git a/src/Demuxer.h b/src/Demuxer.h new file mode 100644 index 0000000..5c90dda --- /dev/null +++ b/src/Demuxer.h @@ -0,0 +1,49 @@ +/* + * Demux.h + * + * Created on: 2014. 6. 11. + * Author: oskwon + */ + +#ifndef DEMUX_H_ +#define DEMUX_H_ + +#include <vector> +#include <string> + +#include "trap.h" + +#include "Util.h" +#include "Http.h" +#include "Source.h" +//---------------------------------------------------------------------- + +class Demuxer : public Source +{ +public: + int pmt_pid; + int video_pid; + int audio_pid; + +private: + int fd; + int sock; + + int demux_id; + int pat_pid; + std::vector<unsigned long> pids; + +protected: + std::string webif_reauest(std::string request) throw(http_trap); + bool already_exist(std::vector<unsigned long> &pidlist, int pid); + void set_filter(std::vector<unsigned long> &new_pids) throw(trap); + bool parse_webif_response(std::string& response, std::vector<unsigned long> &new_pids); + +public: + Demuxer(HttpHeader *header) throw(http_trap); + virtual ~Demuxer() throw(); + int get_fd() const throw(); +}; +//---------------------------------------------------------------------- + +#endif /* DEMUX_H_ */ diff --git a/src/Encoder.cpp b/src/Encoder.cpp new file mode 100644 index 0000000..c9d09f2 --- /dev/null +++ b/src/Encoder.cpp @@ -0,0 +1,250 @@ +/* + * Encoder.cpp + * + * Created on: 2014. 6. 12. + * Author: oskwon + */ + +#include <stdio.h> +#include <fcntl.h> +#include <errno.h> +#include <dirent.h> +#include <string.h> +#include <unistd.h> +#include <string.h> +#include <sys/ioctl.h> + +#include "Util.h" +#include "Logger.h" +#include "Encoder.h" + +using namespace std; +//---------------------------------------------------------------------- + +Encoder::Encoder() throw(trap) +{ + encoder_id = fd = -1; + max_encodr_count = state = ENCODER_STAT_INIT; + + DIR* d = opendir("/dev"); + if (d != 0) { + struct dirent* de; + while ((de = readdir(d)) != 0) { + if (strncmp("bcm_enc", de->d_name, 7) == 0) { + max_encodr_count++; + } + } + closedir(d); + } + + mSemId = 0; + mShmFd = 0; + mShmData = 0; + + mSemName = "/tsp_session_sem"; + mShmName = "/tsp_session_shm"; + mShmSize = sizeof(Session) * max_encodr_count; + + if (Open() == false) + throw(trap("session ctrl init fail.")); + DEBUG("shm-info : fd [%d], name [%s], size [%d], data [%p]", mShmFd, mShmName.c_str(), mShmSize, mShmData); + DEBUG("sem-info : id [%p], name [%s]", mSemId, mSemName.c_str()); + + std::vector<int> pidlist = Util::find_process_by_name("transtreamproxy", 0); + + session_dump("before init."); + + Wait(); + for (int i = 0; i < max_encodr_count; i++) { + if (mShmData[i].pid != 0) { + int pid = mShmData[i].pid; + if(session_terminated(pidlist, pid)) { + session_erase(pid); + } + } + } + Post(); + + int mypid = getpid(); + std::string ipaddr = Util::host_addr(); + if (session_already_exist(ipaddr) > 0) { + encoder_id = session_update(ipaddr, mypid); + } + else { + encoder_id = session_register(ipaddr, mypid); + } + DEBUG("encoder_device_id : %d", encoder_id); +} +//---------------------------------------------------------------------- + +Encoder::~Encoder() +{ + Post(); + if (fd != -1) { + if (state == ENCODER_STAT_STARTED) { + DEBUG("stop transcoding.."); + ioctl(IOCTL_STOP_TRANSCODING, 0); + } + close(fd); + fd = -1; + } +} +//---------------------------------------------------------------------- + +bool Encoder::encoder_open() +{ + std::string path = "/dev/bcm_enc" + Util::ultostr(encoder_id); + fd = ::open(path.c_str(), O_RDWR, 0); + if (fd >= 0) { + state = ENCODER_STAT_OPENED; + } + DEBUG("open encoder : %s, fd : %d", path.c_str(), fd); + return (state == ENCODER_STAT_OPENED) ? true : false; +} +//---------------------------------------------------------------------- + +bool Encoder::retry_open(int retry_count, int sleep_time) +{ + for (int i = 0; i < retry_count; ++i) { + if (encoder_open()) { + DEBUG("encoder-%d open success..", encoder_id); + return true; + } + WARNING("encoder%d open fail, retry count : %d/%d", encoder_id, i, retry_count); + sleep(sleep_time); + } + ERROR("encoder open fail : %s (%d)", strerror(errno), errno); + return false; +} +//---------------------------------------------------------------------- + +bool Encoder::ioctl(int cmd, int value) +{ + int result = ::ioctl(fd, cmd, value); + DEBUG("ioctl command : %d -> %x, result : %d", cmd, value, result); + + if (result == 0) { + switch (cmd) { + case IOCTL_START_TRANSCODING: state = ENCODER_STAT_STARTED; break; + case IOCTL_STOP_TRANSCODING: state = ENCODER_STAT_STOPED; break; + } + } + + return (result == 0) ? true : false; +} +//---------------------------------------------------------------------- + +int Encoder::get_fd() +{ + return fd; +} +//---------------------------------------------------------------------- + +void Encoder::session_dump(const char* aMessage) +{ + if (Logger::instance()->get_level() >= Logger::INFO) { + DUMMY(" >> %s", aMessage); + DUMMY("-------- [ DUMP HOST INFO ] ---------"); + for (int i = 0; i < max_encodr_count; i++) { + DUMMY("%d : ip [%s], pid [%d]", i, mShmData[i].ip, mShmData[i].pid); + } + DUMMY("-------------------------------------"); + } +} +//---------------------------------------------------------------------- + +bool Encoder::session_terminated(std::vector<int>& aList, int aPid) +{ + for (int i = 0; i < aList.size(); ++i) { + if (aList[i] == aPid) { + return false; + } + } + return true; +} +//---------------------------------------------------------------------- + +int Encoder::session_register(std::string aIpAddr, int aPid) +{ + int i = 0; + bool result = false; + + Wait(); + for (; i < max_encodr_count; i++) { + if (mShmData[i].pid == 0) { + result = true; + mShmData[i].pid = aPid; + strcpy(mShmData[i].ip, aIpAddr.c_str()); + break; + } + } + Post(); + session_dump("after register."); + + return result ? i : -1; +} +//---------------------------------------------------------------------- + +void Encoder::session_unregister(std::string aIpAddr) +{ + Wait(); + for (int i = 0; i < max_encodr_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } + Post(); + session_dump("after unregister."); +} +//---------------------------------------------------------------------- + +void Encoder::session_erase(int aPid) +{ + for (int i = 0; i < max_encodr_count; i++) { + if (mShmData[i].pid == aPid) { + DEBUG("erase.. %s : %d", mShmData[i].ip, mShmData[i].pid); + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } +} +//---------------------------------------------------------------------- + +int Encoder::session_update(std::string aIpAddr, int aPid) +{ + int i = 0; + bool result = false; + + session_dump("before update."); + Wait(); + for (; i < max_encodr_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + result = true; + Util::kill_process(mShmData[i].pid); + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } + Post(); + session_register(aIpAddr, aPid); + return result ? i : -1; +} +//---------------------------------------------------------------------- + +int Encoder::session_already_exist(std::string aIpAddr) +{ + int existCount = 0; + Wait(); + for (int i = 0; i < max_encodr_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + existCount++; + } + } + Post(); + return existCount; +} +//---------------------------------------------------------------------- diff --git a/src/Encoder.h b/src/Encoder.h new file mode 100644 index 0000000..0760dbb --- /dev/null +++ b/src/Encoder.h @@ -0,0 +1,73 @@ +/* + * Encoder.h + * + * Created on: 2014. 6. 10. + * Author: oskwon + */ + +#ifndef ENCODER_H_ +#define ENCODER_H_ + +#include <string> + +#include "trap.h" + +#include "SharedMemory.h" +//---------------------------------------------------------------------- + +typedef struct _session_t { + int pid; + char ip[16]; +} Session; +//---------------------------------------------------------------------- + +class Encoder : public SharedMemory<Session> +{ +private: + int fd; + +public: + enum { + IOCTL_SET_VPID = 1, + IOCTL_SET_APID = 2, + IOCTL_SET_PMTPID = 3, + IOCTL_START_TRANSCODING = 100, + IOCTL_STOP_TRANSCODING = 200 + }; + + enum { + ENCODER_STAT_INIT = 0, + ENCODER_STAT_OPENED, + ENCODER_STAT_STARTED, + ENCODER_STAT_STOPED, + }; + + int state; + int encoder_id; + int max_encodr_count; + +protected: + void session_dump(const char* aMessage); + + void session_erase(int aPid); + int session_register(std::string aIpAddr, int aPid); + void session_unregister(std::string aIpAddr); + + int session_update(std::string aIpAddr, int aPid); + bool session_terminated(std::vector<int>& aList, int aPid); + int session_already_exist(std::string aIpAddr); + +protected: + bool encoder_open(); + +public: + Encoder() throw(trap); + virtual ~Encoder(); + + int get_fd(); + bool ioctl(int cmd, int value); + bool retry_open(int retry_count, int sleep_time); +}; +//---------------------------------------------------------------------- + +#endif /* ENCODER_H_ */ diff --git a/src/Http.cpp b/src/Http.cpp new file mode 100644 index 0000000..1f410c0 --- /dev/null +++ b/src/Http.cpp @@ -0,0 +1,188 @@ +/* + * Http.cpp + * + * Created on: 2014. 6. 18. + * Author: oskwon + */ + +#include <string.h> + +#include <sstream> + +#include "Util.h" +#include "Logger.h" + +#include "Http.h" +#include "UriDecoder.h" + +using namespace std; +//---------------------------------------------------------------------- + +bool HttpHeader::parse_request(std::string header) +{ + std::string line, key, value; + std::istringstream request_stream; + request_stream.str(header); + + request_stream >> method; + request_stream >> path; + request_stream >> version; + std::getline(request_stream, line); + + while(std::getline(request_stream, line)) { + if ((line = Util::trim(line)) != "") { + Util::split_key_value(line, ":", key, value); + + key = Util::trim(key); + value = Util::trim(value); + + params[key] = value; + DEBUG("add param : [%s] - [%s]", key.c_str(), value.c_str()); + } + } + + int idx = path.find("?"); + // page + if (idx != std::string::npos) { + page = path.substr(0,idx); + std::string page_param = path.substr(idx + 1); + + DEBUG("request url : [%s] - [%s]", page.c_str(), page_param.c_str()); + std::istringstream request_params_stream; + request_params_stream.str(page_param); + while(std::getline(request_params_stream, line, '&')) { + if ((line = Util::trim(line)) != "") { + Util::split_key_value(line, "=", key, value); + + key = Util::trim(key); + value = Util::trim(value); + + page_params[key] = value; + DEBUG("add page param : [%s] - [%s]", key.c_str(), value.c_str()); + } + } + + if (page == "/file_stream") { + type = HttpHeader::TRANSCODING_FILE; + } + else if (page == "/m3u") { + type = HttpHeader::M3U; + } + } + // live + else { + type = HttpHeader::TRANSCODING_LIVE; + } + return true; +} +//---------------------------------------------------------------------- + +static const char *http_ok = "HTTP/1.1 200 OK\r\n"; +static const char *http_partial = "HTTP/1.1 206 Partial Content\r\n"; +static const char *http_connection = "Connection: Close\r\n"; +static const char *http_server = "Server: transtreamproxy\r\n"; +static const char *http_done = "\r\n"; +std::string HttpHeader::build_response(Mpeg *source) +{ + std::ostringstream oss; + + switch(type) { + case HttpHeader::TRANSCODING_FILE: { + std::string range = params["Range"]; + off_t seek_offset = 0, content_length = 0; + + if((range.length() > 7) && (range.substr(0, 6) == "bytes=")) { + range = range.substr(6); + if(range.find('-') == (range.length() - 1)) { + seek_offset = Util::strtollu(range); + } + } + + content_length = source->stream_length - seek_offset; + if (seek_offset > 0) { + content_length += 1; + oss << http_partial; + } + else { + oss << http_ok; + } + oss << http_connection; + oss << "Content-Type: video/mpeg\r\n"; + oss << http_server; + oss << "Accept-Ranges: bytes\r\n"; + oss << "Content-Length: " << Util::ultostr(content_length) << "\r\n"; + oss << "Content-Range: bytes " << + Util::ultostr(seek_offset) << "-" << + Util::ultostr(source->stream_length - 1) << "/" << + Util::ultostr(source->stream_length) << "\r\n"; + oss << http_done; + } + break; + case HttpHeader::TRANSCODING_LIVE: { + oss << http_ok; + oss << http_connection; + oss << "Content-Type: video/mpeg\r\n"; + oss << http_server; + oss << http_done; + } + break; + case HttpHeader::M3U: { + std::ostringstream m3u_oss; + m3u_oss << "#EXTM3U\n"; + m3u_oss << "#EXTVLCOPT--http-reconnect=true\n"; + m3u_oss << "http://" << params["Host"] << "/file_stream?file=" << page_params["file"]; + if (page_params["position"] != "") { + m3u_oss << "&position=" << page_params["position"]; + } + m3u_oss << "\n"; + m3u_oss << http_done; + + std::string m3u_content = m3u_oss.str(); + + oss << http_partial; + oss << "Content-Type: audio/x-mpegurl\r\n"; + oss << "Accept-Ranges: bytes\r\n"; + oss << http_connection; + oss << http_server; + oss << "Content-Length: " << Util::ultostr(m3u_content.length()) << "\r\n"; + oss << "Content-Range: bytes 0-" << + Util::ultostr(m3u_content.length() - 1) << "/" << + Util::ultostr(m3u_content.length()) << "\r\n"; + oss << http_done; + oss << m3u_content; + } + break; + default: return ""; + } + return oss.str(); +} +//---------------------------------------------------------------------- + +std::string HttpHeader::read_request() +{ + std::string request = ""; + while (true) { + char buffer[128] = {0}; + fgets(buffer, 127, stdin); + + request += buffer; + if(request.find("\r\n\r\n") != string::npos) + break; + } + return request; +} +//---------------------------------------------------------------------- + +std::string HttpUtil::http_error(int errcode, std::string errmsg) +{ + std::ostringstream oss; + + oss << "HTTP/1.1 " << Util::ultostr(errcode) << " " << errmsg << "\r\n"; + oss << "Content-Type: text/html\r\n"; + oss << "Connection: close\r\n"; + oss << "Accept-Ranges: bytes\r\n"; + oss << "\r\n"; + + return oss.str(); +} +//---------------------------------------------------------------------- diff --git a/src/Http.h b/src/Http.h new file mode 100644 index 0000000..69f7a09 --- /dev/null +++ b/src/Http.h @@ -0,0 +1,53 @@ +/* + * Http.h + * + * Created on: 2014. 6. 18. + * Author: oskwon + */ + +#ifndef HTTP_H_ +#define HTTP_H_ + +#include <map> +#include <string> + +#include "Mpeg.h" +//---------------------------------------------------------------------- + +class HttpHeader +{ +public: + enum { + UNKNOWN = 0, + TRANSCODING_LIVE, + TRANSCODING_FILE, + M3U + }; + + int type; + std::string method; + std::string path; + std::string version; + std::map<std::string, std::string> params; + + std::string page; + std::map<std::string, std::string> page_params; + + std::string authorization; +public: + HttpHeader() : type(UNKNOWN) {} + virtual ~HttpHeader() {} + + bool parse_request(std::string header); + std::string build_response(Mpeg *source); + + static std::string read_request(); +}; +//---------------------------------------------------------------------- + +namespace HttpUtil { + std::string http_error(int errcode, std::string errmsg); +}; +//---------------------------------------------------------------------- + +#endif /* HTTP_H_ */ diff --git a/src/Logger.cpp b/src/Logger.cpp new file mode 100644 index 0000000..dc05d3a --- /dev/null +++ b/src/Logger.cpp @@ -0,0 +1,208 @@ +/* + * logger.cpp + * + * Created on: 2014. 2. 7. + * Author: kos + */ + +#include <time.h> +#include <unistd.h> +#include <string.h> +#include <sys/types.h> + +#include "Logger.h" +//---------------------------------------------------------------------- + +#define USE_COLOR_LOG 1 + +static char log_data_buffer[MAX_PRINT_LEN] = {0}; +#ifdef USE_COLOR_LOG +static const char* LOG_LV_STR[] = { + "[ NONE]", + "\e[1;31m[ ERROR]\e[00m", + "\e[1;33m[WARNING]\e[00m", + "\e[1;32m[ INFO]\e[00m", + "\e[1;36m[ DEBUG]\e[00m", + "[ LOG]" +}; +#else +static const char* LOG_LV_STR[] = { + "[ NONE]", + "[ ERROR]", + "[WARNING]", + "[ INFO]", + "[ DEBUG]", + "[ LOG]" +}; +#endif +//---------------------------------------------------------------------- + +char* timestamp(const char* aFormat) +{ + time_t t; + time(&t); + + struct tm* m = localtime(&t); + static char sz_timestamp[32] = {0}; + strftime(sz_timestamp, sizeof(sz_timestamp), aFormat, m); + return sz_timestamp; +} +//---------------------------------------------------------------------- + +Logger::Logger() + : mLogLevel(0), mLogHandle(0) +{ + mPid = getpid(); +} +//---------------------------------------------------------------------- + +Logger::~Logger() +{ + if (mLogHandle) { + fclose(mLogHandle); + mLogHandle = 0; + } +} +//---------------------------------------------------------------------- + +void Logger::set_pid() +{ + mPid = getpid(); +} +//---------------------------------------------------------------------- + +Logger* Logger::instance() +{ + if (mInstHandle == 0) { + mInstHandle = new Logger(); + atexit(logger_release); + } + return mInstHandle; +} +//---------------------------------------------------------------------- + +bool Logger::init(const char* aName, int aLogLevel, bool aWithTimestamp) +{ + if (access("/tmp/.debug_on", F_OK) == 0) { + FILE *fp = fopen("/tmp/.debug_on", "r"); + + int lv = 0; + fscanf(fp, "%d", &lv); + if (Logger::NONE < lv && lv <= Logger::LOG) { + mLogLevel = lv; + } + else { + mLogLevel = aLogLevel; + } + fclose(fp); + } + else { + mLogLevel = aLogLevel; + } + + if (aName == NULL) { + mLogHandle = stdout; + INFO("logger initialized."); + return true; + } + char path[256] = {0}; + if (aWithTimestamp) + sprintf(path, "%s_%s.log", aName, timestamp("%Y%m%d")); + else sprintf(path, "%s.log", aName); + if (!(mLogHandle = fopen(path, "a+"))) { + mLogHandle = 0; +// printf("fail to open logger [%s].", path); + return false; + } + + if (mLogLevel >= Logger::INFO) { +#if defined(_MAJOR) && defined(_MINOR) + DUMMY("Logger initialized. (Ver %d.%d)", _MAJOR, _MINOR); +#else + DUMMY("Logger initialized."); +#endif + } + return true; +} +//---------------------------------------------------------------------- + +void Logger::hexlog(const char *header, const char *buffer, const int length, const char *aFormat, ...) +{ + int offset = 0, i = 0, ll = 0; + + FILE* output = mLogHandle; + + memset(log_data_buffer, 0, MAX_PRINT_LEN); + + va_list args; + va_start(args, aFormat); + ll = vsnprintf(log_data_buffer, MAX_PRINT_LEN-1, aFormat, args); + va_end(args); + + if (ll > MAX_PRINT_LEN - 1) { + ll = MAX_PRINT_LEN - 1; + } + fprintf(output, "%s\n", log_data_buffer); + + fprintf(output, "HEX DUMP : [%s]-[%d]\n", header, length); + fprintf(output, "-----------------------------------------------------------------------------\n"); + while (offset < length) { + char *tmp = (char*) (buffer + offset); + int tmp_len = (offset + 16 < length) ? 16 : (length - offset); + + fprintf(output, "%08X: ", offset); + for (i = 0; i < tmp_len; i++) { + if (i == 8) fprintf(output, " "); + fprintf(output, "%02X ", (unsigned char) tmp[i]); + } + + for (i = 0; i <= (16 - tmp_len) * 3; i++) + fprintf(output, " "); + if (tmp_len < 9) fprintf(output, " "); + + for (i = 0; i < tmp_len; i++) + fprintf(output, "%c", (tmp[i] >= 0x20 && tmp[i] <= 0x7E) ? tmp[i] : '.'); + offset += 16; fprintf(output, "\n"); + } + if (offset == 0) fprintf(output, "%08X: ", offset); + fprintf(output, "-----------------------------------------------------------------------------\n"); + fflush(output); +} +//---------------------------------------------------------------------- + +void Logger::log(const char* aFormat, ...) +{ + memset(log_data_buffer, 0, MAX_PRINT_LEN); + + va_list args; + va_start(args, aFormat); + vsnprintf(log_data_buffer, MAX_PRINT_LEN-1, aFormat, args); + va_end(args); + + fprintf(mLogHandle, "%s\n", log_data_buffer); + fflush(mLogHandle); +} +//---------------------------------------------------------------------- + +void Logger::log(int aLogLevel, const char* aFormat, ...) +{ +#ifndef _DISABLE_LOGGER + if (aLogLevel > mLogLevel || mLogHandle == 0) { + //printf("mLogHandle : %p, mLogLevel : %d, aLogLevel : %d\n", mLogHandle, mLogLevel, aLogLevel); + return; + } + + memset(log_data_buffer, 0, MAX_PRINT_LEN); + va_list args; + va_start(args, aFormat); + vsnprintf(log_data_buffer, MAX_PRINT_LEN-1, aFormat, args); + va_end(args); + + fprintf(mLogHandle, "[%s]%s[%d] %s\n", timestamp(DEFAULT_TIMESTAMP_FORMAT), LOG_LV_STR[aLogLevel], mPid, log_data_buffer); + fflush(mLogHandle); +#endif +} +//---------------------------------------------------------------------- + +Logger* Logger::mInstHandle = 0; +//---------------------------------------------------------------------- diff --git a/src/Logger.h b/src/Logger.h new file mode 100644 index 0000000..e9bcb23 --- /dev/null +++ b/src/Logger.h @@ -0,0 +1,81 @@ +/* + * Logger.h + * + * Created on: 2014. 2. 7. + * Author: oskwon + */ + +#ifndef LOGGER_H_ +#define LOGGER_H_ + +#include <stdio.h> +#include <stdlib.h> +#include <stdarg.h> +#include <stdint.h> +//---------------------------------------------------------------------- + +#define MAX_PRINT_LEN 2048 +#define DEFAULT_TIMESTAMP_FORMAT "%Y%m%d-%H%M%S" + +#ifdef _DISABLE_LOGGER +# define ERROR(fmt,...) {} +# define WARNING(fmt,...) {} +# define INFO(fmt,...) {} +# define DEBUG(fmt,...) {} +# define LOG(fmt,...) {} +# define LINESTAMP(fmt,...) {} +# define HEXLOG(fmt,...) {} +# define DUMMY(fmt,...) {} +#else +# define ERROR(fmt,...) { Logger::instance()->log(Logger::ERROR, fmt" (%s, %s:%d)", ##__VA_ARGS__, __FILE__, __FUNCTION__, __LINE__); } +# define WARNING(fmt,...){ Logger::instance()->log(Logger::WARNING, fmt" (%s, %s:%d)", ##__VA_ARGS__, __FILE__, __FUNCTION__, __LINE__); } +# define INFO(fmt,...) { Logger::instance()->log(Logger::INFO, fmt" (%s, %s:%d)", ##__VA_ARGS__, __FILE__, __FUNCTION__, __LINE__); } +# define DEBUG(fmt,...) { Logger::instance()->log(Logger::DEBUG, fmt" (%s, %s:%d)", ##__VA_ARGS__, __FILE__, __FUNCTION__, __LINE__); } +# define LOG(fmt,...) { Logger::instance()->log(Logger::LOG, fmt" (%s, %s:%d)", ##__VA_ARGS__, __FILE__, __FUNCTION__, __LINE__); } +# define LINESTAMP(fmt,...) { Logger::instance()->log(fmt" (%s, %s:%d)", ##__VA_ARGS__, __FILE__, __FUNCTION__, __LINE__); } +# define HEXLOG(header, buffer, length) { Logger::instance()->hexlog(header, buffer, length, " (%s, %s:%d)", __FILE__, __FUNCTION__, __LINE__); } +# define DUMMY(fmt,...) { Logger::instance()->log(fmt, ##__VA_ARGS__); } +#endif /* USE_DEBUG */ +//---------------------------------------------------------------------- + +class Logger +{ +private: + int mLogLevel, mPid; + FILE* mLogHandle; + + static Logger* mInstHandle; + +private: + Logger(); + virtual ~Logger(); + + static void logger_release() + { + if (mInstHandle) { + if (Logger::instance()->get_level() >= Logger::INFO) { + DUMMY("Logger Released."); + } + delete mInstHandle; + } + }; + +public: + enum { NONE = 0, ERROR, WARNING, INFO, DEBUG, LOG }; + +#ifndef _DISABLE_LOGGER + bool init(const char* aFileName = 0, int aLogLevel = Logger::ERROR, bool aWithTimestamp = false); + + void log(const char* aFormat, ...); + void log(int aLogLevel, const char* aFormat, ...); + void hexlog(const char *header, const char *buffer, const int length, const char *aFormat, ...); + + static Logger* instance(); +#endif + + void set_pid(); + int get_level() { return mLogLevel; } +}; +//---------------------------------------------------------------------- + +#endif /* ULOGGER_H_ */ diff --git a/src/Mpeg.cpp b/src/Mpeg.cpp new file mode 100644 index 0000000..772cb6f --- /dev/null +++ b/src/Mpeg.cpp @@ -0,0 +1,510 @@ +/* + * Mpeg.cpp + * + * Created on: 2014. 6. 18. + * Author: oskwon + */ + +#include "Mpeg.h" +#include "Http.h" +#include "Util.h" +#include "Logger.h" +//---------------------------------------------------------------------- + +void Mpeg::seek(HttpHeader &header) +{ + try { + off_t byte_offset = 0; + std::string position = header.page_params["position"]; + std::string relative = header.page_params["relative"]; + if (position.empty() && relative.empty()) { + std::string range = header.params["Range"]; + DEBUG("Range : %s", range.c_str()); + if((range.length() > 7) && (range.substr(0, 6) == "bytes=")) { + range = range.substr(6); + if(range.find('-') == (range.length() - 1)) { + byte_offset = Util::strtollu(range); + DEBUG("Range To : %s -> %llu", range.c_str(), byte_offset); + } + } + } + else { + off_t position_offset; + if (!relative.empty()) { + int dur = calc_length(); + DEBUG("duration : %d", dur); + position_offset = (dur * Util::strtollu(relative)) / 100; + } + else { + position_offset = Util::strtollu(position); + } + position_offset *= 90000; + get_offset(byte_offset, position_offset, -1); + } + + DEBUG("seek to byte_offset %llu", byte_offset); + if (byte_offset > 0) { + seek_absolute(byte_offset); + DEBUG("seek ok"); + } + } + catch (...) { + WARNING("seek fail."); + } +} +//---------------------------------------------------------------------- + +off_t Mpeg::seek_internal(off_t offset, int whence) +{ + if (m_nrfiles < 2) + return ::lseek(get_fd(), offset, whence); + + switch (whence) { + case SEEK_SET: m_current_offset = offset; break; + case SEEK_CUR: m_current_offset += offset; break; + case SEEK_END: m_current_offset = m_totallength + offset; break; + } + + if (m_current_offset < 0) + m_current_offset = 0; + return m_current_offset; +} +//---------------------------------------------------------------------- + +ssize_t Mpeg::read_internal(off_t offset, void *buf, size_t count) +{ + if (offset != m_current_offset) { + m_current_offset = seek_internal(offset, SEEK_SET); + if (m_current_offset < 0) { + return m_current_offset; + } + } + switch_offset(m_current_offset); + + if (m_nrfiles >= 2) { + if ((m_current_offset + count) > m_totallength) + count = m_totallength - m_current_offset; + if (count < 0) { + return 0; + } + } + + int ret = ::read(get_fd(), buf, count); + if (ret > 0) + m_current_offset = m_last_offset += ret; + return ret; +} +//---------------------------------------------------------------------- + +int Mpeg::switch_offset(off_t off) +{ + if (m_splitsize) { + int filenr = off / m_splitsize; + if (filenr >= m_nrfiles) + filenr = m_nrfiles - 1; +#if 0 + if (filenr != m_current_file) { + close(); + m_fd = open(filenr); + m_last_offset = m_base_offset = m_splitsize * filenr; + m_current_file = filenr; + } +#endif + } + else m_base_offset = 0; + + return (off != m_last_offset) ? (m_last_offset = ::lseek(get_fd(), off - m_base_offset, SEEK_SET) + m_base_offset) : m_last_offset; +} +//---------------------------------------------------------------------- + +void Mpeg::calc_begin() +{ + if (!(m_begin_valid || m_futile)) { + m_offset_begin = 0; + if (!get_pts(m_offset_begin, m_pts_begin, 0)) + m_begin_valid = 1; + else m_futile = 1; + } + if (m_begin_valid) { + m_end_valid = 0; + } +} +//---------------------------------------------------------------------- + +void Mpeg::calc_end() +{ + off_t end = seek_internal(0, SEEK_END); + + if (llabs(end - m_last_filelength) > 1*1024*1024) { + m_last_filelength = end; + m_end_valid = 0; + + m_futile = 0; + } + + int maxiter = 10; + + m_offset_end = m_last_filelength; + + while (!(m_end_valid || m_futile)) { + if (!--maxiter) { + m_futile = 1; + return; + } + + m_offset_end -= 256*1024; + if (m_offset_end < 0) + m_offset_end = 0; + + off_t off = m_offset_end; + + if (!get_pts(m_offset_end, m_pts_end, 0)) + m_end_valid = 1; + else m_offset_end = off; + + if (!m_offset_end) { + m_futile = 1; + break; + } + } +} +//---------------------------------------------------------------------- + +int Mpeg::fix_pts(const off_t &offset, pts_t &now) +{ + /* for the simple case, we assume one epoch, with up to one wrap around in the middle. */ + calc_begin(); + if (!m_begin_valid) { + return -1; + } + + pts_t pos = m_pts_begin; + if ((now < pos) && ((pos - now) < 90000 * 10)) { + pos = 0; + return 0; + } + + if (now < pos) /* wrap around */ + now = now + 0x200000000LL - pos; + else now -= pos; + + return 0; +} +//---------------------------------------------------------------------- + +void Mpeg::take_samples() +{ + m_samples_taken = 1; + m_samples.clear(); + int retries=2; + pts_t dummy = calc_length(); + + if (dummy <= 0) + return; + + int nr_samples = 30; + off_t bytes_per_sample = (m_offset_end - m_offset_begin) / (long long)nr_samples; + if (bytes_per_sample < 40*1024*1024) + bytes_per_sample = 40*1024*1024; + + bytes_per_sample -= bytes_per_sample % 188; + + DEBUG("samples step %lld, pts begin %llx, pts end %llx, offs begin %lld, offs end %lld:", + bytes_per_sample, m_pts_begin, m_pts_end, m_offset_begin, m_offset_end); + + for (off_t offset = m_offset_begin; offset < m_offset_end;) { + pts_t p; + if (take_sample(offset, p) && retries--) + continue; + retries = 2; + offset += bytes_per_sample; + } + m_samples[0] = m_offset_begin; + m_samples[m_pts_end - m_pts_begin] = m_offset_end; +} +//---------------------------------------------------------------------- + +/* returns 0 when a sample was taken. */ +int Mpeg::take_sample(off_t off, pts_t &p) +{ + off_t offset_org = off; + + if (!get_pts(off, p, 1)) { + /* as we are happily mixing PTS and PCR values (no comment, please), we might + end up with some "negative" segments. + so check if this new sample is between the previous and the next field*/ + std::map<pts_t, off_t>::const_iterator l = m_samples.lower_bound(p); + std::map<pts_t, off_t>::const_iterator u = l; + + if (l != m_samples.begin()) { + --l; + if (u != m_samples.end()) { + if ((l->second > off) || (u->second < off)) { + DEBUG("ignoring sample %lld %lld %lld (%llx %llx %llx)", l->second, off, u->second, l->first, p, u->first); + return 1; + } + } + } + + DEBUG("adding sample %lld: pts 0x%llx -> pos %lld (diff %lld bytes)", offset_org, p, off, off-offset_org); + m_samples[p] = off; + return 0; + } + return -1; +} +//---------------------------------------------------------------------- + +int Mpeg::calc_bitrate() +{ + calc_length(); + if (!m_begin_valid || !m_end_valid) { + return -1; + } + + pts_t len_in_pts = m_pts_end - m_pts_begin; + + /* wrap around? */ + if (len_in_pts < 0) { + len_in_pts += 0x200000000LL; + } + off_t len_in_bytes = m_offset_end - m_offset_begin; + if (!len_in_pts) return -1; + + unsigned long long bitrate = len_in_bytes * 90000 * 8 / len_in_pts; + if ((bitrate < 10000) || (bitrate > 100000000)) { + return -1; + } + return bitrate; +} +//---------------------------------------------------------------------- + +int Mpeg::get_offset(off_t &offset, pts_t &pts, int marg) +{ + calc_length(); + if (!m_begin_valid || !m_end_valid) { + return -1; + } + + if (!m_samples_taken) { + take_samples(); + } + + if (!m_samples.empty()) { + int maxtries = 5; + pts_t p = -1; + + while (maxtries--) { + /* search entry before and after */ + std::map<pts_t, off_t>::const_iterator l = m_samples.lower_bound(pts); + std::map<pts_t, off_t>::const_iterator u = l; + + if (l != m_samples.begin()) + --l; + + /* we could have seeked beyond the end */ + if (u == m_samples.end()) { + /* use last segment for interpolation. */ + if (l != m_samples.begin()) { + --u; + --l; + } + } + + /* if we don't have enough points */ + if (u == m_samples.end()) + break; + + pts_t pts_diff = u->first - l->first; + off_t offset_diff = u->second - l->second; + + if (offset_diff < 0) { + DEBUG("something went wrong when taking samples."); + m_samples.clear(); + take_samples(); + continue; + } + DEBUG("using: %llx:%llx -> %llx:%llx", l->first, u->first, l->second, u->second); + + int bitrate = (pts_diff) ? (offset_diff * 90000 * 8 / pts_diff) : 0; + offset = l->second; + offset += ((pts - l->first) * (pts_t)bitrate) / 8ULL / 90000ULL; + offset -= offset % 188; + p = pts; + + if (!take_sample(offset, p)) { + int diff = (p - pts) / 90; + DEBUG("calculated diff %d ms", diff); + + if (::abs(diff) > 300) { + DEBUG("diff to big, refining"); + continue; + } + } + else DEBUG("no sample taken, refinement not possible."); + break; + } + + if (p != -1) { + pts = p; + DEBUG("aborting. Taking %llx as offset for %lld", offset, pts); + return 0; + } + } + + int bitrate = calc_bitrate(); + offset = pts * (pts_t)bitrate / 8ULL / 90000ULL; + DEBUG("fallback, bitrate=%d, results in %016llx", bitrate, offset); + offset -= offset % 188; + + return 0; +} +//---------------------------------------------------------------------- + +int Mpeg::get_pts(off_t &offset, pts_t &pts, int fixed) +{ + int left = 256*1024; + + offset -= offset % 188; + + while (left >= 188) { + unsigned char packet[188]; + if (read_internal(offset, packet, 188) != 188) { + //break; + return -1; + } + left -= 188; + offset += 188; + + if (packet[0] != 0x47) { + int i = 0; + while (i < 188) { + if (packet[i] == 0x47) + break; + --offset; ++i; + } + continue; + } + + unsigned char *payload; + int pusi = !!(packet[1] & 0x40); + + /* check for adaption field */ + if (packet[3] & 0x20) { + if (packet[4] >= 183) + continue; + if (packet[4]) { + if (packet[5] & 0x10) { /* PCR present */ + pts = ((unsigned long long)(packet[ 6]&0xFF)) << 25; + pts |= ((unsigned long long)(packet[ 7]&0xFF)) << 17; + pts |= ((unsigned long long)(packet[ 8]&0xFE)) << 9; + pts |= ((unsigned long long)(packet[ 9]&0xFF)) << 1; + pts |= ((unsigned long long)(packet[10]&0x80)) >> 7; + offset -= 188; + if (fixed && fix_pts(offset, pts)) + return -1; + return 0; + } + } + payload = packet + packet[4] + 4 + 1; + } + else payload = packet + 4; + + if (!pusi) continue; + + /* somehow not a startcode. (this is invalid, since pusi was set.) ignore it. */ + if (payload[0] || payload[1] || (payload[2] != 1)) + continue; + + if (payload[3] == 0xFD) { // stream use extension mechanism defined in ISO 13818-1 Amendment 2 + if (payload[7] & 1) { // PES extension flag + int offs = 0; + if (payload[7] & 0x80) // pts avail + offs += 5; + if (payload[7] & 0x40) // dts avail + offs += 5; + if (payload[7] & 0x20) // escr avail + offs += 6; + if (payload[7] & 0x10) // es rate + offs += 3; + if (payload[7] & 0x8) // dsm trickmode + offs += 1; + if (payload[7] & 0x4) // additional copy info + offs += 1; + if (payload[7] & 0x2) // crc + offs += 2; + if (payload[8] < offs) + continue; + uint8_t pef = payload[9+offs++]; // pes extension field + if (pef & 1) { // pes extension flag 2 + if (pef & 0x80) // private data flag + offs += 16; + if (pef & 0x40) // pack header field flag + offs += 1; + if (pef & 0x20) // program packet sequence counter flag + offs += 2; + if (pef & 0x10) // P-STD buffer flag + offs += 2; + if (payload[8] < offs) + continue; + uint8_t stream_id_extension_len = payload[9+offs++] & 0x7F; + if (stream_id_extension_len >= 1) { + if (payload[8] < (offs + stream_id_extension_len) ) + continue; + if (payload[9+offs] & 0x80) // stream_id_extension_bit (should not set) + continue; + switch (payload[9+offs]) { + case 0x55 ... 0x5f: // VC-1 + break; + case 0x71: // AC3 / DTS + break; + case 0x72: // DTS - HD + break; + default: + continue; + } + } + else continue; + } + else continue; + } + else continue; + } + /* drop non-audio, non-video packets because other streams can be non-compliant.*/ + else if (((payload[3] & 0xE0) != 0xC0) && // audio + ((payload[3] & 0xF0) != 0xE0)) { // video + continue; + } + + if (payload[7] & 0x80) { /* PTS */ + pts = ((unsigned long long)(payload[ 9]&0xE)) << 29; + pts |= ((unsigned long long)(payload[10]&0xFF)) << 22; + pts |= ((unsigned long long)(payload[11]&0xFE)) << 14; + pts |= ((unsigned long long)(payload[12]&0xFF)) << 7; + pts |= ((unsigned long long)(payload[13]&0xFE)) >> 1; + offset -= 188; + + return (fixed && fix_pts(offset, pts)) ? -1 : 0; + } + } + return -1; +} +//---------------------------------------------------------------------- + +int Mpeg::calc_length() +{ + if (m_duration <= 0) { + calc_begin(); calc_end(); + if (!(m_begin_valid && m_end_valid)) + return -1; + pts_t len = m_pts_end - m_pts_begin; + + if (len < 0) + len += 0x200000000LL; + + len = len / 90000; + + m_duration = int(len); + } + return m_duration; +} +//---------------------------------------------------------------------- diff --git a/src/Mpeg.h b/src/Mpeg.h new file mode 100644 index 0000000..5a24f35 --- /dev/null +++ b/src/Mpeg.h @@ -0,0 +1,74 @@ +/* + * Mpeg.h + * + * Created on: 2014. 6. 18. + * Author: oskwon + */ + +#ifndef MPEG_H_ +#define MPEG_H_ + +#include "trap.h" +#include "mpegts.h" +//---------------------------------------------------------------------- + +class HttpHeader; + +typedef long long pts_t; + +class Mpeg : public MpegTS +{ +private: + off_t m_splitsize, m_totallength, m_current_offset, m_base_offset, m_last_offset; + int m_nrfiles, m_current_file; + + pts_t m_pts_begin, m_pts_end; + + off_t m_offset_begin, m_offset_end; + off_t m_last_filelength; + + int m_begin_valid, m_end_valid, m_futile; + + int m_samples_taken; + std::map<pts_t, off_t> m_samples; + + int m_duration; + + void scan(); + int switch_offset(off_t off); + + void calc_end(); + void calc_begin(); + int calc_length(); + int calc_bitrate(); + + int fix_pts(const off_t &offset, pts_t &now); + int get_pts(off_t &offset, pts_t &pts, int fixed); + int get_offset(off_t &offset, pts_t &pts, int marg); + + void take_samples(); + int take_sample(off_t off, pts_t &p); + + off_t seek_internal(off_t offset, int whence); + ssize_t read_internal(off_t offset, void *buf, size_t count); + +public: + Mpeg(std::string filename, bool request_time_seek) throw (trap) + : MpegTS(filename, request_time_seek) + { + m_current_offset = m_base_offset = m_last_offset = 0; + m_splitsize = m_nrfiles = m_current_file = m_totallength = 0; + + m_pts_begin = m_pts_end = m_offset_begin = m_offset_end = 0; + m_last_filelength = m_begin_valid = m_end_valid = m_futile =0; + + m_duration = m_samples_taken = 0; + } + + virtual ~Mpeg() throw () {} + + void seek(HttpHeader &header); +}; +//---------------------------------------------------------------------- + +#endif /* MPEG_H_ */ diff --git a/src/SharedMemory.h b/src/SharedMemory.h new file mode 100644 index 0000000..de02b24 --- /dev/null +++ b/src/SharedMemory.h @@ -0,0 +1,92 @@ +/* + * SharedMemory.h + * + * Created on: 2014. 6. 12. + * Author: oskwon + */ + +#ifndef SHAREDMEMORY_H_ +#define SHAREDMEMORY_H_ + +#include <string> + +#include <fcntl.h> +#include <unistd.h> +#include <semaphore.h> +#include <sys/mman.h> + +#include "Logger.h" + +using namespace std; +//------------------------------------------------------------------------------- + +template <class T> +class SharedMemory +{ +protected: + sem_t* mSemId; + std::string mSemName; + + int mShmFd; + int mShmSize; + std::string mShmName; + + T* mShmData; + +protected: + void Close() + { + if (mShmData > 0) { + munmap(mShmData, mShmSize); + } + mShmData = 0; + if (mShmFd > 0) { + close(mShmFd); + //shm_unlink(mShmName.c_str()); + } + mShmFd = 0; + if (mSemId > 0) { + sem_close(mSemId); + sem_unlink(mSemName.c_str()); + } + mSemId = 0; + } + + bool Open() + { + mShmFd = shm_open(mShmName.c_str(), O_CREAT | O_RDWR, S_IRWXU | S_IRWXG); + if (mShmFd < 0) { + return false; + } + ftruncate(mShmFd, mShmSize); + + mShmData = (T*) mmap(NULL, mShmSize, PROT_READ | PROT_WRITE, MAP_SHARED, mShmFd, 0); + if (mShmData == 0) { + return false; + } + mSemId = sem_open(mSemName.c_str(), O_CREAT, S_IRUSR | S_IWUSR, 1); + return true; + } + + void Wait() + { + DEBUG("WAIT-BEFORE"); + sem_wait(mSemId); + DEBUG("WAIT-AFTER"); + } + void Post() + { + DEBUG("POST-BEFORE"); + sem_post(mSemId); + DEBUG("POST-AFTER"); + } + +public: + ~SharedMemory() + { + Close(); + } +}; +//------------------------------------------------------------------------------- + +#endif /* UPOSIXSHAREDMEMORY_H_ */ diff --git a/src/Source.h b/src/Source.h new file mode 100644 index 0000000..003fdeb --- /dev/null +++ b/src/Source.h @@ -0,0 +1,23 @@ +/* + * Source.h + * + * Created on: 2014. 6. 12. + * Author: oskwon + */ + +#ifndef SOURCE_H_ +#define SOURCE_H_ + +#include "trap.h" +//---------------------------------------------------------------------- + +class Source +{ +public: + Source(){} + virtual ~Source(){} + virtual int get_fd() const throw() = 0; +}; +//---------------------------------------------------------------------- + +#endif /* SOURCE_H_ */ diff --git a/src/UriDecoder.cpp b/src/UriDecoder.cpp new file mode 100644 index 0000000..55b62bb --- /dev/null +++ b/src/UriDecoder.cpp @@ -0,0 +1,277 @@ +/* + * UriDecoder.h + * + * Created on: 2014. 6. 10. + * Author: oskwon + */ + +#include <stdio.h> +#include <string.h> + +#include "UriDecoder.h" +//------------------------------------------------------------------------------- + +unsigned char UriDecoder::h2i(wchar_t aHexDigit) +{ + switch (aHexDigit) { + case _UL_('0'): + case _UL_('1'): + case _UL_('2'): + case _UL_('3'): + case _UL_('4'): + case _UL_('5'): + case _UL_('6'): + case _UL_('7'): + case _UL_('8'): + case _UL_('9'): + return (unsigned char)(9 + aHexDigit - _UL_('9')); + case _UL_('a'): + case _UL_('b'): + case _UL_('c'): + case _UL_('d'): + case _UL_('e'): + case _UL_('f'): + return (unsigned char)(15 + aHexDigit - _UL_('f')); + case _UL_('A'): + case _UL_('B'): + case _UL_('C'): + case _UL_('D'): + case _UL_('E'): + case _UL_('F'): + return (unsigned char)(15 + aHexDigit - _UL_('F')); + default: + return 0; + } +} +//------------------------------------------------------------------------------- + +const wchar_t* UriDecoder::decode_uri(wchar_t* aData, int aBreakCond) +{ + wchar_t* read = aData; + wchar_t* write = aData; + bool prevWasCr = false; + + if (aData == NULL) { + return NULL; + } + + for (;;) { + switch (read[0]) { + case _UL_('\0'): + if (read > write) { + write[0] = _UL_('\0'); + } + return write; + + case _UL_('%'): + switch (read[1]) { + case _UL_('0'): + case _UL_('1'): + case _UL_('2'): + case _UL_('3'): + case _UL_('4'): + case _UL_('5'): + case _UL_('6'): + case _UL_('7'): + case _UL_('8'): + case _UL_('9'): + case _UL_('a'): + case _UL_('b'): + case _UL_('c'): + case _UL_('d'): + case _UL_('e'): + case _UL_('f'): + case _UL_('A'): + case _UL_('B'): + case _UL_('C'): + case _UL_('D'): + case _UL_('E'): + case _UL_('F'): + switch (read[2]) { + case _UL_('0'): + case _UL_('1'): + case _UL_('2'): + case _UL_('3'): + case _UL_('4'): + case _UL_('5'): + case _UL_('6'): + case _UL_('7'): + case _UL_('8'): + case _UL_('9'): + case _UL_('a'): + case _UL_('b'): + case _UL_('c'): + case _UL_('d'): + case _UL_('e'): + case _UL_('f'): + case _UL_('A'): + case _UL_('B'): + case _UL_('C'): + case _UL_('D'): + case _UL_('E'): + case _UL_('F'): { + const unsigned char left = h2i(read[1]); + const unsigned char right = h2i(read[2]); + const int code = 16 * left + right; + switch (code) { + case 10: + switch (aBreakCond) { + case BR_TO_LF: + if (!prevWasCr) { + write[0] = (wchar_t)10; + write++; + } + break; + + case BR_TO_CRLF: + if (!prevWasCr) { + write[0] = (wchar_t)13; + write[1] = (wchar_t)10; + write += 2; + } + break; + + case BR_TO_CR: + if (!prevWasCr) { + write[0] = (wchar_t)13; + write++; + } + break; + + case BR_DONT_TOUCH: + default: + write[0] = (wchar_t)10; + write++; + + } + prevWasCr = false; + break; + + case 13: + switch (aBreakCond) { + case BR_TO_LF: + write[0] = (wchar_t)10; + write++; + break; + + case BR_TO_CRLF: + write[0] = (wchar_t)13; + write[1] = (wchar_t)10; + write += 2; + break; + + case BR_TO_CR: + write[0] = (wchar_t)13; + write++; + break; + + case BR_DONT_TOUCH: + default: + write[0] = (wchar_t)13; + write++; + + } + prevWasCr = true; + break; + + default: + write[0] = (wchar_t)(code); + write++; + + prevWasCr = false; + + } + read += 3; + } + break; + + default: + if (read > write) { + write[0] = read[0]; + write[1] = read[1]; + } + read += 2; + write += 2; + + prevWasCr = false; + break; + } + break; + + default: + if (read > write) { + write[0] = read[0]; + } + read++; + write++; + + prevWasCr = false; + break; + } + break; + + case _UL_('+'): + if (read > write) { + write[0] = _UL_(' '); + } + read++; + write++; + + prevWasCr = false; + break; + + default: + if (read > write) { + write[0] = read[0]; + } + read++; + write++; + + prevWasCr = false; + break; + } + } + return NULL; +} +//------------------------------------------------------------------------------- + +std::wstring UriDecoder::decode64(const wchar_t* aInput) +{ + wchar_t working[1024] = {0}; + + wcscpy(working, aInput); + decode_uri(working, BR_DONT_TOUCH); + + return std::wstring(working); +} +//------------------------------------------------------------------------------- + +std::string UriDecoder::decode(const char* aInput) +{ + std::string tmp = aInput; + std::wstring in = L""; + in.assign(tmp.begin(), tmp.end()); + + std::wstring decode = decode64(in.c_str()); + + tmp.assign(decode.begin(), decode.end()); + + return tmp; +} +//------------------------------------------------------------------------------- + +#ifdef UNIT_TEST +#include <iostream> +int main() +{ + std::string in = "/home/kos/work/workspace/tsstreamproxy/test/20130528%201415%20-%20ZDF%20-%20Die%20K%C3%BCchenschlacht.ts"; + std::string out = UriDecoder().decode(in.c_str()); + + cout << out << endl; + + FILE* fp = fopen(out.c_str(), "rb"); + + cout << (fp == NULL) ? "OPEN FAIL!!" : "OPEN OK" << endl; +} + +#endif diff --git a/src/UriDecoder.h b/src/UriDecoder.h new file mode 100644 index 0000000..2e5a0d5 --- /dev/null +++ b/src/UriDecoder.h @@ -0,0 +1,43 @@ +/* + * UriDecoder.h + * + * Created on: 2014. 6. 10. + * Author: oskwon + */ + +#ifndef URIDECODER_H_ +#define URIDECODER_H_ + +#include <memory> +#include <string> + +#include <wchar.h> +//------------------------------------------------------------------------------- + +#define BR_TO_LF 0 +#define BR_TO_CRLF 1 +#define BR_TO_CR 2 +#define BR_TO_UNIX BR_TO_LF +#define BR_TO_WINDOWS BR_TO_CRLF +#define BR_TO_MAC BR_TO_CR +#define BR_DONT_TOUCH 6 + +#define _UL_(x) L##x +//------------------------------------------------------------------------------- + +class UriDecoder +{ +protected: + unsigned char h2i(wchar_t aHexDigit); + const wchar_t* decode_uri(wchar_t* aData, int aBreakCond); + +public: + UriDecoder(){}; + virtual ~UriDecoder(){}; + + std::string decode(const char* aInput); + std::wstring decode64(const wchar_t* aInput); +}; +//------------------------------------------------------------------------------- + +#endif /* URIDECODER_H_ */ diff --git a/src/Util.cpp b/src/Util.cpp new file mode 100644 index 0000000..639cdfb --- /dev/null +++ b/src/Util.cpp @@ -0,0 +1,145 @@ +/* + * Utils.cpp + * + * Created on: 2014. 6. 10. + * Author: oskwon + */ + +#include <errno.h> +#include <stdarg.h> +#include <string.h> +#include <dirent.h> +#include <signal.h> +#include <sys/wait.h> + +#include <arpa/inet.h> +#include <sys/socket.h> + +#include <sstream> +#include <fstream> + +#include "Util.h" +#include "Logger.h" + +using namespace std; +//---------------------------------------------------------------------- + +std::string Util::ultostr(int64_t data) +{ + std::stringstream ss; + ss << data; + return ss.str(); +} +//---------------------------------------------------------------------- + +int Util::strtollu(std::string data) +{ + long long retval; + std::stringstream ss; + try { + ss.str(data); + ss >> retval; + } + catch(...) { + return -1; + } + return retval; +} +//---------------------------------------------------------------------- + +std::string Util::trim(std::string& s, const std::string& drop) +{ + std::string r = s.erase(s.find_last_not_of(drop) + 1); + return r.erase(0, r.find_first_not_of(drop)); +} +//---------------------------------------------------------------------- + +int Util::split(std::string data, const char delimiter, std::vector<string>& tokens) +{ + std::stringstream data_stream(data); + for(std::string token; std::getline(data_stream, token, delimiter); tokens.push_back(trim(token))); + return tokens.size(); +} +//---------------------------------------------------------------------- + +bool Util::split_key_value(std::string data, std::string delimiter, std::string &key, std::string &value) +{ + int idx = data.find(delimiter); + if (idx == string::npos) { + WARNING("split key & value (data : %s, delimiter : %s)", data.c_str(), delimiter.c_str()); + return false; + } + key = data.substr(0, idx); + value = data.substr(idx+1, data.length()-idx); + return true; +} +//---------------------------------------------------------------------- + +void Util::vlog(const char * format, ...) throw() +{ + static char vlog_buffer[MAX_PRINT_LEN]; + memset(vlog_buffer, 0, MAX_PRINT_LEN); + + va_list args; + va_start(args, format); + vsnprintf(vlog_buffer, MAX_PRINT_LEN-1, format, args); + va_end(args); + + WARNING("%s", vlog_buffer); +} +//---------------------------------------------------------------------- + +std::string Util::host_addr() +{ + std::stringstream ss; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + getpeername(0, (struct sockaddr*)&addr, &addrlen); + ss << inet_ntoa(addr.sin_addr); + + return ss.str(); +} +//------------------------------------------------------------------------------- + +std::vector<int> Util::find_process_by_name(std::string name, int mypid) +{ + std::vector<int> pidlist; + char cmdlinepath[256] = {0}; + DIR* d = opendir("/proc"); + if (d != 0) { + struct dirent* de; + while ((de = readdir(d)) != 0) { + int pid = atoi(de->d_name); + if (pid > 0) { + sprintf(cmdlinepath, "/proc/%s/cmdline", de->d_name); + + std::string cmdline; + std::ifstream cmdlinefile(cmdlinepath); + std::getline(cmdlinefile, cmdline); + if (!cmdline.empty()) { + size_t pos = cmdline.find('\0'); + if (pos != string::npos) + cmdline = cmdline.substr(0, pos); + pos = cmdline.rfind('/'); + if (pos != string::npos) + cmdline = cmdline.substr(pos + 1); + if ((name == cmdline) && ((mypid != pid) || (mypid == 0))) { + pidlist.push_back(pid); + } + } + } + } + closedir(d); + } + return pidlist; +} +//------------------------------------------------------------------------------- + +void Util::kill_process(int pid) +{ + int result = kill(pid, SIGINT); + DEBUG("SEND SIGINT to %d, result : %d", pid, result); + //sleep(1); +} +//---------------------------------------------------------------------- diff --git a/src/Util.h b/src/Util.h new file mode 100644 index 0000000..089ffb5 --- /dev/null +++ b/src/Util.h @@ -0,0 +1,49 @@ +/* + * Utils.h + * + * Created on: 2014. 6. 10. + * Author: oskwon + */ + +#ifndef UTILS_H_ +#define UTILS_H_ + +#include <map> +#include <string> +#include <vector> + +#include <stdint.h> + +#include "Http.h" +#include "Source.h" +#include "Encoder.h" +//---------------------------------------------------------------------- + +class Util { +public: + static void vlog(const char * format, ...) throw(); + + static int strtollu(std::string data); + static std::string ultostr(int64_t data); + + static std::string trim(std::string& s, const std::string& drop = " \t\n\v\r"); + + static int split(std::string data, const char delimiter, std::vector<std::string>& tokens); + static bool split_key_value(std::string data, std::string delimiter, std::string &key, std::string &value); + + static void kill_process(int pid); + + static std::string host_addr(); + + static std::vector<int> find_process_by_name(std::string name, int mypid); +}; +//---------------------------------------------------------------------- + +typedef struct _thread_params_t { + Source *source; + Encoder *encoder; + HttpHeader *request; +} ThreadParams; +//---------------------------------------------------------------------- + +#endif /* UTILS_H_ */ diff --git a/src/external/mpegts.cpp b/src/external/mpegts.cpp new file mode 100644 index 0000000..c70b92f --- /dev/null +++ b/src/external/mpegts.cpp @@ -0,0 +1,735 @@ +//#include "config.h" // oskwon +#include "trap.h" + +#include "mpegts.h" +//#include "util.h" // oskwon +#include "Util.h" // oskwon + +#include <unistd.h> +#include <stddef.h> +#include <fcntl.h> +#include <sys/stat.h> + +#include <boost/crc.hpp> + +#include <string> +using std::string; + +#include <boost/algorithm/string.hpp> + +MpegTS::MpegTS(int fd_in, bool request_time_seek_in) throw(trap) + : private_fd(false), + fd(fd_in), + request_time_seek(request_time_seek_in) +{ + init(); +} + +MpegTS::MpegTS(string filename, bool request_time_seek_in) throw(trap) + : private_fd(true), + request_time_seek(request_time_seek_in) +{ + // oskwon : add option O_LARGEFILE. + if((fd = open(filename.c_str(), O_RDONLY | O_LARGEFILE, 0)) < 0) + throw(trap("MpegTS::MpegTS: cannot open file")); + + init(); +} + +MpegTS::~MpegTS() throw() +{ + if(private_fd) + close(fd); +} + +void MpegTS::init() throw(trap) +{ + mpegts_pat_t::const_iterator it; + struct stat filestat; + + if(fstat(fd, &filestat)) + throw(trap("MpegTS::init: cannot stat")); + + Util::vlog("MpegTS::init: file length: %lld Mb", filestat.st_size >> 20); + + eof_offset = stream_length = filestat.st_size; + eof_offset = (eof_offset / sizeof(ts_packet_t)) * sizeof(ts_packet_t); + + if(!read_pat()) + throw(trap("MpegTS::init: invalid transport stream (no suitable pat)")); + + for(it = pat.begin(); it != pat.end(); it++) + if(read_pmt(it->second)) + break; + + if(it == pat.end()) + throw(trap("MpegTS::init: invalid transport stream (no suitable pmt)")); + + pmt_pid = it->second; + is_time_seekable = false; + + if(request_time_seek) + { + Util::vlog("MpegTS: start find pcr"); + + if(lseek(fd, 0, SEEK_SET) == (off_t)-1) + Util::vlog("MpegTS::init seek to sof fails"); + else + { + Util::vlog("MpegTS: start find first pcr"); + + first_pcr_ms = find_pcr_ms(direction_forward); + + if(lseek(fd, eof_offset, SEEK_SET) == (off_t)-1) + Util::vlog("MpegTS::init: seek to eof fails"); + else + { + Util::vlog("MpegTS: start find last pcr"); + last_pcr_ms = find_pcr_ms(direction_backward); + + if(last_pcr_ms < first_pcr_ms) + Util::vlog("MpegTS::init: pcr wraparound, cannot seek this stream, first pcr: %d, last pcr: %d", first_pcr_ms / 1000, last_pcr_ms / 1000); + else + if((first_pcr_ms >= 0) && (last_pcr_ms >= 0)) + is_time_seekable = true; + } + } + + Util::vlog("MpegTS: find pcr done"); + } + + if(!is_time_seekable) + { + first_pcr_ms = -1; + last_pcr_ms = -1; + } + + if(lseek(fd, 0, SEEK_SET) == (off_t)-1) + Util::vlog("MpegTS::init: seek to sof fails"); + + //Util::vlog("first_pcr_ms = %d", first_pcr_ms); + //Util::vlog("last_pcr_ms = %d", last_pcr_ms); + //Util::vlog("eof_offset is at %lld", eof_offset); +} + +bool MpegTS::read_table(int filter_pid, int filter_table) throw(trap) +{ + typedef boost::crc_optimal<32, 0x04c11db7, 0xffffffff, 0x0, false, false> boost_mpeg_crc_t; + + boost_mpeg_crc_t my_crc; + mpeg_crc_t mpeg_crc; + uint32_t their_crc; + + ts_packet_t packet; + const section_table_header_t *table; + const section_table_syntax_t *syntax; + const uint8_t *payload_data; + + int timeout; + int section_length = -1; + int section_length_remaining = -1; + int pid; + int cc = -1; + int packet_payload_offset; + int payload_length; + + raw_table_data.clear(); + table_data.clear(); + + for(timeout = 0; timeout < 2000; timeout++) + { + if(read(fd, (void *)&packet, sizeof(packet)) != sizeof(packet)) + throw(trap("MpegTS::read_table: read error")); + + if(packet.header.sync_byte != MpegTS::sync_byte_value) + throw(trap("MpegTS::read_table: no sync byte found")); + + pid = (packet.header.pid_high << 8) | (packet.header.pid_low); + + if(pid != filter_pid) + continue; + + if(packet.header.tei) + continue; + + if(!packet.header.payload_present) + continue; + + if((cc != -1) && (cc != packet.header.cc)) + { + //Util::vlog("MpegTS::read_table discontinuity: %d/%d", cc, packet.header.cc); + goto retry; + } + + cc = (packet.header.cc + 1) & 0x0f; + + if(packet.header.pusi) + { + if(table_data.length() > 0) + { + //Util::vlog("MpegTS::read_table: start payload upexpected"); + goto retry; + } + } + else + { + if(table_data.length() <= 0) + { + //Util::vlog("MpegTS::read_table: continue payload unexpected"); + goto retry; + } + } + + //Util::vlog("MpegTS::read_table: correct packet with pid: %x, %s", pid, packet.header.pusi ? "start" : "continuation"); + + packet_payload_offset = offsetof(ts_packet_t, header.payload); + + //Util::vlog("MpegTS::read_table: payload offset: %d", packet_payload_offset); + + if(packet.header.af) + packet_payload_offset = offsetof(ts_packet_t, header.afield) + packet_payload_offset; + + //Util::vlog("MpegTS::read_table: payload offset after adaptation field: %d", packet_payload_offset); + + if(packet.header.pusi) + packet_payload_offset += packet.byte[packet_payload_offset] + 1; // psi payload pointer byte + + //Util::vlog("MpegTS::read_table: payload offset after section pointer field: %d", packet_payload_offset); + + if(table_data.length() == 0) + { + table = (const section_table_header_t *)&packet.byte[packet_payload_offset]; + + raw_table_data.assign((const uint8_t *)table, offsetof(section_table_header_t, payload)); + + //Util::vlog("MpegTS::read_table: table id: %d", table->table_id); + + if(table->table_id != filter_table) + { + Util::vlog("MpegTS::read_table: table %d != %d", table->table_id, filter_table); + goto retry; + } + + if(table->private_bit) + { + Util::vlog("MpegTS::read_table: private != 0: %d", table->private_bit); + goto retry; + } + + if(!table->section_syntax) + { + Util::vlog("MpegTS::read_table: section_syntax == 0: %d", table->section_syntax); + goto retry; + } + + if(table->reserved != 0x03) + { + Util::vlog("MpegTS::read_table: reserved != 0x03: %x", table->reserved); + goto retry; + } + + if(table->section_length_unused != 0x00) + { + Util::vlog("MpegTS::read_table: section length unused != 0x00: %x", table->section_length_unused); + goto retry; + } + + section_length = ((table->section_length_high << 8) | (table->section_length_low)) - offsetof(section_table_syntax_t, data); + //Util::vlog("MpegTS::read_table: section length: %d", section_length); + + if(section_length < 0) + { + Util::vlog("MpegTS::read_table: section length < 0: %d", section_length); + goto retry; + } + + if(section_length_remaining < 0) + section_length_remaining = section_length; + + syntax = (const section_table_syntax_t *)&table->payload; + + raw_table_data.append((const uint8_t *)syntax, offsetof(section_table_syntax_t, data)); + + //Util::vlog("MpegTS::read_table: tide: 0x%x", (syntax->tide_high << 8) | syntax->tide_low); + + if(syntax->reserved != 0x03) + { + Util::vlog("MpegTS::read_table: syntax reserved != 0x03: %d", syntax->reserved); + goto retry; + } + + //Util::vlog("MpegTS::read_table: currnext: %d", syntax->currnext); + //Util::vlog("MpegTS::read_table: version: %d", syntax->version); + //Util::vlog("MpegTS::read_table: ordinal: %d", syntax->ordinal); + //Util::vlog("MpegTS::read_table: last: %d", syntax->last); + + payload_length = sizeof(ts_packet_t) - ((const uint8_t *)&syntax->data - &packet.byte[0]); + //Util::vlog("MpegTS::read_table: payload length: %d", payload_length); + + if(payload_length > section_length_remaining) + payload_length = section_length_remaining; + + //Util::vlog("MpegTS::read_table: payload length after trimming: %d", payload_length); + + raw_table_data.append((const uint8_t *)&syntax->data, payload_length); + table_data.append((const uint8_t *)&syntax->data, payload_length); + } + else + { + payload_data = (const uint8_t *)&packet.byte[packet_payload_offset]; + payload_length = sizeof(ts_packet_t) - packet_payload_offset; + + if(payload_length > section_length_remaining) + payload_length = section_length_remaining; + + raw_table_data.append((const uint8_t *)payload_data, payload_length); + table_data.append((const uint8_t *)payload_data, payload_length); + } + + section_length_remaining -= payload_length; + + if((section_length > 0) && (section_length_remaining == 0)) + break; + + continue; + +retry: + section_length = -1; + section_length_remaining = -1; + raw_table_data.clear(); + table_data.clear(); + } + + if(table_data.length() == 0) + { + //Util::vlog("MpegTS::read_table: timeout"); + return(false); + } + + if(section_length < (int)sizeof(mpeg_crc_t)) + { + Util::vlog("MpegTS::read_table: table too small"); + return(false); + } + + my_crc.process_bytes(raw_table_data.data(), raw_table_data.length() - sizeof(mpeg_crc_t)); + + mpeg_crc = *(const mpeg_crc_t *)(&raw_table_data.data()[raw_table_data.length() - sizeof(mpeg_crc_t)]); + their_crc = (mpeg_crc.byte[0] << 24) | (mpeg_crc.byte[1] << 16) | (mpeg_crc.byte[2] << 8) | mpeg_crc.byte[3]; + + if(my_crc.checksum() != their_crc) + { + Util::vlog("MpegTS::read_table: crc mismatch: my crc: %x, their crc: %x", my_crc.checksum(), their_crc); + return(false); + } + + return(true); +} + +bool MpegTS::read_pat() throw(trap) +{ + int attempt; + int current, entries, program, pid; + const pat_entry_t *entry; + + for(attempt = 0; attempt < 16; attempt++) + { + if(!read_table(0, 0)) + continue; + + entries = (table_data.length() - sizeof(mpeg_crc_t)) / sizeof(*entry); + entry = (const pat_entry_t *)table_data.data(); + + for(current = 0; current < entries; current++) + { + program = (entry[current].program_high << 8) | (entry[current].program_low); + pid = (entry[current].pmt_pid_high << 8) | (entry[current].pmt_pid_low); + //Util::vlog("MpegTS::read_pat > program: %d -> pid %x", program, pid); + + if(entry[current].reserved != 0x07) + { + Util::vlog("MpegTS::read_pat > reserved != 0x07: 0x%x", entry[current].reserved); + goto next_pat_entry; + } + + pat[program] = pid; + } + + return(true); + +next_pat_entry: + (void)0; + } + + return(false); +} + +bool MpegTS::read_pmt(int filter_pid) throw(trap) +{ + int attempt, programinfo_length, esinfo_length; + int es_pid, es_data_length, es_data_skip, es_data_offset; + int ds_data_skip, ds_data_offset; + bool private_stream_is_ac3; + string stream_language; + + const uint8_t *es_data; + const pmt_header_t *pmt_header; + const pmt_es_entry_t *es_entry; + const pmt_ds_entry_t *ds_entry; + const pmt_ds_a_t *ds_a; + + pcr_pid = video_pid = audio_pid = -1; + + for(attempt = 0; attempt < 16; attempt++) + { + if(!read_table(filter_pid, table_pmt)) + break; + + pmt_header = (const pmt_header_t *)table_data.data(); + pcr_pid = (pmt_header->pcrpid_high << 8) | pmt_header->pcrpid_low; + programinfo_length = (pmt_header->programinfo_length_high << 8) | + pmt_header->programinfo_length_low; + + if(pmt_header->reserved_1 != 0x07) + { + Util::vlog("MpegTS::read_pmt > reserved_1: %x", pmt_header->reserved_1); + continue; + } + + //Util::vlog("MpegTS::read_pmt: > pcr_pid: %x", pcr_pid); + //Util::vlog("MpegTS::read_pmt: > program info length: %d", programinfo_length); + + if(pmt_header->unused != 0x00) + { + Util::vlog("MpegTS::read_pmt: > unused: %x", pmt_header->unused); + continue; + } + + if(pmt_header->reserved_2 != 0x0f) + { + Util::vlog("MpegTS::read_pmt: > reserved_2: %x", pmt_header->reserved_2); + continue; + } + + es_data = &(table_data.data()[offsetof(pmt_header_t, data) + programinfo_length]); + es_data_length = table_data.length() - offsetof(pmt_header_t, data); + es_data_skip = offsetof(pmt_es_entry_t, descriptors); + + for(es_data_offset = 0; (es_data_offset + es_data_skip + (int)sizeof(uint32_t)) < es_data_length; ) + { + es_entry = (const pmt_es_entry_t *)&es_data[es_data_offset]; + esinfo_length = (es_entry->es_length_high << 8) | es_entry->es_length_low; + es_pid = (es_entry->es_pid_high << 8) | es_entry->es_pid_low; + + if(es_entry->reserved_1 != 0x07) + { + Util::vlog("MpegTS::read_pmt: reserved 1: %x", es_entry->reserved_1); + goto next_descriptor_entry; + } + + //Util::vlog("MpegTS::read_pmt: >> pid: %x", es_pid); + + if(es_entry->reserved_2 != 0x0f) + { + Util::vlog("MpegTS::read_pmt: reserved 2: %x", es_entry->reserved_2); + goto next_descriptor_entry; + } + + if(es_entry->unused != 0x00) + { + Util::vlog("MpegTS::read_pmt: unused: %x", es_entry->unused); + goto next_descriptor_entry; + } + + //Util::vlog("MpegTS::read_pmt: esinfo_length: %d", esinfo_length); + + switch(es_entry->stream_type) + { + case(mpeg_streamtype_video_mpeg1): + case(mpeg_streamtype_video_mpeg2): + case(mpeg_streamtype_video_h264): + { + if(video_pid < 0) + video_pid = es_pid; + break; + } + + case(mpeg_streamtype_audio_mpeg1): + case(mpeg_streamtype_audio_mpeg2): + case(mpeg_streamtype_private_pes): // ac3 + { + private_stream_is_ac3 = false; + + ds_data_skip = es_data_offset + offsetof(pmt_es_entry_t, descriptors); + + for(ds_data_offset = 0; (ds_data_offset + 2) < esinfo_length; ) + { + ds_entry = (const pmt_ds_entry_t *)&es_data[ds_data_skip + ds_data_offset]; + + //Util::vlog("MpegTS::read_pmt: >>> offset: %d", ds_data_offset); + //Util::vlog("MpegTS::read_pmt: >>> descriptor id: %x", ds_entry->id); + //Util::vlog("MpegTS::read_pmt: >>> length: %d", ds_entry->length); + + switch(ds_entry->id) + { + case(pmt_desc_language): + { + ds_a = (const pmt_ds_a_t *)&ds_entry->data; + //Util::vlog("MpegTS::read_pmt: >>>> lang: %c%c%c [%d]", ds_a->lang[0], + // ds_a->lang[1], ds_a->lang[2], ds_a->code); + + stream_language.assign((const char *)&ds_a->lang, offsetof(pmt_ds_a_t, code)); + + break; + } + + case(pmt_desc_ac3): + { + private_stream_is_ac3 = true; + break; + } + } + + ds_data_offset += ds_entry->length + offsetof(pmt_ds_entry_t, data); + } + + if(!boost::iequals(stream_language, "nar")) + { + if(private_stream_is_ac3 || (audio_pid < 0)) // ac3 stream has preference + audio_pid = es_pid; + } + } + } + +next_descriptor_entry: + es_data_offset += es_data_skip + esinfo_length; + } + + return(true); + } + + return(false); +} + +int MpegTS::get_fd() const throw() +{ + return(fd); +} + +void MpegTS::parse_pts_ms(int pts_ms, int &h, int &m, int &s, int &ms) throw() +{ + h = pts_ms / (60 * 60 * 1000); + pts_ms -= h * (60 * 60 * 1000); + m = pts_ms / (60 * 1000); + pts_ms -= m * (60 * 1000); + s = pts_ms / (1000); + pts_ms -= s * (1000); + ms = pts_ms; +} + +int MpegTS::find_pcr_ms(seek_direction_t direction) const throw() +{ + ts_packet_t packet; + ts_adaptation_field_t *afield; + int attempt, pcr_ms = -1, pid; + int ms, h, m, s; + off_t offset; + + for(attempt = 0; (pcr_ms < 0) && (attempt < find_pcr_max_probe); attempt++) + { + if(direction == direction_backward) + { + offset = (off_t)sizeof(ts_packet_t) * 2 * -1; + + if(lseek(fd, offset, SEEK_CUR) == (off_t)-1) + { + Util::vlog("MpegTS::find_pcr_ms: lseek failed"); + return(-1); + } + } + + if(read(fd, &packet, sizeof(packet)) != sizeof(packet)) + { + Util::vlog("MpegTS::find_pcr_ms: read error"); + return(-1); + } + + if(packet.header.sync_byte != sync_byte_value) + { + Util::vlog("MpegTS::find_pcr_ms: no sync byte"); + return(-1); + } + + pid = (packet.header.pid_high << 8) | packet.header.pid_low; + + //Util::vlog("MpegTS::find_pcr_ms: pid: %d", pid); + + if(pid != pcr_pid) + continue; + + if(!packet.header.af) + { + //Util::vlog("MpegTS::find_pcr_ms: no adaptation field"); + continue; + } + + afield = (ts_adaptation_field_t *)&packet.header.afield; + + if(!afield->contains_pcr) + { + //Util::vlog("MpegTS::find_pcr_ms: attempt: %d, adaptation field does not have pcr field", attempt); + continue; + } + + // read 32 bits of the total 42 bits of clock precision, which is + // enough for seeking, one tick = 1/45th second + + pcr_ms = uint32_t(afield->pcr_0 << 24) | uint32_t(afield->pcr_1 << 16) | + uint32_t(afield->pcr_2 << 8) | uint32_t(afield->pcr_3 << 0); + pcr_ms /= 90; + pcr_ms <<= 1; + } + + if(attempt >= find_pcr_max_probe) + { + Util::vlog("MpegTS::find_pcr_ms: no pcr found"); + return(-1); + } + + parse_pts_ms(pcr_ms, h, m, s, ms); + + //Util::vlog("PCR found after %d packets", attempt); + //Util::vlog("PCR = %d ms (%02d:%02d:%02d:%03d)", pcr_ms, h, m, s, ms); + + return(pcr_ms); +} + +off_t MpegTS::seek(int whence, off_t offset) const throw(trap) +{ + ts_packet_t packet; + off_t actual_offset; + off_t new_offset; + + offset = ((offset / (off_t)sizeof(ts_packet_t)) * (off_t)sizeof(ts_packet_t)); + + if(lseek(fd, offset, whence) < 0) + throw(trap("MpegTS::seek: lseek (1)")); + + if(read(fd, &packet, sizeof(packet)) != sizeof(packet)) + throw(trap("MpegTS::seek: read error")); + + if(packet.header.sync_byte != sync_byte_value) + throw(trap("MpegTS::seek: no sync byte")); + + new_offset = (off_t)0 - (off_t)sizeof(packet); + + if((actual_offset = lseek(fd, new_offset, SEEK_CUR)) < 0) + throw(trap("MpegTS::seek: lseek (2)")); + + return(actual_offset); +} + +off_t MpegTS::seek_absolute(off_t offset) const throw(trap) +{ + return(seek(SEEK_SET, offset)); +} + +off_t MpegTS::seek_relative(off_t roffset, off_t limit) const throw(trap) +{ + off_t offset; + + if((roffset < 0) || (roffset > limit)) + throw(trap("MpegTS::seek_relative: value out of range")); + + offset = (eof_offset / limit) * roffset; + + return(seek(SEEK_SET, offset)); +} + +off_t MpegTS::seek_time(int pts_ms) const throw(trap) +{ + int h, m, s, ms; + int attempt; + off_t lower_bound_offset = 0; + int lower_bound_pts_ms = first_pcr_ms; + off_t upper_bound_offset = eof_offset; + int upper_bound_pts_ms = last_pcr_ms; + off_t disect_offset; + int disect_pts_ms; + off_t current_offset; + + if(!is_time_seekable) + throw(trap("MpegTS::seek: stream is not seekable")); + + if(pts_ms < first_pcr_ms) + { + Util::vlog("MpegTS::seek: seek pts beyond start of file"); + return(-1); + } + + if(pts_ms > last_pcr_ms) + { + Util::vlog("MpegTS::seek: pts beyond end of file"); + return(-1); + } + + for(attempt = 0; attempt < seek_max_attempts; attempt++) + { + disect_offset = (lower_bound_offset + upper_bound_offset) / 2; + + parse_pts_ms(pts_ms, h, m, s, ms); + //Util::vlog("MpegTS::seek: seek for [%02d:%02d:%02d.%03d] between ", h, m, s, ms); + parse_pts_ms(lower_bound_pts_ms, h, m, s, ms); + //Util::vlog("MpegTS::seek: [%02d:%02d:%02d.%03d", h, m, s, ms); + parse_pts_ms(upper_bound_pts_ms, h, m, s, ms); + //Util::vlog("MpegTS::seek: -%02d:%02d:%02d.%03d], ", h, m, s, ms); + //Util::vlog("MpegTS::seek: offset = %lld [%lld-%lld], ", disect_offset, lower_bound_offset, upper_bound_offset); + + if((current_offset = seek(SEEK_SET, disect_offset)) == 1) + { + Util::vlog("MpegTS::seek: seek fails"); + return(-1); + } + + //Util::vlog("MpegTS::seek: current offset = %lld (%lld%%)", current_offset, (current_offset * 100) / eof_offset); + + if((disect_pts_ms = find_pcr_ms(direction_forward)) < 0) + { + Util::vlog("MpegTS::seek: eof"); + return(-1); + } + + parse_pts_ms(disect_pts_ms, h, m, s, ms); + //Util::vlog("MpegTS::seek: disect=[%02d:%02d:%02d.%03d]", h, m, s, ms); + + if(disect_pts_ms < 0) + { + Util::vlog("MpegTS::seek failed to find pts"); + return(-1); + } + + if(((disect_pts_ms > pts_ms) && ((disect_pts_ms - pts_ms) < 8000)) || + ((pts_ms >= disect_pts_ms) && ((pts_ms - disect_pts_ms) < 8000))) + { + //Util::vlog("MpegTS::seek: found"); + return(current_offset); + } + + if(disect_pts_ms < pts_ms) // seek higher + { + lower_bound_offset = disect_offset; + lower_bound_pts_ms = disect_pts_ms; + + //Util::vlog("MpegTS::seek: not found: change lower bound"); + } + else + { + upper_bound_offset = disect_offset; + upper_bound_pts_ms = disect_pts_ms; + + //Util::vlog("MpegTS::seek: not found: change upper bound"); + } + } + + return(-1); +} diff --git a/src/external/mpegts.h b/src/external/mpegts.h new file mode 100644 index 0000000..9eaec7c --- /dev/null +++ b/src/external/mpegts.h @@ -0,0 +1,216 @@ +#ifndef _mpegts_h_ +#define _mpegts_h_ + +//#include "config.h" // oskwon +#include "trap.h" + +#include <map> +#include <string> + +#include <stdint.h> +#include <sys/types.h> + +#include "Source.h" // oskwon +class MpegTS : public Source // oskwon : inherit Source class. +{ + private: + + typedef std::basic_string<uint8_t> u8string; + typedef unsigned int bf; + + enum + { + sync_byte_value = 0x47, + }; + + enum + { + table_pmt = 0x02, + }; + + enum + { + find_pcr_max_probe = 1000000, + seek_max_attempts = 32, + }; + + enum + { + pmt_desc_language = 0x0a, + pmt_desc_ac3 = 0x6a, + }; + + enum + { + mpeg_streamtype_video_mpeg1 = 0x01, + mpeg_streamtype_video_mpeg2 = 0x02, + mpeg_streamtype_video_h264 = 0x1b, + mpeg_streamtype_audio_mpeg1 = 0x03, + mpeg_streamtype_audio_mpeg2 = 0x04, + mpeg_streamtype_private_pes = 0x06, + }; + + typedef struct + { + uint8_t byte[4]; + } mpeg_crc_t; + + typedef union + { + struct + { + uint8_t sync_byte; + bf pid_high:5; + bf tp:1; + bf pusi:1; + bf tei:1; + uint8_t pid_low; + bf cc:4; + bf payload_present:1; + bf af:1; + bf sc:2; + uint8_t payload[0]; + uint8_t afield_length; + uint8_t afield[0]; + } header; + uint8_t byte[188]; + } ts_packet_t; + + typedef struct + { + bf field_ext:1; + bf private_data:1; + bf splice_point:1; + bf contains_opcr:1; + bf contains_pcr:1; + bf priority:1; + bf gop_start:1; + bf discont:1; + uint8_t pcr_0; // 32 bits of the total 42 bits + uint8_t pcr_1; // of clock precision, which is + uint8_t pcr_2; // enough for seeking + uint8_t pcr_3; // tick = 1/45th second + } ts_adaptation_field_t; + + typedef struct + { + uint8_t table_id; + bf section_length_high:2; + bf section_length_unused:2; + bf reserved:2; + bf private_bit:1; + bf section_syntax:1; + uint8_t section_length_low; + uint8_t payload[0]; + } section_table_header_t; + + typedef struct + { + uint8_t tide_high; + uint8_t tide_low; + bf currnext:1; + bf version:5; + bf reserved:2; + uint8_t ordinal; + uint8_t last; + uint8_t data[0]; + } section_table_syntax_t; + + typedef struct + { + uint8_t program_high; + uint8_t program_low; + bf pmt_pid_high:5; + bf reserved:3; + uint8_t pmt_pid_low; + } pat_entry_t; + + typedef struct + { + bf pcrpid_high:5; + bf reserved_1:3; + uint8_t pcrpid_low; + uint programinfo_length_high:2; + bf unused:2; + bf reserved_2:4; + uint8_t programinfo_length_low; + uint8_t data[0]; + } pmt_header_t; + + typedef struct + { + uint8_t stream_type; + bf es_pid_high:5; + bf reserved_1:3; + uint8_t es_pid_low; + bf es_length_high:2; + bf unused:2; + bf reserved_2:4; + uint8_t es_length_low; + uint8_t descriptors[0]; + } pmt_es_entry_t; + + typedef struct + { + uint8_t id; + uint8_t length; + uint8_t data[0]; + } pmt_ds_entry_t; + + typedef struct + { + uint8_t lang[3]; + uint8_t code; + } pmt_ds_a_t; + + typedef enum + { + direction_forward, + direction_backward, + } seek_direction_t; + + typedef std::map<int, int> mpegts_pat_t; + + MpegTS(); + MpegTS(const MpegTS &); + + bool private_fd; + int fd; + mpegts_pat_t pat; + u8string raw_table_data; + u8string table_data; + + static void parse_pts_ms(int pts_ms, int &h, int &m, int &s, int &ms) throw(); + + void init() throw(trap); + bool read_table(int filter_pid, int filter_table) throw(trap); + bool read_pat() throw(trap); + bool read_pmt(int filter_pid) throw(trap); + int find_pcr_ms(seek_direction_t direction) const throw(); + off_t seek(int whence, off_t offset) const throw(trap); + + public: + + int pmt_pid; + int pcr_pid; + int video_pid; + int audio_pid; + + bool request_time_seek; + bool is_time_seekable; + + int first_pcr_ms; + int last_pcr_ms; + off_t eof_offset; + off_t stream_length; + + MpegTS(int fd, bool request_time_seek) throw(trap); + MpegTS(std::string file, bool request_time_seek) throw(trap); + ~MpegTS() throw(); + + int get_fd() const throw(); + off_t seek_absolute(off_t offset) const throw(trap); + off_t seek_relative(off_t offset, off_t limit) const throw(trap); + off_t seek_time(int pts_ms) const throw(trap); +}; +#endif diff --git a/src/external/trap.cpp b/src/external/trap.cpp new file mode 100644 index 0000000..b57b23e --- /dev/null +++ b/src/external/trap.cpp @@ -0,0 +1,42 @@ +//#include "config.h" // oskwon +#include "trap.h" + +#include <string> +using std::string; + +#include <exception> +using std::exception; + +trap::trap(string message_in) throw() + : + message(message_in) +{ + message += string(" (") + exception::what() + ")"; +} + +const char *trap::what() const throw() +{ + return(message.c_str()); +} + +trap::~trap() throw() +{ +} + +http_trap::http_trap(string message_in, + int http_error_in, string http_header_in) throw() + : + trap(message_in), + http_error(http_error_in), + http_header(http_header_in) +{ +} + +const char *http_trap::what() const throw() +{ + return(message.c_str()); +} + +http_trap::~http_trap() throw() +{ +} diff --git a/src/external/trap.h b/src/external/trap.h new file mode 100644 index 0000000..ed33328 --- /dev/null +++ b/src/external/trap.h @@ -0,0 +1,33 @@ +#ifndef _trap_h_ +#define _trap_h_ + +#include <string> +#include <exception> + +class trap : public std::exception +{ + public: + + std::string message; + + trap(std::string msg) throw(); + virtual const char *what() const throw(); + virtual ~trap() throw(); +}; + +class http_trap : public trap +{ + public: + + int http_error; + std::string http_header; + + http_trap(std::string msg, + int http_error, std::string http_header) throw(); + + virtual const char *what() const throw(); + virtual ~http_trap() throw(); + +}; + +#endif diff --git a/src/main.cpp b/src/main.cpp index 897b58e..4d5da36 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,293 +1,366 @@ /* * main.cpp * - * Created on: 2013. 9. 12. - * Author: kos + * Created on: 2014. 6. 10. + * Author: oskwon */ #include <stdio.h> +#include <unistd.h> #include <string.h> -#include <fcntl.h> +#include <pthread.h> #include <poll.h> -#include <stdlib.h> +#include <errno.h> #include <signal.h> -#include <unistd.h> -#include <sys/ioctl.h> -#include <stdint.h> -#include <sys/types.h> -#include <vector> #include <string> -#include <iterator> - -#include "uStringTool.h" -#include "ePreDefine.h" -#include "eParser.h" -#include "eUpstreamSocket.h" -#include "eTransCodingDevice.h" -#include "eHostInfoMgr.h" +#include "Util.h" +#include "Logger.h" -#include "eFilePumpThread.h" -#include "eDemuxPumpThread.h" -#include "eNetworkPumpThread.h" +#include "Http.h" +#include "Mpeg.h" -#ifdef DEBUG_LOG -int myPid = 0; -FILE* fpLog = 0; -//#undef LOG -//#define LOG(X,...) { do{}while(0); } -#endif +#include "Demuxer.h" +#include "Encoder.h" +#include "UriDecoder.h" using namespace std; -//------------------------------------------------------------------------------- +//---------------------------------------------------------------------- -eFilePumpThread* hFilePumpThread = 0; -eDemuxPumpThread* hDemuxPumpThread = 0; -eNetworkPumpThread* hNetworkPumpThread = 0; -eTransCodingDevice* hTranscodingDevice = 0; +#define BUFFFER_SIZE (188 * 256) -void SigHandler(int aSigNo); -//------------------------------------------------------------------------------- +void show_help(); +void signal_handler(int sig_no); -/* -GET /1:0:19:2B66:3F3:1:C00000:0:0:0: HTTP/1.1 -Host: 192.168.102.177:8002 -User-Agent: VLC/2.0.8 LibVLC/2.0.8 -Range: bytes=0- -Connection: close -Icy-MetaData: 1 - -GET /file?file=/hdd/movie/20131023%201005%20-%20DW%20-%20Germany%20Today.ts HTTP/1.1 -*/ -int main(int argc, char** argv) -{ - char request[MAX_LINE_LENGTH] = {0}; - int videopid = 0, audiopid = 0, pmtid = 0; - -#ifdef DEBUG_LOG - myPid = getpid(); - fpLog = fopen("/tmp/transtreamproxy.log", "a+"); -#endif - - std::string ipaddr = eHostInfoMgr::GetHostAddr(); -#ifdef DEBUG_LOG - LOG("client info : %s, device count : %d", ipaddr.c_str(), eTransCodingDevice::GetMaxDeviceCount()); -#endif - eHostInfoMgr hostmgr("tsp", eTransCodingDevice::GetMaxDeviceCount()); - if (hostmgr.Init() == false) { - return 1; - } - if (hostmgr.IsExist(ipaddr) > 0) { - hostmgr.Update(ipaddr, getpid()); - } else { - hostmgr.Register(ipaddr, getpid()); - } +void *source_thread_main(void *params); +void *streaming_thread_main(void *params); - signal(SIGINT, SigHandler); +int streaming_write(const char *buffer, size_t buffer_len, bool enable_log = false); +//---------------------------------------------------------------------- - if (!ReadRequest(request)) { - RETURN_ERR_400(); - } -#ifdef DEBUG_LOG - LOG("%s", request); -#endif +static bool is_terminated = true; +static int source_thread_id, stream_thread_id; +static pthread_t source_thread_handle, stream_thread_handle; +//---------------------------------------------------------------------- - if (strncmp(request, "GET /", 5)) { - RETURN_ERR_400(); +int main(int argc, char **argv) +{ + if (argc > 1) { + if (strcmp(argv[1], "-h") == 0) + show_help(); + exit(0); } + Logger::instance()->init("/tmp/transtreamproxy", Logger::WARNING); - char* http = strchr(request + 5, ' '); - if (!http || strncmp(http, " HTTP/1.", 7)) { -#ifdef DEBUG_LOG - LOG("Not support request (%s).", http); -#endif - RETURN_ERR_400("Not support request (%s).", http); - } + signal(SIGINT, signal_handler); + + HttpHeader header; + std::string req = HttpHeader::read_request(); -#ifdef DEBUG_LOG - LOG("%s", request + 5); -#endif + DEBUG("request head :\n%s", req.c_str()); - bool isfilestream = true; - std::string responsedata = ""; - if(strncmp(request + 4, "/file?", 6) != 0) { - char authorization[MAX_LINE_LENGTH] = {0}; - if(eParser::Authorization(authorization)) { - RETURN_ERR_401(); + try { + if (req.find("\r\n\r\n") == std::string::npos) { + throw(http_trap("no found request done code.", 400, "Bad Request")); } - eUpstreamSocket upstreamsocket; - if(!upstreamsocket.Connect()) { -#ifdef DEBUG_LOG - LOG("Upstream connect failed."); -#endif - RETURN_ERR_502("Upstream connect failed."); + if (header.parse_request(req) == false) { + throw(http_trap("request parse error.", 400, "Bad Request")); } - if(upstreamsocket.Request(eParser::ServiceRef(request + 5, authorization), responsedata) < 0) { -#ifdef DEBUG_LOG - LOG("Upstream request failed."); -#endif - RETURN_ERR_502(); + if (header.method != "GET") { + throw(http_trap("not support request type.", 400, "Bad Request, not support request")); } - isfilestream = false; - } - eTransCodingDevice transcoding; - if(transcoding.Open() == false) { -#ifdef DEBUG_LOG - LOG("Open device failed."); -#endif - return 1;//RETURN_ERR_502("Open device failed."); - } - hTranscodingDevice = &transcoding; - - bool ispidseted = false; - eNetworkPumpThread networkpump(transcoding.GetDeviceFd()); - hNetworkPumpThread = &networkpump; - - if(!isfilestream) { - int demuxno = 0; - std::string wwwauthenticate = ""; - std::vector<unsigned long> pidlist; - ispidseted = eParser::LiveStreamPid(responsedata, pidlist, demuxno, videopid, audiopid, pmtid, wwwauthenticate); - if(ispidseted) { - if(transcoding.SetStreamPid(videopid, audiopid, pmtid) == false) { -#ifdef DEBUG_LOG - LOG("Pid setting failed."); -#endif - return 1;//RETURN_ERR_502("Pid setting failed."); + Encoder encoder; + Source *source = 0; + ThreadParams thread_params = { 0, &encoder, &header }; + + int video_pid = 0, audio_pid = 0, pmt_pid = 0; + + switch(header.type) { + case HttpHeader::TRANSCODING_FILE: + try { + std::string uri = UriDecoder().decode(header.page_params["file"].c_str()); + Mpeg *ts = new Mpeg(uri, false); + pmt_pid = ts->pmt_pid; + video_pid = ts->video_pid; + audio_pid = ts->audio_pid; + source = ts; + } + catch (const trap &e) { + throw(http_trap(e.what(), 404, "Not Found")); + } + break; + case HttpHeader::TRANSCODING_LIVE: + try { + Demuxer *dmx = new Demuxer(&header); + pmt_pid = dmx->pmt_pid; + video_pid = dmx->video_pid; + audio_pid = dmx->audio_pid; + source = dmx; + } + catch (const http_trap &e) { + throw(e); + } + break; + case HttpHeader::M3U: + try { + std::string response = header.build_response((Mpeg*) source); + if (response != "") { + streaming_write(response.c_str(), response.length(), true); + } + } + catch (...) { } - } else { -#ifdef DEBUG_LOG - LOG("Invalid upstream response."); -#endif - RETURN_ERR_502("Invalid upstream response."); + exit(0); + default: + throw(http_trap(std::string("not support source type : ") + Util::ultostr(header.type), 400, "Bad Request")); } + thread_params.source = source; -#ifdef DEBUG_LOG - LOG("stream pids parsing result : %d, video : %d, audio : %d, pmt : %d, pids size : [%d]", ispidseted, videopid, audiopid, pmtid, pidlist.size()); - for(int j = 0; j < pidlist.size(); ++j) { - LOG("saved pid : [%x]", pidlist[j]); + if (!encoder.retry_open(2, 3)) { + throw(http_trap("encoder open fail.", 503, "Service Unavailable")); } -#endif - - eDemuxPumpThread demuxpump; - if(pidlist.size() > 0) { - if(!demuxpump.Open(demuxno)) { -#ifdef DEBUG_LOG - LOG("Demux open failed."); -#endif - return 1;//RETURN_ERR_502("%s", demuxpump.GetMessage().c_str()); + + if (encoder.state == Encoder::ENCODER_STAT_OPENED) { + std::string response = header.build_response((Mpeg*) source); + if (response == "") { + throw(http_trap("response build fail.", 503, "Service Unavailable")); } - demuxpump.SetDeviceFd(transcoding.GetDeviceFd()); - demuxpump.Start(); - hDemuxPumpThread = &demuxpump; - - if(demuxpump.GetState() < eDemuxState::stSetedFilter) { - if(!demuxpump.SetFilter(pidlist)) { -#ifdef DEBUG_LOG - LOG("Demux setting filter failed."); -#endif - return 1;//RETURN_ERR_502("Demux setting filter failed."); - } + + streaming_write(response.c_str(), response.length(), true); + + if (header.type == HttpHeader::TRANSCODING_FILE) { + ((Mpeg*) source)->seek(header); + } + + if (!encoder.ioctl(Encoder::IOCTL_SET_VPID, video_pid)) { + throw(http_trap("video pid setting fail.", 503, "Service Unavailable")); + } + if (!encoder.ioctl(Encoder::IOCTL_SET_APID, audio_pid)) { + throw(http_trap("audio pid setting fail.", 503, "Service Unavailable")); } - if(!demuxpump.SetPidList(pidlist)) { -#ifdef DEBUG_LOG - LOG("PID setting failed."); -#endif - RETURN_ERR_502("PID setting failed."); + if (!encoder.ioctl(Encoder::IOCTL_SET_PMTPID, pmt_pid)) { + throw(http_trap("pmt pid setting fail.", 503, "Service Unavailable")); } - } else { -#ifdef DEBUG_LOG - LOG("No found PID for selected stream."); -#endif - return 1;//RETURN_ERR_502("No found PID for selected stream."); } -#ifdef NORMAL_STREAMPROXY - demuxpump.Join(); -#else - if(transcoding.StartTranscoding() == false) { - return 1;//RETURN_ERR_502("Transcoding start failed."); + is_terminated = false; + source_thread_id = pthread_create(&source_thread_handle, 0, source_thread_main, (void *)&thread_params); + if (source_thread_id < 0) { + is_terminated = true; + throw(http_trap("souce thread create fail.", 503, "Service Unavailable")); } - networkpump.Start(); - networkpump.Join(); - demuxpump.Stop(); - demuxpump.Join(); -#endif - } else { - std::string srcfilename = ""; - eParser::FileName(request, http, srcfilename); - - ispidseted = eParser::MetaData(srcfilename, videopid, audiopid); - if(ispidseted) { - if(transcoding.SetStreamPid(videopid, audiopid) == false) { -#ifdef DEBUG_LOG - LOG("No found PID for selected stream."); -#endif - return 1;//RETURN_ERR_502("Pid setting failed."); + else { + pthread_detach(source_thread_handle); + if (!encoder.ioctl(Encoder::IOCTL_START_TRANSCODING, 0)) { + is_terminated = true; + throw(http_trap("start transcoding fail.", 503, "Service Unavailable")); + } + else { + stream_thread_id = pthread_create(&stream_thread_handle, 0, streaming_thread_main, (void *)&thread_params); + if (stream_thread_id < 0) { + is_terminated = true; + throw(http_trap("stream thread create fail.", 503, "Service Unavailable")); + } } } -#ifdef DEBUG_LOG - LOG("meta parsing result : %d, video : %d, audio : %d", ispidseted, videopid, audiopid); -#endif - - eFilePumpThread filepump(transcoding.GetDeviceFd()); - if(filepump.Open(srcfilename) == false) { -#ifdef DEBUG_LOG - LOG("TS file open failed."); -#endif - RETURN_ERR_503("TS file open failed."); + pthread_join(stream_thread_handle, 0); + is_terminated = true; + + if (source != 0) { + delete source; + source = 0; } - filepump.Start(); - hFilePumpThread = &filepump; - - sleep(1); - filepump.SeekOffset(0); - if(transcoding.StartTranscoding() == false) { -#ifdef DEBUG_LOG - LOG("Transcoding start failed."); -#endif - return 1;//RETURN_ERR_502("Transcoding start failed."); + } + catch (const http_trap &e) { + ERROR("%s", e.message.c_str()); + std::string error = ""; + if (e.http_error == 401 && header.authorization.length() > 0) { + error = header.authorization; + } + else { + error = HttpUtil::http_error(e.http_error, e.http_header); } + streaming_write(error.c_str(), error.length(), true); + exit(-1); + } + catch (...) { + ERROR("unknown exception..."); + std::string error = HttpUtil::http_error(400, "Bad request"); + streaming_write(error.c_str(), error.length(), true); + exit(-1); + } + return 0; +} +//---------------------------------------------------------------------- + +void *streaming_thread_main(void *params) +{ + if (is_terminated) return 0; + + INFO("streaming thread start."); + Encoder *encoder = ((ThreadParams*) params)->encoder; + HttpHeader *header = ((ThreadParams*) params)->request; - networkpump.Start(); - networkpump.Join(); - filepump.Stop(); - filepump.Join(); + try { + int poll_state, rc, wc; + struct pollfd poll_fd[2]; + unsigned char buffer[BUFFFER_SIZE]; + + poll_fd[0].fd = encoder->get_fd(); + poll_fd[0].events = POLLIN | POLLHUP; + + while(!is_terminated) { + poll_state = poll(poll_fd, 1, 1000); + if (poll_state == -1) { + throw(trap("poll error.")); + } + else if (poll_state == 0) { + continue; + } + if (poll_fd[0].revents & POLLIN) { + rc = wc = 0; + rc = read(encoder->get_fd(), buffer, BUFFFER_SIZE - 1); + if (rc <= 0) { + break; + } + else if (rc > 0) { + wc = streaming_write((const char*) buffer, rc); + if (wc < rc) { + //DEBUG("need rewrite.. remain (%d)", rc - wc); + int retry_wc = 0; + for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) { + poll_fd[0].revents = 0; + + retry_wc = streaming_write((const char*) (buffer + rc - remain_len), remain_len); + wc += retry_wc; + } + LOG("re-write result : %d - %d", wc, rc); + } + } + } + else if (poll_fd[0].revents & POLLHUP) + { + if (encoder->state == Encoder::ENCODER_STAT_STARTED) { + DEBUG("stop transcoding.."); + encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0); + } + break; + } + } } -#ifdef DEBUG_LOG - fclose(fpLog); -#endif + catch (const trap &e) { + ERROR("%s %s (%d)", e.what(), strerror(errno), errno); + } + is_terminated = true; + INFO("streaming thread stop."); + + if (encoder->state == Encoder::ENCODER_STAT_STARTED) { + DEBUG("stop transcoding.."); + encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0); + } + + pthread_exit(0); + return 0; } -//------------------------------------------------------------------------------- +//---------------------------------------------------------------------- -char* ReadRequest(char* aRequest) +void *source_thread_main(void *params) { - return fgets(aRequest, MAX_LINE_LENGTH-1, stdin); + Source *source = ((ThreadParams*) params)->source; + Encoder *encoder = ((ThreadParams*) params)->encoder; + HttpHeader *header = ((ThreadParams*) params)->request; + + INFO("source thread start."); + + try { + int poll_state, rc, wc; + struct pollfd poll_fd[2]; + unsigned char buffer[BUFFFER_SIZE]; + + poll_fd[0].fd = encoder->get_fd(); + poll_fd[0].events = POLLOUT; + + poll_fd[1].fd = source->get_fd(); + poll_fd[1].events = POLLIN; + + while(!is_terminated) { + poll_state = poll(poll_fd, 2, 1000); + if (poll_state == -1) { + throw(trap("poll error.")); + } + else if (poll_state == 0) { + continue; + } + + if (poll_fd[0].revents & POLLOUT) { + rc = wc = 0; + if (poll_fd[1].revents & POLLIN) { + rc = read(source->get_fd(), buffer, BUFFFER_SIZE - 1); + if (rc == 0) { + break; + } + else if (rc > 0) { + wc = write(encoder->get_fd(), buffer, rc); + //DEBUG("write : %d", wc); + if (wc < rc) { + //DEBUG("need rewrite.. remain (%d)", rc - wc); + int retry_wc = 0; + for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) { + poll_fd[0].revents = 0; + + poll_state = poll(poll_fd, 1, 1000); + if (poll_fd[0].revents & POLLOUT) { + retry_wc = write(encoder->get_fd(), (buffer + rc - remain_len), remain_len); + wc += retry_wc; + } + } + LOG("re-write result : %d - %d", wc, rc); + usleep(500000); + } + } + } + } + } + } + catch (const trap &e) { + ERROR("%s %s (%d)", e.what(), strerror(errno), errno); + } + INFO("source thread stop."); + + pthread_exit(0); + + return 0; } -//------------------------------------------------------------------------------- +//---------------------------------------------------------------------- -void SigHandler(int aSigNo) +int streaming_write(const char *buffer, size_t buffer_len, bool enable_log) { -#ifdef DEBUG_LOG - LOG("%d", aSigNo); -#endif - switch(aSigNo) { - case SIGINT: -#ifdef DEBUG_LOG - LOG("SIGINT detected."); -#endif -// if(hDemuxPumpThread) { -// hDemuxPumpThread->Close(); -// } -// if(hTranscodingDevice) { -// hTranscodingDevice->close(); -// } - exit(0); + if (enable_log) { + DEBUG("response data :\n%s", buffer); } + return write(1, buffer, buffer_len); +} +//---------------------------------------------------------------------- + +void signal_handler(int sig_no) +{ + INFO("signal no : %d", sig_no); + is_terminated = true; +} +//---------------------------------------------------------------------- + +void show_help() +{ + printf("usage : transtreamproxy [-h]\n"); + printf("\n"); + printf(" * To active debug mode, input NUMBER on /tmp/debug_on file. (default : warning)\n"); + printf(" NUMBER : error(1), warning(2), info(3), debug(4), log(5)\n"); + printf("\n"); + printf(" ex > echo \"4\" > /tmp/.debug_on\n"); } -//------------------------------------------------------------------------------- +//---------------------------------------------------------------------- |