2709d690174e8e52078162642d3bcb38a2529d97
[OpenSRF.git] / src / libopensrf / osrf_app_session.c
1 /**
2         @file osrf_app_session.c
3         @brief Implementation of osrfAppSession.
4 */
5
6 #include <time.h>
7 #include "opensrf/osrf_app_session.h"
8 #include "opensrf/osrf_stack.h"
9
10 struct osrf_app_request_struct {
11         /** The controlling session. */
12         struct osrf_app_session_struct* session;
13
14         /** Request id.  It is the same as the thread_trace of the REQUEST message
15                 for which it was created.
16         */
17         int request_id;
18         /** True if we have received a 'request complete' message from our request. */
19         int complete;
20         /** The original REQUEST message payload. */
21         osrfMessage* payload;
22         /** Linked list of responses to the request. */
23         osrfMessage* result;
24
25         /** Boolean; if true, then a call that is waiting on a response will reset the
26         timeout and set this variable back to false. */
27         int reset_timeout;
28 };
29 typedef struct osrf_app_request_struct osrfAppRequest;
30
31 /* Send the given message */
32 static int _osrf_app_session_send( osrfAppSession*, osrfMessage* msg );
33
34 static int osrfAppSessionMakeLocaleRequest(
35                 osrfAppSession* session, const jsonObject* params, const char* method_name,
36                 int protocol, osrfStringArray* param_strings, char* locale );
37
38 /** @brief The global session cache.
39
40         Key: session_id.  Data: osrfAppSession.
41 */
42 static osrfHash* osrfAppSessionCache = NULL;
43
44 // --------------------------------------------------------------------------
45 // Request API
46 // --------------------------------------------------------------------------
47
48 /**
49         @brief Create a new osrfAppRequest.
50         @param session Pointer to the osrfAppSession that will own the new osrfAppRequest.
51         @param msg Pointer to the osrfMessage representing the request.
52         @return Pointer to the new osrfAppRequest.
53
54         The calling code is responsible for freeing the osrfAppRequest by calling
55         _osrf_app_request_free().  In practice this normally happens via a callback installed
56         in an osrfList.
57 */
58 static osrfAppRequest* _osrf_app_request_init(
59                 osrfAppSession* session, osrfMessage* msg ) {
60
61         osrfAppRequest* req =
62                 (osrfAppRequest*) safe_malloc(sizeof(osrfAppRequest));
63
64         req->session        = session;
65         req->request_id     = msg->thread_trace;
66         req->complete       = 0;
67         req->payload        = msg;
68         req->result         = NULL;
69         req->reset_timeout  = 0;
70
71         return req;
72 }
73
74
75 /**
76         @brief Free an osrfAppRequest and everything it owns.
77         @param req Pointer to an osrfAppRequest, cast to a void pointer.
78
79         This function is installed as a callback in the osrfList request_queue, a member
80         of osrfAppSession.
81 */
82 static void _osrf_app_request_free( void * req ){
83         if( req ) {
84                 osrfAppRequest* r = (osrfAppRequest*) req;
85                 if( r->payload )
86                         osrfMessageFree( r->payload );
87
88                 /* Free the messages in the result queue */
89                 osrfMessage* next_msg;
90                 while( r->result ) {
91                         next_msg = r->result->next;
92                         osrfMessageFree( r->result );
93                         r->result = next_msg;
94                 }
95
96                 free( r );
97         }
98 }
99
100 /**
101         @brief Append a new message to the list of responses to a request.
102         @param req Pointer to the osrfAppRequest for the original REQUEST message.
103         @param result Pointer to an osrfMessage received in response to the request.
104
105         We maintain a linked list of response messages, and traverse it to find the end.
106 */
107 static void _osrf_app_request_push_queue( osrfAppRequest* req, osrfMessage* result ){
108         if(req == NULL || result == NULL)
109                 return;
110
111         osrfLogDebug( OSRF_LOG_MARK, "App Session pushing request [%d] onto request queue",
112                         result->thread_trace );
113         if(req->result == NULL) {
114                 req->result = result;   // Add the first node
115
116         } else {
117
118                 // Find the last node in the list, and append the new node to it
119                 osrfMessage* ptr = req->result;
120                 osrfMessage* ptr2 = req->result->next;
121                 while( ptr2 ) {
122                         ptr = ptr2;
123                         ptr2 = ptr2->next;
124                 }
125                 ptr->next = result;
126         }
127 }
128
129 /**
130         @brief Remove an osrfAppRequest (identified by request_id) from an osrfAppSession.
131         @param session Pointer to the osrfAppSession that owns the osrfAppRequest.
132         @param req_id request_id of the osrfAppRequest to be removed.
133
134         The osrfAppRequest itself is freed by a callback installed in the osrfList where it resides.
135 */
136 void osrf_app_session_request_finish(
137                 osrfAppSession* session, int req_id ){
138
139         if( session ) {
140                 osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
141                 if(req)
142                         osrfListRemove( req->session->request_queue, req->request_id );
143         }
144 }
145
146
147 /**
148         @brief Set the timeout for a request to one second.
149         @param session Pointer to the relevant osrfAppSession.
150         @param req_id Request ID of the request whose timeout is to be reset.
151
152         The request to be reset is identified by the combination of session and request id.
153 */
154 void osrf_app_session_request_reset_timeout( osrfAppSession* session, int req_id ) {
155         if(session == NULL)
156                 return;
157         osrfLogDebug( OSRF_LOG_MARK, "Resetting request timeout %d", req_id );
158         osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
159         if( req )
160                 req->reset_timeout = 1;
161 }
162
163 /**
164         Checke the receive queue for messages.  If any are found, the first
165         is popped off and returned.  Otherwise, this method will wait at most timeout
166         seconds for a message to appear in the receive queue.  Once it arrives it is returned.
167         If no messages arrive in the timeout provided, null is returned.
168 */
169 /**
170         @brief Return the next response message for a given request, subject to a timeout.
171         @param req Pointer to the osrfAppRequest representing the request.
172         @param timeout Maxmimum time to wait, in seconds.
173
174         @return 
175
176         If the input queue for this request is not empty, dequeue the next message and return it.
177 */
178 static osrfMessage* _osrf_app_request_recv( osrfAppRequest* req, int timeout ) {
179
180         if(req == NULL) return NULL;
181
182         if( req->result != NULL ) {
183                 /* Dequeue the next message in the list */
184                 osrfMessage* tmp_msg = req->result;
185                 req->result = req->result->next;
186                 return tmp_msg;
187         }
188
189         time_t start = time(NULL);
190         time_t remaining = (time_t) timeout;
191
192         while( remaining >= 0 ) {
193                 /* tell the session to wait for stuff */
194                 osrfLogDebug( OSRF_LOG_MARK,  "In app_request receive with remaining time [%d]",
195                                 (int) remaining );
196
197                 osrf_app_session_queue_wait( req->session, 0, NULL );
198                 if(req->session->transport_error) {
199                         osrfLogError(OSRF_LOG_MARK, "Transport error in recv()");
200                         return NULL;
201                 }
202
203                 if( req->result != NULL ) { /* if we received anything */
204                         /* pop off the first message in the list */
205                         osrfLogDebug( OSRF_LOG_MARK, "app_request_recv received a message, returning it" );
206                         osrfMessage* ret_msg = req->result;
207                         req->result = ret_msg->next;
208                         if (ret_msg->sender_locale)
209                                 osrf_app_session_set_locale(req->session, ret_msg->sender_locale);
210
211                         return ret_msg;
212                 }
213
214                 if( req->complete )
215                         return NULL;
216
217                 osrf_app_session_queue_wait( req->session, (int) remaining, NULL );
218
219                 if(req->session->transport_error) {
220                         osrfLogError(OSRF_LOG_MARK, "Transport error in recv()");
221                         return NULL;
222                 }
223
224                 if( req->result != NULL ) { /* if we received anything */
225                         /* pop off the first message in the list */
226                         osrfLogDebug( OSRF_LOG_MARK,  "app_request_recv received a message, returning it");
227                         osrfMessage* ret_msg = req->result;
228                         req->result = ret_msg->next;
229                         if (ret_msg->sender_locale)
230                                 osrf_app_session_set_locale(req->session, ret_msg->sender_locale);
231
232                         return ret_msg;
233                 }
234                 if( req->complete )
235                         return NULL;
236
237                 if(req->reset_timeout) {
238                         remaining = (time_t) timeout;
239                         req->reset_timeout = 0;
240                         osrfLogDebug( OSRF_LOG_MARK, "Received a timeout reset");
241                 } else {
242                         remaining -= (int) (time(NULL) - start);
243                 }
244         }
245
246         char* paramString = jsonObjectToJSON(req->payload->_params);
247         osrfLogInfo( OSRF_LOG_MARK, "Returning NULL from app_request_recv after timeout: %s %s",
248                 req->payload->method_name, paramString);
249         free(paramString);
250
251         return NULL;
252 }
253
254 /** Resend this requests original request message */
255 static int _osrf_app_request_resend( osrfAppRequest* req ) {
256         if(req == NULL) return 0;
257         if(!req->complete) {
258                 osrfLogDebug( OSRF_LOG_MARK,  "Resending request [%d]", req->request_id );
259                 return _osrf_app_session_send( req->session, req->payload );
260         }
261         return 1;
262 }
263
264
265 // --------------------------------------------------------------------------
266 // Session API
267 // --------------------------------------------------------------------------
268
269 /** Install a locale for the session */
270 char* osrf_app_session_set_locale( osrfAppSession* session, const char* locale ) {
271         if (!session || !locale)
272                 return NULL;
273
274         if(session->session_locale) {
275                 if( strlen(session->session_locale) >= strlen(locale) ) {
276                         /* There's room available; just copy */
277                         strcpy(session->session_locale, locale);
278                 } else {
279                         free(session->session_locale);
280                         session->session_locale = strdup( locale );
281                 }
282         } else {
283                 session->session_locale = strdup( locale );
284         }
285
286         return session->session_locale;
287 }
288
289 /**
290         @brief Find the osrfAppSession for a given session id.
291         @param session_id The session id to look for.
292         @return Pointer to the corresponding osrfAppSession if found, or NULL if not.
293
294         Search the global session cache for the specified session id.
295 */
296 osrfAppSession* osrf_app_session_find_session( const char* session_id ) {
297         if(session_id)
298                 return osrfHashGet( osrfAppSessionCache, session_id );
299         return NULL;
300 }
301
302
303 /**
304         @brief Add a session to the global session cache, keyed by session id.
305         @param session Pointer to the osrfAppSession to be added.
306
307         If a cache doesn't exist yet, create one.  It's an osrfHash using session ids for the
308         key and osrfAppSessions for the data.
309 */
310 static void _osrf_app_session_push_session( osrfAppSession* session ) {
311         if( session ) {
312                 if( osrfAppSessionCache == NULL )
313                         osrfAppSessionCache = osrfNewHash();
314                 if( osrfHashGet( osrfAppSessionCache, session->session_id ) )
315                         return;   // A session with this id is already in the cache.  Shouldn't happen.
316                 osrfHashSet( osrfAppSessionCache, session, session->session_id );
317         }
318 }
319
320 /** Allocates and initializes a new app_session */
321
322 osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) {
323
324         if (!remote_service) {
325                 osrfLogWarning( OSRF_LOG_MARK, "No remote service specified in osrfAppSessionClientInit");
326                 return NULL;
327         }
328
329         osrfAppSession* session = safe_malloc(sizeof(osrfAppSession));
330
331         session->transport_handle = osrfSystemGetTransportClient();
332         if( session->transport_handle == NULL ) {
333                 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service 'client'");
334                 free( session );
335                 return NULL;
336         }
337
338         osrfStringArray* arr = osrfNewStringArray(8);
339         osrfConfigGetValueList(NULL, arr, "/domain");
340         const char* domain = osrfStringArrayGetString(arr, 0);
341
342         if (!domain) {
343                 osrfLogWarning( OSRF_LOG_MARK, "No domains specified in the OpenSRF config file");
344                 free( session );
345                 osrfStringArrayFree(arr);
346                 return NULL;
347         }
348
349         char* router_name = osrfConfigGetValue(NULL, "/router_name");
350         if (!router_name) {
351                 osrfLogWarning( OSRF_LOG_MARK, "No router name specified in the OpenSRF config file");
352                 free( session );
353                 osrfStringArrayFree(arr);
354                 return NULL;
355         }
356
357         char target_buf[512];
358         target_buf[ 0 ] = '\0';
359
360         int len = snprintf( target_buf, sizeof(target_buf), "%s@%s/%s",
361                         router_name ? router_name : "(null)",
362                         domain ? domain : "(null)",
363                         remote_service ? remote_service : "(null)" );
364         osrfStringArrayFree(arr);
365         //free(domain);
366         free(router_name);
367
368         if( len >= sizeof( target_buf ) ) {
369                 osrfLogWarning( OSRF_LOG_MARK, "Buffer overflow for remote_id");
370                 free( session );
371                 return NULL;
372         }
373
374         session->request_queue = osrfNewList();
375         session->request_queue->freeItem = &_osrf_app_request_free;
376         session->remote_id = strdup(target_buf);
377         session->orig_remote_id = strdup(session->remote_id);
378         session->remote_service = strdup(remote_service);
379         session->session_locale = NULL;
380         session->transport_error = 0;
381
382         #ifdef ASSUME_STATELESS
383         session->stateless = 1;
384         osrfLogDebug( OSRF_LOG_MARK, "%s session is stateless", remote_service );
385         #else
386         session->stateless = 0;
387         osrfLogDebug( OSRF_LOG_MARK, "%s session is NOT stateless", remote_service );
388         #endif
389
390         /* build a chunky, random session id */
391         char id[256];
392
393         snprintf(id, sizeof(id), "%f.%d%ld", get_timestamp_millis(), (int)time(NULL), (long) getpid());
394         session->session_id = strdup(id);
395         osrfLogDebug( OSRF_LOG_MARK,  "Building a new client session with id [%s] [%s]",
396                         session->remote_service, session->session_id );
397
398         session->thread_trace = 0;
399         session->state = OSRF_SESSION_DISCONNECTED;
400         session->type = OSRF_SESSION_CLIENT;
401
402         session->userData = NULL;
403         session->userDataFree = NULL;
404
405         _osrf_app_session_push_session( session );
406         return session;
407 }
408
409 osrfAppSession* osrf_app_server_session_init(
410                 const char* session_id, const char* our_app, const char* remote_id ) {
411
412         osrfLogDebug( OSRF_LOG_MARK, "Initing server session with session id %s, service %s,"
413                         " and remote_id %s", session_id, our_app, remote_id );
414
415         osrfAppSession* session = osrf_app_session_find_session( session_id );
416         if(session) return session;
417
418         session = safe_malloc(sizeof(osrfAppSession));
419
420         session->transport_handle = osrfSystemGetTransportClient();
421         if( session->transport_handle == NULL ) {
422                 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service '%s'", our_app );
423                 free(session);
424                 return NULL;
425         }
426
427         int stateless = 0;
428         char* statel = osrf_settings_host_value("/apps/%s/stateless", our_app );
429         if(statel) stateless = atoi(statel);
430         free(statel);
431
432
433         session->request_queue = osrfNewList();
434         session->request_queue->freeItem = &_osrf_app_request_free;
435         session->remote_id = strdup(remote_id);
436         session->orig_remote_id = strdup(remote_id);
437         session->session_id = strdup(session_id);
438         session->remote_service = strdup(our_app);
439         session->stateless = stateless;
440
441         #ifdef ASSUME_STATELESS
442         session->stateless = 1;
443         #else
444         session->stateless = 0;
445         #endif
446
447         session->thread_trace = 0;
448         session->state = OSRF_SESSION_DISCONNECTED;
449         session->type = OSRF_SESSION_SERVER;
450         session->session_locale = NULL;
451
452         session->userData = NULL;
453         session->userDataFree = NULL;
454
455         _osrf_app_session_push_session( session );
456         return session;
457
458 }
459
460 /**
461         @brief Create a REQUEST message, send it, and save it for future reference.
462         @param session Pointer to the current session, which has the addressing information.
463         @param params One way of specifying the parameters for the method.
464         @param method_name The name of the method to be called.
465         @param protocol Protocol.
466         @param param_strings Another way of specifying the parameters for the method.
467         @return The request ID of the resulting REQUEST message, or -1 upon error.
468
469         If @a params is non-NULL, use it to specify the parameters to the method.  Otherwise
470         use @a param_strings.
471
472         If @a params points to a JSON_ARRAY, then pass each element of the array as a separate
473         parameter.  If @a params points to any other kind of jsonObject, pass it as a single
474         parameter.
475
476         If @a params is NULL, and @a param_strings is not NULL, then each pointer in the
477         osrfStringArray must point to a JSON string encoding a parameter.  Pass them.
478
479         At this writing, all calls to this function use @a params to pass parameters, rather than
480         @a param_strings.
481
482         This function is a thin wrapper for osrfAppSessionMakeLocaleRequest().
483 */
484 int osrfAppSessionMakeRequest(
485                 osrfAppSession* session, const jsonObject* params,
486                 const char* method_name, int protocol, osrfStringArray* param_strings ) {
487
488         return osrfAppSessionMakeLocaleRequest( session, params,
489                         method_name, protocol, param_strings, NULL );
490 }
491
492 /**
493         @brief Create a REQUEST message, send it, and save it for future reference.
494         @param session Pointer to the current session, which has the addressing information.
495         @param params One way of specifying the parameters for the method.
496         @param method_name The name of the method to be called.
497         @param protocol Protocol.
498         @param param_strings Another way of specifying the parameters for the method.
499         @param locale Pointer to a locale string.
500         @return The request ID of the resulting REQUEST message, or -1 upon error.
501
502         See the discussion of osrfAppSessionMakeRequest(), which at this writing is the only
503         place that calls this function.
504
505         At this writing, the @a param_strings and @a locale parameters are always NULL.
506 */
507 static int osrfAppSessionMakeLocaleRequest(
508                 osrfAppSession* session, const jsonObject* params, const char* method_name,
509                 int protocol, osrfStringArray* param_strings, char* locale ) {
510
511         if(session == NULL) return -1;
512
513         osrfLogMkXid();
514
515         osrfMessage* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol );
516         osrf_message_set_method(req_msg, method_name);
517
518         if (locale) {
519                 osrf_message_set_locale(req_msg, locale);
520         } else if (session->session_locale) {
521                 osrf_message_set_locale(req_msg, session->session_locale);
522         }
523
524         if(params) {
525                 osrf_message_set_params(req_msg, params);
526
527         } else {
528
529                 if(param_strings) {
530                         int i;
531                         for(i = 0; i!= param_strings->size ; i++ ) {
532                                 osrf_message_add_param(req_msg,
533                                         osrfStringArrayGetString(param_strings,i));
534                         }
535                 }
536         }
537
538         osrfAppRequest* req = _osrf_app_request_init( session, req_msg );
539         if(_osrf_app_session_send( session, req_msg ) ) {
540                 osrfLogWarning( OSRF_LOG_MARK,  "Error sending request message [%d]",
541                                 session->thread_trace );
542                 _osrf_app_request_free(req);
543                 return -1;
544         }
545
546         osrfLogDebug( OSRF_LOG_MARK,  "Pushing [%d] onto request queue for session [%s] [%s]",
547                         req->request_id, session->remote_service, session->session_id );
548         osrfListSet( session->request_queue, req, req->request_id );
549         return req->request_id;
550 }
551
552 void osrf_app_session_set_complete( osrfAppSession* session, int request_id ) {
553         if(session == NULL)
554                 return;
555
556         osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
557         if(req) req->complete = 1;
558 }
559
560 int osrf_app_session_request_complete( const osrfAppSession* session, int request_id ) {
561         if(session == NULL)
562                 return 0;
563         osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
564         if(req)
565                 return req->complete;
566         return 0;
567 }
568
569
570 /** Resets the remote connection id to that of the original*/
571 void osrf_app_session_reset_remote( osrfAppSession* session ){
572         if( session==NULL )
573                 return;
574
575         osrfLogDebug( OSRF_LOG_MARK,  "App Session [%s] [%s] resetting remote id to %s",
576                         session->remote_service, session->session_id, session->orig_remote_id );
577
578         osrf_app_session_set_remote( session, session->orig_remote_id );
579 }
580
581 void osrf_app_session_set_remote( osrfAppSession* session, const char* remote_id ) {
582         if(session == NULL)
583                 return;
584
585         if( session->remote_id ) {
586                 if( strlen(session->remote_id) >= strlen(remote_id) ) {
587                         // There's enough room; just copy it
588                         strcpy(session->remote_id, remote_id);
589                 } else {
590                         free(session->remote_id );
591                         session->remote_id = strdup( remote_id );
592                 }
593         } else
594                 session->remote_id = strdup( remote_id );
595 }
596
597 /**
598         pushes the given message into the result list of the app_request
599         with the given request_id
600 */
601 int osrf_app_session_push_queue(
602                 osrfAppSession* session, osrfMessage* msg ){
603         if(session == NULL || msg == NULL) return 0;
604
605         osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, msg->thread_trace );
606         if(req == NULL) return 0;
607         _osrf_app_request_push_queue( req, msg );
608
609         return 0;
610 }
611
612 /** Attempts to connect to the remote service */
613 int osrfAppSessionConnect( osrfAppSession* session ) {
614
615         if(session == NULL)
616                 return 0;
617
618         if(session->state == OSRF_SESSION_CONNECTED) {
619                 return 1;
620         }
621
622         int timeout = 5; /* XXX CONFIG VALUE */
623
624         osrfLogDebug( OSRF_LOG_MARK,  "AppSession connecting to %s", session->remote_id );
625
626         /* defaulting to protocol 1 for now */
627         osrfMessage* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
628         osrf_app_session_reset_remote( session );
629         session->state = OSRF_SESSION_CONNECTING;
630         int ret = _osrf_app_session_send( session, con_msg );
631         osrfMessageFree(con_msg);
632         if(ret)
633                 return 0;
634
635         time_t start = time(NULL);
636         time_t remaining = (time_t) timeout;
637
638         while( session->state != OSRF_SESSION_CONNECTED && remaining >= 0 ) {
639                 osrf_app_session_queue_wait( session, remaining, NULL );
640                 if(session->transport_error) {
641                         osrfLogError(OSRF_LOG_MARK, "cannot communicate with %s", session->remote_service);
642                         return 0;
643                 }
644                 remaining -= (int) (time(NULL) - start);
645         }
646
647         if(session->state == OSRF_SESSION_CONNECTED)
648                 osrfLogDebug( OSRF_LOG_MARK, " * Connected Successfully to %s", session->remote_service );
649
650         if(session->state != OSRF_SESSION_CONNECTED)
651                 return 0;
652
653         return 1;
654 }
655
656
657
658 /** Disconnects from the remote service */
659 int osrf_app_session_disconnect( osrfAppSession* session){
660         if(session == NULL)
661                 return 1;
662
663         if(session->state == OSRF_SESSION_DISCONNECTED)
664                 return 1;
665
666         if(session->stateless && session->state != OSRF_SESSION_CONNECTED) {
667                 osrfLogDebug( OSRF_LOG_MARK,
668                                 "Exiting disconnect on stateless session %s",
669                                 session->session_id);
670                 return 1;
671         }
672
673         osrfLogDebug(OSRF_LOG_MARK,  "AppSession disconnecting from %s", session->remote_id );
674
675         osrfMessage* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
676         _osrf_app_session_send( session, dis_msg );
677         session->state = OSRF_SESSION_DISCONNECTED;
678
679         osrfMessageFree( dis_msg );
680         osrf_app_session_reset_remote( session );
681         return 1;
682 }
683
684 int osrf_app_session_request_resend( osrfAppSession* session, int req_id ) {
685         osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
686         return _osrf_app_request_resend( req );
687 }
688
689
690 static int osrfAppSessionSendBatch( osrfAppSession* session, osrfMessage* msgs[], int size ) {
691
692         if( !(session && msgs && size > 0) ) return 0;
693         int retval = 0;
694
695         osrfMessage* msg = msgs[0];
696
697         if(msg) {
698
699                 osrf_app_session_queue_wait( session, 0, NULL );
700
701                 if(session->state != OSRF_SESSION_CONNECTED)  {
702
703                         if(session->stateless) { /* stateless session always send to the root listener */
704                                 osrf_app_session_reset_remote(session);
705
706                         } else {
707
708                                 /* do an auto-connect if necessary */
709                                 if( ! session->stateless &&
710                                         (msg->m_type != CONNECT) &&
711                                         (msg->m_type != DISCONNECT) &&
712                                         (session->state != OSRF_SESSION_CONNECTED) ) {
713
714                                         if(!osrfAppSessionConnect( session ))
715                                                 return 0;
716                                 }
717                         }
718                 }
719         }
720
721         char* string = osrfMessageSerializeBatch(msgs, size);
722
723         if( string ) {
724
725                 transport_message* t_msg = message_init(
726                                 string, "", session->session_id, session->remote_id, NULL );
727                 message_set_osrf_xid( t_msg, osrfLogGetXid() );
728
729                 retval = client_send_message( session->transport_handle, t_msg );
730
731                 if( retval ) osrfLogError(OSRF_LOG_MARK, "client_send_message failed");
732
733                 osrfLogInfo(OSRF_LOG_MARK, "[%s] sent %d bytes of data to %s",
734                         session->remote_service, strlen(string), t_msg->recipient );
735
736                 osrfLogDebug(OSRF_LOG_MARK, "Sent: %s", string );
737
738                 free(string);
739                 message_free( t_msg );
740         }
741
742         return retval;
743 }
744
745
746
747 static int _osrf_app_session_send( osrfAppSession* session, osrfMessage* msg ){
748         if( !(session && msg) ) return 0;
749         osrfMessage* a[1];
750         a[0] = msg;
751         return osrfAppSessionSendBatch( session, a, 1 );
752 }
753
754
755 /**
756         Waits up to 'timeout' seconds for some data to arrive.
757         Any data that arrives will be processed according to its
758         payload and message type.  This method will return after
759         any data has arrived.
760 */
761 int osrf_app_session_queue_wait( osrfAppSession* session, int timeout, int* recvd ){
762         if(session == NULL) return 0;
763         osrfLogDebug(OSRF_LOG_MARK, "AppSession in queue_wait with timeout %d", timeout );
764         return osrf_stack_process(session->transport_handle, timeout, recvd);
765 }
766
767 /** Disconnects (if client) and removes the given session from the global session cache
768         ! This frees all attached app_requests !
769 */
770 void osrfAppSessionFree( osrfAppSession* session ){
771         if(session == NULL) return;
772
773         /* Disconnect */
774
775         osrfLogDebug(OSRF_LOG_MARK,  "AppSession [%s] [%s] destroying self and deleting requests",
776                         session->remote_service, session->session_id );
777         if(session->type == OSRF_SESSION_CLIENT
778                         && session->state != OSRF_SESSION_DISCONNECTED ) { /* disconnect if we're a client */
779                 osrfMessage* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
780                 _osrf_app_session_send( session, dis_msg );
781                 osrfMessageFree(dis_msg);
782         }
783
784         /* Remove self from the global session cache */
785
786         osrfHashRemove( osrfAppSessionCache, session->session_id );
787
788         /* Free the memory */
789
790         if( session->userDataFree && session->userData )
791                 session->userDataFree(session->userData);
792
793         if(session->session_locale)
794                 free(session->session_locale);
795
796         free(session->remote_id);
797         free(session->orig_remote_id);
798         free(session->session_id);
799         free(session->remote_service);
800         osrfListFree(session->request_queue);
801         free(session);
802 }
803
804 osrfMessage* osrfAppSessionRequestRecv(
805                 osrfAppSession* session, int req_id, int timeout ) {
806         if(req_id < 0 || session == NULL)
807                 return NULL;
808         osrfAppRequest* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
809         return _osrf_app_request_recv( req, timeout );
810 }
811
812
813
814 int osrfAppRequestRespond( osrfAppSession* ses, int requestId, const jsonObject* data ) {
815         if(!ses || ! data ) return -1;
816
817         osrfMessage* msg = osrf_message_init( RESULT, requestId, 1 );
818         osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
819         char* json = jsonObjectToJSON( data );
820
821         osrf_message_set_result_content( msg, json );
822         _osrf_app_session_send( ses, msg );
823
824         free(json);
825         osrfMessageFree( msg );
826
827         return 0;
828 }
829
830
831 int osrfAppRequestRespondComplete(
832                 osrfAppSession* ses, int requestId, const jsonObject* data ) {
833
834         osrfMessage* status = osrf_message_init( STATUS, requestId, 1);
835         osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete",
836                         OSRF_STATUS_COMPLETE );
837
838         if (data) {
839                 osrfMessage* payload = osrf_message_init( RESULT, requestId, 1 );
840                 osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK );
841
842                 char* json = jsonObjectToJSON( data );
843                 osrf_message_set_result_content( payload, json );
844                 free(json);
845
846                 osrfMessage* ms[2];
847                 ms[0] = payload;
848                 ms[1] = status;
849
850                 osrfAppSessionSendBatch( ses, ms, 2 );
851
852                 osrfMessageFree( payload );
853         } else {
854                 osrfAppSessionSendBatch( ses, &status, 1 );
855         }
856
857         osrfMessageFree( status );
858
859         return 0;
860 }
861
862 int osrfAppSessionStatus( osrfAppSession* ses, int type,
863                 const char* name, int reqId, const char* message ) {
864
865         if(ses) {
866                 osrfMessage* msg = osrf_message_init( STATUS, reqId, 1);
867                 osrf_message_set_status_info( msg, name, message, type );
868                 _osrf_app_session_send( ses, msg );
869                 osrfMessageFree( msg );
870                 return 0;
871         }
872         return -1;
873 }
874
875 /**
876         @brief Free the global session cache.
877         
878         Note that the osrfHash that implements the global session cache does @em not have a
879         callback function installed for freeing its cargo.  As a result, any outstanding
880         osrfAppSessions are leaked, along with all the osrfAppRequests and osrfMessages they
881         own.
882 */
883 void osrfAppSessionCleanup( void ) {
884         osrfHashFree(osrfAppSessionCache);
885         osrfAppSessionCache = NULL;
886 }
887
888
889
890
891