Changed the way pending requests are stored in an osrfAppSession.
[OpenSRF.git] / src / libopensrf / osrf_app_session.c
1 /**
2         @file osrf_app_session.c
3         @brief Implementation of osrfAppSession.
4 */
5
6 #include <time.h>
7 #include "opensrf/osrf_app_session.h"
8 #include "opensrf/osrf_stack.h"
9
10 struct osrf_app_request_struct {
11         /** The controlling session. */
12         struct osrf_app_session_struct* session;
13
14         /** Request id.  It is the same as the thread_trace of the REQUEST message
15                 for which it was created.
16         */
17         int request_id;
18         /** True if we have received a 'request complete' message from our request. */
19         int complete;
20         /** The original REQUEST message payload. */
21         osrfMessage* payload;
22         /** Linked list of responses to the request. */
23         osrfMessage* result;
24
25         /** Boolean; if true, then a call that is waiting on a response will reset the
26         timeout and set this variable back to false. */
27         int reset_timeout;
28         /** Linkage pointers for a linked list.  We maintain a hash table of pending requests,
29             and each slot of the hash table is a doubly linked list. */
30         osrfAppRequest* next;
31         osrfAppRequest* prev;
32 };
33
34 static inline unsigned int request_id_hash( int req_id );
35 static osrfAppRequest* find_app_request( const osrfAppSession* session, int req_id );
36 static void add_app_request( osrfAppSession* session, osrfAppRequest* req );
37
38 /* Send the given message */
39 static int _osrf_app_session_send( osrfAppSession*, osrfMessage* msg );
40
41 static int osrfAppSessionMakeLocaleRequest(
42                 osrfAppSession* session, const jsonObject* params, const char* method_name,
43                 int protocol, osrfStringArray* param_strings, char* locale );
44
45 /** @brief The global session cache.
46
47         Key: session_id.  Data: osrfAppSession.
48 */
49 static osrfHash* osrfAppSessionCache = NULL;
50
51 // --------------------------------------------------------------------------
52 // Request API
53 // --------------------------------------------------------------------------
54
55 /**
56         @brief Create a new osrfAppRequest.
57         @param session Pointer to the osrfAppSession that will own the new osrfAppRequest.
58         @param msg Pointer to the osrfMessage representing the request.
59         @return Pointer to the new osrfAppRequest.
60
61         The calling code is responsible for freeing the osrfAppRequest by calling
62         _osrf_app_request_free().
63 */
64 static osrfAppRequest* _osrf_app_request_init(
65                 osrfAppSession* session, osrfMessage* msg ) {
66
67         osrfAppRequest* req = safe_malloc(sizeof(osrfAppRequest));
68
69         req->session        = session;
70         req->request_id     = msg->thread_trace;
71         req->complete       = 0;
72         req->payload        = msg;
73         req->result         = NULL;
74         req->reset_timeout  = 0;
75         req->next           = NULL;
76         req->prev           = NULL;
77
78         return req;
79 }
80
81
82 /**
83         @brief Free an osrfAppRequest and everything it owns.
84         @param req Pointer to an osrfAppRequest, cast to a void pointer.
85 */
86 static void _osrf_app_request_free( osrfAppRequest * req ) {
87         if( req ) {
88                 if( req->payload )
89                         osrfMessageFree( req->payload );
90
91                 /* Free the messages in the result queue */
92                 osrfMessage* next_msg;
93                 while( req->result ) {
94                         next_msg = req->result->next;
95                         osrfMessageFree( req->result );
96                         req->result = next_msg;
97                 }
98
99                 free( req );
100         }
101 }
102
103 /**
104         @brief Append a new message to the list of responses to a request.
105         @param req Pointer to the osrfAppRequest for the original REQUEST message.
106         @param result Pointer to an osrfMessage received in response to the request.
107
108         We maintain a linked list of response messages, and traverse it to find the end.
109 */
110 static void _osrf_app_request_push_queue( osrfAppRequest* req, osrfMessage* result ){
111         if(req == NULL || result == NULL)
112                 return;
113
114         osrfLogDebug( OSRF_LOG_MARK, "App Session pushing request [%d] onto request queue",
115                         result->thread_trace );
116         if(req->result == NULL) {
117                 req->result = result;   // Add the first node
118
119         } else {
120
121                 // Find the last node in the list, and append the new node to it
122                 osrfMessage* ptr = req->result;
123                 osrfMessage* ptr2 = req->result->next;
124                 while( ptr2 ) {
125                         ptr = ptr2;
126                         ptr2 = ptr2->next;
127                 }
128                 ptr->next = result;
129         }
130 }
131
132 /**
133         @brief Remove an osrfAppRequest (identified by request_id) from an osrfAppSession.
134         @param session Pointer to the osrfAppSession that owns the osrfAppRequest.
135         @param req_id request_id of the osrfAppRequest to be removed.
136 */
137 void osrf_app_session_request_finish( osrfAppSession* session, int req_id ) {
138
139         if( session ) {
140                 // Search the hash table for the request in question
141                 unsigned int index = request_id_hash( req_id );
142                 osrfAppRequest* old_req = session->request_hash[ index ];
143                 while( old_req ) {
144                         if( old_req->request_id == req_id )
145                                 break;
146                         else
147                                 old_req = old_req->next;
148                 }
149
150                 if( old_req ) {
151                         // Remove the request from the doubly linked list
152                         if( old_req->prev )
153                                 old_req->prev->next = old_req->next;
154                         else
155                                 session->request_hash[ index ] = old_req->next;
156
157                         if( old_req->next )
158                                 old_req->next->prev = old_req->prev;
159
160                         _osrf_app_request_free( old_req );
161                 }
162         }
163 }
164
165 /**
166         @brief Derive a hash key from a request id.
167         @param req_id The request id.
168         @return The corresponding hash key; an index into request_hash[].
169
170         If OSRF_REQUEST_HASH_SIZE is a power of two, then this calculation should
171         reduce to a binary AND.
172 */
173 static inline unsigned int request_id_hash( int req_id ) {
174         return ((unsigned int) req_id ) % OSRF_REQUEST_HASH_SIZE;
175 }
176
177 /**
178         @brief Search for an osrfAppRequest in the hash table, given a request id.
179         @param session Pointer to the relevant osrfAppSession.
180         @param req_id The request_id of the osrfAppRequest being sought.
181         @return A pointer to the osrfAppRequest if found, or NULL if not.
182 */
183 static osrfAppRequest* find_app_request( const osrfAppSession* session, int req_id ) {
184
185         osrfAppRequest* req = session->request_hash[ request_id_hash( req_id) ];
186         while( req ) {
187                 if( req->request_id == req_id )
188                         break;
189                 else
190                         req = req->next;
191         }
192
193         return req;
194 }
195
196 /**
197         @brief Add an osrfAppRequest to the hash table of a given osrfAppSession.
198         @param session Pointer to the session to which the request belongs.
199         @param req Pointer to the osrfAppRequest to be stored.
200
201         Find the right spot in the hash table; then add the request to the linked list at that
202         spot.  We just add it to the head of the list, without trying to maintain any particular
203         ordering.
204 */
205 static void add_app_request( osrfAppSession* session, osrfAppRequest* req ) {
206         if( session && req ) {
207                 unsigned int index = request_id_hash( req->request_id );
208                 req->next = session->request_hash[ index ];
209                 req->prev = NULL;
210                 session->request_hash[ index ] = req;
211         }
212 }
213
214 /**
215         @brief Set the timeout for a request to one second.
216         @param session Pointer to the relevant osrfAppSession.
217         @param req_id Request ID of the request whose timeout is to be reset.
218
219         The request to be reset is identified by the combination of session and request id.
220 */
221 void osrf_app_session_request_reset_timeout( osrfAppSession* session, int req_id ) {
222         if(session == NULL)
223                 return;
224         osrfLogDebug( OSRF_LOG_MARK, "Resetting request timeout %d", req_id );
225         osrfAppRequest* req = find_app_request( session, req_id );
226         if( req )
227                 req->reset_timeout = 1;
228 }
229
230 /**
231         Checks the receive queue for messages.  If any are found, the first
232         is popped off and returned.  Otherwise, this method will wait at most timeout
233         seconds for a message to appear in the receive queue.  Once it arrives it is returned.
234         If no messages arrive in the timeout provided, null is returned.
235 */
236 /**
237         @brief Return the next response message for a given request, subject to a timeout.
238         @param req Pointer to the osrfAppRequest representing the request.
239         @param timeout Maxmimum time to wait, in seconds.
240
241         @return 
242
243         If the input queue for this request is not empty, dequeue the next message and return it.
244 */
245 static osrfMessage* _osrf_app_request_recv( osrfAppRequest* req, int timeout ) {
246
247         if(req == NULL) return NULL;
248
249         if( req->result != NULL ) {
250                 /* Dequeue the next message in the list */
251                 osrfMessage* tmp_msg = req->result;
252                 req->result = req->result->next;
253                 return tmp_msg;
254         }
255
256         time_t start = time(NULL);
257         time_t remaining = (time_t) timeout;
258
259         while( remaining >= 0 ) {
260                 /* tell the session to wait for stuff */
261                 osrfLogDebug( OSRF_LOG_MARK,  "In app_request receive with remaining time [%d]",
262                                 (int) remaining );
263
264                 osrf_app_session_queue_wait( req->session, 0, NULL );
265                 if(req->session->transport_error) {
266                         osrfLogError(OSRF_LOG_MARK, "Transport error in recv()");
267                         return NULL;
268                 }
269
270                 if( req->result != NULL ) { /* if we received anything */
271                         /* pop off the first message in the list */
272                         osrfLogDebug( OSRF_LOG_MARK, "app_request_recv received a message, returning it" );
273                         osrfMessage* ret_msg = req->result;
274                         req->result = ret_msg->next;
275                         if (ret_msg->sender_locale)
276                                 osrf_app_session_set_locale(req->session, ret_msg->sender_locale);
277
278                         return ret_msg;
279                 }
280
281                 if( req->complete )
282                         return NULL;
283
284                 osrf_app_session_queue_wait( req->session, (int) remaining, NULL );
285
286                 if(req->session->transport_error) {
287                         osrfLogError(OSRF_LOG_MARK, "Transport error in recv()");
288                         return NULL;
289                 }
290
291                 if( req->result != NULL ) { /* if we received anything */
292                         /* pop off the first message in the list */
293                         osrfLogDebug( OSRF_LOG_MARK,  "app_request_recv received a message, returning it");
294                         osrfMessage* ret_msg = req->result;
295                         req->result = ret_msg->next;
296                         if (ret_msg->sender_locale)
297                                 osrf_app_session_set_locale(req->session, ret_msg->sender_locale);
298
299                         return ret_msg;
300                 }
301                 if( req->complete )
302                         return NULL;
303
304                 if(req->reset_timeout) {
305                         remaining = (time_t) timeout;
306                         req->reset_timeout = 0;
307                         osrfLogDebug( OSRF_LOG_MARK, "Received a timeout reset");
308                 } else {
309                         remaining -= (int) (time(NULL) - start);
310                 }
311         }
312
313         char* paramString = jsonObjectToJSON(req->payload->_params);
314         osrfLogInfo( OSRF_LOG_MARK, "Returning NULL from app_request_recv after timeout: %s %s",
315                 req->payload->method_name, paramString);
316         free(paramString);
317
318         return NULL;
319 }
320
321 /** Resend this requests original request message */
322 static int _osrf_app_request_resend( osrfAppRequest* req ) {
323         if(req == NULL) return 0;
324         if(!req->complete) {
325                 osrfLogDebug( OSRF_LOG_MARK,  "Resending request [%d]", req->request_id );
326                 return _osrf_app_session_send( req->session, req->payload );
327         }
328         return 1;
329 }
330
331
332 // --------------------------------------------------------------------------
333 // Session API
334 // --------------------------------------------------------------------------
335
336 /** Install a locale for the session */
337 char* osrf_app_session_set_locale( osrfAppSession* session, const char* locale ) {
338         if (!session || !locale)
339                 return NULL;
340
341         if(session->session_locale) {
342                 if( strlen(session->session_locale) >= strlen(locale) ) {
343                         /* There's room available; just copy */
344                         strcpy(session->session_locale, locale);
345                 } else {
346                         free(session->session_locale);
347                         session->session_locale = strdup( locale );
348                 }
349         } else {
350                 session->session_locale = strdup( locale );
351         }
352
353         return session->session_locale;
354 }
355
356 /**
357         @brief Find the osrfAppSession for a given session id.
358         @param session_id The session id to look for.
359         @return Pointer to the corresponding osrfAppSession if found, or NULL if not.
360
361         Search the global session cache for the specified session id.
362 */
363 osrfAppSession* osrf_app_session_find_session( const char* session_id ) {
364         if(session_id)
365                 return osrfHashGet( osrfAppSessionCache, session_id );
366         return NULL;
367 }
368
369
370 /**
371         @brief Add a session to the global session cache, keyed by session id.
372         @param session Pointer to the osrfAppSession to be added.
373
374         If a cache doesn't exist yet, create one.  It's an osrfHash using session ids for the
375         key and osrfAppSessions for the data.
376 */
377 static void _osrf_app_session_push_session( osrfAppSession* session ) {
378         if( session ) {
379                 if( osrfAppSessionCache == NULL )
380                         osrfAppSessionCache = osrfNewHash();
381                 if( osrfHashGet( osrfAppSessionCache, session->session_id ) )
382                         return;   // A session with this id is already in the cache.  Shouldn't happen.
383                 osrfHashSet( osrfAppSessionCache, session, session->session_id );
384         }
385 }
386
387 /** Allocates and initializes a new app_session */
388
389 osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) {
390
391         if (!remote_service) {
392                 osrfLogWarning( OSRF_LOG_MARK, "No remote service specified in osrfAppSessionClientInit");
393                 return NULL;
394         }
395
396         osrfAppSession* session = safe_malloc(sizeof(osrfAppSession));
397
398         session->transport_handle = osrfSystemGetTransportClient();
399         if( session->transport_handle == NULL ) {
400                 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service 'client'");
401                 free( session );
402                 return NULL;
403         }
404
405         osrfStringArray* arr = osrfNewStringArray(8);
406         osrfConfigGetValueList(NULL, arr, "/domain");
407         const char* domain = osrfStringArrayGetString(arr, 0);
408
409         if (!domain) {
410                 osrfLogWarning( OSRF_LOG_MARK, "No domains specified in the OpenSRF config file");
411                 free( session );
412                 osrfStringArrayFree(arr);
413                 return NULL;
414         }
415
416         char* router_name = osrfConfigGetValue(NULL, "/router_name");
417         if (!router_name) {
418                 osrfLogWarning( OSRF_LOG_MARK, "No router name specified in the OpenSRF config file");
419                 free( session );
420                 osrfStringArrayFree(arr);
421                 return NULL;
422         }
423
424         char target_buf[512];
425         target_buf[ 0 ] = '\0';
426
427         int len = snprintf( target_buf, sizeof(target_buf), "%s@%s/%s",
428                         router_name ? router_name : "(null)",
429                         domain ? domain : "(null)",
430                         remote_service ? remote_service : "(null)" );
431         osrfStringArrayFree(arr);
432         //free(domain);
433         free(router_name);
434
435         if( len >= sizeof( target_buf ) ) {
436                 osrfLogWarning( OSRF_LOG_MARK, "Buffer overflow for remote_id");
437                 free( session );
438                 return NULL;
439         }
440
441         session->remote_id = strdup(target_buf);
442         session->orig_remote_id = strdup(session->remote_id);
443         session->remote_service = strdup(remote_service);
444         session->session_locale = NULL;
445         session->transport_error = 0;
446
447         #ifdef ASSUME_STATELESS
448         session->stateless = 1;
449         osrfLogDebug( OSRF_LOG_MARK, "%s session is stateless", remote_service );
450         #else
451         session->stateless = 0;
452         osrfLogDebug( OSRF_LOG_MARK, "%s session is NOT stateless", remote_service );
453         #endif
454
455         /* build a chunky, random session id */
456         char id[256];
457
458         snprintf(id, sizeof(id), "%f.%d%ld", get_timestamp_millis(), (int)time(NULL), (long) getpid());
459         session->session_id = strdup(id);
460         osrfLogDebug( OSRF_LOG_MARK,  "Building a new client session with id [%s] [%s]",
461                         session->remote_service, session->session_id );
462
463         session->thread_trace = 0;
464         session->state = OSRF_SESSION_DISCONNECTED;
465         session->type = OSRF_SESSION_CLIENT;
466
467         session->userData = NULL;
468         session->userDataFree = NULL;
469
470         // Initialize the hash table
471         int i;
472
473         for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i )
474                 session->request_hash[ i ] = NULL;
475
476         _osrf_app_session_push_session( session );
477         return session;
478 }
479
480 osrfAppSession* osrf_app_server_session_init(
481                 const char* session_id, const char* our_app, const char* remote_id ) {
482
483         osrfLogDebug( OSRF_LOG_MARK, "Initing server session with session id %s, service %s,"
484                         " and remote_id %s", session_id, our_app, remote_id );
485
486         osrfAppSession* session = osrf_app_session_find_session( session_id );
487         if(session) return session;
488
489         session = safe_malloc(sizeof(osrfAppSession));
490
491         session->transport_handle = osrfSystemGetTransportClient();
492         if( session->transport_handle == NULL ) {
493                 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service '%s'", our_app );
494                 free(session);
495                 return NULL;
496         }
497
498         int stateless = 0;
499         char* statel = osrf_settings_host_value("/apps/%s/stateless", our_app );
500         if(statel) stateless = atoi(statel);
501         free(statel);
502
503         session->remote_id = strdup(remote_id);
504         session->orig_remote_id = strdup(remote_id);
505         session->session_id = strdup(session_id);
506         session->remote_service = strdup(our_app);
507         session->stateless = stateless;
508
509         #ifdef ASSUME_STATELESS
510         session->stateless = 1;
511         #else
512         session->stateless = 0;
513         #endif
514
515         session->thread_trace = 0;
516         session->state = OSRF_SESSION_DISCONNECTED;
517         session->type = OSRF_SESSION_SERVER;
518         session->session_locale = NULL;
519
520         session->userData = NULL;
521         session->userDataFree = NULL;
522
523         // Initialize the hash table
524         int i;
525
526         for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i )
527                 session->request_hash[ i ] = NULL;
528
529         _osrf_app_session_push_session( session );
530         return session;
531 }
532
533 /**
534         @brief Create a REQUEST message, send it, and save it for future reference.
535         @param session Pointer to the current session, which has the addressing information.
536         @param params One way of specifying the parameters for the method.
537         @param method_name The name of the method to be called.
538         @param protocol Protocol.
539         @param param_strings Another way of specifying the parameters for the method.
540         @return The request ID of the resulting REQUEST message, or -1 upon error.
541
542         DEPRECATED.  Use osrfAppSessionSendRequest() instead.  It is identical except that it
543         doesn't use the param_strings argument, which is redundant, confusing, and unused.
544
545         If @a params is non-NULL, use it to specify the parameters to the method.  Otherwise
546         use @a param_strings.
547
548         If @a params points to a JSON_ARRAY, then pass each element of the array as a separate
549         parameter.  If @a params points to any other kind of jsonObject, pass it as a single
550         parameter.
551
552         If @a params is NULL, and @a param_strings is not NULL, then each pointer in the
553         osrfStringArray must point to a JSON string encoding a parameter.  Pass them.
554
555         At this writing, all calls to this function use @a params to pass parameters, rather than
556         @a param_strings.
557
558         This function is a thin wrapper for osrfAppSessionMakeLocaleRequest().
559 */
560 int osrfAppSessionMakeRequest(
561                 osrfAppSession* session, const jsonObject* params,
562                 const char* method_name, int protocol, osrfStringArray* param_strings ) {
563
564         osrfLogWarning( OSRF_LOG_MARK, "Function osrfAppSessionMakeRequest() is deprecasted; "
565                         "call osrfAppSessionSendRequest() instead" );
566         return osrfAppSessionMakeLocaleRequest( session, params,
567                         method_name, protocol, param_strings, NULL );
568 }
569
570 /**
571         @brief Create a REQUEST message, send it, and save it for future reference.
572         @param session Pointer to the current session, which has the addressing information.
573         @param params One way of specifying the parameters for the method.
574         @param method_name The name of the method to be called.
575         @param protocol Protocol.
576         @return The request ID of the resulting REQUEST message, or -1 upon error.
577
578         If @a params points to a JSON_ARRAY, then pass each element of the array as a separate
579         parameter.  If @a params points to any other kind of jsonObject, pass it as a single
580         parameter.
581
582         This function is a thin wrapper for osrfAppSessionMakeLocaleRequest().
583 */
584 int osrfAppSessionSendRequest( osrfAppSession* session, const jsonObject* params,
585                 const char* method_name, int protocol ) {
586
587         return osrfAppSessionMakeLocaleRequest( session, params,
588                  method_name, protocol, NULL, NULL );
589 }
590
591 /**
592         @brief Create a REQUEST message, send it, and save it for future reference.
593         @param session Pointer to the current session, which has the addressing information.
594         @param params One way of specifying the parameters for the method.
595         @param method_name The name of the method to be called.
596         @param protocol Protocol.
597         @param param_strings Another way of specifying the parameters for the method.
598         @param locale Pointer to a locale string.
599         @return The request ID of the resulting REQUEST message, or -1 upon error.
600
601         See the discussion of osrfAppSessionSendRequest(), which at this writing is the only
602         place that calls this function, except for the similar but deprecated function
603         osrfAppSessionMakeRequest().
604
605         At this writing, the @a param_strings and @a locale parameters are always NULL.
606 */
607 static int osrfAppSessionMakeLocaleRequest(
608                 osrfAppSession* session, const jsonObject* params, const char* method_name,
609                 int protocol, osrfStringArray* param_strings, char* locale ) {
610
611         if(session == NULL) return -1;
612
613         osrfLogMkXid();
614
615         osrfMessage* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol );
616         osrf_message_set_method(req_msg, method_name);
617
618         if (locale) {
619                 osrf_message_set_locale(req_msg, locale);
620         } else if (session->session_locale) {
621                 osrf_message_set_locale(req_msg, session->session_locale);
622         }
623
624         if(params) {
625                 osrf_message_set_params(req_msg, params);
626
627         } else {
628
629                 if(param_strings) {
630                         int i;
631                         for(i = 0; i!= param_strings->size ; i++ ) {
632                                 osrf_message_add_param(req_msg,
633                                         osrfStringArrayGetString(param_strings,i));
634                         }
635                 }
636         }
637
638         osrfAppRequest* req = _osrf_app_request_init( session, req_msg );
639         if(_osrf_app_session_send( session, req_msg ) ) {
640                 osrfLogWarning( OSRF_LOG_MARK,  "Error sending request message [%d]",
641                                 session->thread_trace );
642                 _osrf_app_request_free(req);
643                 return -1;
644         }
645
646         osrfLogDebug( OSRF_LOG_MARK,  "Pushing [%d] onto request queue for session [%s] [%s]",
647                         req->request_id, session->remote_service, session->session_id );
648         add_app_request( session, req );
649         return req->request_id;
650 }
651
652 void osrf_app_session_set_complete( osrfAppSession* session, int request_id ) {
653         if(session == NULL)
654                 return;
655
656         osrfAppRequest* req = find_app_request( session, request_id );
657         if(req) req->complete = 1;
658 }
659
660 int osrf_app_session_request_complete( const osrfAppSession* session, int request_id ) {
661         if(session == NULL)
662                 return 0;
663         osrfAppRequest* req = find_app_request( session, request_id );
664         if(req)
665                 return req->complete;
666         return 0;
667 }
668
669
670 /** Resets the remote connection id to that of the original*/
671 void osrf_app_session_reset_remote( osrfAppSession* session ){
672         if( session==NULL )
673                 return;
674
675         osrfLogDebug( OSRF_LOG_MARK,  "App Session [%s] [%s] resetting remote id to %s",
676                         session->remote_service, session->session_id, session->orig_remote_id );
677
678         osrf_app_session_set_remote( session, session->orig_remote_id );
679 }
680
681 void osrf_app_session_set_remote( osrfAppSession* session, const char* remote_id ) {
682         if(session == NULL)
683                 return;
684
685         if( session->remote_id ) {
686                 if( strlen(session->remote_id) >= strlen(remote_id) ) {
687                         // There's enough room; just copy it
688                         strcpy(session->remote_id, remote_id);
689                 } else {
690                         free(session->remote_id );
691                         session->remote_id = strdup( remote_id );
692                 }
693         } else
694                 session->remote_id = strdup( remote_id );
695 }
696
697 /**
698         pushes the given message into the result list of the app_request
699         with the given request_id
700 */
701 int osrf_app_session_push_queue(
702                 osrfAppSession* session, osrfMessage* msg ){
703         if(session == NULL || msg == NULL) return 0;
704
705         osrfAppRequest* req = find_app_request( session, msg->thread_trace );
706         if(req == NULL) return 0;
707         _osrf_app_request_push_queue( req, msg );
708
709         return 0;
710 }
711
712 /** Attempts to connect to the remote service */
713 int osrfAppSessionConnect( osrfAppSession* session ) {
714
715         if(session == NULL)
716                 return 0;
717
718         if(session->state == OSRF_SESSION_CONNECTED) {
719                 return 1;
720         }
721
722         int timeout = 5; /* XXX CONFIG VALUE */
723
724         osrfLogDebug( OSRF_LOG_MARK,  "AppSession connecting to %s", session->remote_id );
725
726         /* defaulting to protocol 1 for now */
727         osrfMessage* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
728         osrf_app_session_reset_remote( session );
729         session->state = OSRF_SESSION_CONNECTING;
730         int ret = _osrf_app_session_send( session, con_msg );
731         osrfMessageFree(con_msg);
732         if(ret)
733                 return 0;
734
735         time_t start = time(NULL);
736         time_t remaining = (time_t) timeout;
737
738         while( session->state != OSRF_SESSION_CONNECTED && remaining >= 0 ) {
739                 osrf_app_session_queue_wait( session, remaining, NULL );
740                 if(session->transport_error) {
741                         osrfLogError(OSRF_LOG_MARK, "cannot communicate with %s", session->remote_service);
742                         return 0;
743                 }
744                 remaining -= (int) (time(NULL) - start);
745         }
746
747         if(session->state == OSRF_SESSION_CONNECTED)
748                 osrfLogDebug( OSRF_LOG_MARK, " * Connected Successfully to %s", session->remote_service );
749
750         if(session->state != OSRF_SESSION_CONNECTED)
751                 return 0;
752
753         return 1;
754 }
755
756
757
758 /** Disconnects from the remote service */
759 int osrf_app_session_disconnect( osrfAppSession* session){
760         if(session == NULL)
761                 return 1;
762
763         if(session->state == OSRF_SESSION_DISCONNECTED)
764                 return 1;
765
766         if(session->stateless && session->state != OSRF_SESSION_CONNECTED) {
767                 osrfLogDebug( OSRF_LOG_MARK,
768                                 "Exiting disconnect on stateless session %s",
769                                 session->session_id);
770                 return 1;
771         }
772
773         osrfLogDebug(OSRF_LOG_MARK,  "AppSession disconnecting from %s", session->remote_id );
774
775         osrfMessage* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
776         _osrf_app_session_send( session, dis_msg );
777         session->state = OSRF_SESSION_DISCONNECTED;
778
779         osrfMessageFree( dis_msg );
780         osrf_app_session_reset_remote( session );
781         return 1;
782 }
783
784 int osrf_app_session_request_resend( osrfAppSession* session, int req_id ) {
785         osrfAppRequest* req = find_app_request( session, req_id );
786         return _osrf_app_request_resend( req );
787 }
788
789
790 static int osrfAppSessionSendBatch( osrfAppSession* session, osrfMessage* msgs[], int size ) {
791
792         if( !(session && msgs && size > 0) ) return 0;
793         int retval = 0;
794
795         osrfMessage* msg = msgs[0];
796
797         if(msg) {
798
799                 osrf_app_session_queue_wait( session, 0, NULL );
800
801                 if(session->state != OSRF_SESSION_CONNECTED)  {
802
803                         if(session->stateless) { /* stateless session always send to the root listener */
804                                 osrf_app_session_reset_remote(session);
805
806                         } else {
807
808                                 /* do an auto-connect if necessary */
809                                 if( ! session->stateless &&
810                                         (msg->m_type != CONNECT) &&
811                                         (msg->m_type != DISCONNECT) &&
812                                         (session->state != OSRF_SESSION_CONNECTED) ) {
813
814                                         if(!osrfAppSessionConnect( session ))
815                                                 return 0;
816                                 }
817                         }
818                 }
819         }
820
821         char* string = osrfMessageSerializeBatch(msgs, size);
822
823         if( string ) {
824
825                 transport_message* t_msg = message_init(
826                                 string, "", session->session_id, session->remote_id, NULL );
827                 message_set_osrf_xid( t_msg, osrfLogGetXid() );
828
829                 retval = client_send_message( session->transport_handle, t_msg );
830
831                 if( retval ) osrfLogError(OSRF_LOG_MARK, "client_send_message failed");
832
833                 osrfLogInfo(OSRF_LOG_MARK, "[%s] sent %d bytes of data to %s",
834                         session->remote_service, strlen(string), t_msg->recipient );
835
836                 osrfLogDebug(OSRF_LOG_MARK, "Sent: %s", string );
837
838                 free(string);
839                 message_free( t_msg );
840         }
841
842         return retval;
843 }
844
845
846
847 static int _osrf_app_session_send( osrfAppSession* session, osrfMessage* msg ){
848         if( !(session && msg) ) return 0;
849         osrfMessage* a[1];
850         a[0] = msg;
851         return osrfAppSessionSendBatch( session, a, 1 );
852 }
853
854
855 /**
856         Waits up to 'timeout' seconds for some data to arrive.
857         Any data that arrives will be processed according to its
858         payload and message type.  This method will return after
859         any data has arrived.
860 */
861 int osrf_app_session_queue_wait( osrfAppSession* session, int timeout, int* recvd ){
862         if(session == NULL) return 0;
863         osrfLogDebug(OSRF_LOG_MARK, "AppSession in queue_wait with timeout %d", timeout );
864         return osrf_stack_process(session->transport_handle, timeout, recvd);
865 }
866
867 /** Disconnects (if client) and removes the given session from the global session cache
868         ! This frees all attached app_requests !
869 */
870 void osrfAppSessionFree( osrfAppSession* session ){
871         if(session == NULL) return;
872
873         /* Disconnect */
874
875         osrfLogDebug(OSRF_LOG_MARK,  "AppSession [%s] [%s] destroying self and deleting requests",
876                         session->remote_service, session->session_id );
877         if(session->type == OSRF_SESSION_CLIENT
878                         && session->state != OSRF_SESSION_DISCONNECTED ) { /* disconnect if we're a client */
879                 osrfMessage* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
880                 _osrf_app_session_send( session, dis_msg );
881                 osrfMessageFree(dis_msg);
882         }
883
884         /* Remove self from the global session cache */
885
886         osrfHashRemove( osrfAppSessionCache, session->session_id );
887
888         /* Free the memory */
889
890         if( session->userDataFree && session->userData )
891                 session->userDataFree(session->userData);
892
893         if(session->session_locale)
894                 free(session->session_locale);
895
896         free(session->remote_id);
897         free(session->orig_remote_id);
898         free(session->session_id);
899         free(session->remote_service);
900         
901         // Free the request hash
902         int i;
903         for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i ) {
904                 _osrf_app_request_free( session->request_hash[ i ] );
905         }
906         free(session);
907 }
908
909 osrfMessage* osrfAppSessionRequestRecv(
910                 osrfAppSession* session, int req_id, int timeout ) {
911         if(req_id < 0 || session == NULL)
912                 return NULL;
913         osrfAppRequest* req = find_app_request( session, req_id );
914         return _osrf_app_request_recv( req, timeout );
915 }
916
917 int osrfAppRequestRespond( osrfAppSession* ses, int requestId, const jsonObject* data ) {
918         if(!ses || ! data ) return -1;
919
920         osrfMessage* msg = osrf_message_init( RESULT, requestId, 1 );
921         osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
922         char* json = jsonObjectToJSON( data );
923
924         osrf_message_set_result_content( msg, json );
925         _osrf_app_session_send( ses, msg );
926
927         free(json);
928         osrfMessageFree( msg );
929
930         return 0;
931 }
932
933
934 int osrfAppRequestRespondComplete(
935                 osrfAppSession* ses, int requestId, const jsonObject* data ) {
936
937         osrfMessage* status = osrf_message_init( STATUS, requestId, 1);
938         osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete",
939                         OSRF_STATUS_COMPLETE );
940
941         if (data) {
942                 osrfMessage* payload = osrf_message_init( RESULT, requestId, 1 );
943                 osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK );
944
945                 char* json = jsonObjectToJSON( data );
946                 osrf_message_set_result_content( payload, json );
947                 free(json);
948
949                 osrfMessage* ms[2];
950                 ms[0] = payload;
951                 ms[1] = status;
952
953                 osrfAppSessionSendBatch( ses, ms, 2 );
954
955                 osrfMessageFree( payload );
956         } else {
957                 osrfAppSessionSendBatch( ses, &status, 1 );
958         }
959
960         osrfMessageFree( status );
961
962         return 0;
963 }
964
965 int osrfAppSessionStatus( osrfAppSession* ses, int type,
966                 const char* name, int reqId, const char* message ) {
967
968         if(ses) {
969                 osrfMessage* msg = osrf_message_init( STATUS, reqId, 1);
970                 osrf_message_set_status_info( msg, name, message, type );
971                 _osrf_app_session_send( ses, msg );
972                 osrfMessageFree( msg );
973                 return 0;
974         }
975         return -1;
976 }
977
978 /**
979         @brief Free the global session cache.
980         
981         Note that the osrfHash that implements the global session cache does @em not have a
982         callback function installed for freeing its cargo.  As a result, any outstanding
983         osrfAppSessions are leaked, along with all the osrfAppRequests and osrfMessages they
984         own.
985 */
986 void osrfAppSessionCleanup( void ) {
987         osrfHashFree(osrfAppSessionCache);
988         osrfAppSessionCache = NULL;
989 }
990
991
992
993
994