0fd1f95bfff8d00f16129487dde58e1498bdeaa7
[OpenSRF.git] / src / python / osrf / net.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16
17 import os, time, threading
18 from pyxmpp.jabber.client import JabberClient
19 from pyxmpp.message import Message
20 from pyxmpp.jid import JID
21 from socket import gethostname
22 import libxml2
23 import osrf.log
24
25 THREAD_SESSIONS = {}
26
27 # - log jabber activity (for future reference)
28 #import logging
29 #logger=logging.getLogger()
30 #logger.addHandler(logging.StreamHandler())
31 #logger.addHandler(logging.FileHandler('j.log'))
32 #logger.setLevel(logging.DEBUG)
33
34 def set_network_handle(handle):
35     """ Sets the thread-specific network handle"""
36     THREAD_SESSIONS[threading.currentThread().getName()] = handle
37
38 def get_network_handle():
39     """ Returns the thread-specific network connection handle."""
40     return THREAD_SESSIONS.get(threading.currentThread().getName())
41
42 def clear_network_handle():
43     ''' Disconnects the thread-specific handle and discards it '''
44     handle = THREAD_SESSIONS.get(threading.currentThread().getName())
45     if handle:
46         handle.disconnect()
47         del THREAD_SESSIONS[threading.currentThread().getName()]
48
49 class NetworkMessage(object):
50     """Network message
51
52     attributes:
53
54     sender - message sender
55     recipient - message recipient
56     body - the body of the message
57     thread - the message thread
58     locale - locale of the message
59     """
60
61     def __init__(self, message=None, **args):
62         if message:
63             self.body = message.get_body()
64             self.thread = message.get_thread()
65             self.recipient = message.get_to()
66             self.router_command = None
67             if message.xmlnode.hasProp('router_from') and \
68                 message.xmlnode.prop('router_from') != '':
69                 self.sender = message.xmlnode.prop('router_from')
70             else:
71                 self.sender = message.get_from().as_utf8()
72         else:
73             self.sender = args.get('sender')
74             self.recipient = args.get('recipient')
75             self.body = args.get('body')
76             self.thread = args.get('thread')
77             self.router_command = args.get('router_command')
78
79     @staticmethod
80     def from_xml(xml):
81         doc=libxml2.parseDoc(xml)
82         msg = Message(doc.getRootElement())
83         return NetworkMessage(msg)
84         
85
86     def make_xmpp_msg(self):
87         ''' Creates a pyxmpp.message.Message and adds custom attributes '''
88
89         msg = Message(None, self.sender, self.recipient, None, None, None, \
90             self.body, self.thread)
91         if self.router_command:
92             msg.xmlnode.newProp('router_command', self.router_command)
93         return msg
94
95     def to_xml(self):
96         ''' Turns this message into XML '''
97         return self.make_xmpp_msg().serialize()
98         
99
100 class Network(JabberClient):
101     def __init__(self, **args):
102         self.isconnected = False
103
104         # Create a unique jabber resource
105         resource = args.get('resource') or 'python_client'
106         resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
107             threading.currentThread().getName().lower()
108         self.jid = JID(args['username'], args['host'], resource)
109
110         osrf.log.log_debug("initializing network with JID %s and host=%s, "
111             "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
112             args['port'], args['username']))
113
114         #initialize the superclass
115         JabberClient.__init__(self, self.jid, args['password'], args['host'])
116         self.queue = []
117
118         self.receive_callback = None
119
120     def connect(self):
121         JabberClient.connect(self)
122         while not self.isconnected:
123             stream = self.get_stream()
124             act = stream.loop_iter(10)
125             if not act:
126                 self.idle()
127
128     def set_receive_callback(self, func):
129         """The callback provided is called when a message is received.
130         
131             The only argument to the function is the received message. """
132         self.receive_callback = func
133
134     def session_started(self):
135         osrf.log.log_info("Successfully connected to the opensrf network")
136         self.authenticated()
137         self.stream.set_message_handler("normal", self.message_received)
138         self.isconnected = True
139
140     def send(self, message):
141         """Sends the provided network message."""
142         osrf.log.log_internal("jabber sending to %s: %s" % (message.recipient, message.body))
143         message.sender = self.jid.as_utf8()
144         msg = message.make_xmpp_msg()
145         self.stream.send(msg)
146     
147     def message_received(self, stanza):
148         """Handler for received messages."""
149         if stanza.get_type()=="headline":
150             return True
151         # check for errors
152         osrf.log.log_internal("jabber received message from %s : %s" 
153             % (stanza.get_from().as_utf8(), stanza.get_body()))
154         self.queue.append(NetworkMessage(stanza))
155         return True
156
157     def recv(self, timeout=120):
158         """Attempts to receive a message from the network.
159
160         timeout - max number of seconds to wait for a message.  
161         If a message is received in 'timeout' seconds, the message is passed to 
162         the receive_callback is called and True is returned.  Otherwise, false is
163         returned.
164         """
165
166         forever = False
167         if timeout < 0:
168             forever = True
169             timeout = None
170
171         if len(self.queue) == 0:
172             while (forever or timeout >= 0) and len(self.queue) == 0:
173                 starttime = time.time()
174                 act = self.get_stream().loop_iter(timeout)
175                 endtime = time.time() - starttime
176                 if not forever:
177                     timeout -= endtime
178                 osrf.log.log_internal("exiting stream loop after %s seconds. "
179                     "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
180                 if not act:
181                     self.idle()
182
183         # if we've acquired a message, handle it
184         msg = None
185         if len(self.queue) > 0:
186             msg = self.queue.pop(0)
187             if self.receive_callback:
188                 self.receive_callback(msg)
189
190         return msg
191
192
193