initial C version of the opensrf http translator. not complete. I have not plugged...
[OpenSRF.git] / src / gateway / osrf_http_translator.c
1 #include <sys/time.h>
2 #include <sys/resource.h>
3 #include <unistd.h>
4 #include <strings.h>
5 #include "apachetools.h"
6 #include <opensrf/osrf_app_session.h>
7 #include <opensrf/osrf_system.h>
8 #include <opensrf/osrfConfig.h>
9 #include <opensrf/osrf_json.h>
10 #include <opensrf/osrf_cache.h>
11
12 #define MODULE_NAME "osrf_http_translator_module"
13 #define TRANSLATOR_CONFIG_FILE "OSRFTranslatorConfig"
14 #define TRANSLATOR_CONFIG_CTX "OSRFTranslatorConfigContext"
15 #define TRANSLATOR_CACHE_SERVER "OSRFTranslatorCacheServer"
16 #define DEFAULT_TRANSLATOR_CONFIG_CTX "gateway"
17 #define DEFAULT_TRANSLATOR_CONFIG_FILE "/openils/conf/opensrf_core.xml"
18 #define DEFAULT_TRANSLATOR_TIMEOUT 1200
19
20 #define MULTIPART_CONTENT_TYPE "multipart/x-mixed-replace;boundary=\"%s\""
21 #define JSON_CONTENT_TYPE "text/plain"
22 #define CACHE_TIME 300
23
24 #define OSRF_HTTP_HEADER_TO "X-OpenSRF-to"
25 #define OSRF_HTTP_HEADER_XID "X-OpenSRF-xid"
26 #define OSRF_HTTP_HEADER_FROM "X-OpenSRF-from"
27 #define OSRF_HTTP_HEADER_THREAD "X-OpenSRF-thread"
28 #define OSRF_HTTP_HEADER_TIMEOUT "X-OpenSRF-timeout"
29 #define OSRF_HTTP_HEADER_SERVICE "X-OpenSRF-service"
30 #define OSRF_HTTP_HEADER_MULTIPART "X-OpenSRF-multipart"
31 #define MAX_MSGS_PER_PACKET 256
32
33 char* configFile = DEFAULT_TRANSLATOR_CONFIG_FILE;
34 char* configCtx = DEFAULT_TRANSLATOR_CONFIG_CTX;
35 char* routerName = NULL;
36 char* domainName = NULL;
37 int osrfConnected = 0;
38 char recipientBuf[128];
39
40 // for development only, writes to apache error log
41 static void _dbg(char* s, ...) {
42     VA_LIST_TO_STRING(s);
43     fprintf(stderr, "%s\n", VA_BUF);
44     fflush(stderr);
45 }
46
47 // Translator struct
48 typedef struct {
49     request_rec* apreq;
50     transport_client* handle;
51     osrfList* messages;
52     char* body;
53     char* delim;
54     const char* recipient;
55     const char* service;
56     const char* thread;
57     const char* remoteHost;
58     int complete;
59     int timeout;
60     int multipart;
61     int connectOnly;
62     int disconnectOnly;
63     int localXid;
64 } osrfHttpTranslator;
65
66 /*
67  * Constructs a new translator object based on the current apache 
68  * request_rec.  Reads the request body and headers.
69  */
70 static osrfHttpTranslator* osrfNewHttpTranslator(request_rec* apreq) {
71     osrfHttpTranslator* trans;
72     OSRF_MALLOC(trans, sizeof(trans));
73     trans->apreq = apreq;
74     trans->complete = 0;
75     trans->connectOnly = 0;
76     trans->disconnectOnly = 0;
77     trans->remoteHost = apreq->connection->remote_ip;
78     trans->messages = NULL;
79
80     /* load the message body */
81         osrfStringArray* params = apacheParseParms(apreq);
82     trans->body = apacheGetFirstParamValue(params, "osrf-msg");
83     osrfStringArrayFree(params);
84
85     /* load the request headers */
86     if (apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_XID) ) {
87         trans->localXid = 0;
88     } else {
89         trans->localXid = 1;
90     }
91
92     trans->handle = osrfSystemGetTransportClient();
93     trans->recipient = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TO);
94     trans->service = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_SERVICE);
95     trans->thread = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_THREAD); /* XXX create thread if necessary */
96
97     const char* timeout = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_TIMEOUT);
98     if(timeout) 
99         trans->timeout = atoi(timeout);
100     else 
101         trans->timeout = DEFAULT_TRANSLATOR_TIMEOUT;
102
103     const char* multipart = apr_table_get(apreq->headers_in, OSRF_HTTP_HEADER_MULTIPART);
104     if(multipart && !strcasecmp(multipart, "true"))
105         trans->multipart = 1;
106     else
107         trans->multipart = 0;
108
109     char buf[32];
110     snprintf(buf, sizeof(buf), "%d%ld", getpid(), time(NULL));
111     trans->delim = md5sum(buf);
112
113     return trans;
114 }
115
116 static void osrfHttpTranslatorFree(osrfHttpTranslator* trans) {
117     if(!trans) return;
118     if(trans->body)
119         free(trans->body);
120     if(trans->delim)
121         free(trans->delim);
122     osrfListFree(trans->messages);
123     //free(trans); // why does freeing this cause Apache to croak??
124 }
125
126 static void osrfHttpTranslatorDebug(osrfHttpTranslator* trans) {
127     _dbg("-----------------------------------");
128     _dbg("body = %s", trans->body);
129     _dbg("service = %s", trans->service);
130     _dbg("thread = %s", trans->thread);
131     _dbg("multipart = %d", trans->multipart);
132     _dbg("recipient = %s", trans->recipient);
133 }
134
135 /**
136  * Determines the correct recipient address based on the requested 
137  * service or recipient address.  
138  */
139 static int osrfHttpTranslatorSetTo(osrfHttpTranslator* trans) {
140     int stat = 0;
141     jsonObject* sessionCache = NULL;
142
143     if(trans->service) {
144         if(trans->recipient) {
145             osrfLogError(OSRF_LOG_MARK, "Specifying both SERVICE and TO are not allowed");
146
147         } else {
148             // service is specified, build a recipient address 
149             // from the router, domain, and service
150             int size = snprintf(recipientBuf, 128, "%s@%s/%s", routerName, domainName, trans->service);
151             recipientBuf[size] = '\0';
152             osrfLogDebug(OSRF_LOG_MARK, "Set recipient to %s", recipientBuf);
153             trans->recipient = recipientBuf;
154             stat = 1;
155         }
156
157     } else {
158
159         if(trans->recipient) {
160             sessionCache = osrfCacheGetObject(trans->thread);
161
162             if(sessionCache) {
163                 char* ipAddr = jsonObjectGetString(jsonObjectGetKey(sessionCache, "ip"));
164                 char* recipient = jsonObjectGetString(jsonObjectGetKey(sessionCache, "jid"));
165
166                 // choosing a specific recipient address requires that the recipient and 
167                 // thread be cached on the server (so drone processes cannot be hijacked)
168                 if(!strcmp(ipAddr, trans->remoteHost) && !strcmp(recipient, trans->recipient)) {
169                     osrfLogDebug(OSRF_LOG_MARK, "Found cached session from host %s and recipient %s", 
170                         trans->remoteHost, trans->recipient);
171                     jsonObjectFree(sessionCache);
172                     stat = 1;
173                      
174                     // XXX free me
175                     trans->service = jsonObjectGetString(jsonObjectGetKey(sessionCache, "service"));
176
177                 } else {
178                     osrfLogError(OSRF_LOG_MARK, 
179                         "Session cache for thread %s does not match request", trans->thread);
180                 }
181
182                 free(ipAddr);
183                 free(recipient);
184
185             }  else {
186                 osrfLogError(OSRF_LOG_MARK, 
187                     "attempt to send directly to %s without a session", trans->recipient);
188             }
189         } else {
190             osrfLogError(OSRF_LOG_MARK, "No SERVICE or RECIPIENT defined");
191         } 
192     }
193
194     jsonObjectFree(sessionCache);
195     return stat;
196 }
197
198 /**
199  * Parses the request body and logs any REQUEST messages to the activity log
200  */
201 static int osrfHttpTranslatorParseRequest(osrfHttpTranslator* trans) {
202     osrfMessage* msg;
203     osrfMessage* msgList[MAX_MSGS_PER_PACKET];
204     int numMsgs = osrf_message_deserialize(trans->body, msgList, MAX_MSGS_PER_PACKET);
205     osrfLogDebug(OSRF_LOG_MARK, "parsed %d opensrf messages in this packet", numMsgs);
206
207     if(numMsgs == 0)
208         return 0;
209
210     if(numMsgs == 1) {
211         msg = msgList[0];
212         if(msg->m_type == CONNECT) {
213             trans->connectOnly = 1;
214             return 1;
215         }
216         if(msg->m_type == DISCONNECT) {
217             trans->disconnectOnly = 1;
218             return 1;
219         }
220     }
221
222     // log request messages to the activity log
223     int i;
224     for(i = 0; i < numMsgs; i++) {
225         msg = msgList[i];
226         if(msg->m_type == REQUEST) {
227
228             jsonObject* params = msg->_params;
229             growing_buffer* act = buffer_init(128);     
230             buffer_fadd(act, "[%s] [%s] %s %s", trans->remoteHost, "", trans->service, msg->method_name);
231
232             char* str; 
233             int i = 0;
234             while((str = jsonObjectGetString(jsonObjectGetIndex(params, i++)))) {
235                 if( i == 1 )
236                     OSRF_BUFFER_ADD(act, " ");
237                 else 
238                     OSRF_BUFFER_ADD(act, ", ");
239                 OSRF_BUFFER_ADD(act, str);
240             }
241             osrfLogActivity(OSRF_LOG_MARK, act->buf);
242             buffer_free(act);
243         }
244     }
245
246     return 1;
247 }
248
249 static int osrfHttpTranslatorCheckStatus(osrfHttpTranslator* trans, transport_message* msg) {
250     osrfMessage* omsgList[MAX_MSGS_PER_PACKET];
251     int numMsgs = osrf_message_deserialize(msg->body, omsgList, MAX_MSGS_PER_PACKET);
252     osrfLogDebug(OSRF_LOG_MARK, "parsed %d response messages", numMsgs);
253     if(numMsgs == 0) return 0;
254
255     osrfMessage* last = omsgList[numMsgs-1];
256     if(last->m_type == STATUS) {
257         if(last->status_code == OSRF_STATUS_TIMEOUT) {
258             osrfLogDebug(OSRF_LOG_MARK, "removing cached session on request timeout");
259             osrfCacheRemove(trans->thread);
260             return 0;
261         }
262
263         if(last->status_code != OSRF_STATUS_CONTINUE)
264             trans->complete = 1;
265     }
266
267     return 1;
268 }
269
270 static void osrfHttpTranslatorInitHeaders(osrfHttpTranslator* trans, transport_message* msg) {
271     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_FROM, msg->sender);
272     apr_table_set(trans->apreq->headers_out, OSRF_HTTP_HEADER_THREAD, trans->thread);
273     if(trans->multipart) {
274         char buf[strlen(MULTIPART_CONTENT_TYPE) + strlen(trans->delim) + 1];
275         sprintf(buf, MULTIPART_CONTENT_TYPE, trans->delim);
276             ap_set_content_type(trans->apreq, buf);
277         ap_rvputs(trans->apreq, "--%s\n", trans->delim);
278     } else {
279             ap_set_content_type(trans->apreq, JSON_CONTENT_TYPE);
280     }
281 }
282
283 static void osrfHttpTranslatorCacheSession(osrfHttpTranslator* trans) {
284     jsonObject* cacheObj = jsonNewObject(NULL);
285     jsonObjectSetKey(cacheObj, "ip", jsonNewObject(trans->remoteHost));
286     jsonObjectSetKey(cacheObj, "jid", jsonNewObject(trans->recipient));
287     jsonObjectSetKey(cacheObj, "service", jsonNewObject(trans->service));
288     osrfCachePutObject((char*) trans->thread, cacheObj, CACHE_TIME);
289 }
290
291            
292 /**
293  * Writes a single chunk of multipart/x-mixed-replace content
294  */
295 static void osrfHttpTranslatorWriteChunk(osrfHttpTranslator* trans, transport_message* msg) {
296     ap_rvputs(trans->apreq, 
297         "Content-type: %s\n\n%s\n\n", JSON_CONTENT_TYPE, msg->body);
298     if(trans->complete)
299         ap_rvputs(trans->apreq, "--%s--\n", trans->delim);
300     else
301         ap_rvputs(trans->apreq, "--%s\n", trans->delim);
302     ap_rflush(trans->apreq);
303 }
304
305 static int osrfHttpTranslatorProcess(osrfHttpTranslator* trans) {
306     if(trans->body == NULL)
307         return HTTP_BAD_REQUEST;
308
309     if(!osrfHttpTranslatorSetTo(trans))
310         return HTTP_BAD_REQUEST;
311
312     if(!osrfHttpTranslatorParseRequest(trans))
313         return HTTP_BAD_REQUEST;
314
315     while(client_recv(trans->handle, 0))
316         continue; // discard any old status messages in the recv queue
317
318     // send the message to the recipient
319     transport_message* tmsg = message_init(
320         trans->body, NULL, trans->thread, trans->recipient, NULL);
321     client_send_message(trans->handle, tmsg);
322     message_free(tmsg);
323
324     if(trans->disconnectOnly) {
325         osrfLogDebug(OSRF_LOG_MARK, "exiting early on disconnect");
326         return OK;
327     }
328
329     // process the response from the opensrf service
330     int firstWrite = 1;
331     while(!trans->complete) {
332         osrfLogDebug(OSRF_LOG_MARK, "1");
333         transport_message* msg = client_recv(trans->handle, trans->timeout);
334         osrfLogDebug(OSRF_LOG_MARK, "2");
335
336         if(trans->handle->error) {
337             osrfLogError(OSRF_LOG_MARK, "Transport error");
338             return HTTP_NOT_FOUND;
339         }
340
341         if(msg == NULL)
342             return HTTP_GATEWAY_TIME_OUT;
343
344         if(msg->is_error) {
345             osrfLogError(OSRF_LOG_MARK, "XMPP message resulted in error code %d", msg->error_code);
346             return HTTP_NOT_FOUND;
347         }
348
349         if(!osrfHttpTranslatorCheckStatus(trans, msg))
350             continue;
351
352         if(firstWrite) {
353             osrfHttpTranslatorInitHeaders(trans, msg);
354             osrfHttpTranslatorCacheSession(trans);
355             firstWrite = 0;
356         }
357
358         if(trans->multipart) {
359             osrfHttpTranslatorWriteChunk(trans, msg);
360             if(trans->connectOnly)
361                 break;
362         } else {
363             if(!trans->messages)
364                 trans->messages = osrfNewList();
365             osrfListPush(trans->messages, msg->body);
366
367             if(trans->complete || trans->connectOnly) {
368                 growing_buffer* buf = buffer_init(128);
369                 int i;
370                 OSRF_BUFFER_ADD(buf, osrfListGetIndex(trans->messages, 0));
371                 for(i = 1; i < trans->messages->size; i++) {
372                     // yay! string mangling
373                 }
374             }
375         }
376     }
377
378     return OK;
379 }
380
381 static void testConnection(request_rec* r) {
382         if(!osrfConnected || !osrfSystemGetTransportClient()) {
383                 ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "We're not connected to OpenSRF");
384                 usleep(100000); // .1 second to prevent process die/start overload
385                 exit(1);
386         }
387 }
388
389 // it's dead, Jim
390 static apr_status_t childExit(void* data) {
391     osrf_system_shutdown();
392     return OK;
393 }
394
395 static void childInit(apr_pool_t *p, server_rec *s) {
396         if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
397                 ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
398                         "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
399                 return;
400         }
401
402     routerName = osrfConfigGetValue(NULL, "/router_name");
403     domainName = osrfConfigGetValue(NULL, "/domain");
404     // ---------------------
405     // XXX initialize the cache from the Apache settings
406     const char* servers[] = {"127.0.0.1:11211"};
407     osrfCacheInit(servers, 1, 86400);
408     // ---------------------
409         osrfConnected = 1;
410
411     // at pool destroy time (= child exit time), cleanup
412     apr_pool_cleanup_register(p, NULL, childExit, NULL);
413 }
414
415 static int handler(request_rec *r) {
416     int stat = OK;
417         if(strcmp(r->handler, MODULE_NAME)) return DECLINED;
418     if(r->header_only) return stat;
419
420         r->allowed |= (AP_METHOD_BIT << M_GET);
421         r->allowed |= (AP_METHOD_BIT << M_POST);
422
423         osrfLogSetAppname("osrf_http_translator");
424     testConnection(r);
425
426     osrfHttpTranslator* trans = osrfNewHttpTranslator(r);
427     if(trans->body) {
428         stat = osrfHttpTranslatorProcess(trans);
429         osrfHttpTranslatorDebug(trans);
430     
431         osrfLogInfo(OSRF_LOG_MARK, "translator resulted in status %d", stat);
432     
433         ap_rputs(trans->body, r);
434     } else {
435         osrfLogWarning(OSRF_LOG_MARK, "no message body to process");
436     }
437     osrfHttpTranslatorFree(trans);
438         return stat;
439 }
440
441
442 static void registerHooks (apr_pool_t *p) {
443         ap_hook_handler(handler, NULL, NULL, APR_HOOK_MIDDLE);
444         ap_hook_child_init(childInit, NULL, NULL, APR_HOOK_MIDDLE);
445 }
446
447
448 module AP_MODULE_DECLARE_DATA osrf_http_translator_module = {
449         STANDARD20_MODULE_STUFF,
450     NULL,
451         NULL,
452     NULL,
453         NULL,
454     NULL,
455         registerHooks,
456 };
457
458
459
460