added new disconnected exception. added some error condition logging. no longer...
[OpenSRF.git] / src / python / osrf / net.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16
17 import os, time, threading
18 from pyxmpp.jabber.client import JabberClient
19 from pyxmpp.message import Message
20 from pyxmpp.jid import JID
21 from socket import gethostname
22 import libxml2
23 import osrf.log, osrf.ex
24
25 THREAD_SESSIONS = {}
26
27 # - log jabber activity (for future reference)
28 #import logging
29 #logger=logging.getLogger()
30 #logger.addHandler(logging.StreamHandler())
31 #logger.addHandler(logging.FileHandler('j.log'))
32 #logger.setLevel(logging.DEBUG)
33
34
35
36
37 class XMPPNoRecipient(osrf.ex.OSRFException):
38     ''' Raised when a message was sent to a non-existent recipient 
39         The recipient is stored in the 'recipient' field on this object
40     '''
41     def __init__(self, recipient):
42         osrf.ex.OSRFException.__init__(self, 'Error communicating with %s' % recipient)
43         self.recipient = recipient
44
45 class XMPPNoConnection(osrf.ex.OSRFException):
46     pass
47
48 def set_network_handle(handle):
49     """ Sets the thread-specific network handle"""
50     THREAD_SESSIONS[threading.currentThread().getName()] = handle
51
52 def get_network_handle():
53     """ Returns the thread-specific network connection handle."""
54     return THREAD_SESSIONS.get(threading.currentThread().getName())
55
56 def clear_network_handle():
57     ''' Disconnects the thread-specific handle and discards it '''
58     handle = THREAD_SESSIONS.get(threading.currentThread().getName())
59     if handle:
60         osrf.log.log_internal("clearing network handle %s" % handle.jid.as_utf8())
61         #handle.disconnect()
62         del THREAD_SESSIONS[threading.currentThread().getName()]
63         return handle
64
65 class NetworkMessage(object):
66     """Network message
67
68     attributes:
69
70     sender - message sender
71     recipient - message recipient
72     body - the body of the message
73     thread - the message thread
74     locale - locale of the message
75     osrf_xid - The logging transaction ID
76     """
77
78     def __init__(self, message=None, **args):
79         if message:
80             self.body = message.get_body()
81             self.thread = message.get_thread()
82             self.recipient = message.get_to()
83             self.router_command = None
84             self.router_class = None
85             if message.xmlnode.hasProp('router_from') and \
86                 message.xmlnode.prop('router_from') != '':
87                 self.sender = message.xmlnode.prop('router_from')
88             else:
89                 self.sender = message.get_from().as_utf8()
90             if message.xmlnode.hasProp('osrf_xid'):
91                 self.xid = message.xmlnode.prop('osrf_xid')
92             else:
93                 self.xid = ''
94         else:
95             self.sender = args.get('sender')
96             self.recipient = args.get('recipient')
97             self.body = args.get('body')
98             self.thread = args.get('thread')
99             self.router_command = args.get('router_command')
100             self.router_class = args.get('router_class')
101             self.xid = osrf.log.get_xid()
102
103     @staticmethod
104     def from_xml(xml):
105         doc = libxml2.parseDoc(xml)
106         msg = Message(doc.getRootElement())
107         return NetworkMessage(msg)
108         
109
110     def make_xmpp_msg(self):
111         ''' Creates a pyxmpp.message.Message and adds custom attributes '''
112
113         msg = Message(None, self.sender, self.recipient, None, None, None, \
114             self.body, self.thread)
115         if self.router_command:
116             msg.xmlnode.newProp('router_command', self.router_command)
117         if self.router_class:
118             msg.xmlnode.newProp('router_class', self.router_class)
119         if self.xid:
120             msg.xmlnode.newProp('osrf_xid', self.xid)
121         return msg
122
123     def to_xml(self):
124         ''' Turns this message into XML '''
125         return self.make_xmpp_msg().serialize()
126         
127
128 class Network(JabberClient):
129     def __init__(self, **args):
130         self.isconnected = False
131
132         # Create a unique jabber resource
133         resource = args.get('resource') or 'python_client'
134         resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
135             threading.currentThread().getName().lower()
136         self.jid = JID(args['username'], args['host'], resource)
137
138         osrf.log.log_debug("initializing network with JID %s and host=%s, "
139             "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
140             args['port'], args['username']))
141
142         #initialize the superclass
143         JabberClient.__init__(self, self.jid, args['password'], args['host'])
144         self.queue = []
145
146         self.receive_callback = None
147         self.transport_error_msg = None
148
149     def connect(self):
150         JabberClient.connect(self)
151         while not self.isconnected:
152             stream = self.get_stream()
153             act = stream.loop_iter(10)
154             if not act:
155                 self.idle()
156
157     def set_receive_callback(self, func):
158         """The callback provided is called when a message is received.
159         
160             The only argument to the function is the received message. """
161         self.receive_callback = func
162
163     def session_started(self):
164         osrf.log.log_info("Successfully connected to the opensrf network")
165         self.authenticated()
166         self.stream.set_message_handler("normal", self.message_received)
167         self.stream.set_message_handler("error", self.error_received)
168         self.isconnected = True
169
170     def send(self, message):
171         """Sends the provided network message."""
172         osrf.log.log_internal("jabber sending to %s: %s" % (message.recipient, message.body))
173         message.sender = self.jid.as_utf8()
174         msg = message.make_xmpp_msg()
175         self.stream.send(msg)
176
177     def error_received(self, stanza):
178         self.transport_error_msg = NetworkMessage(stanza)
179         osrf.log.log_error("XMPP error message received from %s" % self.transport_error_msg.sender)
180     
181     def message_received(self, stanza):
182         """Handler for received messages."""
183         if stanza.get_type()=="headline":
184             return True
185         # check for errors
186         osrf.log.log_internal("jabber received message from %s : %s" 
187             % (stanza.get_from().as_utf8(), stanza.get_body()))
188         self.queue.append(NetworkMessage(stanza))
189         return True
190
191     def stream_closed(self, stream):
192         osrf.log.log_debug("XMPP Stream closing...")
193
194     def stream_error(self, err):
195         osrf.log.log_error("XMPP Stream error: condition: %s %r"
196             % (err.get_condition().name,err.serialize()))
197
198     def disconnected(self):
199         osrf.log.log_internal('XMPP Disconnected')
200
201     def recv(self, timeout=120):
202         """Attempts to receive a message from the network.
203
204         timeout - max number of seconds to wait for a message.  
205         If a message is received in 'timeout' seconds, the message is passed to 
206         the receive_callback is called and True is returned.  Otherwise, false is
207         returned.
208         """
209
210         forever = False
211         if timeout < 0:
212             forever = True
213             timeout = None
214
215         if len(self.queue) == 0:
216             while (forever or timeout >= 0) and len(self.queue) == 0:
217                 starttime = time.time()
218
219                 stream = self.get_stream()
220                 if not stream:
221                    raise XMPPNoConnection('We lost our server connection...') 
222
223                 act = stream.loop_iter(timeout)
224                 endtime = time.time() - starttime
225
226                 if not forever:
227                     timeout -= endtime
228
229                 osrf.log.log_internal("exiting stream loop after %s seconds. "
230                     "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
231
232                 if self.transport_error_msg:
233                     msg = self.transport_error_msg
234                     self.transport_error_msg = None
235                     raise XMPPNoRecipient(msg.sender)
236
237                 if not act:
238                     self.idle()
239
240         # if we've acquired a message, handle it
241         msg = None
242         if len(self.queue) > 0:
243             msg = self.queue.pop(0)
244             if self.receive_callback:
245                 self.receive_callback(msg)
246
247         return msg
248
249
250