Update httpstream
authorhschang <chang@dev3>
Thu, 6 Nov 2014 02:54:31 +0000 (11:54 +0900)
committerhschang <chang@dev3>
Fri, 7 Nov 2014 01:44:52 +0000 (10:44 +0900)
- added chunked transfer support.
- connect to TS stream in seperate thread.
- add basic http authentication support.
- add redirect and playlist support.

configure.ac
lib/base/httpstream.cpp
lib/base/httpstream.h
lib/base/itssource.h
lib/dvb/dvb.cpp

index 332fc7a..6063799 100644 (file)
@@ -30,7 +30,7 @@ TUXBOX_APPS_DVB
 AM_CONDITIONAL(HAVE_GIT_DIR, test -d "$srcdir/.git")
 AM_CONDITIONAL(HAVE_FAKE_GIT_DIR, test -f "$srcdir/.git/last_commit_info")
 
-PKG_CHECK_MODULES(BASE, [freetype2 fribidi gstreamer-0.10 gstreamer-pbutils-0.10 libdvbsi++ libpng libxml-2.0 sigc++-1.2])
+PKG_CHECK_MODULES(BASE, [freetype2 fribidi gstreamer-0.10 gstreamer-pbutils-0.10 libdvbsi++ libpng libxml-2.0 sigc++-1.2 libssl libcrypto])
 PKG_CHECK_MODULES(LIBDDVD, libdreamdvd, HAVE_LIBDDVD="yes", HAVE_LIBDDVD="no")
 AM_CONDITIONAL(HAVE_LIBDDVD, test "$HAVE_LIBDDVD" = "yes")
 
index 343109e..8f55425 100644 (file)
@@ -1,4 +1,5 @@
 #include <cstdio>
+#include <openssl/evp.h>
 
 #include <lib/base/httpstream.h>
 #include <lib/base/eerror.h>
@@ -8,14 +9,23 @@ DEFINE_REF(eHttpStream);
 eHttpStream::eHttpStream()
 {
        streamSocket = -1;
+       connectionStatus = FAILED;
+       isChunked = false;
+       currentChunkSize = 0;
+       partialPktSz = 0;
+       tmpBufSize = 32;
+       tmpBuf = (char*)malloc(tmpBufSize);
+       packetSize = 188;
 }
 
 eHttpStream::~eHttpStream()
 {
+       free(tmpBuf);
+       kill(true);
        close();
 }
 
-int eHttpStream::open(const char *url)
+int eHttpStream::openUrl(const std::string &url, std::string &newurl)
 {
        int port;
        std::string hostname;
@@ -27,27 +37,51 @@ int eHttpStream::open(const char *url)
        char proto[100];
        int statuscode = 0;
        char statusmsg[100];
+       bool playlist = false;
+       bool contenttypeparsed = false;
 
        close();
 
        int pathindex = uri.find("/", 7);
-       if (pathindex > 0) 
+       if (pathindex > 0)
        {
                hostname = uri.substr(7, pathindex - 7);
                uri = uri.substr(pathindex, uri.length() - pathindex);
-       } 
-       else 
+       }
+       else
        {
                hostname = uri.substr(7, uri.length() - 7);
-               uri = "";
+               uri = "/";
+       }
+       int authenticationindex = hostname.find("@");
+       if (authenticationindex > 0)
+       {
+               BIO *mbio, *b64bio, *bio;
+               char *p = (char*)NULL;
+               int length = 0;
+               authorizationData = hostname.substr(0, authenticationindex);
+               hostname = hostname.substr(authenticationindex + 1);
+               mbio = BIO_new(BIO_s_mem());
+               b64bio = BIO_new(BIO_f_base64());
+               bio = BIO_push(b64bio, mbio);
+               BIO_write(bio, authorizationData.c_str(), authorizationData.length());
+               BIO_flush(bio);
+               length = BIO_ctrl(mbio, BIO_CTRL_INFO, 0, (char*)&p);
+               authorizationData = "";
+               if (p && length > 0)
+               {
+                       /* base64 output contains a linefeed, which we ignore */
+                       authorizationData.append(p, length - 1);
+               }
+               BIO_free_all(bio);
        }
        int customportindex = hostname.find(":");
-       if (customportindex > 0) 
+       if (customportindex > 0)
        {
                port = atoi(hostname.substr(customportindex + 1, hostname.length() - customportindex - 1).c_str());
                hostname = hostname.substr(0, customportindex);
-       } 
-       else if (customportindex == 0) 
+       }
+       else if (customportindex == 0)
        {
                port = atoi(hostname.substr(1, hostname.length() - 1).c_str());
                hostname = "localhost";
@@ -57,41 +91,142 @@ int eHttpStream::open(const char *url)
                port = 80;
        }
        streamSocket = connect(hostname.c_str(), port, 10);
-       if (streamSocket < 0) goto error;
+       if (streamSocket < 0)
+               goto error;
 
        request = "GET ";
        request.append(uri).append(" HTTP/1.1\r\n");
        request.append("Host: ").append(hostname).append("\r\n");
