1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007 Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
17 from pyxmpp.jabber.client import JabberClient
18 from pyxmpp.message import Message
19 from pyxmpp.jid import JID
20 from socket import gethostname
22 import os, time, threading
26 # - log jabber activity (for future reference)
28 #logger=logging.getLogger()
29 #logger.addHandler(logging.StreamHandler())
30 #logger.addHandler(logging.FileHandler('j.log'))
31 #logger.setLevel(logging.DEBUG)
33 def set_network_handle(handle):
34 """ Sets the thread-specific network handle"""
35 THREAD_SESSIONS[threading.currentThread().getName()] = handle
37 def get_network_handle():
38 """ Returns the thread-specific network connection handle."""
39 return THREAD_SESSIONS.get(threading.currentThread().getName())
42 class NetworkMessage(object):
47 sender - message sender
48 recipient - message recipient
49 body - the body of the message
50 thread - the message thread
51 locale - locale of the message
54 def __init__(self, message=None, **args):
56 self.body = message.get_body()
57 self.thread = message.get_thread()
58 self.recipient = message.get_to()
59 if message.xmlnode.hasProp('router_from') and \
60 message.xmlnode.prop('router_from') != '':
61 self.sender = message.xmlnode.prop('router_from')
63 self.sender = message.get_from().as_utf8()
65 if args.has_key('sender'):
66 self.sender = args['sender']
67 if args.has_key('recipient'):
68 self.recipient = args['recipient']
69 if args.has_key('body'):
70 self.body = args['body']
71 if args.has_key('thread'):
72 self.thread = args['thread']
74 class Network(JabberClient):
75 def __init__(self, **args):
76 self.isconnected = False
78 # Create a unique jabber resource
80 if args.has_key('resource'):
81 resource = args['resource']
82 resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
83 threading.currentThread().getName().lower()
84 self.jid = JID(args['username'], args['host'], resource)
86 osrf.log.log_debug("initializing network with JID %s and host=%s, "
87 "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
88 args['port'], args['username']))
90 #initialize the superclass
91 JabberClient.__init__(self, self.jid, args['password'], args['host'])
94 self.receive_callback = None
97 JabberClient.connect(self)
98 while not self.isconnected:
99 stream = self.get_stream()
100 act = stream.loop_iter(10)
104 def set_receive_callback(self, func):
105 """The callback provided is called when a message is received.
107 The only argument to the function is the received message. """
108 self.receive_callback = func
110 def session_started(self):
111 osrf.log.log_info("Successfully connected to the opensrf network")
113 self.stream.set_message_handler("normal", self.message_received)
114 self.isconnected = True
116 def send(self, message):
117 """Sends the provided network message."""
118 osrf.log.log_internal("jabber sending to %s: %s" % \
119 (message.recipient, message.body))
120 msg = Message(None, None, message.recipient, None, None, None, \
121 message.body, message.thread)
122 self.stream.send(msg)
124 def message_received(self, stanza):
125 """Handler for received messages."""
126 if stanza.get_type()=="headline":
129 osrf.log.log_internal("jabber received message from %s : %s"
130 % (stanza.get_from().as_utf8(), stanza.get_body()))
131 self.queue.append(NetworkMessage(stanza))
134 def recv(self, timeout=120):
135 """Attempts to receive a message from the network.
137 timeout - max number of seconds to wait for a message.
138 If a message is received in 'timeout' seconds, the message is passed to
139 the receive_callback is called and True is returned. Otherwise, false is
143 if len(self.queue) == 0:
144 while timeout >= 0 and len(self.queue) == 0:
145 starttime = time.time()
146 act = self.get_stream().loop_iter(timeout)
147 endtime = time.time() - starttime
149 osrf.log.log_internal("exiting stream loop after %s seconds. "
150 "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
154 # if we've acquired a message, handle it
156 if len(self.queue) > 0:
157 msg = self.queue.pop(0)
158 if self.receive_callback:
159 self.receive_callback(msg)