added the ability to wait forever by passing <0 to recv. explicitly setting sender...
[OpenSRF.git] / src / python / osrf / stack.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 import osrf.json
17 import osrf.log
18 import osrf.ex
19 import osrf.ses
20 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
21     OSRF_APP_SESSION_DISCONNECTED, OSRF_MESSAGE_TYPE_RESULT, \
22     OSRF_MESSAGE_TYPE_STATUS, OSRF_STATUS_COMPLETE, OSRF_STATUS_CONTINUE, \
23     OSRF_STATUS_NOTFOUND, OSRF_STATUS_OK, OSRF_STATUS_TIMEOUT
24 import time
25
26
27 def push(net_msg):
28     ses = osrf.ses.Session.find_session(net_msg.thread)
29
30     if not ses:
31         # This is an incoming request from a client, create a new server session
32         osrf.log.log_error("server-side sessions don't exist yet")
33         return
34
35     ses.set_remote_id(net_msg.sender)
36
37     omessages = osrf.json.to_object(net_msg.body)
38
39     osrf.log.log_internal("push(): received %d messages" \
40         % len(omessages))
41
42     # Pass each bundled opensrf message to the message handler
43     start = time.time()
44     for msg in omessages:
45         handle_message(ses, msg)
46     duration = time.time() - start
47
48     if isinstance(ses, osrf.ses.ServerSession):
49         osrf.log.log_info("Message processing duration %f" % duration)
50
51 def handle_message(session, message):
52
53     osrf.log.log_internal("handle_message(): processing message of "
54         "type %s" % message.type())
55
56     if isinstance(session, osrf.ses.ClientSession):
57
58         if message.type() == OSRF_MESSAGE_TYPE_RESULT:
59             session.push_response_queue(message)
60             return
61
62         if message.type() == OSRF_MESSAGE_TYPE_STATUS:
63
64             status_code = int(message.payload().statusCode())
65             status_text = message.payload().status()
66             osrf.log.log_internal("handle_message(): processing STATUS, "
67                 "status_code =  %d" % status_code)
68
69         if status_code == OSRF_STATUS_COMPLETE:
70             # The server has informed us that this request is complete
71             req = session.find_request(message.threadTrace())
72             if req: 
73                 osrf.log.log_internal("marking request as complete: %d" % req.rid)
74                 req.set_complete()
75             return
76
77         if status_code == OSRF_STATUS_OK:
78             # We have connected successfully
79             osrf.log.log_debug("Successfully connected to " + session.service)
80             session.state = OSRF_APP_SESSION_CONNECTED
81             return
82
83         if status_code == OSRF_STATUS_CONTINUE:
84             # server is telling us to reset our wait timeout and keep waiting for a response
85             session.reset_request_timeout(message.threadTrace())
86             return
87
88         if status_code == OSRF_STATUS_TIMEOUT:
89             osrf.log.log_debug("The server did not receive a request from us in time...")
90             session.state = OSRF_APP_SESSION_DISCONNECTED
91             return
92
93         if status_code == OSRF_STATUS_NOTFOUND:
94             osrf.log.log_error("Requested method was not found on the server: %s" % status_text)
95             session.state = OSRF_APP_SESSION_DISCONNECTED
96             raise osrf.ex.OSRFServiceException(status_text)
97
98         raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code)
99
100
101
102