]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libstack/osrf_app_session.c
added some additional logging and some cleanup functions for improving
[OpenSRF.git] / src / libstack / osrf_app_session.c
1 #include "osrf_app_session.h"
2 #include <time.h>
3
4 /* the global app_session cache */
5 osrfHash* osrfAppSessionCache = NULL;
6
7
8 // --------------------------------------------------------------------------
9 // --------------------------------------------------------------------------
10 // Request API
11 // --------------------------------------------------------------------------
12
13 /** Allocation and initializes a new app_request object */
14 osrf_app_request* _osrf_app_request_init( 
15                 osrf_app_session* session, osrf_message* msg ) {
16
17         osrf_app_request* req = 
18                 (osrf_app_request*) safe_malloc(sizeof(osrf_app_request));
19
20         req->session            = session;
21         req->request_id = msg->thread_trace;
22         req->complete           = 0;
23         req->payload            = msg;
24         req->result                     = NULL;
25
26         return req;
27
28 }
29
30
31 void osrfAppSessionCleanup() {
32         osrfHashFree(osrfAppSessionCache);      
33 }
34
35 /** Frees memory used by an app_request object */
36 void _osrf_app_request_free( void * req ){
37         if( req == NULL ) return;
38         osrfAppRequest* r = (osrfAppRequest*) req;
39
40         if( r->payload ) osrf_message_free( r->payload );
41
42         /*
43         osrf_message* cur_msg = req->result;
44         while( cur_msg != NULL ) {
45                 osrf_message* next_msg = cur_msg->next;
46                 osrf_message_free( cur_msg );
47                 cur_msg = next_msg;
48         }
49         osrf_message_free( req->payload );
50         */
51
52         free( r );
53 }
54
55 /** Pushes the given message onto the list of 'responses' to this request */
56 void _osrf_app_request_push_queue( osrf_app_request* req, osrf_message* result ){
57         if(req == NULL || result == NULL) return;
58         osrfLogDebug( OSRF_LOG_MARK,  "App Session pushing request [%d] onto request queue", result->thread_trace );
59         if(req->result == NULL)
60                 req->result = result;
61         else {
62                 result->next = req->result;
63                 req->result = result;
64         }
65 }
66
67 /** Removes this app_request from our session request set */
68 void osrf_app_session_request_finish( 
69                 osrf_app_session* session, int req_id ){
70
71         if(session == NULL) return;
72         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
73         if(req == NULL) return;
74         osrfListRemove( req->session->request_queue, req->request_id );
75 }
76
77
78 void osrf_app_session_request_reset_timeout( osrf_app_session* session, int req_id ) {
79         if(session == NULL) return;
80         osrfLogDebug( OSRF_LOG_MARK, "Resetting request timeout %d", req_id );
81         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
82         if(req == NULL) return;
83         req->reset_timeout = 1;
84 }
85
86 /** Checks the receive queue for messages.  If any are found, the first
87   * is popped off and returned.  Otherwise, this method will wait at most timeout 
88   * seconds for a message to appear in the receive queue.  Once it arrives it is returned.
89   * If no messages arrive in the timeout provided, null is returned.
90   */
91 osrf_message* _osrf_app_request_recv( osrf_app_request* req, int timeout ) {
92
93         if(req == NULL) return NULL;
94
95         if( req->result != NULL ) {
96                 /* pop off the first message in the list */
97                 osrf_message* tmp_msg = req->result;
98                 req->result = req->result->next;
99                 return tmp_msg;
100         }
101
102         time_t start = time(NULL);      
103         time_t remaining = (time_t) timeout;
104
105         while( remaining >= 0 ) {
106                 /* tell the session to wait for stuff */
107                 osrfLogDebug( OSRF_LOG_MARK,  "In app_request receive with remaining time [%d]", (int) remaining );
108
109                 osrf_app_session_queue_wait( req->session, 0 );
110
111                 if( req->result != NULL ) { /* if we received anything */
112                         /* pop off the first message in the list */
113                         osrfLogDebug( OSRF_LOG_MARK,  "app_request_recv received a message, returning it");
114                         osrf_message* ret_msg = req->result;
115                         osrf_message* tmp_msg = ret_msg->next;
116                         req->result = tmp_msg;
117                         return ret_msg;
118                 }
119
120                 if( req->complete )
121                         return NULL;
122
123                 osrf_app_session_queue_wait( req->session, (int) remaining );
124
125                 if( req->result != NULL ) { /* if we received anything */
126                         /* pop off the first message in the list */
127                         osrfLogDebug( OSRF_LOG_MARK,  "app_request_recv received a message, returning it");
128                         osrf_message* ret_msg = req->result;
129                         osrf_message* tmp_msg = ret_msg->next;
130                         req->result = tmp_msg;
131                         return ret_msg;
132                 }
133                 if( req->complete )
134                         return NULL;
135
136                 if(req->reset_timeout) {
137                         remaining = (time_t) timeout;
138                         req->reset_timeout = 0;
139                         osrfLogDebug( OSRF_LOG_MARK, "Recevied a timeout reset");
140                 } else {
141                         remaining -= (int) (time(NULL) - start);
142                 }
143         }
144
145         osrfLogDebug( OSRF_LOG_MARK, "Returning NULL from app_request_recv after timeout");
146         return NULL;
147 }
148
149 /** Resend this requests original request message */
150 int _osrf_app_request_resend( osrf_app_request* req ) {
151         if(req == NULL) return 0;
152         if(!req->complete) {
153                 osrfLogDebug( OSRF_LOG_MARK,  "Resending request [%d]", req->request_id );
154                 return _osrf_app_session_send( req->session, req->payload );
155         }
156         return 1;
157 }
158
159
160
161 // --------------------------------------------------------------------------
162 // --------------------------------------------------------------------------
163 // Session API
164 // --------------------------------------------------------------------------
165
166 /** returns a session from the global session hash */
167 osrf_app_session* osrf_app_session_find_session( char* session_id ) {
168         if(session_id) return osrfHashGet(osrfAppSessionCache, session_id);
169         return NULL;
170 }
171
172
173 /** adds a session to the global session cache */
174 void _osrf_app_session_push_session( osrf_app_session* session ) {
175         if(!session) return;
176         if( osrfAppSessionCache == NULL ) osrfAppSessionCache = osrfNewHash();
177         if( osrfHashGet( osrfAppSessionCache, session->session_id ) ) return;
178         osrfHashSet( osrfAppSessionCache, session, session->session_id );
179 }
180
181 /** Allocates a initializes a new app_session */
182
183 osrf_app_session* osrfAppSessionClientInit( char* remote_service ) {
184         return osrf_app_client_session_init( remote_service );
185 }
186
187 osrf_app_session* osrf_app_client_session_init( char* remote_service ) {
188
189         osrf_app_session* session = safe_malloc(sizeof(osrf_app_session));      
190
191         session->transport_handle = osrf_system_get_transport_client();
192         if( session->transport_handle == NULL ) {
193                 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service 'client'");
194                 return NULL;
195         }
196
197         char target_buf[512];
198         memset(target_buf,0,512);
199
200         osrfStringArray* arr = osrfNewStringArray(8);
201         osrfConfigGetValueList(NULL, arr, "/domains/domain");
202         char* domain = osrfStringArrayGetString(arr, 0);
203         char* router_name = osrfConfigGetValue(NULL, "/router_name");
204         
205         sprintf( target_buf, "%s@%s/%s",  router_name, domain, remote_service );
206         osrfStringArrayFree(arr);
207         //free(domain);
208         free(router_name);
209
210         session->request_queue = osrfNewList();
211         session->request_queue->freeItem = &_osrf_app_request_free;
212         session->remote_id = strdup(target_buf);
213         session->orig_remote_id = strdup(session->remote_id);
214         session->remote_service = strdup(remote_service);
215
216         #ifdef ASSUME_STATELESS
217         session->stateless = 1;
218         osrfLogDebug( OSRF_LOG_MARK, "%s session is stateless", remote_service );
219         #else
220         session->stateless = 0;
221         osrfLogDebug( OSRF_LOG_MARK, "%s session is NOT stateless", remote_service );
222         #endif
223
224         /* build a chunky, random session id */
225         char id[256];
226         memset(id,0,256);
227
228         sprintf(id, "%lf.%d%d", get_timestamp_millis(), (int)time(NULL), getpid());
229         session->session_id = strdup(id);
230         osrfLogDebug( OSRF_LOG_MARK,  "Building a new client session with id [%s] [%s]", 
231                         session->remote_service, session->session_id );
232
233         session->thread_trace = 0;
234         session->state = OSRF_SESSION_DISCONNECTED;
235         session->type = OSRF_SESSION_CLIENT;
236         //session->next = NULL;
237         _osrf_app_session_push_session( session );
238         return session;
239 }
240
241 osrf_app_session* osrf_app_server_session_init( 
242                 char* session_id, char* our_app, char* remote_id ) {
243
244         osrfLogInfo( OSRF_LOG_MARK, "Initing server session with session id %s, service %s,"
245                         " and remote_id %s", session_id, our_app, remote_id );
246
247         osrf_app_session* session = osrf_app_session_find_session( session_id );
248         if(session) return session;
249
250         session = safe_malloc(sizeof(osrf_app_session));        
251
252         session->transport_handle = osrf_system_get_transport_client();
253         if( session->transport_handle == NULL ) {
254                 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service '%s'", our_app );
255                 return NULL;
256         }
257
258         int stateless = 0;
259         char* statel = osrf_settings_host_value("/apps/%s/stateless", our_app );
260         if(statel) stateless = atoi(statel);
261         free(statel);
262
263
264         session->request_queue = osrfNewList();
265         session->request_queue->freeItem = &_osrf_app_request_free;
266         session->remote_id = strdup(remote_id);
267         session->orig_remote_id = strdup(remote_id);
268         session->session_id = strdup(session_id);
269         session->remote_service = strdup(our_app);
270         session->stateless = stateless;
271
272         #ifdef ASSUME_STATELESS
273         session->stateless = 1;
274         #endif
275
276         session->thread_trace = 0;
277         session->state = OSRF_SESSION_DISCONNECTED;
278         session->type = OSRF_SESSION_SERVER;
279
280         _osrf_app_session_push_session( session );
281         return session;
282
283 }
284
285
286
287 /** frees memory held by a session */
288 void _osrf_app_session_free( osrf_app_session* session ){
289         if(session==NULL)
290                 return;
291         
292         free(session->remote_id);
293         free(session->orig_remote_id);
294         free(session->session_id);
295         free(session->remote_service);
296         osrfListFree(session->request_queue);
297         free(session);
298 }
299
300 int osrfAppSessionMakeRequest(
301                 osrf_app_session* session, jsonObject* params, 
302                 char* method_name, int protocol, string_array* param_strings ) {
303
304         return osrf_app_session_make_req( session, params, 
305                         method_name, protocol, param_strings );
306 }
307
308 int osrf_app_session_make_req( 
309                 osrf_app_session* session, jsonObject* params, 
310                 char* method_name, int protocol, string_array* param_strings ) {
311         if(session == NULL) return -1;
312
313         osrf_message* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol );
314         osrf_message_set_method(req_msg, method_name);
315         if(params) {
316                 osrf_message_set_params(req_msg, params);
317
318         } else {
319
320                 if(param_strings) {
321                         int i;
322                         for(i = 0; i!= param_strings->size ; i++ ) {
323                                 osrf_message_add_param(req_msg,
324                                         string_array_get_string(param_strings,i));
325                         }
326                 }
327         }
328
329         osrf_app_request* req = _osrf_app_request_init( session, req_msg );
330         if(_osrf_app_session_send( session, req_msg ) ) {
331                 osrfLogWarning( OSRF_LOG_MARK,  "Error sending request message [%d]", session->thread_trace );
332                 return -1;
333         }
334
335         osrfLogDebug( OSRF_LOG_MARK,  "Pushing [%d] onto requeust queue for session [%s] [%s]",
336                         req->request_id, session->remote_service, session->session_id );
337         osrfListSet( session->request_queue, req, req->request_id ); 
338         return req->request_id;
339 }
340
341 void osrf_app_session_set_complete( osrf_app_session* session, int request_id ) {
342         if(session == NULL)
343                 return;
344
345         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
346         if(req) req->complete = 1;
347 }
348
349 int osrf_app_session_request_complete( osrf_app_session* session, int request_id ) {
350         if(session == NULL)
351                 return 0;
352         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
353         if(req)
354                 return req->complete;
355         return 0;
356 }
357
358
359 /** Resets the remote connection id to that of the original*/
360 void osrf_app_session_reset_remote( osrf_app_session* session ){
361         if( session==NULL )
362                 return;
363
364         free(session->remote_id);
365         osrfLogDebug( OSRF_LOG_MARK,  "App Session [%s] [%s] resetting remote id to %s",
366                         session->remote_service, session->session_id, session->orig_remote_id );
367
368         session->remote_id = strdup(session->orig_remote_id);
369 }
370
371 void osrf_app_session_set_remote( osrf_app_session* session, char* remote_id ) {
372         if(session == NULL)
373                 return;
374         if( session->remote_id )
375                 free(session->remote_id );
376         session->remote_id = strdup( remote_id );
377 }
378
379 /** pushes the given message into the result list of the app_request
380   with the given request_id */
381 int osrf_app_session_push_queue( 
382                 osrf_app_session* session, osrf_message* msg ){
383         if(session == NULL || msg == NULL) return 0;
384
385         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, msg->thread_trace );
386         if(req == NULL) return 0;
387         _osrf_app_request_push_queue( req, msg );
388
389         return 0;
390 }
391
392 /** Attempts to connect to the remote service */
393 int osrf_app_session_connect(osrf_app_session* session){
394         
395         if(session == NULL)
396                 return 0;
397
398         if(session->state == OSRF_SESSION_CONNECTED) {
399                 return 1;
400         }
401
402         int timeout = 5; /* XXX CONFIG VALUE */
403
404         osrfLogDebug( OSRF_LOG_MARK,  "AppSession connecting to %s", session->remote_id );
405
406         /* defaulting to protocol 1 for now */
407         osrf_message* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
408         osrf_app_session_reset_remote( session );
409         session->state = OSRF_SESSION_CONNECTING;
410         int ret = _osrf_app_session_send( session, con_msg );
411         osrf_message_free(con_msg);
412         if(ret) return 0;
413
414         time_t start = time(NULL);      
415         time_t remaining = (time_t) timeout;
416
417         while( session->state != OSRF_SESSION_CONNECTED && remaining >= 0 ) {
418                 osrf_app_session_queue_wait( session, remaining );
419                 remaining -= (int) (time(NULL) - start);
420         }
421
422         if(session->state == OSRF_SESSION_CONNECTED)
423                 osrfLogDebug( OSRF_LOG_MARK, " * Connected Successfully to %s", session->remote_service );
424
425         if(session->state != OSRF_SESSION_CONNECTED)
426                 return 0;
427
428         return 1;
429 }
430
431
432
433 /** Disconnects from the remote service */
434 int osrf_app_session_disconnect( osrf_app_session* session){
435         if(session == NULL)
436                 return 1;
437
438         if(session->state == OSRF_SESSION_DISCONNECTED)
439                 return 1;
440
441         if(session->stateless && session->state != OSRF_SESSION_CONNECTED) {
442                 osrfLogDebug( OSRF_LOG_MARK,  
443                                 "Exiting disconnect on stateless session %s", 
444                                 session->session_id);
445                 return 1;
446         }
447
448         osrfLogDebug(OSRF_LOG_MARK,  "AppSession disconnecting from %s", session->remote_id );
449
450         osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
451         session->state = OSRF_SESSION_DISCONNECTED;
452         _osrf_app_session_send( session, dis_msg );
453
454         osrf_message_free( dis_msg );
455         osrf_app_session_reset_remote( session );
456         return 1;
457 }
458
459 int osrf_app_session_request_resend( osrf_app_session* session, int req_id ) {
460         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
461         return _osrf_app_request_resend( req );
462 }
463
464
465 int osrfAppSessionSendBatch( osrfAppSession* session, osrf_message* msgs[], int size ) {
466
467         if( !(session && msgs && size > 0) ) return 0;
468         int retval = 0;
469
470
471         osrfMessage* msg = msgs[0];
472
473         if(msg) {
474
475                 osrf_app_session_queue_wait( session, 0 );
476
477                 /* if we're not stateless and not connected and the first 
478                         message is not a connect message, then we do the connect first */
479                 if(session->stateless) {
480                                 osrf_app_session_reset_remote(session);
481
482                 } else {
483
484                         if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) &&
485                                 (session->state != OSRF_SESSION_CONNECTED) ) {
486                                 if(!osrf_app_session_connect( session )) 
487                                         return 0;
488                         }
489                 }
490         }
491
492         char* string = osrfMessageSerializeBatch(msgs, size);
493
494         if( string ) {
495
496                 transport_message* t_msg = message_init( 
497                                 string, "", session->session_id, session->remote_id, NULL );
498         
499                 osrfLogDebug(OSRF_LOG_MARK, "Session [%s] [%s]  sending to %s \nData: %s", 
500                                 session->remote_service, session->session_id, t_msg->recipient, string );
501
502                 retval = client_send_message( session->transport_handle, t_msg );
503         
504                 free(string);
505                 message_free( t_msg );
506         }
507
508         return retval; 
509
510 }
511
512
513
514 int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){
515         if( !(session && msg) ) return 0;
516         osrfMessage* a[1];
517         a[0] = msg;
518         return osrfAppSessionSendBatch( session, a, 1 );
519 }
520
521
522
523
524 /**  Waits up to 'timeout' seconds for some data to arrive.
525   * Any data that arrives will be processed according to its
526   * payload and message type.  This method will return after
527   * any data has arrived.
528   */
529 int osrf_app_session_queue_wait( osrf_app_session* session, int timeout ){
530         if(session == NULL) return 0;
531         int ret_val = 0;
532         osrfLogDebug(OSRF_LOG_MARK,  "AppSession in queue_wait with timeout %d", timeout );
533         ret_val = osrf_stack_entry_point(session->transport_handle, timeout);
534         return ret_val;
535 }
536
537 /** Disconnects (if client) and removes the given session from the global session cache 
538   * ! This free's all attached app_requests ! 
539   */
540 void osrfAppSessionFree( osrfAppSession* ses ) {
541         osrf_app_session_destroy( ses );
542 }
543
544
545 void osrf_app_session_destroy( osrf_app_session* session ){
546         if(session == NULL) return;
547
548         osrfLogDebug(OSRF_LOG_MARK,  "AppSession [%s] [%s] destroying self and deleting requests", 
549                         session->remote_service, session->session_id );
550         if(session->type == OSRF_SESSION_CLIENT 
551                         && session->state != OSRF_SESSION_DISCONNECTED ) { /* disconnect if we're a client */
552                 osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
553                 _osrf_app_session_send( session, dis_msg ); 
554                 osrf_message_free(dis_msg);
555         }
556
557         osrfHashRemove( osrfAppSessionCache, session->session_id );
558         _osrf_app_session_free( session );
559 }
560
561 osrf_message* osrfAppSessionRequestRecv(
562                 osrf_app_session* session, int req_id, int timeout ) {
563         return osrf_app_session_request_recv( session, req_id, timeout );
564 }
565 osrf_message* osrf_app_session_request_recv( 
566                 osrf_app_session* session, int req_id, int timeout ) {
567         if(req_id < 0 || session == NULL)
568                 return NULL;
569         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
570         return _osrf_app_request_recv( req, timeout );
571 }
572
573
574
575 int osrfAppRequestRespond( osrfAppSession* ses, int requestId, jsonObject* data ) {
576         if(!ses || ! data ) return -1;
577
578         osrf_message* msg = osrf_message_init( RESULT, requestId, 1 );
579         char* json = jsonObjectToJSON( data );
580         osrf_message_set_result_content( msg, json );
581         _osrf_app_session_send( ses, msg ); 
582
583         free(json);
584         osrf_message_free( msg );
585
586         return 0;
587 }
588
589
590 int osrfAppRequestRespondComplete( 
591                 osrfAppSession* ses, int requestId, jsonObject* data ) {
592
593         osrf_message* payload = osrf_message_init( RESULT, requestId, 1 );
594         osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK );
595
596         char* json = jsonObjectToJSON( data );
597         osrf_message_set_result_content( payload, json );
598         free(json);
599
600         osrf_message* status = osrf_message_init( STATUS, requestId, 1);
601         osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
602
603         osrfMessage* ms[2];
604         ms[0] = payload;
605         ms[1] = status;
606
607         osrfAppSessionSendBatch( ses, ms, 2 );
608
609         osrf_message_free( payload );
610         osrf_message_free( status );
611
612         /* join and free */
613
614         return 0;
615 }
616
617 int osrfAppSessionStatus( osrfAppSession* ses, int type, char* name, int reqId, char* message ) {
618
619         if(ses) {
620                 osrf_message* msg = osrf_message_init( STATUS, reqId, 1);
621                 osrf_message_set_status_info( msg, name, message, type );
622                 _osrf_app_session_send( ses, msg ); 
623                 osrf_message_free( msg );
624                 return 0;
625         }
626         return -1;
627 }
628
629
630
631
632
633