Pedantic change for const-correctness: change three calls to
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11
12 #define MODULE_NAME "osrf_http_translator_module"
13 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define OSRF_TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define OSRF_TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
16
17 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
18 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
19 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
20 #define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"
21
22 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
23 #define JSON_CONTENT_TYPE "text/plain"
24 #define MAX_MSGS_PER_PACKET 256
25 #define CACHE_TIME 300
26
27 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
28 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
29 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
30 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
31 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
32 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
33 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
34
35
36 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
37 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
38 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
39
40 char* routerName = NULL;
41 char* domainName = NULL;
42 int osrfConnected = 0;
43 char recipientBuf[128];
44 char contentTypeBuf[80];
45
46 // for development only, writes to apache error log
47 static void _dbg(char* s, ...) {
48     VA_LIST_TO_STRING(s);
49     fprintf(stderr, "%s\n", VA_BUF);
50     fflush(stderr);
51 }
52
53 // Translator struct
54 typedef struct {
55     request_rec* apreq;
56     transport_client* handle;
57     osrfList* messages;
58     char* body;
59     char* delim;
60     const char* recipient;
61     const char* service;
62     const char* thread;
63     const char* remoteHost;
64     int complete;
65     int timeout;
66     int multipart;
67     int connectOnly; // there is only 1 message, a CONNECT
68     int disconnectOnly; // there is only 1 message, a DISCONNECT
69     int connecting; // there is a connect message in this batch
70     int disconnecting; // there is a connect message in this batch
71     int localXid;
72 } osrfHttpTranslator;
73
74
75 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
76     configFile = (char*) arg;
77         return NULL;
78 }
79 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
80     configCtx = (char*) arg;
81         return NULL;
82 }
83 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
84     cacheServers = (char*) arg;
85         return NULL;
86 }
87
88 /** set up the configuration handlers */
89 static const command_rec osrfHttpTranslatorCmds[] = {
90         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
91                         NULL, RSRC_CONF, "osrf translator config file"),
92         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
93                         NULL, RSRC_CONF, "osrf translator config file context"),
94         AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
95                         NULL, RSRC_CONF, "osrf translator cache server"),
96     {NULL}
97 };
98
99
100 // there can only be one, so use a global static one
101 static osrfHttpTranslator globalTranslator;
102
103 /*
104  * Constructs a new translator object based on the current apache 
105  * request_rec.  Reads the request body and headers.
106  */
107 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
108     osrfHttpTranslator* trans = &globalTranslator;
109     trans->apreq = apreq;
110     trans->complete = 0;
111     trans->connectOnly = 0;
112     trans->disconnectOnly = 0;
113     trans->connecting = 0;
114     trans->disconnecting = 0;
115     trans->remoteHost = apreq->connection->remote_ip;
116     trans->messages = NULL;
117
118     /* load the message body */
119         osrfStringArray* params = apacheParseParms(apreq);
120     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
121     osrfStringArrayFree(params);
122
123     /* load the request headers */
124     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)) // force our log xid to match the caller
125             osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
126
127     trans->handle = osrfSystemGetTransportClient();
128     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
129     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
130
131     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
132     if(timeout) 
133         trans->timeout = atoi(timeout);
134     else 
135         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
136
137     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
138     if(multipart && !strcasecmp(multipart, "true"))
139         trans->multipart = 1;
140     else
141         trans->multipart = 0;
142
143     char buf[32];
144     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
145     trans->delim = md5sum(buf);
146
147     /* Use thread if it has been passed in; otherwise, just use the delimiter */
148     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
149         ?  apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD)
150         : (const char*)trans->delim;
151
152     return trans;
153 }
154
155 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
156     if(!trans) return;
157     if(trans->body)
158         free(trans->body);
159     if(trans->delim)
160         free(trans->delim);
161     osrfListFree(trans->messages);
162 }
163
164 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
165     _dbg("-----------------------------------");
166     _dbg("body = %s", trans->body);
167     _dbg("service = %s", trans->service);
168     _dbg("thread = %s", trans->thread);
169     _dbg("multipart = %d", trans->multipart);
170     _dbg("recipient = %s", trans->recipient);
171 }
172
173 /**
174  * Determines the correct recipient address based on the requested 
175  * service or recipient address.  
176  */
177 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
178     int stat = 0;
179     jsonObject* sessionCache = NULL;
180
181     if(trans->service) {
182         if(trans->recipient) {
183             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
184
185         } else {
186             // service is specified, build a recipient address 
187             // from the router, domain, and service
188             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
189             recipientBuf[size] = '\0';
190             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
191             trans->recipient = recipientBuf;
192             stat = 1;
193         }
194
195     } else {
196
197         if(trans->recipient) {
198             sessionCache = osrfCacheGetObject(trans->thread);
199
200             if(sessionCache) {
201                 const char* ipAddr = jsonObjectGetString(
202                     jsonObjectGetKeyConst( sessionCache, "ip" ));
203                 const char* recipient = jsonObjectGetString(
204                     jsonObjectGetKeyConst( sessionCache, "jid" ));
205
206                 // choosing a specific recipient address requires that the recipient and 
207                 // thread be cached on the server (so drone processes cannot be hijacked)
208                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
209                     osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s", 
210                         trans->remoteHost, trans->recipient);
211                     stat = 1;
212                     trans->service = apr_pstrdup(
213                         trans->apreq->pool, jsonObjectGetString(
214                             jsonObjectGetKeyConst( sessionCache, "service" )));
215
216                 } else {
217                     osrfLogError(OSRF_LOG_MARK, 
218                         "Session cache for thread %s does not match request", trans->thread);
219                 }
220             }  else {
221                 osrfLogError(OSRF_LOG_MARK, 
222                     "attempt to send directly to %s without a session", trans->recipient);
223             }
224         } else {
225             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
226         } 
227     }
228
229     jsonObjectFree(sessionCache);
230     return stat;
231 }
232
233 /**
234  * Parses the request body and logs any REQUEST messages to the activity log
235  */
236 static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
237     osrfMessage* msg;
238     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
239     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
240     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
241
242     if(numMsgs == 0)
243         return 0;
244
245     if(numMsgs == 1) {
246         msg = msgList[0];
247         if(msg->m_type == CONNECT) {
248             trans->connectOnly = 1;
249             trans->connecting = 1;
250             return 1;
251         }
252         if(msg->m_type == DISCONNECT) {
253             trans->disconnectOnly = 1;
254             trans->disconnecting = 1;
255             return 1;
256         }
257     }
258
259     // log request messages to the activity log
260     int i;
261     for(i = 0; i < numMsgs; i++) {
262         msg = msgList[i];
263
264         switch(msg->m_type) {
265
266             case REQUEST: {
267                 jsonObject* params = msg->_params;
268                 growing_buffer* act = buffer_init(128); 
269                 buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
270
271                 jsonObject* obj = NULL;
272                 int i = 0;
273                 char* str; 
274                 while((obj = jsonObjectGetIndex(params, i++))) {
275                     str = jsonObjectToJSON(obj);
276                     if( i == 1 )
277                         OSRF_BUFFER_ADD(act, " ");
278                     else 
279                         OSRF_BUFFER_ADD(act, ", ");
280                     OSRF_BUFFER_ADD(act, str);
281                     free(str);
282                 }
283                 osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
284                 buffer_free(act);
285                 break;
286             }
287
288             case CONNECT:
289                 trans->connecting = 1;
290                 break;
291
292             case DISCONNECT:
293                 trans->disconnecting = 1;
294                 break;
295         }
296     }
297
298     return 1;
299 }
300
301 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
302     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
303     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
304     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
305     if(numMsgs == 0) return 0;
306
307     osrfMessage* last = omsgList[numMsgs-1];
308     if(last->m_type == STATUS) {
309         if(last->status_code == OSRF_STATUS_TIMEOUT) {
310             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
311             osrfCacheRemove(trans->thread);
312             return 0;
313         }
314         // XXX hm, check for explicit status=COMPLETE message instead??
315         if(last->status_code != OSRF_STATUS_CONTINUE)
316             trans->complete = 1;
317     }
318
319     return 1;
320 }
321
322 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
323     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
324     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
325     if(trans->multipart) {
326         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
327         contentTypeBuf[79] = '\0';
328         osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE, trans->delim, contentTypeBuf);
329             ap_set_content_type(trans->apreq, contentTypeBuf);
330         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
331     } else {
332             ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
333     }
334 }
335
336 /**
337  * Cache the transaction with the JID of the backend process we are talking to
338  */
339 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans, const char* jid) {
340     jsonObject* cacheObj = jsonNewObject(NULL);
341     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
342     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(jid));
343     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
344     osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
345 }
346
347            
348 /**
349  * Writes a single chunk of multipart/x-mixed-replace content
350  */
351 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
352     osrfLogInternal(OSRF_LOG_MARK, "sending multipart chunk %s", msg->body);
353     ap_rprintf(trans->apreq, 
354         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
355     //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
356     if(trans->complete) {
357         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
358         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s--\n", trans->delim);
359     } else {
360         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
361         //osrfLogInternal(OSRF_LOG_MARK, "Apache sending data: --%s\n", trans->delim);
362     }
363     ap_rflush(trans->apreq);
364 }
365
366 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
367     if(trans->body == NULL)
368         return HTTP_BAD_REQUEST;
369
370     if(!osrfHttpTranslatorSetTo(trans))
371         return HTTP_BAD_REQUEST;
372
373     if(!osrfHttpTranslatorParseRequest(trans))
374         return HTTP_BAD_REQUEST;
375
376     while(client_recv(trans->handle, 0))
377         continue; // discard any old status messages in the recv queue
378
379     // send the message to the recipient
380     transport_message* tmsg = message_init(
381         trans->body, NULL, trans->thread, trans->recipient, NULL);
382     message_set_osrf_xid(tmsg, osrfLogGetXid());
383     client_send_message(trans->handle, tmsg);
384     message_free(tmsg); 
385
386     if(trans->disconnectOnly) {
387         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
388         osrfCacheRemove(trans->thread);
389         return OK;
390     }
391
392     // process the response from the opensrf service
393     int firstWrite = 1;
394     while(!trans->complete) {
395         transport_message* msg = client_recv(trans->handle, trans->timeout);
396
397         if(trans->handle->error) {
398             osrfLogError(OSRF_LOG_MARK, "Transport error");
399             osrfCacheRemove(trans->thread);
400             return HTTP_INTERNAL_SERVER_ERROR;
401         }
402
403         if(msg == NULL)
404             return HTTP_GATEWAY_TIME_OUT;
405
406         if(msg->is_error) {
407             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
408             osrfCacheRemove(trans->thread);
409             return HTTP_NOT_FOUND;
410         }
411
412         if(!osrfHttpTranslatorCheckStatus(trans, msg))
413             continue;
414
415         if(firstWrite) {
416             osrfHttpTranslatorInitHeaders(trans, msg);
417             if(trans->connecting)
418                 osrfHttpTranslatorCacheSession(trans, msg->sender);
419             firstWrite = 0;
420         }
421
422         if(trans->multipart) {
423             osrfHttpTranslatorWriteChunk(trans, msg);
424             if(trans->connectOnly)
425                 break;
426         } else {
427             if(!trans->messages)
428                 trans->messages = osrfNewList();
429             osrfListPush(trans->messages, msg->body);
430
431             if(trans->complete || trans->connectOnly) {
432                 growing_buffer* buf = buffer_init(128);
433                 int i;
434                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
435                 for(i = 1; i < trans->messages->size; i++) {
436                     buffer_chomp(buf); // chomp off the closing array bracket
437                     char* body = osrfListGetIndex(trans->messages, i);
438                     char newbuf[strlen(body)];
439                     sprintf(newbuf, body+1); // chomp off the opening array bracket
440                     OSRF_BUFFER_ADD_CHAR(buf, ',');
441                     OSRF_BUFFER_ADD(buf, newbuf);
442                 }
443                 
444                 ap_rputs(buf->buf, trans->apreq);
445                 buffer_free(buf);
446             }
447         }
448     }
449
450     if(trans->disconnecting) // DISCONNECT within a multi-message batch
451         osrfCacheRemove(trans->thread);
452
453     return OK;
454 }
455
456 static void testConnection(request_rec* r) {
457         if(!osrfConnected || !osrfSystemGetTransportClient()) {
458         osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
459                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
460                 usleep(100000); // .1 second to prevent process die/start overload
461                 exit(1);
462         }
463 }
464
465 // it's dead, Jim
466 static apr_status_t childExit(void* data) {
467     osrf_system_shutdown();
468     return OK;
469 }
470
471 static void childInit(apr_pool_t *p, server_rec *s) {
472         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
473                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
474                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
475                 return;
476         }
477
478     routerName = osrfConfigGetValue(NULL, "/router_name");
479     domainName = osrfConfigGetValue(NULL, "/domain");
480     const char* servers[] = {cacheServers};
481     osrfCacheInit(servers, 1, 86400);
482         osrfConnected = 1;
483
484     // at pool destroy time (= child exit time), cleanup
485     // XXX causes us to disconnect even for clone()'d process cleanup (as in mod_cgi)
486     //apr_pool_cleanup_register(p, NULL, childExit, apr_pool_cleanup_null);
487 }
488
489 static int handler(request_rec *r) {
490     int stat = OK;
491         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
492     if(r->header_only) return stat;
493
494         r->allowed |= (AP_METHOD_BIT << M_GET);
495         r->allowed |= (AP_METHOD_BIT << M_POST);
496
497         osrfLogSetAppname("osrf_http_translator");
498     testConnection(r);
499
500         osrfLogMkXid();
501     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
502     if(trans->body) {
503         stat = osrfHttpTranslatorProcess(trans);
504         //osrfHttpTranslatorDebug(trans);
505         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
506     } else {
507         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
508     }
509     osrfHttpTranslatorFree(trans);
510         return stat;
511 }
512
513
514 static void registerHooks (apr_pool_t *p) {
515         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
516         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
517 }
518
519
520 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
521         STANDARD20_MODULE_STUFF,
522     NULL,
523         NULL,
524     NULL,
525         NULL,
526     osrfHttpTranslatorCmds,
527         registerHooks,
528 };
529
530
531
532