1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
17 from pyxmpp.jabber.client import JabberClient
18 from pyxmpp.message import Message
19 from pyxmpp.jid import JID
20 from socket import gethostname
22 import os, time, threading
26 # - log jabber activity (for future reference)
28 #logger=logging.getLogger()
29 #logger.addHandler(logging.StreamHandler())
30 #logger.addHandler(logging.FileHandler('j.log'))
31 #logger.setLevel(logging.DEBUG)
def set_network_handle(handle):
    """Register `handle` as the network handle of the calling thread.

    The handle is stored in THREAD_SESSIONS under the current thread's name,
    replacing any handle previously stored under that name.
    """
    thread_name = threading.currentThread().getName()
    THREAD_SESSIONS[thread_name] = handle
def get_network_handle():
    """Look up the network connection handle of the calling thread.

    Returns None when nothing is stored in THREAD_SESSIONS under the
    current thread's name.
    """
    thread_name = threading.currentThread().getName()
    return THREAD_SESSIONS.get(thread_name)
def clear_network_handle():
    """Disconnect the calling thread's network handle and discard it.

    Does nothing when no handle is stored in THREAD_SESSIONS under the
    current thread's name.
    """
    thread_name = threading.currentThread().getName()
    # pop() with a default avoids the KeyError a bare `del` raises when this
    # thread never registered a handle (or has already cleared it).
    handle = THREAD_SESSIONS.pop(thread_name, None)
    if handle is not None:
        handle.disconnect()
class NetworkMessage(object):
    """Network message

    attributes:

    sender          - message sender
    recipient       - message recipient
    body            - the body of the message
    thread          - the message thread
    locale          - locale of the message (only set from the 'locale'
                      keyword; it is not read from a received stanza)
    router_command  - value sent as the custom 'router_command' attribute,
                      or None
    """

    def __init__(self, message=None, **args):
        """Build a message from a received stanza or from keyword arguments.

        message - a received message stanza; when given, the fields are read
                  from it and the keyword arguments are ignored
        args    - sender, recipient, body, thread, router_command and locale,
                  used when no stanza is supplied (missing keys become None)
        """
        # Defaults first, so both construction paths define every attribute;
        # make_xmpp_msg() reads router_command unconditionally.
        self.router_command = None
        self.locale = None

        if message is not None:
            self.body = message.get_body()
            self.thread = message.get_thread()
            self.recipient = message.get_to()
            # Prefer the custom 'router_from' attribute over the stanza's own
            # sender whenever it is present and non-empty.
            node = message.xmlnode
            if node.hasProp('router_from') and node.prop('router_from') != '':
                self.sender = node.prop('router_from')
            else:
                self.sender = message.get_from().as_utf8()
        else:
            self.sender = args.get('sender')
            self.recipient = args.get('recipient')
            self.body = args.get('body')
            self.thread = args.get('thread')
            self.router_command = args.get('router_command')
            self.locale = args.get('locale')

    def make_xmpp_msg(self):
        """Create a pyxmpp.message.Message and add the custom attributes.

        Returns the new stanza, built from self.recipient, self.body and
        self.thread, with a 'router_command' attribute when one is set.
        """
        msg = Message(None, None, self.recipient, None, None, None,
                      self.body, self.thread)
        if self.router_command:
            msg.xmlnode.newProp('router_command', self.router_command)
        return msg

    def to_xml(self):
        """Turn this message into XML."""
        return self.make_xmpp_msg().serialize()
91 class Network(JabberClient):
def __init__(self, **args):
    """Prepare a jabber client for the calling thread.

    Keyword arguments read here:
    username - jabber user name (required)
    password - jabber password (required)
    host     - jabber server host (required)
    port     - jabber server port (required; only logged here)
    resource - optional resource prefix, defaults to 'python_client'
    """
    self.isconnected = False

    # Create a unique jabber resource: prefix, host name, pid and thread name
    resource = args.get('resource') or 'python_client'
    resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
        threading.currentThread().getName().lower()
    self.jid = JID(args['username'], args['host'], resource)

    osrf.log.log_debug("initializing network with JID %s and host=%s, "
        "port=%s, username=%s" % (self.jid.as_utf8(), args['host'],
        args['port'], args['username']))

    # initialize the superclass
    # NOTE(review): args['port'] is logged above but not passed on to
    # JabberClient -- confirm this is intended.
    JabberClient.__init__(self, self.jid, args['password'], args['host'])

    # Inbound messages: appended by message_received(), consumed by recv().
    self.queue = []
    self.receive_callback = None
def connect(self):
    """Open the jabber connection and block until the session is usable.

    Starts the connection through JabberClient.connect, then keeps running
    the stream loop (loop_iter called with 10) until isconnected is set,
    which session_started() does.
    """
    JabberClient.connect(self)
    while not self.isconnected:
        # The stream is fetched again on every pass rather than cached.
        self.get_stream().loop_iter(10)
def set_receive_callback(self, func):
    """Install the function to call whenever a message is received.

    func - callable taking one argument, the received message. It is
           stored as self.receive_callback, replacing any previous one.
    """
    self.receive_callback = func
def session_started(self):
    """Mark the connection as usable and start accepting messages.

    Logs the successful connection, registers message_received as the
    handler for "normal" messages on the stream, and sets isconnected.
    NOTE(review): presumably invoked by pyxmpp when the session starts --
    confirm against the JabberClient documentation.
    """
    osrf.log.log_info("Successfully connected to the opensrf network")

    stream = self.stream
    stream.set_message_handler("normal", self.message_received)
    self.isconnected = True
def send(self, message):
    """Send the provided network message over the stream.

    message - object exposing recipient, body and make_xmpp_msg(); the
              stanza returned by make_xmpp_msg() is what gets sent.
    """
    osrf.log.log_internal(
        "jabber sending to %s: %s" % (message.recipient, message.body))
    self.stream.send(message.make_xmpp_msg())
def message_received(self, stanza):
    """Handler for received messages.

    Messages of type "headline" are ignored. Any other message is logged,
    wrapped in a NetworkMessage and appended to self.queue.

    Returns True in both cases.
    NOTE(review): True is assumed to mean "handled" to pyxmpp -- confirm
    against the pyxmpp stanza handler documentation.
    """
    if stanza.get_type() == "headline":
        # Headlines are dropped without being queued.
        return True

    osrf.log.log_internal("jabber received message from %s : %s"
        % (stanza.get_from().as_utf8(), stanza.get_body()))
    self.queue.append(NetworkMessage(stanza))
    return True
147 def recv(self, timeout=120):
148 """Attempts to receive a message from the network.
150 timeout - max number of seconds to wait for a message.
151 If a message is received in 'timeout' seconds, the message is passed to
152 the receive_callback is called and True is returned. Otherwise, false is
156 if len(self.queue) == 0:
157 while timeout >= 0 and len(self.queue) == 0:
158 starttime = time.time()
159 act = self.get_stream().loop_iter(timeout)
160 endtime = time.time() - starttime
162 osrf.log.log_internal("exiting stream loop after %s seconds. "
163 "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
167 # if we've acquired a message, handle it
169 if len(self.queue) > 0:
170 msg = self.queue.pop(0)
171 if self.receive_callback:
172 self.receive_callback(msg)