raise an exception when the xmpp recipient is not found. http_translator turns this...
[OpenSRF.git] / src / python / osrf / net.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16
17 import os, time, threading
18 from pyxmpp.jabber.client import JabberClient
19 from pyxmpp.message import Message
20 from pyxmpp.jid import JID
21 from socket import gethostname
22 import libxml2
23 import osrf.log, osrf.ex
24
25 THREAD_SESSIONS = {}
26
27 # - log jabber activity (for future reference)
28 #import logging
29 #logger=logging.getLogger()
30 #logger.addHandler(logging.StreamHandler())
31 #logger.addHandler(logging.FileHandler('j.log'))
32 #logger.setLevel(logging.DEBUG)
33
34
35
36 class XMPPNoRecipient(osrf.ex.OSRFException):
37     ''' Raised when a message was sent to a non-existent recipient 
38         The recipient is stored in the 'recipient' field on this object
39     '''
40     def __init__(self, recipient):
41         osrf.ex.OSRFException.__init__(self, 'Error communicating with %s' % recipient)
42         self.recipient = recipient
43
44 def set_network_handle(handle):
45     """ Sets the thread-specific network handle"""
46     THREAD_SESSIONS[threading.currentThread().getName()] = handle
47
48 def get_network_handle():
49     """ Returns the thread-specific network connection handle."""
50     return THREAD_SESSIONS.get(threading.currentThread().getName())
51
52 def clear_network_handle():
53     ''' Disconnects the thread-specific handle and discards it '''
54     handle = THREAD_SESSIONS.get(threading.currentThread().getName())
55     if handle:
56         handle.disconnect()
57         del THREAD_SESSIONS[threading.currentThread().getName()]
58
59 class NetworkMessage(object):
60     """Network message
61
62     attributes:
63
64     sender - message sender
65     recipient - message recipient
66     body - the body of the message
67     thread - the message thread
68     locale - locale of the message
69     """
70
71     def __init__(self, message=None, **args):
72         if message:
73             self.body = message.get_body()
74             self.thread = message.get_thread()
75             self.recipient = message.get_to()
76             self.router_command = None
77             self.router_class = None
78             if message.xmlnode.hasProp('router_from') and \
79                 message.xmlnode.prop('router_from') != '':
80                 self.sender = message.xmlnode.prop('router_from')
81             else:
82                 self.sender = message.get_from().as_utf8()
83         else:
84             self.sender = args.get('sender')
85             self.recipient = args.get('recipient')
86             self.body = args.get('body')
87             self.thread = args.get('thread')
88             self.router_command = args.get('router_command')
89             self.router_class = args.get('router_class')
90
91     @staticmethod
92     def from_xml(xml):
93         doc = libxml2.parseDoc(xml)
94         msg = Message(doc.getRootElement())
95         return NetworkMessage(msg)
96         
97
98     def make_xmpp_msg(self):
99         ''' Creates a pyxmpp.message.Message and adds custom attributes '''
100
101         msg = Message(None, self.sender, self.recipient, None, None, None, \
102             self.body, self.thread)
103         if self.router_command:
104             msg.xmlnode.newProp('router_command', self.router_command)
105         if self.router_class:
106             msg.xmlnode.newProp('router_class', self.router_class)
107         return msg
108
109     def to_xml(self):
110         ''' Turns this message into XML '''
111         return self.make_xmpp_msg().serialize()
112         
113
114 class Network(JabberClient):
115     def __init__(self, **args):
116         self.isconnected = False
117
118         # Create a unique jabber resource
119         resource = args.get('resource') or 'python_client'
120         resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
121             threading.currentThread().getName().lower()
122         self.jid = JID(args['username'], args['host'], resource)
123
124         osrf.log.log_debug("initializing network with JID %s and host=%s, "
125             "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
126             args['port'], args['username']))
127
128         #initialize the superclass
129         JabberClient.__init__(self, self.jid, args['password'], args['host'])
130         self.queue = []
131
132         self.receive_callback = None
133         self.transport_error_msg = None
134
135     def connect(self):
136         JabberClient.connect(self)
137         while not self.isconnected:
138             stream = self.get_stream()
139             act = stream.loop_iter(10)
140             if not act:
141                 self.idle()
142
143     def set_receive_callback(self, func):
144         """The callback provided is called when a message is received.
145         
146             The only argument to the function is the received message. """
147         self.receive_callback = func
148
149     def session_started(self):
150         osrf.log.log_info("Successfully connected to the opensrf network")
151         self.authenticated()
152         self.stream.set_message_handler("normal", self.message_received)
153         self.stream.set_message_handler("error", self.error_received)
154         self.isconnected = True
155
156     def send(self, message):
157         """Sends the provided network message."""
158         osrf.log.log_internal("jabber sending to %s: %s" % (message.recipient, message.body))
159         message.sender = self.jid.as_utf8()
160         msg = message.make_xmpp_msg()
161         self.stream.send(msg)
162
163     def error_received(self, stanza):
164         self.transport_error_msg = NetworkMessage(stanza)
165         osrf.log.log_error("XMPP error message received from %s" % self.transport_error_msg.sender)
166     
167     def message_received(self, stanza):
168         """Handler for received messages."""
169         if stanza.get_type()=="headline":
170             return True
171         # check for errors
172         osrf.log.log_internal("jabber received message from %s : %s" 
173             % (stanza.get_from().as_utf8(), stanza.get_body()))
174         self.queue.append(NetworkMessage(stanza))
175         return True
176
177     def recv(self, timeout=120):
178         """Attempts to receive a message from the network.
179
180         timeout - max number of seconds to wait for a message.  
181         If a message is received in 'timeout' seconds, the message is passed to 
182         the receive_callback is called and True is returned.  Otherwise, false is
183         returned.
184         """
185
186         forever = False
187         if timeout < 0:
188             forever = True
189             timeout = None
190
191         if len(self.queue) == 0:
192             while (forever or timeout >= 0) and len(self.queue) == 0:
193                 starttime = time.time()
194                 act = self.get_stream().loop_iter(timeout)
195                 endtime = time.time() - starttime
196                 if not forever:
197                     timeout -= endtime
198                 osrf.log.log_internal("exiting stream loop after %s seconds. "
199                     "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
200
201                 if self.transport_error_msg:
202                     msg = self.transport_error_msg
203                     self.transport_error_msg = None
204                     raise XMPPNoRecipient(msg.sender)
205
206                 if not act:
207                     self.idle()
208
209         # if we've acquired a message, handle it
210         msg = None
211         if len(self.queue) > 0:
212             msg = self.queue.pop(0)
213             if self.receive_callback:
214                 self.receive_callback(msg)
215
216         return msg
217
218
219