1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
17 import os, time, threading
18 from pyxmpp.jabber.client import JabberClient
19 from pyxmpp.message import Message
20 from pyxmpp.jid import JID
21 from socket import gethostname
23 import osrf.log, osrf.ex
27 # - log jabber activity (for future reference)
29 #logger=logging.getLogger()
30 #logger.addHandler(logging.StreamHandler())
31 #logger.addHandler(logging.FileHandler('j.log'))
32 #logger.setLevel(logging.DEBUG)
class XMPPNoRecipient(osrf.ex.OSRFException):
    ''' Raised when a message was sent to a non-existent recipient.

    The recipient is stored in the 'recipient' field on this object.
    '''

    def __init__(self, recipient):
        ''' recipient - identifier of the party that could not be reached '''
        # Wrap in a 1-tuple so a tuple-valued recipient cannot be mistaken
        # for the argument list of the format operator.
        osrf.ex.OSRFException.__init__(
            self, 'Error communicating with %s' % (recipient,))
        self.recipient = recipient
class XMPPNoConnection(osrf.ex.OSRFException):
    ''' Raised when the connection to the XMPP server is not available '''
def set_network_handle(handle):
    """Store `handle` as the network handle of the calling thread.

    Handles are keyed by thread *name*, so two threads that share a name
    also share (and overwrite) the same entry.
    """
    thread_name = threading.currentThread().getName()
    THREAD_SESSIONS[thread_name] = handle
def get_network_handle():
    """Look up the network handle registered for the calling thread.

    The lookup key is the current thread's name.  The result is whatever
    THREAD_SESSIONS.get() yields for that key (presumably None when the
    thread has not registered a handle -- THREAD_SESSIONS is defined
    outside this function).
    """
    current_thread = threading.currentThread()
    return THREAD_SESSIONS.get(current_thread.getName())
def clear_network_handle():
    ''' Disconnects the thread-specific handle and discards it.

    Does nothing when the calling thread has no handle registered.
    '''
    thread_name = threading.currentThread().getName()

    # pop() with a default removes the entry and tolerates a missing key,
    # so calling this twice (or before any handle was set) is harmless.
    handle = THREAD_SESSIONS.pop(thread_name, None)
    if handle is None:
        return

    osrf.log.log_internal("clearing network handle %s" % handle.jid.as_utf8())

    # The registry entry is already gone, so a failing disconnect cannot
    # leave a dead handle behind for this thread.
    handle.disconnect()
class NetworkMessage(object):
    """Network message

    attributes:

    sender - message sender
    recipient - message recipient
    body - the body of the message
    thread - the message thread
    router_command - optional value sent as the 'router_command' attribute
    router_class - optional value sent as the 'router_class' attribute
    xid - the logging transaction ID (carried as the 'osrf_xid' attribute)
    """

    def __init__(self, message=None, **args):
        """Builds a message from a received stanza or from keyword values.

        message - a received message stanza.  When given, all fields are
            read from it and the keyword arguments are ignored.
        args - used only when no stanza is given.  Recognised keys are
            sender, recipient, body, thread, router_command and
            router_class; missing keys are stored as None.
        """
        if message:
            self.body = message.get_body()
            self.thread = message.get_thread()
            self.recipient = message.get_to()
            self.router_command = None
            self.router_class = None

            node = message.xmlnode

            # A non-empty 'router_from' attribute takes precedence over the
            # stanza's own 'from' address.
            if node.hasProp('router_from') and node.prop('router_from') != '':
                self.sender = node.prop('router_from')
            else:
                self.sender = message.get_from().as_utf8()

            if node.hasProp('osrf_xid'):
                self.xid = node.prop('osrf_xid')
            else:
                # Always define xid so make_xmpp_msg() can test it safely.
                self.xid = ''
        else:
            self.sender = args.get('sender')
            self.recipient = args.get('recipient')
            self.body = args.get('body')
            self.thread = args.get('thread')
            self.router_command = args.get('router_command')
            self.router_class = args.get('router_class')
            # Locally built messages take the current logging transaction ID.
            self.xid = osrf.log.get_xid()

    @staticmethod
    def from_xml(xml):
        ''' Builds a NetworkMessage from the XML text of a message stanza '''
        # NOTE(review): needs `libxml2` importable at module level -- confirm.
        # NOTE(review): the parsed document is never freed here; check
        # whether Message copies the node before adding doc.freeDoc().
        doc = libxml2.parseDoc(xml)
        msg = Message(doc.getRootElement())
        return NetworkMessage(msg)

    def make_xmpp_msg(self):
        ''' Creates a pyxmpp.message.Message and adds custom attributes.

        Returns the new Message.  The router_command, router_class and
        osrf_xid attributes are only added when their value is set.
        '''
        msg = Message(None, self.sender, self.recipient, None, None, None,
            self.body, self.thread)
        if self.router_command:
            msg.xmlnode.newProp('router_command', self.router_command)
        if self.router_class:
            msg.xmlnode.newProp('router_class', self.router_class)
        if self.xid:
            msg.xmlnode.newProp('osrf_xid', self.xid)
        return msg

    def to_xml(self):
        ''' Turns this message into XML '''
        return self.make_xmpp_msg().serialize()
