Python libs for OpenSRF ingress tracking
[OpenSRF.git] / src / python / osrf / stack.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2007  Georgia Public Library Service
3 # Bill Erickson <billserickson@gmail.com>
4
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 # -----------------------------------------------------------------------
15
16 import time
17 import osrf.json, osrf.log, osrf.ex, osrf.ses, osrf.const, osrf.app
18
19 def push(net_msg):
20
21     ses = osrf.ses.Session.find_or_create(net_msg.thread)
22     ses.set_remote_id(net_msg.sender)
23     if not ses.service:
24         ses.service = osrf.app.Application.name
25
26     omessages = osrf.json.to_object(net_msg.body)
27
28     osrf.log.log_internal("stack.push(): received %d messages" % len(omessages))
29
30     # Pass each bundled opensrf message to the message handler
31     start = time.time()
32     for msg in omessages:
33         handle_message(ses, msg)
34     duration = time.time() - start
35
36     if isinstance(ses, osrf.ses.ServerSession):
37         osrf.log.log_info("Message processing duration %f" % duration)
38
39     return ses
40
41 def handle_message(session, message):
42
43     osrf.log.log_internal("handle_message(): processing message of "
44         "type %s" % message.type())
45
46     osrf.ses.Session.ingress(message.ingress())
47
48     if isinstance(session, osrf.ses.ClientSession):
49         handle_client(session, message)
50     else:
51         handle_server(session, message)
52
53
54 def handle_client(session, message):
55
56     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_RESULT:
57         session.push_response_queue(message)
58         return
59
60     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
61
62         status_code = int(message.payload().statusCode())
63         status_text = message.payload().status()
64         osrf.log.log_internal("handle_message(): processing STATUS, "
65             "status_code =  %d" % status_code)
66
67         if status_code == osrf.const.OSRF_STATUS_COMPLETE:
68             # The server has informed us that this request is complete
69             req = session.find_request(message.threadTrace())
70             if req: 
71                 osrf.log.log_internal("marking request as complete: %d" % req.rid)
72                 req.set_complete()
73             return
74
75         if status_code == osrf.const.OSRF_STATUS_OK:
76             # We have connected successfully
77             osrf.log.log_debug("Successfully connected to " + session.service)
78             session.state = osrf.const.OSRF_APP_SESSION_CONNECTED
79             return
80
81         if status_code == osrf.const.OSRF_STATUS_CONTINUE:
82             # server is telling us to reset our wait timeout and keep waiting for a response
83             session.reset_request_timeout(message.threadTrace())
84             return
85
86         if status_code == osrf.const.OSRF_STATUS_TIMEOUT:
87             osrf.log.log_debug("The server did not receive a request from us in time...")
88             session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
89             return
90
91         if status_code == osrf.const.OSRF_STATUS_NOTFOUND:
92             osrf.log.log_error("Requested method was not found on the server: %s" % status_text)
93             session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
94             raise osrf.ex.OSRFServiceException(status_text)
95
96         if status_code == osrf.const.OSRF_STATUS_INTERNALSERVERERROR:
97             raise osrf.ex.OSRFServiceException("Server error %d : %s" % (status_code, status_text))
98
99         raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code)
100
101
102 def handle_server(session, message):
103
104     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_REQUEST:
105         osrf.log.log_debug("server received REQUEST from %s" % session.remote_id)
106         session.run_callback('pre_request')
107         osrf.app.Application.handle_request(session, message)
108         session.run_callback('post_request')
109         return
110
111     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
112         osrf.log.log_debug("server received CONNECT from %s" % session.remote_id)
113         session.state = osrf.const.OSRF_APP_SESSION_CONNECTED 
114         session.send_connect_ok(message.threadTrace())
115         return
116
117     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_DISCONNECT:
118         osrf.log.log_debug("server received DISCONNECT from %s" % session.remote_id)
119         session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
120         session.run_callback('disconnect')
121         return
122
123     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
124         osrf.log.log_debug("server ignoring STATUS from %s" % session.remote_id)
125         return
126
127     if message.type() == osrf.const.OSRF_MESSAGE_TYPE_RESULT:
128         osrf.log.log_debug("server ignoring RESULT from %s" % session.remote_id)
129         return
130
131