+       request.append("User-Agent: ").append("Enigma2").append("\r\n");
+       if (authorizationData != "")
+       {
+               request.append("Authorization: Basic ").append(authorizationData).append("\r\n");
+       }
+
        request.append("Accept: */*\r\n");
        request.append("Connection: close\r\n");
        request.append("\r\n");
+
        writeAll(streamSocket, request.c_str(), request.length());
 
        linebuf = (char*)malloc(buflen);
 
        result = readLine(streamSocket, &linebuf, &buflen);
-       if (result <= 0) goto error;
+       if (result <= 0)
+               goto error;
 
        result = sscanf(linebuf, "%99s %d %99s", proto, &statuscode, statusmsg);
-       if (result != 3 || statuscode != 200) 
+       if (result != 3 || (statuscode != 200 && statuscode != 206 && statuscode != 302 && 
+                       statuscode != 301 && statuscode != 303 && statuscode != 307 && statuscode != 308))
        {
-               eDebug("eHttpStream::open: wrong http response code: %d", statuscode);
+               eDebug("%s: wrong http response code: %d", __FUNCTION__, statuscode);
                goto error;
        }
-       while (result > 0)
+
+       while (1)
        {
                result = readLine(streamSocket, &linebuf, &buflen);
+               if (!contenttypeparsed)
+               {
+                       char contenttype[33];
+                       if (sscanf(linebuf, "Content-Type: %32s", contenttype) == 1)
+                       {
+                               contenttypeparsed = true;
+                               if (!strcasecmp(contenttype, "application/text")
+                               || !strcasecmp(contenttype, "audio/x-mpegurl")
+                               || !strcasecmp(contenttype, "audio/mpegurl")
+                               || !strcasecmp(contenttype, "application/m3u"))
+                               {
+                                       /* assume we'll get a playlist, some text file containing a stream url */
+                                       playlist = true;
+                               }
+                               continue;
+                       }
+               }
+
+               if (playlist && !strncasecmp(linebuf, "http://", 7))
+               {
+                       newurl = linebuf;
+                       eDebug("%s: playlist entry: %s", __FUNCTION__, newurl.c_str());
+                       break;
+               }
+
+               if (((statuscode == 301) || (statuscode == 302) || (statuscode == 303) || (statuscode == 307) || (statuscode == 308)) &&
+                               strncasecmp(linebuf, "location: ", 10) == 0)
+               {
+                       newurl = &linebuf[10];
+                       eDebug("%s: redirecting to: %s", __FUNCTION__, newurl.c_str());
+                       break;
+               }
+
+               if (((statuscode == 200) || (statuscode == 206)) && !strncasecmp(linebuf, "transfer-encoding: chunked", strlen("transfer-encoding: chunked")))
+               {
+                       isChunked = true;
+               }
+
+               if (!playlist && result == 0)
+                       break;
+
+               if (result < 0)
+                       break;
        }
 
        free(linebuf);
        return 0;
 error:
-       eDebug("eHttpStream::open failed");
+       eDebug("%s failed", __FUNCTION__);
        free(linebuf);
        close();
        return -1;
 }
 
+int eHttpStream::open(const char *url)
+{
+       streamUrl = url;
+       /*
+        * We're in gui thread context here, and establishing
+        * a connection might block for up to 10 seconds.
+        * Spawn a new thread to establish the connection.
+        */
+       connectionStatus = BUSY;
+       eDebug("eHttpStream::Start thread");
+       run();
+       return 0;
+}
+
+void eHttpStream::thread()
+{
+       hasStarted();
+       std::string currenturl, newurl;
+       currenturl = streamUrl;
+       for (unsigned int i = 0; i < 5; i++)
+       {
+               if (openUrl(currenturl, newurl) < 0)
+               {
+                       /* connection failed */
+                       eDebug("eHttpStream::Thread end NO connection");
+                       connectionStatus = FAILED;
+                       return;
+               }
+               if (newurl == "")
+               {
+                       /* we have a valid stream connection */
+                       eDebug("eHttpStream::Thread end connection");
+                       connectionStatus = CONNECTED;
+                       return;
+               }
+               /* switch to new url */
+               close();
+               currenturl = newurl;
+               newurl = "";
+       }
+       /* too many redirect / playlist levels */
+       eDebug("eHttpStream::Thread end NO connection");
+       connectionStatus = FAILED;
+       return;
+}
+
+
 off_t eHttpStream::lseek(off_t offset, int whence)
 {
        return (off_t)-1;
@@ -108,13 +243,108 @@ int eHttpStream::close()
        return retval;
 }
 
