added support for multi-threaded client interactions. much like the java lib, each...
[OpenSRF.git] / src / python / osrf / net.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16
17 from pyxmpp.jabber.client import JabberClient
18 from pyxmpp.message import Message
19 from pyxmpp.jid import JID
20 from socket import gethostname
21 from osrf.log import *
22 import os, time, threading
23 import logging
24
25 threadSessions = {}
26
27 # - log jabber activity (for future reference)
28 #logger=logging.getLogger()
29 #logger.addHandler(logging.StreamHandler())
30 #logger.addHandler(logging.FileHandler('j.log'))
31 #logger.setLevel(logging.DEBUG)
32
33 def osrfSetNetworkHandle(handle):
34     """ Sets the thread-specific network handle"""
35     threadSessions[threading.currentThread().getName()] = handle
36
37 def osrfGetNetworkHandle():
38     """ Returns the thread-specific network connection handle."""
39     return threadSessions.get(threading.currentThread().getName())
40
41
42 class osrfNetworkMessage(object):
43     """Network message
44
45     attributes:
46
47     sender - message sender
48     to - message recipient
49     body - the body of the message
50     thread - the message thread
51     """
52
53     def __init__(self, message=None, **args):
54         if message:
55             self.body = message.get_body()
56             self.thread = message.get_thread()
57             self.to = message.get_to()
58             if message.xmlnode.hasProp('router_from') and message.xmlnode.prop('router_from') != '':
59                 self.sender = message.xmlnode.prop('router_from')
60             else: self.sender = message.get_from().as_utf8()
61         else:
62             if args.has_key('sender'): self.sender = args['sender']
63             if args.has_key('to'): self.to = args['to']
64             if args.has_key('body'): self.body = args['body']
65             if args.has_key('thread'): self.thread = args['thread']
66
67
68 class osrfNetwork(JabberClient):
69     def __init__(self, **args):
70         self.isconnected = False
71
72         # Create a unique jabber resource
73         resource = 'python_'
74         if args.has_key('resource'):
75             resource = args['resource']
76         resource += '_' + gethostname()+':'+ str(os.getpid()) + '_'+ threading.currentThread().getName().lower()
77         self.jid = JID(args['username'], args['host'], resource)
78
79         osrfLogDebug("initializing network with JID %s and host=%s, port=%s, username=%s" % 
80             (self.jid.as_utf8(), args['host'], args['port'], args['username']))
81
82         #initialize the superclass
83         JabberClient.__init__(self, self.jid, args['password'], args['host'])
84         self.queue = []
85
86     def connect(self):
87         JabberClient.connect(self)
88         while not self.isconnected:
89             stream = self.get_stream()
90             act = stream.loop_iter(10)
91             if not act: self.idle()
92
93     def setRecvCallback(self, func):
94         """The callback provided is called when a message is received.
95         
96             The only argument to the function is the received message. """
97         self.recvCallback = func
98
99     def session_started(self):
100         osrfLogInfo("Successfully connected to the opensrf network")
101         self.authenticated()
102         self.stream.set_message_handler("normal",self.message_received)
103         self.isconnected = True
104
105     def send(self, message):
106         """Sends the provided network message."""
107         osrfLogInternal("jabber sending to %s: %s" % (message.to, message.body))
108         msg = Message(None, None, message.to, None, None, None, message.body, message.thread)
109         self.stream.send(msg)
110     
111     def message_received(self, stanza):
112         """Handler for received messages."""
113         osrfLogInternal("jabber received a message of type %s" % stanza.get_type())
114         if stanza.get_type()=="headline":
115             return True
116         # check for errors
117         osrfLogInternal("jabber received message from %s : %s" 
118             % (stanza.get_from().as_utf8(), stanza.get_body()))
119         self.queue.append(osrfNetworkMessage(stanza))
120         return True
121
122     def recv(self, timeout=120):
123         """Attempts to receive a message from the network.
124
125         timeout - max number of seconds to wait for a message.  
126         If no message is received in 'timeout' seconds, None is returned. """
127
128         msg = None
129         if len(self.queue) == 0:
130             while timeout >= 0 and len(self.queue) == 0:
131                 starttime = time.time()
132                 osrfLogInternal("going into stream loop at " + str(starttime))
133                 act = self.get_stream().loop_iter(timeout)
134                 endtime = time.time() - starttime
135                 timeout -= endtime
136                 osrfLogInternal("exiting stream loop after %s seconds" % str(endtime))
137                 osrfLogInternal("act = %s : queue length = %d" % (act, len(self.queue)) )
138                 if not act: self.idle()
139
140         # if we've acquired a message, handle it
141         if len(self.queue) > 0:
142             self.recvCallback(self.queue.pop(0))
143         return None
144
145
146