]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/gateway/osrf_http_translator.c
Several minor and inconsequential changes:
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11
12 #define MODULE_NAME "osrf_http_translator_module"
13 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define OSRF_TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define OSRF_TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
16
17 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
18 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
19 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
20 #define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"
21
22 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
23 #define JSON_CONTENT_TYPE "text/plain"
24 #define MAX_MSGS_PER_PACKET 256
25 #define CACHE_TIME 300
26
27 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
28 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
29 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
30 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
31 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
32 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
33 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
34
35
36 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
37 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
38 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
39
40 char* routerName = NULL;
41 char* domainName = NULL;
42 int osrfConnected = 0;
43 char recipientBuf[128];
44 char contentTypeBuf[80];
45
46 #if 0
47 // Commented out to avoid compiler warning
48 // for development only, writes to apache error log
49 static void _dbg(char* s, ...) {
50     VA_LIST_TO_STRING(s);
51     fprintf(stderr, "%s\n", VA_BUF);
52     fflush(stderr);
53 }
54 #endif
55
56 // Translator struct
57 typedef struct {
58     request_rec* apreq;
59     transport_client* handle;
60     osrfList* messages;
61     char* body;
62     char* delim;
63     const char* recipient;
64     const char* service;
65     const char* thread;
66     const char* remoteHost;
67     int complete;
68     int timeout;
69     int multipart;
70     int connectOnly; // there is only 1 message, a CONNECT
71     int disconnectOnly; // there is only 1 message, a DISCONNECT
72     int connecting; // there is a connect message in this batch
73     int disconnecting; // there is a connect message in this batch
74     int localXid;
75 } osrfHttpTranslator;
76
77
78 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
79     configFile = (char*) arg;
80         return NULL;
81 }
82 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
83     configCtx = (char*) arg;
84         return NULL;
85 }
86 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
87     cacheServers = (char*) arg;
88         return NULL;
89 }
90
91 /** set up the configuration handlers */
92 static const command_rec osrfHttpTranslatorCmds[] = {
93         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
94                         NULL, RSRC_CONF, "osrf translator config file"),
95         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
96                         NULL, RSRC_CONF, "osrf translator config file context"),
97         AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
98                         NULL, RSRC_CONF, "osrf translator cache server"),
99     {NULL}
100 };
101
102
103 // there can only be one, so use a global static one
104 static osrfHttpTranslator globalTranslator;
105
106 /*
107  * Constructs a new translator object based on the current apache 
108  * request_rec.  Reads the request body and headers.
109  */
110 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
111     osrfHttpTranslator* trans = &globalTranslator;
112     trans->apreq = apreq;
113     trans->complete = 0;
114     trans->connectOnly = 0;
115     trans->disconnectOnly = 0;
116     trans->connecting = 0;
117     trans->disconnecting = 0;
118     trans->remoteHost = apreq->connection->remote_ip;
119     trans->messages = NULL;
120
121     /* load the message body */
122     osrfStringArray* params     = apacheParseParms(apreq);
123     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
124     osrfStringArrayFree(params);
125
126     /* load the request headers */
127     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID))
128         // force our log xid to match the caller
129         osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
130
131     trans->handle = osrfSystemGetTransportClient();
132     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
133     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
134
135     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
136     if(timeout) 
137         trans->timeout = atoi(timeout);
138     else 
139         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
140
141     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
142     if(multipart && !strcasecmp(multipart, "true"))
143         trans->multipart = 1;
144     else
145         trans->multipart = 0;
146
147     char buf[32];
148     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
149     trans->delim = md5sum(buf);
150
151     /* Use thread if it has been passed in; otherwise, just use the delimiter */
152     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
153         ?  apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
154         : (const char*)trans->delim;
155
156     return trans;
157 }
158
159 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
160     if(!trans) return;
161     if(trans->body)
162         free(trans->body);
163     if(trans->delim)
164         free(trans->delim);
165     osrfListFree(trans->messages);
166 }
167
168 #if 0
169 // Commented out to avoid compiler warning
170 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
171     _dbg("-----------------------------------");
172     _dbg("body = %s", trans->body);
173     _dbg("service = %s", trans->service);
174     _dbg("thread = %s", trans->thread);
175     _dbg("multipart = %d", trans->multipart);
176     _dbg("recipient = %s", trans->recipient);
177 }
178 #endif
179
180 /**
181  * Determines the correct recipient address based on the requested 
182  * service or recipient address.  
183  */
184 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
185     int stat = 0;
186     jsonObject* sessionCache = NULL;
187
188     if(trans->service) {
189         if(trans->recipient) {
190             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
191
192         } else {
193             // service is specified, build a recipient address 
194             // from the router, domain, and service
195             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName,
196                 domainName, trans->service);
197             recipientBuf[size] = '\0';
198             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
199             trans->recipient = recipientBuf;
200             stat = 1;
201         }
202
203     } else {
204
205         if(trans->recipient) {
206             sessionCache = osrfCacheGetObject(trans->thread);
207
208             if(sessionCache) {
209                 const char* ipAddr = jsonObjectGetString(
210                     jsonObjectGetKeyConst( sessionCache, "ip" ));
211                 const char* recipient = jsonObjectGetString(
212                     jsonObjectGetKeyConst( sessionCache, "jid" ));
213
214                 // choosing a specific recipient address requires that the recipient and 
215                 // thread be cached on the server (so drone processes cannot be hijacked)
216                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
217                     osrfLogDebug( OSRF_LOG_MARK,
218                         "Found cached session from host %s and recipient %s",
219                         trans->remoteHost, trans->recipient);
220                     stat = 1;
221                     trans->service = apr_pstrdup(
222                         trans->apreq->pool, jsonObjectGetString(
223                             jsonObjectGetKeyConst( sessionCache, "service" )));
224
225                 } else {
226                     osrfLogError(OSRF_LOG_MARK, 
227                         "Session cache for thread %s does not match request", trans->thread);
228                 }
229             }  else {
230                 osrfLogError(OSRF_LOG_MARK, 
231                     "attempt to send directly to %s without a session", trans->recipient);
232             }
233         } else {
234             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
235         } 
236     }
237
238     jsonObjectFree(sessionCache);
239     return stat;
240 }
241
242 /**
243  * Parses the request body and logs any REQUEST messages to the activity log
244  */
245 static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
246     const osrfMessage* msg;
247     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
248     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
249     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
250
251     if(numMsgs == 0)
252         return 0;
253
254     if(numMsgs == 1) {
255         msg = msgList[0];
256         if(msg->m_type == CONNECT) {
257             trans->connectOnly = 1;
258             trans->connecting = 1;
259             return 1;
260         }
261         if(msg->m_type == DISCONNECT) {
262             trans->disconnectOnly = 1;
263             trans->disconnecting = 1;
264             return 1;
265         }
266     }
267
268     // log request messages to the activity log
269     int i;
270     for(i = 0; i < numMsgs; i++) {
271         msg = msgList[i];
272
273         switch(msg->m_type) {
274
275             case REQUEST: {
276                 const jsonObject* params = msg->_params;
277                 growing_buffer* act = buffer_init(128); 
278                 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "",
279                     trans->service, msg->method_name);
280
281                 const jsonObject* obj = NULL;
282                 int i = 0;
283                 char* str; 
284                 while((obj = jsonObjectGetIndex(params, i++))) {
285                     str = jsonObjectToJSON(obj);
286                     if( i == 1 )
287                         OSRF_BUFFER_ADD(act, " ");
288                     else 
289                         OSRF_BUFFER_ADD(act, ", ");
290                     OSRF_BUFFER_ADD(act, str);
291                     free(str);
292                 }
293                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
294                 buffer_free(act);
295                 break;
296             }
297
298             case CONNECT:
299                 trans->connecting = 1;
300                 break;
301
302             case DISCONNECT:
303                 trans->disconnecting = 1;
304                 break;
305
306             case RESULT:
307                 osrfLogWarning( OSRF_LOG_MARK, "Unexpected RESULT message received" );
308                 break;
309
310             case STATUS:
311                 osrfLogWarning( OSRF_LOG_MARK, "Unexpected STATUS message received" );
312                 break;
313
314             default:
315                 osrfLogWarning( OSRF_LOG_MARK, "Invalid message type %d received",
316                     msg->m_type );
317                 break;
318         }
319     }
320
321     return 1;
322 }
323
324 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
325     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
326     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
327     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
328     if(numMsgs == 0) return 0;
329
330     osrfMessage* last = omsgList[numMsgs-1];
331     if(last->m_type == STATUS) {
332         if(last->status_code == OSRF_STATUS_TIMEOUT) {
333             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
334             osrfCacheRemove(trans->thread);
335             return 0;
336         }
337         // XXX hm, check for explicit status=COMPLETE message instead??
338         if(last->status_code != OSRF_STATUS_CONTINUE)
339             trans->complete = 1;
340     }
341
342     return 1;
343 }
344
345 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
346     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
347     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
348     if(trans->multipart) {
349         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
350         contentTypeBuf[79] = '\0';
351         osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE,
352         trans->delim, contentTypeBuf);
353         ap_set_content_type(trans->apreq, contentTypeBuf);
354         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
355     } else {
356         ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
357     }
358 }
359
360 /**
361  * Cache the transaction with the JID of the backend process we are talking to
362  */
363 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
364     jsonObject* cacheObj = jsonNewObject(NULL);
365     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
366     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
367     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
368     osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
369 }
370
371
372 /**
373  * Writes a single chunk of multipart/x-mixed-replace content
374  */
375 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
376     osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
377     ap_rprintf(trans->apreq, 
378         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
379     //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n",
380     //JSON_CONTENT_TYPE, msg->body);
381     if(trans->complete) {
382         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
383         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
384     } else {
385         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
386         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
387     }
388     ap_rflush(trans->apreq);
389 }
390
391 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
392     if(trans->body == NULL)
393         return HTTP_BAD_REQUEST;
394
395     if(!osrfHttpTranslatorSetTo(trans))
396         return HTTP_BAD_REQUEST;
397
398     if(!osrfHttpTranslatorParseRequest(trans))
399         return HTTP_BAD_REQUEST;
400
401     while(client_recv(trans->handle, 0))
402         continue; // discard any old status messages in the recv queue
403
404     // send the message to the recipient
405     transport_message* tmsg = message_init(
406         trans->body, NULL, trans->thread, trans->recipient, NULL);
407     message_set_osrf_xid(tmsg, osrfLogGetXid());
408     client_send_message(trans->handle, tmsg);
409     message_free(tmsg); 
410
411     if(trans->disconnectOnly) {
412         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
413         osrfCacheRemove(trans->thread);
414         return OK;
415     }
416
417     // process the response from the opensrf service
418     int firstWrite = 1;
419     while(!trans->complete) {
420         transport_message* msg = client_recv(trans->handle, trans->timeout);
421
422         if(trans->handle->error) {
423             osrfLogError(OSRF_LOG_MARK, "Transport error");
424             osrfCacheRemove(trans->thread);
425             return HTTP_INTERNAL_SERVER_ERROR;
426         }
427
428         if(msg == NULL)
429             return HTTP_GATEWAY_TIME_OUT;
430
431         if(msg->is_error) {
432             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
433             osrfCacheRemove(trans->thread);
434             return HTTP_NOT_FOUND;
435         }
436
437         if(!osrfHttpTranslatorCheckStatus(trans, msg))
438             continue;
439
440         if(firstWrite) {
441             osrfHttpTranslatorInitHeaders(trans, msg);
442             if(trans->connecting)
443                 osrfHttpTranslatorCacheSession(trans, msg->sender);
444             firstWrite = 0;
445         }
446
447         if(trans->multipart) {
448             osrfHttpTranslatorWriteChunk(trans, msg);
449             if(trans->connectOnly)
450                 break;
451         } else {
452             if(!trans->messages)
453                 trans->messages = osrfNewList();
454             osrfListPush(trans->messages, msg->body);
455
456             if(trans->complete || trans->connectOnly) {
457                 growing_buffer* buf = buffer_init(128);
458                 int i;
459                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
460                 for(i = 1; i < trans->messages->size; i++) {
461                     buffer_chomp(buf); // chomp off the closing array bracket
462                     char* body = osrfListGetIndex(trans->messages, i);
463                     char newbuf[strlen(body)];
464                     sprintf(newbuf, body+1); // chomp off the opening array bracket
465                     OSRF_BUFFER_ADD_CHAR(buf, ',');
466                     OSRF_BUFFER_ADD(buf, newbuf);
467                 }
468
469                 ap_rputs(buf->buf, trans->apreq);
470                 buffer_free(buf);
471             }
472         }
473     }
474
475     if(trans->disconnecting) // DISCONNECT within a multi-message batch
476         osrfCacheRemove(trans->thread);
477
478     return OK;
479 }
480
481 static void testConnection(request_rec* r) {
482         if(!osrfConnected || !osrfSystemGetTransportClient()) {
483         osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
484                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
485                 usleep(100000); // .1 second to prevent process die/start overload
486                 exit(1);
487         }
488 }
489
490 #if 0
491 // Commented out to avoid compiler warning
492 // it's dead, Jim
493 static apr_status_t childExit(void* data) {
494     osrf_system_shutdown();
495     return OK;
496 }
497 #endif
498
499 static void childInit(apr_pool_t *p, server_rec *s) {
500         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
501                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
502                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
503                 return;
504         }
505
506     routerName = osrfConfigGetValue(NULL, "/router_name");
507     domainName = osrfConfigGetValue(NULL, "/domain");
508     const char* servers[] = {cacheServers};
509     osrfCacheInit(servers, 1, 86400);
510         osrfConnected = 1;
511
512     // at pool destroy time (= child exit time), cleanup
513     // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
514     //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
515 }
516
517 static int handler(request_rec *r) {
518     int stat = OK;
519         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
520     if(r->header_only) return stat;
521
522         r->allowed |= (AP_METHOD_BIT << M_GET);
523         r->allowed |= (AP_METHOD_BIT << M_POST);
524
525         osrfLogSetAppname("osrf_http_translator");
526     testConnection(r);
527
528         osrfLogMkXid();
529     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
530     if(trans->body) {
531         stat = osrfHttpTranslatorProcess(trans);
532         //osrfHttpTranslatorDebug(trans);
533         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
534     } else {
535         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
536     }
537     osrfHttpTranslatorFree(trans);
538         return stat;
539 }
540
541
542 static void registerHooks (apr_pool_t *p) {
543         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
544         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
545 }
546
547
548 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
549         STANDARD20_MODULE_STUFF,
550     NULL,
551         NULL,
552     NULL,
553         NULL,
554     osrfHttpTranslatorCmds,
555         registerHooks,
556 };