+ssize_t eHttpStream::syncNextRead(void *buf, ssize_t length)
+{
+       unsigned char *b = (unsigned char*)buf;
+       unsigned char *e = b + length;
+       partialPktSz = 0;
+
+       if (*(char*)buf != 0x47)
+       {
+               // the current read is not aligned
+               // get the head position of the last packet
+               // so we'll try to align the next read
+               while (e != b && *e != 0x47) e--;
+       }
+       else
+       {
+               // the current read is aligned
+               // get the last incomplete packet position
+               e -= length % packetSize;
+       }
+
+       if (e != b && e != (b + length))
+       {
+               partialPktSz = (b + length) - e;
+               // if the last packet is read partially save it to align the next read
+               if (partialPktSz > 0 && partialPktSz < packetSize)
+               {
+                       memcpy(partialPkt, e, partialPktSz);
+               }
+       }
+       return (length - partialPktSz);
+}
+
+ssize_t eHttpStream::httpChunkedRead(void *buf, size_t count)
+{
+       ssize_t ret = -1;
+       size_t total_read = partialPktSz;
+
+       // write partial packet from the previous read
+       if (partialPktSz > 0)
+       {
+               memcpy(buf, partialPkt, partialPktSz);
+               partialPktSz = 0;
+       }
+
+       if (!isChunked)
+       {
+               ret = timedRead(streamSocket,((char*)buf) + total_read , count - total_read, 5000, 100);
+               if (ret > 0)
+               {
+                       ret += total_read;
+                       ret = syncNextRead(buf, ret);
+               }
+       }
+       else
+       {
+               while (total_read < count)
+               {
+                       if (0 == currentChunkSize)
+                       {
+                               do
+                               {
+                                       ret = readLine(streamSocket, &tmpBuf, &tmpBufSize);
+                                       if (ret < 0) return -1;
+                               } while (!*tmpBuf && ret > 0); /* skip CR LF from last chunk */
+                               if (ret == 0)
+                                       break;
+                               currentChunkSize = strtol(tmpBuf, NULL, 16);
+                               if (currentChunkSize == 0) return -1;
+                       }
+
+                       size_t to_read = count - total_read;
+                       if (currentChunkSize < to_read)
+                               to_read = currentChunkSize;
+
+                       // do not wait too long if we have something in the buffer already
+                       ret = timedRead(streamSocket, ((char*)buf) + total_read, to_read, ((total_read)? 100 : 5000), 100);
+                       if (ret <= 0)
+                               break;
+                       currentChunkSize -= ret;
+                       total_read += ret;
+               }
+               if (total_read > 0)
+               {
+                       ret = syncNextRead(buf, total_read);
+               }
+       }
+       return ret;
+}
+
 ssize_t eHttpStream::read(off_t offset, void *buf, size_t count)
 {
-       return timedRead(streamSocket, buf, count, 5000, 500);
+       if (connectionStatus == BUSY)
+               return 0;
+       else if (connectionStatus == FAILED)
+               return -1;
+       return httpChunkedRead(buf, count);
 }
 
 int eHttpStream::valid()
 {
+       if (connectionStatus == BUSY)
+               return 0;
        return streamSocket >= 0;
 }
 
index f02bb24..228d2b0 100644 (file)
@@ -5,19 +5,35 @@
 #include <lib/base/ebase.h>
 #include <lib/base/itssource.h>
 #include <lib/base/socketbase.h>
+#include <lib/base/thread.h>
 
-class eHttpStream: public iTsSource, public eSocketBase, public Object
+class eHttpStream: public iTsSource, public eSocketBase, public Object, public eThread
 {
        DECLARE_REF(eHttpStream);
 
        int streamSocket;
+       enum { BUSY, CONNECTED, FAILED } connectionStatus;
+       bool isChunked;
+       size_t currentChunkSize;
+       std::string streamUrl;
+       std::string authorizationData;
+       char partialPkt[192];
+       size_t partialPktSz;
+       char* tmpBuf;
+       size_t tmpBufSize;
+       int packetSize;
+
+       int openUrl(const std::string &url, std::string &newurl);
+       void thread();
+       ssize_t httpChunkedRead(void *buf, size_t count);
+       ssize_t syncNextRead(void *buf, ssize_t length);
 
        /* iTsSource */
        off_t lseek(off_t offset, int whence);
        ssize_t read(off_t offset, void *buf, size_t count);
        off_t length();
        int valid();
-
+       bool isStream() { return true; }
 public:
        eHttpStream();
        ~eHttpStream();
index 91167ff..adaeb4c 100644 (file)
@@ -14,6 +14,7 @@ public:
 
        virtual off_t length()=0;
        virtual int valid()=0;
+       virtual bool isStream() { return false; }
 };
 
 #endif
index ee68253..e32e5f8 100755 (executable)
@@ -1832,7 +1832,7 @@ RESULT eDVBChannel::playSource(ePtr<iTsSource> &source, const char *streaminfo_f
                m_pvr_thread = 0;
        }
 
-       if (!source->valid())
+       if (!source->valid() && !source->isStream())
        {
                eDebug("PVR source is not valid!");
                return -ENOENT;
@@ -1875,7 +1875,7 @@ RESULT eDVBChannel::playSource(ePtr<iTsSource> &source, const char *streaminfo_f
        m_pvr_thread = new eDVBChannelFilePush();
        m_pvr_thread->enablePVRCommit(1);
        /* If the source specifies a length, it's a file. If not, it's a stream */
-       m_pvr_thread->setStreamMode(source->length() <= 0);
+       m_pvr_thread->setStreamMode(source->isStream());
        m_pvr_thread->setScatterGather(this);
 
        m_event(this, evtPreStart);