moved session code to osrfHash and osrfList instead of manual linked lists
[Evergreen.git] / OpenSRF / src / libstack / osrf_app_session.c
1 #include "osrf_app_session.h"
2 #include <time.h>
3
4 /* the global app_session cache */
5 osrfHash* osrfAppSessionCache = NULL;
6
7
8 // --------------------------------------------------------------------------
9 // --------------------------------------------------------------------------
10 // Request API
11 // --------------------------------------------------------------------------
12
13 /** Allocation and initializes a new app_request object */
14 osrf_app_request* _osrf_app_request_init( 
15                 osrf_app_session* session, osrf_message* msg ) {
16
17         osrf_app_request* req = 
18                 (osrf_app_request*) safe_malloc(sizeof(osrf_app_request));
19
20         req->session            = session;
21         req->request_id = msg->thread_trace;
22         req->complete           = 0;
23         req->payload            = msg;
24         req->result                     = NULL;
25
26         return req;
27
28 }
29
30 /** Frees memory used by an app_request object */
31 void _osrf_app_request_free( void * req ){
32         if( req == NULL ) return;
33         osrfAppRequest* r = (osrfAppRequest*) req;
34
35         if( r->payload ) osrf_message_free( r->payload );
36
37         /*
38         osrf_message* cur_msg = req->result;
39         while( cur_msg != NULL ) {
40                 osrf_message* next_msg = cur_msg->next;
41                 osrf_message_free( cur_msg );
42                 cur_msg = next_msg;
43         }
44         osrf_message_free( req->payload );
45         */
46
47         free( r );
48 }
49
50 /** Pushes the given message onto the list of 'responses' to this request */
51 void _osrf_app_request_push_queue( osrf_app_request* req, osrf_message* result ){
52         if(req == NULL || result == NULL) return;
53         debug_handler( "App Session pushing request [%d] onto request queue", result->thread_trace );
54         if(req->result == NULL)
55                 req->result = result;
56         else {
57                 result->next = req->result;
58                 req->result = result;
59         }
60 }
61
62 /** Removes this app_request from our session request set */
63 void osrf_app_session_request_finish( 
64                 osrf_app_session* session, int req_id ){
65
66         if(session == NULL) return;
67         osrf_app_request* req = osrfListGetIndex( session->request_queue, req_id );
68         if(req == NULL) return;
69         osrfListRemove( req->session->request_queue, req->request_id );
70 }
71
72
73 void osrf_app_session_request_reset_timeout( osrf_app_session* session, int req_id ) {
74         if(session == NULL) return;
75         debug_handler("Resetting request timeout %d", req_id );
76         osrf_app_request* req = osrfListGetIndex( session->request_queue, req_id );
77         if(req == NULL) return;
78         req->reset_timeout = 1;
79 }
80
81 /** Checks the receive queue for messages.  If any are found, the first
82   * is popped off and returned.  Otherwise, this method will wait at most timeout 
83   * seconds for a message to appear in the receive queue.  Once it arrives it is returned.
84   * If no messages arrive in the timeout provided, null is returned.
85   */
86 osrf_message* _osrf_app_request_recv( osrf_app_request* req, int timeout ) {
87
88         if(req == NULL) return NULL;
89
90         if( req->result != NULL ) {
91                 /* pop off the first message in the list */
92                 osrf_message* tmp_msg = req->result;
93                 req->result = req->result->next;
94                 return tmp_msg;
95         }
96
97         time_t start = time(NULL);      
98         time_t remaining = (time_t) timeout;
99
100         while( remaining >= 0 ) {
101                 /* tell the session to wait for stuff */
102                 debug_handler( "In app_request receive with remaining time [%d]", (int) remaining );
103
104                 osrf_app_session_queue_wait( req->session, 0 );
105
106                 if( req->result != NULL ) { /* if we received anything */
107                         /* pop off the first message in the list */
108                         debug_handler( "app_request_recv received a message, returning it");
109                         osrf_message* ret_msg = req->result;
110                         osrf_message* tmp_msg = ret_msg->next;
111                         req->result = tmp_msg;
112                         return ret_msg;
113                 }
114
115                 if( req->complete )
116                         return NULL;
117
118                 osrf_app_session_queue_wait( req->session, (int) remaining );
119
120                 if( req->result != NULL ) { /* if we received anything */
121                         /* pop off the first message in the list */
122                         debug_handler( "app_request_recv received a message, returning it");
123                         osrf_message* ret_msg = req->result;
124                         osrf_message* tmp_msg = ret_msg->next;
125                         req->result = tmp_msg;
126                         return ret_msg;
127                 }
128                 if( req->complete )
129                         return NULL;
130
131                 if(req->reset_timeout) {
132                         remaining = (time_t) timeout;
133                         req->reset_timeout = 0;
134                         debug_handler("Recevied a timeout reset");
135                 } else {
136                         remaining -= (int) (time(NULL) - start);
137                 }
138         }
139
140         debug_handler("Returning NULL from app_request_recv after timeout");
141         return NULL;
142 }
143
144 /** Resend this requests original request message */
145 int _osrf_app_request_resend( osrf_app_request* req ) {
146         if(req == NULL) return 0;
147         if(!req->complete) {
148                 debug_handler( "Resending request [%d]", req->request_id );
149                 return _osrf_app_session_send( req->session, req->payload );
150         }
151         return 1;
152 }
153
154
155
156 // --------------------------------------------------------------------------
157 // --------------------------------------------------------------------------
158 // Session API
159 // --------------------------------------------------------------------------
160
161 /** returns a session from the global session hash */
162 osrf_app_session* osrf_app_session_find_session( char* session_id ) {
163         if(session_id) return osrfHashGet(osrfAppSessionCache, session_id);
164         return NULL;
165 }
166
167
168 /** adds a session to the global session cache */
169 void _osrf_app_session_push_session( osrf_app_session* session ) {
170         if(!session) return;
171         if( osrfAppSessionCache == NULL ) osrfAppSessionCache = osrfNewHash();
172         if( osrfHashGet( osrfAppSessionCache, session->session_id ) ) return;
173         osrfHashSet( osrfAppSessionCache, session, session->session_id );
174 }
175
176 /** Allocates a initializes a new app_session */
177
178 osrf_app_session* osrfAppSessionClientInit( char* remote_service ) {
179         return osrf_app_client_session_init( remote_service );
180 }
181
182 osrf_app_session* osrf_app_client_session_init( char* remote_service ) {
183
184         osrf_app_session* session = safe_malloc(sizeof(osrf_app_session));      
185
186         session->transport_handle = osrf_system_get_transport_client();
187         if( session->transport_handle == NULL ) {
188                 warning_handler("No transport client for service 'client'");
189                 return NULL;
190         }
191
192         char target_buf[512];
193         memset(target_buf,0,512);
194
195         osrfStringArray* arr = osrfNewStringArray(8);
196         osrfConfigGetValueList(NULL, arr, "/domains/domain");
197         char* domain = osrfStringArrayGetString(arr, 0);
198         char* router_name = osrfConfigGetValue(NULL, "/router_name");
199         
200         sprintf( target_buf, "%s@%s/%s",  router_name, domain, remote_service );
201         osrfStringArrayFree(arr);
202         //free(domain);
203         free(router_name);
204
205         session->request_queue = osrfNewList();
206         session->request_queue->freeItem = &_osrf_app_request_free;
207         session->remote_id = strdup(target_buf);
208         session->orig_remote_id = strdup(session->remote_id);
209         session->remote_service = strdup(remote_service);
210
211         #ifdef ASSUME_STATELESS
212         session->stateless = 1;
213         debug_handler("%s session is stateless", remote_service );
214         #else
215         session->stateless = 0;
216         debug_handler("%s session is NOT stateless", remote_service );
217         #endif
218
219         /* build a chunky, random session id */
220         char id[256];
221         memset(id,0,256);
222
223         sprintf(id, "%lf.%d%d", get_timestamp_millis(), (int)time(NULL), getpid());
224         session->session_id = strdup(id);
225         debug_handler( "Building a new client session with id [%s] [%s]", 
226                         session->remote_service, session->session_id );
227
228         session->thread_trace = 0;
229         session->state = OSRF_SESSION_DISCONNECTED;
230         session->type = OSRF_SESSION_CLIENT;
231         //session->next = NULL;
232         _osrf_app_session_push_session( session );
233         return session;
234 }
235
236 osrf_app_session* osrf_app_server_session_init( 
237                 char* session_id, char* our_app, char* remote_id ) {
238
239         info_handler("Initing server session with session id %s, service %s,"
240                         " and remote_id %s", session_id, our_app, remote_id );
241
242         osrf_app_session* session = osrf_app_session_find_session( session_id );
243         if(session) return session;
244
245         session = safe_malloc(sizeof(osrf_app_session));        
246
247         session->transport_handle = osrf_system_get_transport_client();
248         if( session->transport_handle == NULL ) {
249                 warning_handler("No transport client for service '%s'", our_app );
250                 return NULL;
251         }
252
253         int stateless = 0;
254         char* statel = osrf_settings_host_value("/apps/%s/stateless", our_app );
255         if(statel) stateless = atoi(statel);
256         free(statel);
257
258
259         session->request_queue = osrfNewList();
260         session->request_queue->freeItem = &_osrf_app_request_free;
261         session->remote_id = strdup(remote_id);
262         session->orig_remote_id = strdup(remote_id);
263         session->session_id = strdup(session_id);
264         session->remote_service = strdup(our_app);
265         session->stateless = stateless;
266
267         #ifdef ASSUME_STATELESS
268         session->stateless = 1;
269         #endif
270
271         session->thread_trace = 0;
272         session->state = OSRF_SESSION_DISCONNECTED;
273         session->type = OSRF_SESSION_SERVER;
274
275         _osrf_app_session_push_session( session );
276         return session;
277
278 }
279
280
281
282 /** frees memory held by a session */
283 void _osrf_app_session_free( osrf_app_session* session ){
284         if(session==NULL)
285                 return;
286         
287         free(session->remote_id);
288         free(session->orig_remote_id);
289         free(session->session_id);
290         free(session->remote_service);
291         osrfListFree(session->request_queue);
292         free(session);
293 }
294
295 int osrfAppSessionMakeRequest(
296                 osrf_app_session* session, jsonObject* params, 
297                 char* method_name, int protocol, string_array* param_strings ) {
298
299         return osrf_app_session_make_req( session, params, 
300                         method_name, protocol, param_strings );
301 }
302
303 int osrf_app_session_make_req( 
304                 osrf_app_session* session, jsonObject* params, 
305                 char* method_name, int protocol, string_array* param_strings ) {
306         if(session == NULL) return -1;
307
308         osrf_message* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol );
309         osrf_message_set_method(req_msg, method_name);
310         if(params) {
311                 osrf_message_set_params(req_msg, params);
312
313         } else {
314
315                 if(param_strings) {
316                         int i;
317                         for(i = 0; i!= param_strings->size ; i++ ) {
318                                 osrf_message_add_param(req_msg,
319                                         string_array_get_string(param_strings,i));
320                         }
321                 }
322         }
323
324         osrf_app_request* req = _osrf_app_request_init( session, req_msg );
325         if(_osrf_app_session_send( session, req_msg ) ) {
326                 warning_handler( "Error sending request message [%d]", session->thread_trace );
327                 return -1;
328         }
329
330         debug_handler( "Pushing [%d] onto requeust queue for session [%s] [%s]",
331                         req->request_id, session->remote_service, session->session_id );
332         osrfListSet( session->request_queue, req, req->request_id ); 
333         return req->request_id;
334 }
335
336 void osrf_app_session_set_complete( osrf_app_session* session, int request_id ) {
337         if(session == NULL)
338                 return;
339
340         osrf_app_request* req = osrfListGetIndex( session->request_queue, request_id );
341         if(req) req->complete = 1;
342 }
343
344 int osrf_app_session_request_complete( osrf_app_session* session, int request_id ) {
345         if(session == NULL)
346                 return 0;
347         osrf_app_request* req = osrfListGetIndex( session->request_queue, request_id );
348         if(req)
349                 return req->complete;
350         return 0;
351 }
352
353
354 /** Resets the remote connection id to that of the original*/
355 void osrf_app_session_reset_remote( osrf_app_session* session ){
356         if( session==NULL )
357                 return;
358
359         free(session->remote_id);
360         debug_handler( "App Session [%s] [%s] resetting remote id to %s",
361                         session->remote_service, session->session_id, session->orig_remote_id );
362
363         session->remote_id = strdup(session->orig_remote_id);
364 }
365
366 void osrf_app_session_set_remote( osrf_app_session* session, char* remote_id ) {
367         if(session == NULL)
368                 return;
369         if( session->remote_id )
370                 free(session->remote_id );
371         session->remote_id = strdup( remote_id );
372 }
373
374 /** pushes the given message into the result list of the app_request
375   with the given request_id */
376 int osrf_app_session_push_queue( 
377                 osrf_app_session* session, osrf_message* msg ){
378         if(session == NULL || msg == NULL) return 0;
379
380         osrf_app_request* req = osrfListGetIndex( session->request_queue, msg->thread_trace );
381         if(req == NULL) return 0;
382         _osrf_app_request_push_queue( req, msg );
383
384         return 0;
385 }
386
387 /** Attempts to connect to the remote service */
388 int osrf_app_session_connect(osrf_app_session* session){
389         
390         if(session == NULL)
391                 return 0;
392
393         if(session->state == OSRF_SESSION_CONNECTED) {
394                 return 1;
395         }
396
397         int timeout = 5; /* XXX CONFIG VALUE */
398
399         debug_handler( "AppSession connecting to %s", session->remote_id );
400
401         /* defaulting to protocol 1 for now */
402         osrf_message* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
403         osrf_app_session_reset_remote( session );
404         session->state = OSRF_SESSION_CONNECTING;
405         int ret = _osrf_app_session_send( session, con_msg );
406         osrf_message_free(con_msg);
407         if(ret) return 0;
408
409         time_t start = time(NULL);      
410         time_t remaining = (time_t) timeout;
411
412         while( session->state != OSRF_SESSION_CONNECTED && remaining >= 0 ) {
413                 osrf_app_session_queue_wait( session, remaining );
414                 remaining -= (int) (time(NULL) - start);
415         }
416
417         if(session->state == OSRF_SESSION_CONNECTED)
418                 debug_handler(" * Connected Successfully to %s", session->remote_service );
419
420         if(session->state != OSRF_SESSION_CONNECTED)
421                 return 0;
422
423         return 1;
424 }
425
426
427
428 /** Disconnects from the remote service */
429 int osrf_app_session_disconnect( osrf_app_session* session){
430         if(session == NULL)
431                 return 1;
432
433         if(session->state == OSRF_SESSION_DISCONNECTED)
434                 return 1;
435
436         if(session->stateless && session->state != OSRF_SESSION_CONNECTED) {
437                 debug_handler(
438                                 "Exiting disconnect on stateless session %s", 
439                                 session->session_id);
440                 return 1;
441         }
442
443         debug_handler( "AppSession disconnecting from %s", session->remote_id );
444
445         osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
446         session->state = OSRF_SESSION_DISCONNECTED;
447         _osrf_app_session_send( session, dis_msg );
448
449         osrf_message_free( dis_msg );
450         osrf_app_session_reset_remote( session );
451         return 1;
452 }
453
454 int osrf_app_session_request_resend( osrf_app_session* session, int req_id ) {
455         osrf_app_request* req = osrfListGetIndex( session->request_queue, req_id );
456         return _osrf_app_request_resend( req );
457 }
458
459
460 int osrfAppSessionSendBatch( osrfAppSession* session, osrf_message* msgs[], int size ) {
461
462         if( !(session && msgs && size > 0) ) return 0;
463         int retval = 0;
464
465
466         osrfMessage* msg = msgs[0];
467
468         if(msg) {
469
470                 osrf_app_session_queue_wait( session, 0 );
471
472                 /* if we're not stateless and not connected and the first 
473                         message is not a connect message, then we do the connect first */
474                 if(session->stateless) {
475                                 osrf_app_session_reset_remote(session);
476
477                 } else {
478
479                         if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) &&
480                                 (session->state != OSRF_SESSION_CONNECTED) ) {
481                                 if(!osrf_app_session_connect( session )) 
482                                         return 0;
483                         }
484                 }
485         }
486
487         char* string = osrfMessageSerializeBatch(msgs, size);
488
489         if( string ) {
490
491                 transport_message* t_msg = message_init( 
492                                 string, "", session->session_id, session->remote_id, NULL );
493         
494                 debug_handler("Session [%s] [%s]  sending to %s \nData: %s", 
495                                 session->remote_service, session->session_id, t_msg->recipient, string );
496
497                 retval = client_send_message( session->transport_handle, t_msg );
498         
499                 free(string);
500                 message_free( t_msg );
501         }
502
503         return retval; 
504
505 }
506
507
508
509 int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){
510         if( !(session && msg) ) return 0;
511         osrfMessage* a[1];
512         a[0] = msg;
513         return osrfAppSessionSendBatch( session, a, 1 );
514 }
515
516
517
518
519 /**  Waits up to 'timeout' seconds for some data to arrive.
520   * Any data that arrives will be processed according to its
521   * payload and message type.  This method will return after
522   * any data has arrived.
523   */
524 int osrf_app_session_queue_wait( osrf_app_session* session, int timeout ){
525         if(session == NULL) return 0;
526         int ret_val = 0;
527         debug_handler( "AppSession in queue_wait with timeout %d", timeout );
528         ret_val = osrf_stack_entry_point(session->transport_handle, timeout);
529         return ret_val;
530 }
531
532 /** Disconnects (if client) and removes the given session from the global session cache 
533   * ! This free's all attached app_requests ! 
534   */
535 void osrfAppSessionFree( osrfAppSession* ses ) {
536         osrf_app_session_destroy( ses );
537 }
538
539
540 void osrf_app_session_destroy( osrf_app_session* session ){
541         if(session == NULL) return;
542
543         debug_handler( "AppSession [%s] [%s] destroying self and deleting requests", 
544                         session->remote_service, session->session_id );
545         if(session->type == OSRF_SESSION_CLIENT 
546                         && session->state != OSRF_SESSION_DISCONNECTED ) { /* disconnect if we're a client */
547                 osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
548                 _osrf_app_session_send( session, dis_msg ); 
549                 osrf_message_free(dis_msg);
550         }
551
552         osrfHashRemove( osrfAppSessionCache, session->session_id );
553         _osrf_app_session_free( session );
554 }
555
556 osrf_message* osrfAppSessionRequestRecv(
557                 osrf_app_session* session, int req_id, int timeout ) {
558         return osrf_app_session_request_recv( session, req_id, timeout );
559 }
560 osrf_message* osrf_app_session_request_recv( 
561                 osrf_app_session* session, int req_id, int timeout ) {
562         if(req_id < 0 || session == NULL)
563                 return NULL;
564         osrf_app_request* req = osrfListGetIndex( session->request_queue, req_id );
565         return _osrf_app_request_recv( req, timeout );
566 }
567
568
569
570 int osrfAppRequestRespond( osrfAppSession* ses, int requestId, jsonObject* data ) {
571         if(!ses || ! data ) return -1;
572
573         osrf_message* msg = osrf_message_init( RESULT, requestId, 1 );
574         char* json = jsonObjectToJSON( data );
575         osrf_message_set_result_content( msg, json );
576         _osrf_app_session_send( ses, msg ); 
577
578         free(json);
579         osrf_message_free( msg );
580
581         return 0;
582 }
583
584
585 int osrfAppRequestRespondComplete( 
586                 osrfAppSession* ses, int requestId, jsonObject* data ) {
587
588         osrf_message* payload = osrf_message_init( RESULT, requestId, 1 );
589         osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK );
590
591         char* json = jsonObjectToJSON( data );
592         osrf_message_set_result_content( payload, json );
593         free(json);
594
595         osrf_message* status = osrf_message_init( STATUS, requestId, 1);
596         osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
597
598         osrfMessage* ms[2];
599         ms[0] = payload;
600         ms[1] = status;
601
602         osrfAppSessionSendBatch( ses, ms, 2 );
603
604         osrf_message_free( payload );
605         osrf_message_free( status );
606
607         /* join and free */
608
609         return 0;
610 }
611
612 int osrfAppSessionStatus( osrfAppSession* ses, int type, char* name, int reqId, char* message ) {
613
614         if(ses) {
615                 osrf_message* msg = osrf_message_init( STATUS, reqId, 1);
616                 osrf_message_set_status_info( msg, name, message, type );
617                 _osrf_app_session_send( ses, msg ); 
618                 osrf_message_free( msg );
619                 return 0;
620         }
621         return -1;
622 }
623
624
625
626
627
628