e79b43b57b4bd9bb2e4b25143ab92f4eaee51b03
[OpenSRF.git] / src / python / osrf / net.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16
17 from pyxmpp.jabber.client import JabberClient
18 from pyxmpp.message import Message
19 from pyxmpp.jid import JID
20 from socket import gethostname
21 import osrf.log
22 import os, time, threading
23
24 THREAD_SESSIONS = {}
25
26 # - log jabber activity (for future reference)
27 #import logging
28 #logger=logging.getLogger()
29 #logger.addHandler(logging.StreamHandler())
30 #logger.addHandler(logging.FileHandler('j.log'))
31 #logger.setLevel(logging.DEBUG)
32
33 def set_network_handle(handle):
34     """ Sets the thread-specific network handle"""
35     THREAD_SESSIONS[threading.currentThread().getName()] = handle
36
37 def get_network_handle():
38     """ Returns the thread-specific network connection handle."""
39     return THREAD_SESSIONS.get(threading.currentThread().getName())
40
41 def clear_network_handle():
42     ''' Disconnects the thread-specific handle and discards it '''
43     handle = THREAD_SESSIONS.get(threading.currentThread().getName())
44     if handle:
45         handle.disconnect()
46         del THREAD_SESSIONS[threading.currentThread().getName()]
47
48 class NetworkMessage(object):
49     """Network message
50
51     attributes:
52
53     sender - message sender
54     recipient - message recipient
55     body - the body of the message
56     thread - the message thread
57     locale - locale of the message
58     """
59
60     def __init__(self, message=None, **args):
61         if message:
62             self.body = message.get_body()
63             self.thread = message.get_thread()
64             self.recipient = message.get_to()
65             if message.xmlnode.hasProp('router_from') and \
66                 message.xmlnode.prop('router_from') != '':
67                 self.sender = message.xmlnode.prop('router_from')
68             else:
69                 self.sender = message.get_from().as_utf8()
70         else:
71             self.sender = args.get('sender')
72             self.recipient = args.get('recipient')
73             self.body = args.get('body')
74             self.thread = args.get('thread')
75             self.router_command = args.get('router_command')
76
77     def make_xmpp_msg(self):
78         ''' Creates a pyxmpp.message.Message and adds custom attributes '''
79
80         msg = Message(None, None, self.recipient, None, None, None, \
81             self.body, self.thread)
82         if self.router_command:
83             msg.xmlnode.newProp('router_command', self.router_command)
84         return msg
85
86     def to_xml(self):
87         ''' Turns this message into XML '''
88         return self.make_xmpp_msg().serialize()
89         
90
91 class Network(JabberClient):
92     def __init__(self, **args):
93         self.isconnected = False
94
95         # Create a unique jabber resource
96         resource = args.get('resource') or 'python_client'
97         resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
98             threading.currentThread().getName().lower()
99         self.jid = JID(args['username'], args['host'], resource)
100
101         osrf.log.log_debug("initializing network with JID %s and host=%s, "
102             "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
103             args['port'], args['username']))
104
105         #initialize the superclass
106         JabberClient.__init__(self, self.jid, args['password'], args['host'])
107         self.queue = []
108
109         self.receive_callback = None
110
111     def connect(self):
112         JabberClient.connect(self)
113         while not self.isconnected:
114             stream = self.get_stream()
115             act = stream.loop_iter(10)
116             if not act:
117                 self.idle()
118
119     def set_receive_callback(self, func):
120         """The callback provided is called when a message is received.
121         
122             The only argument to the function is the received message. """
123         self.receive_callback = func
124
125     def session_started(self):
126         osrf.log.log_info("Successfully connected to the opensrf network")
127         self.authenticated()
128         self.stream.set_message_handler("normal", self.message_received)
129         self.isconnected = True
130
131     def send(self, message):
132         """Sends the provided network message."""
133         osrf.log.log_internal("jabber sending to %s: %s" % (message.recipient, message.body))
134         msg = message.make_xmpp_msg()
135         self.stream.send(msg)
136     
137     def message_received(self, stanza):
138         """Handler for received messages."""
139         if stanza.get_type()=="headline":
140             return True
141         # check for errors
142         osrf.log.log_internal("jabber received message from %s : %s" 
143             % (stanza.get_from().as_utf8(), stanza.get_body()))
144         self.queue.append(NetworkMessage(stanza))
145         return True
146
147     def recv(self, timeout=120):
148         """Attempts to receive a message from the network.
149
150         timeout - max number of seconds to wait for a message.  
151         If a message is received in 'timeout' seconds, the message is passed to 
152         the receive_callback is called and True is returned.  Otherwise, false is
153         returned.
154         """
155
156         if len(self.queue) == 0:
157             while timeout >= 0 and len(self.queue) == 0:
158                 starttime = time.time()
159                 act = self.get_stream().loop_iter(timeout)
160                 endtime = time.time() - starttime
161                 timeout -= endtime
162                 osrf.log.log_internal("exiting stream loop after %s seconds. "
163                     "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
164                 if not act:
165                     self.idle()
166
167         # if we've acquired a message, handle it
168         msg = None
169         if len(self.queue) > 0:
170             msg = self.queue.pop(0)
171             if self.receive_callback:
172                 self.receive_callback(msg)
173
174         return msg
175
176
177