class Network(JabberClient):
    ''' Jabber client that sends and receives NetworkMessage objects.

    Incoming messages are appended to self.queue by the stream handlers and
    handed out one at a time by recv().
    '''

    def __init__(self, **args):
        ''' Prepares the client; call connect() to open the session.

        Required keys: username, host, port, password.
        Optional key: resource (defaults to 'python_client').
        '''
        self.isconnected = False

        # Create a unique jabber resource:
        # <resource>_<hostname>:<pid>_<lower-cased thread name>
        resource = args.get('resource') or 'python_client'
        resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
            threading.currentThread().getName().lower()
        self.jid = JID(args['username'], args['host'], resource)

        osrf.log.log_debug("initializing network with JID %s and host=%s, "
            "port=%s, username=%s" % (self.jid.as_utf8(), args['host'],
            args['port'], args['username']))

        # initialize the superclass
        # NOTE(review): args['port'] is logged above but never handed to
        # JabberClient, so presumably the library's default port is used --
        # confirm whether it should be passed through.
        JabberClient.__init__(self, self.jid, args['password'], args['host'])

        # Messages received but not yet returned by recv(), oldest first.
        self.queue = []
        self.receive_callback = None
        # Last XMPP error stanza seen; recv() turns it into XMPPNoRecipient.
        self.transport_error_msg = None

    def connect(self):
        ''' Connects to the server and blocks until the session has started.

        Raises XMPPNoConnection if the stream disappears before
        session_started() has been called.
        '''
        JabberClient.connect(self)
        while not self.isconnected:
            stream = self.get_stream()
            if not stream:
                raise XMPPNoConnection('We lost our server connection...')
            act = stream.loop_iter(10)
            if not act:
                # Nothing happened on the stream; let the client do its
                # periodic housekeeping.
                self.idle()

    def set_receive_callback(self, func):
        """The callback provided is called when a message is received.

        The only argument to the function is the received message. """
        self.receive_callback = func

    def session_started(self):
        ''' Installs the message handlers and marks the client connected '''
        # NOTE(review): this overrides the JabberClient hook without
        # chaining to any parent implementation -- confirm none is needed.
        osrf.log.log_info("Successfully connected to the opensrf network")
        self.stream.set_message_handler("normal", self.message_received)
        self.stream.set_message_handler("error", self.error_received)
        self.isconnected = True

    def send(self, message):
        """Sends the provided network message.

        The message's sender field is overwritten with this client's JID
        before the stanza is built.
        """
        osrf.log.log_internal("jabber sending to %s: %s" % (message.recipient, message.body))
        message.sender = self.jid.as_utf8()
        msg = message.make_xmpp_msg()
        self.stream.send(msg)

    def error_received(self, stanza):
        ''' Remembers an XMPP error stanza so recv() can report it '''
        self.transport_error_msg = NetworkMessage(stanza)
        osrf.log.log_error("XMPP error message received from %s" % self.transport_error_msg.sender)

    def message_received(self, stanza):
        """Handler for received messages.

        Headline messages are ignored; everything else is queued for recv().
        Returns True in both cases (presumably pyxmpp's "stanza handled"
        signal -- confirm against the handler contract).
        """
        if stanza.get_type()=="headline":
            return True
        osrf.log.log_internal("jabber received message from %s : %s"
            % (stanza.get_from().as_utf8(), stanza.get_body()))
        self.queue.append(NetworkMessage(stanza))
        return True

    def stream_closed(self, stream):
        ''' Logs that the XMPP stream is closing '''
        osrf.log.log_debug("XMPP Stream closing...")

    def stream_error(self, err):
        ''' Logs the condition name and serialized form of a stream error '''
        osrf.log.log_error("XMPP Stream error: condition: %s %r"
            % (err.get_condition().name,err.serialize()))

    def disconnected(self):
        ''' Logs that the client has been disconnected '''
        osrf.log.log_internal('XMPP Disconnected')

    def recv(self, timeout=120):
        """Attempts to receive a message from the network.

        timeout - max number of seconds to wait for a message.  A negative
            value (or None) waits until a message arrives.

        If a message is already queued, or one arrives within 'timeout'
        seconds, it is passed to the receive callback (when one is set) and
        returned.  Otherwise None is returned.

        Raises XMPPNoConnection when the stream is gone, and XMPPNoRecipient
        when an XMPP error stanza was received while waiting.
        """
        forever = timeout is None or timeout < 0
        if forever:
            # Presumably loop_iter(None) waits without a time limit.
            timeout = None

        while not self.queue and (forever or timeout >= 0):
            started = time.time()

            stream = self.get_stream()
            if not stream:
                raise XMPPNoConnection('We lost our server connection...')

            act = stream.loop_iter(timeout)
            elapsed = time.time() - started

            if not forever:
                # Only the time not yet spent remains for the next pass; a
                # negative remainder ends the loop.
                timeout -= elapsed

            osrf.log.log_internal("exiting stream loop after %s seconds. "
                "act=%s, queue size=%d" % (str(elapsed), act, len(self.queue)))

            if self.transport_error_msg:
                msg = self.transport_error_msg
                # Clear it first so the same error is reported only once.
                self.transport_error_msg = None
                raise XMPPNoRecipient(msg.sender)

            if not act:
                self.idle()

        # if we've acquired a message, handle it
        msg = None
        if self.queue:
            msg = self.queue.pop(0)
            if self.receive_callback:
                self.receive_callback(msg)

        return msg

    def flush_inbound_data(self):
        ''' Read all pending inbound messages from the socket and discard them '''
        # Silence the callback while draining so discarded messages are not
        # delivered; restore it even if recv() raises.
        cb = self.receive_callback
        self.receive_callback = None
        try:
            while self.recv(0):
                pass
        finally:
            self.receive_callback = cb