1 from twisted.internet.protocol import DatagramProtocol
2 from twisted.internet import reactor
3 from struct import pack, unpack
4 from hashlib import md5
6 from Screens.MessageBox import MessageBox
7 from Tools import Notifications
9 from GrowleeConnection import emergencyDisable
10 from . import NOTIFICATIONID
14 class GrowlTalk(DatagramProtocol):
17 def __init__(self, host):
21 self.addr = (ip, GROWL_UDP_PORT)
22 if self.host.enable_outgoing.value:
26 7, # length of application name == len("growlee")
28 1, # one of them default
30 p += "growlee" # application name
32 32, # length of first notification type name
34 p += "Notifications from your Dreambox" # first notification type name
35 p += "\x00" # index of default notifications
37 password = self.host.password.value
41 checksum.update(password)
42 p += checksum.digest()
44 self.transport.write(p, self.addr)
46 def noIP(self, error):
47 print "--------------------------------", error
50 def startProtocol(self):
51 reactor.resolve(self.host.address.value).addCallback(self.gotIP).addErrback(self.noIP)
53 def sendNotification(self, title='No title.', description='No description.', flags=0):
54 if not self.transport or not self.addr or not self.host.enable_outgoing.value:
60 flags, # 3-bit signed priority, 1 bit sticky in rightmost nibble
61 32, # len("Notifications from your Dreambox")
66 p += "Notifications from your Dreambox"
71 password = self.host.password.value
75 checksum.update(password)
76 p += checksum.digest()
78 self.transport.write(p, self.addr)
80 def datagramReceived(self, data, addr):
81 if not self.host.enable_incoming.value:
88 # ver == GROWL_PROTOCOL_VERSION
92 # type == GROWL_TYPE_NOTIFICATION
95 password = self.host.password.value
97 checksum.update(data[:-16])
99 checksum.update(password)
100 if digest != checksum.digest():
103 nlen, tlen, dlen, alen = unpack("!HHHH",str(data[4:12]))
104 notification, title, description = unpack("%ds%ds%ds" % (nlen, tlen, dlen), data[12:Len-alen-16])
105 # type == GROWL_TYPE_NOTIFICATION_NOAUTH
106 elif data[1] == '\x05':
107 nlen, tlen, dlen, alen = unpack("!HHHH",str(data[4:12]))
108 notification, title, description = unpack("%ds%ds%ds" % (nlen, tlen, dlen), data[12:Len-alen])
110 # don't handle any other packet yet
113 Notifications.AddNotificationWithID(
116 text = title + '\n' + description,
117 type = MessageBox.TYPE_INFO,
119 close_on_any_key = True,
122 class GrowlTalkAbstraction:
123 def __init__(self, host):
124 self.growltalk = GrowlTalk(host)
125 self.serverPort = reactor.listenUDP(GROWL_UDP_PORT, self.growltalk)
127 def sendNotification(self, title='No title.', description='No description.', priority=-1, timeout=-1):
129 flags = 8 + (-priority << 1)
131 flags = priority << 1
133 # NOTE: sticky didn't work in any of my tests, but let's assume this is my configuration's fault
137 self.growltalk.sendNotification(title=title, description=description, flags=flags)
140 return self.serverPort.stopListening()