#include <opensrf/osrf_stack.h>
#include <opensrf/osrf_application.h>

#include <string.h>
4 /* the max number of oilsMessage blobs present in any one root packet */
5 #define OSRF_MAX_MSGS_PER_PACKET 256
6 // -----------------------------------------------------------------------------
8 static void osrf_stack_application_handler( osrfAppSession* session, osrfMessage* msg );
9 static void _do_client( osrfAppSession*, osrfMessage* );
10 static void _do_server( osrfAppSession*, osrfMessage* );
13 @brief Read and process available transport_messages for a transport_client.
14 @param client Pointer to the transport_client whose socket is to be read.
15 @param timeout How many seconds to wait for the first message.
16 @param msg_received A pointer through which to report whether a message was received.
17 @return 0 upon success (even if a timeout occurred) or -1 upon failure.
19 Read and process all available transport_messages from the socket of the specified
20 transport_client. Pass each one through osrf_stack_transport().
22 The timeout applies only to the first message. Any subsequent messages must be
23 available immediately. Don't wait for them, even if the timeout has not expired. In
24 theory, a sufficiently large backlog of input messages could keep you working past the
25 nominal expiration of the timeout.
27 The @a msg_received parameter points to an int owned by the calling code and used as
28 a boolean. Set it to true if you receive at least one transport_message, or to false
29 if you don't. A timeout is not treated as an error; it just means you must set that
32 int osrf_stack_process( transport_client* client, int timeout, int* msg_received ) {
33 if( !client ) return -1;
34 transport_message* msg = NULL;
35 if(msg_received) *msg_received = 0;
37 // Loop through the available input messages
38 while( (msg = client_recv( client, timeout )) ) {
39 if(msg_received) *msg_received = 1;
40 osrfLogDebug( OSRF_LOG_MARK, "Received message from transport code from %s", msg->sender );
41 osrf_stack_transport_handler( msg, NULL );
46 osrfLogWarning(OSRF_LOG_MARK, "transport_client had trouble reading from the socket..");
50 if( ! client_connected( client ) ) return -1;
55 // -----------------------------------------------------------------------------
56 // Entry point into the stack
57 // -----------------------------------------------------------------------------
59 @brief Unpack a transport into one or more osrfMessages, and process each one.
60 @param msg The transport message to be unpacked and processed.
61 @param my_service Application name (optional).
62 @return Pointer to an osrfAppSession -- either a pre-existing one or a new one.
64 Look for an existing osrfAppSession with which the message is associated. Such a session
65 may already exist if, for example, you're a client waiting for a response from some other
66 application, or if you're a server that has opened a stateful session with a client.
68 If you can't find an existing session for the current message, and the @a my_service
69 parameter has provided an application name, then you're presumably a server receiving
70 something from a new client. Create an application server session to own the new message.
72 Barring various errors and malformations, extract one or more osrfMessages from the
73 transport_message. Pass each one to the appropriate routine for processing, depending
74 on whether you're acting as a client or as a server.
76 struct osrf_app_session_struct* osrf_stack_transport_handler( transport_message* msg,
77 const char* my_service ) {
81 osrfLogSetXid(msg->osrf_xid);
83 osrfLogDebug( OSRF_LOG_MARK, "Transport handler received new message \nfrom %s "
84 "to %s with body \n\n%s\n", msg->sender, msg->recipient, msg->body );
86 if( msg->is_error && ! msg->thread ) {
87 osrfLogWarning( OSRF_LOG_MARK,
88 "!! Received jabber layer error for %s ... exiting\n", msg->sender );
93 if(! msg->thread && ! msg->is_error ) {
94 osrfLogWarning( OSRF_LOG_MARK,
95 "Received a non-error message with no thread trace... dropping");
100 osrfAppSession* session = osrf_app_session_find_session( msg->thread );
102 if( !session && my_service )
103 session = osrf_app_server_session_init( msg->thread, my_service, msg->sender);
111 osrfLogDebug( OSRF_LOG_MARK, "Session [%s] found or built", session->session_id );
113 osrf_app_session_set_remote( session, msg->sender );
114 osrfMessage* arr[OSRF_MAX_MSGS_PER_PACKET];
116 /* Convert the message body into one or more osrfMessages */
117 int num_msgs = osrf_message_deserialize(msg->body, arr, OSRF_MAX_MSGS_PER_PACKET);
119 osrfLogDebug( OSRF_LOG_MARK, "We received %d messages from %s", num_msgs, msg->sender );
121 double starttime = get_timestamp_millis();
124 for( i = 0; i < num_msgs; i++ ) {
126 /* if we've received a jabber layer error message (probably talking to
127 someone who no longer exists) and we're not talking to the original
128 remote id for this server, consider it a redirect and pass it up */
130 osrfLogWarning( OSRF_LOG_MARK, " !!! Received Jabber layer error message" );
132 if( strcmp( session->remote_id, session->orig_remote_id ) ) {
133 osrfLogWarning( OSRF_LOG_MARK, "Treating jabber error as redirect for tt [%d] "
134 "and session [%s]", arr[i]->thread_trace, session->session_id );
136 arr[i]->m_type = STATUS;
137 arr[i]->status_code = OSRF_STATUS_REDIRECTED;
140 osrfLogWarning( OSRF_LOG_MARK, " * Jabber Error is for top level remote "
141 " id [%s], no one to send my message to! Cutting request short...",
142 session->remote_id );
143 session->transport_error = 1;
148 if( session->type == OSRF_SESSION_CLIENT )
149 _do_client( session, arr[i] );
151 _do_server( session, arr[i] );
154 double duration = get_timestamp_millis() - starttime;
155 osrfLogInfo(OSRF_LOG_MARK, "Message processing duration %f", duration);
158 osrfLogDebug( OSRF_LOG_MARK, "after msg delete");
164 If we return a message, that message should be passed up the stack,
165 if we return NULL, we're finished for now...
167 static void _do_client( osrfAppSession* session, osrfMessage* msg ) {
168 if(session == NULL || msg == NULL)
171 osrfMessage* further_msg = NULL;
173 if( msg->m_type == STATUS ) {
175 switch( msg->status_code ) {
178 osrfLogDebug( OSRF_LOG_MARK, "We connected successfully");
179 session->state = OSRF_SESSION_CONNECTED;
180 osrfLogDebug( OSRF_LOG_MARK, "State: %x => %s => %d", session,
181 session->session_id, session->state );
184 case OSRF_STATUS_COMPLETE:
185 osrf_app_session_set_complete( session, msg->thread_trace );
188 case OSRF_STATUS_CONTINUE:
189 osrf_app_session_request_reset_timeout( session, msg->thread_trace );
192 case OSRF_STATUS_REDIRECTED:
193 osrf_app_session_reset_remote( session );
194 session->state = OSRF_SESSION_DISCONNECTED;
195 osrf_app_session_request_resend( session, msg->thread_trace );
198 case OSRF_STATUS_EXPFAILED:
199 osrf_app_session_reset_remote( session );
200 session->state = OSRF_SESSION_DISCONNECTED;
203 case OSRF_STATUS_TIMEOUT:
204 osrf_app_session_reset_remote( session );
205 session->state = OSRF_SESSION_DISCONNECTED;
206 osrf_app_session_request_resend( session, msg->thread_trace );
210 /* Replace the old message with a new one */
211 further_msg = osrf_message_init( RESULT, msg->thread_trace, msg->protocol );
212 osrf_message_set_status_info( further_msg,
213 msg->status_name, msg->status_text, msg->status_code );
214 osrfLogWarning( OSRF_LOG_MARK, "The stack doesn't know what to do with "
215 "the provided message code: %d, name %s. Passing UP.",
216 msg->status_code, msg->status_name );
217 further_msg->is_exception = 1;
218 osrf_app_session_set_complete( session, msg->thread_trace );
222 } else if( msg->m_type == RESULT ) {
227 osrfLogDebug( OSRF_LOG_MARK, "passing client message %d / session %s to app handler",
228 msg->thread_trace, session->session_id );
229 osrf_stack_application_handler( session, further_msg );
232 if(msg != further_msg)
233 osrfMessageFree(msg);
240 If we return a message, that message should be passed up the stack,
241 if we return NULL, we're finished for now...
243 static void _do_server( osrfAppSession* session, osrfMessage* msg ) {
245 if(session == NULL || msg == NULL) return;
247 osrfLogDebug( OSRF_LOG_MARK, "Server received message of type %d", msg->m_type );
249 osrfMessage* further_msg = NULL;
251 switch( msg->m_type ) {
257 /* session will be freed by the forker */
258 osrfLogDebug(OSRF_LOG_MARK, "Client sent explicit disconnect");
259 session->state = OSRF_SESSION_DISCONNECTED;
263 osrfAppSessionStatus( session, OSRF_STATUS_OK,
264 "osrfConnectStatus", msg->thread_trace, "Connection Successful" );
265 session->state = OSRF_SESSION_CONNECTED;
270 osrfLogDebug( OSRF_LOG_MARK, "server passing message %d to application handler "
271 "for session %s", msg->thread_trace, session->session_id );
276 osrfLogWarning( OSRF_LOG_MARK,
277 "Server cannot handle message of type %d", msg->m_type );
278 session->state = OSRF_SESSION_DISCONNECTED;
283 osrfLogDebug( OSRF_LOG_MARK, "passing server message %d / session %s to app handler",
284 msg->thread_trace, session->session_id );
285 osrf_stack_application_handler( session, further_msg );
288 if(msg != further_msg)
289 osrfMessageFree(msg);
296 static void osrf_stack_application_handler( osrfAppSession* session, osrfMessage* msg ) {
297 if(session == NULL || msg == NULL) return;
299 if(msg->m_type == RESULT && session->type == OSRF_SESSION_CLIENT) {
300 /* Enqueue the RESULT message to be processed later */
301 osrf_app_session_push_queue( session, msg );
303 else if(msg->m_type == REQUEST) {
304 char* method = msg->method_name;
305 char* app = session->remote_service;
306 jsonObject* params = msg->_params;
308 osrfAppRunMethod( app, method, session, msg->thread_trace, params );
309 osrfMessageFree(msg);