1 from twisted.internet.protocol import DatagramProtocol
2 from twisted.internet import reactor
3 from struct import pack, unpack
4 from hashlib import md5
6 from Screens.MessageBox import MessageBox
7 from Tools import Notifications
8 from Components.config import config
10 from GrowleeConnection import emergencyDisable
11 from . import NOTIFICATIONID
15 class GrowlTalk(DatagramProtocol):
19 self.addr = (ip, GROWL_UDP_PORT)
20 if config.plugins.growlee.enable_outgoing.value:
24 7, # length of application name == len("growlee")
26 1, # one of them default
28 p += "growlee" # application name
30 32, # length of first notification type name
32 p += "Notifications from your Dreambox" # first notification type name
33 p += "\x00" # index of default notifications
35 password = config.plugins.growlee.password.value
39 checksum.update(password)
40 p += checksum.digest()
42 self.transport.write(p, self.addr)
44 def noIP(self, error):
45 print "--------------------------------", error
48 def startProtocol(self):
49 reactor.resolve(config.plugins.growlee.address.value).addCallback(self.gotIP).addErrback(self.noIP)
51 def sendNotification(self, title='No title.', description='No description.', flags=0):
52 if not self.transport or not self.addr or not config.plugins.growlee.enable_outgoing.value:
58 flags, # 3-bit signed priority, 1 bit sticky in rightmost nibble
59 32, # len("Notifications from your Dreambox")
64 p += "Notifications from your Dreambox"
69 password = config.plugins.growlee.password.value
73 checksum.update(password)
74 p += checksum.digest()
76 self.transport.write(p, self.addr)
78 def datagramReceived(self, data, addr):
79 if not config.plugins.growlee.enable_incoming.value:
86 # ver == GROWL_PROTOCOL_VERSION
90 # type == GROWL_TYPE_NOTIFICATION
93 password = config.plugins.growlee.password.value
95 checksum.update(data[:-16])
97 checksum.update(password)
98 if digest != checksum.digest():
101 nlen, tlen, dlen, alen = unpack("!HHHH",str(data[4:12]))
102 notification, title, description = unpack("%ds%ds%ds" % (nlen, tlen, dlen), data[12:Len-alen-16])
103 # type == GROWL_TYPE_NOTIFICATION_NOAUTH
104 elif data[1] == '\x05':
105 nlen, tlen, dlen, alen = unpack("!HHHH",str(data[4:12]))
106 notification, title, description = unpack("%ds%ds%ds" % (nlen, tlen, dlen), data[12:Len-alen])
108 # don't handle any other packet yet
111 Notifications.AddNotificationWithID(
114 text = title + '\n' + description,
115 type = MessageBox.TYPE_INFO,
117 close_on_any_key = True,
120 class GrowlTalkAbstraction:
122 self.growltalk = GrowlTalk()
123 self.serverPort = reactor.listenUDP(GROWL_UDP_PORT, self.growltalk)
125 def sendNotification(self, title='No title.', description='No description.', priority=-1, timeout=-1):
127 flags = 8 + (-priority << 1)
129 flags = priority << 1
131 # NOTE: sticky didn't work in any of my tests, but let's assume this is my configurations fault
135 self.growltalk.sendNotification(title=title, description=description, flags=flags)
138 return self.serverPort.stopListening()