From fe619dbed1c9bf14a0d442cc16d7e968769bbcc0 Mon Sep 17 00:00:00 2001 From: hschang Date: Thu, 6 Nov 2014 11:54:31 +0900 Subject: [PATCH] Update httpstream - added chunked transfer support. - connect to TS stream in seperate thread. - add basic http authentication support. - add redirect and playlist support. --- configure.ac | 2 +- lib/base/httpstream.cpp | 260 +++++++++++++++++++++++++++++++++++++++++++++--- lib/base/httpstream.h | 20 +++- lib/base/itssource.h | 1 + lib/dvb/dvb.cpp | 4 +- 5 files changed, 267 insertions(+), 20 deletions(-) diff --git a/configure.ac b/configure.ac index 332fc7a..6063799 100644 --- a/configure.ac +++ b/configure.ac @@ -30,7 +30,7 @@ TUXBOX_APPS_DVB AM_CONDITIONAL(HAVE_GIT_DIR, test -d "$srcdir/.git") AM_CONDITIONAL(HAVE_FAKE_GIT_DIR, test -f "$srcdir/.git/last_commit_info") -PKG_CHECK_MODULES(BASE, [freetype2 fribidi gstreamer-0.10 gstreamer-pbutils-0.10 libdvbsi++ libpng libxml-2.0 sigc++-1.2]) +PKG_CHECK_MODULES(BASE, [freetype2 fribidi gstreamer-0.10 gstreamer-pbutils-0.10 libdvbsi++ libpng libxml-2.0 sigc++-1.2 libssl libcrypto]) PKG_CHECK_MODULES(LIBDDVD, libdreamdvd, HAVE_LIBDDVD="yes", HAVE_LIBDDVD="no") AM_CONDITIONAL(HAVE_LIBDDVD, test "$HAVE_LIBDDVD" = "yes") diff --git a/lib/base/httpstream.cpp b/lib/base/httpstream.cpp index 343109e..8f55425 100644 --- a/lib/base/httpstream.cpp +++ b/lib/base/httpstream.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -8,14 +9,23 @@ DEFINE_REF(eHttpStream); eHttpStream::eHttpStream() { streamSocket = -1; + connectionStatus = FAILED; + isChunked = false; + currentChunkSize = 0; + partialPktSz = 0; + tmpBufSize = 32; + tmpBuf = (char*)malloc(tmpBufSize); + packetSize = 188; } eHttpStream::~eHttpStream() { + free(tmpBuf); + kill(true); close(); } -int eHttpStream::open(const char *url) +int eHttpStream::openUrl(const std::string &url, std::string &newurl) { int port; std::string hostname; @@ -27,27 +37,51 @@ int eHttpStream::open(const char *url) char proto[100]; int statuscode = 0; char statusmsg[100]; + bool playlist = false; + bool contenttypeparsed = false; close(); int pathindex = uri.find("/", 7); - if (pathindex > 0) + if (pathindex > 0) { hostname = uri.substr(7, pathindex - 7); uri = uri.substr(pathindex, uri.length() - pathindex); - } - else + } + else { hostname = uri.substr(7, uri.length() - 7); - uri = ""; + uri = "/"; + } + int authenticationindex = hostname.find("@"); + if (authenticationindex > 0) + { + BIO *mbio, *b64bio, *bio; + char *p = (char*)NULL; + int length = 0; + authorizationData = hostname.substr(0, authenticationindex); + hostname = hostname.substr(authenticationindex + 1); + mbio = BIO_new(BIO_s_mem()); + b64bio = BIO_new(BIO_f_base64()); + bio = BIO_push(b64bio, mbio); + BIO_write(bio, authorizationData.c_str(), authorizationData.length()); + BIO_flush(bio); + length = BIO_ctrl(mbio, BIO_CTRL_INFO, 0, (char*)&p); + authorizationData = ""; + if (p && length > 0) + { + /* base64 output contains a linefeed, which we ignore */ + authorizationData.append(p, length - 1); + } + BIO_free_all(bio); } int customportindex = hostname.find(":"); - if (customportindex > 0) + if (customportindex > 0) { port = atoi(hostname.substr(customportindex + 1, hostname.length() - customportindex - 1).c_str()); hostname = hostname.substr(0, customportindex); - } - else if (customportindex == 0) + } + else if (customportindex == 0) { port = atoi(hostname.substr(1, hostname.length() - 1).c_str()); hostname = "localhost"; @@ -57,41 +91,142 @@ int eHttpStream::open(const char *url) port = 80; } streamSocket = connect(hostname.c_str(), port, 10); - if (streamSocket < 0) goto error; + if (streamSocket < 0) + goto error; request = "GET "; request.append(uri).append(" HTTP/1.1\r\n"); request.append("Host: ").append(hostname).append("\r\n"); + request.append("User-Agent: ").append("Enigma2").append("\r\n"); + if (authorizationData != "") + { + request.append("Authorization: Basic ").append(authorizationData).append("\r\n"); + } + request.append("Accept: */*\r\n"); request.append("Connection: close\r\n"); request.append("\r\n"); + writeAll(streamSocket, request.c_str(), request.length()); linebuf = (char*)malloc(buflen); result = readLine(streamSocket, &linebuf, &buflen); - if (result <= 0) goto error; + if (result <= 0) + goto error; result = sscanf(linebuf, "%99s %d %99s", proto, &statuscode, statusmsg); - if (result != 3 || statuscode != 200) + if (result != 3 || (statuscode != 200 && statuscode != 206 && statuscode != 302 && + statuscode != 301 && statuscode != 303 && statuscode != 307 && statuscode != 308)) { - eDebug("eHttpStream::open: wrong http response code: %d", statuscode); + eDebug("%s: wrong http response code: %d", __FUNCTION__, statuscode); goto error; } - while (result > 0) + + while (1) { result = readLine(streamSocket, &linebuf, &buflen); + if (!contenttypeparsed) + { + char contenttype[33]; + if (sscanf(linebuf, "Content-Type: %32s", contenttype) == 1) + { + contenttypeparsed = true; + if (!strcasecmp(contenttype, "application/text") + || !strcasecmp(contenttype, "audio/x-mpegurl") + || !strcasecmp(contenttype, "audio/mpegurl") + || !strcasecmp(contenttype, "application/m3u")) + { + /* assume we'll get a playlist, some text file containing a stream url */ + playlist = true; + } + continue; + } + } + + if (playlist && !strncasecmp(linebuf, "http://", 7)) + { + newurl = linebuf; + eDebug("%s: playlist entry: %s", __FUNCTION__, newurl.c_str()); + break; + } + + if (((statuscode == 301) || (statuscode == 302) || (statuscode == 303) || (statuscode == 307) || (statuscode == 308)) && + strncasecmp(linebuf, "location: ", 10) == 0) + { + newurl = &linebuf[10]; + eDebug("%s: redirecting to: %s", __FUNCTION__, newurl.c_str()); + break; + } + + if (((statuscode == 200) || (statuscode == 206)) && !strncasecmp(linebuf, "transfer-encoding: chunked", strlen("transfer-encoding: chunked"))) + { + isChunked = true; + } + + if (!playlist && result == 0) + break; + + if (result < 0) + break; } free(linebuf); return 0; error: - eDebug("eHttpStream::open failed"); + eDebug("%s failed", __FUNCTION__); free(linebuf); close(); return -1; } +int eHttpStream::open(const char *url) +{ + streamUrl = url; + /* + * We're in gui thread context here, and establishing + * a connection might block for up to 10 seconds. + * Spawn a new thread to establish the connection. + */ + connectionStatus = BUSY; + eDebug("eHttpStream::Start thread"); + run(); + return 0; +} + +void eHttpStream::thread() +{ + hasStarted(); + std::string currenturl, newurl; + currenturl = streamUrl; + for (unsigned int i = 0; i < 5; i++) + { + if (openUrl(currenturl, newurl) < 0) + { + /* connection failed */ + eDebug("eHttpStream::Thread end NO connection"); + connectionStatus = FAILED; + return; + } + if (newurl == "") + { + /* we have a valid stream connection */ + eDebug("eHttpStream::Thread end connection"); + connectionStatus = CONNECTED; + return; + } + /* switch to new url */ + close(); + currenturl = newurl; + newurl = ""; + } + /* too many redirect / playlist levels */ + eDebug("eHttpStream::Thread end NO connection"); + connectionStatus = FAILED; + return; +} + + off_t eHttpStream::lseek(off_t offset, int whence) { return (off_t)-1; @@ -108,13 +243,108 @@ int eHttpStream::close() return retval; } +ssize_t eHttpStream::syncNextRead(void *buf, ssize_t length) +{ + unsigned char *b = (unsigned char*)buf; + unsigned char *e = b + length; + partialPktSz = 0; + + if (*(char*)buf != 0x47) + { + // the current read is not aligned + // get the head position of the last packet + // so we'll try to align the next read + while (e != b && *e != 0x47) e--; + } + else + { + // the current read is aligned + // get the last incomplete packet position + e -= length % packetSize; + } + + if (e != b && e != (b + length)) + { + partialPktSz = (b + length) - e; + // if the last packet is read partially save it to align the next read + if (partialPktSz > 0 && partialPktSz < packetSize) + { + memcpy(partialPkt, e, partialPktSz); + } + } + return (length - partialPktSz); +} + +ssize_t eHttpStream::httpChunkedRead(void *buf, size_t count) +{ + ssize_t ret = -1; + size_t total_read = partialPktSz; + + // write partial packet from the previous read + if (partialPktSz > 0) + { + memcpy(buf, partialPkt, partialPktSz); + partialPktSz = 0; + } + + if (!isChunked) + { + ret = timedRead(streamSocket,((char*)buf) + total_read , count - total_read, 5000, 100); + if (ret > 0) + { + ret += total_read; + ret = syncNextRead(buf, ret); + } + } + else + { + while (total_read < count) + { + if (0 == currentChunkSize) + { + do + { + ret = readLine(streamSocket, &tmpBuf, &tmpBufSize); + if (ret < 0) return -1; + } while (!*tmpBuf && ret > 0); /* skip CR LF from last chunk */ + if (ret == 0) + break; + currentChunkSize = strtol(tmpBuf, NULL, 16); + if (currentChunkSize == 0) return -1; + } + + size_t to_read = count - total_read; + if (currentChunkSize < to_read) + to_read = currentChunkSize; + + // do not wait too long if we have something in the buffer already + ret = timedRead(streamSocket, ((char*)buf) + total_read, to_read, ((total_read)? 100 : 5000), 100); + if (ret <= 0) + break; + currentChunkSize -= ret; + total_read += ret; + } + if (total_read > 0) + { + ret = syncNextRead(buf, total_read); + } + } + return ret; +} + ssize_t eHttpStream::read(off_t offset, void *buf, size_t count) { - return timedRead(streamSocket, buf, count, 5000, 500); + if (connectionStatus == BUSY) + return 0; + else if (connectionStatus == FAILED) + return -1; + return httpChunkedRead(buf, count); } int eHttpStream::valid() { + if (connectionStatus == BUSY) + return 0; return streamSocket >= 0; } diff --git a/lib/base/httpstream.h b/lib/base/httpstream.h index f02bb24..228d2b0 100644 --- a/lib/base/httpstream.h +++ b/lib/base/httpstream.h @@ -5,19 +5,35 @@ #include #include #include +#include -class eHttpStream: public iTsSource, public eSocketBase, public Object +class eHttpStream: public iTsSource, public eSocketBase, public Object, public eThread { DECLARE_REF(eHttpStream); int streamSocket; + enum { BUSY, CONNECTED, FAILED } connectionStatus; + bool isChunked; + size_t currentChunkSize; + std::string streamUrl; + std::string authorizationData; + char partialPkt[192]; + size_t partialPktSz; + char* tmpBuf; + size_t tmpBufSize; + int packetSize; + + int openUrl(const std::string &url, std::string &newurl); + void thread(); + ssize_t httpChunkedRead(void *buf, size_t count); + ssize_t syncNextRead(void *buf, ssize_t length); /* iTsSource */ off_t lseek(off_t offset, int whence); ssize_t read(off_t offset, void *buf, size_t count); off_t length(); int valid(); - + bool isStream() { return true; } public: eHttpStream(); ~eHttpStream(); diff --git a/lib/base/itssource.h b/lib/base/itssource.h index 91167ff..adaeb4c 100644 --- a/lib/base/itssource.h +++ b/lib/base/itssource.h @@ -14,6 +14,7 @@ public: virtual off_t length()=0; virtual int valid()=0; + virtual bool isStream() { return false; } }; #endif diff --git a/lib/dvb/dvb.cpp b/lib/dvb/dvb.cpp index ee68253..e32e5f8 100755 --- a/lib/dvb/dvb.cpp +++ b/lib/dvb/dvb.cpp @@ -1832,7 +1832,7 @@ RESULT eDVBChannel::playSource(ePtr &source, const char *streaminfo_f m_pvr_thread = 0; } - if (!source->valid()) + if (!source->valid() && !source->isStream()) { eDebug("PVR source is not valid!"); return -ENOENT; @@ -1875,7 +1875,7 @@ RESULT eDVBChannel::playSource(ePtr &source, const char *streaminfo_f m_pvr_thread = new eDVBChannelFilePush(); m_pvr_thread->enablePVRCommit(1); /* If the source specifies a length, it's a file. If not, it's a stream */ - m_pvr_thread->setStreamMode(source->length() <= 0); + m_pvr_thread->setStreamMode(source->isStream()); m_pvr_thread->setScatterGather(this); m_event(this, evtPreStart); -- 2.7.4