]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/net.py
LP#1409055 Support specific protocols for OpenSRF gateway requests
[OpenSRF.git] / src / python / osrf / net.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16
17 import os, time, threading
18 from pyxmpp.jabber.client import JabberClient
19 from pyxmpp.message import Message
20 from pyxmpp.jid import JID
21 from socket import gethostname
22 import libxml2
23 import osrf.log, osrf.ex
24
25 THREAD_SESSIONS = {}
26
27 # - log jabber activity (for future reference)
28 #import logging
29 #logger=logging.getLogger()
30 #logger.addHandler(logging.StreamHandler())
31 #logger.addHandler(logging.FileHandler('j.log'))
32 #logger.setLevel(logging.DEBUG)
33
34
35
36
37 class XMPPNoRecipient(osrf.ex.OSRFException):
38     ''' Raised when a message was sent to a non-existent recipient 
39         The recipient is stored in the 'recipient' field on this object
40     '''
41     def __init__(self, recipient):
42         osrf.ex.OSRFException.__init__(self, 'Error communicating with %s' % recipient)
43         self.recipient = recipient
44
45 class XMPPNoConnection(osrf.ex.OSRFException):
46     pass
47
48 def set_network_handle(handle):
49     """ Sets the thread-specific network handle"""
50     THREAD_SESSIONS[threading.currentThread().getName()] = handle
51
52 def get_network_handle():
53     """ Returns the thread-specific network connection handle."""
54     return THREAD_SESSIONS.get(threading.currentThread().getName())
55
56 def clear_network_handle():
57     ''' Disconnects the thread-specific handle and discards it '''
58     handle = THREAD_SESSIONS.get(threading.currentThread().getName())
59     if handle:
60         osrf.log.log_internal("clearing network handle %s" % handle.jid.as_utf8())
61         del THREAD_SESSIONS[threading.currentThread().getName()]
62         return handle
63
64 class NetworkMessage(object):
65     """Network message
66
67     attributes:
68
69     sender - message sender
70     recipient - message recipient
71     body - the body of the message
72     thread - the message thread
73     locale - locale of the message
74     osrf_xid - The logging transaction ID
75     """
76
77     def __init__(self, message=None, **args):
78         if message:
79             self.body = message.get_body()
80             self.thread = message.get_thread()
81             self.recipient = message.get_to()
82             self.router_command = None
83             self.router_class = None
84             if message.xmlnode.hasProp('router_from') and \
85                 message.xmlnode.prop('router_from') != '':
86                 self.sender = message.xmlnode.prop('router_from')
87             else:
88                 self.sender = message.get_from().as_utf8()
89             if message.xmlnode.hasProp('osrf_xid'):
90                 self.xid = message.xmlnode.prop('osrf_xid')
91             else:
92                 self.xid = ''
93         else:
94             self.sender = args.get('sender')
95             self.recipient = args.get('recipient')
96             self.body = args.get('body')
97             self.thread = args.get('thread')
98             self.router_command = args.get('router_command')
99             self.router_class = args.get('router_class')
100             self.xid = osrf.log.get_xid()
101
102     @staticmethod
103     def from_xml(xml):
104         doc = libxml2.parseDoc(xml)
105         msg = Message(doc.getRootElement())
106         return NetworkMessage(msg)
107         
108
109     def make_xmpp_msg(self):
110         ''' Creates a pyxmpp.message.Message and adds custom attributes '''
111
112         msg = Message(None, self.sender, self.recipient, None, None, None, \
113             self.body, self.thread)
114         if self.router_command:
115             msg.xmlnode.newProp('router_command', self.router_command)
116         if self.router_class:
117             msg.xmlnode.newProp('router_class', self.router_class)
118         if self.xid:
119             msg.xmlnode.newProp('osrf_xid', self.xid)
120         return msg
121
122     def to_xml(self):
123         ''' Turns this message into XML '''
124         return self.make_xmpp_msg().serialize()
125         
126
127 class Network(JabberClient):
128     def __init__(self, **args):
129         self.isconnected = False
130
131         # Create a unique jabber resource
132         resource = args.get('resource') or 'python_client'
133         resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
134             threading.currentThread().getName().lower()
135         self.jid = JID(args['username'], args['host'], resource)
136
137         osrf.log.log_debug("initializing network with JID %s and host=%s, "
138             "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
139             args['port'], args['username']))
140
141         #initialize the superclass
142         JabberClient.__init__(self, self.jid, args['password'], args['host'])
143         self.queue = []
144
145         self.receive_callback = None
146         self.transport_error_msg = None
147
148     def connect(self):
149         JabberClient.connect(self)
150         while not self.isconnected:
151             stream = self.get_stream()
152             act = stream.loop_iter(10)
153             if not act:
154                 self.idle()
155
156     def set_receive_callback(self, func):
157         """The callback provided is called when a message is received.
158         
159             The only argument to the function is the received message. """
160         self.receive_callback = func
161
162     def session_started(self):
163         osrf.log.log_info("Successfully connected to the opensrf network")
164         self.authenticated()
165         self.stream.set_message_handler("normal", self.message_received)
166         self.stream.set_message_handler("error", self.error_received)
167         self.isconnected = True
168
169     def send(self, message):
170         """Sends the provided network message."""
171         osrf.log.log_internal("jabber sending to %s: %s" % (message.recipient, message.body))
172         message.sender = self.jid.as_utf8()
173         msg = message.make_xmpp_msg()
174         self.stream.send(msg)
175
176     def error_received(self, stanza):
177         self.transport_error_msg = NetworkMessage(stanza)
178         osrf.log.log_error("XMPP error message received from %s" % self.transport_error_msg.sender)
179     
180     def message_received(self, stanza):
181         """Handler for received messages."""
182         if stanza.get_type()=="headline":
183             return True
184         # check for errors
185         osrf.log.log_internal("jabber received message from %s : %s" 
186             % (stanza.get_from().as_utf8(), stanza.get_body()))
187         self.queue.append(NetworkMessage(stanza))
188         return True
189
190     def stream_closed(self, stream):
191         osrf.log.log_debug("XMPP Stream closing...")
192
193     def stream_error(self, err):
194         osrf.log.log_error("XMPP Stream error: condition: %s %r"
195             % (err.get_condition().name,err.serialize()))
196
197     def disconnected(self):
198         osrf.log.log_internal('XMPP Disconnected')
199
200     def recv(self, timeout=120):
201         """Attempts to receive a message from the network.
202
203         timeout - max number of seconds to wait for a message.  
204         If a message is received in 'timeout' seconds, the message is passed to 
205         the receive_callback is called and True is returned.  Otherwise, false is
206         returned.
207         """
208
209         forever = False
210         if timeout < 0:
211             forever = True
212             timeout = None
213
214         if len(self.queue) == 0:
215             while (forever or timeout >= 0) and len(self.queue) == 0:
216                 starttime = time.time()
217
218                 stream = self.get_stream()
219                 if not stream:
220                    raise XMPPNoConnection('We lost our server connection...') 
221
222                 act = stream.loop_iter(timeout)
223                 endtime = time.time() - starttime
224
225                 if not forever:
226                     timeout -= endtime
227
228                 osrf.log.log_internal("exiting stream loop after %s seconds. "
229                     "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
230
231                 if self.transport_error_msg:
232                     msg = self.transport_error_msg
233                     self.transport_error_msg = None
234                     raise XMPPNoRecipient(msg.sender)
235
236                 if not act:
237                     self.idle()
238
239         # if we've acquired a message, handle it
240         msg = None
241         if len(self.queue) > 0:
242             msg = self.queue.pop(0)
243             if self.receive_callback:
244                 self.receive_callback(msg)
245
246         return msg
247
248
249     def flush_inbound_data(self):
250         ''' Read all pending inbound messages from the socket and discard them '''
251         cb = self.receive_callback
252         self.receive_callback = None
253         while self.recv(0): pass 
254         self.receive_callback = cb
255
256
257
258