1 #include "osrf_app_session.h"
4 /* the global app_session cache */
// Hash of sessions, looked up and stored by session_id. Starts out NULL;
// _osrf_app_session_push_session() creates it on first use and
// osrfAppSessionCleanup() frees it.
5 osrfHash* osrfAppSessionCache = NULL;
8 // --------------------------------------------------------------------------
9 // --------------------------------------------------------------------------
11 // --------------------------------------------------------------------------
13 /** Allocates and initializes a new app_request object */
// The request is allocated with safe_malloc(), bound to the given session,
// and takes its request_id from msg->thread_trace.
// NOTE(review): msg is dereferenced without a NULL check -- confirm callers
// never pass NULL.
14 osrf_app_request* _osrf_app_request_init(
15 osrf_app_session* session, osrf_message* msg ) {
17 osrf_app_request* req =
18 (osrf_app_request*) safe_malloc(sizeof(osrf_app_request));
20 req->session = session;
21 req->request_id = msg->thread_trace;
// Frees the global session cache hash (osrfAppSessionCache).
// NOTE(review): no reset of osrfAppSessionCache to NULL is visible here; if it
// is left dangling, a later _osrf_app_session_push_session() would reuse the
// freed hash -- confirm.
31 void osrfAppSessionCleanup() {
32 osrfHashFree(osrfAppSessionCache);
35 /** Frees memory used by an app_request object */
// Takes a void* because it is installed as the request_queue freeItem
// callback by the session init functions; the argument is cast to
// osrfAppRequest. A NULL argument is a no-op.
36 void _osrf_app_request_free( void * req ){
37 if( req == NULL ) return;
38 osrfAppRequest* r = (osrfAppRequest*) req;
// Release the request message attached to this request, if any.
40 if( r->payload ) osrf_message_free( r->payload );
// NOTE(review): the statements below dereference `req`, which is a void* in
// this function, and free the payload a second time. They look like an older
// implementation that walked the result list via ->next -- confirm whether
// they are live code or sit inside a disabled region.
43 osrf_message* cur_msg = req->result;
44 while( cur_msg != NULL ) {
45 osrf_message* next_msg = cur_msg->next;
46 osrf_message_free( cur_msg );
49 osrf_message_free( req->payload );
55 /** Pushes the given message onto the list of 'responses' to this request */
// Does nothing when either req or result is NULL.
56 void _osrf_app_request_push_queue( osrf_app_request* req, osrf_message* result ){
57 if(req == NULL || result == NULL) return;
58 osrfLogDebug( OSRF_LOG_MARK, "App Session pushing request [%d] onto request queue", result->thread_trace );
// Empty list versus non-empty list are handled separately.
59 if(req->result == NULL)
// The new message's next pointer is aimed at the current head of the list.
// NOTE(review): this looks like a prepend, while _osrf_app_request_recv()
// pops from the head -- confirm the intended delivery order.
62 result->next = req->result;
67 /** Removes this app_request from our session request set */
// Looks the request up by req_id in session->request_queue and, when found,
// removes it from its owning session's queue with osrfListRemove().
// A NULL session or an unknown req_id is a no-op.
// presumably osrfListRemove() also runs the list's freeItem callback -- TODO confirm
68 void osrf_app_session_request_finish(
69 osrf_app_session* session, int req_id ){
71 if(session == NULL) return;
72 osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
73 if(req == NULL) return;
74 osrfListRemove( req->session->request_queue, req->request_id );
// Flags the request identified by req_id so that its receive timeout is
// restarted: sets req->reset_timeout, which _osrf_app_request_recv() checks
// and clears. A NULL session or an unknown req_id is a no-op.
78 void osrf_app_session_request_reset_timeout( osrf_app_session* session, int req_id ) {
79 if(session == NULL) return;
80 osrfLogDebug( OSRF_LOG_MARK, "Resetting request timeout %d", req_id );
81 osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
82 if(req == NULL) return;
83 req->reset_timeout = 1;
86 /** Checks the receive queue for messages. If any are found, the first
87 * is popped off and returned. Otherwise, this method will wait at most timeout
88 * seconds for a message to appear in the receive queue. Once it arrives it is returned.
89 * If no messages arrive in the timeout provided, null is returned.
// req     -- request whose result list is read; NULL yields NULL.
// timeout -- maximum time to wait, in seconds.
91 osrf_message* _osrf_app_request_recv( osrf_app_request* req, int timeout ) {
93 if(req == NULL) return NULL;
// Fast path: a response is already queued, so unlink the head of the list.
95 if( req->result != NULL ) {
96 /* pop off the first message in the list */
97 osrf_message* tmp_msg = req->result;
98 req->result = req->result->next;
// Otherwise poll the session until a response shows up or the time budget
// is spent.
102 time_t start = time(NULL);
103 time_t remaining = (time_t) timeout;
105 while( remaining >= 0 ) {
106 /* tell the session to wait for stuff */
107 osrfLogDebug( OSRF_LOG_MARK, "In app_request receive with remaining time [%d]", (int) remaining );
// First a zero-timeout wait, which handles anything already pending ...
109 osrf_app_session_queue_wait( req->session, 0 );
111 if( req->result != NULL ) { /* if we received anything */
112 /* pop off the first message in the list */
113 osrfLogDebug( OSRF_LOG_MARK, "app_request_recv received a message, returning it");
114 osrf_message* ret_msg = req->result;
115 osrf_message* tmp_msg = ret_msg->next;
116 req->result = tmp_msg;
// ... then a wait of up to the remaining time.
123 osrf_app_session_queue_wait( req->session, (int) remaining );
125 if( req->result != NULL ) { /* if we received anything */
126 /* pop off the first message in the list */
127 osrfLogDebug( OSRF_LOG_MARK, "app_request_recv received a message, returning it");
128 osrf_message* ret_msg = req->result;
129 osrf_message* tmp_msg = ret_msg->next;
130 req->result = tmp_msg;
// A reset requested through osrf_app_session_request_reset_timeout()
// restores the full budget and clears the flag.
136 if(req->reset_timeout) {
137 remaining = (time_t) timeout;
138 req->reset_timeout = 0;
139 osrfLogDebug( OSRF_LOG_MARK, "Recevied a timeout reset");
// NOTE(review): `start` is not advanced anywhere in the visible code, so the
// total elapsed time is subtracted on every pass (and again after a reset);
// `remaining` may therefore shrink faster than wall-clock time -- confirm.
141 remaining -= (int) (time(NULL) - start);
145 osrfLogDebug( OSRF_LOG_MARK, "Returning NULL from app_request_recv after timeout");
149 /** Resends this request's original request message */
// Returns 0 when req is NULL; otherwise returns whatever
// _osrf_app_session_send() returns for req->payload on req->session.
150 int _osrf_app_request_resend( osrf_app_request* req ) {
151 if(req == NULL) return 0;
153 osrfLogDebug( OSRF_LOG_MARK, "Resending request [%d]", req->request_id );
154 return _osrf_app_session_send( req->session, req->payload );
161 // --------------------------------------------------------------------------
162 // --------------------------------------------------------------------------
164 // --------------------------------------------------------------------------
166 /** returns a session from the global session hash */
// Looks session_id up in osrfAppSessionCache when session_id is non-NULL.
// presumably yields NULL for a NULL session_id -- TODO confirm
167 osrf_app_session* osrf_app_session_find_session( char* session_id ) {
168 if(session_id) return osrfHashGet(osrfAppSessionCache, session_id);
173 /** adds a session to the global session cache */
// Creates the cache on first use. If an entry already exists under
// session->session_id the cache is left unchanged; otherwise the session is
// stored under that id.
// NOTE(review): session is dereferenced without a visible NULL check.
174 void _osrf_app_session_push_session( osrf_app_session* session ) {
176 if( osrfAppSessionCache == NULL ) osrfAppSessionCache = osrfNewHash();
177 if( osrfHashGet( osrfAppSessionCache, session->session_id ) ) return;
178 osrfHashSet( osrfAppSessionCache, session, session->session_id );
181 /** Allocates and initializes a new app_session */
// Thin wrapper: forwards remote_service to osrf_app_client_session_init()
// and returns its result.
183 osrf_app_session* osrfAppSessionClientInit( char* remote_service ) {
184 return osrf_app_client_session_init( remote_service );
// Builds a client-side session for remote_service: obtains the transport
// client, derives the remote id from configuration, creates the request
// queue, generates a session id and registers the session in the global cache.
187 osrf_app_session* osrf_app_client_session_init( char* remote_service ) {
189 osrf_app_session* session = safe_malloc(sizeof(osrf_app_session));
191 session->transport_handle = osrf_system_get_transport_client();
// Without a transport client a warning is logged.
// presumably the function bails out here -- TODO confirm, and confirm that
// `session` is released on that path.
192 if( session->transport_handle == NULL ) {
193 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service 'client'");
197 char target_buf[512];
198 memset(target_buf,0,512);
// The remote id has the form "<router_name>@<domain>/<remote_service>", using
// the first /domains/domain entry and /router_name from the configuration.
200 osrfStringArray* arr = osrfNewStringArray(8);
201 osrfConfigGetValueList(NULL, arr, "/domains/domain");
202 char* domain = osrfStringArrayGetString(arr, 0);
203 char* router_name = osrfConfigGetValue(NULL, "/router_name");
// NOTE(review): sprintf() is unbounded; the combined strings can overflow the
// 512-byte buffer, and router_name/domain are not checked for NULL -- consider
// snprintf(target_buf, sizeof target_buf, ...).
// presumably router_name is heap-allocated and must be freed -- TODO confirm
205 sprintf( target_buf, "%s@%s/%s", router_name, domain, remote_service );
206 osrfStringArrayFree(arr);
// Queued requests are released through _osrf_app_request_free.
210 session->request_queue = osrfNewList();
211 session->request_queue->freeItem = &_osrf_app_request_free;
// orig_remote_id keeps a second copy so the remote id can be restored by
// osrf_app_session_reset_remote().
212 session->remote_id = strdup(target_buf);
213 session->orig_remote_id = strdup(session->remote_id);
214 session->remote_service = strdup(remote_service);
// Stateless mode is selected at build time via ASSUME_STATELESS.
216 #ifdef ASSUME_STATELESS
217 session->stateless = 1;
218 osrfLogDebug( OSRF_LOG_MARK, "%s session is stateless", remote_service );
// presumably the alternative (#else) branch -- TODO confirm
220 session->stateless = 0;
221 osrfLogDebug( OSRF_LOG_MARK, "%s session is NOT stateless", remote_service );
224 /* build a chunky, random session id */
// The id combines a millisecond timestamp, the current time() and the pid.
// NOTE(review): unbounded sprintf() again; verify the size of `id`.
228 sprintf(id, "%lf.%d%d", get_timestamp_millis(), (int)time(NULL), getpid());
229 session->session_id = strdup(id);
230 osrfLogDebug( OSRF_LOG_MARK, "Building a new client session with id [%s] [%s]",
231 session->remote_service, session->session_id );
// New client sessions start disconnected with a zero thread trace.
233 session->thread_trace = 0;
234 session->state = OSRF_SESSION_DISCONNECTED;
235 session->type = OSRF_SESSION_CLIENT;
236 //session->next = NULL;
237 _osrf_app_session_push_session( session );
// Returns the cached session for session_id if one exists; otherwise builds
// a server-side session for service our_app talking to remote_id and
// registers it in the global cache.
241 osrf_app_session* osrf_app_server_session_init(
242 char* session_id, char* our_app, char* remote_id ) {
244 osrfLogInfo( OSRF_LOG_MARK, "Initing server session with session id %s, service %s,"
245 " and remote_id %s", session_id, our_app, remote_id );
// Reuse an existing session with the same id.
247 osrf_app_session* session = osrf_app_session_find_session( session_id );
248 if(session) return session;
250 session = safe_malloc(sizeof(osrf_app_session));
252 session->transport_handle = osrf_system_get_transport_client();
// presumably the function bails out when no transport client is available --
// TODO confirm, and confirm that `session` is released on that path.
253 if( session->transport_handle == NULL ) {
254 osrfLogWarning( OSRF_LOG_MARK, "No transport client for service '%s'", our_app );
// The stateless flag is read from the /apps/<our_app>/stateless host setting.
// NOTE(review): atoi() gives no error reporting; ownership of `statel` is not
// visible here -- confirm whether it must be freed.
259 char* statel = osrf_settings_host_value("/apps/%s/stateless", our_app );
260 if(statel) stateless = atoi(statel);
// Queued requests are released through _osrf_app_request_free.
264 session->request_queue = osrfNewList();
265 session->request_queue->freeItem = &_osrf_app_request_free;
// The session keeps its own copies of all incoming strings.
266 session->remote_id = strdup(remote_id);
267 session->orig_remote_id = strdup(remote_id);
268 session->session_id = strdup(session_id);
269 session->remote_service = strdup(our_app);
270 session->stateless = stateless;
// A build with ASSUME_STATELESS overrides the configured value.
272 #ifdef ASSUME_STATELESS
273 session->stateless = 1;
276 session->thread_trace = 0;
277 session->state = OSRF_SESSION_DISCONNECTED;
278 session->type = OSRF_SESSION_SERVER;
280 _osrf_app_session_push_session( session );
287 /** frees memory held by a session */
// Releases the strings the init functions duplicated with strdup() and the
// request queue.
// presumably osrfListFree() releases each queued request through the list's
// freeItem callback (_osrf_app_request_free) -- TODO confirm
288 void _osrf_app_session_free( osrf_app_session* session ){
292 free(session->remote_id);
293 free(session->orig_remote_id);
294 free(session->session_id);
295 free(session->remote_service);
296 osrfListFree(session->request_queue);
// Thin wrapper: forwards all arguments unchanged to
// osrf_app_session_make_req() and returns its result.
300 int osrfAppSessionMakeRequest(
301 osrf_app_session* session, jsonObject* params,
302 char* method_name, int protocol, string_array* param_strings ) {
304 return osrf_app_session_make_req( session, params,
305 method_name, protocol, param_strings );
// Builds a REQUEST message for method_name, sends it on the session and
// records a matching app_request in the session's request queue.
// Returns -1 when session is NULL; on the success path returns the new
// request id.
308 int osrf_app_session_make_req(
309 osrf_app_session* session, jsonObject* params,
310 char* method_name, int protocol, string_array* param_strings ) {
311 if(session == NULL) return -1;
// Each request gets the next thread_trace value of the session.
313 osrf_message* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol );
314 osrf_message_set_method(req_msg, method_name);
// Parameters come from the JSON object and/or the string array.
// presumably each source is used only when its argument is non-NULL --
// TODO confirm
316 osrf_message_set_params(req_msg, params);
322 for(i = 0; i!= param_strings->size ; i++ ) {
323 osrf_message_add_param(req_msg,
324 string_array_get_string(param_strings,i));
// The request object takes its id from the message's thread_trace.
329 osrf_app_request* req = _osrf_app_request_init( session, req_msg );
// A non-zero result from the send is treated as a failure here.
// NOTE(review): _osrf_app_session_send() returns 0 for NULL arguments, so
// verify which return values actually signal an error.
330 if(_osrf_app_session_send( session, req_msg ) ) {
331 osrfLogWarning( OSRF_LOG_MARK, "Error sending request message [%d]", session->thread_trace );
335 osrfLogDebug( OSRF_LOG_MARK, "Pushing [%d] onto requeust queue for session [%s] [%s]",
336 req->request_id, session->remote_service, session->session_id );
// The queue is indexed by request id, which is how the other functions in
// this file look requests up.
337 osrfListSet( session->request_queue, req, req->request_id );
338 return req->request_id;
// Marks the request stored under request_id as complete. An unknown
// request_id is a no-op.
341 void osrf_app_session_set_complete( osrf_app_session* session, int request_id ) {
345 osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
346 if(req) req->complete = 1;
// Returns the `complete` flag of the request stored under request_id, as set
// by osrf_app_session_set_complete().
349 int osrf_app_session_request_complete( osrf_app_session* session, int request_id ) {
352 osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, request_id );
354 return req->complete;
359 /** Resets the remote connection id to that of the original*/
// Frees the current remote_id and replaces it with a fresh copy of
// orig_remote_id.
360 void osrf_app_session_reset_remote( osrf_app_session* session ){
364 free(session->remote_id);
365 osrfLogDebug( OSRF_LOG_MARK, "App Session [%s] [%s] resetting remote id to %s",
366 session->remote_service, session->session_id, session->orig_remote_id );
368 session->remote_id = strdup(session->orig_remote_id);
// Replaces the session's remote_id with a copy of the given string, freeing
// the previous value first. orig_remote_id is not touched here.
371 void osrf_app_session_set_remote( osrf_app_session* session, char* remote_id ) {
374 if( session->remote_id )
375 free(session->remote_id );
376 session->remote_id = strdup( remote_id );
379 /** pushes the given message into the result list of the app_request
380 with the given request_id */
// The target request is the one stored under msg->thread_trace. Returns 0
// when session or msg is NULL, or when no such request exists.
381 int osrf_app_session_push_queue(
382 osrf_app_session* session, osrf_message* msg ){
383 if(session == NULL || msg == NULL) return 0;
385 osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, msg->thread_trace );
386 if(req == NULL) return 0;
387 _osrf_app_request_push_queue( req, msg );
392 /** Attempts to connect to the remote service */
// Sends a CONNECT message to the original remote id and then waits for the
// session state to become OSRF_SESSION_CONNECTED.
393 int osrf_app_session_connect(osrf_app_session* session){
// Already-connected sessions are handled up front.
398 if(session->state == OSRF_SESSION_CONNECTED) {
// Hard-coded connect timeout; it is passed to osrf_app_session_queue_wait(),
// which documents its timeout as seconds.
402 int timeout = 5; /* XXX CONFIG VALUE */
404 osrfLogDebug( OSRF_LOG_MARK, "AppSession connecting to %s", session->remote_id );
406 /* defaulting to protocol 1 for now */
407 osrf_message* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
// The connect goes to the original remote id, so restore it before sending.
408 osrf_app_session_reset_remote( session );
409 session->state = OSRF_SESSION_CONNECTING;
410 int ret = _osrf_app_session_send( session, con_msg );
411 osrf_message_free(con_msg);
// Wait until the state changes or the time budget is spent.
// NOTE(review): `start` is not advanced in the visible code, so the total
// elapsed time is subtracted on every pass -- confirm.
414 time_t start = time(NULL);
415 time_t remaining = (time_t) timeout;
417 while( session->state != OSRF_SESSION_CONNECTED && remaining >= 0 ) {
418 osrf_app_session_queue_wait( session, remaining );
419 remaining -= (int) (time(NULL) - start);
422 if(session->state == OSRF_SESSION_CONNECTED)
423 osrfLogDebug( OSRF_LOG_MARK, " * Connected Successfully to %s", session->remote_service );
// Failure path: the session never reached the connected state.
425 if(session->state != OSRF_SESSION_CONNECTED)
433 /** Disconnects from the remote service */
// Sends a DISCONNECT message, marks the session disconnected and restores
// the original remote id.
434 int osrf_app_session_disconnect( osrf_app_session* session){
// Special case: the session is already disconnected.
438 if(session->state == OSRF_SESSION_DISCONNECTED)
// Special case: a stateless session that is not currently connected.
441 if(session->stateless && session->state != OSRF_SESSION_CONNECTED) {
442 osrfLogDebug( OSRF_LOG_MARK,
443 "Exiting disconnect on stateless session %s",
444 session->session_id);
448 osrfLogDebug(OSRF_LOG_MARK, "AppSession disconnecting from %s", session->remote_id );
450 osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
// The state is set to disconnected before the message is sent.
451 session->state = OSRF_SESSION_DISCONNECTED;
452 _osrf_app_session_send( session, dis_msg );
454 osrf_message_free( dis_msg );
455 osrf_app_session_reset_remote( session );
// Resends the request stored under req_id and returns the result of
// _osrf_app_request_resend(), which returns 0 when no such request exists.
// NOTE(review): session is dereferenced without a NULL check.
459 int osrf_app_session_request_resend( osrf_app_session* session, int req_id ) {
460 osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
461 return _osrf_app_request_resend( req );
// Serializes `size` messages into one transport message and sends it to the
// session's current remote id. Returns 0 when session or msgs is NULL or
// size is not positive.
465 int osrfAppSessionSendBatch( osrfAppSession* session, osrf_message* msgs[], int size ) {
467 if( !(session && msgs && size > 0) ) return 0;
// Only the first message is inspected for the connect logic below.
471 osrfMessage* msg = msgs[0];
// Zero-timeout wait before sending; osrf_app_session_queue_wait() processes
// whatever data arrives.
475 osrf_app_session_queue_wait( session, 0 );
477 /* if we're not stateless and not connected and the first
478 message is not a connect message, then we do the connect first */
// Stateless sessions always send to the original remote id.
479 if(session->stateless) {
480 osrf_app_session_reset_remote(session);
// presumably this is the non-stateless branch -- TODO confirm
484 if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) &&
485 (session->state != OSRF_SESSION_CONNECTED) ) {
// A falsy result from the connect attempt is the failure case.
486 if(!osrf_app_session_connect( session ))
492 char* string = osrfMessageSerializeBatch(msgs, size);
// The transport message carries the serialized batch, the session id and the
// current remote id.
496 transport_message* t_msg = message_init(
497 string, "", session->session_id, session->remote_id, NULL );
499 osrfLogDebug(OSRF_LOG_MARK, "Session [%s] [%s] sending to %s \nData: %s",
500 session->remote_service, session->session_id, t_msg->recipient, string );
502 retval = client_send_message( session->transport_handle, t_msg );
// presumably `string` is heap-allocated and must also be freed -- TODO confirm
505 message_free( t_msg );
// Sends a single message by delegating to osrfAppSessionSendBatch() with a
// batch size of 1. Returns 0 when session or msg is NULL.
// presumably `a` is a one-element array holding msg -- TODO confirm
514 int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){
515 if( !(session && msg) ) return 0;
518 return osrfAppSessionSendBatch( session, a, 1 );
524 /** Waits up to 'timeout' seconds for some data to arrive.
525 * Any data that arrives will be processed according to its
526 * payload and message type. This method will return after
527 * any data has arrived.
// Returns 0 for a NULL session. Otherwise hands the session's transport
// handle and the timeout to osrf_stack_entry_point() and stores its result
// in ret_val.
// presumably ret_val is what gets returned -- TODO confirm
529 int osrf_app_session_queue_wait( osrf_app_session* session, int timeout ){
530 if(session == NULL) return 0;
532 osrfLogDebug(OSRF_LOG_MARK, "AppSession in queue_wait with timeout %d", timeout );
533 ret_val = osrf_stack_entry_point(session->transport_handle, timeout);
537 /** Disconnects (if client) and removes the given session from the global session cache
538 * ! This frees all attached app_requests !
// Thin wrapper: forwards the session to osrf_app_session_destroy().
540 void osrfAppSessionFree( osrfAppSession* ses ) {
541 osrf_app_session_destroy( ses );
// Tears a session down: a client session that is not already disconnected
// sends a DISCONNECT first; the session is then removed from the global
// cache and its memory released. A NULL session is a no-op.
545 void osrf_app_session_destroy( osrf_app_session* session ){
546 if(session == NULL) return;
548 osrfLogDebug(OSRF_LOG_MARK, "AppSession [%s] [%s] destroying self and deleting requests",
549 session->remote_service, session->session_id );
550 if(session->type == OSRF_SESSION_CLIENT
551 && session->state != OSRF_SESSION_DISCONNECTED ) { /* disconnect if we're a client */
552 osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 );
553 _osrf_app_session_send( session, dis_msg );
554 osrf_message_free(dis_msg);
// NOTE(review): if osrfHashRemove() runs a free callback on the stored item,
// the following _osrf_app_session_free() would release the session twice --
// confirm how the cache is configured.
557 osrfHashRemove( osrfAppSessionCache, session->session_id );
558 _osrf_app_session_free( session );
// Thin wrapper: forwards all arguments to osrf_app_session_request_recv()
// and returns its result.
561 osrf_message* osrfAppSessionRequestRecv(
562 osrf_app_session* session, int req_id, int timeout ) {
563 return osrf_app_session_request_recv( session, req_id, timeout );
// Receives the next response for the request stored under req_id by
// delegating to _osrf_app_request_recv(), which returns NULL when no such
// request exists. A negative req_id or a NULL session is rejected up front.
565 osrf_message* osrf_app_session_request_recv(
566 osrf_app_session* session, int req_id, int timeout ) {
567 if(req_id < 0 || session == NULL)
569 osrf_app_request* req = OSRF_LIST_GET_INDEX( session->request_queue, req_id );
570 return _osrf_app_request_recv( req, timeout );
// Sends `data`, serialized as JSON, as a RESULT message for requestId.
// Returns -1 when ses or data is NULL.
575 int osrfAppRequestRespond( osrfAppSession* ses, int requestId, jsonObject* data ) {
576 if(!ses || ! data ) return -1;
578 osrf_message* msg = osrf_message_init( RESULT, requestId, 1 );
// presumably jsonObjectToJSON() returns a heap string that must be freed
// after use -- TODO confirm
579 char* json = jsonObjectToJSON( data );
580 osrf_message_set_result_content( msg, json );
581 _osrf_app_session_send( ses, msg );
584 osrf_message_free( msg );
// Sends the final response for requestId as one batch of two messages: a
// RESULT carrying `data` as JSON, and a STATUS marking the request complete.
// NOTE(review): no NULL check on ses/data is visible here, unlike
// osrfAppRequestRespond() -- confirm.
590 int osrfAppRequestRespondComplete(
591 osrfAppSession* ses, int requestId, jsonObject* data ) {
593 osrf_message* payload = osrf_message_init( RESULT, requestId, 1 );
594 osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK );
// presumably jsonObjectToJSON() returns a heap string that must be freed
// after use -- TODO confirm
596 char* json = jsonObjectToJSON( data );
597 osrf_message_set_result_content( payload, json );
600 osrf_message* status = osrf_message_init( STATUS, requestId, 1);
601 osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
// presumably `ms` holds payload followed by status -- TODO confirm
607 osrfAppSessionSendBatch( ses, ms, 2 );
609 osrf_message_free( payload );
610 osrf_message_free( status );
// Sends a STATUS message for request reqId on the session, with the given
// status name, text and status type, then frees the message.
617 int osrfAppSessionStatus( osrfAppSession* ses, int type, char* name, int reqId, char* message ) {
620 osrf_message* msg = osrf_message_init( STATUS, reqId, 1);
621 osrf_message_set_status_info( msg, name, message, type );
622 _osrf_app_session_send( ses, msg );
623 osrf_message_free( msg );