assert isinstance(serviceref, ServiceReference)
- self.service_ref = serviceref
+ if serviceref.isRecordable():
+ self.service_ref = serviceref
+ else:
+ self.service_ref = ServiceReference(None)
self.eit = eit
self.dontSave = False
self.name = name
def __str__(self):
return self.ref.toString()
-
+
def getServiceName(self):
info = self.info()
return info and info.getName(self.ref) or ""
def list(self):
return self.serviceHandler.list(self.ref)
+
+ def getType(self):
+ return self.ref.type
+
+ def getPath(self):
+ return self.ref.getPath()
+
+ def getFlags(self):
+ return self.ref.flags
+
+ def isRecordable(self):
+ ref = self.ref
+ return ref.flags & eServiceReference.isGroup or (ref.type == eServiceReference.idDVB and ref.getPath() == "")
\ No newline at end of file
lib/python/Screens/Makefile
lib/python/Plugins/Makefile
lib/python/Plugins/DemoPlugins/Makefile
+lib/python/Plugins/DemoPlugins/TPMDemo/Makefile
lib/python/Plugins/DemoPlugins/TestPlugin/Makefile
lib/python/Plugins/Extensions/CutListEditor/Makefile
lib/python/Plugins/Extensions/CutListEditor/meta/Makefile
--- /dev/null
+The TPM check is currently to be considered a beta version. So please expect
+code changes in the future.
+
+If you'd like to write your own plugins and honor the efforts, Dream Multimedia
+puts into developing Enigma 2, you can protect your plugin against execution
+on Non-Dream Multimedia Hardware by implementing a TPM (Trusted Platform Module)
+check into your plugin.
+For ease of use we provide a demo plugin in lib/python/Plugins/DemoPlugins/TPMDemo.
+
+The main TPM check is implemented into the "main" function. You need to provide
+this code yourself in your plugin. So copy&paste the code into your own as well
+as the needed functions
+- bin2long
+- long2bin
+- rsa_pub1024
+- decrypt_block
+- validate_cert
+- read_random
+Importing the functions from somewhere else would spoil the security model. So
+you need to provide the code with your plugin.
+
+You can either use the given method using the main function (which will run the
+TPM check each time the plugin is called) or directly use it in the
+Plugins(**kwargs) function and not return the Plugins-list if the TPM check failes
+(which will prevent the plugin from showing up at all). You can also implement
+a warning message for all possible TPM failure scenarios.
+
init.cpp message.cpp thread.cpp \
smartptr.cpp estring.cpp connection.cpp \
filepush.cpp encoding.cpp console.cpp rawfile.cpp \
- nconfig.cpp ioprio.cpp
+ nconfig.cpp ioprio.cpp etpm.cpp
--- /dev/null
+#include <sys/socket.h>
+#include <fcntl.h>
+#include <stdbool.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <openssl/bn.h>
+#include <openssl/sha.h>
+#include <lib/base/etpm.h>
+
+DEFINE_REF(eTPM);
+
+eTPM::eTPM()
+{
+ struct sockaddr_un addr;
+ unsigned char buf[8];
+ unsigned int tag, len;
+ unsigned char *val;
+
+ level2_cert_read = level3_cert_read = false;
+
+ addr.sun_family = AF_UNIX;
+ strcpy(addr.sun_path, TPMD_SOCKET);
+
+ fd = socket(PF_UNIX, SOCK_STREAM, 0);
+ if (fd < 0) {
+ eDebug("[eTPM] socket error");
+ return;
+ }
+
+ if (connect(fd, (const struct sockaddr *)&addr, SUN_LEN(&addr)) < 0) {
+ eDebug("[eTPM] connect error");
+ return;
+ }
+
+ buf[0] = TPMD_DT_LEVEL2_CERT;
+ buf[1] = TPMD_DT_LEVEL3_CERT;
+ if (!send_cmd(TPMD_CMD_GET_DATA, buf, 2))
+ {
+ return;
+ }
+
+ val = (unsigned char*)recv_cmd(&tag, &len);
+ if (val == NULL)
+ {
+ return;
+ }
+
+ parse_data(val, len);
+ free(val);
+}
+
+eTPM::~eTPM()
+{
+
+}
+
+bool eTPM::send_cmd(enum tpmd_cmd cmd, const void *data, unsigned int len)
+{
+ unsigned char buf[len + 4];
+
+ buf[0] = (cmd >> 8) & 0xff;
+ buf[1] = (cmd >> 0) & 0xff;
+ buf[2] = (len >> 8) & 0xff;
+ buf[3] = (len >> 0) & 0xff;
+ memcpy(&buf[4], data, len);
+
+ if (write(fd, buf, sizeof(buf)) != (ssize_t)sizeof(buf)) {
+ fprintf(stderr, "%s: incomplete write\n", __func__);
+ return false;
+ }
+
+ return true;
+}
+
+void* eTPM::recv_cmd(unsigned int *tag, unsigned int *len)
+{
+ unsigned char buf[4];
+ void *val;
+
+ if (read(fd, buf, 4) != 4) {
+ fprintf(stderr, "%s: incomplete read\n", __func__);
+ return NULL;
+ }
+
+ *tag = (buf[0] << 8) | buf[1];
+ *len = (buf[2] << 8) | buf[3];
+
+ val = malloc(*len);
+ if (val == NULL)
+ return NULL;
+
+ if (read(fd, val, *len) != (ssize_t)*len) {
+ fprintf(stderr, "%s: incomplete read\n", __func__);
+ free(val);
+ return NULL;
+ }
+
+ return val;
+}
+
+void eTPM::parse_data(const unsigned char *data, unsigned int datalen)
+{
+ unsigned int i;
+ unsigned int tag;
+ unsigned int len;
+ const unsigned char *val;
+
+ for (i = 0; i < datalen; i += len) {
+ tag = data[i++];
+ len = data[i++];
+ val = &data[i];
+
+ switch (tag) {
+ case TPMD_DT_LEVEL2_CERT:
+ if (len != 210)
+ break;
+ memcpy(level2_cert, val, 210);
+ level2_cert_read = true;
+ break;
+ case TPMD_DT_LEVEL3_CERT:
+ if (len != 210)
+ break;
+ memcpy(level3_cert, val, 210);
+ level3_cert_read = true;
+ break;
+ }
+ }
+}
+
+PyObject *eTPM::getCert(cert_type type)
+{
+ if (type == TPMD_DT_LEVEL2_CERT && level2_cert_read)
+ return PyBuffer_FromMemory(level2_cert, 210);
+ else if (type == TPMD_DT_LEVEL3_CERT && level3_cert_read)
+ return PyBuffer_FromMemory(level3_cert, 210);
+ return Py_None;
+
+}
+
+PyObject *eTPM::challenge(PyObject* rnd)
+{
+ if (PyString_Check(rnd) && PyString_Size(rnd) == 8)
+ {
+ char* buf = PyString_AsString(rnd);
+ if (!send_cmd(TPMD_CMD_COMPUTE_SIGNATURE, buf, 8))
+ return Py_None;
+
+ unsigned int tag, len;
+ unsigned char *val = (unsigned char*)recv_cmd(&tag, &len);
+
+ if (tag != TPMD_CMD_COMPUTE_SIGNATURE)
+ return Py_None;
+
+ return PyBuffer_FromMemory(val, len);
+ }
+ else
+ return Py_None;
+}
--- /dev/null
+#ifndef __lib_base_etpm_h
+#define __lib_base_etpm_h
+
+#include <lib/base/object.h>
+#include <lib/python/python.h>
+
+#ifndef SWIG
+#define TPMD_SOCKET "/var/run/tpmd_socket"
+#endif
+
+class eTPM: public Object, public iObject
+{
+ DECLARE_REF(eTPM);
+#ifndef SWIG
+ int fd;
+ unsigned char level2_cert[210];
+ unsigned char level3_cert[210];
+ bool level2_cert_read;
+ bool level3_cert_read;
+
+ enum tpmd_cmd {
+ TPMD_CMD_RESERVED = 0x0000,
+ TPMD_CMD_GET_DATA = 0x0001,
+ TPMD_CMD_APDU = 0x0002,
+ TPMD_CMD_COMPUTE_SIGNATURE = 0x0003,
+ TPMD_CMD_APP_CERT = 0x0004,
+ };
+
+ bool send_cmd(enum tpmd_cmd cmd, const void *data, unsigned int len);
+ void *recv_cmd(unsigned int *tag, unsigned int *len);
+ void parse_data(const unsigned char *data, unsigned int datalen);
+
+#endif
+public:
+ eTPM();
+ ~eTPM();
+
+ enum cert_type {
+ TPMD_DT_LEVEL2_CERT = 0x04,
+ TPMD_DT_LEVEL3_CERT = 0x05
+ };
+ PyObject *getCert(cert_type type);
+ PyObject *challenge(PyObject *rnd);
+};
+
+#endif // __lib_base_etpm_h
def __list__(self):
if self.type == choicesList.LIST_TYPE_LIST:
- ret = [not isinstance(x, tuple) and x or x[0] for x in self.choices]
+ ret = [not isinstance(x, tuple) and x or len(x) > 0 and x[0] or len(x) == 0 and x for x in self.choices]
else:
ret = self.choices.keys()
return ret or [""]
installdir = $(pkglibdir)/python/Plugins/DemoPlugins
-SUBDIRS = TestPlugin
+SUBDIRS = TestPlugin TPMDemo
install_PYTHON = \
- __init__.py
\ No newline at end of file
+ __init__.py
+
+
\ No newline at end of file
--- /dev/null
+installdir = $(LIBDIR)/enigma2/python/Plugins/DemoPlugins/TPMDemo
+
+install_PYTHON = \
+ __init__.py \
+ plugin.py
--- /dev/null
+Please read enigma2/doc/TPM for further instructions on how to integrate this into your own plugins.
\ No newline at end of file
--- /dev/null
+from Screens.Screen import Screen
+from Plugins.Plugin import PluginDescriptor
+from enigma import eTPM
+import sha
+
+def bin2long(s):
+ return reduce( lambda x,y:(x<<8L)+y, map(ord, s))
+
+def long2bin(l):
+ res = ""
+ for byte in range(128):
+ res += chr((l >> (1024 - (byte + 1) * 8)) & 0xff)
+ return res
+
+def rsa_pub1024(src, mod):
+ return long2bin(pow(bin2long(src), 65537, bin2long(mod)))
+
+def decrypt_block(src, mod):
+ if len(src) != 128 and len(src) != 202:
+ return None
+ dest = rsa_pub1024(src[:128], mod)
+ hash = sha.new(dest[1:107])
+ if len(src) == 202:
+ hash.update(src[131:192])
+ result = hash.digest()
+ if result == dest[107:127]:
+ return dest
+ return None
+
+def validate_cert(cert, key):
+ buf = decrypt_block(cert[8:], key)
+ if buf is None:
+ return None
+ return buf[36:107] + cert[139:196]
+
+def read_random():
+ try:
+ fd = open("/dev/urandom", "r")
+ buf = fd.read(8)
+ fd.close()
+ return buf
+ except:
+ return None
+
+def main(session, **kwargs):
+ try:
+ device = open("/proc/stb/info/model", "r").readline().strip()
+ except:
+ device = ""
+ if device != "dm7025":
+ rootkey = ['\x9f', '|', '\xe4', 'G', '\xc9', '\xb4', '\xf4', '#', '&', '\xce', '\xb3', '\xfe', '\xda', '\xc9', 'U', '`', '\xd8', '\x8c', 's', 'o', '\x90', '\x9b', '\\', 'b', '\xc0', '\x89', '\xd1', '\x8c', '\x9e', 'J', 'T', '\xc5', 'X', '\xa1', '\xb8', '\x13', '5', 'E', '\x02', '\xc9', '\xb2', '\xe6', 't', '\x89', '\xde', '\xcd', '\x9d', '\x11', '\xdd', '\xc7', '\xf4', '\xe4', '\xe4', '\xbc', '\xdb', '\x9c', '\xea', '}', '\xad', '\xda', 't', 'r', '\x9b', '\xdc', '\xbc', '\x18', '3', '\xe7', '\xaf', '|', '\xae', '\x0c', '\xe3', '\xb5', '\x84', '\x8d', '\r', '\x8d', '\x9d', '2', '\xd0', '\xce', '\xd5', 'q', '\t', '\x84', 'c', '\xa8', ')', '\x99', '\xdc', '<', '"', 'x', '\xe8', '\x87', '\x8f', '\x02', ';', 'S', 'm', '\xd5', '\xf0', '\xa3', '_', '\xb7', 'T', '\t', '\xde', '\xa7', '\xf1', '\xc9', '\xae', '\x8a', '\xd7', '\xd2', '\xcf', '\xb2', '.', '\x13', '\xfb', '\xac', 'j', '\xdf', '\xb1', '\x1d', ':', '?']
+
+ etpm = eTPM()
+ l2cert = etpm.getCert(eTPM.TPMD_DT_LEVEL2_CERT)
+ if l2cert is None:
+ print "l2cert not found"
+ return
+
+ l2key = validate_cert(l2cert, rootkey)
+ if l2key is None:
+ print "l2cert invalid"
+ return
+
+ l3cert = etpm.getCert(eTPM.TPMD_DT_LEVEL3_CERT)
+ if l3cert is None:
+ print "l3cert not found (can be fixed by running the genuine dreambox plugin and running the offered update)"
+ return
+
+ l3key = validate_cert(l3cert, l2key)
+ if l3key is None:
+ print "l3cert invalid"
+ return
+
+ rnd = read_random()
+ if rnd is None:
+ print "random error"
+ return
+ val = etpm.challenge(rnd)
+ result = decrypt_block(val, l3key)
+ if device == "dm7025" or result[80:88] == rnd:
+ print "successfully finished the tpm test"
+ # would start your plugin here
+
+def Plugins(**kwargs):
+ return [PluginDescriptor(name = "TPM Demo", description = _("A demo plugin for TPM usage."), where = PluginDescriptor.WHERE_EXTENSIONSMENU, fnc = main),
+ PluginDescriptor(name = "TPM Demo", description = _("A demo plugin for TPM usage."), icon = "plugin.png", where = PluginDescriptor.WHERE_PLUGINMENU, fnc = main)]
+
\ No newline at end of file
install_PYTHON = \
__init__.py Plugin.py
+
else:
append_when_current_valid(current, menu, (_("remove from parental protection"), boundFunction(self.removeParentalProtection, csel.getCurrentSelection())), level = 0)
if haveBouquets:
- append_when_current_valid(current, menu, (_("add service to bouquet"), self.addServiceToBouquetSelected), level = 0)
+ bouquets = self.csel.getBouquetList()
+ if bouquets is None:
+ bouquetCnt = 0
+ else:
+ bouquetCnt = len(bouquets)
+ if not inBouquet or bouquetCnt > 1:
+ append_when_current_valid(current, menu, (_("add service to bouquet"), self.addServiceToBouquetSelected), level = 0)
else:
- append_when_current_valid(current, menu, (_("add service to favourites"), self.addServiceToBouquetSelected), level = 0)
+ if not inBouquet:
+ append_when_current_valid(current, menu, (_("add service to favourites"), self.addServiceToBouquetSelected), level = 0)
else:
if current_root.getPath().find('FROM SATELLITES') != -1:
append_when_current_valid(current, menu, (_("remove selected satellite"), self.removeSatelliteServices), level = 0)
if cnt > 1: # show bouquet list
self.bsel = self.session.openWithCallback(self.bouquetSelClosed, BouquetSelector, bouquets, self.addCurrentServiceToBouquet)
elif cnt == 1: # add to only one existing bouquet
- self.addCurrentServiceToBouquet(bouquets[0][1])
+ self.addCurrentServiceToBouquet(bouquets[0][1], closeBouquetSelection = False)
def bouquetSelClosed(self, recursive):
self.bsel = None
self.csel.addMarker(marker)
self.close()
- def addCurrentServiceToBouquet(self, dest):
+ def addCurrentServiceToBouquet(self, dest, closeBouquetSelection = True):
self.csel.addServiceToBouquet(dest)
if self.bsel is not None:
self.bsel.close(True)
else:
- self.close(True) # close bouquet selection
+ self.close(closeBouquetSelection) # close bouquet selection
def removeCurrentService(self):
self.csel.removeCurrentService()
self["actions"] = ActionMap(["OkCancelActions", "TvRadioActions"],
{
- "keyTV": self.closeRadio,
- "keyRadio": self.closeRadio,
- "cancel": self.closeRadio,
+ "keyTV": self.cancel,
+ "keyRadio": self.cancel,
+ "cancel": self.cancel,
"ok": self.channelSelected,
})
self["RdsActions"].setEnabled(state)
########## RDS Radiotext / Rass Support END
- def closeRadio(self):
+ def cancel(self):
self.infobar.rds_display.onRassInteractivePossibilityChanged.remove(self.RassInteractivePossibilityChanged)
self.info.hide()
#set previous tv service
from Screens.MovieSelection import getPreferredTagEditor
from Screens.LocationBox import MovieLocationBox
from Screens.ChoiceBox import ChoiceBox
+from Screens.MessageBox import MessageBox
from RecordTimer import AFTEREVENT
-from enigma import eEPGCache
+from enigma import eEPGCache, eServiceReference
from time import localtime, mktime, time, strftime
from datetime import datetime
self.timerentry_service_ref = ServiceReference(args[0])
self.timerentry_service.setCurrentText(self.timerentry_service_ref.getServiceName())
self["config"].invalidate(self.channelEntry)
-
+
def getTimestamp(self, date, mytime):
d = localtime(date)
dt = datetime(d.tm_year, d.tm_mon, d.tm_mday, mytime[0], mytime[1])
end += 86400
return begin, end
- def keyGo(self):
+ def selectChannelSelector(self, *args):
+ self.session.openWithCallback(
+ self.finishedChannelSelectionCorrection,
+ ChannelSelection.SimpleChannelSelection,
+ _("Select channel to record from")
+ )
+
+ def finishedChannelSelectionCorrection(self, *args):
+ if args:
+ self.finishedChannelSelection(*args)
+ self.keyGo()
+
+ def keyGo(self, result = None):
+ if not self.timerentry_service_ref.isRecordable():
+ self.session.openWithCallback(self.selectChannelSelector, MessageBox, _("You didn't select a channel to record from."), MessageBox.TYPE_ERROR)
+ return
self.timer.name = self.timerentry_name.value
self.timer.description = self.timerentry_description.value
self.timer.justplay = self.timerentry_justplay.value == "zap"
def keyNumberGlobal(self, number):
if (self.wizard[self.currStep]["config"]["screen"] != None):
self.configInstance.keyNumberGlobal(number)
+ elif (self.wizard[self.currStep]["config"]["type"] == "dynamic"):
+ self["config"].handleKey(KEY_0 + number)
def keyGotAscii(self):
if (self.wizard[self.currStep]["config"]["screen"] != None):
self["config"].handleKey(KEY_ASCII)
+ elif (self.wizard[self.currStep]["config"]["type"] == "dynamic"):
+ self["config"].handleKey(KEY_ASCII)
def left(self):
self.resetCounter()
return
if self.lang == 'de_DE':
- self.mapping.append (u".,?'+\"0-()@/:_$!") # 0
+ self.mapping.append (u".,?'+\"0-()@/:_$!=") # 0
self.mapping.append (u" 1") # 1
self.mapping.append (u"aäbc2AÄBC") # 2
self.mapping.append (u"def3DEF") # 3
self.mapping.append (u"tuüv8TUÜV") # 8
self.mapping.append (u"wxyz9WXYZ") # 9
elif self.lang == 'es_ES':
- self.mapping.append (u".,?'+\"0-()@/:_$!") # 0
+ self.mapping.append (u".,?'+\"0-()@/:_$!=") # 0
self.mapping.append (u" 1") # 1
self.mapping.append (u"abcáà2ABCÁÀ") # 2
self.mapping.append (u"deéèf3DEFÉÈ") # 3
self.mapping.append (u"tuvúù8TUVÚÙ") # 8
self.mapping.append (u"wxyz9WXYZ") # 9
if self.lang in ('sv_SE', 'fi_FI'):
- self.mapping.append (u".,?'+\"0-()@/:_$!") # 0
+ self.mapping.append (u".,?'+\"0-()@/:_$!=") # 0
self.mapping.append (u" 1") # 1
self.mapping.append (u"abcåä2ABCÅÄ") # 2
self.mapping.append (u"defé3DEFÉ") # 3
self.mapping.append (u"tuv8TUV") # 8
self.mapping.append (u"wxyz9WXYZ") # 9
else:
- self.mapping.append (u".,?'+\"0-()@/:_$!") # 0
+ self.mapping.append (u".,?'+\"0-()@/:_$!=") # 0
self.mapping.append (u" 1") # 1
self.mapping.append (u"abc2ABC") # 2
self.mapping.append (u"def3DEF") # 3
#include <lib/base/ebase.h>
#include <lib/base/smartptr.h>
#include <lib/base/eerror.h>
+#include <lib/base/etpm.h>
#include <lib/base/nconfig.h>
#include <lib/base/message.h>
#include <lib/driver/rc.h>
%immutable ePythonMessagePump::recv_msg;
%immutable eDVBLocalTimeHandler::m_timeUpdated;
%include <lib/base/message.h>
+%include <lib/base/etpm.h>
%include <lib/base/nconfig.h>
%include <lib/driver/rc.h>
%include <lib/gdi/fb.h>