]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/libstack/osrf_app_session.c
0b3553717e4b0d3646788f80ee36e36af92e75b0
[OpenSRF.git] / src / libstack / osrf_app_session.c
1 #include "osrf_app_session.h"
2 #include <time.h>
3
4 /* the global app_session cache */
5 osrfHash* osrfAppSessionCache = NULL;
6
7
8 // --------------------------------------------------------------------------
9 // --------------------------------------------------------------------------
10 // Request API
11 // --------------------------------------------------------------------------
12
13 /** Allocation and initializes a new app_request object */
14 osrf_app_request* _osrf_app_request_init( 
15                 osrf_app_session* session, osrf_message* msg ) {
16
17         osrf_app_request* req = 
18                 (osrf_app_request*) safe_malloc(sizeof(osrf_app_request));
19
20         req->session            = session;
21         req->request_id = msg->thread_trace;
22         req->complete           = 0;
23         req->payload            = msg;
24         req->result                     = NULL;
25
26         return req;
27
28 }
29
30
31 void osrfAppSessionCleanup() {
32         osrfHashFree(osrfAppSessionCache);      
33 }
34
35 /** Frees memory used by an app_request object */
36 void _osrf_app_request_free( void * req ){
37         if( req == NULL ) return;
38         osrfAppRequest* r = (osrfAppRequest*) req;
39         if( r->payload ) osrf_message_free( r->payload );
40         free( r );
41 }
42
43 /** Pushes the given message onto the list of 'responses' to this request */
44 void _osrf_app_request_push_queue( osrf_app_request* req, osrf_message* result ){
45         if(req == NULL || result == NULL) return;
46         osrfLogDebug( OSRF_LOG_MARK,  "App Session pushing request [%d] onto request queue", result->thread_trace );
47         if(req->result == NULL) {
48                 req->result = result;
49
50         } else {
51                 
52                 osrf_message* ptr = req->result;
53                 osrf_message* ptr2 = req->result->next;
54                 while( ptr2 ) {
55                         ptr = ptr2;
56                         ptr2 = ptr2->next;
57                 }
58                 ptr->next = result;
59         }
60 }
61
62 /** Removes this app_request from our session request set */
63 void osrf_app_session_request_finish( 
64                 osrf_app_session* session, int req_id ){
65
66         if(session == NULL) return;
67         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
68         if(req == NULL) return;
69         osrfListRemove( req->session->request_queue, req->request_id );
70 }
71
72
73 void osrf_app_session_request_reset_timeout( osrf_app_session* session, int req_id ) {
74         if(session == NULL) return;
75         osrfLogDebug( OSRF_LOG_MARK, "Resetting request timeout %d", req_id );
76         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
77         if(req == NULL) return;
78         req->reset_timeout = 1;
79 }
80
81 /** Checks the receive queue for messages.  If any are found, the first
82   * is popped off and returned.  Otherwise, this method will wait at most timeout 
83   * seconds for a message to appear in the receive queue.  Once it arrives it is returned.
84   * If no messages arrive in the timeout provided, null is returned.
85   */
86 osrf_message* _osrf_app_request_recv( osrf_app_request* req, int timeout ) {
87
88         if(req == NULL) return NULL;
89
90         if( req->result != NULL ) {
91                 /* pop off the first message in the list */
92                 osrf_message* tmp_msg = req->result;
93                 req->result = req->result->next;
94                 return tmp_msg;
95         }
96
97         time_t start = time(NULL);      
98         time_t remaining = (time_t) timeout;
99
100         while( remaining >= 0 ) {
101                 /* tell the session to wait for stuff */
102                 osrfLogDebug( OSRF_LOG_MARK,  "In app_request receive with remaining time [%d]", (int) remaining );
103
104                 osrf_app_session_queue_wait( req->session, 0, NULL );
105
106                 if( req->result != NULL ) { /* if we received anything */
107                         /* pop off the first message in the list */
108                         osrfLogDebug( OSRF_LOG_MARK,  "app_request_recv received a message, returning it");
109                         osrf_message* ret_msg = req->result;
110                         osrf_message* tmp_msg = ret_msg->next;
111                         req->result = tmp_msg;
112                         return ret_msg;
113                 }
114
115                 if( req->complete )
116                         return NULL;
117
118                 osrf_app_session_queue_wait( req->session, (int) remaining, NULL );
119
120                 if( req->result != NULL ) { /* if we received anything */
121                         /* pop off the first message in the list */
122                         osrfLogDebug( OSRF_LOG_MARK,  "app_request_recv received a message, returning it");
123                         osrf_message* ret_msg = req->result;
124                         osrf_message* tmp_msg = ret_msg->next;
125                         req->result = tmp_msg;
126                         return ret_msg;
127                 }
128                 if( req->complete )
129                         return NULL;
130
131                 if(req->reset_timeout) {
132                         remaining = (time_t) timeout;
133                         req->reset_timeout = 0;
134                         osrfLogDebug( OSRF_LOG_MARK, "Recevied a timeout reset");
135                 } else {
136                         remaining -= (int) (time(NULL) - start);
137                 }
138         }
139
140         osrfLogInfo( OSRF_LOG_MARK, "Returning NULL from app_request_recv after timeout");
141         return NULL;
142 }
143
144 /** Resend this requests original request message */
145 int _osrf_app_request_resend( osrf_app_request* req ) {
146         if(req == NULL) return 0;
147         if(!req->complete) {
148                 osrfLogDebug( OSRF_LOG_MARK,  "Resending request [%d]", req->request_id );
149                 return _osrf_app_session_send( req->session, req->payload );
150         }
151         return 1;
152 }
153
154
155
156 // --------------------------------------------------------------------------
157 // --------------------------------------------------------------------------
158 // Session API
159 // --------------------------------------------------------------------------
160
161 /** returns a session from the global session hash */
162 osrf_app_session* osrf_app_session_find_session( char* session_id ) {
163         if(session_id) return osrfHashGet(osrfAppSessionCache, session_id);
164         return NULL;
165 }
166
167
168 /** adds a session to the global session cache */
169 void _osrf_app_session_push_session( osrf_app_session* session ) {
170         if(!session) return;
171         if( osrfAppSessionCache == NULL ) osrfAppSessionCache = osrfNewHash();
172         if( osrfHashGet( osrfAppSessionCache, session->session_id ) ) return;
173         osrfHashSet( osrfAppSessionCache, session, session->session_id );
174 }
175
176 /** Allocates a initializes a new app_session */
177
178 osrf_app_session* osrfAppSessionClientInit( char* remote_service ) {
179         return osrf_app_client_session_init( remote_service );
180 }
181
182 osrf_app_session* osrf_app_client_session_init( char* remote_service ) {
183
184         osrf_app_session* session = safe_malloc(sizeof(osrf_app_session));      
185
186         session->transport_handle = osrf_system_get_transport_client();
187         if( session->transport_handle == NULL ) {
188                 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service 'client'");
189                 return NULL;
190         }
191
192         char target_buf[512];
193         memset(target_buf,0,512);
194
195         osrfStringArray* arr = osrfNewStringArray(8);
196         osrfConfigGetValueList(NULL, arr, "/domains/domain");
197         char* domain = osrfStringArrayGetString(arr, 0);
198         char* router_name = osrfConfigGetValue(NULL, "/router_name");
199         
200         sprintf( target_buf, "%s@%s/%s",  router_name, domain, remote_service );
201         osrfStringArrayFree(arr);
202         //free(domain);
203         free(router_name);
204
205         session->request_queue = osrfNewList();
206         session->request_queue->freeItem = &_osrf_app_request_free;
207         session->remote_id = strdup(target_buf);
208         session->orig_remote_id = strdup(session->remote_id);
209         session->remote_service = strdup(remote_service);
210
211         #ifdef ASSUME_STATELESS
212         session->stateless = 1;
213         osrfLogDebug( OSRF_LOG_MARK, "%s session is stateless", remote_service );
214         #else
215         session->stateless = 0;
216         osrfLogDebug( OSRF_LOG_MARK, "%s session is NOT stateless", remote_service );
217         #endif
218
219         /* build a chunky, random session id */
220         char id[256];
221         memset(id,0,256);
222
223         sprintf(id, "%lf.%d%d", get_timestamp_millis(), (int)time(NULL), getpid());
224         session->session_id = strdup(id);
225         osrfLogDebug( OSRF_LOG_MARK,  "Building a new client session with id [%s] [%s]", 
226                         session->remote_service, session->session_id );
227
228         session->thread_trace = 0;
229         session->state = OSRF_SESSION_DISCONNECTED;
230         session->type = OSRF_SESSION_CLIENT;
231         //session->next = NULL;
232         _osrf_app_session_push_session( session );
233         return session;
234 }
235
236 osrf_app_session* osrf_app_server_session_init( 
237                 char* session_id, char* our_app, char* remote_id ) {
238
239         osrfLogDebug( OSRF_LOG_MARK, "Initing server session with session id %s, service %s,"
240                         " and remote_id %s", session_id, our_app, remote_id );
241
242         osrf_app_session* session = osrf_app_session_find_session( session_id );
243         if(session) return session;
244
245         session = safe_malloc(sizeof(osrf_app_session));        
246
247         session->transport_handle = osrf_system_get_transport_client();
248         if( session->transport_handle == NULL ) {
249                 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service '%s'", our_app );
250                 return NULL;
251         }
252
253         int stateless = 0;
254         char* statel = osrf_settings_host_value("/apps/%s/stateless", our_app );
255         if(statel) stateless = atoi(statel);
256         free(statel);
257
258
259         session->request_queue = osrfNewList();
260         session->request_queue->freeItem = &_osrf_app_request_free;
261         session->remote_id = strdup(remote_id);
262         session->orig_remote_id = strdup(remote_id);
263         session->session_id = strdup(session_id);
264         session->remote_service = strdup(our_app);
265         session->stateless = stateless;
266
267         #ifdef ASSUME_STATELESS
268         session->stateless = 1;
269         #endif
270
271         session->thread_trace = 0;
272         session->state = OSRF_SESSION_DISCONNECTED;
273         session->type = OSRF_SESSION_SERVER;
274
275         _osrf_app_session_push_session( session );
276         return session;
277
278 }
279
280
281
282 /** frees memory held by a session */
283 void _osrf_app_session_free( osrf_app_session* session ){
284         if(session==NULL)
285                 return;
286
287         if( session->userDataFree && session->userData ) 
288                 session->userDataFree(session->userData);
289         
290         free(session->remote_id);
291         free(session->orig_remote_id);
292         free(session->session_id);
293         free(session->remote_service);
294         osrfListFree(session->request_queue);
295         free(session);
296 }
297
298 int osrfAppSessionMakeRequest(
299                 osrf_app_session* session, jsonObject* params, 
300                 char* method_name, int protocol, string_array* param_strings ) {
301
302         return osrf_app_session_make_req( session, params, 
303                         method_name, protocol, param_strings );
304 }
305
306 int osrf_app_session_make_req( 
307                 osrf_app_session* session, jsonObject* params, 
308                 char* method_name, int protocol, string_array* param_strings ) {
309         if(session == NULL) return -1;
310
311         osrf_message* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol );
312         osrf_message_set_method(req_msg, method_name);
313         if(params) {
314                 osrf_message_set_params(req_msg, params);
315
316         } else {
317
318                 if(param_strings) {
319                         int i;
320                         for(i = 0; i!= param_strings->size ; i++ ) {
321                                 osrf_message_add_param(req_msg,
322                                         string_array_get_string(param_strings,i));
323                         }
324                 }
325         }
326
327         osrf_app_request* req = _osrf_app_request_init( session, req_msg );
328         if(_osrf_app_session_send( session, req_msg ) ) {
329                 osrfLogWarning( OSRF_LOG_MARK,  "Error sending request message [%d]", session->thread_trace );
330                 return -1;
331         }
332
333         osrfLogDebug( OSRF_LOG_MARK,  "Pushing [%d] onto requeust queue for session [%s] [%s]",
334                         req->request_id, session->remote_service, session->session_id );
335         osrfListSet( session->request_queue, req, req->request_id ); 
336         return req->request_id;
337 }
338
339 void osrf_app_session_set_complete( osrf_app_session* session, int request_id ) {
340         if(session == NULL)
341                 return;
342
343         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
344         if(req) req->complete = 1;
345 }
346
347 int osrf_app_session_request_complete( osrf_app_session* session, int request_id ) {
348         if(session == NULL)
349                 return 0;
350         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
351         if(req)
352                 return req->complete;
353         return 0;
354 }
355
356
357 /** Resets the remote connection id to that of the original*/
358 void osrf_app_session_reset_remote( osrf_app_session* session ){
359         if( session==NULL )
360                 return;
361
362         free(session->remote_id);
363         osrfLogDebug( OSRF_LOG_MARK,  "App Session [%s] [%s] resetting remote id to %s",
364                         session->remote_service, session->session_id, session->orig_remote_id );
365
366         session->remote_id = strdup(session->orig_remote_id);
367 }
368
369 void osrf_app_session_set_remote( osrf_app_session* session, char* remote_id ) {
370         if(session == NULL)
371                 return;
372         if( session->remote_id )
373                 free(session->remote_id );
374         session->remote_id = strdup( remote_id );
375 }
376
377 /** pushes the given message into the result list of the app_request
378   with the given request_id */
379 int osrf_app_session_push_queue( 
380                 osrf_app_session* session, osrf_message* msg ){
381         if(session == NULL || msg == NULL) return 0;
382
383         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, msg->thread_trace );
384         if(req == NULL) return 0;
385         _osrf_app_request_push_queue( req, msg );
386
387         return 0;
388 }
389
390 int osrfAppSessionConnect( osrf_app_session* session ) { 
391         return osrf_app_session_connect(session);
392 }
393
394
395 /** Attempts to connect to the remote service */
396 int osrf_app_session_connect(osrf_app_session* session){
397         
398         if(session == NULL)
399                 return 0;
400
401         if(session->state == OSRF_SESSION_CONNECTED) {
402                 return 1;
403         }
404
405         int timeout = 5; /* XXX CONFIG VALUE */
406
407         osrfLogDebug( OSRF_LOG_MARK,  "AppSession connecting to %s", session->remote_id );
408
409         /* defaulting to protocol 1 for now */
410         osrf_message* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
411         osrf_app_session_reset_remote( session );
412         session->state = OSRF_SESSION_CONNECTING;
413         int ret = _osrf_app_session_send( session, con_msg );
414         osrf_message_free(con_msg);
415         if(ret) return 0;
416
417         time_t start = time(NULL);      
418         time_t remaining = (time_t) timeout;
419
420         while( session->state != OSRF_SESSION_CONNECTED && remaining >= 0 ) {
421                 osrf_app_session_queue_wait( session, remaining, NULL );
422                 remaining -= (int) (time(NULL) - start);
423         }
424
425         if(session->state == OSRF_SESSION_CONNECTED)
426                 osrfLogDebug( OSRF_LOG_MARK, " * Connected Successfully to %s", session->remote_service );
427
428         if(session->state != OSRF_SESSION_CONNECTED)
429                 return 0;
430
431         return 1;
432 }
433
434
435
436 /** Disconnects from the remote service */
437 int osrf_app_session_disconnect( osrf_app_session* session){
438         if(session == NULL)
439                 return 1;
440
441         if(session->state == OSRF_SESSION_DISCONNECTED)
442                 return 1;
443
444         if(session->stateless && session->state != OSRF_SESSION_CONNECTED) {
445                 osrfLogDebug( OSRF_LOG_MARK,  
446                                 "Exiting disconnect on stateless session %s", 
447                                 session->session_id);
448                 return 1;
449         }
450
451         osrfLogDebug(OSRF_LOG_MARK,  "AppSession disconnecting from %s", session->remote_id );
452
453         osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
454         _osrf_app_session_send( session, dis_msg );
455         session->state = OSRF_SESSION_DISCONNECTED;
456
457         osrf_message_free( dis_msg );
458         osrf_app_session_reset_remote( session );
459         return 1;
460 }
461
462 int osrf_app_session_request_resend( osrf_app_session* session, int req_id ) {
463         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
464         return _osrf_app_request_resend( req );
465 }
466
467
468 int osrfAppSessionSendBatch( osrfAppSession* session, osrf_message* msgs[], int size ) {
469
470         if( !(session && msgs && size > 0) ) return 0;
471         int retval = 0;
472
473         osrfMessage* msg = msgs[0];
474
475         if(msg) {
476
477                 osrf_app_session_queue_wait( session, 0, NULL );
478
479                 if(session->state != OSRF_SESSION_CONNECTED)  {
480
481                         if(session->stateless) { /* stateless session always send to the root listener */
482                                 osrf_app_session_reset_remote(session);
483
484                         } else { 
485
486                                 /* do an auto-connect if necessary */
487                                 if( ! session->stateless &&
488                                         (msg->m_type != CONNECT) && 
489                                         (msg->m_type != DISCONNECT) &&
490                                         (session->state != OSRF_SESSION_CONNECTED) ) {
491
492                                         if(!osrf_app_session_connect( session )) 
493                                                 return 0;
494                                 }
495                         }
496                 }
497         }
498
499         char* string = osrfMessageSerializeBatch(msgs, size);
500
501         if( string ) {
502
503                 transport_message* t_msg = message_init( 
504                                 string, "", session->session_id, session->remote_id, NULL );
505
506                 retval = client_send_message( session->transport_handle, t_msg );
507
508                 if( retval ) osrfLogError(OSRF_LOG_MARK, "client_send_message failed");
509
510                 osrfLogInfo(OSRF_LOG_MARK, "[%s] sent %d bytes of data to %s",
511                         session->remote_service, strlen(string), t_msg->recipient );
512
513                 osrfLogDebug(OSRF_LOG_MARK, "Sent: %s", string );
514
515                 free(string);
516                 message_free( t_msg );
517         }
518
519         return retval; 
520 }
521
522
523
524 int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){
525         if( !(session && msg) ) return 0;
526         osrfMessage* a[1];
527         a[0] = msg;
528         return osrfAppSessionSendBatch( session, a, 1 );
529 }
530
531
532
533
534 /**  Waits up to 'timeout' seconds for some data to arrive.
535   * Any data that arrives will be processed according to its
536   * payload and message type.  This method will return after
537   * any data has arrived.
538   */
539 int osrf_app_session_queue_wait( osrf_app_session* session, int timeout, int* recvd ){
540         if(session == NULL) return 0;
541         int ret_val = 0;
542         osrfLogDebug(OSRF_LOG_MARK,  "AppSession in queue_wait with timeout %d", timeout );
543         ret_val = osrf_stack_entry_point(session->transport_handle, timeout, recvd);
544         return ret_val;
545 }
546
547 /** Disconnects (if client) and removes the given session from the global session cache 
548   * ! This free's all attached app_requests ! 
549   */
550 void osrfAppSessionFree( osrfAppSession* ses ) {
551         osrf_app_session_destroy( ses );
552 }
553
554
555 void osrf_app_session_destroy( osrf_app_session* session ){
556         if(session == NULL) return;
557
558         osrfLogDebug(OSRF_LOG_MARK,  "AppSession [%s] [%s] destroying self and deleting requests", 
559                         session->remote_service, session->session_id );
560         if(session->type == OSRF_SESSION_CLIENT 
561                         && session->state != OSRF_SESSION_DISCONNECTED ) { /* disconnect if we're a client */
562                 osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
563                 _osrf_app_session_send( session, dis_msg ); 
564                 osrf_message_free(dis_msg);
565         }
566
567         osrfHashRemove( osrfAppSessionCache, session->session_id );
568         _osrf_app_session_free( session );
569 }
570
571 osrf_message* osrfAppSessionRequestRecv(
572                 osrf_app_session* session, int req_id, int timeout ) {
573         return osrf_app_session_request_recv( session, req_id, timeout );
574 }
575 osrf_message* osrf_app_session_request_recv( 
576                 osrf_app_session* session, int req_id, int timeout ) {
577         if(req_id < 0 || session == NULL)
578                 return NULL;
579         osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
580         return _osrf_app_request_recv( req, timeout );
581 }
582
583
584
585 int osrfAppRequestRespond( osrfAppSession* ses, int requestId, jsonObject* data ) {
586         if(!ses || ! data ) return -1;
587
588         osrf_message* msg = osrf_message_init( RESULT, requestId, 1 );
589         char* json = jsonObjectToJSON( data );
590
591         osrf_message_set_result_content( msg, json );
592         _osrf_app_session_send( ses, msg ); 
593
594         free(json);
595         osrf_message_free( msg );
596
597         return 0;
598 }
599
600
601 int osrfAppRequestRespondComplete( 
602                 osrfAppSession* ses, int requestId, jsonObject* data ) {
603
604         osrf_message* payload = osrf_message_init( RESULT, requestId, 1 );
605         osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK );
606
607         osrf_message* status = osrf_message_init( STATUS, requestId, 1);
608         osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
609         
610         if (data) {
611                 char* json = jsonObjectToJSON( data );
612                 osrf_message_set_result_content( payload, json );
613                 free(json);
614
615                 osrfMessage* ms[2];
616                 ms[0] = payload;
617                 ms[1] = status;
618
619                 osrfAppSessionSendBatch( ses, ms, 2 );
620
621                 osrf_message_free( payload );
622         } else {
623                 osrfAppSessionSendBatch( ses, &status, 1 );
624         }
625
626         osrf_message_free( status );
627
628         return 0;
629 }
630
631 int osrfAppSessionStatus( osrfAppSession* ses, int type, char* name, int reqId, char* message ) {
632
633         if(ses) {
634                 osrf_message* msg = osrf_message_init( STATUS, reqId, 1);
635                 osrf_message_set_status_info( msg, name, message, type );
636                 _osrf_app_session_send( ses, msg ); 
637                 osrf_message_free( msg );
638                 return 0;
639         }
640         return -1;
641 }
642
643
644
645
646
647