665cbc2b67958379459fa7c368fe4c6da1661847
[working/Evergreen.git] / Open-ILS / src / c-apps / oils_cstore.c
1 /**
2         @file oils_cstore.c
3         @brief As a server, perform database operations at the request of clients.
4 */
5
6 #include <ctype.h>
7 #include "opensrf/osrf_application.h"
8 #include "opensrf/osrf_settings.h"
9 #include "opensrf/osrf_message.h"
10 #include "opensrf/utils.h"
11 #include "opensrf/osrf_json.h"
12 #include "opensrf/log.h"
13 #include "openils/oils_utils.h"
14 #include "openils/oils_constants.h"
15 #include <dbi/dbi.h>
16
17 #include <time.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <unistd.h>
21
22 #ifdef RSTORE
23 #  define MODULENAME "open-ils.reporter-store"
24 #  define ENFORCE_PCRUD 0
25 #else
26 #  ifdef PCRUD
27 #    define MODULENAME "open-ils.pcrud"
28 #    define ENFORCE_PCRUD 1
29 #  else
30 #    define MODULENAME "open-ils.cstore"
31 #    define ENFORCE_PCRUD 0
32 #  endif
33 #endif
34
35 // The next four macros are OR'd together as needed to form a set
36 // of bitflags.  SUBCOMBO enables an extra pair of parentheses when
37 // nesting one UNION, INTERSECT or EXCEPT inside another.
38 // SUBSELECT tells us we're in a subquery, so don't add the
39 // terminal semicolon yet.
40 #define SUBCOMBO    8
41 #define SUBSELECT   4
42 #define DISABLE_I18N    2
43 #define SELECT_DISTINCT 1
44
45 #define AND_OP_JOIN     0
46 #define OR_OP_JOIN      1
47
48 struct ClassInfoStruct;
49 typedef struct ClassInfoStruct ClassInfo;
50
51 #define ALIAS_STORE_SIZE 16
52 #define CLASS_NAME_STORE_SIZE 16
53
54 struct ClassInfoStruct {
55         char* alias;
56         char* class_name;
57         char* source_def;
58         osrfHash* class_def;      // Points into IDL
59         osrfHash* fields;         // Points into IDL
60         osrfHash* links;          // Points into IDL
61
62         // The remaining members are private and internal.  Client code should not
63         // access them directly.
64
65         ClassInfo* next;          // Supports linked list of joined classes
66         int in_use;               // boolean
67
68         // We usually store the alias and class name in the following arrays, and
69         // point the corresponding pointers at them.  When the string is too big
70         // for the array (which will probably never happen in practice), we strdup it.
71
72         char alias_store[ ALIAS_STORE_SIZE + 1 ];
73         char class_name_store[ CLASS_NAME_STORE_SIZE + 1 ];
74 };
75
76 struct QueryFrameStruct;
77 typedef struct QueryFrameStruct QueryFrame;
78
79 struct QueryFrameStruct {
80         ClassInfo core;
81         ClassInfo* join_list;  // linked list of classes joined to the core class
82         QueryFrame* next;      // implements stack as linked list
83         int in_use;            // boolean
84 };
85
86 static int timeout_needs_resetting;
87 static time_t time_next_reset;
88
89 int osrfAppChildInit();
90 int osrfAppInitialize();
91 void osrfAppChildExit();
92
93 static int verifyObjectClass ( osrfMethodContext*, const jsonObject* );
94
95 static void setXactId( osrfMethodContext* ctx );
96 static inline const char* getXactId( osrfMethodContext* ctx );
97 static inline void clearXactId( osrfMethodContext* ctx );
98
99 int beginTransaction ( osrfMethodContext* );
100 int commitTransaction ( osrfMethodContext* );
101 int rollbackTransaction ( osrfMethodContext* );
102
103 int setSavepoint ( osrfMethodContext* );
104 int releaseSavepoint ( osrfMethodContext* );
105 int rollbackSavepoint ( osrfMethodContext* );
106
107 int doJSONSearch ( osrfMethodContext* );
108
109 int dispatchCRUDMethod( osrfMethodContext* ctx );
110 static int doSearch( osrfMethodContext* ctx );
111 static int doIdList( osrfMethodContext* ctx );
112 static int doCreate( osrfMethodContext* ctx );
113 static int doRetrieve( osrfMethodContext* ctx );
114 static int doUpdate( osrfMethodContext* ctx );
115 static int doDelete ( osrfMethodContext* ctx );
116 static jsonObject* doFieldmapperSearch ( osrfMethodContext* ctx, osrfHash* class_meta,
117                 jsonObject* where_hash, jsonObject* query_hash, int* err );
118 static jsonObject* oilsMakeFieldmapperFromResult( dbi_result, osrfHash* );
119 static jsonObject* oilsMakeJSONFromResult( dbi_result );
120
121 static char* searchSimplePredicate ( const char* op, const char* class_alias,
122                                 osrfHash* field, const jsonObject* node );
123 static char* searchFunctionPredicate ( const char*, osrfHash*, const jsonObject*, const char* );
124 static char* searchFieldTransform ( const char*, osrfHash*, const jsonObject*);
125 static char* searchFieldTransformPredicate ( const ClassInfo*, osrfHash*, const jsonObject*, const char* );
126 static char* searchBETWEENPredicate ( const char*, osrfHash*, const jsonObject* );
127 static char* searchINPredicate ( const char*, osrfHash*,
128                                                                  jsonObject*, const char*, osrfMethodContext* );
129 static char* searchPredicate ( const ClassInfo*, osrfHash*, jsonObject*, osrfMethodContext* );
130 static char* searchJOIN ( const jsonObject*, const ClassInfo* left_info );
131 static char* searchWHERE ( const jsonObject*, const ClassInfo*, int, osrfMethodContext* );
132 static char* buildSELECT ( jsonObject*, jsonObject*, osrfHash*, osrfMethodContext* );
133 char* buildQuery( osrfMethodContext* ctx, jsonObject* query, int flags );
134
135 char* SELECT ( osrfMethodContext*, jsonObject*, jsonObject*, jsonObject*, jsonObject*, jsonObject*, jsonObject*, jsonObject*, int );
136
137 void userDataFree( void* );
138 static void sessionDataFree( char*, void* );
139 static char* getRelation( osrfHash* );
140 static int str_is_true( const char* str );
141 static int obj_is_true( const jsonObject* obj );
142 static const char* json_type( int code );
143 static const char* get_primitive( osrfHash* field );
144 static const char* get_datatype( osrfHash* field );
145 static int is_identifier( const char* s);
146 static int is_good_operator( const char* op );
147 static void pop_query_frame( void );
148 static void push_query_frame( void );
149 static int add_query_core( const char* alias, const char* class_name );
150 static inline ClassInfo* search_alias( const char* target );
151 static ClassInfo* search_all_alias( const char* target );
152 static ClassInfo* add_joined_class( const char* alias, const char* classname );
153 static void clear_query_stack( void );
154
155 static const jsonObject* verifyUserPCRUD( osrfMethodContext* );
156 static int verifyObjectPCRUD( osrfMethodContext*, const jsonObject* );
157 static char* org_tree_root( osrfMethodContext* ctx );
158 static jsonObject* single_hash( const char* key, const char* value );
159
160 static int child_initialized = 0;   /* boolean */
161
162 static dbi_conn writehandle; /* our MASTER db connection */
163 static dbi_conn dbhandle; /* our CURRENT db connection */
164 //static osrfHash * readHandles;
165 static jsonObject* const jsonNULL = NULL; //
166 static int max_flesh_depth = 100;
167
168 // The following points to the top of a stack of QueryFrames.  It's a little
169 // confusing because the top level of the query is at the bottom of the stack.
170 static QueryFrame* curr_query = NULL;
171
172 //----------------------------------
173
174 int oilsExtendIDL( void );
175
176 static dbi_conn writehandle; /* our MASTER db connection */
177 static dbi_conn dbhandle; /* our CURRENT db connection */
178 //static osrfHash * readHandles;
179
180 static const int enforce_pcrud = ENFORCE_PCRUD;     // Boolean
181 static const char modulename[] = MODULENAME;
182
183
184 /**
185         @brief Disconnect from the database.
186
187         This function is called when the server drone is about to terminate.
188 */
189 void osrfAppChildExit() {
190         osrfLogDebug(OSRF_LOG_MARK, "Child is exiting, disconnecting from database...");
191
192         int same = 0;
193         if (writehandle == dbhandle)
194                 same = 1;
195
196         if (writehandle) {
197                 dbi_conn_query(writehandle, "ROLLBACK;");
198                 dbi_conn_close(writehandle);
199                 writehandle = NULL;
200         }
201         if (dbhandle && !same)
202                 dbi_conn_close(dbhandle);
203
204         // XXX add cleanup of readHandles whenever that gets used
205
206         return;
207 }
208
209 /**
210         @brief Initialize the application.
211         @return Zero if successful, or non-zero if not.
212
213         Load the IDL file into an internal data structure for future reference.  Each non-virtual
214         class in the IDL corresponds to a table or view in the database, or to a subquery defined
215         in the IDL.  Ignore all virtual tables and virtual fields.
216
217         Register a number of methods, some of them general-purpose and others specific for
218         particular classes.
219
220         The name of the application is given by the MODULENAME macro, whose value depends on
221         conditional compilation.  The method names also incorporate MODULENAME, followed by a
222         dot, as a prefix.  Some methods are registered or not registered depending on whether
223         the PCRUD macro is defined, and on whether the IDL includes permacrud entries for the
224         class and method.
225
226         The general-purpose methods are as follows (minus their MODULENAME prefixes):
227
228         - json_query (not registered for PCRUD)
229         - transaction.begin
230         - transaction.commit
231         - transaction.rollback
232         - savepoint.set
233         - savepoint.release
234         - savepoint.rollback
235
236         For each non-virtual class, create up to eight class-specific methods:
237
238         - create    (not for readonly classes)
239         - retrieve
240         - update    (not for readonly classes)
241         - delete    (not for readonly classes
242         - search    (atomic and non-atomic versions)
243         - id_list   (atomic and non-atomic versions)
244
245         For PCRUD, the full method names follow the pattern "MODULENAME.method_type.classname".
246         Otherwise they follow the pattern "MODULENAME.direct.XXX.method_type", where XXX is the
247         fieldmapper name from the IDL, with every run of one or more consecutive colons replaced
248         by a period.  In addition, the names of atomic methods have a suffix of ".atomic".
249
250         This function is called when the registering the application, and is executed by the
251         listener before spawning the drones.
252 */
253 int osrfAppInitialize() {
254
255         osrfLogInfo(OSRF_LOG_MARK, "Initializing the CStore Server...");
256         osrfLogInfo(OSRF_LOG_MARK, "Finding XML file...");
257
258         if (!oilsIDLInit( osrf_settings_host_value("/IDL") ))
259                 return 1; /* return non-zero to indicate error */
260
261         growing_buffer* method_name = buffer_init(64);
262
263         if( ! enforce_pcrud ) {
264                 // Generic search thingy (not for PCRUD)
265                 buffer_add( method_name, modulename );
266                 buffer_add( method_name, ".json_query" );
267                 osrfAppRegisterMethod( modulename, OSRF_BUFFER_C_STR( method_name ),
268                         "doJSONSearch", "", 1, OSRF_METHOD_STREAMING );
269         }
270
271         // first we register all the transaction and savepoint methods
272         buffer_reset(method_name);
273         OSRF_BUFFER_ADD(method_name, modulename );
274         OSRF_BUFFER_ADD(method_name, ".transaction.begin");
275         osrfAppRegisterMethod( modulename, OSRF_BUFFER_C_STR( method_name ),
276                         "beginTransaction", "", 0, 0 );
277
278         buffer_reset(method_name);
279         OSRF_BUFFER_ADD(method_name, modulename );
280         OSRF_BUFFER_ADD(method_name, ".transaction.commit");
281         osrfAppRegisterMethod( modulename, OSRF_BUFFER_C_STR(method_name),
282                         "commitTransaction", "", 0, 0 );
283
284         buffer_reset(method_name);
285         OSRF_BUFFER_ADD(method_name, modulename );
286         OSRF_BUFFER_ADD(method_name, ".transaction.rollback");
287         osrfAppRegisterMethod( modulename, OSRF_BUFFER_C_STR(method_name),
288                         "rollbackTransaction", "", 0, 0 );
289
290         buffer_reset(method_name);
291         OSRF_BUFFER_ADD(method_name, modulename );
292         OSRF_BUFFER_ADD(method_name, ".savepoint.set");
293         osrfAppRegisterMethod( modulename, OSRF_BUFFER_C_STR(method_name),
294                         "setSavepoint", "", 1, 0 );
295
296         buffer_reset(method_name);
297         OSRF_BUFFER_ADD(method_name, modulename );
298         OSRF_BUFFER_ADD(method_name, ".savepoint.release");
299         osrfAppRegisterMethod( modulename, OSRF_BUFFER_C_STR(method_name),
300                         "releaseSavepoint", "", 1, 0 );
301
302         buffer_reset(method_name);
303         OSRF_BUFFER_ADD(method_name, modulename );
304         OSRF_BUFFER_ADD(method_name, ".savepoint.rollback");
305         osrfAppRegisterMethod( modulename, OSRF_BUFFER_C_STR(method_name),
306                         "rollbackSavepoint", "", 1, 0 );
307
308         static const char* global_method[] = {
309                 "create",
310                 "retrieve",
311                 "update",
312                 "delete",
313                 "search",
314                 "id_list"
315         };
316         const int global_method_count
317                 = sizeof( global_method ) / sizeof ( global_method[0] );
318
319         unsigned long class_count = osrfHashGetCount( oilsIDL() );
320         osrfLogDebug(OSRF_LOG_MARK, "%lu classes loaded", class_count );
321         osrfLogDebug(OSRF_LOG_MARK,
322                 "At most %lu methods will be generated",
323                 (unsigned long) (class_count * global_method_count) );
324
325         osrfHashIterator* class_itr = osrfNewHashIterator( oilsIDL() );
326         osrfHash* idlClass = NULL;
327
328         // For each class in the IDL...
329         while( (idlClass = osrfHashIteratorNext( class_itr ) ) ) {
330
331                 const char* classname = osrfHashIteratorKey( class_itr );
332                 osrfLogInfo(OSRF_LOG_MARK, "Generating class methods for %s", classname);
333
334                 if (!osrfStringArrayContains( osrfHashGet(idlClass, "controller"), modulename )) {
335                         osrfLogInfo(OSRF_LOG_MARK, "%s is not listed as a controller for %s, moving on",
336                                         modulename, classname);
337                         continue;
338                 }
339
340                 if ( str_is_true( osrfHashGet(idlClass, "virtual") ) ) {
341                         osrfLogDebug(OSRF_LOG_MARK, "Class %s is virtual, skipping", classname );
342                         continue;
343                 }
344
345                 // Look up some other attributes of the current class
346                 const char* idlClass_fieldmapper = osrfHashGet(idlClass, "fieldmapper");
347                 if( !idlClass_fieldmapper ) {
348                         osrfLogDebug( OSRF_LOG_MARK, "Skipping class \"%s\"; no fieldmapper in IDL",
349                                         classname );
350                         continue;
351                 }
352
353                 osrfHash* idlClass_permacrud = NULL;
354                 if( enforce_pcrud ) {
355                         // For PCRUD, ignore classes with no permacrud section
356                         idlClass_permacrud = osrfHashGet(idlClass, "permacrud");
357                         if (!idlClass_permacrud) {
358                                 osrfLogDebug( OSRF_LOG_MARK,
359                                         "Skipping class \"%s\"; no permacrud in IDL", classname );
360                                 continue;
361                         }
362                 }
363
364                 const char* readonly = osrfHashGet(idlClass, "readonly");
365
366                 int i;
367                 for( i = 0; i < global_method_count; ++i ) {  // for each global method
368                         const char* method_type = global_method[ i ];
369                         osrfLogDebug(OSRF_LOG_MARK,
370                                 "Using files to build %s class methods for %s", method_type, classname);
371
372                         if( enforce_pcrud ) {
373                                 // Treat "id_list" or "search" as forms of "retrieve"
374                                 const char* tmp_method = method_type;
375                                 if ( *tmp_method == 'i' || *tmp_method == 's') {  // "id_list" or "search"
376                                         tmp_method = "retrieve";
377                                 }
378                                 // Skip this method if there is no permacrud entry for it
379                                 if (!osrfHashGet( idlClass_permacrud, tmp_method ))
380                                         continue;
381                         }
382
383                         // No create, update, or delete methods for a readonly class
384                         if ( str_is_true( readonly )
385                                 && ( *method_type == 'c' || *method_type == 'u' || *method_type == 'd') )
386                                 continue;
387
388                         buffer_reset( method_name );
389
390                         // Build the method name
391                         if( enforce_pcrud ) {
392                                 // For PCRUD: MODULENAME.method_type.classname
393                                 buffer_fadd(method_name, "%s.%s.%s", modulename, method_type, classname);
394                         } else {
395                                 // For non-PCRUD: MODULENAME.MODULENAME.direct.XXX.method_type
396                                 // where XXX is the fieldmapper name from the IDL, with every run of
397                                 // one or more consecutive colons replaced by a period.
398                                 char* st_tmp = NULL;
399                                 char* part = NULL;
400                                 char* _fm = strdup( idlClass_fieldmapper );
401                                 part = strtok_r(_fm, ":", &st_tmp);
402
403                                 buffer_fadd(method_name, "%s.direct.%s", modulename, part);
404
405                                 while ((part = strtok_r(NULL, ":", &st_tmp))) {
406                                         OSRF_BUFFER_ADD_CHAR(method_name, '.');
407                                         OSRF_BUFFER_ADD(method_name, part);
408                                 }
409                                 OSRF_BUFFER_ADD_CHAR(method_name, '.');
410                                 OSRF_BUFFER_ADD(method_name, method_type);
411                                 free(_fm);
412                         }
413
414                         // For an id_list or search method we specify the OSRF_METHOD_STREAMING option.
415                         // The consequence is that we implicitly create an atomic method in addition to
416                         // the usual non-atomic method.
417                         int flags = 0;
418                         if (*method_type == 'i' || *method_type == 's') {  // id_list or search
419                                 flags = flags | OSRF_METHOD_STREAMING;
420                         }
421
422                         osrfHash* method_meta = osrfNewHash();
423                         osrfHashSet( method_meta, idlClass, "class");
424                         osrfHashSet( method_meta, buffer_data( method_name ), "methodname" );
425                         osrfHashSet( method_meta, strdup(method_type), "methodtype" );
426
427                         // Register the method, with a pointer to an osrfHash to tell the method
428                         // its name, type, and class.
429                         osrfAppRegisterExtendedMethod(
430                                 modulename,
431                                 OSRF_BUFFER_C_STR( method_name ),
432                                 "dispatchCRUDMethod",
433                                 "",
434                                 1,
435                                 flags,
436                                 (void*)method_meta
437                         );
438
439                 } // end for each global method
440         } // end for each class in IDL
441
442         buffer_free( method_name );
443         osrfHashIteratorFree( class_itr );
444
445         return 0;
446 }
447
448 /**
449         @brief Get a table name, view name, or subquery for use in a FROM clause.
450         @param class Pointer to the IDL class entry.
451         @return A table name, a view name, or a subquery in parentheses.
452
453         In some cases the IDL defines a class, not with a table name or a view name, but with
454         a SELECT statement, which may be used as a subquery.
455 */
456 static char* getRelation( osrfHash* class ) {
457
458         char* source_def = NULL;
459         const char* tabledef = osrfHashGet(class, "tablename");
460
461         if ( tabledef ) {
462                 source_def = strdup( tabledef );   // Return the name of a table or view
463         } else {
464                 tabledef = osrfHashGet( class, "source_definition" );
465                 if( tabledef ) {
466                         // Return a subquery, enclosed in parentheses
467                         source_def = safe_malloc( strlen( tabledef ) + 3 );
468                         source_def[ 0 ] = '(';
469                         strcpy( source_def + 1, tabledef );
470                         strcat( source_def, ")" );
471                 } else {
472                         // Not found: return an error
473                         const char* classname = osrfHashGet( class, "classname" );
474                         if( !classname )
475                                 classname = "???";
476                         osrfLogError(
477                                 OSRF_LOG_MARK,
478                                 "%s ERROR No tablename or source_definition for class \"%s\"",
479                                 modulename,
480                                 classname
481                         );
482                 }
483         }
484
485         return source_def;
486 }
487
488 /**
489         @brief Initialize a server drone.
490         @return Zero if successful, -1 if not.
491
492         Connect to the database.  For each non-virtual class in the IDL, execute a dummy "SELECT * "
493         query to get the datatype of each column.  Record the datatypes in the loaded IDL.
494
495         This function is called by a server drone shortly after it is spawned by the listener.
496 */
497 int osrfAppChildInit() {
498
499         osrfLogDebug(OSRF_LOG_MARK, "Attempting to initialize libdbi...");
500         dbi_initialize(NULL);
501         osrfLogDebug(OSRF_LOG_MARK, "... libdbi initialized.");
502
503         char* driver = osrf_settings_host_value("/apps/%s/app_settings/driver", modulename );
504         char* user   = osrf_settings_host_value("/apps/%s/app_settings/database/user", modulename );
505         char* host   = osrf_settings_host_value("/apps/%s/app_settings/database/host", modulename );
506         char* port   = osrf_settings_host_value("/apps/%s/app_settings/database/port", modulename );
507         char* db     = osrf_settings_host_value("/apps/%s/app_settings/database/db", modulename );
508         char* pw     = osrf_settings_host_value("/apps/%s/app_settings/database/pw", modulename );
509         char* md     = osrf_settings_host_value("/apps/%s/app_settings/max_query_recursion",
510                         modulename );
511
512         osrfLogDebug(OSRF_LOG_MARK, "Attempting to load the database driver [%s]...", driver);
513         writehandle = dbi_conn_new(driver);
514
515         if(!writehandle) {
516                 osrfLogError(OSRF_LOG_MARK, "Error loading database driver [%s]", driver);
517                 return -1;
518         }
519         osrfLogDebug(OSRF_LOG_MARK, "Database driver [%s] seems OK", driver);
520
521         osrfLogInfo(OSRF_LOG_MARK, "%s connecting to database.  host=%s, "
522                         "port=%s, user=%s, db=%s", modulename, host, port, user, db );
523
524         if(host) dbi_conn_set_option(writehandle, "host", host );
525         if(port) dbi_conn_set_option_numeric( writehandle, "port", atoi(port) );
526         if(user) dbi_conn_set_option(writehandle, "username", user);
527         if(pw)   dbi_conn_set_option(writehandle, "password", pw );
528         if(db)   dbi_conn_set_option(writehandle, "dbname", db );
529
530         if(md)                     max_flesh_depth = atoi(md);
531         if(max_flesh_depth < 0)    max_flesh_depth = 1;
532         if(max_flesh_depth > 1000) max_flesh_depth = 1000;
533
534         free(user);
535         free(host);
536         free(port);
537         free(db);
538         free(pw);
539
540         const char* err;
541         if (dbi_conn_connect(writehandle) < 0) {
542                 sleep(1);
543                 if (dbi_conn_connect(writehandle) < 0) {
544                         dbi_conn_error(writehandle, &err);
545                         osrfLogError( OSRF_LOG_MARK, "Error connecting to database: %s", err);
546                         return -1;
547                 }
548         }
549
550         osrfLogInfo(OSRF_LOG_MARK, "%s successfully connected to the database", modulename );
551
552         // Add datatypes from database to the fields in the IDL
553         if( oilsExtendIDL() ) {
554                 osrfLogError( OSRF_LOG_MARK, "Error extending the IDL" );
555                 return -1;
556         }
557         else
558                 return 0;
559 }
560
561 /**
562         @brief Add datatypes from the database to the fields in the IDL.
563         @return Zero if successful, or 1 upon error.
564
565         For each relevant class in the IDL: ask the database for the datatype of every field.
566         In particular, determine which fields are text fields and which fields are numeric
567         fields, so that we know whether to enclose their values in quotes.
568
569         At this writing this function does not detect any errors, so it always returns zero.
570 */
571 int oilsExtendIDL( void ) {
572         osrfHashIterator* class_itr = osrfNewHashIterator( oilsIDL() );
573         osrfHash* class = NULL;
574         growing_buffer* query_buf = buffer_init( 64 );
575
576         // For each class in the IDL...
577         while( (class = osrfHashIteratorNext( class_itr ) ) ) {
578                 const char* classname = osrfHashIteratorKey( class_itr );
579                 osrfHash* fields = osrfHashGet( class, "fields" );
580
581                 // If the class is virtual, ignore it
582                 if( str_is_true( osrfHashGet(class, "virtual") ) ) {
583                         osrfLogDebug(OSRF_LOG_MARK, "Class %s is virtual, skipping", classname );
584                         continue;
585                 }
586
587                 char* tabledef = getRelation(class);
588                 if( !tabledef )
589                         continue;   // No such relation -- a query of it would be doomed to failure
590
591                 buffer_reset( query_buf );
592                 buffer_fadd( query_buf, "SELECT * FROM %s AS x WHERE 1=0;", tabledef );
593
594                 free(tabledef);
595
596                 osrfLogDebug( OSRF_LOG_MARK, "%s Investigatory SQL = %s",
597                                 modulename, OSRF_BUFFER_C_STR( query_buf ) );
598
599                 dbi_result result = dbi_conn_query( writehandle, OSRF_BUFFER_C_STR( query_buf ) );
600                 if (result) {
601
602                         int columnIndex = 1;
603                         const char* columnName;
604                         osrfHash* _f;
605                         while( (columnName = dbi_result_get_field_name(result, columnIndex)) ) {
606
607                                 osrfLogInternal( OSRF_LOG_MARK, "Looking for column named [%s]...",
608                                                 columnName );
609
610                                 /* fetch the fieldmapper index */
611                                 if( (_f = osrfHashGet(fields, columnName)) ) {
612
613                                         osrfLogDebug(OSRF_LOG_MARK, "Found [%s] in IDL hash...", columnName);
614
615                                         /* determine the field type and storage attributes */
616
617                                         switch( dbi_result_get_field_type_idx(result, columnIndex) ) {
618
619                                                 case DBI_TYPE_INTEGER : {
620
621                                                         if ( !osrfHashGet(_f, "primitive") )
622                                                                 osrfHashSet(_f, "number", "primitive");
623
624                                                         int attr = dbi_result_get_field_attribs_idx(result, columnIndex);
625                                                         if( attr & DBI_INTEGER_SIZE8 )
626                                                                 osrfHashSet(_f, "INT8", "datatype");
627                                                         else
628                                                                 osrfHashSet(_f, "INT", "datatype");
629                                                         break;
630                                                 }
631                                                 case DBI_TYPE_DECIMAL :
632                                                         if ( !osrfHashGet(_f, "primitive") )
633                                                                 osrfHashSet(_f, "number", "primitive");
634
635                                                         osrfHashSet(_f,"NUMERIC", "datatype");
636                                                         break;
637
638                                                 case DBI_TYPE_STRING :
639                                                         if ( !osrfHashGet(_f, "primitive") )
640                                                                 osrfHashSet(_f,"string", "primitive");
641
642                                                         osrfHashSet(_f,"TEXT", "datatype");
643                                                         break;
644
645                                                 case DBI_TYPE_DATETIME :
646                                                         if ( !osrfHashGet(_f, "primitive") )
647                                                                 osrfHashSet(_f,"string", "primitive");
648
649                                                         osrfHashSet(_f,"TIMESTAMP", "datatype");
650                                                         break;
651
652                                                 case DBI_TYPE_BINARY :
653                                                         if ( !osrfHashGet(_f, "primitive") )
654                                                                 osrfHashSet(_f,"string", "primitive");
655
656                                                         osrfHashSet(_f,"BYTEA", "datatype");
657                                         }
658
659                                         osrfLogDebug(
660                                                 OSRF_LOG_MARK,
661                                                 "Setting [%s] to primitive [%s] and datatype [%s]...",
662                                                 columnName,
663                                                 osrfHashGet(_f, "primitive"),
664                                                 osrfHashGet(_f, "datatype")
665                                         );
666                                 }
667                                 ++columnIndex;
668                         } // end while loop for traversing columns of result
669                         dbi_result_free(result);
670                 } else {
671                         osrfLogDebug(OSRF_LOG_MARK, "No data found for class [%s]...", classname);
672                 }
673         } // end for each class in IDL
674
675         buffer_free( query_buf );
676         osrfHashIteratorFree( class_itr );
677         child_initialized = 1;
678         return 0;
679 }
680
681 /**
682         @brief Install a database driver.
683         @param conn Pointer to a database driver.
684
685         The driver is used to process quoted strings correctly.
686
687         This function is a sleazy hack, intended @em only for testing and debugging without
688         actually connecting to a database. Any real server process should initialize the
689         database connection by calling osrfAppChildInit().
690 */
691 void set_cstore_dbi_conn( dbi_conn conn ) {
692         dbhandle = writehandle = conn;
693 }
694
695 /**
696         @brief Free an osrfHash that stores a transaction ID.
697         @param blob A pointer to the osrfHash to be freed, cast to a void pointer.
698
699         This function is a callback, to be called by the application session when it ends.
700         The application session stores the osrfHash via an opaque pointer.
701
702         If the osrfHash contains an entry for the key "xact_id", it means that an
703         uncommitted transaction is pending.  Roll it back.
704 */
705 void userDataFree( void* blob ) {
706         osrfHash* hash = (osrfHash*) blob;
707         if( osrfHashGet( hash, "xact_id" ) && writehandle )
708                 dbi_conn_query( writehandle, "ROLLBACK;" );
709
710         osrfHashFree( hash );
711 }
712
713 /**
714         @name Managing session data
715         @brief Maintain data stored via the userData pointer of the application session.
716
717         Currently, session-level data is stored in an osrfHash.  Other arrangements are
718         possible, and some would be more efficient.  The application session calls a
719         callback function to free userData before terminating.
720
721         Currently, the only data we store at the session level is the transaction id.  By this
722         means we can ensure that any pending transactions are rolled back before the application
723         session terminates.
724 */
725 /*@{*/
726
727 /**
728         @brief Free an item in the application session's userData.
729         @param key The name of a key for an osrfHash.
730         @param item An opaque pointer to the item associated with the key.
731
732         We store an osrfHash as userData with the application session, and arrange (by
733         installing userDataFree() as a different callback) for the session to free that
734         osrfHash before terminating.
735
736         This function is a callback for freeing items in the osrfHash.  Currently we store
737         two things:
738         - Transaction id of a pending transaction; a character string.  Key: "xact_id".
739         - Authkey; a character string.  Key: "authkey".
740         - User object from the authentication server; a jsonObject.  Key: "user_login".
741
742         If we ever store anything else in userData, we will need to revisit this function so
743         that it will free whatever else needs freeing.
744 */
745 static void sessionDataFree( char* key, void* item ) {
746         if ( !strcmp( key, "xact_id" )
747              || !strcmp( key, "authkey" ) ) {
748                 free( item );
749         } else if( !strcmp( key, "user_login" ) )
750                 jsonObjectFree( (jsonObject*) item );
751 }
752
753 /**
754         @brief Save a transaction id.
755         @param ctx Pointer to the method context.
756
757         Save the session_id of the current application session as a transaction id.
758 */
759 static void setXactId( osrfMethodContext* ctx ) {
760         if( ctx && ctx->session ) {
761                 osrfAppSession* session = ctx->session;
762
763                 osrfHash* cache = session->userData;
764
765                 // If the session doesn't already have a hash, create one.  Make sure
766                 // that the application session frees the hash when it terminates.
767                 if( NULL == cache ) {
768                         session->userData = cache = osrfNewHash();
769                         osrfHashSetCallback( cache, &sessionDataFree );
770                         ctx->session->userDataFree = &userDataFree;
771                 }
772
773                 // Save the transaction id in the hash, with the key "xact_id"
774                 osrfHashSet( cache, strdup( session->session_id ), "xact_id" );
775         }
776 }
777
778 /**
779         @brief Get the transaction ID for the current transaction, if any.
780         @param ctx Pointer to the method context.
781         @return Pointer to the transaction ID.
782
783         The return value points to an internal buffer, and will become invalid upon issuing
784         a commit or rollback.
785 */
786 static inline const char* getXactId( osrfMethodContext* ctx ) {
787         if( ctx && ctx->session && ctx->session->userData )
788                 return osrfHashGet( (osrfHash*) ctx->session->userData, "xact_id" );
789         else
790                 return NULL;
791 }
792
793 /**
794         @brief Clear the current transaction id.
795         @param ctx Pointer to the method context.
796 */
797 static inline void clearXactId( osrfMethodContext* ctx ) {
798         if( ctx && ctx->session && ctx->session->userData )
799                 osrfHashRemove( ctx->session->userData, "xact_id" );
800 }
801 /*@}*/
802
803 /**
804         @brief Save the user's login in the userData for the current application session.
805         @param ctx Pointer to the method context.
806         @param user_login Pointer to the user login object to be cached (we cache the original,
807         not a copy of it).
808
809         If @a user_login is NULL, remove the user login if one is already cached.
810 */
811 static void setUserLogin( osrfMethodContext* ctx, jsonObject* user_login ) {
812         if( ctx && ctx->session ) {
813                 osrfAppSession* session = ctx->session;
814
815                 osrfHash* cache = session->userData;
816
817                 // If the session doesn't already have a hash, create one.  Make sure
818                 // that the application session frees the hash when it terminates.
819                 if( NULL == cache ) {
820                         session->userData = cache = osrfNewHash();
821                         osrfHashSetCallback( cache, &sessionDataFree );
822                         ctx->session->userDataFree = &userDataFree;
823                 }
824
825                 if( user_login )
826                         osrfHashSet( cache, user_login, "user_login" );
827                 else
828                         osrfHashRemove( cache, "user_login" );
829         }
830 }
831
832 /**
833         @brief Get the user login object for the current application session, if any.
834         @param ctx Pointer to the method context.
835         @return Pointer to the user login object if found; otherwise NULL.
836
837         The user login object was returned from the authentication server, and then cached so
838         we don't have to call the authentication server again for the same user.
839 */
840 static const jsonObject* getUserLogin( osrfMethodContext* ctx ) {
841         if( ctx && ctx->session && ctx->session->userData )
842                 return osrfHashGet( (osrfHash*) ctx->session->userData, "user_login" );
843         else
844                 return NULL;
845 }
846
847 /**
848         @brief Save a copy of an authkey in the userData of the current application session.
849         @param ctx Pointer to the method context.
850         @param authkey The authkey to be saved.
851
852         If @a authkey is NULL, remove the authkey if one is already cached.
853 */
854 static void setAuthkey( osrfMethodContext* ctx, const char* authkey ) {
855         if( ctx && ctx->session && authkey ) {
856                 osrfAppSession* session = ctx->session;
857                 osrfHash* cache = session->userData;
858
859                 // If the session doesn't already have a hash, create one.  Make sure
860                 // that the application session frees the hash when it terminates.
861                 if( NULL == cache ) {
862                         session->userData = cache = osrfNewHash();
863                         osrfHashSetCallback( cache, &sessionDataFree );
864                         ctx->session->userDataFree = &userDataFree;
865                 }
866
867                 // Save the transaction id in the hash, with the key "xact_id"
868                 if( authkey && *authkey )
869                         osrfHashSet( cache, strdup( authkey ), "authkey" );
870                 else
871                         osrfHashRemove( cache, "authkey" );
872         }
873 }
874
875 /**
876         @brief Reset the login timeout.
877         @param authkey The authentication key for the current login session.
878         @param now The current time.
879         @return Zero if successful, or 1 if not.
880
881         Tell the authentication server to reset the timeout so that the login session won't
882         expire for a while longer.
883
884         We could dispense with the @a now parameter by calling time().  But we just called
885         time() in order to decide whether to reset the timeout, so we might as well reuse
886         the result instead of calling time() again.
887 */
888 static int reset_timeout( const char* authkey, time_t now ) {
889         jsonObject* auth_object = jsonNewObject( authkey );
890
891         // Ask the authentication server to reset the timeout.  It returns an event
892         // indicating success or failure.
893         jsonObject* result = oilsUtilsQuickReq( "open-ils.auth",
894                 "open-ils.auth.session.reset_timeout", auth_object );
895         jsonObjectFree(auth_object);
896
897         if( !result || result->type != JSON_HASH ) {
898                 osrfLogError( OSRF_LOG_MARK,
899                          "Unexpected object type receieved from open-ils.auth.session.reset_timeout" );
900                 jsonObjectFree( result );
901                 return 1;       // Not the right sort of object returned
902         }
903
904         const jsonObject* ilsevent = jsonObjectGetKey( result, "ilsevent" );
905         if( !ilsevent || ilsevent->type != JSON_NUMBER ) {
906                 osrfLogError( OSRF_LOG_MARK, "ilsevent is absent or malformed" );
907                 jsonObjectFree( result );
908                 return 1;    // Return code from method not available
909         }
910
911         if( jsonObjectGetNumber( ilsevent ) != 0.0 ){
912                 const char* desc = jsonObjectGetString( jsonObjectGetKey( result, "desc" ) );
913                 if( !desc )
914                         desc = "(No reason available)";    // failsafe; shouldn't happen
915                 osrfLogInfo( OSRF_LOG_MARK, "Failure to reset timeout: %s", desc );
916                 jsonObjectFree( result );
917                 return 1;
918         }
919
920         // Revise our local proxy for the timeout deadline
921         // by a smallish fraction of the timeout interval
922         const char* timeout = jsonObjectGetString( jsonObjectGetKey( result, "payload" ) );
923         if( !timeout )
924                 timeout = "1";   // failsafe; shouldn't happen
925         time_next_reset = now + atoi( timeout ) / 15;
926
927         jsonObjectFree( result );
928         return 0;     // Successfully reset timeout
929 }
930
931 /**
932         @brief Get the authkey string for the current application session, if any.
933         @param ctx Pointer to the method context.
934         @return Pointer to the cached authkey if found; otherwise NULL.
935
936         If present, the authkey string was cached from a previous method call.
937 */
938 static const char* getAuthkey( osrfMethodContext* ctx ) {
939         if( ctx && ctx->session && ctx->session->userData ) {
940                 const char* authkey = osrfHashGet( (osrfHash*) ctx->session->userData, "authkey" );
941
942                 // Possibly reset the authentication timeout to keep the login alive.  We do so
943                 // no more than once per method call, and not at all if it has been only a short
944                 // time since the last reset.
945
946                 // Here we reset explicitly, if at all.  We also implicitly reset the timeout
947                 // whenever we call the "open-ils.auth.session.retrieve" method.
948                 if( timeout_needs_resetting ) {
949                         time_t now = time( NULL );
950                         if( now >= time_next_reset && reset_timeout( authkey, now ) )
951                                 authkey = NULL;    // timeout has apparently expired already
952                 }
953
954                 timeout_needs_resetting = 0;
955                 return authkey;
956         }
957         else
958                 return NULL;
959 }
960
961 /**
962         @brief Implement the transaction.begin method.
963         @param ctx Pointer to the method context.
964         @return Zero if successful, or -1 upon error.
965
966         Start a transaction.  Save a transaction ID for future reference.
967
968         Method parameters:
969         - authkey (PCRUD only)
970
971         Return to client: Transaction ID
972 */
973 int beginTransaction ( osrfMethodContext* ctx ) {
974         if(osrfMethodVerifyContext( ctx )) {
975                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
976                 return -1;
977         }
978
979         if( enforce_pcrud ) {
980                 timeout_needs_resetting = 1;
981                 const jsonObject* user = verifyUserPCRUD( ctx );
982                 if (!user)
983                         return -1;
984         }
985
986         dbi_result result = dbi_conn_query(writehandle, "START TRANSACTION;");
987         if (!result) {
988                 osrfLogError(OSRF_LOG_MARK, "%s: Error starting transaction", modulename );
989                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
990                                 "osrfMethodException", ctx->request, "Error starting transaction" );
991                 return -1;
992         } else {
993                 setXactId( ctx );
994                 jsonObject* ret = jsonNewObject( getXactId( ctx ) );
995                 osrfAppRespondComplete( ctx, ret );
996                 jsonObjectFree(ret);
997         }
998         return 0;
999 }
1000
1001 /**
1002         @brief Implement the savepoint.set method.
1003         @param ctx Pointer to the method context.
1004         @return Zero if successful, or -1 if not.
1005
1006         Issue a SAVEPOINT to the database server.
1007
1008         Method parameters:
1009         - authkey (PCRUD only)
1010         - savepoint name
1011
1012         Return to client: Savepoint name
1013 */
1014 int setSavepoint ( osrfMethodContext* ctx ) {
1015         if(osrfMethodVerifyContext( ctx )) {
1016                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
1017                 return -1;
1018         }
1019
1020         int spNamePos = 0;
1021         if( enforce_pcrud ) {
1022                 spNamePos = 1;
1023                 timeout_needs_resetting = 1;
1024                 const jsonObject* user = verifyUserPCRUD( ctx );
1025                 if (!user)
1026                         return -1;
1027         }
1028
1029         // Verify that a transaction is pending
1030         const char* trans_id = getXactId( ctx );
1031         if( NULL == trans_id ) {
1032                 osrfAppSessionStatus(
1033                         ctx->session,
1034                         OSRF_STATUS_INTERNALSERVERERROR,
1035                         "osrfMethodException",
1036                         ctx->request,
1037                         "No active transaction -- required for savepoints"
1038                 );
1039                 return -1;
1040         }
1041
1042         // Get the savepoint name from the method params
1043         const char* spName = jsonObjectGetString( jsonObjectGetIndex(ctx->params, spNamePos) );
1044
1045         dbi_result result = dbi_conn_queryf(writehandle, "SAVEPOINT \"%s\";", spName);
1046         if (!result) {
1047                 osrfLogError(
1048                         OSRF_LOG_MARK,
1049                         "%s: Error creating savepoint %s in transaction %s",
1050                         modulename,
1051                         spName,
1052                         trans_id
1053                 );
1054                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1055                                 "osrfMethodException", ctx->request, "Error creating savepoint" );
1056                 return -1;
1057         } else {
1058                 jsonObject* ret = jsonNewObject(spName);
1059                 osrfAppRespondComplete( ctx, ret );
1060                 jsonObjectFree(ret);
1061         }
1062         return 0;
1063 }
1064
1065 /**
1066         @brief Implement the savepoint.release method.
1067         @param ctx Pointer to the method context.
1068         @return Zero if successful, or -1 if not.
1069
1070         Issue a RELEASE SAVEPOINT to the database server.
1071
1072         Method parameters:
1073         - authkey (PCRUD only)
1074         - savepoint name
1075
1076         Return to client: Savepoint name
1077 */
1078 int releaseSavepoint ( osrfMethodContext* ctx ) {
1079         if(osrfMethodVerifyContext( ctx )) {
1080                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
1081                 return -1;
1082         }
1083
1084         int spNamePos = 0;
1085         if( enforce_pcrud ) {
1086                 spNamePos = 1;
1087                 timeout_needs_resetting = 1;
1088                 const jsonObject* user = verifyUserPCRUD( ctx );
1089                 if (!user)
1090                         return -1;
1091         }
1092
1093         // Verify that a transaction is pending
1094         const char* trans_id = getXactId( ctx );
1095         if( NULL == trans_id ) {
1096                 osrfAppSessionStatus(
1097                         ctx->session,
1098                         OSRF_STATUS_INTERNALSERVERERROR,
1099                         "osrfMethodException",
1100                         ctx->request,
1101                         "No active transaction -- required for savepoints"
1102                 );
1103                 return -1;
1104         }
1105
1106         // Get the savepoint name from the method params
1107         const char* spName = jsonObjectGetString( jsonObjectGetIndex(ctx->params, spNamePos) );
1108
1109         dbi_result result = dbi_conn_queryf(writehandle, "RELEASE SAVEPOINT \"%s\";", spName);
1110         if (!result) {
1111                 osrfLogError(
1112                         OSRF_LOG_MARK,
1113                         "%s: Error releasing savepoint %s in transaction %s",
1114                         modulename,
1115                         spName,
1116                         trans_id
1117                 );
1118                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1119                                 "osrfMethodException", ctx->request, "Error releasing savepoint" );
1120                 return -1;
1121         } else {
1122                 jsonObject* ret = jsonNewObject(spName);
1123                 osrfAppRespondComplete( ctx, ret );
1124                 jsonObjectFree(ret);
1125         }
1126         return 0;
1127 }
1128
1129 /**
1130         @brief Implement the savepoint.rollback method.
1131         @param ctx Pointer to the method context.
1132         @return Zero if successful, or -1 if not.
1133
1134         Issue a ROLLBACK TO SAVEPOINT to the database server.
1135
1136         Method parameters:
1137         - authkey (PCRUD only)
1138         - savepoint name
1139
1140         Return to client: Savepoint name
1141 */
1142 int rollbackSavepoint ( osrfMethodContext* ctx ) {
1143         if(osrfMethodVerifyContext( ctx )) {
1144                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
1145                 return -1;
1146         }
1147
1148         int spNamePos = 0;
1149         if( enforce_pcrud ) {
1150                 spNamePos = 1;
1151                 timeout_needs_resetting = 1;
1152                 const jsonObject* user = verifyUserPCRUD( ctx );
1153                 if (!user)
1154                         return -1;
1155         }
1156
1157         // Verify that a transaction is pending
1158         const char* trans_id = getXactId( ctx );
1159         if( NULL == trans_id ) {
1160                 osrfAppSessionStatus(
1161                         ctx->session,
1162                         OSRF_STATUS_INTERNALSERVERERROR,
1163                         "osrfMethodException",
1164                         ctx->request,
1165                         "No active transaction -- required for savepoints"
1166                 );
1167                 return -1;
1168         }
1169
1170         // Get the savepoint name from the method params
1171         const char* spName = jsonObjectGetString( jsonObjectGetIndex(ctx->params, spNamePos) );
1172
1173         dbi_result result = dbi_conn_queryf(writehandle, "ROLLBACK TO SAVEPOINT \"%s\";", spName);
1174         if (!result) {
1175                 osrfLogError(
1176                         OSRF_LOG_MARK,
1177                         "%s: Error rolling back savepoint %s in transaction %s",
1178                         modulename,
1179                         spName,
1180                         trans_id
1181                 );
1182                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1183                                 "osrfMethodException", ctx->request, "Error rolling back savepoint" );
1184                 return -1;
1185         } else {
1186                 jsonObject* ret = jsonNewObject(spName);
1187                 osrfAppRespondComplete( ctx, ret );
1188                 jsonObjectFree(ret);
1189         }
1190         return 0;
1191 }
1192
1193 /**
1194         @brief Implement the transaction.commit method.
1195         @param ctx Pointer to the method context.
1196         @return Zero if successful, or -1 if not.
1197
1198         Issue a COMMIT to the database server.
1199
1200         Method parameters:
1201         - authkey (PCRUD only)
1202
1203         Return to client: Transaction ID.
1204 */
1205 int commitTransaction ( osrfMethodContext* ctx ) {
1206         if(osrfMethodVerifyContext( ctx )) {
1207                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
1208                 return -1;
1209         }
1210
1211         if( enforce_pcrud ) {
1212                 timeout_needs_resetting = 1;
1213                 const jsonObject* user = verifyUserPCRUD( ctx );
1214                 if (!user)
1215                         return -1;
1216         }
1217
1218         // Verify that a transaction is pending
1219         const char* trans_id = getXactId( ctx );
1220         if( NULL == trans_id ) {
1221                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1222                                 "osrfMethodException", ctx->request, "No active transaction to commit" );
1223                 return -1;
1224         }
1225
1226         dbi_result result = dbi_conn_query(writehandle, "COMMIT;");
1227         if (!result) {
1228                 osrfLogError(OSRF_LOG_MARK, "%s: Error committing transaction", modulename );
1229                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1230                                 "osrfMethodException", ctx->request, "Error committing transaction" );
1231                 return -1;
1232         } else {
1233                 jsonObject* ret = jsonNewObject( trans_id );
1234                 osrfAppRespondComplete( ctx, ret );
1235                 jsonObjectFree(ret);
1236                 clearXactId( ctx );
1237         }
1238         return 0;
1239 }
1240
1241 /**
1242         @brief Implement the transaction.rollback method.
1243         @param ctx Pointer to the method context.
1244         @return Zero if successful, or -1 if not.
1245
1246         Issue a ROLLBACK to the database server.
1247
1248         Method parameters:
1249         - authkey (PCRUD only)
1250
1251         Return to client: Transaction ID
1252 */
1253 int rollbackTransaction ( osrfMethodContext* ctx ) {
1254         if(osrfMethodVerifyContext( ctx )) {
1255                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
1256                 return -1;
1257         }
1258
1259         if( enforce_pcrud ) {
1260                 timeout_needs_resetting = 1;
1261                 const jsonObject* user = verifyUserPCRUD( ctx );
1262                 if (!user)
1263                         return -1;
1264         }
1265
1266         // Verify that a transaction is pending
1267         const char* trans_id = getXactId( ctx );
1268         if( NULL == trans_id ) {
1269                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1270                                 "osrfMethodException", ctx->request, "No active transaction to roll back" );
1271                 return -1;
1272         }
1273
1274         dbi_result result = dbi_conn_query(writehandle, "ROLLBACK;");
1275         if (!result) {
1276                 osrfLogError(OSRF_LOG_MARK, "%s: Error rolling back transaction", modulename );
1277                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1278                                 "osrfMethodException", ctx->request, "Error rolling back transaction" );
1279                 return -1;
1280         } else {
1281                 jsonObject* ret = jsonNewObject( trans_id );
1282                 osrfAppRespondComplete( ctx, ret );
1283                 jsonObjectFree(ret);
1284                 clearXactId( ctx );
1285         }
1286         return 0;
1287 }
1288
1289 /**
1290         @brief Implement the class-specific methods.
1291         @param ctx Pointer to the method context.
1292         @return Zero if successful, or -1 if not.
1293
1294         Branch on the method type: create, retrieve, update, delete, search, or id_list.
1295
1296         The method parameters and the type of value returned to the client depend on the method
1297         type.  However, for PCRUD methods, the first method parameter should always be an
1298         authkey.
1299 */
1300 int dispatchCRUDMethod ( osrfMethodContext* ctx ) {
1301
1302         // Get the method type, then can branch on it
1303         osrfHash* method_meta = (osrfHash*) ctx->method->userData;
1304         const char* methodtype = osrfHashGet( method_meta, "methodtype" );
1305
1306         if( !strcmp( methodtype, "create" ))
1307                 return doCreate( ctx );
1308         else if( !strcmp(methodtype, "retrieve" ))
1309                 return doRetrieve( ctx );
1310         else if( !strcmp(methodtype, "update" ))
1311                 return doUpdate( ctx );
1312         else if( !strcmp(methodtype, "delete" ))
1313                 return doDelete( ctx );
1314         else if( !strcmp(methodtype, "search" ))
1315                 return doSearch( ctx );
1316         else if( !strcmp(methodtype, "id_list" ))
1317                 return doIdList( ctx );
1318         else {
1319                 osrfAppRespondComplete( ctx, NULL );      // should be unreachable...
1320                 return 0;
1321         }
1322 }
1323
1324 /**
1325         @brief Implement the "search" method.
1326         @param ctx Pointer to the method context.
1327         @return Zero if successful, or -1 if not.
1328
1329         Method parameters:
1330         - authkey (PCRUD only)
1331         - WHERE clause, as jsonObject
1332         - Other SQL clause(s), as a JSON_HASH: joins, SELECT list, LIMIT, etc.
1333
1334         Return to client: rows of the specified class that satisfy a specified WHERE clause.
1335         Optionally flesh linked fields.
1336 */
1337 static int doSearch( osrfMethodContext* ctx ) {
1338         if(osrfMethodVerifyContext( ctx )) {
1339                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
1340                 return -1;
1341         }
1342
1343         if( enforce_pcrud )
1344                 timeout_needs_resetting = 1;
1345
1346         jsonObject* where_clause;
1347         jsonObject* rest_of_query;
1348
1349         if( enforce_pcrud ) {
1350                 where_clause  = jsonObjectGetIndex( ctx->params, 1 );
1351                 rest_of_query = jsonObjectGetIndex( ctx->params, 2 );
1352         } else {
1353                 where_clause  = jsonObjectGetIndex( ctx->params, 0 );
1354                 rest_of_query = jsonObjectGetIndex( ctx->params, 1 );
1355         }
1356
1357         // Get the class metadata
1358         osrfHash* method_meta = (osrfHash*) ctx->method->userData;
1359         osrfHash* class_meta = osrfHashGet( method_meta, "class" );
1360
1361         // Do the query
1362         int err = 0;
1363         jsonObject* obj = doFieldmapperSearch( ctx, class_meta, where_clause, rest_of_query, &err );
1364         if( err ) {
1365                 osrfAppRespondComplete( ctx, NULL );
1366                 return -1;
1367         }
1368
1369         // Return each row to the client (except that some may be suppressed by PCRUD)
1370         jsonObject* cur = 0;
1371         unsigned long res_idx = 0;
1372         while((cur = jsonObjectGetIndex( obj, res_idx++ ) )) {
1373                 if( enforce_pcrud && !verifyObjectPCRUD( ctx, cur ))
1374                         continue;
1375                 osrfAppRespond( ctx, cur );
1376         }
1377         jsonObjectFree( obj );
1378
1379         osrfAppRespondComplete( ctx, NULL );
1380         return 0;
1381 }
1382
1383 /**
1384         @brief Implement the "id_list" method.
	@param ctx Pointer to the method context.  This is the only parameter; the function
	takes no separate pointer for returning an error code.
1387         @return Zero if successful, or -1 if not.
1388
1389         Method parameters:
1390         - authkey (PCRUD only)
1391         - WHERE clause, as jsonObject
1392         - Other SQL clause(s), as a JSON_HASH: joins, LIMIT, etc.
1393
1394         Return to client: The primary key values for all rows of the relevant class that
1395         satisfy a specified WHERE clause.
1396
1397         This method relies on the assumption that every class has a primary key consisting of
1398         a single column.
1399 */
1400 static int doIdList( osrfMethodContext* ctx ) {
1401         if(osrfMethodVerifyContext( ctx )) {
1402                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
1403                 return -1;
1404         }
1405
1406         if( enforce_pcrud )
1407                 timeout_needs_resetting = 1;
1408
1409         jsonObject* where_clause;
1410         jsonObject* rest_of_query;
1411
1412         // We use the where clause without change.  But we need to massage the rest of the
1413         // query, so we work with a copy of it instead of modifying the original.
1414
1415         if( enforce_pcrud ) {
1416                 where_clause  = jsonObjectGetIndex( ctx->params, 1 );
1417                 rest_of_query = jsonObjectClone( jsonObjectGetIndex( ctx->params, 2 ) );
1418         } else {
1419                 where_clause  = jsonObjectGetIndex( ctx->params, 0 );
1420                 rest_of_query = jsonObjectClone( jsonObjectGetIndex( ctx->params, 1 ) );
1421         }
1422
1423         // Eliminate certain SQL clauses, if present.
1424         if ( rest_of_query ) {
1425                 jsonObjectRemoveKey( rest_of_query, "select" );
1426                 jsonObjectRemoveKey( rest_of_query, "no_i18n" );
1427                 jsonObjectRemoveKey( rest_of_query, "flesh" );
1428                 jsonObjectRemoveKey( rest_of_query, "flesh_columns" );
1429         } else {
1430                 rest_of_query = jsonNewObjectType( JSON_HASH );
1431         }
1432
1433         jsonObjectSetKey( rest_of_query, "no_i18n", jsonNewBoolObject( 1 ) );
1434
1435         // Get the class metadata
1436         osrfHash* method_meta = (osrfHash*) ctx->method->userData;
1437         osrfHash* class_meta = osrfHashGet( method_meta, "class" );
1438
1439         // Build a SELECT list containing just the primary key,
1440         // i.e. like { "classname":["keyname"] }
1441         jsonObject* col_list_obj = jsonNewObjectType( JSON_ARRAY );
1442
1443         // Load array with name of primary key
1444         jsonObjectPush( col_list_obj, jsonNewObject( osrfHashGet( class_meta, "primarykey" ) ) );
1445         jsonObject* select_clause = jsonNewObjectType( JSON_HASH );
1446         jsonObjectSetKey( select_clause, osrfHashGet( class_meta, "classname" ), col_list_obj );
1447
1448         jsonObjectSetKey( rest_of_query, "select", select_clause );
1449
1450         // Do the query
1451         int err = 0;
1452         jsonObject* obj =
1453                 doFieldmapperSearch( ctx, class_meta, where_clause, rest_of_query, &err );
1454
1455         jsonObjectFree( rest_of_query );
1456         if( err ) {
1457                 osrfAppRespondComplete( ctx, NULL );
1458                 return -1;
1459         }
1460
1461         // Return each primary key value to the client
1462         jsonObject* cur;
1463         unsigned long res_idx = 0;
1464         while((cur = jsonObjectGetIndex( obj, res_idx++ ) )) {
1465                 if( enforce_pcrud && !verifyObjectPCRUD( ctx, cur ))
1466                         continue;        // Suppress due to lack of permission
1467                 else
1468                         osrfAppRespond( ctx,
1469                                 oilsFMGetObject( cur, osrfHashGet( class_meta, "primarykey" ) ) );
1470         }
1471
1472         jsonObjectFree( obj );
1473         osrfAppRespondComplete( ctx, NULL );
1474         return 0;
1475 }
1476
1477 /**
1478         @brief Verify that we have a valid class reference.
1479         @param ctx Pointer to the method context.
1480         @param param Pointer to the method parameters.
1481         @return 1 if the class reference is valid, or zero if it isn't.
1482
1483         The class of the method params must match the class to which the method id devoted.
1484         For PCRUD there are additional restrictions.
1485 */
1486 static int verifyObjectClass ( osrfMethodContext* ctx, const jsonObject* param ) {
1487
1488         osrfHash* method_meta = (osrfHash*) ctx->method->userData;
1489         osrfHash* class = osrfHashGet( method_meta, "class" );
1490
1491         // Compare the method's class to the parameters' class
1492         if (!param->classname || (strcmp( osrfHashGet(class, "classname"), param->classname ))) {
1493
1494                 // Oops -- they don't match.  Complain.
1495                 growing_buffer* msg = buffer_init(128);
1496                 buffer_fadd(
1497                         msg,
1498                         "%s: %s method for type %s was passed a %s",
1499                         modulename,
1500                         osrfHashGet(method_meta, "methodtype"),
1501                         osrfHashGet(class, "classname"),
1502                         param->classname ? param->classname : "(null)"
1503                 );
1504
1505                 char* m = buffer_release(msg);
1506                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_BADREQUEST, "osrfMethodException",
1507                                 ctx->request, m );
1508                 free(m);
1509
1510                 return 0;
1511         }
1512
1513         if( enforce_pcrud )
1514                 return verifyObjectPCRUD( ctx, param );
1515         else
1516                 return 1;
1517 }
1518
1519 /**
1520         @brief (PCRUD only) Verify that the user is properly logged in.
1521         @param ctx Pointer to the method context.
1522         @return If the user is logged in, a pointer to the user object from the authentication
1523         server; otherwise NULL.
1524 */
1525 static const jsonObject* verifyUserPCRUD( osrfMethodContext* ctx ) {
1526
1527         // Get the authkey (the first method parameter)
1528         const char* auth = jsonObjectGetString( jsonObjectGetIndex( ctx->params, 0 ) );
1529
1530         // See if we have the same authkey, and a user object,
1531         // locally cached from a previous call
1532         const char* cached_authkey = getAuthkey( ctx );
1533         if( cached_authkey && !strcmp( cached_authkey, auth ) ) {
1534                 const jsonObject* cached_user = getUserLogin( ctx );
1535                 if( cached_user )
1536                         return cached_user;
1537         }
1538
1539         // We have no matching authentication data in the cache.  Authenticate from scratch.
1540         jsonObject* auth_object = jsonNewObject(auth);
1541
1542         // Fetch the user object from the authentication server
1543         jsonObject* user = oilsUtilsQuickReq( "open-ils.auth", "open-ils.auth.session.retrieve",
1544                         auth_object );
1545         jsonObjectFree(auth_object);
1546
1547         if (!user->classname || strcmp(user->classname, "au")) {
1548
1549                 growing_buffer* msg = buffer_init(128);
1550                 buffer_fadd(
1551                         msg,
1552                         "%s: permacrud received a bad auth token: %s",
1553                         modulename,
1554                         auth
1555                 );
1556
1557                 char* m = buffer_release(msg);
1558                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_UNAUTHORIZED, "osrfMethodException",
1559                                 ctx->request, m );
1560
1561                 free(m);
1562                 jsonObjectFree(user);
1563                 user = NULL;
1564         }
1565
1566         setUserLogin( ctx, user );
1567         setAuthkey( ctx, auth );
1568
1569         // Allow ourselves up to a second before we have to reset the login timeout.
1570         // It would be nice to use some fraction of the timeout interval enforced by the
1571         // authentication server, but that value is not readily available at this point.
1572         // Instead, we use a conservative default interval.
1573         time_next_reset = time( NULL ) + 1;
1574
1575         return user;
1576 }
1577
1578 static int verifyObjectPCRUD (  osrfMethodContext* ctx, const jsonObject* obj ) {
1579
1580         dbhandle = writehandle;
1581
1582         osrfHash* method_metadata = (osrfHash*) ctx->method->userData;
1583         osrfHash* class = osrfHashGet( method_metadata, "class" );
1584         const char* method_type = osrfHashGet( method_metadata, "methodtype" );
1585         int fetch = 0;
1586
1587         if ( ( *method_type == 's' || *method_type == 'i' ) ) {
1588                 method_type = "retrieve"; // search and id_list are equivalent to retrieve for this
1589         } else if ( *method_type == 'u' || *method_type == 'd' ) {
1590                 fetch = 1; // MUST go to the db for the object for update and delete
1591         }
1592
1593         osrfHash* pcrud = osrfHashGet( osrfHashGet(class, "permacrud"), method_type );
1594
1595         if (!pcrud) {
1596                 // No permacrud for this method type on this class
1597
1598                 growing_buffer* msg = buffer_init(128);
1599                 buffer_fadd(
1600                         msg,
1601                         "%s: %s on class %s has no permacrud IDL entry",
1602                         modulename,
1603                         osrfHashGet(method_metadata, "methodtype"),
1604                         osrfHashGet(class, "classname")
1605                 );
1606
1607                 char* m = buffer_release(msg);
1608                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_FORBIDDEN,
1609                                 "osrfMethodException", ctx->request, m );
1610
1611                 free(m);
1612
1613                 return 0;
1614         }
1615
1616         const jsonObject* user = verifyUserPCRUD( ctx );
1617         if (!user)
1618                 return 0;
1619
1620         int userid = atoi( oilsFMGetString( user, "id" ) );
1621
1622         osrfStringArray* permission = osrfHashGet(pcrud, "permission");
1623         osrfStringArray* local_context = osrfHashGet(pcrud, "local_context");
1624         osrfHash* foreign_context = osrfHashGet(pcrud, "foreign_context");
1625
1626         osrfStringArray* context_org_array = osrfNewStringArray(1);
1627
1628         int err = 0;
1629         char* pkey_value = NULL;
1630         if ( str_is_true( osrfHashGet(pcrud, "global_required") ) ) {
1631                 osrfLogDebug( OSRF_LOG_MARK,
1632                                 "global-level permissions required, fetching top of the org tree" );
1633
1634                 // check for perm at top of org tree
1635                 char* org_tree_root_id = org_tree_root( ctx );
1636                 if( org_tree_root_id ) {
1637                         osrfStringArrayAdd( context_org_array, org_tree_root_id );
1638                         osrfLogDebug( OSRF_LOG_MARK, "top of the org tree is %s", org_tree_root_id );
1639                 } else  {
1640                         osrfStringArrayFree( context_org_array );
1641                         return 0;
1642                 }
1643
1644         } else {
1645             osrfLogDebug( OSRF_LOG_MARK, "global-level permissions not required, "
1646                                 "fetching context org ids" );
1647             const char* pkey = osrfHashGet(class, "primarykey");
1648                 jsonObject *param = NULL;
1649
1650                 if (obj->classname) {
1651                         pkey_value = oilsFMGetString( obj, pkey );
1652                         if (!fetch)
1653                                 param = jsonObjectClone(obj);
1654                         osrfLogDebug( OSRF_LOG_MARK, "Object supplied, using primary key value of %s",
1655                                         pkey_value );
1656                 } else {
1657                         pkey_value = jsonObjectToSimpleString( obj );
1658                         fetch = 1;
1659                         osrfLogDebug( OSRF_LOG_MARK, "Object not supplied, using primary key value "
1660                                         "of %s and retrieving from the database", pkey_value );
1661                 }
1662
1663                 if (fetch) {
1664                         jsonObject* _tmp_params = single_hash( pkey, pkey_value );
1665                         jsonObject* _list = doFieldmapperSearch( ctx, class, _tmp_params, NULL, &err );
1666                         jsonObjectFree(_tmp_params);
1667
1668                         param = jsonObjectExtractIndex(_list, 0);
1669                         jsonObjectFree(_list);
1670                 }
1671
1672                 if (!param) {
1673                         osrfLogDebug( OSRF_LOG_MARK,
1674                                         "Object not found in the database with primary key %s of %s",
1675                                         pkey, pkey_value );
1676
1677                         growing_buffer* msg = buffer_init(128);
1678                         buffer_fadd(
1679                                 msg,
1680                                 "%s: no object found with primary key %s of %s",
1681                                 modulename,
1682                                 pkey,
1683                                 pkey_value
1684                         );
1685
1686                         char* m = buffer_release(msg);
1687                         osrfAppSessionStatus(
1688                                 ctx->session,
1689                                 OSRF_STATUS_INTERNALSERVERERROR,
1690                                 "osrfMethodException",
1691                                 ctx->request,
1692                                 m
1693                         );
1694
1695                         free(m);
1696                         if (pkey_value) free(pkey_value);
1697
1698                         return 0;
1699                 }
1700
1701                 if (local_context->size > 0) {
1702                         osrfLogDebug( OSRF_LOG_MARK, "%d class-local context field(s) specified",
1703                                         local_context->size);
1704                         int i = 0;
1705                         const char* lcontext = NULL;
1706                         while ( (lcontext = osrfStringArrayGetString(local_context, i++)) ) {
1707                                 osrfStringArrayAdd( context_org_array, oilsFMGetString( param, lcontext ) );
1708                                 osrfLogDebug(
1709                                         OSRF_LOG_MARK,
1710                                         "adding class-local field %s (value: %s) to the context org list",
1711                                         lcontext,
1712                                         osrfStringArrayGetString(context_org_array, context_org_array->size - 1)
1713                                 );
1714                         }
1715                 }
1716
1717                 if (foreign_context) {
1718                         unsigned long class_count = osrfHashGetCount( foreign_context );
1719                         osrfLogDebug( OSRF_LOG_MARK, "%d foreign context classes(s) specified", class_count);
1720
1721                         if (class_count > 0) {
1722
1723                                 osrfHash* fcontext = NULL;
1724                                 osrfHashIterator* class_itr = osrfNewHashIterator( foreign_context );
1725                                 while( (fcontext = osrfHashIteratorNext( class_itr ) ) ) {
1726                                         const char* class_name = osrfHashIteratorKey( class_itr );
1727                                         osrfHash* fcontext = osrfHashGet(foreign_context, class_name);
1728
1729                                         osrfLogDebug(
1730                                                 OSRF_LOG_MARK,
1731                                                 "%d foreign context fields(s) specified for class %s",
1732                                                 ((osrfStringArray*)osrfHashGet(fcontext,"context"))->size,
1733                                                 class_name
1734                                         );
1735
1736                                         char* foreign_pkey = osrfHashGet(fcontext, "field");
1737                                         char* foreign_pkey_value =
1738                                                         oilsFMGetString(param, osrfHashGet(fcontext, "fkey"));
1739
1740                                         jsonObject* _tmp_params = single_hash( foreign_pkey, foreign_pkey_value );
1741
1742                                         jsonObject* _list = doFieldmapperSearch(
1743                                                 ctx, osrfHashGet( oilsIDL(), class_name ), _tmp_params, NULL, &err );
1744
1745                                         jsonObject* _fparam = jsonObjectClone(jsonObjectGetIndex(_list, 0));
1746                                         jsonObjectFree(_tmp_params);
1747                                         jsonObjectFree(_list);
1748
1749                                         osrfStringArray* jump_list = osrfHashGet(fcontext, "jump");
1750
1751                                         if (_fparam && jump_list) {
1752                                                 const char* flink = NULL;
1753                                                 int k = 0;
1754                                                 while ( (flink = osrfStringArrayGetString(jump_list, k++)) && _fparam ) {
1755                                                         free(foreign_pkey_value);
1756
1757                                                         osrfHash* foreign_link_hash =
1758                                                                         oilsIDLFindPath( "/%s/links/%s", _fparam->classname, flink );
1759
1760                                                         foreign_pkey_value = oilsFMGetString(_fparam, flink);
1761                                                         foreign_pkey = osrfHashGet( foreign_link_hash, "key" );
1762
1763                                                         _tmp_params = single_hash( foreign_pkey, foreign_pkey_value );
1764
1765                                                         _list = doFieldmapperSearch(
1766                                                                 ctx,
1767                                                                 osrfHashGet( oilsIDL(),
1768                                                                                 osrfHashGet( foreign_link_hash, "class" ) ),
1769                                                                 _tmp_params,
1770                                                                 NULL,
1771                                                                 &err
1772                                                         );
1773
1774                                                         _fparam = jsonObjectClone(jsonObjectGetIndex(_list, 0));
1775                                                         jsonObjectFree(_tmp_params);
1776                                                         jsonObjectFree(_list);
1777                                                 }
1778                                         }
1779
1780                                         if (!_fparam) {
1781
1782                                                 growing_buffer* msg = buffer_init(128);
1783                                                 buffer_fadd(
1784                                                         msg,
1785                                                         "%s: no object found with primary key %s of %s",
1786                                                         modulename,
1787                                                         foreign_pkey,
1788                                                         foreign_pkey_value
1789                                                 );
1790
1791                                                 char* m = buffer_release(msg);
1792                                                 osrfAppSessionStatus(
1793                                                         ctx->session,
1794                                                         OSRF_STATUS_INTERNALSERVERERROR,
1795                                                         "osrfMethodException",
1796                                                         ctx->request,
1797                                                         m
1798                                                 );
1799
1800                                                 free(m);
1801                                                 osrfHashIteratorFree(class_itr);
1802                                                 free(foreign_pkey_value);
1803                                                 jsonObjectFree(param);
1804
1805                                                 return 0;
1806                                         }
1807
1808                                         free(foreign_pkey_value);
1809
1810                                         int j = 0;
1811                                         const char* foreign_field = NULL;
1812                                         while ( (foreign_field = osrfStringArrayGetString(
1813                                                          osrfHashGet(fcontext,"context" ), j++)) ) {
1814                                                 osrfStringArrayAdd( context_org_array,
1815                                                                 oilsFMGetString( _fparam, foreign_field ) );
1816                                                 osrfLogDebug(
1817                                                         OSRF_LOG_MARK,
1818                                                         "adding foreign class %s field %s (value: %s) to the context org list",
1819                                                         class_name,
1820                                                         foreign_field,
1821                                                         osrfStringArrayGetString(
1822                                                                         context_org_array, context_org_array->size - 1)
1823                                                 );
1824                                         }
1825
1826                                         jsonObjectFree(_fparam);
1827                                 }
1828
1829                                 osrfHashIteratorFree( class_itr );
1830                         }
1831                 }
1832
1833                 jsonObjectFree(param);
1834         }
1835
1836         const char* context_org = NULL;
1837         const char* perm = NULL;
1838         int OK = 0;
1839
1840         if (permission->size == 0) {
1841             osrfLogDebug( OSRF_LOG_MARK, "No permission specified for this action, passing through" );
1842                 OK = 1;
1843         }
1844
1845         int i = 0;
1846         while ( (perm = osrfStringArrayGetString(permission, i++)) ) {
1847                 int j = 0;
1848                 while ( (context_org = osrfStringArrayGetString(context_org_array, j++)) ) {
1849                         dbi_result result;
1850
1851                         if (pkey_value) {
1852                                 osrfLogDebug(
1853                                         OSRF_LOG_MARK,
1854                                         "Checking object permission [%s] for user %d "
1855                                                         "on object %s (class %s) at org %d",
1856                                         perm,
1857                                         userid,
1858                                         pkey_value,
1859                                         osrfHashGet(class, "classname"),
1860                                         atoi(context_org)
1861                                 );
1862
1863                                 result = dbi_conn_queryf(
1864                                         writehandle,
1865                                         "SELECT permission.usr_has_object_perm(%d, '%s', '%s', '%s', %d) AS has_perm;",
1866                                         userid,
1867                                         perm,
1868                                         osrfHashGet(class, "classname"),
1869                                         pkey_value,
1870                                         atoi(context_org)
1871                                 );
1872
1873                                 if (result) {
1874                                         osrfLogDebug(
1875                                                 OSRF_LOG_MARK,
1876                                                 "Received a result for object permission [%s] "
1877                                                                 "for user %d on object %s (class %s) at org %d",
1878                                                 perm,
1879                                                 userid,
1880                                                 pkey_value,
1881                                                 osrfHashGet(class, "classname"),
1882                                                 atoi(context_org)
1883                                         );
1884
1885                                         if (dbi_result_first_row(result)) {
1886                                                 jsonObject* return_val = oilsMakeJSONFromResult( result );
1887                                                 const char* has_perm = jsonObjectGetString(
1888                                                                 jsonObjectGetKeyConst(return_val, "has_perm") );
1889
1890                                                 osrfLogDebug(
1891                                                         OSRF_LOG_MARK,
1892                                                         "Status of object permission [%s] for user %d "
1893                                                                         "on object %s (class %s) at org %d is %s",
1894                                                         perm,
1895                                                         userid,
1896                                                         pkey_value,
1897                                                         osrfHashGet(class, "classname"),
1898                                                         atoi(context_org),
1899                                                         has_perm
1900                                                 );
1901
1902                                                 if ( *has_perm == 't' ) OK = 1;
1903                                                 jsonObjectFree(return_val);
1904                                         }
1905
1906                                         dbi_result_free(result);
1907                                         if (OK)
1908                                                 break;
1909                                 }
1910                         }
1911
1912                         osrfLogDebug( OSRF_LOG_MARK,
1913                                         "Checking non-object permission [%s] for user %d at org %d",
1914                                         perm, userid, atoi(context_org) );
1915                         result = dbi_conn_queryf(
1916                                 writehandle,
1917                                 "SELECT permission.usr_has_perm(%d, '%s', %d) AS has_perm;",
1918                                 userid,
1919                                 perm,
1920                                 atoi(context_org)
1921                         );
1922
1923                         if (result) {
1924                                 osrfLogDebug( OSRF_LOG_MARK,
1925                                                 "Received a result for permission [%s] for user %d at org %d",
1926                                                 perm, userid, atoi(context_org) );
1927                                 if ( dbi_result_first_row(result) ) {
1928                                         jsonObject* return_val = oilsMakeJSONFromResult( result );
1929                                         const char* has_perm = jsonObjectGetString(
1930                                                         jsonObjectGetKeyConst(return_val, "has_perm") );
1931                                         osrfLogDebug( OSRF_LOG_MARK,
1932                                                         "Status of permission [%s] for user %d at org %d is [%s]",
1933                                                         perm, userid, atoi(context_org), has_perm );
1934                                         if ( *has_perm == 't' )
1935                                                 OK = 1;
1936                                         jsonObjectFree(return_val);
1937                                 }
1938
1939                                 dbi_result_free(result);
1940                                 if (OK) break;
1941                         }
1942
1943                 }
1944                 if (OK)
1945                         break;
1946         }
1947
1948         if (pkey_value) free(pkey_value);
1949         osrfStringArrayFree(context_org_array);
1950
1951         return OK;
1952 }
1953
1954 /**
1955         @brief Look up the root of the org_unit tree.
1956         @param ctx Pointer to the method context.
1957         @return The id of the root org unit, as a character string.
1958
1959         Query actor.org_unit where parent_ou is null, and return the id as a string.
1960
1961         This function assumes that there is only one root org unit, i.e. that we
1962         have a single tree, not a forest.
1963
1964         The calling code is responsible for freeing the returned string.
1965 */
1966 static char* org_tree_root( osrfMethodContext* ctx ) {
1967
1968         static char cached_root_id[ 32 ] = "";  // extravagantly large buffer
1969         static time_t last_lookup_time = 0;
1970         time_t current_time = time( NULL );
1971
1972         if( cached_root_id[ 0 ] && ( current_time - last_lookup_time < 3600 ) ) {
1973                 // We successfully looked this up less than an hour ago.
1974                 // It's not likely to have changed since then.
1975                 return strdup( cached_root_id );
1976         }
1977         last_lookup_time = current_time;
1978
1979         int err = 0;
1980         jsonObject* where_clause = single_hash( "parent_ou", NULL );
1981         jsonObject* result = doFieldmapperSearch(
1982                 ctx, osrfHashGet( oilsIDL(), "aou" ), where_clause, NULL, &err );
1983         jsonObjectFree( where_clause );
1984
1985         jsonObject* tree_top = jsonObjectGetIndex( result, 0 );
1986
1987         if (! tree_top) {
1988                 jsonObjectFree( result );
1989
1990                 growing_buffer* msg = buffer_init(128);
1991                 OSRF_BUFFER_ADD( msg, modulename );
1992                 OSRF_BUFFER_ADD( msg,
1993                                 ": Internal error, could not find the top of the org tree (parent_ou = NULL)" );
1994
1995                 char* m = buffer_release(msg);
1996                 osrfAppSessionStatus( ctx->session,
1997                                 OSRF_STATUS_INTERNALSERVERERROR, "osrfMethodException", ctx->request, m );
1998                 free(m);
1999
2000                 cached_root_id[ 0 ] = '\0';
2001                 return NULL;
2002         }
2003
2004         char* root_org_unit_id = oilsFMGetString( tree_top, "id" );
2005         osrfLogDebug( OSRF_LOG_MARK, "Top of the org tree is %s", root_org_unit_id );
2006
2007         jsonObjectFree( result );
2008
2009         strcpy( cached_root_id, root_org_unit_id );
2010         return root_org_unit_id;
2011 }
2012
2013 /**
2014         @brief Create a JSON_HASH with a single key/value pair.
2015         @param key The key of the key/value pair.
2016         @param value the value of the key/value pair.
2017         @return Pointer to a newly created jsonObject of type JSON_HASH.
2018
2019         The value of the key/value is either a string or (if @a value is NULL) a null.
2020 */
2021 static jsonObject* single_hash( const char* key, const char* value ) {
2022         // Sanity check
2023         if( ! key ) key = "";
2024
2025         jsonObject* hash = jsonNewObjectType( JSON_HASH );
2026         jsonObjectSetKey( hash, key, jsonNewObject( value ) );
2027         return hash;
2028 }
2029
2030
2031 static int doCreate( osrfMethodContext* ctx ) {
2032         if(osrfMethodVerifyContext( ctx )) {
2033                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
2034                 return -1;
2035         }
2036
2037         if( enforce_pcrud )
2038                 timeout_needs_resetting = 1;
2039
2040         osrfHash* meta = osrfHashGet( (osrfHash*) ctx->method->userData, "class" );
2041         jsonObject* target = NULL;
2042         jsonObject* options = NULL;
2043
2044         if( enforce_pcrud ) {
2045                 target = jsonObjectGetIndex( ctx->params, 1 );
2046                 options = jsonObjectGetIndex( ctx->params, 2 );
2047         } else {
2048                 target = jsonObjectGetIndex( ctx->params, 0 );
2049                 options = jsonObjectGetIndex( ctx->params, 1 );
2050         }
2051
2052         if ( !verifyObjectClass( ctx, target )) {
2053                 osrfAppRespondComplete( ctx, NULL );
2054                 return -1;
2055         }
2056
2057         osrfLogDebug( OSRF_LOG_MARK, "Object seems to be of the correct type" );
2058
2059         const char* trans_id = getXactId( ctx );
2060         if ( !trans_id ) {
2061                 osrfLogError( OSRF_LOG_MARK, "No active transaction -- required for CREATE" );
2062
2063                 osrfAppSessionStatus(
2064                         ctx->session,
2065                         OSRF_STATUS_BADREQUEST,
2066                         "osrfMethodException",
2067                         ctx->request,
2068                         "No active transaction -- required for CREATE"
2069                 );
2070                 osrfAppRespondComplete( ctx, NULL );
2071                 return -1;
2072         }
2073
2074         // The following test is harmless but redundant.  If a class is
2075         // readonly, we don't register a create method for it.
2076         if( str_is_true( osrfHashGet( meta, "readonly" ) ) ) {
2077                 osrfAppSessionStatus(
2078                         ctx->session,
2079                         OSRF_STATUS_BADREQUEST,
2080                         "osrfMethodException",
2081                         ctx->request,
2082                         "Cannot INSERT readonly class"
2083                 );
2084                 osrfAppRespondComplete( ctx, NULL );
2085                 return -1;
2086         }
2087
2088         // Set the last_xact_id
2089         int index = oilsIDL_ntop( target->classname, "last_xact_id" );
2090         if (index > -1) {
2091                 osrfLogDebug(OSRF_LOG_MARK, "Setting last_xact_id to %s on %s at position %d",
2092                         trans_id, target->classname, index);
2093                 jsonObjectSetIndex(target, index, jsonNewObject(trans_id));
2094         }
2095
2096         osrfLogDebug( OSRF_LOG_MARK, "There is a transaction running..." );
2097
2098         dbhandle = writehandle;
2099
2100         osrfHash* fields = osrfHashGet(meta, "fields");
2101         char* pkey = osrfHashGet(meta, "primarykey");
2102         char* seq = osrfHashGet(meta, "sequence");
2103
2104         growing_buffer* table_buf = buffer_init(128);
2105         growing_buffer* col_buf = buffer_init(128);
2106         growing_buffer* val_buf = buffer_init(128);
2107
2108         OSRF_BUFFER_ADD(table_buf, "INSERT INTO ");
2109         OSRF_BUFFER_ADD(table_buf, osrfHashGet(meta, "tablename"));
2110         OSRF_BUFFER_ADD_CHAR( col_buf, '(' );
2111         buffer_add(val_buf,"VALUES (");
2112
2113
2114         int first = 1;
2115         osrfHash* field = NULL;
2116         osrfHashIterator* field_itr = osrfNewHashIterator( fields );
2117         while( (field = osrfHashIteratorNext( field_itr ) ) ) {
2118
2119                 const char* field_name = osrfHashIteratorKey( field_itr );
2120
2121                 if( str_is_true( osrfHashGet( field, "virtual" ) ) )
2122                         continue;
2123
2124                 const jsonObject* field_object = oilsFMGetObject( target, field_name );
2125
2126                 char* value;
2127                 if (field_object && field_object->classname) {
2128                         value = oilsFMGetString(
2129                                 field_object,
2130                                 (char*)oilsIDLFindPath("/%s/primarykey", field_object->classname)
2131                         );
2132                 } else if( field_object && JSON_BOOL == field_object->type ) {
2133                         if( jsonBoolIsTrue( field_object ) )
2134                                 value = strdup( "t" );
2135                         else
2136                                 value = strdup( "f" );
2137                 } else {
2138                         value = jsonObjectToSimpleString( field_object );
2139                 }
2140
2141                 if (first) {
2142                         first = 0;
2143                 } else {
2144                         OSRF_BUFFER_ADD_CHAR( col_buf, ',' );
2145                         OSRF_BUFFER_ADD_CHAR( val_buf, ',' );
2146                 }
2147
2148                 buffer_add(col_buf, field_name);
2149
2150                 if (!field_object || field_object->type == JSON_NULL) {
2151                         buffer_add( val_buf, "DEFAULT" );
2152
2153                 } else if ( !strcmp(get_primitive( field ), "number") ) {
2154                         const char* numtype = get_datatype( field );
2155                         if ( !strcmp( numtype, "INT8") ) {
2156                                 buffer_fadd( val_buf, "%lld", atoll(value) );
2157
2158                         } else if ( !strcmp( numtype, "INT") ) {
2159                                 buffer_fadd( val_buf, "%d", atoi(value) );
2160
2161                         } else if ( !strcmp( numtype, "NUMERIC") ) {
2162                                 buffer_fadd( val_buf, "%f", atof(value) );
2163                         }
2164                 } else {
2165                         if ( dbi_conn_quote_string(writehandle, &value) ) {
2166                                 OSRF_BUFFER_ADD( val_buf, value );
2167
2168                         } else {
2169                                 osrfLogError(OSRF_LOG_MARK, "%s: Error quoting string [%s]", modulename, value);
2170                                 osrfAppSessionStatus(
2171                                         ctx->session,
2172                                         OSRF_STATUS_INTERNALSERVERERROR,
2173                                         "osrfMethodException",
2174                                         ctx->request,
2175                                         "Error quoting string -- please see the error log for more details"
2176                                 );
2177                                 free(value);
2178                                 buffer_free(table_buf);
2179                                 buffer_free(col_buf);
2180                                 buffer_free(val_buf);
2181                                 osrfAppRespondComplete( ctx, NULL );
2182                                 return -1;
2183                         }
2184                 }
2185
2186                 free(value);
2187
2188         }
2189
2190         osrfHashIteratorFree( field_itr );
2191
2192         OSRF_BUFFER_ADD_CHAR( col_buf, ')' );
2193         OSRF_BUFFER_ADD_CHAR( val_buf, ')' );
2194
2195         char* table_str = buffer_release(table_buf);
2196         char* col_str   = buffer_release(col_buf);
2197         char* val_str   = buffer_release(val_buf);
2198         growing_buffer* sql = buffer_init(128);
2199         buffer_fadd( sql, "%s %s %s;", table_str, col_str, val_str );
2200         free(table_str);
2201         free(col_str);
2202         free(val_str);
2203
2204         char* query = buffer_release(sql);
2205
2206         osrfLogDebug(OSRF_LOG_MARK, "%s: Insert SQL [%s]", modulename, query);
2207
2208         jsonObject* obj = NULL;
2209         int rc = 0;
2210
2211         dbi_result result = dbi_conn_query( writehandle, query );
2212         if (!result) {
2213                 obj = jsonNewObject(NULL);
2214                 osrfLogError(
2215                         OSRF_LOG_MARK,
2216                         "%s ERROR inserting %s object using query [%s]",
2217                         modulename,
2218                         osrfHashGet(meta, "fieldmapper"),
2219                         query
2220                 );
2221                 osrfAppSessionStatus(
2222                         ctx->session,
2223                         OSRF_STATUS_INTERNALSERVERERROR,
2224                         "osrfMethodException",
2225                         ctx->request,
2226                         "INSERT error -- please see the error log for more details"
2227                 );
2228                 rc = -1;
2229         } else {
2230
2231                 char* id = oilsFMGetString(target, pkey);
2232                 if (!id) {
2233                         unsigned long long new_id = dbi_conn_sequence_last(writehandle, seq);
2234                         growing_buffer* _id = buffer_init(10);
2235                         buffer_fadd(_id, "%lld", new_id);
2236                         id = buffer_release(_id);
2237                 }
2238
2239                 // Find quietness specification, if present
2240                 const char* quiet_str = NULL;
2241                 if ( options ) {
2242                         const jsonObject* quiet_obj = jsonObjectGetKeyConst( options, "quiet" );
2243                         if( quiet_obj )
2244                                 quiet_str = jsonObjectGetString( quiet_obj );
2245                 }
2246
2247                 if( str_is_true( quiet_str ) ) {  // if quietness is specified
2248                         obj = jsonNewObject(id);
2249                 }
2250                 else {
2251
2252                         // Fetch the row that we just inserted, so that we can return it to the client
2253                         jsonObject* where_clause = jsonNewObjectType( JSON_HASH );
2254                         jsonObjectSetKey( where_clause, pkey, jsonNewObject(id) );
2255
2256                         int err = 0;
2257                         jsonObject* list = doFieldmapperSearch( ctx, meta, where_clause, NULL, &err );
2258                         if( err )
2259                                 rc = -1;
2260                         else
2261                                 obj = jsonObjectClone( jsonObjectGetIndex( list, 0 ));
2262
2263                         jsonObjectFree( list );
2264                         jsonObjectFree( where_clause );
2265                 }
2266
2267                 free(id);
2268         }
2269
2270         free(query);
2271         osrfAppRespondComplete( ctx, obj );
2272         jsonObjectFree( obj );
2273         return rc;
2274 }
2275
2276 /**
2277         @brief Implement the retrieve method.
2278         @param ctx Pointer to the method context.
2279         @param err Pointer through which to return an error code.
2280         @return If successful, a pointer to the result to be returned to the client;
2281         otherwise NULL.
2282
2283         From the method's class, fetch a row with a specified value in the primary key.  This
2284         method relies on the database design convention that a primary key consists of a single
2285         column.
2286
2287         Method parameters:
2288         - authkey (PCRUD only)
2289         - value of the primary key for the desired row, for building the WHERE clause
2290         - a JSON_HASH containing any other SQL clauses: select, join, etc.
2291
2292         Return to client: One row from the query.
2293 */
2294 static int doRetrieve(osrfMethodContext* ctx ) {
2295         if(osrfMethodVerifyContext( ctx )) {
2296                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
2297                 return -1;
2298         }
2299
2300         if( enforce_pcrud )
2301                 timeout_needs_resetting = 1;
2302
2303         int id_pos = 0;
2304         int order_pos = 1;
2305
2306         if( enforce_pcrud ) {
2307                 id_pos = 1;
2308                 order_pos = 2;
2309         }
2310
2311         // Get the class metadata
2312         osrfHash* class_def = osrfHashGet( (osrfHash*) ctx->method->userData, "class" );
2313
2314         // Get the value of the primary key, from a method parameter
2315         const jsonObject* id_obj = jsonObjectGetIndex(ctx->params, id_pos);
2316
2317         osrfLogDebug(
2318                 OSRF_LOG_MARK,
2319                 "%s retrieving %s object with primary key value of %s",
2320                 modulename,
2321                 osrfHashGet( class_def, "fieldmapper" ),
2322                 jsonObjectGetString( id_obj )
2323         );
2324
2325         // Build a WHERE clause based on the key value
2326         jsonObject* where_clause = jsonNewObjectType( JSON_HASH );
2327         jsonObjectSetKey(
2328                 where_clause,
2329                 osrfHashGet( class_def, "primarykey" ),  // name of key column
2330                 jsonObjectClone( id_obj )                // value of key column
2331         );
2332
2333         jsonObject* rest_of_query = jsonObjectGetIndex(ctx->params, order_pos);
2334
2335         // Do the query
2336         int err = 0;
2337         jsonObject* list = doFieldmapperSearch( ctx, class_def, where_clause, rest_of_query, &err );
2338
2339         jsonObjectFree( where_clause );
2340         if( err ) {
2341                 osrfAppRespondComplete( ctx, NULL );
2342                 return -1;
2343         }
2344
2345         jsonObject* obj = jsonObjectExtractIndex( list, 0 );
2346         jsonObjectFree( list );
2347
2348         if( enforce_pcrud ) {
2349                 if(!verifyObjectPCRUD(ctx, obj)) {
2350                         jsonObjectFree(obj);
2351
2352                         growing_buffer* msg = buffer_init(128);
2353                         OSRF_BUFFER_ADD( msg, modulename );
2354                         OSRF_BUFFER_ADD( msg, ": Insufficient permissions to retrieve object" );
2355
2356                         char* m = buffer_release(msg);
2357                         osrfAppSessionStatus( ctx->session, OSRF_STATUS_NOTALLOWED, "osrfMethodException",
2358                                         ctx->request, m );
2359                         free(m);
2360
2361                         osrfAppRespondComplete( ctx, NULL );
2362                         return -1;
2363                 }
2364         }
2365
2366         osrfAppRespondComplete( ctx, obj );
2367         jsonObjectFree( obj );
2368         return 0;
2369 }
2370
2371 static char* jsonNumberToDBString ( osrfHash* field, const jsonObject* value ) {
2372         growing_buffer* val_buf = buffer_init(32);
2373         const char* numtype = get_datatype( field );
2374
2375         if ( !strncmp( numtype, "INT", 3 ) ) {
2376                 if (value->type == JSON_NUMBER)
2377                         //buffer_fadd( val_buf, "%ld", (long)jsonObjectGetNumber(value) );
2378                         buffer_fadd( val_buf, jsonObjectGetString( value ) );
2379                 else {
2380                         buffer_fadd( val_buf, jsonObjectGetString( value ) );
2381                 }
2382
2383         } else if ( !strcmp( numtype, "NUMERIC" ) ) {
2384                 if (value->type == JSON_NUMBER)
2385                         buffer_fadd( val_buf, jsonObjectGetString( value ) );
2386                 else {
2387                         buffer_fadd( val_buf, jsonObjectGetString( value ) );
2388                 }
2389
2390         } else {
2391                 // Presumably this was really intended ot be a string, so quote it
2392                 char* str = jsonObjectToSimpleString( value );
2393                 if ( dbi_conn_quote_string(dbhandle, &str) ) {
2394                         OSRF_BUFFER_ADD( val_buf, str );
2395                         free(str);
2396                 } else {
2397                         osrfLogError(OSRF_LOG_MARK, "%s: Error quoting key string [%s]", modulename, str);
2398                         free(str);
2399                         buffer_free(val_buf);
2400                         return NULL;
2401                 }
2402         }
2403
2404         return buffer_release(val_buf);
2405 }
2406
2407 static char* searchINPredicate (const char* class_alias, osrfHash* field,
2408                 jsonObject* node, const char* op, osrfMethodContext* ctx ) {
2409         growing_buffer* sql_buf = buffer_init(32);
2410
2411         buffer_fadd(
2412                 sql_buf,
2413                 "\"%s\".%s ",
2414                 class_alias,
2415                 osrfHashGet(field, "name")
2416         );
2417
2418         if (!op) {
2419                 buffer_add(sql_buf, "IN (");
2420         } else if (!(strcasecmp(op,"not in"))) {
2421                 buffer_add(sql_buf, "NOT IN (");
2422         } else {
2423                 buffer_add(sql_buf, "IN (");
2424         }
2425
2426         if (node->type == JSON_HASH) {
2427                 // subquery predicate
2428                 char* subpred = buildQuery( ctx, node, SUBSELECT );
2429                 if( ! subpred ) {
2430                         buffer_free( sql_buf );
2431                         return NULL;
2432                 }
2433
2434                 buffer_add(sql_buf, subpred);
2435                 free(subpred);
2436
2437         } else if (node->type == JSON_ARRAY) {
2438                 // literal value list
2439                 int in_item_index = 0;
2440                 int in_item_first = 1;
2441                 const jsonObject* in_item;
2442                 while ( (in_item = jsonObjectGetIndex(node, in_item_index++)) ) {
2443
2444                         if (in_item_first)
2445                                 in_item_first = 0;
2446                         else
2447                                 buffer_add(sql_buf, ", ");
2448
2449                         // Sanity check
2450                         if ( in_item->type != JSON_STRING && in_item->type != JSON_NUMBER ) {
2451                                 osrfLogError( OSRF_LOG_MARK,
2452                                                 "%s: Expected string or number within IN list; found %s",
2453                                                 modulename, json_type( in_item->type ) );
2454                                 buffer_free(sql_buf);
2455                                 return NULL;
2456                         }
2457
2458                         // Append the literal value -- quoted if not a number
2459                         if ( JSON_NUMBER == in_item->type ) {
2460                                 char* val = jsonNumberToDBString( field, in_item );
2461                                 OSRF_BUFFER_ADD( sql_buf, val );
2462                                 free(val);
2463
2464                         } else if ( !strcmp( get_primitive( field ), "number") ) {
2465                                 char* val = jsonNumberToDBString( field, in_item );
2466                                 OSRF_BUFFER_ADD( sql_buf, val );
2467                                 free(val);
2468
2469                         } else {
2470                                 char* key_string = jsonObjectToSimpleString(in_item);
2471                                 if ( dbi_conn_quote_string(dbhandle, &key_string) ) {
2472                                         OSRF_BUFFER_ADD( sql_buf, key_string );
2473                                         free(key_string);
2474                                 } else {
2475                                         osrfLogError(OSRF_LOG_MARK,
2476                                                         "%s: Error quoting key string [%s]", modulename, key_string);
2477                                         free(key_string);
2478                                         buffer_free(sql_buf);
2479                                         return NULL;
2480                                 }
2481                         }
2482                 }
2483
2484                 if( in_item_first ) {
2485                         osrfLogError(OSRF_LOG_MARK, "%s: Empty IN list", modulename );
2486                         buffer_free( sql_buf );
2487                         return NULL;
2488                 }
2489         } else {
2490                 osrfLogError(OSRF_LOG_MARK, "%s: Expected object or array for IN clause; found %s",
2491                         modulename, json_type( node->type ) );
2492                 buffer_free(sql_buf);
2493                 return NULL;
2494         }
2495
2496         OSRF_BUFFER_ADD_CHAR( sql_buf, ')' );
2497
2498         return buffer_release(sql_buf);
2499 }
2500
2501 // Receive a JSON_ARRAY representing a function call.  The first
2502 // entry in the array is the function name.  The rest are parameters.
2503 static char* searchValueTransform( const jsonObject* array ) {
2504
2505         if( array->size < 1 ) {
2506                 osrfLogError( OSRF_LOG_MARK, "%s: Empty array for value transform", modulename );
2507                 return NULL;
2508         }
2509
2510         // Get the function name
2511         jsonObject* func_item = jsonObjectGetIndex( array, 0 );
2512         if( func_item->type != JSON_STRING ) {
2513                 osrfLogError(OSRF_LOG_MARK, "%s: Error: expected function name, found %s",
2514                         modulename, json_type( func_item->type ) );
2515                 return NULL;
2516         }
2517
2518         growing_buffer* sql_buf = buffer_init(32);
2519
2520         OSRF_BUFFER_ADD( sql_buf, jsonObjectGetString( func_item ) );
2521         OSRF_BUFFER_ADD( sql_buf, "( " );
2522
2523         // Get the parameters
2524         int func_item_index = 1;   // We already grabbed the zeroth entry
2525         while ( (func_item = jsonObjectGetIndex(array, func_item_index++)) ) {
2526
2527                 // Add a separator comma, if we need one
2528                 if( func_item_index > 2 )
2529                         buffer_add( sql_buf, ", " );
2530
2531                 // Add the current parameter
2532                 if (func_item->type == JSON_NULL) {
2533                         buffer_add( sql_buf, "NULL" );
2534                 } else {
2535                         char* val = jsonObjectToSimpleString(func_item);
2536                         if ( dbi_conn_quote_string(dbhandle, &val) ) {
2537                                 OSRF_BUFFER_ADD( sql_buf, val );
2538                                 free(val);
2539                         } else {
2540                                 osrfLogError(OSRF_LOG_MARK, "%s: Error quoting key string [%s]", modulename, val);
2541                                 buffer_free(sql_buf);
2542                                 free(val);
2543                                 return NULL;
2544                         }
2545                 }
2546         }
2547
2548         buffer_add( sql_buf, " )" );
2549
2550         return buffer_release(sql_buf);
2551 }
2552
2553 static char* searchFunctionPredicate (const char* class_alias, osrfHash* field,
2554                 const jsonObject* node, const char* op) {
2555
2556         if( ! is_good_operator( op ) ) {
2557                 osrfLogError( OSRF_LOG_MARK, "%s: Invalid operator [%s]", modulename, op );
2558                 return NULL;
2559         }
2560
2561         char* val = searchValueTransform(node);
2562         if( !val )
2563                 return NULL;
2564
2565         growing_buffer* sql_buf = buffer_init(32);
2566         buffer_fadd(
2567                 sql_buf,
2568                 "\"%s\".%s %s %s",
2569                 class_alias,
2570                 osrfHashGet(field, "name"),
2571                 op,
2572                 val
2573         );
2574
2575         free(val);
2576
2577         return buffer_release(sql_buf);
2578 }
2579
2580 // class_alias is a class name or other table alias
2581 // field is a field definition as stored in the IDL
2582 // node comes from the method parameter, and may represent an entry in the SELECT list
2583 static char* searchFieldTransform (const char* class_alias, osrfHash* field, const jsonObject* node) {
2584         growing_buffer* sql_buf = buffer_init(32);
2585
2586         const char* field_transform = jsonObjectGetString(
2587                 jsonObjectGetKeyConst( node, "transform" ) );
2588         const char* transform_subcolumn = jsonObjectGetString(
2589                 jsonObjectGetKeyConst( node, "result_field" ) );
2590
2591         if(transform_subcolumn) {
2592                 if( ! is_identifier( transform_subcolumn ) ) {
2593                         osrfLogError( OSRF_LOG_MARK, "%s: Invalid subfield name: \"%s\"\n",
2594                                         modulename, transform_subcolumn );
2595                         buffer_free( sql_buf );
2596                         return NULL;
2597                 }
2598                 OSRF_BUFFER_ADD_CHAR( sql_buf, '(' );    // enclose transform in parentheses
2599         }
2600
2601         if (field_transform) {
2602
2603                 if( ! is_identifier( field_transform ) ) {
2604                         osrfLogError( OSRF_LOG_MARK, "%s: Expected function name, found \"%s\"\n",
2605                                         modulename, field_transform );
2606                         buffer_free( sql_buf );
2607                         return NULL;
2608                 }
2609
2610                 if( obj_is_true( jsonObjectGetKeyConst( node, "distinct" ) ) ) {
2611                         buffer_fadd( sql_buf, "%s(DISTINCT \"%s\".%s",
2612                                 field_transform, class_alias, osrfHashGet(field, "name"));
2613                 } else {
2614                         buffer_fadd( sql_buf, "%s(\"%s\".%s",
2615                                 field_transform, class_alias, osrfHashGet(field, "name"));
2616                 }
2617
2618                 const jsonObject* array = jsonObjectGetKeyConst( node, "params" );
2619
2620                 if (array) {
2621                         if( array->type != JSON_ARRAY ) {
2622                                 osrfLogError( OSRF_LOG_MARK,
2623                                         "%s: Expected JSON_ARRAY for function params; found %s",
2624                                         modulename, json_type( array->type ) );
2625                                 buffer_free( sql_buf );
2626                                 return NULL;
2627                         }
2628                         int func_item_index = 0;
2629                         jsonObject* func_item;
2630                         while ( (func_item = jsonObjectGetIndex(array, func_item_index++)) ) {
2631
2632                                 char* val = jsonObjectToSimpleString(func_item);
2633
2634                                 if ( !val ) {
2635                                         buffer_add( sql_buf, ",NULL" );
2636                                 } else if ( dbi_conn_quote_string(dbhandle, &val) ) {
2637                                         OSRF_BUFFER_ADD_CHAR( sql_buf, ',' );
2638                                         OSRF_BUFFER_ADD( sql_buf, val );
2639                                 } else {
2640                                         osrfLogError( OSRF_LOG_MARK,
2641                                                         "%s: Error quoting key string [%s]", modulename, val);
2642                                         free(val);
2643                                         buffer_free(sql_buf);
2644                                         return NULL;
2645                                 }
2646                                 free(val);
2647                         }
2648                 }
2649
2650                 buffer_add( sql_buf, " )" );
2651
2652         } else {
2653                 buffer_fadd( sql_buf, "\"%s\".%s", class_alias, osrfHashGet(field, "name"));
2654         }
2655
2656         if (transform_subcolumn)
2657                 buffer_fadd( sql_buf, ").\"%s\"", transform_subcolumn );
2658
2659         return buffer_release(sql_buf);
2660 }
2661
2662 static char* searchFieldTransformPredicate( const ClassInfo* class_info, osrfHash* field,
2663                 const jsonObject* node, const char* op ) {
2664
2665         if( ! is_good_operator( op ) ) {
2666                 osrfLogError(OSRF_LOG_MARK, "%s: Error: Invalid operator %s", modulename, op);
2667                 return NULL;
2668         }
2669
2670         char* field_transform = searchFieldTransform( class_info->alias, field, node );
2671         if( ! field_transform )
2672                 return NULL;
2673         char* value = NULL;
2674         int extra_parens = 0;   // boolean
2675
2676         const jsonObject* value_obj = jsonObjectGetKeyConst( node, "value" );
2677         if ( ! value_obj ) {
2678                 value = searchWHERE( node, class_info, AND_OP_JOIN, NULL );
2679                 if( !value ) {
2680                         osrfLogError( OSRF_LOG_MARK, "%s: Error building condition for field transform",
2681                                 modulename );
2682                         free(field_transform);
2683                         return NULL;
2684                 }
2685                 extra_parens = 1;
2686         } else if ( value_obj->type == JSON_ARRAY ) {
2687                 value = searchValueTransform( value_obj );
2688                 if( !value ) {
2689                         osrfLogError( OSRF_LOG_MARK,
2690                                 "%s: Error building value transform for field transform", modulename );
2691                         free( field_transform );
2692                         return NULL;
2693                 }
2694         } else if ( value_obj->type == JSON_HASH ) {
2695                 value = searchWHERE( value_obj, class_info, AND_OP_JOIN, NULL );
2696                 if( !value ) {
2697                         osrfLogError( OSRF_LOG_MARK, "%s: Error building predicate for field transform",
2698                                 modulename );
2699                         free(field_transform);
2700                         return NULL;
2701                 }
2702                 extra_parens = 1;
2703         } else if ( value_obj->type == JSON_NUMBER ) {
2704                 value = jsonNumberToDBString( field, value_obj );
2705         } else if ( value_obj->type == JSON_NULL ) {
2706                 osrfLogError( OSRF_LOG_MARK,
2707                         "%s: Error building predicate for field transform: null value", modulename );
2708                 free(field_transform);
2709                 return NULL;
2710         } else if ( value_obj->type == JSON_BOOL ) {
2711                 osrfLogError( OSRF_LOG_MARK,
2712                         "%s: Error building predicate for field transform: boolean value", modulename );
2713                 free(field_transform);
2714                 return NULL;
2715         } else {
2716                 if ( !strcmp( get_primitive( field ), "number") ) {
2717                         value = jsonNumberToDBString( field, value_obj );
2718                 } else {
2719                         value = jsonObjectToSimpleString( value_obj );
2720                         if ( !dbi_conn_quote_string(dbhandle, &value) ) {
2721                                 osrfLogError( OSRF_LOG_MARK, "%s: Error quoting key string [%s]",
2722                                         modulename, value );
2723                                 free(value);
2724                                 free(field_transform);
2725                                 return NULL;
2726                         }
2727                 }
2728         }
2729
2730         const char* left_parens  = "";
2731         const char* right_parens = "";
2732
2733         if( extra_parens ) {
2734                 left_parens  = "(";
2735                 right_parens = ")";
2736         }
2737
2738         growing_buffer* sql_buf = buffer_init(32);
2739
2740         buffer_fadd(
2741                 sql_buf,
2742                 "%s%s %s %s %s %s%s",
2743                 left_parens,
2744                 field_transform,
2745                 op,
2746                 left_parens,
2747                 value,
2748                 right_parens,
2749                 right_parens
2750         );
2751
2752         free(value);
2753         free(field_transform);
2754
2755         return buffer_release(sql_buf);
2756 }
2757
2758 static char* searchSimplePredicate (const char* op, const char* class_alias,
2759                 osrfHash* field, const jsonObject* node) {
2760
2761         if( ! is_good_operator( op ) ) {
2762                 osrfLogError( OSRF_LOG_MARK, "%s: Invalid operator [%s]", modulename, op );
2763                 return NULL;
2764         }
2765
2766         char* val = NULL;
2767
2768         // Get the value to which we are comparing the specified column
2769         if (node->type != JSON_NULL) {
2770                 if ( node->type == JSON_NUMBER ) {
2771                         val = jsonNumberToDBString( field, node );
2772                 } else if ( !strcmp( get_primitive( field ), "number" ) ) {
2773                         val = jsonNumberToDBString( field, node );
2774                 } else {
2775                         val = jsonObjectToSimpleString(node);
2776                 }
2777         }
2778
2779         if( val ) {
2780                 if( JSON_NUMBER != node->type && strcmp( get_primitive( field ), "number") ) {
2781                         // Value is not numeric; enclose it in quotes
2782                         if ( !dbi_conn_quote_string( dbhandle, &val ) ) {
2783                                 osrfLogError( OSRF_LOG_MARK, "%s: Error quoting key string [%s]",
2784                                         modulename, val );
2785                                 free( val );
2786                                 return NULL;
2787                         }
2788                 }
2789         } else {
2790                 // Compare to a null value
2791                 val = strdup( "NULL" );
2792                 if (strcmp( op, "=" ))
2793                         op = "IS NOT";
2794                 else
2795                         op = "IS";
2796         }
2797
2798         growing_buffer* sql_buf = buffer_init(32);
2799         buffer_fadd( sql_buf, "\"%s\".%s %s %s", class_alias, osrfHashGet(field, "name"), op, val );
2800         char* pred = buffer_release( sql_buf );
2801
2802         free(val);
2803
2804         return pred;
2805 }
2806
2807 static char* searchBETWEENPredicate (const char* class_alias,
2808                 osrfHash* field, const jsonObject* node) {
2809
2810         const jsonObject* x_node = jsonObjectGetIndex( node, 0 );
2811         const jsonObject* y_node = jsonObjectGetIndex( node, 1 );
2812
2813         if( NULL == y_node ) {
2814                 osrfLogError( OSRF_LOG_MARK, "%s: Not enough operands for BETWEEN operator", modulename );
2815                 return NULL;
2816         }
2817         else if( NULL != jsonObjectGetIndex( node, 2 ) ) {
2818                 osrfLogError( OSRF_LOG_MARK, "%s: Too many operands for BETWEEN operator", modulename );
2819                 return NULL;
2820         }
2821
2822         char* x_string;
2823         char* y_string;
2824
2825         if ( !strcmp( get_primitive( field ), "number") ) {
2826                 x_string = jsonNumberToDBString(field, x_node);
2827                 y_string = jsonNumberToDBString(field, y_node);
2828
2829         } else {
2830                 x_string = jsonObjectToSimpleString(x_node);
2831                 y_string = jsonObjectToSimpleString(y_node);
2832                 if ( !(dbi_conn_quote_string(dbhandle, &x_string) && dbi_conn_quote_string(dbhandle, &y_string)) ) {
2833                         osrfLogError( OSRF_LOG_MARK, "%s: Error quoting key strings [%s] and [%s]",
2834                                         modulename, x_string, y_string );
2835                         free(x_string);
2836                         free(y_string);
2837                         return NULL;
2838                 }
2839         }
2840
2841         growing_buffer* sql_buf = buffer_init(32);
2842         buffer_fadd( sql_buf, "\"%s\".%s BETWEEN %s AND %s",
2843                         class_alias, osrfHashGet(field, "name"), x_string, y_string );
2844         free(x_string);
2845         free(y_string);
2846
2847         return buffer_release(sql_buf);
2848 }
2849
2850 static char* searchPredicate ( const ClassInfo* class_info, osrfHash* field,
2851                                                            jsonObject* node, osrfMethodContext* ctx ) {
2852
2853         char* pred = NULL;
2854         if (node->type == JSON_ARRAY) { // equality IN search
2855                 pred = searchINPredicate( class_info->alias, field, node, NULL, ctx );
2856         } else if (node->type == JSON_HASH) { // other search
2857                 jsonIterator* pred_itr = jsonNewIterator( node );
2858                 if( !jsonIteratorHasNext( pred_itr ) ) {
2859                         osrfLogError( OSRF_LOG_MARK, "%s: Empty predicate for field \"%s\"",
2860                                         modulename, osrfHashGet(field, "name" ));
2861                 } else {
2862                         jsonObject* pred_node = jsonIteratorNext( pred_itr );
2863
2864                         // Verify that there are no additional predicates
2865                         if( jsonIteratorHasNext( pred_itr ) ) {
2866                                 osrfLogError( OSRF_LOG_MARK, "%s: Multiple predicates for field \"%s\"",
2867                                                 modulename, osrfHashGet(field, "name" ));
2868                         } else if ( !(strcasecmp( pred_itr->key,"between" )) )
2869                                 pred = searchBETWEENPredicate( class_info->alias, field, pred_node );
2870                         else if ( !(strcasecmp( pred_itr->key,"in" ))
2871                                         || !(strcasecmp( pred_itr->key,"not in" )) )
2872                                 pred = searchINPredicate(
2873                                         class_info->alias, field, pred_node, pred_itr->key, ctx );
2874                         else if ( pred_node->type == JSON_ARRAY )
2875                                 pred = searchFunctionPredicate(
2876                                         class_info->alias, field, pred_node, pred_itr->key );
2877                         else if ( pred_node->type == JSON_HASH )
2878                                 pred = searchFieldTransformPredicate(
2879                                         class_info, field, pred_node, pred_itr->key );
2880                         else
2881                                 pred = searchSimplePredicate( pred_itr->key, class_info->alias, field, pred_node );
2882                 }
2883                 jsonIteratorFree(pred_itr);
2884
2885         } else if (node->type == JSON_NULL) { // IS NULL search
2886                 growing_buffer* _p = buffer_init(64);
2887                 buffer_fadd(
2888                         _p,
2889                         "\"%s\".%s IS NULL",
2890                         class_info->class_name,
2891                         osrfHashGet(field, "name")
2892                 );
2893                 pred = buffer_release(_p);
2894         } else { // equality search
2895                 pred = searchSimplePredicate( "=", class_info->alias, field, node );
2896         }
2897
2898         return pred;
2899
2900 }
2901
2902
/*

join : {
	acn : {
		field : record,
		fkey : id,
		type : left,
		filter_op : or,
		filter : { ... },
		join : {
			acp : {
				field : call_number,
				fkey : id,
				filter : { ... },
			},
		},
	},
	mrd : {
		field : record,
		type : inner,
		fkey : id,
		filter : { ... },
	}
}

*/
2929
2930 static char* searchJOIN ( const jsonObject* join_hash, const ClassInfo* left_info ) {
2931
2932         const jsonObject* working_hash;
2933         jsonObject* freeable_hash = NULL;
2934
2935         if (join_hash->type == JSON_HASH) {
2936                 working_hash = join_hash;
2937         } else if (join_hash->type == JSON_STRING) {
2938                 // turn it into a JSON_HASH by creating a wrapper
2939                 // around a copy of the original
2940                 const char* _tmp = jsonObjectGetString( join_hash );
2941                 freeable_hash = jsonNewObjectType(JSON_HASH);
2942                 jsonObjectSetKey(freeable_hash, _tmp, NULL);
2943                 working_hash = freeable_hash;
2944         } else {
2945                 osrfLogError(
2946                         OSRF_LOG_MARK,
2947                         "%s: JOIN failed; expected JSON object type not found",
2948                         modulename
2949                 );
2950                 return NULL;
2951         }
2952
2953         growing_buffer* join_buf = buffer_init(128);
2954         const char* leftclass = left_info->class_name;
2955
2956         jsonObject* snode = NULL;
2957         jsonIterator* search_itr = jsonNewIterator( working_hash );
2958
2959         while ( (snode = jsonIteratorNext( search_itr )) ) {
2960                 const char* right_alias = search_itr->key;
2961                 const char* class =
2962                                 jsonObjectGetString( jsonObjectGetKeyConst( snode, "class" ) );
2963                 if( ! class )
2964                         class = right_alias;
2965
2966                 const ClassInfo* right_info = add_joined_class( right_alias, class );
2967                 if( !right_info ) {
2968                         osrfLogError(
2969                                 OSRF_LOG_MARK,
2970                                 "%s: JOIN failed.  Class \"%s\" not resolved in IDL",
2971                                 modulename,
2972                                 search_itr->key
2973                         );
2974                         jsonIteratorFree( search_itr );
2975                         buffer_free( join_buf );
2976                         if( freeable_hash )
2977                                 jsonObjectFree( freeable_hash );
2978                         return NULL;
2979                 }
2980                 osrfHash* links    = right_info->links;
2981                 const char* table  = right_info->source_def;
2982
2983                 const char* fkey  = jsonObjectGetString( jsonObjectGetKeyConst( snode, "fkey" ) );
2984                 const char* field = jsonObjectGetString( jsonObjectGetKeyConst( snode, "field" ) );
2985
2986                 if (field && !fkey) {
2987                         // Look up the corresponding join column in the IDL.
2988                         // The link must be defined in the child table,
2989                         // and point to the right parent table.
2990                         osrfHash* idl_link = (osrfHash*) osrfHashGet( links, field );
2991                         const char* reltype = NULL;
2992                         const char* other_class = NULL;
2993                         reltype = osrfHashGet( idl_link, "reltype" );
2994                         if( reltype && strcmp( reltype, "has_many" ) )
2995                                 other_class = osrfHashGet( idl_link, "class" );
2996                         if( other_class && !strcmp( other_class, leftclass ) )
2997                                 fkey = osrfHashGet( idl_link, "key" );
2998                         if (!fkey) {
2999                                 osrfLogError(
3000                                         OSRF_LOG_MARK,
3001                                         "%s: JOIN failed.  No link defined from %s.%s to %s",
3002                                         modulename,
3003                                         class,
3004                                         field,
3005                                         leftclass
3006                                 );
3007                                 buffer_free(join_buf);
3008                                 if(freeable_hash)
3009                                         jsonObjectFree(freeable_hash);
3010                                 jsonIteratorFree(search_itr);
3011                                 return NULL;
3012                         }
3013
3014                 } else if (!field && fkey) {
3015                         // Look up the corresponding join column in the IDL.
3016                         // The link must be defined in the child table,
3017                         // and point to the right parent table.
3018                         osrfHash* left_links = left_info->links;
3019                         osrfHash* idl_link = (osrfHash*) osrfHashGet( left_links, fkey );
3020                         const char* reltype = NULL;
3021                         const char* other_class = NULL;
3022                         reltype = osrfHashGet( idl_link, "reltype" );
3023                         if( reltype && strcmp( reltype, "has_many" ) )
3024                                 other_class = osrfHashGet( idl_link, "class" );
3025                         if( other_class && !strcmp( other_class, class ) )
3026                                 field = osrfHashGet( idl_link, "key" );
3027                         if (!field) {
3028                                 osrfLogError(
3029                                         OSRF_LOG_MARK,
3030                                         "%s: JOIN failed.  No link defined from %s.%s to %s",
3031                                         modulename,
3032                                         leftclass,
3033                                         fkey,
3034                                         class
3035                                 );
3036                                 buffer_free(join_buf);
3037                                 if(freeable_hash)
3038                                         jsonObjectFree(freeable_hash);
3039                                 jsonIteratorFree(search_itr);
3040                                 return NULL;
3041                         }
3042
3043                 } else if (!field && !fkey) {
3044                         osrfHash* left_links = left_info->links;
3045
3046                         // For each link defined for the left class:
3047                         // see if the link references the joined class
3048                         osrfHashIterator* itr = osrfNewHashIterator( left_links );
3049                         osrfHash* curr_link = NULL;
3050                         while( (curr_link = osrfHashIteratorNext( itr ) ) ) {
3051                                 const char* other_class = osrfHashGet( curr_link, "class" );
3052                                 if( other_class && !strcmp( other_class, class ) ) {
3053
3054                                         // In the IDL, the parent class doesn't always know then names of the child
3055                                         // columns that are pointing to it, so don't use that end of the link
3056                                         const char* reltype = osrfHashGet( curr_link, "reltype" );
3057                                         if( reltype && strcmp( reltype, "has_many" ) ) {
3058                                                 // Found a link between the classes
3059                                                 fkey = osrfHashIteratorKey( itr );
3060                                                 field = osrfHashGet( curr_link, "key" );
3061                                                 break;
3062                                         }
3063                                 }
3064                         }
3065                         osrfHashIteratorFree( itr );
3066
3067                         if (!field || !fkey) {
3068                                 // Do another such search, with the classes reversed
3069
3070                                 // For each link defined for the joined class:
3071                                 // see if the link references the left class
3072                                 osrfHashIterator* itr = osrfNewHashIterator( links );
3073                                 osrfHash* curr_link = NULL;
3074                                 while( (curr_link = osrfHashIteratorNext( itr ) ) ) {
3075                                         const char* other_class = osrfHashGet( curr_link, "class" );
3076                                         if( other_class && !strcmp( other_class, leftclass ) ) {
3077
3078                                                 // In the IDL, the parent class doesn't know then names of the child
3079                                                 // columns that are pointing to it, so don't use that end of the link
3080                                                 const char* reltype = osrfHashGet( curr_link, "reltype" );
3081                                                 if( reltype && strcmp( reltype, "has_many" ) ) {
3082                                                         // Found a link between the classes
3083                                                         field = osrfHashIteratorKey( itr );
3084                                                         fkey = osrfHashGet( curr_link, "key" );
3085                                                         break;
3086                                                 }
3087                                         }
3088                                 }
3089                                 osrfHashIteratorFree( itr );
3090                         }
3091
3092                         if (!field || !fkey) {
3093                                 osrfLogError(
3094                                         OSRF_LOG_MARK,
3095                                         "%s: JOIN failed.  No link defined between %s and %s",
3096                                         modulename,
3097                                         leftclass,
3098                                         class
3099                                 );
3100                                 buffer_free(join_buf);
3101                                 if(freeable_hash)
3102                                         jsonObjectFree(freeable_hash);
3103                                 jsonIteratorFree(search_itr);
3104                                 return NULL;
3105                         }
3106                 }
3107
3108                 const char* type = jsonObjectGetString( jsonObjectGetKeyConst( snode, "type" ) );
3109                 if (type) {
3110                         if ( !strcasecmp(type,"left") ) {
3111                                 buffer_add(join_buf, " LEFT JOIN");
3112                         } else if ( !strcasecmp(type,"right") ) {
3113                                 buffer_add(join_buf, " RIGHT JOIN");
3114                         } else if ( !strcasecmp(type,"full") ) {
3115                                 buffer_add(join_buf, " FULL JOIN");
3116                         } else {
3117                                 buffer_add(join_buf, " INNER JOIN");
3118                         }
3119                 } else {
3120                         buffer_add(join_buf, " INNER JOIN");
3121                 }
3122
3123                 buffer_fadd(join_buf, " %s AS \"%s\" ON ( \"%s\".%s = \"%s\".%s",
3124                                         table, right_alias, right_alias, field, left_info->alias, fkey);
3125
3126                 // Add any other join conditions as specified by "filter"
3127                 const jsonObject* filter = jsonObjectGetKeyConst( snode, "filter" );
3128                 if (filter) {
3129                         const char* filter_op = jsonObjectGetString(
3130                                 jsonObjectGetKeyConst( snode, "filter_op" ) );
3131                         if ( filter_op && !strcasecmp("or",filter_op) ) {
3132                                 buffer_add( join_buf, " OR " );
3133                         } else {
3134                                 buffer_add( join_buf, " AND " );
3135                         }
3136
3137                         char* jpred = searchWHERE( filter, right_info, AND_OP_JOIN, NULL );
3138                         if( jpred ) {
3139                                 OSRF_BUFFER_ADD_CHAR( join_buf, ' ' );
3140                                 OSRF_BUFFER_ADD( join_buf, jpred );
3141                                 free(jpred);
3142                         } else {
3143                                 osrfLogError(
3144                                         OSRF_LOG_MARK,
3145                                         "%s: JOIN failed.  Invalid conditional expression.",
3146                                         modulename
3147                                 );
3148                                 jsonIteratorFree( search_itr );
3149                                 buffer_free( join_buf );
3150                                 if( freeable_hash )
3151                                         jsonObjectFree( freeable_hash );
3152                                 return NULL;
3153                         }
3154                 }
3155
3156                 buffer_add(join_buf, " ) ");
3157
3158                 // Recursively add a nested join, if one is present
3159                 const jsonObject* join_filter = jsonObjectGetKeyConst( snode, "join" );
3160                 if (join_filter) {
3161                         char* jpred = searchJOIN( join_filter, right_info );
3162                         if( jpred ) {
3163                                 OSRF_BUFFER_ADD_CHAR( join_buf, ' ' );
3164                                 OSRF_BUFFER_ADD( join_buf, jpred );
3165                                 free(jpred);
3166                         } else {
3167                                 osrfLogError(  OSRF_LOG_MARK, "%s: Invalid nested join.", modulename );
3168                                 jsonIteratorFree( search_itr );
3169                                 buffer_free( join_buf );
3170                                 if( freeable_hash )
3171                                         jsonObjectFree( freeable_hash );
3172                                 return NULL;
3173                         }
3174                 }
3175         }
3176
3177         if(freeable_hash)
3178                 jsonObjectFree(freeable_hash);
3179         jsonIteratorFree(search_itr);
3180
3181         return buffer_release(join_buf);
3182 }
3183
/*

{ +class : { -or|-and : { field : { op : value }, ... } ... }, ... }
{ +class : { -or|-and : [ { field : { op : value }, ... }, ...] ... }, ... }
[ { +class : { -or|-and : [ { field : { op : value }, ... }, ...] ... }, ... }, ... ]

Generate code to express a set of conditions, as for a WHERE clause.  Parameters:

search_hash is the JSON expression of the conditions.
class_info describes the relevant class, including its class definition from the IDL.
opjoin_type indicates whether multiple conditions, if present, should be
	connected by AND or OR.
ctx is an osrfMethodContext.  It is loaded with all sorts of stuff, but all we do
	with it here is to pass it to other functions -- and all they do with it is to
	use the session and request members to send error messages back to the client.

*/
3201
3202 static char* searchWHERE ( const jsonObject* search_hash, const ClassInfo* class_info,
3203                 int opjoin_type, osrfMethodContext* ctx ) {
3204
3205         osrfLogDebug(
3206                 OSRF_LOG_MARK,
3207                 "%s: Entering searchWHERE; search_hash addr = %p, meta addr = %p, "
3208                 "opjoin_type = %d, ctx addr = %p",
3209                 modulename,
3210                 search_hash,
3211                 class_info->class_def,
3212                 opjoin_type,
3213                 ctx
3214         );
3215
3216         growing_buffer* sql_buf = buffer_init(128);
3217
3218         jsonObject* node = NULL;
3219
3220         int first = 1;
3221         if ( search_hash->type == JSON_ARRAY ) {
3222                 osrfLogDebug( OSRF_LOG_MARK,
3223                   "%s: In WHERE clause, condition type is JSON_ARRAY", modulename );
3224                 if( 0 == search_hash->size ) {
3225                         osrfLogError(
3226                                 OSRF_LOG_MARK,
3227                                 "%s: Invalid predicate structure: empty JSON array",
3228                                 modulename
3229                         );
3230                         buffer_free( sql_buf );
3231                         return NULL;
3232                 }
3233
3234                 unsigned long i = 0;
3235                 while((node = jsonObjectGetIndex( search_hash, i++ ) )) {
3236                         if (first) {
3237                                 first = 0;
3238                         } else {
3239                                 if (opjoin_type == OR_OP_JOIN)
3240                                         buffer_add(sql_buf, " OR ");
3241                                 else
3242                                         buffer_add(sql_buf, " AND ");
3243                         }
3244
3245                         char* subpred = searchWHERE( node, class_info, opjoin_type, ctx );
3246                         if( ! subpred ) {
3247                                 buffer_free( sql_buf );
3248                                 return NULL;
3249                         }
3250
3251                         buffer_fadd(sql_buf, "( %s )", subpred);
3252                         free(subpred);
3253                 }
3254
3255         } else if ( search_hash->type == JSON_HASH ) {
3256                 osrfLogDebug( OSRF_LOG_MARK,
3257                         "%s: In WHERE clause, condition type is JSON_HASH", modulename );
3258                 jsonIterator* search_itr = jsonNewIterator( search_hash );
3259                 if( !jsonIteratorHasNext( search_itr ) ) {
3260                         osrfLogError(
3261                                 OSRF_LOG_MARK,
3262                                 "%s: Invalid predicate structure: empty JSON object",
3263                                 modulename
3264                         );
3265                         jsonIteratorFree( search_itr );
3266                         buffer_free( sql_buf );
3267                         return NULL;
3268                 }
3269
3270                 while ( (node = jsonIteratorNext( search_itr )) ) {
3271
3272                         if (first) {
3273                                 first = 0;
3274                         } else {
3275                                 if (opjoin_type == OR_OP_JOIN)
3276                                         buffer_add(sql_buf, " OR ");
3277                                 else
3278                           &