3e4d91ee60a78347a165504c98b2750ec93b0046
[OpenSRF.git] / src / python / osrf / net.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16
17 import os, time, threading
18 from pyxmpp.jabber.client import JabberClient
19 from pyxmpp.message import Message
20 from pyxmpp.jid import JID
21 from socket import gethostname
22 import libxml2
23 import osrf.log, osrf.ex
24
25 THREAD_SESSIONS = {}
26
27 # - log jabber activity (for future reference)
28 #import logging
29 #logger=logging.getLogger()
30 #logger.addHandler(logging.StreamHandler())
31 #logger.addHandler(logging.FileHandler('j.log'))
32 #logger.setLevel(logging.DEBUG)
33
34
35
36 class XMPPNoRecipient(osrf.ex.OSRFException):
37     ''' Raised when a message was sent to a non-existent recipient 
38         The recipient is stored in the 'recipient' field on this object
39     '''
40     def __init__(self, recipient):
41         osrf.ex.OSRFException.__init__(self, 'Error communicating with %s' % recipient)
42         self.recipient = recipient
43
44 def set_network_handle(handle):
45     """ Sets the thread-specific network handle"""
46     THREAD_SESSIONS[threading.currentThread().getName()] = handle
47
48 def get_network_handle():
49     """ Returns the thread-specific network connection handle."""
50     return THREAD_SESSIONS.get(threading.currentThread().getName())
51
52 def clear_network_handle():
53     ''' Disconnects the thread-specific handle and discards it '''
54     handle = THREAD_SESSIONS.get(threading.currentThread().getName())
55     if handle:
56         handle.disconnect()
57         del THREAD_SESSIONS[threading.currentThread().getName()]
58
59 class NetworkMessage(object):
60     """Network message
61
62     attributes:
63
64     sender - message sender
65     recipient - message recipient
66     body - the body of the message
67     thread - the message thread
68     locale - locale of the message
69     osrf_xid - The logging transaction ID
70     """
71
72     def __init__(self, message=None, **args):
73         if message:
74             self.body = message.get_body()
75             self.thread = message.get_thread()
76             self.recipient = message.get_to()
77             self.router_command = None
78             self.router_class = None
79             if message.xmlnode.hasProp('router_from') and \
80                 message.xmlnode.prop('router_from') != '':
81                 self.sender = message.xmlnode.prop('router_from')
82             else:
83                 self.sender = message.get_from().as_utf8()
84             if message.xmlnode.hasProp('osrf_xid'):
85                 self.xid = message.xmlnode
86             else:
87                 self.xid = ''
88         else:
89             self.sender = args.get('sender')
90             self.recipient = args.get('recipient')
91             self.body = args.get('body')
92             self.thread = args.get('thread')
93             self.router_command = args.get('router_command')
94             self.router_class = args.get('router_class')
95             self.xid = osrf.log.get_xid()
96
97     @staticmethod
98     def from_xml(xml):
99         doc = libxml2.parseDoc(xml)
100         msg = Message(doc.getRootElement())
101         return NetworkMessage(msg)
102         
103
104     def make_xmpp_msg(self):
105         ''' Creates a pyxmpp.message.Message and adds custom attributes '''
106
107         msg = Message(None, self.sender, self.recipient, None, None, None, \
108             self.body, self.thread)
109         if self.router_command:
110             msg.xmlnode.newProp('router_command', self.router_command)
111         if self.router_class:
112             msg.xmlnode.newProp('router_class', self.router_class)
113         if self.xid:
114             msg.xmlnode.newProp('osrf_xid', self.xid)
115         return msg
116
117     def to_xml(self):
118         ''' Turns this message into XML '''
119         return self.make_xmpp_msg().serialize()
120         
121
122 class Network(JabberClient):
123     def __init__(self, **args):
124         self.isconnected = False
125
126         # Create a unique jabber resource
127         resource = args.get('resource') or 'python_client'
128         resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
129             threading.currentThread().getName().lower()
130         self.jid = JID(args['username'], args['host'], resource)
131
132         osrf.log.log_debug("initializing network with JID %s and host=%s, "
133             "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
134             args['port'], args['username']))
135
136         #initialize the superclass
137         JabberClient.__init__(self, self.jid, args['password'], args['host'])
138         self.queue = []
139
140         self.receive_callback = None
141         self.transport_error_msg = None
142
143     def connect(self):
144         JabberClient.connect(self)
145         while not self.isconnected:
146             stream = self.get_stream()
147             act = stream.loop_iter(10)
148             if not act:
149                 self.idle()
150
151     def set_receive_callback(self, func):
152         """The callback provided is called when a message is received.
153         
154             The only argument to the function is the received message. """
155         self.receive_callback = func
156
157     def session_started(self):
158         osrf.log.log_info("Successfully connected to the opensrf network")
159         self.authenticated()
160         self.stream.set_message_handler("normal", self.message_received)
161         self.stream.set_message_handler("error", self.error_received)
162         self.isconnected = True
163
164     def send(self, message):
165         """Sends the provided network message."""
166         osrf.log.log_internal("jabber sending to %s: %s" % (message.recipient, message.body))
167         message.sender = self.jid.as_utf8()
168         msg = message.make_xmpp_msg()
169         self.stream.send(msg)
170
171     def error_received(self, stanza):
172         self.transport_error_msg = NetworkMessage(stanza)
173         osrf.log.log_error("XMPP error message received from %s" % self.transport_error_msg.sender)
174     
175     def message_received(self, stanza):
176         """Handler for received messages."""
177         if stanza.get_type()=="headline":
178             return True
179         # check for errors
180         osrf.log.log_internal("jabber received message from %s : %s" 
181             % (stanza.get_from().as_utf8(), stanza.get_body()))
182         self.queue.append(NetworkMessage(stanza))
183         return True
184
185     def recv(self, timeout=120):
186         """Attempts to receive a message from the network.
187
188         timeout - max number of seconds to wait for a message.  
189         If a message is received in 'timeout' seconds, the message is passed to 
190         the receive_callback is called and True is returned.  Otherwise, false is
191         returned.
192         """
193
194         forever = False
195         if timeout < 0:
196             forever = True
197             timeout = None
198
199         if len(self.queue) == 0:
200             while (forever or timeout >= 0) and len(self.queue) == 0:
201                 starttime = time.time()
202                 act = self.get_stream().loop_iter(timeout)
203                 endtime = time.time() - starttime
204                 if not forever:
205                     timeout -= endtime
206                 osrf.log.log_internal("exiting stream loop after %s seconds. "
207                     "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
208
209                 if self.transport_error_msg:
210                     msg = self.transport_error_msg
211                     self.transport_error_msg = None
212                     raise XMPPNoRecipient(msg.sender)
213
214                 if not act:
215                     self.idle()
216
217         # if we've acquired a message, handle it
218         msg = None
219         if len(self.queue) > 0:
220             msg = self.queue.pop(0)
221             if self.receive_callback:
222                 self.receive_callback(msg)
223
224         return msg
225
226
227