set log xid on outbound messages
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11
12 #define MODULE_NAME "osrf_http_translator_module"
13 #define OSRF_TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define OSRF_TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define OSRF_TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
16
17 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
18 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
19 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
20 #define DEFAULT_TRANSLATOR_CACHE_SERVERS "127.0.0.1:11211"
21
22 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
23 #define JSON_CONTENT_TYPE "text/plain"
24 #define MAX_MSGS_PER_PACKET 256
25 #define CACHE_TIME 300
26
27 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
28 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
29 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
30 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
31 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
32 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
33 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
34
35 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
36 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
37 char* cacheServers = DEFAULT_TRANSLATOR_CACHE_SERVERS;
38
39 char* routerName = NULL;
40 char* domainName = NULL;
41 int osrfConnected = 0;
42 char recipientBuf[128];
43 char contentTypeBuf[80];
44
45 // for development only, writes to apache error log
46 static void _dbg(char* s, ...) {
47     VA_LIST_TO_STRING(s);
48     fprintf(stderr, "%s\n", VA_BUF);
49     fflush(stderr);
50 }
51
52 // Translator struct
53 typedef struct {
54     request_rec* apreq;
55     transport_client* handle;
56     osrfList* messages;
57     char* body;
58     char* delim;
59     const char* recipient;
60     const char* service;
61     const char* thread;
62     const char* remoteHost;
63     int complete;
64     int timeout;
65     int multipart;
66     int connectOnly;
67     int disconnectOnly;
68     int localXid;
69 } osrfHttpTranslator;
70
71
72 static const char* osrfHttpTranslatorGetConfigFile(cmd_parms *parms, void *config, const char *arg) {
73     configFile = (char*) arg;
74         return NULL;
75 }
76 static const char* osrfHttpTranslatorGetConfigFileCtx(cmd_parms *parms, void *config, const char *arg) {
77     configCtx = (char*) arg;
78         return NULL;
79 }
80 static const char* osrfHttpTranslatorGetCacheServer(cmd_parms *parms, void *config, const char *arg) {
81     cacheServers = (char*) arg;
82         return NULL;
83 }
84
85 /** set up the configuratoin handlers */
86 static const command_rec osrf_json_gateway_cmds[] = {
87         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_FILE, osrfHttpTranslatorGetConfigFile,
88                         NULL, RSRC_CONF, "osrf translator config file"),
89         AP_INIT_TAKE1( OSRF_TRANSLATOR_CONFIG_CTX, osrfHttpTranslatorGetConfigFileCtx,
90                         NULL, RSRC_CONF, "osrf translator config file context"),
91         AP_INIT_TAKE1( OSRF_TRANSLATOR_CACHE_SERVER, osrfHttpTranslatorGetCacheServer,
92                         NULL, RSRC_CONF, "osrf translator cache server"),
93     {NULL}
94 };
95
96
97 // there can only be one, so use a global static one
98 static osrfHttpTranslator globalTranslator;
99
100 /*
101  * Constructs a new translator object based on the current apache 
102  * request_rec.  Reads the request body and headers.
103  */
104 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
105     osrfHttpTranslator* trans = &globalTranslator;
106     trans->apreq = apreq;
107     trans->complete = 0;
108     trans->connectOnly = 0;
109     trans->disconnectOnly = 0;
110     trans->remoteHost = apreq->connection->remote_ip;
111     trans->messages = NULL;
112
113     /* load the message body */
114         osrfStringArray* params = apacheParseParms(apreq);
115     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
116     osrfStringArrayFree(params);
117
118     /* load the request headers */
119     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)) // force our log xid to match the caller
120             osrfLogForceXid(strdup(apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID)));
121
122     trans->handle = osrfSystemGetTransportClient();
123     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
124     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
125     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */
126
127     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
128     if(timeout) 
129         trans->timeout = atoi(timeout);
130     else 
131         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
132
133     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
134     if(multipart && !strcasecmp(multipart, "true"))
135         trans->multipart = 1;
136     else
137         trans->multipart = 0;
138
139     char buf[32];
140     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
141     trans->delim = md5sum(buf);
142
143     return trans;
144 }
145
146 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
147     if(!trans) return;
148     if(trans->body)
149         free(trans->body);
150     if(trans->delim)
151         free(trans->delim);
152     osrfListFree(trans->messages);
153 }
154
155 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
156     _dbg("-----------------------------------");
157     _dbg("body = %s", trans->body);
158     _dbg("service = %s", trans->service);
159     _dbg("thread = %s", trans->thread);
160     _dbg("multipart = %d", trans->multipart);
161     _dbg("recipient = %s", trans->recipient);
162 }
163
164 /**
165  * Determines the correct recipient address based on the requested 
166  * service or recipient address.  
167  */
168 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
169     int stat = 0;
170     jsonObject* sessionCache = NULL;
171
172     if(trans->service) {
173         if(trans->recipient) {
174             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
175
176         } else {
177             // service is specified, build a recipient address 
178             // from the router, domain, and service
179             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
180             recipientBuf[size] = '\0';
181             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
182             trans->recipient = recipientBuf;
183             stat = 1;
184         }
185
186     } else {
187
188         if(trans->recipient) {
189             sessionCache = osrfCacheGetObject(trans->thread);
190
191             if(sessionCache) {
192                 char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip"));
193                 char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid"));
194
195                 // choosing a specific recipient address requires that the recipient and 
196                 // thread be cached on the server (so drone processes cannot be hijacked)
197                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
198                     osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s", 
199                         trans->remoteHost, trans->recipient);
200                     stat = 1;
201                     trans->service = jsonObjectGetString(jsonObjectGetKey(sessionCache, "service"));
202
203                 } else {
204                     osrfLogError(OSRF_LOG_MARK, 
205                         "Session cache for thread %s does not match request", trans->thread);
206                 }
207             }  else {
208                 osrfLogError(OSRF_LOG_MARK, 
209                     "attempt to send directly to %s without a session", trans->recipient);
210             }
211         } else {
212             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
213         } 
214     }
215
216     jsonObjectFree(sessionCache);
217     return stat;
218 }
219
220 /**
221  * Parses the request body and logs any REQUEST messages to the activity log
222  */
223 static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
224     osrfMessage* msg;
225     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
226     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
227     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
228
229     if(numMsgs == 0)
230         return 0;
231
232     if(numMsgs == 1) {
233         msg = msgList[0];
234         if(msg->m_type == CONNECT) {
235             trans->connectOnly = 1;
236             return 1;
237         }
238         if(msg->m_type == DISCONNECT) {
239             trans->disconnectOnly = 1;
240             return 1;
241         }
242     }
243
244     // log request messages to the activity log
245     int i;
246     for(i = 0; i < numMsgs; i++) {
247         msg = msgList[i];
248         if(msg->m_type == REQUEST) {
249
250             jsonObject* params = msg->_params;
251             growing_buffer* act = buffer_init(128);     
252             buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
253
254             char* str; 
255             int i = 0;
256             while((str = jsonObjectGetString(jsonObjectGetIndex(params, i++)))) {
257                 if( i == 1 )
258                     OSRF_BUFFER_ADD(act, " ");
259                 else 
260                     OSRF_BUFFER_ADD(act, ", ");
261                 OSRF_BUFFER_ADD(act, str);
262             }
263             osrfLogActivity(OSRF_LOG_MARK, act->buf);
264             buffer_free(act);
265         }
266     }
267
268     return 1;
269 }
270
271 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
272     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
273     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
274     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
275     if(numMsgs == 0) return 0;
276
277     osrfMessage* last = omsgList[numMsgs-1];
278     if(last->m_type == STATUS) {
279         if(last->status_code == OSRF_STATUS_TIMEOUT) {
280             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
281             osrfCacheRemove(trans->thread);
282             return 0;
283         }
284         // XXX hm, check for explicit status=COMPLETE message instead??
285         if(last->status_code != OSRF_STATUS_CONTINUE)
286             trans->complete = 1;
287     }
288
289     return 1;
290 }
291
292 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
293     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
294     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
295     if(trans->multipart) {
296         sprintf(contentTypeBuf, MULTIPART_CONTENT_TYPE, trans->delim);
297         contentTypeBuf[79] = '\0';
298         osrfLogDebug(OSRF_LOG_MARK, "content type %s : %s : %s", MULTIPART_CONTENT_TYPE, trans->delim, contentTypeBuf);
299             ap_set_content_type(trans->apreq, contentTypeBuf);
300         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
301     } else {
302             ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
303     }
304 }
305
306 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans) {
307     jsonObject* cacheObj = jsonNewObject(NULL);
308     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
309     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(trans->recipient));
310     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
311     osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
312 }
313
314            
315 /**
316  * Writes a single chunk of multipart/x-mixed-replace content
317  */
318 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
319     ap_rprintf(trans->apreq, 
320         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
321     if(trans->complete)
322         ap_rprintf(trans->apreq, "--%s--\n", trans->delim);
323     else
324         ap_rprintf(trans->apreq, "--%s\n", trans->delim);
325     ap_rflush(trans->apreq);
326 }
327
328 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
329     if(trans->body == NULL)
330         return HTTP_BAD_REQUEST;
331
332     if(!osrfHttpTranslatorSetTo(trans))
333         return HTTP_BAD_REQUEST;
334
335     if(!osrfHttpTranslatorParseRequest(trans))
336         return HTTP_BAD_REQUEST;
337
338     while(client_recv(trans->handle, 0))
339         continue; // discard any old status messages in the recv queue
340
341     // send the message to the recipient
342     transport_message* tmsg = message_init(
343         trans->body, NULL, trans->thread, trans->recipient, NULL);
344     message_set_osrf_xid(tmsg, osrfLogGetXid());
345     client_send_message(trans->handle, tmsg);
346     message_free(tmsg); 
347
348     if(trans->disconnectOnly) {
349         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
350         return OK;
351     }
352
353     // process the response from the opensrf service
354     int firstWrite = 1;
355     while(!trans->complete) {
356         transport_message* msg = client_recv(trans->handle, trans->timeout);
357
358         if(trans->handle->error) {
359             osrfLogError(OSRF_LOG_MARK, "Transport error");
360             return HTTP_INTERNAL_SERVER_ERROR;
361         }
362
363         if(msg == NULL)
364             return HTTP_GATEWAY_TIME_OUT;
365
366         if(msg->is_error) {
367             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
368             return HTTP_NOT_FOUND;
369         }
370
371         if(!osrfHttpTranslatorCheckStatus(trans, msg))
372             continue;
373
374         if(firstWrite) {
375             osrfHttpTranslatorInitHeaders(trans, msg);
376             osrfHttpTranslatorCacheSession(trans);
377             firstWrite = 0;
378         }
379
380         if(trans->multipart) {
381             osrfHttpTranslatorWriteChunk(trans, msg);
382             if(trans->connectOnly)
383                 break;
384         } else {
385             if(!trans->messages)
386                 trans->messages = osrfNewList();
387             osrfListPush(trans->messages, msg->body);
388
389             if(trans->complete || trans->connectOnly) {
390                 growing_buffer* buf = buffer_init(128);
391                 int i;
392                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
393                 for(i = 1; i < trans->messages->size; i++) {
394                     buffer_chomp(buf); // chomp off the closing array bracket
395                     char* body = osrfListGetIndex(trans->messages, i);
396                     char newbuf[strlen(body)];
397                     sprintf(newbuf, body+1); // chomp off the opening array bracket
398                     OSRF_BUFFER_ADD_CHAR(buf, ',');
399                     OSRF_BUFFER_ADD(buf, newbuf);
400                 }
401                 
402                 ap_rputs(buf->buf, trans->apreq);
403                 buffer_free(buf);
404             }
405         }
406     }
407
408     return OK;
409 }
410
411 static void testConnection(request_rec* r) {
412         if(!osrfConnected || !osrfSystemGetTransportClient()) {
413         osrfLogError(OSRF_LOG_MARK, "We're not connected to OpenSRF");
414                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
415                 usleep(100000); // .1 second to prevent process die/start overload
416                 exit(1);
417         }
418 }
419
420 // it's dead, Jim
421 static apr_status_t childExit(void* data) {
422     osrf_system_shutdown();
423     return OK;
424 }
425
426 static void childInit(apr_pool_t *p, server_rec *s) {
427         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
428                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
429                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
430                 return;
431         }
432
433     routerName = osrfConfigGetValue(NULL, "/router_name");
434     domainName = osrfConfigGetValue(NULL, "/domain");
435     const char* servers[] = {cacheServers};
436     osrfCacheInit(servers, 1, 86400);
437         osrfConnected = 1;
438
439     // at pool destroy time (= child exit time), cleanup
440     apr_pool_cleanup_register(p, NULL, childExit, NULL);
441 }
442
443 static int handler(request_rec *r) {
444     int stat = OK;
445         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
446     if(r->header_only) return stat;
447
448         r->allowed |= (AP_METHOD_BIT << M_GET);
449         r->allowed |= (AP_METHOD_BIT << M_POST);
450
451         osrfLogSetAppname("osrf_http_translator");
452     testConnection(r);
453
454     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
455         osrfLogMkXid();
456     if(trans->body) {
457         stat = osrfHttpTranslatorProcess(trans);
458         //osrfHttpTranslatorDebug(trans);
459         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
460     } else {
461         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
462     }
463     osrfHttpTranslatorFree(trans);
464         return stat;
465 }
466
467
468 static void registerHooks (apr_pool_t *p) {
469         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
470         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
471 }
472
473
474 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
475         STANDARD20_MODULE_STUFF,
476     NULL,
477         NULL,
478     NULL,
479         NULL,
480     NULL,
481         registerHooks,
482 };
483
484
485
486