]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/c-apps/oils_sql.c
Improved permission checking speed for PCRUD mode.
[working/Evergreen.git] / Open-ILS / src / c-apps / oils_sql.c
1 /**
2         @file oils_sql.c
3         @brief Utility routines for translating JSON into SQL.
4 */
5
6 #include <stdlib.h>
7 #include <string.h>
8 #include <time.h>
9 #include <ctype.h>
10 #include <dbi/dbi.h>
11 #include "opensrf/utils.h"
12 #include "opensrf/log.h"
13 #include "opensrf/osrf_application.h"
14 #include "openils/oils_utils.h"
15 #include "openils/oils_sql.h"
16
17 // The next four macros are OR'd together as needed to form a set
18 // of bitflags.  SUBCOMBO enables an extra pair of parentheses when
19 // nesting one UNION, INTERSECT or EXCEPT inside another.
20 // SUBSELECT tells us we're in a subquery, so don't add the
21 // terminal semicolon yet.
22 #define SUBCOMBO    8
23 #define SUBSELECT   4
24 #define DISABLE_I18N    2
25 #define SELECT_DISTINCT 1
26
27 #define AND_OP_JOIN     0
28 #define OR_OP_JOIN      1
29
30 struct ClassInfoStruct;
31 typedef struct ClassInfoStruct ClassInfo;
32
33 #define ALIAS_STORE_SIZE 16
34 #define CLASS_NAME_STORE_SIZE 16
35
36 struct ClassInfoStruct {
37         char* alias;
38         char* class_name;
39         char* source_def;
40         osrfHash* class_def;      // Points into IDL
41         osrfHash* fields;         // Points into IDL
42         osrfHash* links;          // Points into IDL
43
44         // The remaining members are private and internal.  Client code should not
45         // access them directly.
46
47         ClassInfo* next;          // Supports linked list of joined classes
48         int in_use;               // boolean
49
50         // We usually store the alias and class name in the following arrays, and
51         // point the corresponding pointers at them.  When the string is too big
52         // for the array (which will probably never happen in practice), we strdup it.
53
54         char alias_store[ ALIAS_STORE_SIZE + 1 ];
55         char class_name_store[ CLASS_NAME_STORE_SIZE + 1 ];
56 };
57
58 struct QueryFrameStruct;
59 typedef struct QueryFrameStruct QueryFrame;
60
61 struct QueryFrameStruct {
62         ClassInfo core;
63         ClassInfo* join_list;  // linked list of classes joined to the core class
64         QueryFrame* next;      // implements stack as linked list
65         int in_use;            // boolean
66 };
67
68 static int timeout_needs_resetting;
69 static time_t time_next_reset;
70
71 static int verifyObjectClass ( osrfMethodContext*, const jsonObject* );
72
73 static void setXactId( osrfMethodContext* ctx );
74 static inline const char* getXactId( osrfMethodContext* ctx );
75 static inline void clearXactId( osrfMethodContext* ctx );
76
77 static jsonObject* doFieldmapperSearch ( osrfMethodContext* ctx, osrfHash* class_meta,
78                 jsonObject* where_hash, jsonObject* query_hash, int* err );
79 static jsonObject* oilsMakeFieldmapperFromResult( dbi_result, osrfHash* );
80 static jsonObject* oilsMakeJSONFromResult( dbi_result );
81
82 static char* searchSimplePredicate ( const char* op, const char* class_alias,
83                                 osrfHash* field, const jsonObject* node );
84 static char* searchFunctionPredicate ( const char*, osrfHash*, const jsonObject*, const char* );
85 static char* searchFieldTransform ( const char*, osrfHash*, const jsonObject* );
86 static char* searchFieldTransformPredicate ( const ClassInfo*, osrfHash*, const jsonObject*,
87                 const char* );
88 static char* searchBETWEENPredicate ( const char*, osrfHash*, const jsonObject* );
89 static char* searchINPredicate ( const char*, osrfHash*,
90                                                                  jsonObject*, const char*, osrfMethodContext* );
91 static char* searchPredicate ( const ClassInfo*, osrfHash*, jsonObject*, osrfMethodContext* );
92 static char* searchJOIN ( const jsonObject*, const ClassInfo* left_info );
93 static char* searchWHERE ( const jsonObject* search_hash, const ClassInfo*, int, osrfMethodContext* );
94 static char* buildSELECT( const jsonObject*, jsonObject* rest_of_query,
95         osrfHash* meta, osrfMethodContext* ctx );
96 static char* buildOrderByFromArray( osrfMethodContext* ctx, const jsonObject* order_array );
97
98 char* buildQuery( osrfMethodContext* ctx, jsonObject* query, int flags );
99
100 char* SELECT ( osrfMethodContext*, jsonObject*, const jsonObject*, const jsonObject*,
101         const jsonObject*, const jsonObject*, const jsonObject*, const jsonObject*, int );
102
103 static osrfStringArray* getPermLocationCache( osrfMethodContext*, const char* );
104 static void setPermLocationCache( osrfMethodContext*, const char*, osrfStringArray* );
105
106 void userDataFree( void* );
107 static void sessionDataFree( char*, void* );
108 static void pcacheFree( char*, void* );
109 static int obj_is_true( const jsonObject* obj );
110 static const char* json_type( int code );
111 static const char* get_primitive( osrfHash* field );
112 static const char* get_datatype( osrfHash* field );
113 static void pop_query_frame( void );
114 static void push_query_frame( void );
115 static int add_query_core( const char* alias, const char* class_name );
116 static inline ClassInfo* search_alias( const char* target );
117 static ClassInfo* search_all_alias( const char* target );
118 static ClassInfo* add_joined_class( const char* alias, const char* classname );
119 static void clear_query_stack( void );
120
121 static const jsonObject* verifyUserPCRUD( osrfMethodContext* );
122 static int verifyObjectPCRUD( osrfMethodContext*, const jsonObject*, const int );
123 static const char* org_tree_root( osrfMethodContext* ctx );
124 static jsonObject* single_hash( const char* key, const char* value );
125
126 static int child_initialized = 0;   /* boolean */
127
128 static dbi_conn writehandle; /* our MASTER db connection */
129 static dbi_conn dbhandle; /* our CURRENT db connection */
130 //static osrfHash * readHandles;
131
132 // The following points to the top of a stack of QueryFrames.  It's a little
133 // confusing because the top level of the query is at the bottom of the stack.
134 static QueryFrame* curr_query = NULL;
135
136 static dbi_conn writehandle; /* our MASTER db connection */
137 static dbi_conn dbhandle; /* our CURRENT db connection */
138 //static osrfHash * readHandles;
139
140 static int max_flesh_depth = 100;
141
142 static int perm_at_threshold = 5;
143 static int enforce_pcrud = 0;     // Boolean
144 static char* modulename = NULL;
145
146 /**
147         @brief Connect to the database.
148         @return A database connection if successful, or NULL if not.
149 */
150 dbi_conn oilsConnectDB( const char* mod_name ) {
151
152         osrfLogDebug( OSRF_LOG_MARK, "Attempting to initialize libdbi..." );
153         if( dbi_initialize( NULL ) == -1 ) {
154                 osrfLogError( OSRF_LOG_MARK, "Unable to initialize libdbi" );
155                 return NULL;
156         } else
157                 osrfLogDebug( OSRF_LOG_MARK, "... libdbi initialized." );
158
159         char* driver = osrf_settings_host_value( "/apps/%s/app_settings/driver", mod_name );
160         char* user   = osrf_settings_host_value( "/apps/%s/app_settings/database/user", mod_name );
161         char* host   = osrf_settings_host_value( "/apps/%s/app_settings/database/host", mod_name );
162         char* port   = osrf_settings_host_value( "/apps/%s/app_settings/database/port", mod_name );
163         char* db     = osrf_settings_host_value( "/apps/%s/app_settings/database/db", mod_name );
164         char* pw     = osrf_settings_host_value( "/apps/%s/app_settings/database/pw", mod_name );
165
166         osrfLogDebug( OSRF_LOG_MARK, "Attempting to load the database driver [%s]...", driver );
167         dbi_conn handle = dbi_conn_new( driver );
168
169         if( !handle ) {
170                 osrfLogError( OSRF_LOG_MARK, "Error loading database driver [%s]", driver );
171                 return NULL;
172         }
173         osrfLogDebug( OSRF_LOG_MARK, "Database driver [%s] seems OK", driver );
174
175         osrfLogInfo(OSRF_LOG_MARK, "%s connecting to database.  host=%s, "
176                 "port=%s, user=%s, db=%s", mod_name, host, port, user, db );
177
178         if( host ) dbi_conn_set_option( handle, "host", host );
179         if( port ) dbi_conn_set_option_numeric( handle, "port", atoi( port ));
180         if( user ) dbi_conn_set_option( handle, "username", user );
181         if( pw )   dbi_conn_set_option( handle, "password", pw );
182         if( db )   dbi_conn_set_option( handle, "dbname", db );
183
184         free( user );
185         free( host );
186         free( port );
187         free( db );
188         free( pw );
189
190         if( dbi_conn_connect( handle ) < 0 ) {
191                 sleep( 1 );
192                 if( dbi_conn_connect( handle ) < 0 ) {
193                         const char* msg;
194                         dbi_conn_error( handle, &msg );
195                         osrfLogError( OSRF_LOG_MARK, "Error connecting to database: %s",
196                                 msg ? msg : "(No description available)" );
197                         return NULL;
198                 }
199         }
200
201         osrfLogInfo( OSRF_LOG_MARK, "%s successfully connected to the database", mod_name );
202
203         return handle;
204 }
205
206 /**
207         @brief Select some options.
208         @param module_name: Name of the server.
209         @param do_pcrud: Boolean.  True if we are to enforce PCRUD permissions.
210
211         This source file is used (at this writing) to implement three different servers:
212         - open-ils.reporter-store
213         - open-ils.pcrud
214         - open-ils.cstore
215
216         These servers behave mostly the same, but they implement different combinations of
217         methods, and open-ils.pcrud enforces a permissions scheme that the other two don't.
218
219         Here we use the server name in messages to identify which kind of server issued them.
220         We use do_crud as a boolean to control whether or not to enforce the permissions scheme.
221 */
222 void oilsSetSQLOptions( const char* module_name, int do_pcrud, int flesh_depth ) {
223         if( !module_name )
224                 module_name = "open-ils.cstore";   // bulletproofing with a default
225
226         if( modulename )
227                 free( modulename );
228
229         modulename = strdup( module_name );
230         enforce_pcrud = do_pcrud;
231         max_flesh_depth = flesh_depth;
232 }
233
234 /**
235         @brief Install a database connection.
236         @param conn Pointer to a database connection.
237
238         In some contexts, @a conn may merely provide a driver so that we can process strings
239         properly, without providing an open database connection.
240 */
241 void oilsSetDBConnection( dbi_conn conn ) {
242         dbhandle = writehandle = conn;
243 }
244
245 /**
246         @brief Determine whether a database connection is alive.
247         @param handle Handle for a database connection.
248         @return 1 if the connection is alive, or zero if it isn't.
249 */
250 int oilsIsDBConnected( dbi_conn handle ) {
251         // Do an innocuous SELECT.  If it succeeds, the database connection is still good.
252         dbi_result result = dbi_conn_query( handle, "SELECT 1;" );
253         if( result ) {
254                 dbi_result_free( result );
255                 return 1;
256         } else {
257                 // This is a terrible, horrible, no good, very bad kludge.
258                 // Sometimes the SELECT 1 query fails, not because the database connection is dead,
259                 // but because (due to a previous error) the database is ignoring all commands,
260                 // even innocuous SELECTs, until the current transaction is rolled back.  The only
261                 // known way to detect this condition via the dbi library is by looking at the error
262                 // message.  This approach will break if the language or wording of the message ever
263                 // changes.
264                 // Note: the dbi_conn_ping function purports to determine whether the doatabase
265                 // connection is live, but at this writing this function is unreliable and useless.
266                 static const char* ok_msg = "ERROR:  current transaction is aborted, commands "
267                         "ignored until end of transaction block\n";
268                 const char* msg;
269                 dbi_conn_error( handle, &msg );
270                 if( strcmp( msg, ok_msg )) {
271                         osrfLogError( OSRF_LOG_MARK, "Database connection isn't working" );
272                         return 0;
273                 } else
274                         return 1;   // ignoring SELECT due to previous error; that's okay
275         }
276 }
277
278 /**
279         @brief Get a table name, view name, or subquery for use in a FROM clause.
280         @param class Pointer to the IDL class entry.
281         @return A table name, a view name, or a subquery in parentheses.
282
283         In some cases the IDL defines a class, not with a table name or a view name, but with
284         a SELECT statement, which may be used as a subquery.
285 */
286 char* oilsGetRelation( osrfHash* classdef ) {
287
288         char* source_def = NULL;
289         const char* tabledef = osrfHashGet( classdef, "tablename" );
290
291         if( tabledef ) {
292                 source_def = strdup( tabledef );   // Return the name of a table or view
293         } else {
294                 tabledef = osrfHashGet( classdef, "source_definition" );
295                 if( tabledef ) {
296                         // Return a subquery, enclosed in parentheses
297                         source_def = safe_malloc( strlen( tabledef ) + 3 );
298                         source_def[ 0 ] = '(';
299                         strcpy( source_def + 1, tabledef );
300                         strcat( source_def, ")" );
301                 } else {
302                         // Not found: return an error
303                         const char* classname = osrfHashGet( classdef, "classname" );
304                         if( !classname )
305                                 classname = "???";
306                         osrfLogError(
307                                 OSRF_LOG_MARK,
308                                 "%s ERROR No tablename or source_definition for class \"%s\"",
309                                 modulename,
310                                 classname
311                         );
312                 }
313         }
314
315         return source_def;
316 }
317
318 /**
319         @brief Add datatypes from the database to the fields in the IDL.
320         @param handle Handle for a database connection
321         @return Zero if successful, or 1 upon error.
322
323         For each relevant class in the IDL: ask the database for the datatype of every field.
324         In particular, determine which fields are text fields and which fields are numeric
325         fields, so that we know whether to enclose their values in quotes.
326 */
327 int oilsExtendIDL( dbi_conn handle ) {
328         osrfHashIterator* class_itr = osrfNewHashIterator( oilsIDL() );
329         osrfHash* class = NULL;
330         growing_buffer* query_buf = buffer_init( 64 );
331         int results_found = 0;   // boolean
332
333         // For each class in the IDL...
334         while( (class = osrfHashIteratorNext( class_itr ) ) ) {
335                 const char* classname = osrfHashIteratorKey( class_itr );
336                 osrfHash* fields = osrfHashGet( class, "fields" );
337
338                 // If the class is virtual, ignore it
339                 if( str_is_true( osrfHashGet(class, "virtual") ) ) {
340                         osrfLogDebug(OSRF_LOG_MARK, "Class %s is virtual, skipping", classname );
341                         continue;
342                 }
343
344                 char* tabledef = oilsGetRelation( class );
345                 if( !tabledef )
346                         continue;   // No such relation -- a query of it would be doomed to failure
347
348                 buffer_reset( query_buf );
349                 buffer_fadd( query_buf, "SELECT * FROM %s AS x WHERE 1=0;", tabledef );
350
351                 free(tabledef );
352
353                 osrfLogDebug( OSRF_LOG_MARK, "%s Investigatory SQL = %s",
354                                 modulename, OSRF_BUFFER_C_STR( query_buf ) );
355
356                 dbi_result result = dbi_conn_query( handle, OSRF_BUFFER_C_STR( query_buf ) );
357                 if( result ) {
358
359                         results_found = 1;
360                         int columnIndex = 1;
361                         const char* columnName;
362                         while( (columnName = dbi_result_get_field_name(result, columnIndex)) ) {
363
364                                 osrfLogInternal( OSRF_LOG_MARK, "Looking for column named [%s]...",
365                                                 columnName );
366
367                                 /* fetch the fieldmapper index */
368                                 osrfHash* _f = osrfHashGet(fields, columnName);
369                                 if( _f ) {
370
371                                         osrfLogDebug(OSRF_LOG_MARK, "Found [%s] in IDL hash...", columnName);
372
373                                         /* determine the field type and storage attributes */
374
375                                         switch( dbi_result_get_field_type_idx( result, columnIndex )) {
376
377                                                 case DBI_TYPE_INTEGER : {
378
379                                                         if( !osrfHashGet(_f, "primitive") )
380                                                                 osrfHashSet(_f, "number", "primitive");
381
382                                                         int attr = dbi_result_get_field_attribs_idx( result, columnIndex );
383                                                         if( attr & DBI_INTEGER_SIZE8 )
384                                                                 osrfHashSet( _f, "INT8", "datatype" );
385                                                         else
386                                                                 osrfHashSet( _f, "INT", "datatype" );
387                                                         break;
388                                                 }
389                                                 case DBI_TYPE_DECIMAL :
390                                                         if( !osrfHashGet( _f, "primitive" ))
391                                                                 osrfHashSet( _f, "number", "primitive" );
392
393                                                         osrfHashSet( _f, "NUMERIC", "datatype" );
394                                                         break;
395
396                                                 case DBI_TYPE_STRING :
397                                                         if( !osrfHashGet( _f, "primitive" ))
398                                                                 osrfHashSet( _f, "string", "primitive" );
399
400                                                         osrfHashSet( _f,"TEXT", "datatype" );
401                                                         break;
402
403                                                 case DBI_TYPE_DATETIME :
404                                                         if( !osrfHashGet( _f, "primitive" ))
405                                                                 osrfHashSet( _f, "string", "primitive" );
406
407                                                         osrfHashSet( _f, "TIMESTAMP", "datatype" );
408                                                         break;
409
410                                                 case DBI_TYPE_BINARY :
411                                                         if( !osrfHashGet( _f, "primitive" ))
412                                                                 osrfHashSet( _f, "string", "primitive" );
413
414                                                         osrfHashSet( _f, "BYTEA", "datatype" );
415                                         }
416
417                                         osrfLogDebug(
418                                                 OSRF_LOG_MARK,
419                                                 "Setting [%s] to primitive [%s] and datatype [%s]...",
420                                                 columnName,
421                                                 osrfHashGet( _f, "primitive" ),
422                                                 osrfHashGet( _f, "datatype" )
423                                         );
424                                 }
425                                 ++columnIndex;
426                         } // end while loop for traversing columns of result
427                         dbi_result_free( result  );
428                 } else {
429                         const char* msg;
430                         int errnum = dbi_conn_error( handle, &msg );
431                         osrfLogDebug( OSRF_LOG_MARK, "No data found for class [%s]: %d, %s", classname,
432                                 errnum, msg ? msg : "(No description available)" );
433                         // We don't check the database connection here.  It's routine to get failures at
434                         // this point; we routinely try to query tables that don't exist, because they
435                         // are defined in the IDL but not in the database.
436                 }
437         } // end for each class in IDL
438
439         buffer_free( query_buf );
440         osrfHashIteratorFree( class_itr );
441         child_initialized = 1;
442
443         if( !results_found ) {
444                 osrfLogError( OSRF_LOG_MARK,
445                         "No results found for any class -- bad database connection?" );
446                 return 1;
447         } else if( ! oilsIsDBConnected( handle )) {
448                 osrfLogError( OSRF_LOG_MARK,
449                         "Unable to extend IDL: database connection isn't working" );
450                 return 1;
451         }
452         else
453                 return 0;
454 }
455
456 /**
457         @brief Free an osrfHash that stores a transaction ID.
458         @param blob A pointer to the osrfHash to be freed, cast to a void pointer.
459
460         This function is a callback, to be called by the application session when it ends.
461         The application session stores the osrfHash via an opaque pointer.
462
463         If the osrfHash contains an entry for the key "xact_id", it means that an
464         uncommitted transaction is pending.  Roll it back.
465 */
466 void userDataFree( void* blob ) {
467         osrfHash* hash = (osrfHash*) blob;
468         if( osrfHashGet( hash, "xact_id" ) && writehandle ) {
469                 if( !dbi_conn_query( writehandle, "ROLLBACK;" )) {
470                         const char* msg;
471                         int errnum = dbi_conn_error( writehandle, &msg );
472                         osrfLogWarning( OSRF_LOG_MARK, "Unable to perform rollback: %d %s",
473                                 errnum, msg ? msg : "(No description available)" );
474                 };
475         }
476
477         osrfHashFree( hash );
478 }
479
/**
	@name Managing session data
	@brief Maintain data stored via the userData pointer of the application session.

	Currently, session-level data is stored in an osrfHash.  Other arrangements are
	possible, and some would be more efficient.  The application session calls a
	callback function to free userData before terminating.

	The data stored at the session level are the transaction id, the authkey, the user
	login object, and a cache of permission locations.  Because the transaction id is
	stored here, we can ensure that any pending transaction is rolled back before the
	application session terminates.
*/
/*@{*/
493
494 /**
495         @brief Free an item in the application session's userData.
496         @param key The name of a key for an osrfHash.
497         @param item An opaque pointer to the item associated with the key.
498
499         We store an osrfHash as userData with the application session, and arrange (by
500         installing userDataFree() as a different callback) for the session to free that
501         osrfHash before terminating.
502
503         This function is a callback for freeing items in the osrfHash.  Currently we store
504         two things:
505         - Transaction id of a pending transaction; a character string.  Key: "xact_id".
506         - Authkey; a character string.  Key: "authkey".
507         - User object from the authentication server; a jsonObject.  Key: "user_login".
508
509         If we ever store anything else in userData, we will need to revisit this function so
510         that it will free whatever else needs freeing.
511 */
512 static void sessionDataFree( char* key, void* item ) {
513         if( !strcmp( key, "xact_id" ) || !strcmp( key, "authkey" ) ) 
514                 free( item );
515         else if( !strcmp( key, "user_login" ) )
516                 jsonObjectFree( (jsonObject*) item );
517         else if( !strcmp( key, "pcache" ) )
518                 osrfHashFree( (osrfHash*) item );
519 }
520
521 static void pcacheFree( char* key, void* item ) {
522         osrfStringArrayFree( (osrfStringArray*) item );
523 }
524
525 /**
526         @brief Save a transaction id.
527         @param ctx Pointer to the method context.
528
529         Save the session_id of the current application session as a transaction id.
530 */
531 static void setXactId( osrfMethodContext* ctx ) {
532         if( ctx && ctx->session ) {
533                 osrfAppSession* session = ctx->session;
534
535                 osrfHash* cache = session->userData;
536
537                 // If the session doesn't already have a hash, create one.  Make sure
538                 // that the application session frees the hash when it terminates.
539                 if( NULL == cache ) {
540                         session->userData = cache = osrfNewHash();
541                         osrfHashSetCallback( cache, &sessionDataFree );
542                         ctx->session->userDataFree = &userDataFree;
543                 }
544
545                 // Save the transaction id in the hash, with the key "xact_id"
546                 osrfHashSet( cache, strdup( session->session_id ), "xact_id" );
547         }
548 }
549
550 /**
551         @brief Get the transaction ID for the current transaction, if any.
552         @param ctx Pointer to the method context.
553         @return Pointer to the transaction ID.
554
555         The return value points to an internal buffer, and will become invalid upon issuing
556         a commit or rollback.
557 */
558 static inline const char* getXactId( osrfMethodContext* ctx ) {
559         if( ctx && ctx->session && ctx->session->userData )
560                 return osrfHashGet( (osrfHash*) ctx->session->userData, "xact_id" );
561         else
562                 return NULL;
563 }
564
565 /**
566         @brief Clear the current transaction id.
567         @param ctx Pointer to the method context.
568 */
569 static inline void clearXactId( osrfMethodContext* ctx ) {
570         if( ctx && ctx->session && ctx->session->userData )
571                 osrfHashRemove( ctx->session->userData, "xact_id" );
572 }
573 /*@}*/
574
575 /**
576         @brief Stash the location for a particular perm in the sessionData cache
577         @param ctx Pointer to the method context.
578         @param perm Name of the permission we're looking at
579         @param array StringArray of perm location ids
580 */
581 static void setPermLocationCache( osrfMethodContext* ctx, const char* perm, osrfStringArray* locations ) {
582         if( ctx && ctx->session ) {
583                 osrfAppSession* session = ctx->session;
584
585                 osrfHash* cache = session->userData;
586
587                 // If the session doesn't already have a hash, create one.  Make sure
588                 // that the application session frees the hash when it terminates.
589                 if( NULL == cache ) {
590                         session->userData = cache = osrfNewHash();
591                         osrfHashSetCallback( cache, &sessionDataFree );
592                         ctx->session->userDataFree = &userDataFree;
593                 }
594
595                 osrfHash* pcache = osrfHashGet(cache, "pcache");
596
597                 if( NULL == pcache ) {
598                         pcache = osrfNewHash();
599                         osrfHashSetCallback( pcache, &pcacheFree );
600                         osrfHashSet( cache, pcache, "pcache" );
601                 }
602
603                 if( perm && locations )
604                         osrfHashSet( pcache, locations, strdup(perm) );
605         }
606 }
607
608 /**
609         @brief Grab stashed location for a particular perm in the sessionData cache
610         @param ctx Pointer to the method context.
611         @param perm Name of the permission we're looking at
612 */
613 static osrfStringArray* getPermLocationCache( osrfMethodContext* ctx, const char* perm ) {
614         if( ctx && ctx->session ) {
615                 osrfAppSession* session = ctx->session;
616                 osrfHash* cache = session->userData;
617                 if( cache ) {
618                         osrfHash* pcache = osrfHashGet(cache, "pcache");
619                         if( pcache ) {
620                                 return osrfHashGet( pcache, perm );
621                         }
622                 }
623         }
624
625         return NULL;
626 }
627
628 /**
629         @brief Save the user's login in the userData for the current application session.
630         @param ctx Pointer to the method context.
631         @param user_login Pointer to the user login object to be cached (we cache the original,
632         not a copy of it).
633
634         If @a user_login is NULL, remove the user login if one is already cached.
635 */
636 static void setUserLogin( osrfMethodContext* ctx, jsonObject* user_login ) {
637         if( ctx && ctx->session ) {
638                 osrfAppSession* session = ctx->session;
639
640                 osrfHash* cache = session->userData;
641
642                 // If the session doesn't already have a hash, create one.  Make sure
643                 // that the application session frees the hash when it terminates.
644                 if( NULL == cache ) {
645                         session->userData = cache = osrfNewHash();
646                         osrfHashSetCallback( cache, &sessionDataFree );
647                         ctx->session->userDataFree = &userDataFree;
648                 }
649
650                 if( user_login )
651                         osrfHashSet( cache, user_login, "user_login" );
652                 else
653                         osrfHashRemove( cache, "user_login" );
654         }
655 }
656
657 /**
658         @brief Get the user login object for the current application session, if any.
659         @param ctx Pointer to the method context.
660         @return Pointer to the user login object if found; otherwise NULL.
661
662         The user login object was returned from the authentication server, and then cached so
663         we don't have to call the authentication server again for the same user.
664 */
665 static const jsonObject* getUserLogin( osrfMethodContext* ctx ) {
666         if( ctx && ctx->session && ctx->session->userData )
667                 return osrfHashGet( (osrfHash*) ctx->session->userData, "user_login" );
668         else
669                 return NULL;
670 }
671
672 /**
673         @brief Save a copy of an authkey in the userData of the current application session.
674         @param ctx Pointer to the method context.
675         @param authkey The authkey to be saved.
676
677         If @a authkey is NULL, remove the authkey if one is already cached.
678 */
679 static void setAuthkey( osrfMethodContext* ctx, const char* authkey ) {
680         if( ctx && ctx->session && authkey ) {
681                 osrfAppSession* session = ctx->session;
682                 osrfHash* cache = session->userData;
683
684                 // If the session doesn't already have a hash, create one.  Make sure
685                 // that the application session frees the hash when it terminates.
686                 if( NULL == cache ) {
687                         session->userData = cache = osrfNewHash();
688                         osrfHashSetCallback( cache, &sessionDataFree );
689                         ctx->session->userDataFree = &userDataFree;
690                 }
691
692                 // Save the transaction id in the hash, with the key "xact_id"
693                 if( authkey && *authkey )
694                         osrfHashSet( cache, strdup( authkey ), "authkey" );
695                 else
696                         osrfHashRemove( cache, "authkey" );
697         }
698 }
699
700 /**
701         @brief Reset the login timeout.
702         @param authkey The authentication key for the current login session.
703         @param now The current time.
704         @return Zero if successful, or 1 if not.
705
706         Tell the authentication server to reset the timeout so that the login session won't
707         expire for a while longer.
708
709         We could dispense with the @a now parameter by calling time().  But we just called
710         time() in order to decide whether to reset the timeout, so we might as well reuse
711         the result instead of calling time() again.
712 */
713 static int reset_timeout( const char* authkey, time_t now ) {
714         jsonObject* auth_object = jsonNewObject( authkey );
715
716         // Ask the authentication server to reset the timeout.  It returns an event
717         // indicating success or failure.
718         jsonObject* result = oilsUtilsQuickReq( "open-ils.auth",
719                 "open-ils.auth.session.reset_timeout", auth_object );
720         jsonObjectFree( auth_object );
721
722         if( !result || result->type != JSON_HASH ) {
723                 osrfLogError( OSRF_LOG_MARK,
724                          "Unexpected object type receieved from open-ils.auth.session.reset_timeout" );
725                 jsonObjectFree( result );
726                 return 1;       // Not the right sort of object returned
727         }
728
729         const jsonObject* ilsevent = jsonObjectGetKeyConst( result, "ilsevent" );
730         if( !ilsevent || ilsevent->type != JSON_NUMBER ) {
731                 osrfLogError( OSRF_LOG_MARK, "ilsevent is absent or malformed" );
732                 jsonObjectFree( result );
733                 return 1;    // Return code from method not available
734         }
735
736         if( jsonObjectGetNumber( ilsevent ) != 0.0 ) {
737                 const char* desc = jsonObjectGetString( jsonObjectGetKeyConst( result, "desc" ));
738                 if( !desc )
739                         desc = "(No reason available)";    // failsafe; shouldn't happen
740                 osrfLogInfo( OSRF_LOG_MARK, "Failure to reset timeout: %s", desc );
741                 jsonObjectFree( result );
742                 return 1;
743         }
744
745         // Revise our local proxy for the timeout deadline
746         // by a smallish fraction of the timeout interval
747         const char* timeout = jsonObjectGetString( jsonObjectGetKeyConst( result, "payload" ));
748         if( !timeout )
749                 timeout = "1";   // failsafe; shouldn't happen
750         time_next_reset = now + atoi( timeout ) / 15;
751
752         jsonObjectFree( result );
753         return 0;     // Successfully reset timeout
754 }
755
756 /**
757         @brief Get the authkey string for the current application session, if any.
758         @param ctx Pointer to the method context.
759         @return Pointer to the cached authkey if found; otherwise NULL.
760
761         If present, the authkey string was cached from a previous method call.
762 */
763 static const char* getAuthkey( osrfMethodContext* ctx ) {
764         if( ctx && ctx->session && ctx->session->userData ) {
765                 const char* authkey = osrfHashGet( (osrfHash*) ctx->session->userData, "authkey" );
766
767                 // Possibly reset the authentication timeout to keep the login alive.  We do so
768                 // no more than once per method call, and not at all if it has been only a short
769                 // time since the last reset.
770
771                 // Here we reset explicitly, if at all.  We also implicitly reset the timeout
772                 // whenever we call the "open-ils.auth.session.retrieve" method.
773                 if( timeout_needs_resetting ) {
774                         time_t now = time( NULL );
775                         if( now >= time_next_reset && reset_timeout( authkey, now ) )
776                                 authkey = NULL;    // timeout has apparently expired already
777                 }
778
779                 timeout_needs_resetting = 0;
780                 return authkey;
781         }
782         else
783                 return NULL;
784 }
785
786 /**
787         @brief Implement the transaction.begin method.
788         @param ctx Pointer to the method context.
789         @return Zero if successful, or -1 upon error.
790
791         Start a transaction.  Save a transaction ID for future reference.
792
793         Method parameters:
794         - authkey (PCRUD only)
795
796         Return to client: Transaction ID
797 */
798 int beginTransaction( osrfMethodContext* ctx ) {
799         if(osrfMethodVerifyContext( ctx )) {
800                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
801                 return -1;
802         }
803
804         if( enforce_pcrud ) {
805                 timeout_needs_resetting = 1;
806                 const jsonObject* user = verifyUserPCRUD( ctx );
807                 if( !user )
808                         return -1;
809         }
810
811         dbi_result result = dbi_conn_query( writehandle, "START TRANSACTION;" );
812         if( !result ) {
813                 const char* msg;
814                 int errnum = dbi_conn_error( writehandle, &msg );
815                 osrfLogError( OSRF_LOG_MARK, "%s: Error starting transaction: %d %s",
816                         modulename, errnum, msg ? msg : "(No description available)" );
817                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
818                         "osrfMethodException", ctx->request, "Error starting transaction" );
819                 if( !oilsIsDBConnected( writehandle ))
820                         osrfAppSessionPanic( ctx->session );
821                 return -1;
822         } else {
823                 dbi_result_free( result );
824                 setXactId( ctx );
825                 jsonObject* ret = jsonNewObject( getXactId( ctx ) );
826                 osrfAppRespondComplete( ctx, ret );
827                 jsonObjectFree( ret );
828                 return 0;
829         }
830 }
831
832 /**
833         @brief Implement the savepoint.set method.
834         @param ctx Pointer to the method context.
835         @return Zero if successful, or -1 if not.
836
837         Issue a SAVEPOINT to the database server.
838
839         Method parameters:
840         - authkey (PCRUD only)
841         - savepoint name
842
843         Return to client: Savepoint name
844 */
845 int setSavepoint( osrfMethodContext* ctx ) {
846         if(osrfMethodVerifyContext( ctx )) {
847                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
848                 return -1;
849         }
850
851         int spNamePos = 0;
852         if( enforce_pcrud ) {
853                 spNamePos = 1;
854                 timeout_needs_resetting = 1;
855                 const jsonObject* user = verifyUserPCRUD( ctx );
856                 if( !user )
857                         return -1;
858         }
859
860         // Verify that a transaction is pending
861         const char* trans_id = getXactId( ctx );
862         if( NULL == trans_id ) {
863                 osrfAppSessionStatus(
864                         ctx->session,
865                         OSRF_STATUS_INTERNALSERVERERROR,
866                         "osrfMethodException",
867                         ctx->request,
868                         "No active transaction -- required for savepoints"
869                 );
870                 return -1;
871         }
872
873         // Get the savepoint name from the method params
874         const char* spName = jsonObjectGetString( jsonObjectGetIndex(ctx->params, spNamePos) );
875
876         dbi_result result = dbi_conn_queryf( writehandle, "SAVEPOINT \"%s\";", spName );
877         if( !result ) {
878                 const char* msg;
879                 int errnum = dbi_conn_error( writehandle, &msg );
880                 osrfLogError(
881                         OSRF_LOG_MARK,
882                         "%s: Error creating savepoint %s in transaction %s: %d %s",
883                         modulename,
884                         spName,
885                         trans_id,
886                         errnum,
887                         msg ? msg : "(No description available)"
888                 );
889                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
890                         "osrfMethodException", ctx->request, "Error creating savepoint" );
891                 if( !oilsIsDBConnected( writehandle ))
892                         osrfAppSessionPanic( ctx->session );
893                 return -1;
894         } else {
895                 dbi_result_free( result );
896                 jsonObject* ret = jsonNewObject( spName );
897                 osrfAppRespondComplete( ctx, ret );
898                 jsonObjectFree( ret  );
899                 return 0;
900         }
901 }
902
903 /**
904         @brief Implement the savepoint.release method.
905         @param ctx Pointer to the method context.
906         @return Zero if successful, or -1 if not.
907
908         Issue a RELEASE SAVEPOINT to the database server.
909
910         Method parameters:
911         - authkey (PCRUD only)
912         - savepoint name
913
914         Return to client: Savepoint name
915 */
916 int releaseSavepoint( osrfMethodContext* ctx ) {
917         if(osrfMethodVerifyContext( ctx )) {
918                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
919                 return -1;
920         }
921
922         int spNamePos = 0;
923         if( enforce_pcrud ) {
924                 spNamePos = 1;
925                 timeout_needs_resetting = 1;
926                 const jsonObject* user = verifyUserPCRUD( ctx );
927                 if(  !user )
928                         return -1;
929         }
930
931         // Verify that a transaction is pending
932         const char* trans_id = getXactId( ctx );
933         if( NULL == trans_id ) {
934                 osrfAppSessionStatus(
935                         ctx->session,
936                         OSRF_STATUS_INTERNALSERVERERROR,
937                         "osrfMethodException",
938                         ctx->request,
939                         "No active transaction -- required for savepoints"
940                 );
941                 return -1;
942         }
943
944         // Get the savepoint name from the method params
945         const char* spName = jsonObjectGetString( jsonObjectGetIndex(ctx->params, spNamePos) );
946
947         dbi_result result = dbi_conn_queryf( writehandle, "RELEASE SAVEPOINT \"%s\";", spName );
948         if( !result ) {
949                 const char* msg;
950                 int errnum = dbi_conn_error( writehandle, &msg );
951                 osrfLogError(
952                         OSRF_LOG_MARK,
953                         "%s: Error releasing savepoint %s in transaction %s: %d %s",
954                         modulename,
955                         spName,
956                         trans_id,
957                         errnum,
958                         msg ? msg : "(No description available)"
959                 );
960                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
961                         "osrfMethodException", ctx->request, "Error releasing savepoint" );
962                 if( !oilsIsDBConnected( writehandle ))
963                         osrfAppSessionPanic( ctx->session );
964                 return -1;
965         } else {
966                 dbi_result_free( result );
967                 jsonObject* ret = jsonNewObject( spName );
968                 osrfAppRespondComplete( ctx, ret );
969                 jsonObjectFree( ret );
970                 return 0;
971         }
972 }
973
974 /**
975         @brief Implement the savepoint.rollback method.
976         @param ctx Pointer to the method context.
977         @return Zero if successful, or -1 if not.
978
979         Issue a ROLLBACK TO SAVEPOINT to the database server.
980
981         Method parameters:
982         - authkey (PCRUD only)
983         - savepoint name
984
985         Return to client: Savepoint name
986 */
987 int rollbackSavepoint( osrfMethodContext* ctx ) {
988         if(osrfMethodVerifyContext( ctx )) {
989                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
990                 return -1;
991         }
992
993         int spNamePos = 0;
994         if( enforce_pcrud ) {
995                 spNamePos = 1;
996                 timeout_needs_resetting = 1;
997                 const jsonObject* user = verifyUserPCRUD( ctx );
998                 if( !user )
999                         return -1;
1000         }
1001
1002         // Verify that a transaction is pending
1003         const char* trans_id = getXactId( ctx );
1004         if( NULL == trans_id ) {
1005                 osrfAppSessionStatus(
1006                         ctx->session,
1007                         OSRF_STATUS_INTERNALSERVERERROR,
1008                         "osrfMethodException",
1009                         ctx->request,
1010                         "No active transaction -- required for savepoints"
1011                 );
1012                 return -1;
1013         }
1014
1015         // Get the savepoint name from the method params
1016         const char* spName = jsonObjectGetString( jsonObjectGetIndex(ctx->params, spNamePos) );
1017
1018         dbi_result result = dbi_conn_queryf( writehandle, "ROLLBACK TO SAVEPOINT \"%s\";", spName );
1019         if( !result ) {
1020                 const char* msg;
1021                 int errnum = dbi_conn_error( writehandle, &msg );
1022                 osrfLogError(
1023                         OSRF_LOG_MARK,
1024                         "%s: Error rolling back savepoint %s in transaction %s: %d %s",
1025                         modulename,
1026                         spName,
1027                         trans_id,
1028                         errnum,
1029                         msg ? msg : "(No description available)"
1030                 );
1031                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1032                         "osrfMethodException", ctx->request, "Error rolling back savepoint" );
1033                 if( !oilsIsDBConnected( writehandle ))
1034                         osrfAppSessionPanic( ctx->session );
1035                 return -1;
1036         } else {
1037                 dbi_result_free( result );
1038                 jsonObject* ret = jsonNewObject( spName );
1039                 osrfAppRespondComplete( ctx, ret );
1040                 jsonObjectFree( ret );
1041                 return 0;
1042         }
1043 }
1044
1045 /**
1046         @brief Implement the transaction.commit method.
1047         @param ctx Pointer to the method context.
1048         @return Zero if successful, or -1 if not.
1049
1050         Issue a COMMIT to the database server.
1051
1052         Method parameters:
1053         - authkey (PCRUD only)
1054
1055         Return to client: Transaction ID.
1056 */
1057 int commitTransaction( osrfMethodContext* ctx ) {
1058         if(osrfMethodVerifyContext( ctx )) {
1059                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
1060                 return -1;
1061         }
1062
1063         if( enforce_pcrud ) {
1064                 timeout_needs_resetting = 1;
1065                 const jsonObject* user = verifyUserPCRUD( ctx );
1066                 if( !user )
1067                         return -1;
1068         }
1069
1070         // Verify that a transaction is pending
1071         const char* trans_id = getXactId( ctx );
1072         if( NULL == trans_id ) {
1073                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1074                                 "osrfMethodException", ctx->request, "No active transaction to commit" );
1075                 return -1;
1076         }
1077
1078         dbi_result result = dbi_conn_query( writehandle, "COMMIT;" );
1079         if( !result ) {
1080                 const char* msg;
1081                 int errnum = dbi_conn_error( writehandle, &msg );
1082                 osrfLogError( OSRF_LOG_MARK, "%s: Error committing transaction: %d %s",
1083                         modulename, errnum, msg ? msg : "(No description available)" );
1084                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1085                         "osrfMethodException", ctx->request, "Error committing transaction" );
1086                 if( !oilsIsDBConnected( writehandle ))
1087                         osrfAppSessionPanic( ctx->session );
1088                 return -1;
1089         } else {
1090                 dbi_result_free( result );
1091                 jsonObject* ret = jsonNewObject( trans_id );
1092                 osrfAppRespondComplete( ctx, ret );
1093                 jsonObjectFree( ret );
1094                 clearXactId( ctx );
1095                 return 0;
1096         }
1097 }
1098
1099 /**
1100         @brief Implement the transaction.rollback method.
1101         @param ctx Pointer to the method context.
1102         @return Zero if successful, or -1 if not.
1103
1104         Issue a ROLLBACK to the database server.
1105
1106         Method parameters:
1107         - authkey (PCRUD only)
1108
1109         Return to client: Transaction ID
1110 */
1111 int rollbackTransaction( osrfMethodContext* ctx ) {
1112         if( osrfMethodVerifyContext( ctx )) {
1113                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
1114                 return -1;
1115         }
1116
1117         if( enforce_pcrud ) {
1118                 timeout_needs_resetting = 1;
1119                 const jsonObject* user = verifyUserPCRUD( ctx );
1120                 if( !user )
1121                         return -1;
1122         }
1123
1124         // Verify that a transaction is pending
1125         const char* trans_id = getXactId( ctx );
1126         if( NULL == trans_id ) {
1127                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1128                                 "osrfMethodException", ctx->request, "No active transaction to roll back" );
1129                 return -1;
1130         }
1131
1132         dbi_result result = dbi_conn_query( writehandle, "ROLLBACK;" );
1133         if( !result ) {
1134                 const char* msg;
1135                 int errnum = dbi_conn_error( writehandle, &msg );
1136                 osrfLogError( OSRF_LOG_MARK, "%s: Error rolling back transaction: %d %s",
1137                         modulename, errnum, msg ? msg : "(No description available)" );
1138                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_INTERNALSERVERERROR,
1139                         "osrfMethodException", ctx->request, "Error rolling back transaction" );
1140                 if( !oilsIsDBConnected( writehandle ))
1141                         osrfAppSessionPanic( ctx->session );
1142                 return -1;
1143         } else {
1144                 dbi_result_free( result );
1145                 jsonObject* ret = jsonNewObject( trans_id );
1146                 osrfAppRespondComplete( ctx, ret );
1147                 jsonObjectFree( ret );
1148                 clearXactId( ctx );
1149                 return 0;
1150         }
1151 }
1152
1153 /**
1154         @brief Implement the "search" method.
1155         @param ctx Pointer to the method context.
1156         @return Zero if successful, or -1 if not.
1157
1158         Method parameters:
1159         - authkey (PCRUD only)
1160         - WHERE clause, as jsonObject
1161         - Other SQL clause(s), as a JSON_HASH: joins, SELECT list, LIMIT, etc.
1162
1163         Return to client: rows of the specified class that satisfy a specified WHERE clause.
1164         Optionally flesh linked fields.
1165 */
1166 int doSearch( osrfMethodContext* ctx ) {
1167         if( osrfMethodVerifyContext( ctx )) {
1168                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
1169                 return -1;
1170         }
1171
1172         if( enforce_pcrud )
1173                 timeout_needs_resetting = 1;
1174
1175         jsonObject* where_clause;
1176         jsonObject* rest_of_query;
1177
1178         if( enforce_pcrud ) {
1179                 where_clause  = jsonObjectGetIndex( ctx->params, 1 );
1180                 rest_of_query = jsonObjectGetIndex( ctx->params, 2 );
1181         } else {
1182                 where_clause  = jsonObjectGetIndex( ctx->params, 0 );
1183                 rest_of_query = jsonObjectGetIndex( ctx->params, 1 );
1184         }
1185
1186         // Get the class metadata
1187         osrfHash* method_meta = (osrfHash*) ctx->method->userData;
1188         osrfHash* class_meta = osrfHashGet( method_meta, "class" );
1189
1190         // Do the query
1191         int err = 0;
1192         jsonObject* obj = doFieldmapperSearch( ctx, class_meta, where_clause, rest_of_query, &err );
1193         if( err ) {
1194                 osrfAppRespondComplete( ctx, NULL );
1195                 return -1;
1196         }
1197
1198         // Return each row to the client (except that some may be suppressed by PCRUD)
1199         jsonObject* cur = 0;
1200         unsigned long res_idx = 0;
1201         while((cur = jsonObjectGetIndex( obj, res_idx++ ) )) {
1202                 if( enforce_pcrud && !verifyObjectPCRUD( ctx, cur, obj->size ))
1203                         continue;
1204                 osrfAppRespond( ctx, cur );
1205         }
1206         jsonObjectFree( obj );
1207
1208         osrfAppRespondComplete( ctx, NULL );
1209         return 0;
1210 }
1211
1212 /**
1213         @brief Implement the "id_list" method.
1214         @param ctx Pointer to the method context.
1215         @param err Pointer through which to return an error code.
1216         @return Zero if successful, or -1 if not.
1217
1218         Method parameters:
1219         - authkey (PCRUD only)
1220         - WHERE clause, as jsonObject
1221         - Other SQL clause(s), as a JSON_HASH: joins, LIMIT, etc.
1222
1223         Return to client: The primary key values for all rows of the relevant class that
1224         satisfy a specified WHERE clause.
1225
1226         This method relies on the assumption that every class has a primary key consisting of
1227         a single column.
1228 */
1229 int doIdList( osrfMethodContext* ctx ) {
1230         if( osrfMethodVerifyContext( ctx )) {
1231                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
1232                 return -1;
1233         }
1234
1235         if( enforce_pcrud )
1236                 timeout_needs_resetting = 1;
1237
1238         jsonObject* where_clause;
1239         jsonObject* rest_of_query;
1240
1241         // We use the where clause without change.  But we need to massage the rest of the
1242         // query, so we work with a copy of it instead of modifying the original.
1243
1244         if( enforce_pcrud ) {
1245                 where_clause  = jsonObjectGetIndex( ctx->params, 1 );
1246                 rest_of_query = jsonObjectClone( jsonObjectGetIndex( ctx->params, 2 ) );
1247         } else {
1248                 where_clause  = jsonObjectGetIndex( ctx->params, 0 );
1249                 rest_of_query = jsonObjectClone( jsonObjectGetIndex( ctx->params, 1 ) );
1250         }
1251
1252         // Eliminate certain SQL clauses, if present.
1253         if( rest_of_query ) {
1254                 jsonObjectRemoveKey( rest_of_query, "select" );
1255                 jsonObjectRemoveKey( rest_of_query, "no_i18n" );
1256                 jsonObjectRemoveKey( rest_of_query, "flesh" );
1257                 jsonObjectRemoveKey( rest_of_query, "flesh_fields" );
1258         } else {
1259                 rest_of_query = jsonNewObjectType( JSON_HASH );
1260         }
1261
1262         jsonObjectSetKey( rest_of_query, "no_i18n", jsonNewBoolObject( 1 ) );
1263
1264         // Get the class metadata
1265         osrfHash* method_meta = (osrfHash*) ctx->method->userData;
1266         osrfHash* class_meta = osrfHashGet( method_meta, "class" );
1267
1268         // Build a SELECT list containing just the primary key,
1269         // i.e. like { "classname":["keyname"] }
1270         jsonObject* col_list_obj = jsonNewObjectType( JSON_ARRAY );
1271
1272         // Load array with name of primary key
1273         jsonObjectPush( col_list_obj, jsonNewObject( osrfHashGet( class_meta, "primarykey" ) ) );
1274         jsonObject* select_clause = jsonNewObjectType( JSON_HASH );
1275         jsonObjectSetKey( select_clause, osrfHashGet( class_meta, "classname" ), col_list_obj );
1276
1277         jsonObjectSetKey( rest_of_query, "select", select_clause );
1278
1279         // Do the query
1280         int err = 0;
1281         jsonObject* obj =
1282                 doFieldmapperSearch( ctx, class_meta, where_clause, rest_of_query, &err );
1283
1284         jsonObjectFree( rest_of_query );
1285         if( err ) {
1286                 osrfAppRespondComplete( ctx, NULL );
1287                 return -1;
1288         }
1289
1290         // Return each primary key value to the client
1291         jsonObject* cur;
1292         unsigned long res_idx = 0;
1293         while((cur = jsonObjectGetIndex( obj, res_idx++ ) )) {
1294                 if( enforce_pcrud && !verifyObjectPCRUD( ctx, cur, obj->size ))
1295                         continue;        // Suppress due to lack of permission
1296                 else
1297                         osrfAppRespond( ctx,
1298                                 oilsFMGetObject( cur, osrfHashGet( class_meta, "primarykey" ) ) );
1299         }
1300
1301         jsonObjectFree( obj );
1302         osrfAppRespondComplete( ctx, NULL );
1303         return 0;
1304 }
1305
1306 /**
1307         @brief Verify that we have a valid class reference.
1308         @param ctx Pointer to the method context.
1309         @param param Pointer to the method parameters.
1310         @return 1 if the class reference is valid, or zero if it isn't.
1311
1312         The class of the method params must match the class to which the method id devoted.
1313         For PCRUD there are additional restrictions.
1314 */
1315 static int verifyObjectClass ( osrfMethodContext* ctx, const jsonObject* param ) {
1316
1317         osrfHash* method_meta = (osrfHash*) ctx->method->userData;
1318         osrfHash* class = osrfHashGet( method_meta, "class" );
1319
1320         // Compare the method's class to the parameters' class
1321         if( !param->classname || (strcmp( osrfHashGet(class, "classname"), param->classname ))) {
1322
1323                 // Oops -- they don't match.  Complain.
1324                 growing_buffer* msg = buffer_init( 128 );
1325                 buffer_fadd(
1326                         msg,
1327                         "%s: %s method for type %s was passed a %s",
1328                         modulename,
1329                         osrfHashGet( method_meta, "methodtype" ),
1330                         osrfHashGet( class, "classname" ),
1331                         param->classname ? param->classname : "(null)"
1332                 );
1333
1334                 char* m = buffer_release( msg );
1335                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_BADREQUEST, "osrfMethodException",
1336                                 ctx->request, m );
1337                 free( m );
1338
1339                 return 0;
1340         }
1341
1342         if( enforce_pcrud )
1343                 return verifyObjectPCRUD( ctx, param, 1 );
1344         else
1345                 return 1;
1346 }
1347
1348 /**
1349         @brief (PCRUD only) Verify that the user is properly logged in.
1350         @param ctx Pointer to the method context.
1351         @return If the user is logged in, a pointer to the user object from the authentication
1352         server; otherwise NULL.
1353 */
1354 static const jsonObject* verifyUserPCRUD( osrfMethodContext* ctx ) {
1355
1356         // Get the authkey (the first method parameter)
1357         const char* auth = jsonObjectGetString( jsonObjectGetIndex( ctx->params, 0 ) );
1358
1359         // See if we have the same authkey, and a user object,
1360         // locally cached from a previous call
1361         const char* cached_authkey = getAuthkey( ctx );
1362         if( cached_authkey && !strcmp( cached_authkey, auth ) ) {
1363                 const jsonObject* cached_user = getUserLogin( ctx );
1364                 if( cached_user )
1365                         return cached_user;
1366         }
1367
1368         // We have no matching authentication data in the cache.  Authenticate from scratch.
1369         jsonObject* auth_object = jsonNewObject( auth );
1370
1371         // Fetch the user object from the authentication server
1372         jsonObject* user = oilsUtilsQuickReq( "open-ils.auth", "open-ils.auth.session.retrieve",
1373                         auth_object );
1374         jsonObjectFree( auth_object );
1375
1376         if( !user->classname || strcmp(user->classname, "au" )) {
1377
1378                 growing_buffer* msg = buffer_init( 128 );
1379                 buffer_fadd(
1380                         msg,
1381                         "%s: permacrud received a bad auth token: %s",
1382                         modulename,
1383                         auth
1384                 );
1385
1386                 char* m = buffer_release( msg );
1387                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_UNAUTHORIZED, "osrfMethodException",
1388                                 ctx->request, m );
1389
1390                 free( m );
1391                 jsonObjectFree( user );
1392                 user = NULL;
1393         }
1394
1395         setUserLogin( ctx, user );
1396         setAuthkey( ctx, auth );
1397
1398         // Allow ourselves up to a second before we have to reset the login timeout.
1399         // It would be nice to use some fraction of the timeout interval enforced by the
1400         // authentication server, but that value is not readily available at this point.
1401         // Instead, we use a conservative default interval.
1402         time_next_reset = time( NULL ) + 1;
1403
1404         return user;
1405 }
1406
1407 /**
1408         @brief For PCRUD: Determine whether the current user may access the current row.
1409         @param ctx Pointer to the method context.
1410         @param obj Pointer to the row being potentially accessed.
1411         @return 1 if access is permitted, or 0 if it isn't.
1412
1413         The @a obj parameter points to a JSON_HASH of column values, keyed on column name.
1414 */
1415 static int verifyObjectPCRUD (  osrfMethodContext* ctx, const jsonObject* obj, const int rs_size ) {
1416
1417         dbhandle = writehandle;
1418
1419         // Figure out what class and method are involved
1420         osrfHash* method_metadata = (osrfHash*) ctx->method->userData;
1421         osrfHash* class = osrfHashGet( method_metadata, "class" );
1422         const char* method_type = osrfHashGet( method_metadata, "methodtype" );
1423
1424         // Set fetch to 1 in all cases except for inserts, meaning that for local or foreign
1425         // contexts we will do another lookup of the current row, even if we already have a
1426         // previously fetched row image, because the row image in hand may not include the
1427         // foreign key(s) that we need.
1428
1429         // This is a quick fix with a bludgeon.  There are ways to avoid the extra lookup,
1430         // but they aren't implemented yet.
1431
1432         int fetch = 0;
1433         if( *method_type == 's' || *method_type == 'i' ) {
1434                 method_type = "retrieve"; // search and id_list are equivalent to retrieve for this
1435                 fetch = 1;
1436         } else if( *method_type == 'u' || *method_type == 'd' ) {
1437                 fetch = 1; // MUST go to the db for the object for update and delete
1438         }
1439
1440         // Get the appropriate permacrud entry from the IDL, depending on method type
1441         osrfHash* pcrud = osrfHashGet( osrfHashGet( class, "permacrud" ), method_type );
1442         if( !pcrud ) {
1443                 // No permacrud for this method type on this class
1444
1445                 growing_buffer* msg = buffer_init( 128 );
1446                 buffer_fadd(
1447                         msg,
1448                         "%s: %s on class %s has no permacrud IDL entry",
1449                         modulename,
1450                         osrfHashGet( method_metadata, "methodtype" ),
1451                         osrfHashGet( class, "classname" )
1452                 );
1453
1454                 char* m = buffer_release( msg );
1455                 osrfAppSessionStatus( ctx->session, OSRF_STATUS_FORBIDDEN,
1456                                 "osrfMethodException", ctx->request, m );
1457
1458                 free( m );
1459
1460                 return 0;
1461         }
1462
1463         // Get the user id, and make sure the user is logged in
1464         const jsonObject* user = verifyUserPCRUD( ctx );
1465         if( !user )
1466                 return 0;    // Not logged in?  No access.
1467
1468         int userid = atoi( oilsFMGetStringConst( user, "id" ) );
1469
1470         // Get a list of permissions from the permacrud entry.
1471         osrfStringArray* permission = osrfHashGet( pcrud, "permission" );
1472         if( permission->size == 0 ) {
1473                 osrfLogDebug( OSRF_LOG_MARK, "No permissions required for this action, passing through" );
1474                 return 1;
1475         }
1476
1477         // Build a list of org units that own the row.  This is fairly convoluted because there
1478         // are several different ways that an org unit may own the row, as defined by the
1479         // permacrud entry.
1480
1481         // Local context means that the row includes a foreign key pointing to actor.org_unit,
1482         // identifying an owning org_unit..
1483         osrfStringArray* local_context = osrfHashGet( pcrud, "local_context" );
1484
1485         // Foreign context adds a layer of indirection.  The row points to some other row that
1486         // an org unit may own.  The "jump" attribute, if present, adds another layer of
1487         // indirection.
1488         osrfHash* foreign_context = osrfHashGet( pcrud, "foreign_context" );
1489
1490         // The following string array stores the list of org units.  (We don't have a thingie
1491         // for storing lists of integers, so we fake it with a list of strings.)
1492         osrfStringArray* context_org_array = osrfNewStringArray( 1 );
1493
1494         int err = 0;
1495         const char* pkey_value = NULL;
1496         if( str_is_true( osrfHashGet(pcrud, "global_required") ) ) {
1497                 // If the global_required attribute is present and true, then the only owning
1498                 // org unit is the root org unit, i.e. the one with no parent.
1499                 osrfLogDebug( OSRF_LOG_MARK,
1500                                 "global-level permissions required, fetching top of the org tree" );
1501
1502                 // check for perm at top of org tree
1503                 const char* org_tree_root_id = org_tree_root( ctx );
1504                 if( org_tree_root_id ) {
1505                         osrfStringArrayAdd( context_org_array, org_tree_root_id );
1506                         osrfLogDebug( OSRF_LOG_MARK, "top of the org tree is %s", org_tree_root_id );
1507                 } else  {
1508                         osrfStringArrayFree( context_org_array );
1509                         return 0;
1510                 }
1511
1512         } else {
1513                 // If the global_required attribute is absent or false, then we look for
1514                 // local and/or foreign context.  In order to find the relevant foreign
1515                 // keys, we must either read the relevant row from the database, or look at
1516                 // the image of the row that we already have in memory.
1517
1518                 // Even if we have an image of the row in memory, that image may not include the
1519                 // foreign key column(s) that we need.  So whenever possible, we do a fresh read
1520                 // of the row to make sure that we have what we need.
1521
1522             osrfLogDebug( OSRF_LOG_MARK, "global-level permissions not required, "
1523                                 "fetching context org ids" );
1524             const char* pkey = osrfHashGet( class, "primarykey" );
1525                 jsonObject *param = NULL;
1526
1527                 if( !pkey ) {
1528                         // There is no primary key, so we can't do a fresh lookup.  Use the row
1529                         // image that we already have.  If it doesn't have everything we need, too bad.
1530                         fetch = 0;
1531                         param = jsonObjectClone( obj );
1532                         osrfLogDebug( OSRF_LOG_MARK, "No primary key; using clone of object" );
1533                 } else if( obj->classname ) {
1534                         pkey_value = oilsFMGetStringConst( obj, pkey );
1535                         if( !fetch )
1536                                 param = jsonObjectClone( obj );
1537                         osrfLogDebug( OSRF_LOG_MARK, "Object supplied, using primary key value of %s",
1538                                 pkey_value );
1539                 } else {
1540                         pkey_value = jsonObjectGetString( obj );
1541                         fetch = 1;
1542                         osrfLogDebug( OSRF_LOG_MARK, "Object not supplied, using primary key value "
1543                                 "of %s and retrieving from the database", pkey_value );
1544                 }
1545
1546                 if( fetch ) {
1547                         // Fetch the row so that we can look at the foreign key(s)
1548                         jsonObject* _tmp_params = single_hash( pkey, pkey_value );
1549                         jsonObject* _list = doFieldmapperSearch( ctx, class, _tmp_params, NULL, &err );
1550                         jsonObjectFree( _tmp_params );
1551
1552                         param = jsonObjectExtractIndex( _list, 0 );
1553                         jsonObjectFree( _list );
1554                 }
1555
1556                 if( !param ) {
1557                         // The row doesn't exist.  Complain, and deny access.
1558                         osrfLogDebug( OSRF_LOG_MARK,
1559                                         "Object not found in the database with primary key %s of %s",
1560                                         pkey, pkey_value );
1561
1562                         growing_buffer* msg = buffer_init( 128 );
1563                         buffer_fadd(
1564                                 msg,
1565                                 "%s: no object found with primary key %s of %s",
1566                                 modulename,
1567                                 pkey,
1568                                 pkey_value
1569                         );
1570
1571                         char* m = buffer_release( msg );
1572                         osrfAppSessionStatus(
1573                                 ctx->session,
1574                                 OSRF_STATUS_INTERNALSERVERERROR,
1575                                 "osrfMethodException",
1576                                 ctx->request,
1577                                 m
1578                         );
1579
1580                         free( m );
1581                         return 0;
1582                 }
1583
1584                 if( local_context && local_context->size > 0 ) {
1585                         // The IDL provides a list of column names for the foreign keys denoting
1586                         // local context, i.e. columns identifying owing org units directly.  Look up
1587                         // the value of each one, and if it isn't null, add it to the list of org units.
1588                         osrfLogDebug( OSRF_LOG_MARK, "%d class-local context field(s) specified",
1589                                 local_context->size );
1590                         int i = 0;
1591                         const char* lcontext = NULL;
1592                         while ( (lcontext = osrfStringArrayGetString(local_context, i++)) ) {
1593                                 const char* fkey_value = oilsFMGetStringConst( param, lcontext );
1594                                 if( fkey_value ) {    // if not null
1595                                         osrfStringArrayAdd( context_org_array, fkey_value );
1596                                         osrfLogDebug(
1597                                                 OSRF_LOG_MARK,
1598                                                 "adding class-local field %s (value: %s) to the context org list",
1599                                                 lcontext,
1600                                                 osrfStringArrayGetString( context_org_array, context_org_array->size - 1 )
1601                                         );
1602                                 }
1603                         }
1604                 }
1605
1606                 if( foreign_context ) {
1607                         unsigned long class_count = osrfHashGetCount( foreign_context );
1608                         osrfLogDebug( OSRF_LOG_MARK, "%d foreign context classes(s) specified", class_count );
1609
1610                         if( class_count > 0 ) {
1611
1612                                 // The IDL provides a list of foreign key columns pointing to rows that
1613                                 // an org unit may own.  Follow each link, identify the owning org unit,
1614                                 // and add it to the list.
1615                                 osrfHash* fcontext = NULL;
1616                                 osrfHashIterator* class_itr = osrfNewHashIterator( foreign_context );
1617                                 while( (fcontext = osrfHashIteratorNext( class_itr )) ) {
1618                                         // For each class to which a foreign key points:
1619                                         const char* class_name = osrfHashIteratorKey( class_itr );
1620                                         osrfHash* fcontext = osrfHashGet( foreign_context, class_name );
1621
1622                                         osrfLogDebug(
1623                                                 OSRF_LOG_MARK,
1624                                                 "%d foreign context fields(s) specified for class %s",
1625                                                 ((osrfStringArray*)osrfHashGet(fcontext,"context"))->size,
1626                                                 class_name
1627                                         );
1628
1629                                         // Get the name of the key field in the foreign table
1630                                         const char* foreign_pkey = osrfHashGet( fcontext, "field" );
1631
1632                                         // Get the value of the foreign key pointing to the foreign table
1633                                         char* foreign_pkey_value =
1634                                                         oilsFMGetString( param, osrfHashGet( fcontext, "fkey" ));
1635                                         if( !foreign_pkey_value )
1636                                                 continue;    // Foreign key value is null; skip it
1637
1638                                         // Look up the row to which the foreign key points
1639                                         jsonObject* _tmp_params = single_hash( foreign_pkey, foreign_pkey_value );
1640                                         jsonObject* _list = doFieldmapperSearch(
1641                                                 ctx, osrfHashGet( oilsIDL(), class_name ), _tmp_params, NULL, &err );
1642
1643                                         jsonObject* _fparam = NULL;
1644                                         if( _list && JSON_ARRAY == _list->type && _list->size > 0 )
1645                                                 _fparam = jsonObjectExtractIndex( _list, 0 );
1646
1647                                         jsonObjectFree( _tmp_params );
1648                                         jsonObjectFree( _list );
1649
1650                                         // At this point _fparam either points to the row identified by the
1651                                         // foreign key, or it's NULL (no such row found).
1652
1653                                         osrfStringArray* jump_list = osrfHashGet( fcontext, "jump" );
1654
1655                                         const char* bad_class = NULL;  // For noting failed lookups
1656                                         if( ! _fparam )
1657                                                 bad_class = class_name;    // Referenced row not found
1658                                         else if( jump_list ) {
1659                                                 // Follow a chain of rows, linked by foreign keys, to find an owner
1660                                                 const char* flink = NULL;
1661                                                 int k = 0;
1662                                                 while ( (flink = osrfStringArrayGetString(jump_list, k++)) && _fparam ) {
1663                                                         // For each entry in the jump list.  Each entry (i.e. flink) is
1664                                                         // the name of a foreign key column in the current row.
1665
1666                                                         // From the IDL, get the linkage information for the next jump
1667                                                         osrfHash* foreign_link_hash =
1668                                                                         oilsIDLFindPath( "/%s/links/%s", _fparam->classname, flink );
1669
1670                                                         // Get the class metadata for the class
1671                                                         // to which the foreign key points
1672                                                         osrfHash* foreign_class_meta = osrfHashGet( oilsIDL(),
1673                                                                         osrfHashGet( foreign_link_hash, "class" ));
1674
1675                                                         // Get the name of the referenced key of that class
1676                                                         foreign_pkey = osrfHashGet( foreign_link_hash, "key" );
1677
1678                                                         // Get the value of the foreign key pointing to that class
1679                                                         free( foreign_pkey_value );
1680                                                         foreign_pkey_value = oilsFMGetString( _fparam, flink );
1681                                                         if( !foreign_pkey_value )
1682                                                                 break;    // Foreign key is null; quit looking
1683
1684                                                         // Build a WHERE clause for the lookup
1685                                                         _tmp_params = single_hash( foreign_pkey, foreign_pkey_value );
1686
1687                                                         // Do the lookup
1688                                                         _list = doFieldmapperSearch( ctx, foreign_class_meta,
1689                                                                         _tmp_params, NULL, &err );
1690
1691                                                         // Get the resulting row
1692                                                         jsonObjectFree( _fparam );
1693                                                         if( _list && JSON_ARRAY == _list->type && _list->size > 0 )
1694                                                                 _fparam = jsonObjectExtractIndex( _list, 0 );
1695                                                         else {
1696                                                                 // Referenced row not found
1697                                                                 _fparam = NULL;
1698                                                                 bad_class = osrfHashGet( foreign_link_hash, "class" );
1699                                                         }
1700
1701                                                         jsonObjectFree( _tmp_params );
1702                                                         jsonObjectFree( _list );
1703                                                 }
1704                                         }
1705
1706                                         if( bad_class ) {
1707
1708                                                 // We had a foreign key pointing to such-and-such a row, but then
1709                                                 // we couldn't fetch that row.  The data in the database are in an
1710                                                 // inconsistent state; the database itself may even be corrupted.
1711                                                 growing_buffer* msg = buffer_init( 128 );
1712                                                 buffer_fadd(
1713                                                         msg,
1714                                                         "%s: no object of class %s found with primary key %s of %s",
1715                                                         modulename,
1716                                                         bad_class,
1717                                                         foreign_pkey,
1718                                                         foreign_pkey_value ? foreign_pkey_value : "(null)"
1719                                                 );
1720
1721                                                 char* m = buffer_release( msg );
1722                                                 osrfAppSessionStatus(
1723                                                         ctx->session,
1724                                                         OSRF_STATUS_INTERNALSERVERERROR,
1725                                                         "osrfMethodException",
1726                                                         ctx->request,
1727                                                         m
1728                                                 );
1729
1730                                                 free( m );
1731                                                 osrfHashIteratorFree( class_itr );
1732                                                 free( foreign_pkey_value );
1733                                                 jsonObjectFree( param );
1734
1735                                                 return 0;
1736                                         }
1737
1738                                         free( foreign_pkey_value );
1739
1740                                         if( _fparam ) {
1741                                                 // Examine each context column of the foreign row,
1742                                                 // and add its value to the list of org units.
1743                                                 int j = 0;
1744                                                 const char* foreign_field = NULL;
1745                                                 osrfStringArray* ctx_array = osrfHashGet( fcontext, "context" );
1746                                                 while ( (foreign_field = osrfStringArrayGetString( ctx_array, j++ )) ) {
1747                                                         osrfStringArrayAdd( context_org_array,
1748                                                                 oilsFMGetStringConst( _fparam, foreign_field ));
1749                                                         osrfLogDebug( OSRF_LOG_MARK,
1750                                                                 "adding foreign class %s field %s (value: %s) "
1751                                                                         "to the context org list",
1752                                                                 class_name,
1753                                                                 foreign_field,
1754                                                                 osrfStringArrayGetString(
1755                                                                         context_org_array, context_org_array->size - 1 )
1756                                                         );
1757                                                 }
1758
1759                                                 jsonObjectFree( _fparam );
1760                                         }
1761                                 }
1762
1763                                 osrfHashIteratorFree( class_itr );
1764                         }
1765                 }
1766
1767                 jsonObjectFree( param );
1768         }
1769
1770         const char* context_org = NULL;
1771         const char* perm = NULL;
1772         int OK = 0;
1773
1774         // For every combination of permission and context org unit: call a stored procedure
1775         // to determine if the user has this permission in the context of this org unit.
1776         // If the answer is yes at any point, then we're done, and the user has permission.
1777         // In other words permissions are additive.
1778         int i = 0;
1779         while( (perm = osrfStringArrayGetString(permission, i++)) ) {
1780                 dbi_result result;
1781
1782         osrfStringArray* pcache = NULL;
1783         if (rs_size > perm_at_threshold) { // grab and cache locations of user perms
1784                         pcache = getPermLocationCache(ctx, perm);
1785
1786                         if (!pcache) {
1787                         pcache = osrfNewStringArray(0);
1788         
1789                                 result = dbi_conn_queryf(
1790                                         writehandle,
1791                                         "SELECT permission.usr_has_perm_at_all(%d, '%s') AS at;",
1792                                         userid,
1793                                         perm
1794                                 );
1795                 
1796                                 if( result ) {
1797                                         osrfLogDebug(
1798                                                 OSRF_LOG_MARK,
1799                                                 "Received a result for permission [%s] for user %d",
1800                                                 perm,
1801                                                 userid
1802                                         );
1803                 
1804                                         if( dbi_result_first_row( result )) {
1805                             do {
1806                                                 jsonObject* return_val = oilsMakeJSONFromResult( result );
1807                                                 osrfStringArrayAdd( pcache, jsonObjectGetString( jsonObjectGetKeyConst( return_val, "at" ) ) );
1808                                 jsonObjectFree( return_val );
1809                                             } while( dbi_result_next_row( result ));
1810
1811                                                 setPermLocationCache(ctx, perm, pcache);
1812                                         }
1813                 
1814                                         dbi_result_free( result );
1815                     }
1816                         }
1817         }
1818
1819                 int j = 0;
1820                 while( (context_org = osrfStringArrayGetString( context_org_array, j++ )) ) {
1821
1822             if (rs_size > perm_at_threshold) {
1823                 if (osrfStringArrayContains( pcache, context_org )) {
1824                     OK = 1;
1825                     break;
1826                 }
1827             }
1828
1829                         if( pkey_value ) {
1830                                 osrfLogDebug(
1831                                         OSRF_LOG_MARK,
1832                                         "Checking object permission [%s] for user %d "
1833                                                         "on object %s (class %s) at org %d",
1834                                         perm,
1835                                         userid,
1836                                         pkey_value,
1837                                         osrfHashGet( class, "classname" ),
1838                                         atoi( context_org )
1839                                 );
1840
1841                                 result = dbi_conn_queryf(
1842                                         writehandle,
1843                                         "SELECT permission.usr_has_object_perm(%d, '%s', '%s', '%s', %d) AS has_perm;",
1844                                         userid,
1845                                         perm,
1846                                         osrfHashGet( class, "classname" ),
1847                                         pkey_value,
1848                                         atoi( context_org )
1849                                 );
1850
1851                                 if( result ) {
1852                                         osrfLogDebug(
1853                                                 OSRF_LOG_MARK,
1854                                                 "Received a result for object permission [%s] "
1855                                                                 "for user %d on object %s (class %s) at org %d",
1856                                                 perm,
1857                                                 userid,
1858                                                 pkey_value,
1859                                                 osrfHashGet( class, "classname" ),
1860                                                 atoi( context_org )
1861                                         );
1862
1863                                         if( dbi_result_first_row( result )) {
1864                                                 jsonObject* return_val = oilsMakeJSONFromResult( result );
1865                                                 const char* has_perm = jsonObjectGetString(
1866                                                                 jsonObjectGetKeyConst( return_val, "has_perm" ));
1867
1868                                                 osrfLogDebug(
1869                                                         OSRF_LOG_MARK,
1870                                                         "Status of object permission [%s] for user %d "
1871                                                                         "on object %s (class %s) at org %d is %s",
1872                                                         perm,
1873                                                         userid,
1874                                                         pkey_value,
1875                                                         osrfHashGet(class, "classname"),
1876                                                         atoi(context_org),
1877                                                         has_perm
1878                                                 );
1879
1880                                                 if( *has_perm == 't' )
1881                                                         OK = 1;
1882                                                 jsonObjectFree( return_val );
1883                                         }
1884
1885                                         dbi_result_free( result );
1886                                         if( OK )
1887                                                 break;
1888                                 } else {
1889                                         const char* msg;
1890                                         int errnum = dbi_conn_error( writehandle, &msg );
1891                                         osrfLogWarning( OSRF_LOG_MARK,
1892                                                 "Unable to call check object permissions: %d, %s",
1893                                                 errnum, msg ? msg : "(No description available)" );
1894                                         if( !oilsIsDBConnected( writehandle ))
1895                                                 osrfAppSessionPanic( ctx->session );
1896                                 }
1897                         }
1898
1899             if (rs_size > perm_at_threshold) break;
1900
1901                         osrfLogDebug( OSRF_LOG_MARK,
1902                                         "Checking non-object permission [%s] for user %d at org %d",
1903                                         perm, userid, atoi(context_org) );
1904                         result = dbi_conn_queryf(
1905                                 writehandle,
1906                                 "SELECT permission.usr_has_perm(%d, '%s', %d) AS has_perm;",
1907                                 userid,
1908                                 perm,
1909                                 atoi( context_org )
1910                         );
1911
1912                         if( result ) {
1913                                 osrfLogDebug( OSRF_LOG_MARK,
1914                                         "Received a result for permission [%s] for user %d at org %d",
1915                                         perm, userid, atoi( context_org ));
1916                                 if( dbi_result_first_row( result )) {
1917                                         jsonObject* return_val = oilsMakeJSONFromResult( result );
1918                                         const char* has_perm = jsonObjectGetString(
1919                                                 jsonObjectGetKeyConst( return_val, "has_perm" ));
1920                                         osrfLogDebug( OSRF_LOG_MARK,
1921                                                 "Status of permission [%s] for user %d at org %d is [%s]",
1922                                                 perm, userid, atoi( context_org ), has_perm );
1923                                         if( *has_perm == 't' )
1924                                                 OK = 1;
1925                                         jsonObjectFree( return_val );
1926                                 }
1927
1928                                 dbi_result_free( result );
1929                                 if( OK )
1930                                         break;
1931                         } else {
1932                                 const char* msg;
1933                                 int errnum = dbi_conn_error( writehandle, &msg );
1934                                 osrfLogWarning( OSRF_LOG_MARK, "Unable to call user object permissions: %d, %s",
1935                                         errnum, msg ? msg : "(No description available)" );
1936                                 if( !oilsIsDBConnected( writehandle ))
1937                                         osrfAppSessionPanic( ctx->session );
1938                         }
1939
1940                 }
1941
1942                 if( OK )
1943                         break;
1944         }
1945
1946         osrfStringArrayFree( context_org_array );
1947
1948         return OK;
1949 }
1950
1951 /**
1952         @brief Look up the root of the org_unit tree.
1953         @param ctx Pointer to the method context.
1954         @return The id of the root org unit, as a character string.
1955
1956         Query actor.org_unit where parent_ou is null, and return the id as a string.
1957
1958         This function assumes that there is only one root org unit, i.e. that we
1959         have a single tree, not a forest.
1960
1961         The calling code is responsible for freeing the returned string.
1962 */
1963 static const char* org_tree_root( osrfMethodContext* ctx ) {
1964
1965         static char cached_root_id[ 32 ] = "";  // extravagantly large buffer
1966         static time_t last_lookup_time = 0;
1967         time_t current_time = time( NULL );
1968
1969         if( cached_root_id[ 0 ] && ( current_time - last_lookup_time < 3600 ) ) {
1970                 // We successfully looked this up less than an hour ago.
1971                 // It's not likely to have changed since then.
1972                 return strdup( cached_root_id );
1973         }
1974         last_lookup_time = current_time;
1975
1976         int err = 0;
1977         jsonObject* where_clause = single_hash( "parent_ou", NULL );
1978         jsonObject* result = doFieldmapperSearch(
1979                 ctx, osrfHashGet( oilsIDL(), "aou" ), where_clause, NULL, &err );
1980         jsonObjectFree( where_clause );
1981
1982         jsonObject* tree_top = jsonObjectGetIndex( result, 0 );
1983
1984         if( !tree_top ) {
1985                 jsonObjectFree( result );
1986
1987                 growing_buffer* msg = buffer_init( 128 );
1988                 OSRF_BUFFER_ADD( msg, modulename );
1989                 OSRF_BUFFER_ADD( msg,
1990                                 ": Internal error, could not find the top of the org tree (parent_ou = NULL)" );
1991
1992                 char* m = buffer_release( msg );
1993                 osrfAppSessionStatus( ctx->session,
1994                                 OSRF_STATUS_INTERNALSERVERERROR, "osrfMethodException", ctx->request, m );
1995                 free( m );
1996
1997                 cached_root_id[ 0 ] = '\0';
1998                 return NULL;
1999         }
2000
2001         const char* root_org_unit_id = oilsFMGetStringConst( tree_top, "id" );
2002         osrfLogDebug( OSRF_LOG_MARK, "Top of the org tree is %s", root_org_unit_id );
2003
2004         strcpy( cached_root_id, root_org_unit_id );
2005         jsonObjectFree( result );
2006         return cached_root_id;
2007 }
2008
2009 /**
2010         @brief Create a JSON_HASH with a single key/value pair.
2011         @param key The key of the key/value pair.
2012         @param value the value of the key/value pair.
2013         @return Pointer to a newly created jsonObject of type JSON_HASH.
2014
2015         The value of the key/value is either a string or (if @a value is NULL) a null.
2016 */
2017 static jsonObject* single_hash( const char* key, const char* value ) {
2018         // Sanity check
2019         if( ! key ) key = "";
2020
2021         jsonObject* hash = jsonNewObjectType( JSON_HASH );
2022         jsonObjectSetKey( hash, key, jsonNewObject( value ) );
2023         return hash;
2024 }
2025
2026
2027 int doCreate( osrfMethodContext* ctx ) {
2028         if(osrfMethodVerifyContext( ctx )) {
2029                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
2030                 return -1;
2031         }
2032
2033         if( enforce_pcrud )
2034                 timeout_needs_resetting = 1;
2035
2036         osrfHash* meta = osrfHashGet( (osrfHash*) ctx->method->userData, "class" );
2037         jsonObject* target = NULL;
2038         jsonObject* options = NULL;
2039
2040         if( enforce_pcrud ) {
2041                 target = jsonObjectGetIndex( ctx->params, 1 );
2042                 options = jsonObjectGetIndex( ctx->params, 2 );
2043         } else {
2044                 target = jsonObjectGetIndex( ctx->params, 0 );
2045                 options = jsonObjectGetIndex( ctx->params, 1 );
2046         }
2047
2048         if( !verifyObjectClass( ctx, target )) {
2049                 osrfAppRespondComplete( ctx, NULL );
2050                 return -1;
2051         }
2052
2053         osrfLogDebug( OSRF_LOG_MARK, "Object seems to be of the correct type" );
2054
2055         const char* trans_id = getXactId( ctx );
2056         if( !trans_id ) {
2057                 osrfLogError( OSRF_LOG_MARK, "No active transaction -- required for CREATE" );
2058
2059                 osrfAppSessionStatus(
2060                         ctx->session,
2061                         OSRF_STATUS_BADREQUEST,
2062                         "osrfMethodException",
2063                         ctx->request,
2064                         "No active transaction -- required for CREATE"
2065                 );
2066                 osrfAppRespondComplete( ctx, NULL );
2067                 return -1;
2068         }
2069
2070         // The following test is harmless but redundant.  If a class is
2071         // readonly, we don't register a create method for it.
2072         if( str_is_true( osrfHashGet( meta, "readonly" ) ) ) {
2073                 osrfAppSessionStatus(
2074                         ctx->session,
2075                         OSRF_STATUS_BADREQUEST,
2076                         "osrfMethodException",
2077                         ctx->request,
2078                         "Cannot INSERT readonly class"
2079                 );
2080                 osrfAppRespondComplete( ctx, NULL );
2081                 return -1;
2082         }
2083
2084         // Set the last_xact_id
2085         int index = oilsIDL_ntop( target->classname, "last_xact_id" );
2086         if( index > -1 ) {
2087                 osrfLogDebug(OSRF_LOG_MARK, "Setting last_xact_id to %s on %s at position %d",
2088                         trans_id, target->classname, index);
2089                 jsonObjectSetIndex( target, index, jsonNewObject( trans_id ));
2090         }
2091
2092         osrfLogDebug( OSRF_LOG_MARK, "There is a transaction running..." );
2093
2094         dbhandle = writehandle;
2095
2096         osrfHash* fields = osrfHashGet( meta, "fields" );
2097         char* pkey       = osrfHashGet( meta, "primarykey" );
2098         char* seq        = osrfHashGet( meta, "sequence" );
2099
2100         growing_buffer* table_buf = buffer_init( 128 );
2101         growing_buffer* col_buf   = buffer_init( 128 );
2102         growing_buffer* val_buf   = buffer_init( 128 );
2103
2104         OSRF_BUFFER_ADD( table_buf, "INSERT INTO " );
2105         OSRF_BUFFER_ADD( table_buf, osrfHashGet( meta, "tablename" ));
2106         OSRF_BUFFER_ADD_CHAR( col_buf, '(' );
2107         buffer_add( val_buf,"VALUES (" );
2108
2109
2110         int first = 1;
2111         osrfHash* field = NULL;
2112         osrfHashIterator* field_itr = osrfNewHashIterator( fields );
2113         while( (field = osrfHashIteratorNext( field_itr ) ) ) {
2114
2115                 const char* field_name = osrfHashIteratorKey( field_itr );
2116
2117                 if( str_is_true( osrfHashGet( field, "virtual" ) ) )
2118                         continue;
2119
2120                 const jsonObject* field_object = oilsFMGetObject( target, field_name );
2121
2122                 char* value;
2123                 if( field_object && field_object->classname ) {
2124                         value = oilsFMGetString(
2125                                 field_object,
2126                                 (char*)oilsIDLFindPath( "/%s/primarykey", field_object->classname )
2127                         );
2128                 } else if( field_object && JSON_BOOL == field_object->type ) {
2129                         if( jsonBoolIsTrue( field_object ) )
2130                                 value = strdup( "t" );
2131                         else
2132                                 value = strdup( "f" );
2133                 } else {
2134                         value = jsonObjectToSimpleString( field_object );
2135                 }
2136
2137                 if( first ) {
2138                         first = 0;
2139                 } else {
2140                         OSRF_BUFFER_ADD_CHAR( col_buf, ',' );
2141                         OSRF_BUFFER_ADD_CHAR( val_buf, ',' );
2142                 }
2143
2144                 buffer_add( col_buf, field_name );
2145
2146                 if( !field_object || field_object->type == JSON_NULL ) {
2147                         buffer_add( val_buf, "DEFAULT" );
2148
2149                 } else if( !strcmp( get_primitive( field ), "number" )) {
2150                         const char* numtype = get_datatype( field );
2151                         if( !strcmp( numtype, "INT8" )) {
2152                                 buffer_fadd( val_buf, "%lld", atoll( value ));
2153
2154                         } else if( !strcmp( numtype, "INT" )) {
2155                                 buffer_fadd( val_buf, "%d", atoi( value ));
2156
2157                         } else if( !strcmp( numtype, "NUMERIC" )) {
2158                                 buffer_fadd( val_buf, "%f", atof( value ));
2159                         }
2160                 } else {
2161                         if( dbi_conn_quote_string( writehandle, &value )) {
2162                                 OSRF_BUFFER_ADD( val_buf, value );
2163
2164                         } else {
2165                                 osrfLogError( OSRF_LOG_MARK, "%s: Error quoting string [%s]", modulename, value );
2166                                 osrfAppSessionStatus(
2167                                         ctx->session,
2168                                         OSRF_STATUS_INTERNALSERVERERROR,
2169                                         "osrfMethodException",
2170                                         ctx->request,
2171                                         "Error quoting string -- please see the error log for more details"
2172                                 );
2173                                 free( value );
2174                                 buffer_free( table_buf );
2175                                 buffer_free( col_buf );
2176                                 buffer_free( val_buf );
2177                                 osrfAppRespondComplete( ctx, NULL );
2178                                 return -1;
2179                         }
2180                 }
2181
2182                 free( value );
2183         }
2184
2185         osrfHashIteratorFree( field_itr );
2186
2187         OSRF_BUFFER_ADD_CHAR( col_buf, ')' );
2188         OSRF_BUFFER_ADD_CHAR( val_buf, ')' );
2189
2190         char* table_str = buffer_release( table_buf );
2191         char* col_str   = buffer_release( col_buf );
2192         char* val_str   = buffer_release( val_buf );
2193         growing_buffer* sql = buffer_init( 128 );
2194         buffer_fadd( sql, "%s %s %s;", table_str, col_str, val_str );
2195         free( table_str );
2196         free( col_str );
2197         free( val_str );
2198
2199         char* query = buffer_release( sql );
2200
2201         osrfLogDebug( OSRF_LOG_MARK, "%s: Insert SQL [%s]", modulename, query );
2202
2203         jsonObject* obj = NULL;
2204         int rc = 0;
2205
2206         dbi_result result = dbi_conn_query( writehandle, query );
2207         if( !result ) {
2208                 obj = jsonNewObject( NULL );
2209                 const char* msg;
2210                 int errnum = dbi_conn_error( writehandle, &msg );
2211                 osrfLogError(
2212                         OSRF_LOG_MARK,
2213                         "%s ERROR inserting %s object using query [%s]: %d %s",
2214                         modulename,
2215                         osrfHashGet(meta, "fieldmapper"),
2216                         query,
2217                         errnum,
2218                         msg ? msg : "(No description available)"
2219                 );
2220                 osrfAppSessionStatus(
2221                         ctx->session,
2222                         OSRF_STATUS_INTERNALSERVERERROR,
2223                         "osrfMethodException",
2224                         ctx->request,
2225                         "INSERT error -- please see the error log for more details"
2226                 );
2227                 if( !oilsIsDBConnected( writehandle ))
2228                         osrfAppSessionPanic( ctx->session );
2229                 rc = -1;
2230         } else {
2231                 dbi_result_free( result );
2232
2233                 char* id = oilsFMGetString( target, pkey );
2234                 if( !id ) {
2235                         unsigned long long new_id = dbi_conn_sequence_last( writehandle, seq );
2236                         growing_buffer* _id = buffer_init( 10 );
2237                         buffer_fadd( _id, "%lld", new_id );
2238                         id = buffer_release( _id );
2239                 }
2240
2241                 // Find quietness specification, if present
2242                 const char* quiet_str = NULL;
2243                 if( options ) {
2244                         const jsonObject* quiet_obj = jsonObjectGetKeyConst( options, "quiet" );
2245                         if( quiet_obj )
2246                                 quiet_str = jsonObjectGetString( quiet_obj );
2247                 }
2248
2249                 if( str_is_true( quiet_str )) {  // if quietness is specified
2250                         obj = jsonNewObject( id );
2251                 }
2252                 else {
2253
2254                         // Fetch the row that we just inserted, so that we can return it to the client
2255                         jsonObject* where_clause = jsonNewObjectType( JSON_HASH );
2256                         jsonObjectSetKey( where_clause, pkey, jsonNewObject( id ));
2257
2258                         int err = 0;
2259                         jsonObject* list = doFieldmapperSearch( ctx, meta, where_clause, NULL, &err );
2260                         if( err )
2261                                 rc = -1;
2262                         else
2263                                 obj = jsonObjectClone( jsonObjectGetIndex( list, 0 ));
2264
2265                         jsonObjectFree( list );
2266                         jsonObjectFree( where_clause );
2267                 }
2268
2269                 free( id );
2270         }
2271
2272         free( query );
2273         osrfAppRespondComplete( ctx, obj );
2274         jsonObjectFree( obj );
2275         return rc;
2276 }
2277
2278 /**
2279         @brief Implement the retrieve method.
2280         @param ctx Pointer to the method context.
2281         @param err Pointer through which to return an error code.
2282         @return If successful, a pointer to the result to be returned to the client;
2283         otherwise NULL.
2284
2285         From the method's class, fetch a row with a specified value in the primary key.  This
2286         method relies on the database design convention that a primary key consists of a single
2287         column.
2288
2289         Method parameters:
2290         - authkey (PCRUD only)
2291         - value of the primary key for the desired row, for building the WHERE clause
2292         - a JSON_HASH containing any other SQL clauses: select, join, etc.
2293
2294         Return to client: One row from the query.
2295 */
2296 int doRetrieve( osrfMethodContext* ctx ) {
2297         if(osrfMethodVerifyContext( ctx )) {
2298                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
2299                 return -1;
2300         }
2301
2302         if( enforce_pcrud )
2303                 timeout_needs_resetting = 1;
2304
2305         int id_pos = 0;
2306         int order_pos = 1;
2307
2308         if( enforce_pcrud ) {
2309                 id_pos = 1;
2310                 order_pos = 2;
2311         }
2312
2313         // Get the class metadata
2314         osrfHash* class_def = osrfHashGet( (osrfHash*) ctx->method->userData, "class" );
2315
2316         // Get the value of the primary key, from a method parameter
2317         const jsonObject* id_obj = jsonObjectGetIndex( ctx->params, id_pos );
2318
2319         osrfLogDebug(
2320                 OSRF_LOG_MARK,
2321                 "%s retrieving %s object with primary key value of %s",
2322                 modulename,
2323                 osrfHashGet( class_def, "fieldmapper" ),
2324                 jsonObjectGetString( id_obj )
2325         );
2326
2327         // Build a WHERE clause based on the key value
2328         jsonObject* where_clause = jsonNewObjectType( JSON_HASH );
2329         jsonObjectSetKey(
2330                 where_clause,
2331                 osrfHashGet( class_def, "primarykey" ),  // name of key column
2332                 jsonObjectClone( id_obj )                // value of key column
2333         );
2334
2335         jsonObject* rest_of_query = jsonObjectGetIndex( ctx->params, order_pos );
2336
2337         // Do the query
2338         int err = 0;
2339         jsonObject* list = doFieldmapperSearch( ctx, class_def, where_clause, rest_of_query, &err );
2340
2341         jsonObjectFree( where_clause );
2342         if( err ) {
2343                 osrfAppRespondComplete( ctx, NULL );
2344                 return -1;
2345         }
2346
2347         jsonObject* obj = jsonObjectExtractIndex( list, 0 );
2348         jsonObjectFree( list );
2349
2350         if( enforce_pcrud ) {
2351                 if(!verifyObjectPCRUD( ctx, obj, 1 )) {
2352                         jsonObjectFree( obj );
2353
2354                         growing_buffer* msg = buffer_init( 128 );
2355                         OSRF_BUFFER_ADD( msg, modulename );
2356                         OSRF_BUFFER_ADD( msg, ": Insufficient permissions to retrieve object" );
2357
2358                         char* m = buffer_release( msg );
2359                         osrfAppSessionStatus( ctx->session, OSRF_STATUS_NOTALLOWED, "osrfMethodException",
2360                                         ctx->request, m );
2361                         free( m );
2362
2363                         osrfAppRespondComplete( ctx, NULL );
2364                         return -1;
2365                 }
2366         }
2367
2368         osrfAppRespondComplete( ctx, obj );
2369         jsonObjectFree( obj );
2370         return 0;
2371 }
2372
2373 /**
2374         @brief Translate a numeric value to a string representation for the database.
2375         @param field Pointer to the IDL field definition.
2376         @param value Pointer to a jsonObject holding the value of a field.
2377         @return Pointer to a newly allocated string.
2378
2379         The input object is typically a JSON_NUMBER, but it may be a JSON_STRING as long as
2380         its contents are numeric.  A non-numeric string is likely to result in invalid SQL,
2381         or (what is worse) valid SQL that is wrong.
2382
2383         If the datatype of the receiving field is not numeric, wrap the value in quotes.
2384
2385         The calling code is responsible for freeing the resulting string by calling free().
2386 */
2387 static char* jsonNumberToDBString( osrfHash* field, const jsonObject* value ) {
2388         growing_buffer* val_buf = buffer_init( 32 );
2389         const char* numtype = get_datatype( field );
2390
2391         // For historical reasons the following contains cruft that could be cleaned up.
2392         if( !strncmp( numtype, "INT", 3 ) ) {
2393                 if( value->type == JSON_NUMBER )
2394                         //buffer_fadd( val_buf, "%ld", (long)jsonObjectGetNumber(value) );
2395                         buffer_fadd( val_buf, jsonObjectGetString( value ) );
2396                 else {
2397                         buffer_fadd( val_buf, jsonObjectGetString( value ) );
2398                 }
2399
2400         } else if( !strcmp( numtype, "NUMERIC" )) {
2401                 if( value->type == JSON_NUMBER )
2402                         buffer_fadd( val_buf, jsonObjectGetString( value ));
2403                 else {
2404                         buffer_fadd( val_buf, jsonObjectGetString( value ));
2405                 }
2406
2407         } else {
2408                 // Presumably this was really intended to be a string, so quote it
2409                 char* str = jsonObjectToSimpleString( value );
2410                 if( dbi_conn_quote_string( dbhandle, &str )) {
2411                         OSRF_BUFFER_ADD( val_buf, str );
2412                         free( str );
2413                 } else {
2414                         osrfLogError( OSRF_LOG_MARK, "%s: Error quoting key string [%s]", modulename, str );
2415                         free( str );
2416                         buffer_free( val_buf );
2417                         return NULL;
2418                 }
2419         }
2420
2421         return buffer_release( val_buf );
2422 }
2423
2424 static char* searchINPredicate( const char* class_alias, osrfHash* field,
2425                 jsonObject* node, const char* op, osrfMethodContext* ctx ) {
2426         growing_buffer* sql_buf = buffer_init( 32 );
2427
2428         buffer_fadd(
2429                 sql_buf,
2430                 "\"%s\".%s ",
2431                 class_alias,
2432                 osrfHashGet( field, "name" )
2433         );
2434
2435         if( !op ) {
2436                 buffer_add( sql_buf, "IN (" );
2437         } else if( !strcasecmp( op,"not in" )) {
2438                 buffer_add( sql_buf, "NOT IN (" );
2439         } else {
2440                 buffer_add( sql_buf, "IN (" );
2441         }
2442
2443         if( node->type == JSON_HASH ) {
2444                 // subquery predicate
2445                 char* subpred = buildQuery( ctx, node, SUBSELECT );
2446                 if( ! subpred ) {
2447                         buffer_free( sql_buf );
2448                         return NULL;
2449                 }
2450
2451                 buffer_add( sql_buf, subpred );
2452                 free( subpred );
2453
2454         } else if( node->type == JSON_ARRAY ) {
2455                 // literal value list
2456                 int in_item_index = 0;
2457                 int in_item_first = 1;
2458                 const jsonObject* in_item;
2459                 while( (in_item = jsonObjectGetIndex( node, in_item_index++ )) ) {
2460
2461                         if( in_item_first )
2462                                 in_item_first = 0;
2463                         else
2464                                 buffer_add( sql_buf, ", " );
2465
2466                         // Sanity check
2467                         if( in_item->type != JSON_STRING && in_item->type != JSON_NUMBER ) {
2468                                 osrfLogError( OSRF_LOG_MARK,
2469                                                 "%s: Expected string or number within IN list; found %s",
2470                                                 modulename, json_type( in_item->type ) );
2471                                 buffer_free( sql_buf );
2472                                 return NULL;
2473                         }
2474
2475                         // Append the literal value -- quoted if not a number
2476                         if( JSON_NUMBER == in_item->type ) {
2477                                 char* val = jsonNumberToDBString( field, in_item );
2478                                 OSRF_BUFFER_ADD( sql_buf, val );
2479                                 free( val );
2480
2481                         } else if( !strcmp( get_primitive( field ), "number" )) {
2482                                 char* val = jsonNumberToDBString( field, in_item );
2483                                 OSRF_BUFFER_ADD( sql_buf, val );
2484                                 free( val );
2485
2486                         } else {
2487                                 char* key_string = jsonObjectToSimpleString( in_item );
2488                                 if( dbi_conn_quote_string( dbhandle, &key_string )) {
2489                                         OSRF_BUFFER_ADD( sql_buf, key_string );
2490                                         free( key_string );
2491                                 } else {
2492                                         osrfLogError( OSRF_LOG_MARK,
2493                                                         "%s: Error quoting key string [%s]", modulename, key_string );
2494                                         free( key_string );
2495                                         buffer_free( sql_buf );
2496                                         return NULL;
2497                                 }
2498                         }
2499                 }
2500
2501                 if( in_item_first ) {
2502                         osrfLogError(OSRF_LOG_MARK, "%s: Empty IN list", modulename );
2503                         buffer_free( sql_buf );
2504                         return NULL;
2505                 }
2506         } else {
2507                 osrfLogError( OSRF_LOG_MARK, "%s: Expected object or array for IN clause; found %s",
2508                         modulename, json_type( node->type ));
2509                 buffer_free( sql_buf );
2510                 return NULL;
2511         }
2512
2513         OSRF_BUFFER_ADD_CHAR( sql_buf, ')' );
2514
2515         return buffer_release( sql_buf );
2516 }
2517
2518 // Receive a JSON_ARRAY representing a function call.  The first
2519 // entry in the array is the function name.  The rest are parameters.
2520 static char* searchValueTransform( const jsonObject* array ) {
2521
2522         if( array->size < 1 ) {
2523                 osrfLogError( OSRF_LOG_MARK, "%s: Empty array for value transform", modulename );
2524                 return NULL;
2525         }
2526
2527         // Get the function name
2528         jsonObject* func_item = jsonObjectGetIndex( array, 0 );
2529         if( func_item->type != JSON_STRING ) {
2530                 osrfLogError( OSRF_LOG_MARK, "%s: Error: expected function name, found %s",
2531                         modulename, json_type( func_item->type ));
2532                 return NULL;
2533         }
2534
2535         growing_buffer* sql_buf = buffer_init( 32 );
2536
2537         OSRF_BUFFER_ADD( sql_buf, jsonObjectGetString( func_item ) );
2538         OSRF_BUFFER_ADD( sql_buf, "( " );
2539
2540         // Get the parameters
2541         int func_item_index = 1;   // We already grabbed the zeroth entry
2542         while( (func_item = jsonObjectGetIndex( array, func_item_index++ )) ) {
2543
2544                 // Add a separator comma, if we need one
2545                 if( func_item_index > 2 )
2546                         buffer_add( sql_buf, ", " );
2547
2548                 // Add the current parameter
2549                 if( func_item->type == JSON_NULL ) {
2550                         buffer_add( sql_buf, "NULL" );
2551                 } else {
2552                         char* val = jsonObjectToSimpleString( func_item );
2553                         if( dbi_conn_quote_string( dbhandle, &val )) {
2554                                 OSRF_BUFFER_ADD( sql_buf, val );
2555                                 free( val );
2556                         } else {
2557                                 osrfLogError( OSRF_LOG_MARK, "%s: Error quoting key string [%s]",
2558                                         modulename, val );
2559                                 buffer_free( sql_buf );
2560                                 free( val );
2561                                 return NULL;
2562                         }
2563                 }
2564         }
2565
2566         buffer_add( sql_buf, " )" );
2567
2568         return buffer_release( sql_buf );
2569 }
2570
2571 static char* searchFunctionPredicate( const char* class_alias, osrfHash* field,
2572                 const jsonObject* node, const char* op ) {
2573
2574         if( ! is_good_operator( op ) ) {
2575                 osrfLogError( OSRF_LOG_MARK, "%s: Invalid operator [%s]", modulename, op );
2576                 return NULL;
2577         }
2578
2579         char* val = searchValueTransform( node );
2580         if( !val )
2581                 return NULL;
2582
2583         growing_buffer* sql_buf = buffer_init( 32 );
2584         buffer_fadd(
2585                 sql_buf,
2586                 "\"%s\".%s %s %s",
2587                 class_alias,
2588                 osrfHashGet( field, "name" ),
2589                 op,
2590                 val
2591         );
2592
2593         free( val );
2594
2595         return buffer_release( sql_buf );
2596 }
2597
2598 // class_alias is a class name or other table alias
2599 // field is a field definition as stored in the IDL
2600 // node comes from the method parameter, and may represent an entry in the SELECT list
2601 static char* searchFieldTransform( const char* class_alias, osrfHash* field,
2602                 const jsonObject* node ) {
2603         growing_buffer* sql_buf = buffer_init( 32 );
2604
2605         const char* field_transform = jsonObjectGetString(
2606                 jsonObjectGetKeyConst( node, "transform" ) );
2607         const char* transform_subcolumn = jsonObjectGetString(
2608                 jsonObjectGetKeyConst( node, "result_field" ) );
2609
2610         if( transform_subcolumn ) {
2611                 if( ! is_identifier( transform_subcolumn ) ) {
2612                         osrfLogError( OSRF_LOG_MARK, "%s: Invalid subfield name: \"%s\"\n",
2613                                         modulename, transform_subcolumn );
2614                         buffer_free( sql_buf );
2615                         return NULL;
2616                 }
2617                 OSRF_BUFFER_ADD_CHAR( sql_buf, '(' );    // enclose transform in parentheses
2618         }
2619
2620         if( field_transform ) {
2621
2622                 if( ! is_identifier( field_transform ) ) {
2623                         osrfLogError( OSRF_LOG_MARK, "%s: Expected function name, found \"%s\"\n",
2624                                         modulename, field_transform );
2625                         buffer_free( sql_buf );
2626                         return NULL;
2627                 }
2628
2629                 if( obj_is_true( jsonObjectGetKeyConst( node, "distinct" ) ) ) {
2630                         buffer_fadd( sql_buf, "%s(DISTINCT \"%s\".%s",
2631                                 field_transform, class_alias, osrfHashGet( field, "name" ));
2632                 } else {
2633                         buffer_fadd( sql_buf, "%s(\"%s\".%s",
2634                                 field_transform, class_alias, osrfHashGet( field, "name" ));
2635                 }
2636
2637                 const jsonObject* array = jsonObjectGetKeyConst( node, "params" );
2638
2639                 if( array ) {
2640                         if( array->type != JSON_ARRAY ) {
2641                                 osrfLogError( OSRF_LOG_MARK,
2642                                         "%s: Expected JSON_ARRAY for function params; found %s",
2643                                         modulename, json_type( array->type ) );
2644                                 buffer_free( sql_buf );
2645                                 return NULL;
2646                         }
2647                         int func_item_index = 0;
2648                         jsonObject* func_item;
2649                         while( (func_item = jsonObjectGetIndex( array, func_item_index++ ))) {
2650
2651                                 char* val = jsonObjectToSimpleString( func_item );
2652
2653                                 if( !val ) {
2654                                         buffer_add( sql_buf, ",NULL" );
2655                                 } else if( dbi_conn_quote_string( dbhandle, &val )) {
2656                                         OSRF_BUFFER_ADD_CHAR( sql_buf, ',' );
2657                                         OSRF_BUFFER_ADD( sql_buf, val );
2658                                 } else {
2659                                         osrfLogError( OSRF_LOG_MARK,
2660                                                         "%s: Error quoting key string [%s]", modulename, val );
2661                                         free( val );
2662                                         buffer_free( sql_buf );
2663                                         return NULL;
2664                                 }
2665                                 free( val );
2666                         }
2667                 }
2668
2669                 buffer_add( sql_buf, " )" );
2670
2671         } else {
2672                 buffer_fadd( sql_buf, "\"%s\".%s", class_alias, osrfHashGet( field, "name" ));
2673         }
2674
2675         if( transform_subcolumn )
2676                 buffer_fadd( sql_buf, ").\"%s\"", transform_subcolumn );
2677
2678         return buffer_release( sql_buf );
2679 }
2680
2681 static char* searchFieldTransformPredicate( const ClassInfo* class_info, osrfHash* field,
2682                 const jsonObject* node, const char* op ) {
2683
2684         if( ! is_good_operator( op ) ) {
2685                 osrfLogError( OSRF_LOG_MARK, "%s: Error: Invalid operator %s", modulename, op );
2686                 return NULL;
2687         }
2688
2689         char* field_transform = searchFieldTransform( class_info->alias, field, node );
2690         if( ! field_transform )
2691                 return NULL;
2692         char* value = NULL;
2693         int extra_parens = 0;   // boolean
2694
2695         const jsonObject* value_obj = jsonObjectGetKeyConst( node, "value" );
2696         if( ! value_obj ) {
2697                 value = searchWHERE( node, class_info, AND_OP_JOIN, NULL );
2698                 if( !value ) {
2699                         osrfLogError( OSRF_LOG_MARK, "%s: Error building condition for field transform",
2700                                 modulename );
2701                         free( field_transform );
2702                         return NULL;
2703                 }
2704                 extra_parens = 1;
2705         } else if( value_obj->type == JSON_ARRAY ) {
2706                 value = searchValueTransform( value_obj );
2707                 if( !value ) {
2708                         osrfLogError( OSRF_LOG_MARK,
2709                                 "%s: Error building value transform for field transform", modulename );
2710                         free( field_transform );
2711                         return NULL;
2712                 }
2713         } else if( value_obj->type == JSON_HASH ) {
2714                 value = searchWHERE( value_obj, class_info, AND_OP_JOIN, NULL );
2715                 if( !value ) {
2716                         osrfLogError( OSRF_LOG_MARK, "%s: Error building predicate for field transform",
2717                                 modulename );
2718                         free( field_transform );
2719                         return NULL;
2720                 }
2721                 extra_parens = 1;
2722         } else if( value_obj->type == JSON_NUMBER ) {
2723                 value = jsonNumberToDBString( field, value_obj );
2724         } else if( value_obj->type == JSON_NULL ) {
2725                 osrfLogError( OSRF_LOG_MARK,
2726                         "%s: Error building predicate for field transform: null value", modulename );
2727                 free( field_transform );
2728                 return NULL;
2729         } else if( value_obj->type == JSON_BOOL ) {
2730                 osrfLogError( OSRF_LOG_MARK,
2731                         "%s: Error building predicate for field transform: boolean value", modulename );
2732                 free( field_transform );
2733                 return NULL;
2734         } else {
2735                 if( !strcmp( get_primitive( field ), "number") ) {
2736                         value = jsonNumberToDBString( field, value_obj );
2737                 } else {
2738                         value = jsonObjectToSimpleString( value_obj );
2739                         if( !dbi_conn_quote_string( dbhandle, &value )) {
2740                                 osrfLogError( OSRF_LOG_MARK, "%s: Error quoting key string [%s]",
2741                                         modulename, value );
2742                                 free( value );
2743                                 free( field_transform );
2744                                 return NULL;
2745                         }
2746                 }
2747         }
2748
2749         const char* left_parens  = "";
2750         const char* right_parens = "";
2751
2752         if( extra_parens ) {
2753                 left_parens  = "(";
2754                 right_parens = ")";
2755         }
2756
2757         growing_buffer* sql_buf = buffer_init( 32 );
2758
2759         buffer_fadd(
2760                 sql_buf,
2761                 "%s%s %s %s %s %s%s",
2762                 left_parens,
2763                 field_transform,
2764                 op,
2765                 left_parens,
2766                 value,
2767                 right_parens,
2768                 right_parens
2769         );
2770
2771         free( value );
2772         free( field_transform );
2773
2774         return buffer_release( sql_buf );
2775 }
2776
2777 static char* searchSimplePredicate( const char* op, const char* class_alias,
2778                 osrfHash* field, const jsonObject* node ) {
2779
2780         if( ! is_good_operator( op ) ) {
2781                 osrfLogError( OSRF_LOG_MARK, "%s: Invalid operator [%s]", modulename, op );
2782                 return NULL;
2783         }
2784
2785         char* val = NULL;
2786
2787         // Get the value to which we are comparing the specified column
2788         if( node->type != JSON_NULL ) {
2789                 if( node->type == JSON_NUMBER ) {
2790                         val = jsonNumberToDBString( field, node );
2791                 } else if( !strcmp( get_primitive( field ), "number" ) ) {
2792                         val = jsonNumberToDBString( field, node );
2793                 } else {
2794                         val = jsonObjectToSimpleString( node );
2795                 }
2796         }
2797
2798         if( val ) {
2799                 if( JSON_NUMBER != node->type && strcmp( get_primitive( field ), "number") ) {
2800                         // Value is not numeric; enclose it in quotes
2801                         if( !dbi_conn_quote_string( dbhandle, &val ) ) {
2802                                 osrfLogError( OSRF_LOG_MARK, "%s: Error quoting key string [%s]",
2803                                         modulename, val );
2804                                 free( val );
2805                                 return NULL;
2806                         }
2807                 }
2808         } else {
2809                 // Compare to a null value
2810                 val = strdup( "NULL" );
2811                 if( strcmp( op, "=" ))
2812                         op = "IS NOT";
2813                 else
2814                         op = "IS";
2815         }
2816
2817         growing_buffer* sql_buf = buffer_init( 32 );
2818         buffer_fadd( sql_buf, "\"%s\".%s %s %s", class_alias, osrfHashGet(field, "name"), op, val );
2819         char* pred = buffer_release( sql_buf );
2820
2821         free( val );
2822
2823         return pred;
2824 }
2825
2826 static char* searchBETWEENPredicate( const char* class_alias,
2827                 osrfHash* field, const jsonObject* node ) {
2828
2829         const jsonObject* x_node = jsonObjectGetIndex( node, 0 );
2830         const jsonObject* y_node = jsonObjectGetIndex( node, 1 );
2831
2832         if( NULL == y_node ) {
2833                 osrfLogError( OSRF_LOG_MARK, "%s: Not enough operands for BETWEEN operator", modulename );
2834                 return NULL;
2835         }
2836         else if( NULL != jsonObjectGetIndex( node, 2 ) ) {
2837                 osrfLogError( OSRF_LOG_MARK, "%s: Too many operands for BETWEEN operator", modulename );
2838                 return NULL;
2839         }
2840
2841         char* x_string;
2842         char* y_string;
2843
2844         if( !strcmp( get_primitive( field ), "number") ) {
2845                 x_string = jsonNumberToDBString( field, x_node );
2846                 y_string = jsonNumberToDBString( field, y_node );
2847
2848         } else {
2849                 x_string = jsonObjectToSimpleString( x_node );
2850                 y_string = jsonObjectToSimpleString( y_node );
2851                 if( !(dbi_conn_quote_string( dbhandle, &x_string )
2852                         && dbi_conn_quote_string( dbhandle, &y_string )) ) {
2853                         osrfLogError( OSRF_LOG_MARK, "%s: Error quoting key strings [%s] and [%s]",
2854                                         modulename, x_string, y_string );
2855                         free( x_string );
2856                         free( y_string );
2857                         return NULL;
2858                 }
2859         }
2860
2861         growing_buffer* sql_buf = buffer_init( 32 );
2862         buffer_fadd( sql_buf, "\"%s\".%s BETWEEN %s AND %s",
2863                         class_alias, osrfHashGet( field, "name" ), x_string, y_string );
2864         free( x_string );
2865         free( y_string );
2866
2867         return buffer_release( sql_buf );
2868 }
2869
2870 static char* searchPredicate( const ClassInfo* class_info, osrfHash* field,
2871                                                           jsonObject* node, osrfMethodContext* ctx ) {
2872
2873         char* pred = NULL;
2874         if( node->type == JSON_ARRAY ) { // equality IN search
2875                 pred = searchINPredicate( class_info->alias, field, node, NULL, ctx );
2876         } else if( node->type == JSON_HASH ) { // other search
2877                 jsonIterator* pred_itr = jsonNewIterator( node );
2878                 if( !jsonIteratorHasNext( pred_itr ) ) {
2879                         osrfLogError( OSRF_LOG_MARK, "%s: Empty predicate for field \"%s\"",
2880                                         modulename, osrfHashGet(field, "name" ));
2881                 } else {
2882                         jsonObject* pred_node = jsonIteratorNext( pred_itr );
2883
2884                         // Verify that there are no additional predicates
2885                         if( jsonIteratorHasNext( pred_itr ) ) {
2886                                 osrfLogError( OSRF_LOG_MARK, "%s: Multiple predicates for field \"%s\"",
2887                                                 modulename, osrfHashGet(field, "name" ));
2888                         } else if( !(strcasecmp( pred_itr->key,"between" )) )
2889                                 pred = searchBETWEENPredicate( class_info->alias, field, pred_node );
2890                         else if( !(strcasecmp( pred_itr->key,"in" ))
2891                                         || !(strcasecmp( pred_itr->key,"not in" )) )
2892                                 pred = searchINPredicate(
2893                                         class_info->alias, field, pred_node, pred_itr->key, ctx );
2894                         else if( pred_node->type == JSON_ARRAY )
2895                                 pred = searchFunctionPredicate(
2896                                         class_info->alias, field, pred_node, pred_itr->key );
2897                         else if( pred_node->type == JSON_HASH )
2898                                 pred = searchFieldTransformPredicate(
2899                                         class_info, field, pred_node, pred_itr->key );
2900                         else
2901                                 pred = searchSimplePredicate( pred_itr->key, class_info->alias, field, pred_node );
2902                 }
2903                 jsonIteratorFree( pred_itr );
2904
2905         } else if( node->type == JSON_NULL ) { // IS NULL search
2906                 growing_buffer* _p = buffer_init( 64 );
2907                 buffer_fadd(
2908                         _p,
2909                         "\"%s\".%s IS NULL",
2910                         class_info->class_name,
2911                         osrfHashGet( field, "name" )
2912                 );
2913                 pred = buffer_release( _p );
2914         } else { // equality search
2915                 pred = searchSimplePredicate( "=", class_info->alias, field, node );
2916         }
2917
2918         return pred;
2919
2920 }
2921
2922
2923 /*
2924
2925 join : {
2926         acn : {
2927                 field : record,
2928                 fkey : id
2929                 type : left
2930                 filter_op : or
2931                 filter : { ... },
2932                 join : {
2933                         acp : {
2934                                 field : call_number,
2935                                 fkey : id,
2936                                 filter : { ... },
2937                         },
2938                 },
2939         },
2940         mrd : {
2941                 field : record,
2942                 type : inner
2943                 fkey : id,
2944                 filter : { ... },
2945         }
2946 }
2947
2948 */
2949
2950 static char* searchJOIN( const jsonObject* join_hash, const ClassInfo* left_info ) {
2951
2952         const jsonObject* working_hash;
2953         jsonObject* freeable_hash = NULL;
2954
2955         if( join_hash->type == JSON_HASH ) {
2956                 working_hash = join_hash;
2957         } else if( join_hash->type == JSON_STRING ) {
2958                 // turn it into a JSON_HASH by creating a wrapper
2959                 // around a copy of the original
2960                 const char* _tmp = jsonObjectGetString( join_hash );
2961                 freeable_hash = jsonNewObjectType( JSON_HASH );
2962                 jsonObjectSetKey( freeable_hash, _tmp, NULL );
2963                 working_hash = freeable_hash;
2964         } else {
2965                 osrfLogError(
2966                         OSRF_LOG_MARK,
2967                         "%s: JOIN failed; expected JSON object type not found",
2968                         modulename
2969                 );
2970                 return NULL;
2971         }
2972
2973         growing_buffer* join_buf = buffer_init( 128 );
2974         const char* leftclass = left_info->class_name;
2975
2976         jsonObject* snode = NULL;
2977         jsonIterator* search_itr = jsonNewIterator( working_hash );
2978
2979         while ( (snode = jsonIteratorNext( search_itr )) ) {
2980                 const char* right_alias = search_itr->key;
2981                 const char* class =
2982                                 jsonObjectGetString( jsonObjectGetKeyConst( snode, "class" ) );
2983                 if( ! class )
2984                         class = right_alias;
2985
2986                 const ClassInfo* right_info = add_joined_class( right_alias, class );
2987                 if( !right_info ) {
2988                         osrfLogError(
2989                                 OSRF_LOG_MARK,
2990                                 "%s: JOIN failed.  Class \"%s\" not resolved in IDL",
2991                                 modulename,
2992                                 search_itr->key
2993                         );
2994                         jsonIteratorFree( search_itr );
2995                         buffer_free( join_buf );
2996                         if( freeable_hash )
2997                                 jsonObjectFree( freeable_hash );
2998                         return NULL;
2999                 }
3000                 osrfHash* links    = right_info->links;
3001                 const char* table  = right_info->source_def;
3002
3003                 const char* fkey  = jsonObjectGetString( jsonObjectGetKeyConst( snode, "fkey" ) );
3004                 const char* field = jsonObjectGetString( jsonObjectGetKeyConst( snode, "field" ) );
3005
3006                 if( field && !fkey ) {
3007                         // Look up the corresponding join column in the IDL.
3008                         // The link must be defined in the child table,
3009                         // and point to the right parent table.
3010                         osrfHash* idl_link = (osrfHash*) osrfHashGet( links, field );
3011                         const char* reltype = NULL;
3012                         const char* other_class = NULL;
3013                         reltype = osrfHashGet( idl_link, "reltype" );
3014                         if( reltype && strcmp( reltype, "has_many" ) )
3015                                 other_class = osrfHashGet( idl_link, "class" );
3016                         if( other_class && !strcmp( other_class, leftclass ) )
3017                                 fkey = osrfHashGet( idl_link, "key" );
3018                         if( !fkey ) {
3019                                 osrfLogError(
3020                                         OSRF_LOG_MARK,
3021                                         "%s: JOIN failed.  No link defined from %s.%s to %s",
3022                                         modulename,
3023                                         class,
3024                                         field,
3025                                         leftclass
3026                                 );
3027                                 buffer_free( join_buf );
3028                                 if( freeable_hash )
3029                                         jsonObjectFree( freeable_hash );
3030                                 jsonIteratorFree( search_itr );
3031                                 return NULL;
3032                         }
3033
3034                 } else if( !field && fkey ) {
3035                         // Look up the corresponding join column in the IDL.
3036                         // The link must be defined in the child table,
3037                         // and point to the right parent table.
3038                         osrfHash* left_links = left_info->links;
3039                         osrfHash* idl_link = (osrfHash*) osrfHashGet( left_links, fkey );
3040                         const char* reltype = NULL;
3041                         const char* other_class = NULL;
3042                         reltype = osrfHashGet( idl_link, "reltype" );
3043                         if( reltype && strcmp( reltype, "has_many" ) )
3044                                 other_class = osrfHashGet( idl_link, "class" );
3045                         if( other_class && !strcmp( other_class, class ) )
3046                                 field = osrfHashGet( idl_link, "key" );
3047                         if( !field ) {
3048                                 osrfLogError(
3049                                         OSRF_LOG_MARK,
3050                                         "%s: JOIN failed.  No link defined from %s.%s to %s",
3051                                         modulename,
3052                                         leftclass,
3053                                         fkey,
3054                                         class
3055                                 );
3056                                 buffer_free( join_buf );
3057                                 if( freeable_hash )
3058                                         jsonObjectFree( freeable_hash );
3059                                 jsonIteratorFree( search_itr );
3060                                 return NULL;
3061                         }
3062
3063                 } else if( !field && !fkey ) {
3064                         osrfHash* left_links = left_info->links;
3065
3066                         // For each link defined for the left class:
3067                         // see if the link references the joined class
3068                         osrfHashIterator* itr = osrfNewHashIterator( left_links );
3069                         osrfHash* curr_link = NULL;
3070                         while( (curr_link = osrfHashIteratorNext( itr ) ) ) {
3071                                 const char* other_class = osrfHashGet( curr_link, "class" );
3072                                 if( other_class && !strcmp( other_class, class ) ) {
3073
3074                                         // In the IDL, the parent class doesn't always know then names of the child
3075                                         // columns that are pointing to it, so don't use that end of the link
3076                                         const char* reltype = osrfHashGet( curr_link, "reltype" );
3077                                         if( reltype && strcmp( reltype, "has_many" ) ) {
3078                                                 // Found a link between the classes
3079                                                 fkey = osrfHashIteratorKey( itr );
3080                                                 field = osrfHashGet( curr_link, "key" );
3081                                                 break;
3082                                         }
3083                                 }
3084                         }
3085                         osrfHashIteratorFree( itr );
3086
3087                         if( !field || !fkey ) {
3088                                 // Do another such search, with the classes reversed
3089
3090                                 // For each link defined for the joined class:
3091                                 // see if the link references the left class
3092                                 osrfHashIterator* itr = osrfNewHashIterator( links );
3093                                 osrfHash* curr_link = NULL;
3094                                 while( (curr_link = osrfHashIteratorNext( itr ) ) ) {
3095                                         const char* other_class = osrfHashGet( curr_link, "class" );
3096                                         if( other_class && !strcmp( other_class, leftclass ) ) {
3097
3098                                                 // In the IDL, the parent class doesn't know then names of the child
3099                                                 // columns that are pointing to it, so don't use that end of the link
3100                                                 const char* reltype = osrfHashGet( curr_link, "reltype" );
3101                                                 if( reltype && strcmp( reltype, "has_many" ) ) {
3102                                                         // Found a link between the classes
3103                                                         field = osrfHashIteratorKey( itr );
3104                                                         fkey = osrfHashGet( curr_link, "key" );
3105                                                         break;
3106                                                 }
3107                                         }
3108                                 }
3109                                 osrfHashIteratorFree( itr );
3110                         }
3111
3112                         if( !field || !fkey ) {
3113                                 osrfLogError(
3114                                         OSRF_LOG_MARK,
3115                                         "%s: JOIN failed.  No link defined between %s and %s",
3116                                         modulename,
3117                                         leftclass,
3118                                         class
3119                                 );
3120                                 buffer_free( join_buf );
3121                                 if( freeable_hash )
3122                                         jsonObjectFree( freeable_hash );
3123                                 jsonIteratorFree( search_itr );
3124                                 return NULL;
3125                         }
3126                 }
3127
3128                 const char* type = jsonObjectGetString( jsonObjectGetKeyConst( snode, "type" ) );
3129                 if( type ) {
3130                         if( !strcasecmp( type,"left" )) {
3131                                 buffer_add( join_buf, " LEFT JOIN" );
3132                         } else if( !strcasecmp( type,"right" )) {
3133                                 buffer_add( join_buf, " RIGHT JOIN" );
3134                         } else if( !strcasecmp( type,"full" )) {
3135                                 buffer_add( join_buf, " FULL JOIN" );
3136                         } else {
3137                                 buffer_add( join_buf, " INNER JOIN" );
3138                         }
3139                 } else {
3140                         buffer_add( join_buf, " INNER JOIN" );
3141                 }
3142
3143                 buffer_fadd( join_buf, " %s AS \"%s\" ON ( \"%s\".%s = \"%s\".%s",
3144                                         table, right_alias, right_alias, field, left_info->alias, fkey );
3145
3146                 // Add any other join conditions as specified by "filter"
3147                 const jsonObject* filter = jsonObjectGetKeyConst( snode, "filter" );
3148                 if( filter ) {
3149                         const char* filter_op = jsonObjectGetString(
3150                                 jsonObjectGetKeyConst( snode, "filter_op" ) );
3151                         if( filter_op && !strcasecmp( "or",filter_op )) {
3152                                 buffer_add( join_buf, " OR " );
3153                         } else {
3154                                 buffer_add( join_buf, " AND " );
3155                         }
3156
3157                         char* jpred = searchWHERE( filter, right_info, AND_OP_JOIN, NULL );
3158                         if( jpred ) {
3159                                 OSRF_BUFFER_ADD_CHAR( join_buf, ' ' );
3160                                 OSRF_BUFFER_ADD( join_buf, jpred );
3161                                 free( jpred );
3162                         } else {
3163                                 osrfLogError(
3164                                         OSRF_LOG_MARK,
3165                                         "%s: JOIN failed.  Invalid conditional expression.",
3166                                         modulename
3167                                 );
3168                                 jsonIteratorFree( search_itr );
3169                                 buffer_free( join_buf );
3170                                 if( freeable_hash )
3171                                         jsonObjectFree( freeable_hash );
3172                                 return NULL;
3173                         }
3174                 }
3175
3176                 buffer_add( join_buf, " ) " );
3177
3178                 // Recursively add a nested join, if one is present
3179                 const jsonObject* join_filter = jsonObjectGetKeyConst( snode, "join" );
3180                 if( join_filter ) {
3181                         char* jpred = searchJOIN( join_filter, right_info );
3182                         if( jpred ) {
3183                                 OSRF_BUFFER_ADD_CHAR( join_buf, ' ' );
3184                                 OSRF_BUFFER_ADD( join_buf, jpred );
3185                                 free( jpred );
3186                         } else {
3187                                 osrfLogError( OSRF_LOG_MARK, "%s: Invalid nested join.", modulename );
3188                                 jsonIteratorFree( search_itr );
3189                                 buffer_free( join_buf );
3190                                 if( freeable_hash )
3191                                         jsonObjectFree( freeable_hash );
3192                                 return NULL;
3193                         }
3194                 }
3195         }
3196
3197         if( freeable_hash )
3198                 jsonObjectFree( freeable_hash );
3199         jsonIteratorFree( search_itr );
3200
3201         return buffer_release( join_buf );
3202 }
3203
3204 /*
3205
3206 { +class : { -or|-and : { field : { op : value }, ... } ... }, ... }
3207 { +class : { -or|-and : [ { field : { op : value }, ... }, ...] ... }, ... }
3208 [ { +class : { -or|-and : [ { field : { op : value }, ... }, ...] ... }, ... }, ... ]
3209
3210 Generate code to express a set of conditions, as for a WHERE clause.  Parameters:
3211
3212 search_hash is the JSON expression of the conditions.
3213 meta is the class definition from the IDL, for the relevant table.
3214 opjoin_type indicates whether multiple conditions, if present, should be
3215         connected by AND or OR.
3216 osrfMethodContext is loaded with all sorts of stuff, but all we do with it here is
3217         to pass it to other functions -- and all they do with it is to use the session
3218         and request members to send error messages back to the client.
3219
3220 */
3221
3222 static char* searchWHERE( const jsonObject* search_hash, const ClassInfo* class_info,
3223                 int opjoin_type, osrfMethodContext* ctx ) {
3224
3225         osrfLogDebug(
3226                 OSRF_LOG_MARK,
3227                 "%s: Entering searchWHERE; search_hash addr = %p, meta addr = %p, "
3228                 "opjoin_type = %d, ctx addr = %p",
3229                 modulename,
3230                 search_hash,
3231                 class_info->class_def,
3232                 opjoin_type,
3233                 ctx
3234         );
3235
3236         growing_buffer* sql_buf = buffer_init( 128 );
3237
3238         jsonObject* node = NULL;
3239
3240         int first = 1;
3241         if( search_hash->type == JSON_ARRAY ) {
3242                 if( 0 == search_hash->size ) {
3243                         osrfLogError(
3244                                 OSRF_LOG_MARK,
3245                                 "%s: Invalid predicate structure: empty JSON array",
3246                                 modulename
3247                         );
3248                         buffer_free( sql_buf );
3249                         return NULL;
3250                 }
3251
3252                 unsigned long i = 0;
3253                 while(( node = jsonObjectGetIndex( search_hash, i++ ) )) {
3254                         if( first ) {
3255                                 first = 0;
3256                         } else {
3257                                 if( opjoin_type == OR_OP_JOIN )
3258                                         buffer_add( sql_buf, " OR " );
3259                                 else
3260                                         buffer_add( sql_buf, " AND " );
3261                         }
3262
3263                         char* subpred = searchWHERE( node, class_info, opjoin_type, ctx );
3264                         if( ! subpred ) {
3265                                 buffer_free( sql_buf );
3266                                 return NULL;
3267                         }
3268
3269                         buffer_fadd( sql_buf, "( %s )", subpred );
3270                         free( subpred );
3271                 }
3272
3273         } else if( search_hash->type == JSON_HASH ) {
3274                 osrfLogDebug( OSRF_LOG_MARK,
3275                         "%s: In WHERE clause, condition type is JSON_HASH", modulename );
3276                 jsonIterator* search_itr = jsonNewIterator( search_hash );
3277                 if( !jsonIteratorHasNext( search_itr ) ) {
3278                         osrfLogError(
3279                                 OSRF_LOG_MARK,
3280                                 "%s: Invalid predicate structure: empty JSON object",
3281                                 modulename
3282                         );
3283                         jsonIteratorFree( search_itr );
3284                         buffer_free( sql_buf );
3285                         return NULL;
3286                 }
3287
3288                 while( (node = jsonIteratorNext( search_itr )) ) {
3289
3290                         if( first ) {
3291                                 first = 0;
3292                         } else {
3293                                 if( opjoin_type == OR_OP_JOIN )
3294                                         buffer_add( sql_buf, " OR " );
3295                                 else
3296                                         buffer_add( sql_buf, " AND " );
3297                         }
3298
3299                         if( '+' == search_itr->key[ 0 ] ) {
3300
3301                                 // This plus sign prefixes a class name or other table alias;
3302                                 // make sure the table alias is in scope
3303                                 ClassInfo* alias_info = search_all_alias( search_itr->key + 1 );
3304                                 if( ! alias_info ) {
3305                                         osrfLogError(
3306                                                          OSRF_LOG_MARK,
3307                                                         "%s: Invalid table alias \"%s\" in WHERE clause",
3308                                                         modulename,
3309                                                         search_itr->key + 1
3310                                         );
3311                                         jsonIteratorFree( search_itr );
3312                                         buffer_free( sql_buf );
3313                                         return NULL;
3314                                 }
3315
3316                                 if( node->type == JSON_STRING ) {
3317                                         // It's the name of a column; make sure it belongs to the class
3318                                         const char* fieldname = jsonObjectGetString( node );
3319                                         if( ! osrfHashGet( alias_info->fields, fieldname ) ) {
3320                                                 osrfLogError(
3321                                                         OSRF_LOG_MARK,
3322                                                         "%s: Invalid column name \"%s\" in WHERE clause "
3323                                                         "for table alias \"%s\"",
3324                                                         modulename,
3325                                                         fieldname,
3326                                                         alias_info->alias
3327                                                 );
3328                                                 jsonIteratorFree( search_itr );
3329                                                 buffer_free( sql_buf );
3330                                                 return NULL;
3331                                         }
3332
3333                                         buffer_fadd( sql_buf, " \"%s\".%s ", alias_info->alias, fieldname );
3334                                 } else {
3335                                         // It's something more complicated
3336                                         char* subpred = searchWHERE( node, alias_info, AND_OP_JOIN, ctx );
3337                                         if( ! subpred ) {
3338                                                 jsonIteratorFree( search_itr );
3339                                                 buffer_free( sql_buf );
3340                                                 return NULL;
3341                                         }
3342
3343                                         buffer_fadd( sql_buf, "( %s )", subpred );
3344                                         free( subpred );
3345                                 }
3346                         } else if( '-' == search_itr->key[ 0 ] ) {
3347                                 if( !strcasecmp( "-or", search_itr->key )) {
3348                                         char* subpred = searchWHERE( node, class_info, OR_OP_JOIN, ctx );
3349                                         if( ! subpred ) {
3350                                                 jsonIteratorFree( search_itr );
3351                                                 buffer_free( sql_buf );
3352                                                 return NULL;
3353                                         }
3354
3355                                         buffer_fadd( sql_buf, "( %s )", subpred );
3356                                         free( subpred );
3357                                 } else if( !strcasecmp( "-and", search_itr->key )) {
3358                                         char* subpred = searchWHERE( node, class_info, AND_OP_JOIN, ctx );
3359                                         if( ! subpred ) {
3360                                                 jsonIteratorFree( search_itr );
3361                                                 buffer_free( sql_buf );
3362                                                 return NULL;
3363                                         }
3364
3365                                         buffer_fadd( sql_buf, "( %s )", subpred );
3366                                         free( subpred );
3367                                 } else if( !strcasecmp("-not",search_itr->key) ) {
3368                                         char* subpred = searchWHERE( node, class_info, AND_OP_JOIN, ctx );
3369                                         if( ! subpred ) {
3370                                                 jsonIteratorFree( search_itr );
3371                                                 buffer_free( sql_buf );
3372                                                 return NULL;
3373                                         }
3374
3375                                         buffer_fadd( sql_buf, " NOT ( %s )", subpred );
3376                                         free( subpred );
3377                                 } else if( !strcasecmp( "-exists", search_itr->key )) {
3378                                         char* subpred = buildQuery( ctx, node, SUBSELECT );
3379                                         if( ! subpred ) {
3380                                                 jsonIteratorFree( search_itr );
3381                                                 buffer_free( sql_buf );
3382                                                 return NULL;
3383                                         }
3384
3385                                         buffer_fadd( sql_buf, "EXISTS ( %s )", subpred );
3386                                         free( subpred );
3387                                 } else if( !strcasecmp("-not-exists", search_itr->key )) {
3388                                         char* subpred = buildQuery( ctx, node, SUBSELECT );
3389                                         if( ! subpred ) {
3390                                                 jsonIteratorFree( search_itr );
3391                                                 buffer_free( sql_buf );
3392                                                 return NULL;
3393                                         }
3394
3395                                         buffer_fadd( sql_buf, "NOT EXISTS ( %s )", subpred );
3396                                         free( subpred );
3397                                 } else {     // Invalid "minus" operator
3398                                         osrfLogError(
3399                                                          OSRF_LOG_MARK,
3400                                                         "%s: Invalid operator \"%s\" in WHERE clause",
3401                                                         modulename,
3402                                                         search_itr->key
3403                                         );
3404                                         jsonIteratorFree( search_itr );
3405                                         buffer_free( sql_buf );
3406                                         return NULL;
3407                                 }
3408
3409                         } else {
3410
3411                                 const char* class = class_info->class_name;
3412                                 osrfHash* fields = class_info->fields;
3413                                 osrfHash* field = osrfHashGet( fields, search_itr->key );
3414
3415                                 if( !field ) {
3416                                         const char* table = class_info->source_def;
3417                                         osrfLogError(
3418                                                 OSRF_LOG_MARK,
3419                                                 "%s: Attempt to reference non-existent column \"%s\" on %s (%s)",
3420                                                 modulename,
3421                                                 search_itr->key,
3422                                                 table ? table : "?",
3423                                                 class ? class : "?"
3424                                         );
3425                                         jsonIteratorFree( search_itr );
3426                                         buffer_free( sql_buf );
3427                                         return NULL;
3428                                 }
3429
3430                                 char* subpred = searchPredicate( class_info, field, node, ctx );
3431                                 if( ! subpred ) {
3432                                         buffer_free( sql_buf );
3433                                         jsonIteratorFree( search_itr );
3434                                         return NULL;
3435                                 }
3436
3437                                 buffer_add( sql_buf, subpred );
3438                                 free( subpred );
3439                         }
3440                 }
3441                 jsonIteratorFree( search_itr );
3442
3443         } else {
3444                 // ERROR ... only hash and array allowed at this level
3445                 char* predicate_string = jsonObjectToJSON( search_hash );
3446                 osrfLogError(
3447                         OSRF_LOG_MARK,
3448                         "%s: Invalid predicate structure: %s",
3449                         modulename,
3450                         predicate_string
3451                 );
3452                 buffer_free( sql_buf );
3453                 free( predicate_string );
3454                 return NULL;
3455         }
3456
3457         return buffer_release( sql_buf );
3458 }
3459
3460 /* Build a JSON_ARRAY of field names for a given table alias
3461 */
3462 static jsonObject* defaultSelectList( const char* table_alias ) {
3463
3464         if( ! table_alias )
3465                 table_alias = "";
3466
3467         ClassInfo* class_info = search_all_alias( table_alias );
3468         if( ! class_info ) {
3469                 osrfLogError(
3470                         OSRF_LOG_MARK,
3471                         "%s: Can't build default SELECT clause for \"%s\"; no such table alias",
3472                         modulename,
3473                         table_alias
3474                 );
3475                 return NULL;
3476         }
3477
3478         jsonObject* array = jsonNewObjectType( JSON_ARRAY );
3479         osrfHash* field_def = NULL;
3480         osrfHashIterator* field_itr = osrfNewHashIterator( class_info->fields );
3481         while( ( field_def = osrfHashIteratorNext( field_itr ) ) ) {
3482                 const char* field_name = osrfHashIteratorKey( field_itr );
3483                 if( ! str_is_true( osrfHashGet( field_def, "virtual" ) ) ) {
3484                         jsonObjectPush( array, jsonNewObject( field_name ) );
3485                 }
3486         }
3487         osrfHashIteratorFree( field_itr );
3488
3489         return array;
3490 }
3491
3492 // Translate a jsonObject into a UNION, INTERSECT, or EXCEPT query.
3493 // The jsonObject must be a JSON_HASH with an single entry for "union",
3494 // "intersect", or "except".  The data associated with this key must be an
3495 // array of hashes, each hash being a query.
3496 // Also allowed but currently ignored: entries for "order_by" and "alias".
3497 static char* doCombo( osrfMethodContext* ctx, jsonObject* combo, int flags ) {
3498         // Sanity check
3499         if( ! combo || combo->type != JSON_HASH )
3500                 return NULL;      // should be impossible; validated by caller
3501
3502         const jsonObject* query_array = NULL;   // array of subordinate queries
3503         const char* op = NULL;     // name of operator, e.g. UNION
3504         const char* alias = NULL;  // alias for the query (needed for ORDER BY)
3505         int op_count = 0;          // for detecting conflicting operators
3506         int excepting = 0;         // boolean
3507         int all = 0;               // boolean
3508         jsonObject* order_obj = NULL;
3509
3510         // Identify the elements in the hash
3511         jsonIterator* query_itr = jsonNewIterator( combo );
3512         jsonObject* curr_obj = NULL;
3513         while( (curr_obj = jsonIteratorNext( query_itr ) ) ) {
3514                 if( ! strcmp( "union", query_itr->key ) ) {
3515                         ++op_count;
3516                         op = " UNION ";
3517                         query_array = curr_obj;
3518                 } else if( ! strcmp( "intersect", query_itr->key ) ) {
3519                         ++op_count;
3520                         op = " INTERSECT ";
3521                         query_array = curr_obj;
3522                 } else if( ! strcmp( "except", query_itr->key ) ) {
3523                         ++op_count;
3524                         op = " EXCEPT ";
3525                         excepting = 1;
3526                         query_array = curr_obj;
3527                 } else if( ! strcmp( "order_by", query_itr->key ) ) {
3528                         osrfLogWarning(
3529                                 OSRF_LOG_MARK,
3530                                 "%s: ORDER BY not supported for UNION, INTERSECT, or EXCEPT",
3531                                 modulename
3532                         );
3533                         order_obj = curr_obj;
3534                 } else if( ! strcmp( "alias", query_itr->key ) ) {
3535                         if( curr_obj->type != JSON_STRING ) {
3536                                 jsonIteratorFree( query_itr );
3537                                 return NULL;
3538                         }
3539                         alias = jsonObjectGetString( curr_obj );
3540                 } else if( ! strcmp( "all", query_itr->key ) ) {
3541                         if( obj_is_true( curr_obj ) )
3542                                 all = 1;
3543                 } else {
3544                         if( ctx )
3545                                 osrfAppSessionStatus(
3546                                         ctx->session,
3547                                         OSRF_STATUS_INTERNALSERVERERROR,
3548                                         "osrfMethodException",
3549                                         ctx->request,
3550                                         "Malformed query; unexpected entry in query object"
3551                                 );
3552                         osrfLogError(
3553                                 OSRF_LOG_MARK,
3554                                 "%s: Unexpected entry for \"%s\" in%squery",
3555                                 modulename,
3556                                 query_itr->key,
3557                                 op
3558                         );
3559                         jsonIteratorFree( query_itr );
3560                         return NULL;
3561                 }
3562         }
3563         jsonIteratorFree( query_itr );
3564
3565         // More sanity checks
3566         if( ! query_array ) {
3567                 if( ctx )
3568                         osrfAppSessionStatus(
3569                                 ctx->session,
3570                                 OSRF_STATUS_INTERNALSERVERERROR,
3571                                 "osrfMethodException",
3572                                 ctx->request,
3573                                 "Expected UNION, INTERSECT, or EXCEPT operator not found"
3574                         );
3575                 osrfLogError(
3576                         OSRF_LOG_MARK,
3577                         "%s: Expected UNION, INTERSECT, or EXCEPT operator not found",
3578                         modulename
3579                 );
3580                 return NULL;        // should be impossible...
3581         } else if( op_count > 1 ) {
3582                 if( ctx )
3583                                 osrfAppSessionStatus(
3584                                 ctx->session,
3585                                 OSRF_STATUS_INTERNALSERVERERROR,
3586                                 "osrfMethodException",
3587                                 ctx->request,
3588                                 "Found more than one of UNION, INTERSECT, and EXCEPT in same query"
3589                         );
3590                 osrfLogError(
3591                         OSRF_LOG_MARK,
3592                         "%s: Found more than one of UNION, INTERSECT, and EXCEPT in same query",
3593                         modulename
3594                 );
3595                 return NULL;
3596         } if( query_array->type != JSON_ARRAY ) {
3597                 if( ctx )
3598                                 osrfAppSessionStatus(
3599                                 ctx->session,
3600                                 OSRF_STATUS_INTERNALSERVERERROR,
3601                                 "osrfMethodException",
3602                                 ctx->request,
3603                                 "Malformed query: expected array of queries under UNION, INTERSECT or EXCEPT"
3604                         );
3605                 osrfLogError(
3606                         OSRF_LOG_MARK,
3607                         "%s: Expected JSON_ARRAY of queries for%soperator; found %s",
3608                         modulename,
3609                         op,
3610                         json_type( query_array->type )
3611                 );
3612                 return NULL;
3613         } if( query_array->size < 2 ) {
3614                 if( ctx )
3615                         osrfAppSessionStatus(
3616                                 ctx->session,
3617                                 OSRF_STATUS_INTERNALSERVERERROR,
3618                                 "osrfMethodException",
3619                                 ctx->request,
3620                                 "UNION, INTERSECT or EXCEPT requires multiple queries as operands"
3621                         );
3622                 osrfLogError(
3623                         OSRF_LOG_MARK,
3624                         "%s:%srequires multiple queries as operands",
3625                         modulename,
3626                         op
3627                 );
3628                 return NULL;
3629         } else if( excepting && query_array->size > 2 ) {
3630                 if( ctx )
3631                         osrfAppSessionStatus(
3632                                 ctx->session,
3633                                 OSRF_STATUS_INTERNALSERVERERROR,
3634                                 "osrfMethodException",
3635                                 ctx->request,
3636                                 "EXCEPT operator has too many queries as operands"
3637                         );
3638                 osrfLogError(
3639                         OSRF_LOG_MARK,
3640                         "%s:EXCEPT operator has too many queries as operands",
3641                         modulename
3642                 );
3643                 return NULL;
3644         } else if( order_obj && ! alias ) {
3645                 if( ctx )
3646                         osrfAppSessionStatus(
3647                                 ctx->session,
3648                                 OSRF_STATUS_INTERNALSERVERERROR,
3649                                 "osrfMethodException",
3650                                 ctx->request,
3651                                 "ORDER BY requires an alias for a UNION, INTERSECT, or EXCEPT"
3652                         );
3653                 osrfLogError(
3654                         OSRF_LOG_MARK,
3655                         "%s:ORDER BY requires an alias for a UNION, INTERSECT, or EXCEPT",
3656                         modulename
3657                 );
3658                 return NULL;
3659         }
3660
3661         // So far so good.  Now build the SQL.
3662         growing_buffer* sql = buffer_init( 256 );
3663
3664         // If we nested inside another UNION, INTERSECT, or EXCEPT,
3665         // Add a layer of parentheses
3666         if( flags & SUBCOMBO )
3667                 OSRF_BUFFER_ADD( sql, "( " );
3668
3669         // Traverse the query array.  Each entry should be a hash.
3670         int first = 1;   // boolean
3671         int i = 0;
3672         jsonObject* query = NULL;
3673         while( (query = jsonObjectGetIndex( query_array, i++ )) ) {
3674                 if( query->type != JSON_HASH ) {
3675                         if( ctx )
3676                                 osrfAppSessionStatus(
3677                                         ctx->session,
3678                                         OSRF_STATUS_INTERNALSERVERERROR,
3679                                         "osrfMethodException",
3680                                         ctx->request,
3681                                         "Malformed query under UNION, INTERSECT or EXCEPT"
3682                                 );
3683                         osrfLogError(
3684                                 OSRF_LOG_MARK,
3685                                 "%s: Malformed query under%s -- expected JSON_HASH, found %s",
3686                                 modulename,
3687                                 op,
3688                                 json_type( query->type )
3689                         );
3690                         buffer_free( sql );
3691                         return NULL;
3692                 }
3693
3694                 if( first )
3695                         first = 0;
3696                 else {
3697                         OSRF_BUFFER_ADD( sql, op );
3698                         if( all )
3699                                 OSRF_BUFFER_ADD( sql, "ALL " );
3700                 }
3701
3702                 char* query_str = buildQuery( ctx, query, SUBSELECT | SUBCOMBO );
3703                 if( ! query_str ) {
3704                         osrfLogError(
3705                                 OSRF_LOG_MARK,
3706                                 "%s: Error building query under%s",
3707                                 modulename,
3708                                 op
3709                         );
3710                         buffer_free( sql );
3711                         return NULL;
3712                 }
3713
3714                 OSRF_BUFFER_ADD( sql, query_str );
3715         }
3716
3717         if( flags & SUBCOMBO )
3718                 OSRF_BUFFER_ADD_CHAR( sql, ')' );
3719
3720         if( !(flags & SUBSELECT) )
3721                 OSRF_BUFFER_ADD_CHAR( sql, ';' );
3722
3723         return buffer_release( sql );
3724 }
3725
3726 // Translate a jsonObject into a SELECT, UNION, INTERSECT, or EXCEPT query.
3727 // The jsonObject must be a JSON_HASH with an entry for "from", "union", "intersect",
3728 // or "except" to indicate the type of query.
3729 char* buildQuery( osrfMethodContext* ctx, jsonObject* query, int flags ) {
3730         // Sanity checks
3731         if( ! query ) {
3732                 if( ctx )
3733                         osrfAppSessionStatus(
3734                                 ctx->session,
3735                                 OSRF_STATUS_INTERNALSERVERERROR,
3736                                 "osrfMethodException",
3737                                 ctx->request,
3738                                 "Malformed query; no query object"
3739                         );
3740                 osrfLogError( OSRF_LOG_MARK, "%s: Null pointer to query object", modulename );
3741                 return NULL;
3742         } else if( query->type != JSON_HASH ) {
3743                 if( ctx )
3744                         osrfAppSessionStatus(
3745                                 ctx->session,
3746                                 OSRF_STATUS_INTERNALSERVERERROR,
3747                                 "osrfMethodException",
3748                                 ctx->request,
3749                                 "Malformed query object"
3750                         );
3751                 osrfLogError(
3752                         OSRF_LOG_MARK,
3753                         "%s: Query object is %s instead of JSON_HASH",
3754                         modulename,
3755                         json_type( query->type )
3756                 );
3757                 return NULL;
3758         }
3759
3760         // Determine what kind of query it purports to be, and dispatch accordingly.
3761         if( jsonObjectGetKeyConst( query, "union" ) ||
3762                 jsonObjectGetKeyConst( query, "intersect" ) ||
3763                 jsonObjectGetKeyConst( query, "except" )) {
3764                 return doCombo( ctx, query, flags );
3765         } else {
3766                 // It is presumably a SELECT query
3767
3768                 // Push a node onto the stack for the current query.  Every level of
3769                 // subquery gets its own QueryFrame on the Stack.
3770                 push_query_frame();
3771
3772                 // Build an SQL SELECT statement
3773                 char* sql = SELECT(
3774                         ctx,
3775                         jsonObjectGetKey( query, "select" ),
3776                         jsonObjectGetKeyConst( query, "from" ),
3777                         jsonObjectGetKeyConst( query, "where" ),
3778                         jsonObjectGetKeyConst( query, "having" ),
3779                         jsonObjectGetKeyConst( query, "order_by" ),
3780                         jsonObjectGetKeyConst( query, "limit" ),
3781                         jsonObjectGetKeyConst( query, "offset" ),
3782                         flags
3783                 );
3784                 pop_query_frame();
3785                 return sql;
3786         }
3787 }
3788
3789 char* SELECT (
3790                 /* method context */ osrfMethodContext* ctx,
3791
3792                 /* SELECT   */ jsonObject* selhash,
3793                 /* FROM     */ const jsonObject* join_hash,
3794                 /* WHERE    */ const jsonObject* search_hash,
3795                 /* HAVING   */ const jsonObject* having_hash,
3796                 /* ORDER BY */ const jsonObject* order_hash,
3797                 /* LIMIT    */ const jsonObject* limit,
3798                 /* OFFSET   */ const jsonObject* offset,
3799                 /* flags    */ int flags
3800 ) {
3801         const char* locale = osrf_message_get_last_locale();
3802
3803         // general tmp objects
3804         const jsonObject* tmp_const;
3805         jsonObject* selclass = NULL;
3806         jsonObject* snode = NULL;
3807         jsonObject* onode = NULL;
3808
3809         char* string = NULL;
3810         int from_function = 0;
3811         int first = 1;
3812         int gfirst = 1;
3813         //int hfirst = 1;
3814
3815         osrfLogDebug(OSRF_LOG_MARK, "cstore SELECT locale: %s", locale ? locale : "(none)" );
3816
3817         // punt if there's no FROM clause
3818         if( !join_hash || ( join_hash->type == JSON_HASH && !join_hash->size )) {
3819                 osrfLogError(
3820                         OSRF_LOG_MARK,
3821                         "%s: FROM clause is missing or empty",
3822                         modulename
3823                 );
3824                 if( ctx )
3825                         osrfAppSessionStatus(
3826                                 ctx->session,
3827                                 OSRF_STATUS_INTERNALSERVERERROR,
3828                                 "osrfMethodException",
3829                                 ctx->request,
3830                                 "FROM clause is missing or empty in JSON query"
3831                         );
3832                 return NULL;
3833         }
3834
3835         // the core search class
3836         const char* core_class = NULL;
3837
3838         // get the core class -- the only key of the top level FROM clause, or a string
3839         if( join_hash->type == JSON_HASH ) {
3840                 jsonIterator* tmp_itr = jsonNewIterator( join_hash );
3841                 snode = jsonIteratorNext( tmp_itr );
3842
3843                 // Populate the current QueryFrame with information
3844                 // about the core class
3845                 if( add_query_core( NULL, tmp_itr->key ) ) {
3846                         if( ctx )
3847                                 osrfAppSessionStatus(
3848                                         ctx->session,
3849                                         OSRF_STATUS_INTERNALSERVERERROR,
3850                                         "osrfMethodException",
3851                                         ctx->request,
3852                                         "Unable to look up core class"
3853                                 );
3854                         return NULL;
3855                 }
3856                 core_class = curr_query->core.class_name;
3857                 join_hash = snode;
3858
3859                 jsonObject* extra = jsonIteratorNext( tmp_itr );
3860
3861                 jsonIteratorFree( tmp_itr );
3862                 snode = NULL;
3863
3864                 // There shouldn't be more than one entry in join_hash
3865                 if( extra ) {
3866                         osrfLogError(
3867                                 OSRF_LOG_MARK,
3868                                 "%s: Malformed FROM clause: extra entry in JSON_HASH",
3869                                 modulename
3870                         );
3871                         if( ctx )
3872                                 osrfAppSessionStatus(
3873                                         ctx->session,
3874                                         OSRF_STATUS_INTERNALSERVERERROR,
3875                                         "osrfMethodException",
3876                                         ctx->request,
3877                                         "Malformed FROM clause in JSON query"
3878                                 );
3879                         return NULL;    // Malformed join_hash; extra entry
3880                 }
3881         } else if( join_hash->type == JSON_ARRAY ) {
3882                 // We're selecting from a function, not from a table
3883                 from_function = 1;
3884                 core_class = jsonObjectGetString( jsonObjectGetIndex( join_hash, 0 ));
3885                 selhash = NULL;
3886
3887         } else if( join_hash->type == JSON_STRING ) {
3888                 // Populate the current QueryFrame with information
3889                 // about the core class
3890                 core_class = jsonObjectGetString( join_hash );
3891                 join_hash = NULL;
3892                 if( add_query_core( NULL, core_class ) ) {
3893                         if( ctx )
3894                                 osrfAppSessionStatus(
3895                                         ctx->session,
3896                                         OSRF_STATUS_INTERNALSERVERERROR,
3897                                         "osrfMethodException",
3898                                         ctx->request,
3899                                         "Unable to look up core class"
3900                                 );
3901                         return NULL;
3902                 }
3903         }
3904         else {
3905                 osrfLogError(
3906                         OSRF_LOG_MARK,
3907                         "%s: FROM clause is unexpected JSON type: %s",
3908                         modulename,
3909                         json_type( join_hash->type )
3910                 );
3911                 if( ctx )
3912                         osrfAppSessionStatus(
3913                                 ctx->session,
3914                                 OSRF_STATUS_INTERNALSERVERERROR,
3915                                 "osrfMethodException",
3916                                 ctx->request,
3917                                 "Ill-formed FROM clause in JSON query"
3918                         );
3919                 return NULL;
3920         }
3921
3922         // Build the join clause, if any, while filling out the list
3923         // of joined classes in the current QueryFrame.
3924         char* join_clause = NULL;
3925         if( join_hash && ! from_function ) {
3926
3927                 join_clause = searchJOIN( join_hash, &curr_query->core );
3928                 if( ! join_clause ) {
3929                         if( ctx )
3930                                 osrfAppSessionStatus(
3931                                         ctx->session,
3932                                         OSRF_STATUS_INTERNALSERVERERROR,
3933                                         "osrfMethodException",
3934                                         ctx->request,
3935                                         "Unable to construct JOIN clause(s)"
3936                                 );
3937                         return NULL;
3938                 }
3939         }
3940
3941         // Holds a default select list if we have to build one, so it can be freed later
3942         jsonObject* defaultselhash = NULL;
3943
3944         // if there is no select list, build a default select list ...
3945         if( !selhash && !from_function ) {
3946                 jsonObject* default_list = defaultSelectList( core_class );
3947                 if( ! default_list ) {
3948                         if( ctx ) {
3949                                 osrfAppSessionStatus(
3950                                         ctx->session,
3951                                         OSRF_STATUS_INTERNALSERVERERROR,
3952                                         "osrfMethodException",
3953                                         ctx->request,
3954                                         "Unable to build default SELECT clause in JSON query"
3955                                 );
3956                                 free( join_clause );
3957                                 return NULL;
3958                         }
3959                 }
3960
3961                 selhash = defaultselhash = jsonNewObjectType( JSON_HASH );
3962                 jsonObjectSetKey( selhash, core_class, default_list );
3963         }
3964
3965         // The SELECT clause can be encoded only by a hash
3966         if( !from_function && selhash->type != JSON_HASH ) {
3967                 osrfLogError(
3968                         OSRF_LOG_MARK,
3969                         "%s: Expected JSON_HASH for SELECT clause; found %s",
3970                         modulename,
3971                         json_type( selhash->type )
3972                 );
3973
3974                 if( ctx )
3975                         osrfAppSessionStatus(
3976                                 ctx->session,
3977                                 OSRF_STATUS_INTERNALSERVERERROR,
3978                                 "osrfMethodException",
3979                                 ctx->request,
3980                                 "Malformed SELECT clause in JSON query"
3981                         );
3982                 free( join_clause );
3983                 return NULL;
3984         }
3985
3986         // If you see a null or wild card ("*") specifier for the core class,
3987         // replace it with a default SELECT list.  An empty array is not replaced here.
3988         tmp_const = jsonObjectGetKeyConst( selhash, core_class );
3989         if( tmp_const ) {
3990                 int default_needed = 0;   // boolean
3991                 if( JSON_STRING == tmp_const->type
3992                         && !strcmp( "*", jsonObjectGetString( tmp_const ) ))
3993                                 default_needed = 1;
3994                 else if( JSON_NULL == tmp_const->type )
3995                         default_needed = 1;
3996
3997                 if( default_needed ) {
3998                         // Build a default SELECT list
3999                         jsonObject* default_list = defaultSelectList( core_class );
4000                         if( ! default_list ) {
4001                                 if( ctx ) {
4002                                         osrfAppSessionStatus(
4003                                                 ctx->session,
4004                                                 OSRF_STATUS_INTERNALSERVERERROR,
4005                                                 "osrfMethodException",
4006                                                 ctx->request,
4007                                                 "Can't build default SELECT clause in JSON query"
4008                                         );
4009                                         free( join_clause );
4010                                         return NULL;
4011                                 }
4012                         }
4013
4014                         jsonObjectSetKey( selhash, core_class, default_list );
4015                 }
4016         }
4017
4018         // temp buffers for the SELECT list and GROUP BY clause
4019         growing_buffer* select_buf = buffer_init( 128 );
4020         growing_buffer* group_buf  = buffer_init( 128 );
4021
4022         int aggregate_found = 0;     // boolean
4023
4024         // Build a select list
4025         if( from_function )   // From a function we select everything
4026                 OSRF_BUFFER_ADD_CHAR( select_buf, '*' );
4027         else {
4028
4029                 // Build the SELECT list as SQL
4030             int sel_pos = 1;
4031             first = 1;
4032             gfirst = 1;
4033             jsonIterator* selclass_itr = jsonNewIterator( selhash );
4034             while ( (selclass = jsonIteratorNext( selclass_itr )) ) {    // For each class
4035
4036                         const char* cname = selclass_itr->key;
4037
4038                         // Make sure the target relation is in the FROM clause.
4039
4040                         // At this point join_hash is a step down from the join_hash we
4041                         // received as a parameter.  If the original was a JSON_STRING,
4042                         // then join_hash is now NULL.  If the original was a JSON_HASH,
4043                         // then join_hash is now the first (and only) entry in it,
4044                         // denoting the core class.  We've already excluded the
4045                         // possibility that the original was a JSON_ARRAY, because in
4046                         // that case from_function would be nonzero, and we wouldn't
4047                         // be here.
4048
4049                         // If the current table alias isn't in scope, bail out
4050                         ClassInfo* class_info = search_alias( cname );
4051                         if( ! class_info ) {
4052                                 osrfLogError(
4053                                         OSRF_LOG_MARK,
4054                                         "%s: SELECT clause references class not in FROM clause: \"%s\"",
4055                                         modulename,
4056                                         cname
4057                                 );
4058                                 if( ctx )
4059                                         osrfAppSessionStatus(
4060                                                 ctx->session,
4061                                                 OSRF_STATUS_INTERNALSERVERERROR,
4062                                                 "osrfMethodException",
4063                                                 ctx->request,
4064                                                 "Selected class not in FROM clause in JSON query"
4065                                         );
4066                                 jsonIteratorFree( selclass_itr );
4067                                 buffer_free( select_buf );
4068                                 buffer_free( group_buf );
4069                                 if( defaultselhash )
4070                                         jsonObjectFree( defaultselhash );
4071                                 free( join_clause );
4072                                 return NULL;
4073                         }
4074
4075                         if( selclass->type != JSON_ARRAY ) {
4076                                 osrfLogError(
4077                                         OSRF_LOG_MARK,
4078                                         "%s: Malformed SELECT list for class \"%s\"; not an array",
4079                                         modulename,
4080                                         cname
4081                                 );
4082                                 if( ctx )
4083                                         osrfAppSessionStatus(
4084                                                 ctx->session,
4085                                                 OSRF_STATUS_INTERNALSERVERERROR,
4086                                                 "osrfMethodException",
4087                                                 ctx->request,
4088                                                 "Selected class not in FROM clause in JSON query"
4089                                         );
4090
4091                                 jsonIteratorFree( selclass_itr );
4092                                 buffer_free( select_buf );
4093                                 buffer_free( group_buf );
4094                                 if( defaultselhash )
4095                                         jsonObjectFree( defaultselhash );
4096                                 free( join_clause );
4097                                 return NULL;
4098                         }
4099
4100                         // Look up some attributes of the current class
4101                         osrfHash* idlClass        = class_info->class_def;
4102                         osrfHash* class_field_set = class_info->fields;
4103                         const char* class_pkey    = osrfHashGet( idlClass, "primarykey" );
4104                         const char* class_tname   = osrfHashGet( idlClass, "tablename" );
4105
4106                         if( 0 == selclass->size ) {
4107                                 osrfLogWarning(
4108                                         OSRF_LOG_MARK,
4109                                         "%s: No columns selected from \"%s\"",
4110                                         modulename,
4111                                         cname
4112                                 );
4113                         }
4114
4115                         // stitch together the column list for the current table alias...
4116                         unsigned long field_idx = 0;
4117                         jsonObject* selfield = NULL;
4118                         while(( selfield = jsonObjectGetIndex( selclass, field_idx++ ) )) {
4119
4120                                 // If we need a separator comma, add one
4121                                 if( first ) {
4122                                         first = 0;
4123                                 } else {
4124                                         OSRF_BUFFER_ADD_CHAR( select_buf, ',' );
4125                                 }
4126
4127                                 // if the field specification is a string, add it to the list
4128                                 if( selfield->type == JSON_STRING ) {
4129
4130                                         // Look up the field in the IDL
4131                                         const char* col_name = jsonObjectGetString( selfield );
4132                                         osrfHash* field_def = osrfHashGet( class_field_set, col_name );
4133                                         if( !field_def ) {
4134                                                 // No such field in current class
4135                                                 osrfLogError(
4136                                                         OSRF_LOG_MARK,
4137                                                         "%s: Selected column \"%s\" not defined in IDL for class \"%s\"",
4138                                                         modulename,
4139                                                         col_name,
4140                                                         cname
4141                                                 );
4142                                                 if( ctx )
4143                                                         osrfAppSessionStatus(
4144                                                                 ctx->session,
4145                                                                 OSRF_STATUS_INTERNALSERVERERROR,
4146                                                                 "osrfMethodException",
4147                                                                 ctx->request,
4148                                                                 "Selected column not defined in JSON query"
4149                                                         );
4150                                                 jsonIteratorFree( selclass_itr );
4151                                                 buffer_free( select_buf );
4152                                                 buffer_free( group_buf );
4153                                                 if( defaultselhash )
4154                                                         jsonObjectFree( defaultselhash );
4155                                                 free( join_clause );
4156                                                 return NULL;
4157                                         } else if( str_is_true( osrfHashGet( field_def, "virtual" ) ) ) {
4158                                                 // Virtual field not allowed
4159                                                 osrfLogError(
4160                                                         OSRF_LOG_MARK,
4161                                                         "%s: Selected column \"%s\" for class \"%s\" is virtual",
4162                                                         modulename,
4163                                                         col_name,
4164                                                         cname
4165                                                 );
4166                                                 if( ctx )
4167                                                         osrfAppSessionStatus(
4168                                                                 ctx->session,
4169                                                                 OSRF_STATUS_INTERNALSERVERERROR,
4170                                                                 "osrfMethodException",
4171                                                                 ctx->request,
4172                                                                 "Selected column may not be virtual in JSON query"
4173                                                         );
4174                                                 jsonIteratorFree( selclass_itr );
4175                                                 buffer_free( select_buf );
4176                                                 buffer_free( group_buf );
4177                                                 if( defaultselhash )
4178                                                         jsonObjectFree( defaultselhash );
4179                                                 free( join_clause );
4180                                                 return NULL;
4181                                         }
4182
4183                                         if( locale ) {
4184                                                 const char* i18n;
4185                                                 if( flags & DISABLE_I18N )
4186                                                         i18n = NULL;
4187                                                 else
4188                                                         i18n = osrfHashGet( field_def, "i18n" );
4189
4190                                                 if( str_is_true( i18n ) ) {
4191                                                         buffer_fadd( select_buf, " oils_i18n_xlate('%s', '%s', '%s', "
4192                                                                 "'%s', \"%s\".%s::TEXT, '%s') AS \"%s\"",
4193                                                                 class_tname, cname, col_name, class_pkey,
4194                                                                 cname, class_pkey, locale, col_name );
4195                                                 } else {
4196                                                         buffer_fadd( select_buf, " \"%s\".%s AS \"%s\"",
4197                                                                 cname, col_name, col_name );
4198                                                 }
4199                                         } else {
4200                                                 buffer_fadd( select_buf, " \"%s\".%s AS \"%s\"",
4201                                                                 cname, col_name, col_name );
4202                                         }
4203
4204                                 // ... but it could be an object, in which case we check for a Field Transform
4205                                 } else if( selfield->type == JSON_HASH ) {
4206
4207                                         const char* col_name = jsonObjectGetString(
4208                                                         jsonObjectGetKeyConst( selfield, "column" ) );
4209
4210                                         // Get the field definition from the IDL
4211                                         osrfHash* field_def = osrfHashGet( class_field_set, col_name );
4212                                         if( !field_def ) {
4213                                                 // No such field in current class
4214                                                 osrfLogError(
4215                                                         OSRF_LOG_MARK,
4216                                                         "%s: Selected column \"%s\" is not defined in IDL for class \"%s\"",
4217                                                         modulename,
4218                                                         col_name,
4219                                                         cname
4220                                                 );
4221                                                 if( ctx )
4222                                                         osrfAppSessionStatus(
4223                                                                 ctx->session,
4224                                                                 OSRF_STATUS_INTERNALSERVERERROR,
4225                                                                 "osrfMethodException",
4226                                                                 ctx->request,
4227                                                                 "Selected column is not defined in JSON query"
4228                                                         );
4229                                                 jsonIteratorFree( selclass_itr );
4230                                                 buffer_free( select_buf );
4231                                                 buffer_free( group_buf );
4232                                                 if( defaultselhash )
4233                                                         jsonObjectFree( defaultselhash );
4234                                                 free( join_clause );
4235                                                 return NULL;
4236                                         } else if( str_is_true( osrfHashGet( field_def, "virtual" ))) {
4237                                                 // Virtual field not allowed
4238                                                 osrfLogError(
4239                                                         OSRF_LOG_MARK,
4240                                                         "%s: Selected column \"%s\" is virtual for class \"%s\"",
4241                                                         modulename,
4242                                                         col_name,
4243                                                         cname
4244                                                 );
4245                                                 if( ctx )
4246                                                         osrfAppSessionStatus(
4247                                                                 ctx->session,
4248                                                                 OSRF_STATUS_INTERNALSERVERERROR,
4249                                                                 "osrfMethodException",
4250                                                                 ctx->request,
4251                                                                 "Selected column is virtual in JSON query"
4252                                                         );
4253                                                 jsonIteratorFree( selclass_itr );
4254                                                 buffer_free( select_buf );
4255                                                 buffer_free( group_buf );
4256                                                 if( defaultselhash )
4257                                                         jsonObjectFree( defaultselhash );
4258                                                 free( join_clause );
4259                                                 return NULL;
4260                                         }
4261
4262                                         // Decide what to use as a column alias
4263                                         const char* _alias;
4264                                         if((tmp_const = jsonObjectGetKeyConst( selfield, "alias" ))) {
4265                                                 _alias = jsonObjectGetString( tmp_const );
4266                                         } else {         // Use field name as the alias
4267                                                 _alias = col_name;
4268                                         }
4269
4270                                         if( jsonObjectGetKeyConst( selfield, "transform" )) {
4271                                                 char* transform_str = searchFieldTransform(
4272                                                         class_info->alias, field_def, selfield );
4273                                                 if( transform_str ) {
4274                                                         buffer_fadd( select_buf, " %s AS \"%s\"", transform_str, _alias );
4275                                                         free( transform_str );
4276                                                 } else {
4277                                                         if( ctx )
4278                                                                 osrfAppSessionStatus(
4279                                                                         ctx->session,
4280                                                                         OSRF_STATUS_INTERNALSERVERERROR,
4281                                                                         "osrfMethodException",
4282                                                                         ctx->request,
4283                                                                         "Unable to generate transform function in JSON query"
4284                                                                 );
4285                                                         jsonIteratorFree( selclass_itr );
4286                                                         buffer_free( select_buf );
4287                                                         buffer_free( group_buf );
4288                                                         if( defaultselhash )
4289                                                                 jsonObjectFree( defaultselhash );
4290                                                         free( join_clause );
4291                                                         return NULL;
4292                                                 }
4293                                         } else {
4294
4295                                                 if( locale ) {
4296                                                         const char* i18n;
4297                                                         if( flags & DISABLE_I18N )
4298                                                                 i18n = NULL;
4299                                                         else
4300                                                                 i18n = osrfHashGet( field_def, "i18n" );
4301
4302                                                         if( str_is_true( i18n ) ) {
4303                                                                 buffer_fadd( select_buf,
4304                                                                         " oils_i18n_xlate('%s', '%s', '%s', '%s', "
4305                                                                         "\"%s\".%s::TEXT, '%s') AS \"%s\"",
4306                                                                         class_tname, cname, col_name, class_pkey, cname,
4307                                                                         class_pkey, locale, _alias );
4308                                                         } else {
4309                                                                 buffer_fadd( select_buf, " \"%s\".%s AS \"%s\"",
4310                                                                         cname, col_name, _alias );
4311                                                         }
4312                                                 } else {
4313                                                         buffer_fadd( select_buf, " \"%s\".%s AS \"%s\"",
4314                                                                 cname, col_name, _alias );
4315                                                 }
4316                                         }
4317                                 }
4318                                 else {
4319                                         osrfLogError(
4320                                                 OSRF_LOG_MARK,
4321                                                 "%s: Selected item is unexpected JSON type: %s",
4322                                                 modulename,
4323                                                 json_type( selfield->type )
4324                                         );
4325                                         if( ctx )
4326                                                 osrfAppSessionStatus(
4327                                                         ctx->session,
4328                                                         OSRF_STATUS_INTERNALSERVERERROR,
4329                                                         "osrfMethodException",
4330                                                         ctx->request,
4331                                                         "Ill-formed SELECT item in JSON query"
4332                                                 );
4333                                         jsonIteratorFree( selclass_itr );
4334                                         buffer_free( select_buf );
4335                                         buffer_free( group_buf );
4336                                         if( defaultselhash )
4337                                                 jsonObjectFree( defaultselhash );
4338                                         free( join_clause );
4339                                         return NULL;
4340                                 }
4341
4342                                 const jsonObject* agg_obj = jsonObjectGetKeyConst( selfield, "aggregate" );
4343                                 if( obj_is_true( agg_obj ) )
4344                                         aggregate_found = 1;
4345                                 else {
4346                                         // Append a comma (except for the first one)
4347                                         // and add the column to a GROUP BY clause
4348                                         if( gfirst )
4349                                                 gfirst = 0;
4350                                         else
4351                                                 OSRF_BUFFER_ADD_CHAR( group_buf, ',' );
4352
4353                                         buffer_fadd( group_buf, " %d", sel_pos );
4354                                 }
4355
4356 #if 0
4357                             if (is_agg->size || (flags & SELECT_DISTINCT)) {
4358
4359                                         const jsonObject* aggregate_obj = jsonObjectGetKeyConst( elfield, "aggregate");
4360                                     if ( ! obj_is_true( aggregate_obj ) ) {
4361                                             if (gfirst) {
4362                                                     gfirst = 0;
4363                                             } else {
4364                                                         OSRF_BUFFER_ADD_CHAR( group_buf, ',' );
4365                                             }
4366
4367                                             buffer_fadd(group_buf, " %d", sel_pos);
4368
4369                                         /*
4370                                     } else if (is_agg = jsonObjectGetKeyConst( selfield, "having" )) {
4371                                             if (gfirst) {
4372                                                     gfirst = 0;
4373                                             } else {
4374                                                         OSRF_BUFFER_ADD_CHAR( group_buf, ',' );
4375                                             }
4376
4377                                             _column = searchFieldTransform(class_info->alias, field, selfield);
4378                                                 OSRF_BUFFER_ADD_CHAR(group_buf, ' ');
4379                                                 OSRF_BUFFER_ADD(group_buf, _column);
4380                                             _column = searchFieldTransform(class_info->alias, field, selfield);
4381                                         */
4382                                     }
4383                             }
4384 #endif
4385
4386                                 sel_pos++;
4387                         } // end while -- iterating across SELECT columns
4388
4389                 } // end while -- iterating across classes
4390
4391                 jsonIteratorFree( selclass_itr );
4392         }
4393
4394         char* col_list = buffer_release( select_buf );
4395
4396         // Make sure the SELECT list isn't empty.  This can happen, for example,
4397         // if we try to build a default SELECT clause from a non-core table.
4398
4399         if( ! *col_list ) {
4400                 osrfLogError( OSRF_LOG_MARK, "%s: SELECT clause is empty", modulename );
4401                 if( ctx )
4402                         osrfAppSessionStatus(
4403                                 ctx->session,
4404                                 OSRF_STATUS_INTERNALSERVERERROR,
4405                                 "osrfMethodException",
4406                                 ctx->request,
4407                                 "SELECT list is empty"
4408                 );
4409                 free( col_list );
4410                 buffer_free( group_buf );
4411                 if( defaultselhash )
4412                         jsonObjectFree( defaultselhash );
4413                 free( join_clause );
4414                 return NULL;
4415         }
4416
4417         char* table = NULL;
4418         if( from_function )
4419                 table = searchValueTransform( join_hash );
4420         else
4421                 table = strdup( curr_query->core.source_def );
4422
4423         if( !table ) {
4424                 if( ctx )
4425                         osrfAppSessionStatus(
4426                                 ctx->session,
4427                                 OSRF_STATUS_INTERNALSERVERERROR,
4428                                 "osrfMethodException",
4429                                 ctx->request,
4430                                 "Unable to identify table for core class"
4431                         );
4432                 free( col_list );
4433                 buffer_free( group_buf );
4434                 if( defaultselhash )
4435                         jsonObjectFree( defaultselhash );
4436                 free( join_clause );
4437                 return NULL;
4438         }
4439
4440         // Put it all together
4441         growing_buffer* sql_buf = buffer_init( 128 );
4442         buffer_fadd(sql_buf, "SELECT %s FROM %s AS \"%s\" ", col_list, table, core_class );
4443         free( col_list );
4444         free( table );
4445
4446         // Append the join clause, if any
4447         if( join_clause ) {
4448                 buffer_add(sql_buf, join_clause );
4449                 free( join_clause );
4450         }
4451
4452         char* order_by_list = NULL;
4453         char* having_buf = NULL;
4454
4455         if( !from_function ) {
4456
4457                 // Build a WHERE clause, if there is one
4458                 if( search_hash ) {
4459                         buffer_add( sql_buf, " WHERE " );
4460
4461                         // and it's on the WHERE clause
4462                         char* pred = searchWHERE( search_hash, &curr_query->core, AND_OP_JOIN, ctx );
4463                         if( ! pred ) {
4464                                 if( ctx ) {
4465                                         osrfAppSessionStatus(
4466                                                 ctx->session,
4467                                                 OSRF_STATUS_INTERNALSERVERERROR,
4468                                                 "osrfMethodException",
4469                                                 ctx->request,
4470                                                 "Severe query error in WHERE predicate -- see error log for more details"
4471                                         );
4472                                 }
4473                                 buffer_free( group_buf );
4474                                 buffer_free( sql_buf );
4475                                 if( defaultselhash )
4476                                         jsonObjectFree( defaultselhash );
4477                                 return NULL;
4478                         }
4479
4480                         buffer_add( sql_buf, pred );
4481                         free( pred );
4482                 }
4483
4484                 // Build a HAVING clause, if there is one
4485                 if( having_hash ) {
4486
4487                         // Build the HAVING predicate with the same routine used for WHERE
4488                         having_buf = searchWHERE( having_hash, &curr_query->core, AND_OP_JOIN, ctx );
4489
4490                         if( ! having_buf ) {
4491                                 if( ctx ) {
4492                                                 osrfAppSessionStatus(
4493                                                 ctx->session,
4494                                                 OSRF_STATUS_INTERNALSERVERERROR,
4495                                                 "osrfMethodException",
4496                                                 ctx->request,
4497                                                 "Severe query error in HAVING predicate -- see error log for more details"
4498                                         );
4499                                 }
4500                                 buffer_free( group_buf );
4501                                 buffer_free( sql_buf );
4502                                 if( defaultselhash )
4503                                         jsonObjectFree( defaultselhash );
4504                                 return NULL;
4505                         }
4506                 }
4507
4508                 // Build an ORDER BY clause, if there is one
4509                 if( NULL == order_hash )
4510                         ;  // No ORDER BY? do nothing
4511                 else if( JSON_ARRAY == order_hash->type ) {
4512                         order_by_list = buildOrderByFromArray( ctx, order_hash );
4513                         if( !order_by_list ) {
4514                                 free( having_buf );
4515                                 buffer_free( group_buf );
4516                                 buffer_free( sql_buf );
4517                                 if( defaultselhash )
4518                                         jsonObjectFree( defaultselhash );
4519                                 return NULL;
4520                         }
4521                 } else if( JSON_HASH == order_hash->type ) {
4522                         // This hash is keyed on class alias.  Each class has either
4523                         // an array of field names or a hash keyed on field name.
4524                         growing_buffer* order_buf = NULL;  // to collect ORDER BY list
4525                         jsonIterator* class_itr = jsonNewIterator( order_hash );
4526                         while( (snode = jsonIteratorNext( class_itr )) ) {
4527
4528                                 ClassInfo* order_class_info = search_alias( class_itr->key );
4529                                 if( ! order_class_info ) {
4530                                         osrfLogError( OSRF_LOG_MARK,
4531                                                 "%s: Invalid class \"%s\" referenced in ORDER BY clause",
4532                                                 modulename, class_itr->key );
4533                                         if( ctx )
4534                                                 osrfAppSessionStatus(
4535                                                         ctx->session,
4536                                                         OSRF_STATUS_INTERNALSERVERERROR,
4537                                                         "osrfMethodException",
4538                                                         ctx->request,
4539                                                         "Invalid class referenced in ORDER BY clause -- "
4540                                                                 "see error log for more details"
4541                                                 );
4542                                         jsonIteratorFree( class_itr );
4543                                         buffer_free( order_buf );
4544                                         free( having_buf );
4545                                         buffer_free( group_buf );
4546                                         buffer_free( sql_buf );
4547                                         if( defaultselhash )
4548                                                 jsonObjectFree( defaultselhash );
4549                                         return NULL;
4550                                 }
4551
4552                                 osrfHash* field_list_def = order_class_info->fields;
4553
4554                                 if( snode->type == JSON_HASH ) {
4555
4556                                         // Hash is keyed on field names from the current class.  For each field
4557                                         // there is another layer of hash to define the sorting details, if any,
4558                                         // or a string to indicate direction of sorting.
4559                                         jsonIterator* order_itr = jsonNewIterator( snode );
4560                                         while( (onode = jsonIteratorNext( order_itr )) ) {
4561
4562                                                 osrfHash* field_def = osrfHashGet( field_list_def, order_itr->key );
4563                                                 if( !field_def ) {
4564                                                         osrfLogError( OSRF_LOG_MARK,
4565                                                                 "%s: Invalid field \"%s\" in ORDER BY clause",
4566                                                                 modulename, order_itr->key );
4567                                                         if( ctx )
4568                                                                 osrfAppSessionStatus(
4569                                                                         ctx->session,
4570                                                                         OSRF_STATUS_INTERNALSERVERERROR,
4571                                                                         "osrfMethodException",
4572                                                                         ctx->request,
4573                                                                         "Invalid field in ORDER BY clause -- "
4574                                                                         "see error log for more details"
4575                                                                 );
4576                                                         jsonIteratorFree( order_itr );
4577                                                         jsonIteratorFree( class_itr );
4578                                                         buffer_free( order_buf );
4579                                                         free( having_buf );
4580                                                         buffer_free( group_buf );
4581                                                         buffer_free( sql_buf );
4582                                                         if( defaultselhash )
4583                                                                 jsonObjectFree( defaultselhash );
4584                                                         return NULL;
4585                                                 } else if( str_is_true( osrfHashGet( field_def, "virtual" ) ) ) {
4586                                                         osrfLogError( OSRF_LOG_MARK,
4587                                                                 "%s: Virtual field \"%s\" in ORDER BY clause",
4588                                                                 modulename, order_itr->key );
4589                                                         if( ctx )
4590                                                                 osrfAppSessionStatus(
4591                                                                         ctx->session,
4592                                                                         OSRF_STATUS_INTERNALSERVERERROR,
4593                                                                         "osrfMethodException",
4594                                                                         ctx->request,
4595                                                                         "Virtual field in ORDER BY clause -- "
4596                                                                         "see error log for more details"
4597                                                         );
4598                                                         jsonIteratorFree( order_itr );
4599                                                         jsonIteratorFree( class_itr );
4600                                                         buffer_free( order_buf );
4601                                                         free( having_buf );
4602                                                         buffer_free( group_buf );
4603                                                         buffer_free( sql_buf );
4604                                                         if( defaultselhash )
4605                                                                 jsonObjectFree( defaultselhash );
4606                                                         return NULL;
4607                                                 }
4608
4609                                                 const char* direction = NULL;
4610                                                 if( onode->type == JSON_HASH ) {
4611                                                         if( jsonObjectGetKeyConst( onode, "transform" ) ) {
4612                                                                 string = searchFieldTransform(
4613                                                                         class_itr->key,
4614                                                                         osrfHashGet( field_list_def, order_itr->key ),
4615                                                                         onode
4616                                                                 );
4617                                                                 if( ! string ) {
4618                                                                         if( ctx ) osrfAppSessionStatus(
4619                                                                                 ctx->session,
4620                                                                                 OSRF_STATUS_INTERNALSERVERERROR,
4621                                                                                 "osrfMethodException",
4622                                                                                 ctx->request,
4623                                                                                 "Severe query error in ORDER BY clause -- "
4624                                                                                 "see error log for more details"
4625                                                                         );
4626                                                                         jsonIteratorFree( order_itr );
4627                                                                         jsonIteratorFree( class_itr );
4628                                                                         free( having_buf );
4629                                                                         buffer_free( group_buf );
4630                                                                         buffer_free( order_buf);
4631                                                                         buffer_free( sql_buf );
4632                                                                         if( defaultselhash )
4633                                                                                 jsonObjectFree( defaultselhash );
4634                                                                         return NULL;
4635                                                                 }
4636                                                         } else {
4637                                                                 growing_buffer* field_buf = buffer_init( 16 );
4638                                                                 buffer_fadd( field_buf, "\"%s\".%s",
4639                                                                         class_itr->key, order_itr->key );
4640                                                                 string = buffer_release( field_buf );
4641                                                         }
4642
4643                                                         if( (tmp_const = jsonObjectGetKeyConst( onode, "direction" )) ) {
4644                                                                 const char* dir = jsonObjectGetString( tmp_const );
4645                                                                 if(!strncasecmp( dir, "d", 1 )) {
4646                                                                         direction = " DESC";
4647                                                                 } else {
4648                                                                         direction = " ASC";
4649                                                                 }
4650                                                         }
4651
4652                                                 } else if( JSON_NULL == onode->type || JSON_ARRAY == onode->type ) {
4653                                                         osrfLogError( OSRF_LOG_MARK,
4654                                                                 "%s: Expected JSON_STRING in ORDER BY clause; found %s",
4655                                                                 modulename, json_type( onode->type ) );
4656                                                         if( ctx )
4657                                                                 osrfAppSessionStatus(
4658                                                                         ctx->session,
4659                                                                         OSRF_STATUS_INTERNALSERVERERROR,
4660                                                                         "osrfMethodException",
4661                                                                         ctx->request,
4662                                                                         "Malformed ORDER BY clause -- see error log for more details"
4663                                                                 );
4664                                                         jsonIteratorFree( order_itr );
4665                                                         jsonIteratorFree( class_itr );
4666                                                         free( having_buf );
4667                                                         buffer_free( group_buf );
4668                                                         buffer_free( order_buf );
4669                                                         buffer_free( sql_buf );
4670                                                         if( defaultselhash )
4671                                                                 jsonObjectFree( defaultselhash );
4672                                                         return NULL;
4673
4674                                                 } else {
4675                                                         string = strdup( order_itr->key );
4676                                                         const char* dir = jsonObjectGetString( onode );
4677                                                         if( !strncasecmp( dir, "d", 1 )) {
4678                                                                 direction = " DESC";
4679                                                         } else {
4680                                                                 direction = " ASC";
4681                                                         }
4682                                                 }
4683
4684                                                 if( order_buf )
4685                                                         OSRF_BUFFER_ADD( order_buf, ", " );
4686                                                 else
4687                                                         order_buf = buffer_init( 128 );
4688
4689                                                 OSRF_BUFFER_ADD( order_buf, string );
4690                                                 free( string );
4691
4692                                                 if( direction ) {
4693                                                          OSRF_BUFFER_ADD( order_buf, direction );
4694                                                 }
4695
4696                                         } // end while
4697                                         jsonIteratorFree( order_itr );
4698
4699                                 } else if( snode->type == JSON_ARRAY ) {
4700
4701                                         // Array is a list of fields from the current class
4702                                         unsigned long order_idx = 0;
4703                                         while(( onode = jsonObjectGetIndex( snode, order_idx++ ) )) {
4704
4705                                                 const char* _f = jsonObjectGetString( onode );
4706
4707                                                 osrfHash* field_def = osrfHashGet( field_list_def, _f );
4708                                                 if( !field_def ) {
4709                                                         osrfLogError( OSRF_LOG_MARK,
4710                                                                         "%s: Invalid field \"%s\" in ORDER BY clause",
4711                                                                         modulename, _f );
4712                                                         if( ctx )
4713                                                                 osrfAppSessionStatus(
4714                                                                         ctx->session,
4715                                                                         OSRF_STATUS_INTERNALSERVERERROR,
4716                                                                         "osrfMethodException",
4717                                                                         ctx->request,
4718                                                                         "Invalid field in ORDER BY clause -- "
4719                                                                         "see error log for more details"
4720                                                                 );
4721                                                         jsonIteratorFree( class_itr );
4722                                                         buffer_free( order_buf );
4723                                                         free( having_buf );
4724                                                         buffer_free( group_buf );
4725                                                         buffer_free( sql_buf );
4726                                                         if( defaultselhash )
4727                                                                 jsonObjectFree( defaultselhash );
4728                                                         return NULL;
4729                                                 } else if( str_is_true( osrfHashGet( field_def, "virtual" ) ) ) {
4730                                                         osrfLogError( OSRF_LOG_MARK,
4731                                                                 "%s: Virtual field \"%s\" in ORDER BY clause",
4732                                                                 modulename, _f );
4733                                                         if( ctx )
4734                                                                 osrfAppSessionStatus(
4735                                                                         ctx->session,
4736                                                                         OSRF_STATUS_INTERNALSERVERERROR,
4737                                                                         "osrfMethodException",
4738                                                                         ctx->request,
4739                                                                         "Virtual field in ORDER BY clause -- "
4740                                                                         "see error log for more details"
4741                                                                 );
4742                                                         jsonIteratorFree( class_itr );
4743                                                         buffer_free( order_buf );
4744                                                         free( having_buf );
4745                                                         buffer_free( group_buf );
4746                                                         buffer_free( sql_buf );
4747                                                         if( defaultselhash )
4748                                                                 jsonObjectFree( defaultselhash );
4749                                                         return NULL;
4750                                                 }
4751
4752                                                 if( order_buf )
4753                                                         OSRF_BUFFER_ADD( order_buf, ", " );
4754                                                 else
4755                                                         order_buf = buffer_init( 128 );
4756
4757                                                 buffer_fadd( order_buf, "\"%s\".%s", class_itr->key, _f );
4758
4759                                         } // end while
4760
4761                                 // IT'S THE OOOOOOOOOOOLD STYLE!
4762                                 } else {
4763                                         osrfLogError( OSRF_LOG_MARK,
4764                                                 "%s: Possible SQL injection attempt; direct order by is not allowed",
4765                                                 modulename );
4766                                         if(ctx) {
4767                                                 osrfAppSessionStatus(
4768                                                         ctx->session,
4769                                                         OSRF_STATUS_INTERNALSERVERERROR,
4770                                                         "osrfMethodException",
4771                                                         ctx->request,
4772                                                         "Severe query error -- see error log for more details"
4773                                                 );
4774                                         }
4775
4776                                         free( having_buf );
4777                                         buffer_free( group_buf );
4778                                         buffer_free( order_buf );
4779                                         buffer_free( sql_buf );
4780                                         if( defaultselhash )
4781                                                 jsonObjectFree( defaultselhash );
4782                                         jsonIteratorFree( class_itr );
4783                                         return NULL;
4784                                 }
4785                         } // end while
4786                         jsonIteratorFree( class_itr );
4787                         if( order_buf )
4788                                 order_by_list = buffer_release( order_buf );
4789                 } else {
4790                         osrfLogError( OSRF_LOG_MARK,
4791                                 "%s: Malformed ORDER BY clause; expected JSON_HASH or JSON_ARRAY, found %s",
4792                                 modulename, json_type( order_hash->type ) );
4793                         if( ctx )
4794                                 osrfAppSessionStatus(
4795                                         ctx->session,
4796                                         OSRF_STATUS_INTERNALSERVERERROR,
4797                                         "osrfMethodException",
4798                                         ctx->request,
4799                                         "Malformed ORDER BY clause -- see error log for more details"
4800                                 );
4801                         free( having_buf );
4802                         buffer_free( group_buf );
4803                         buffer_free( sql_buf );
4804                         if( defaultselhash )
4805                                 jsonObjectFree( defaultselhash );
4806                         return NULL;
4807                 }
4808         }
4809
4810         string = buffer_release( group_buf );
4811
4812         if( *string && ( aggregate_found || (flags & SELECT_DISTINCT) ) ) {
4813                 OSRF_BUFFER_ADD( sql_buf, " GROUP BY " );
4814                 OSRF_BUFFER_ADD( sql_buf, string );
4815         }
4816
4817         free( string );
4818
4819         if( having_buf && *having_buf ) {
4820                 OSRF_BUFFER_ADD( sql_buf, " HAVING " );
4821                 OSRF_BUFFER_ADD( sql_buf, having_buf );
4822                 free( having_buf );
4823         }
4824
4825         if( order_by_list ) {
4826
4827                 if( *order_by_list ) {
4828                         OSRF_BUFFER_ADD( sql_buf, " ORDER BY " );
4829                         OSRF_BUFFER_ADD( sql_buf, order_by_list );
4830                 }
4831
4832                 free( order_by_list );
4833         }
4834
4835         if( limit ){
4836                 const char* str = jsonObjectGetString( limit );
4837                 buffer_fadd( sql_buf, " LIMIT %d", atoi( str ));
4838         }
4839
4840         if( offset ) {
4841                 const char* str = jsonObjectGetString( offset );
4842                 buffer_fadd( sql_buf, " OFFSET %d", atoi( str ));
4843         }
4844
4845         if( !(flags & SUBSELECT) )
4846                 OSRF_BUFFER_ADD_CHAR( sql_buf, ';' );
4847
4848         if( defaultselhash )
4849                  jsonObjectFree( defaultselhash );
4850
4851         return buffer_release( sql_buf );
4852
4853 } // end of SELECT()
4854
4855 /**
4856         @brief Build a list of ORDER BY expressions.
4857         @param ctx Pointer to the method context.
4858         @param order_array Pointer to a JSON_ARRAY of field specifications.
4859         @return Pointer to a string containing a comma-separated list of ORDER BY expressions,
4860         or NULL upon error.  Each expression may be either a column reference or a function call
4861         whose first parameter is a column reference.
4862
4863         Each entry in @a order_array must be a JSON_HASH with values for "class" and "field".
4864         It may optionally include entries for "direction" and/or "transform".
4865
4866         The calling code is responsible for freeing the returned string.
4867 */
4868 static char* buildOrderByFromArray( osrfMethodContext* ctx, const jsonObject* order_array ) {
4869         if( ! order_array ) {
4870                 osrfLogError( OSRF_LOG_MARK, "%s: Logic error: NULL pointer for ORDER BY clause",
4871                         modulename );
4872                 if( ctx )
4873                         osrfAppSessionStatus(
4874                                 ctx->session,
4875                                 OSRF_STATUS_INTERNALSERVERERROR,
4876                                 "osrfMethodException",
4877                                 ctx->request,
4878                                 "Logic error: ORDER BY clause expected, not found; "
4879                                         "see error log for more details"
4880                         );
4881                 return NULL;
4882         } else if( order_array->type != JSON_ARRAY ) {
4883                 osrfLogError( OSRF_LOG_MARK,
4884                         "%s: Logic error: Expected JSON_ARRAY for ORDER BY clause, not found", modulename );
4885                 if( ctx )
4886                         osrfAppSessionStatus(
4887                         ctx->session,
4888                         OSRF_STATUS_INTERNALSERVERERROR,
4889                         "osrfMethodException",
4890                         ctx->request,
4891                         "Logic error: Unexpected format for ORDER BY clause; see error log for more details" );
4892                 return NULL;
4893         }
4894
4895         growing_buffer* order_buf = buffer_init( 128 );
4896         int first = 1;        // boolean
4897         int order_idx = 0;
4898         jsonObject* order_spec;
4899         while( (order_spec = jsonObjectGetIndex( order_array, order_idx++ ))) {
4900
4901                 if( JSON_HASH != order_spec->type ) {
4902                         osrfLogError( OSRF_LOG_MARK,
4903                                 "%s: Malformed field specification in ORDER BY clause; "
4904                                 "expected JSON_HASH, found %s",
4905                                 modulename, json_type( order_spec->type ) );
4906                         if( ctx )
4907                                 osrfAppSessionStatus(
4908                                          ctx->session,
4909                                         OSRF_STATUS_INTERNALSERVERERROR,
4910                                         "osrfMethodException",
4911                                         ctx->request,
4912                                         "Malformed ORDER BY clause -- see error log for more details"
4913                                 );
4914                         buffer_free( order_buf );
4915                         return NULL;
4916                 }
4917
4918                 const char* class_alias =
4919                         jsonObjectGetString( jsonObjectGetKeyConst( order_spec, "class" ));
4920                 const char* field =
4921                         jsonObjectGetString( jsonObjectGetKeyConst( order_spec, "field" ));
4922
4923                 if( !field || !class_alias ) {
4924                         osrfLogError( OSRF_LOG_MARK,
4925                                 "%s: Missing class or field name in field specification of ORDER BY clause",
4926                                 modulename );
4927                         if( ctx )
4928                                 osrfAppSessionStatus(
4929                                         ctx->session,
4930                                         OSRF_STATUS_INTERNALSERVERERROR,
4931                                         "osrfMethodException",
4932                                         ctx->request,
4933                                         "Malformed ORDER BY clause -- see error log for more details"
4934                                 );
4935                         buffer_free( order_buf );
4936                         return NULL;
4937                 }
4938
4939                 const ClassInfo* order_class_info = search_alias( class_alias );
4940                 if( ! order_class_info ) {
4941                         osrfLogInternal( OSRF_LOG_MARK, "%s: ORDER BY clause references class \"%s\" "
4942                                 "not in FROM clause, skipping it", modulename, class_alias );
4943                         continue;
4944                 }
4945
4946                 // Add a separating comma, except at the beginning
4947                 if( first )
4948                         first = 0;
4949                 else
4950                         OSRF_BUFFER_ADD( order_buf, ", " );
4951
4952                 osrfHash* field_def = osrfHashGet( order_class_info->fields, field );
4953                 if( !field_def ) {
4954                         osrfLogError( OSRF_LOG_MARK,
4955                                 "%s: Invalid field \"%s\".%s referenced in ORDER BY clause",
4956                                 modulename, class_alias, field );
4957                         if( ctx )
4958                                 osrfAppSessionStatus(
4959                                         ctx->session,
4960                                         OSRF_STATUS_INTERNALSERVERERROR,
4961                                         "osrfMethodException",
4962                                         ctx->request,
4963                                         "Invalid field referenced in ORDER BY clause -- "
4964                                         "see error log for more details"
4965                                 );
4966                         free( order_buf );
4967                         return NULL;
4968                 } else if( str_is_true( osrfHashGet( field_def, "virtual" ) ) ) {
4969                         osrfLogError( OSRF_LOG_MARK, "%s: Virtual field \"%s\" in ORDER BY clause",
4970                                 modulename, field );
4971                         if( ctx )
4972                                 osrfAppSessionStatus(
4973                                         ctx->session,
4974                                         OSRF_STATUS_INTERNALSERVERERROR,
4975                                         "osrfMethodException",
4976                                         ctx->request,
4977                                         "Virtual field in ORDER BY clause -- see error log for more details"
4978                                 );
4979                         buffer_free( order_buf );
4980                         return NULL;
4981                 }
4982
4983                 if( jsonObjectGetKeyConst( order_spec, "transform" )) {
4984                         char* transform_str = searchFieldTransform( class_alias, field_def, order_spec );
4985                         if( ! transform_str ) {
4986                                 if( ctx )
4987                                         osrfAppSessionStatus(
4988                                                 ctx->session,
4989                                                 OSRF_STATUS_INTERNALSERVERERROR,
4990                                                 "osrfMethodException",
4991                                                 ctx->request,
4992                                                 "Severe query error in ORDER BY clause -- "
4993                                                 "see error log for more details"
4994                                         );
4995                                 buffer_free( order_buf );
4996                                 return NULL;
4997                         }
4998
4999                         OSRF_BUFFER_ADD( order_buf, transform_str );
5000                         free( transform_str );
5001                 }
5002                 else
5003                         buffer_fadd( order_buf, "\"%s\".%s", class_alias, field );
5004
5005                 const char* direction =
5006                         jsonObjectGetString( jsonObjectGetKeyConst( order_spec, "direction" ) );
5007                 if( direction ) {
5008                         if( direction[ 0 ] || 'D' == direction[ 0 ] )
5009                                 OSRF_BUFFER_ADD( order_buf, " DESC" );
5010                         else
5011                                 OSRF_BUFFER_ADD( order_buf, " ASC" );
5012                 }
5013         }
5014
5015         return buffer_release( order_buf );
5016 }
5017
5018 /**
5019         @brief Build a SELECT statement.
5020         @param search_hash Pointer to a JSON_HASH or JSON_ARRAY encoding the WHERE clause.
5021         @param rest_of_query Pointer to a JSON_HASH containing any other SQL clauses.
5022         @param meta Pointer to the class metadata for the core class.
5023         @param ctx Pointer to the method context.
5024         @return Pointer to a character string containing the WHERE clause; or NULL upon error.
5025
5026         Within the rest_of_query hash, the meaningful keys are "join", "select", "no_i18n",
5027         "order_by", "limit", and "offset".
5028
5029         The SELECT statements built here are distinct from those built for the json_query method.
5030 */
5031 static char* buildSELECT ( const jsonObject* search_hash, jsonObject* rest_of_query,
5032         osrfHash* meta, osrfMethodContext* ctx ) {
5033
5034         const char* locale = osrf_message_get_last_locale();
5035
5036         osrfHash* fields = osrfHashGet( meta, "fields" );
5037         const char* core_class = osrfHashGet( meta, "classname" );
5038
5039         const jsonObject* join_hash = jsonObjectGetKeyConst( rest_of_query, "join" );
5040
5041         jsonObject* selhash = NULL;
5042         jsonObject* defaultselhash = NULL;
5043
5044         growing_buffer* sql_buf = buffer_init( 128 );
5045         growing_buffer* select_buf = buffer_init( 128 );
5046
5047         if( !(selhash = jsonObjectGetKey( rest_of_query, "select" )) ) {
5048                 defaultselhash = jsonNewObjectType( JSON_HASH );
5049                 selhash = defaultselhash;
5050         }
5051
5052         // If there's no SELECT list for the core class, build one
5053         if( !jsonObjectGetKeyConst( selhash, core_class ) ) {
5054                 jsonObject* field_list = jsonNewObjectType( JSON_ARRAY );
5055
5056                 // Add every non-virtual field to the field list
5057                 osrfHash* field_def = NULL;
5058                 osrfHashIterator* field_itr = osrfNewHashIterator( fields );
5059                 while( ( field_def = osrfHashIteratorNext( field_itr ) ) ) {
5060                         if( ! str_is_true( osrfHashGet( field_def, "virtual" ) ) ) {
5061                                 const char* field = osrfHashIteratorKey( field_itr );
5062                                 jsonObjectPush( field_list, jsonNewObject( field ) );
5063                         }
5064                 }
5065                 osrfHashIteratorFree( field_itr );
5066                 jsonObjectSetKey( selhash, core_class, field_list );
5067         }
5068
5069         // Build a list of columns for the SELECT clause
5070         int first = 1;
5071         const jsonObject* snode = NULL;
5072         jsonIterator* class_itr = jsonNewIterator( selhash );
5073         while( (snode = jsonIteratorNext( class_itr )) ) {        // For each class
5074
5075                 // If the class isn't in the IDL, ignore it
5076                 const char* cname = class_itr->key;
5077                 osrfHash* idlClass = osrfHashGet( oilsIDL(), cname );
5078                 if( !idlClass )
5079                         continue;
5080
5081                 // If the class isn't the core class, and isn't in the JOIN clause, ignore it
5082                 if( strcmp( core_class, class_itr->key )) {
5083                         if( !join_hash )
5084                                 continue;
5085
5086                         jsonObject* found = jsonObjectFindPath( join_hash, "//%s", class_itr->key );
5087                         if( !found->size ) {
5088                                 jsonObjectFree( found );
5089                                 continue;
5090                         }
5091
5092                         jsonObjectFree( found );
5093                 }
5094
5095                 const jsonObject* node = NULL;
5096                 jsonIterator* select_itr = jsonNewIterator( snode );
5097                 while( (node = jsonIteratorNext( select_itr )) ) {
5098                         const char* item_str = jsonObjectGetString( node );
5099                         osrfHash* field = osrfHashGet( osrfHashGet( idlClass, "fields" ), item_str );
5100                         char* fname = osrfHashGet( field, "name" );
5101
5102                         if( !field )
5103                                 continue;
5104
5105                         if( first ) {
5106                                 first = 0;
5107                         } else {
5108                                 OSRF_BUFFER_ADD_CHAR( select_buf, ',' );
5109                         }
5110
5111                         if( locale ) {
5112                                 const char* i18n;
5113                                 const jsonObject* no_i18n_obj = jsonObjectGetKeyConst( rest_of_query, "no_i18n" );
5114                                 if( obj_is_true( no_i18n_obj ) )    // Suppress internationalization?
5115                                         i18n = NULL;
5116                                 else
5117                                         i18n = osrfHashGet( field, "i18n" );
5118
5119                                 if( str_is_true( i18n ) ) {
5120                                         char* pkey = osrfHashGet( idlClass, "primarykey" );
5121                                         char* tname = osrfHashGet( idlClass, "tablename" );
5122
5123                                         buffer_fadd( select_buf, " oils_i18n_xlate('%s', '%s', '%s', "
5124                                                         "'%s', \"%s\".%s::TEXT, '%s') AS \"%s\"",
5125                                                         tname, cname, fname, pkey, cname, pkey, locale, fname );
5126                                 } else {
5127                                         buffer_fadd( select_buf, " \"%s\".%s", cname, fname );
5128                                 }
5129                         } else {
5130                                 buffer_fadd( select_buf, " \"%s\".%s", cname, fname );
5131                         }
5132                 }
5133
5134                 jsonIteratorFree( select_itr );
5135         }
5136
5137         jsonIteratorFree( class_itr );
5138
5139         char* col_list = buffer_release( select_buf );
5140         char* table = oilsGetRelation( meta );
5141         if( !table )
5142                 table = strdup( "(null)" );
5143
5144         buffer_fadd( sql_buf, "SELECT %s FROM %s AS \"%s\"", col_list, table, core_class );
5145         free( col_list );
5146         free( table );
5147
5148         // Clear the query stack (as a fail-safe precaution against possible
5149         // leftover garbage); then push the first query frame onto the stack.
5150         clear_query_stack();
5151         push_query_frame();
5152         if( add_query_core( NULL, core_class ) ) {
5153                 if( ctx )
5154                         osrfAppSessionStatus(
5155                                 ctx->session,
5156                                 OSRF_STATUS_INTERNALSERVERERROR,
5157                                 "osrfMethodException",
5158                                 ctx->request,
5159                                 "Unable to build query frame for core class"
5160                         );
5161                 buffer_free( sql_buf );
5162                 if( defaultselhash )
5163                         jsonObjectFree( defaultselhash );
5164                 return NULL;
5165         }
5166
5167         // Add the JOIN clauses, if any
5168         if( join_hash ) {
5169                 char* join_clause = searchJOIN( join_hash, &curr_query->core );
5170                 OSRF_BUFFER_ADD_CHAR( sql_buf, ' ' );
5171                 OSRF_BUFFER_ADD( sql_buf, join_clause );
5172                 free( join_clause );
5173         }
5174
5175         osrfLogDebug( OSRF_LOG_MARK, "%s pre-predicate SQL =  %s",
5176                 modulename, OSRF_BUFFER_C_STR( sql_buf ));
5177
5178         OSRF_BUFFER_ADD( sql_buf, " WHERE " );
5179
5180         // Add the conditions in the WHERE clause
5181         char* pred = searchWHERE( search_hash, &curr_query->core, AND_OP_JOIN, ctx );
5182         if( !pred ) {
5183                 osrfAppSessionStatus(
5184                         ctx->session,
5185                         OSRF_STATUS_INTERNALSERVERERROR,
5186                                 "osrfMethodException",
5187                                 ctx->request,
5188                                 "Severe query error -- see error log for more details"
5189                         );
5190                 buffer_free( sql_buf );
5191                 if( defaultselhash )
5192                         jsonObjectFree( defaultselhash );
5193                 clear_query_stack();
5194                 return NULL;
5195         } else {
5196                 buffer_add( sql_buf, pred );
5197                 free( pred );
5198         }
5199
5200         // Add the ORDER BY, LIMIT, and/or OFFSET clauses, if present
5201         if( rest_of_query ) {
5202                 const jsonObject* order_by = NULL;
5203                 if( ( order_by = jsonObjectGetKeyConst( rest_of_query, "order_by" )) ){
5204
5205                         char* order_by_list = NULL;
5206
5207                         if( JSON_ARRAY == order_by->type ) {
5208                                 order_by_list = buildOrderByFromArray( ctx, order_by );
5209                                 if( !order_by_list ) {
5210                                         buffer_free( sql_buf );
5211                                         if( defaultselhash )
5212                                                 jsonObjectFree( defaultselhash );
5213                                         clear_query_stack();
5214                                         return NULL;
5215                                 }
5216                         } else if( JSON_HASH == order_by->type ) {
5217                                 // We expect order_by to be a JSON_HASH keyed on class names.  Traverse it
5218                                 // and build a list of ORDER BY expressions.
5219                                 growing_buffer* order_buf = buffer_init( 128 );
5220                                 first = 1;
5221                                 jsonIterator* class_itr = jsonNewIterator( order_by );
5222                                 while( (snode = jsonIteratorNext( class_itr )) ) {  // For each class:
5223
5224                                         ClassInfo* order_class_info = search_alias( class_itr->key );
5225                                         if( ! order_class_info )
5226                                                 continue;    // class not referenced by FROM clause?  Ignore it.
5227
5228                                         if( JSON_HASH == snode->type ) {
5229
5230                                                 // If the data for the current class is a JSON_HASH, then it is
5231                                                 // keyed on field name.
5232
5233                                                 const jsonObject* onode = NULL;
5234                                                 jsonIterator* order_itr = jsonNewIterator( snode );
5235                                                 while( (onode = jsonIteratorNext( order_itr )) ) {  // For each field
5236
5237                                                         osrfHash* field_def = osrfHashGet(
5238                                                                 order_class_info->fields, order_itr->key );
5239                                                         if( !field_def )
5240                                                                 continue;    // Field not defined in IDL?  Ignore it.
5241                                                         if( str_is_true( osrfHashGet( field_def, "virtual")))
5242                                                                 continue;    // Field is virtual?  Ignore it.
5243
5244                                                         char* field_str = NULL;
5245                                                         char* direction = NULL;
5246                                                         if( onode->type == JSON_HASH ) {
5247                                                                 if( jsonObjectGetKeyConst( onode, "transform" ) ) {
5248                                                                         field_str = searchFieldTransform(
5249                                                                                 class_itr->key, field_def, onode );
5250                                                                         if( ! field_str ) {
5251                                                                                 osrfAppSessionStatus(
5252                                                                                         ctx->session,
5253                                                                                         OSRF_STATUS_INTERNALSERVERERROR,
5254                                                                                         "osrfMethodException",
5255                                                                                         ctx->request,
5256                                                                                         "Severe query error in ORDER BY clause -- "
5257                                                                                         "see error log for more details"
5258                                                                                 );
5259                                                                                 jsonIteratorFree( order_itr );
5260                                                                                 jsonIteratorFree( class_itr );
5261                                                                                 buffer_free( order_buf );
5262                                                                                 buffer_free( sql_buf );
5263                                                                                 if( defaultselhash )
5264                                                                                         jsonObjectFree( defaultselhash );
5265                                                                                 clear_query_stack();
5266                                                                                 return NULL;
5267                                                                         }
5268                                                                 } else {
5269                                                                         growing_buffer* field_buf = buffer_init( 16 );
5270                                                                         buffer_fadd( field_buf, "\"%s\".%s",
5271                                                                                 class_itr->key, order_itr->key );
5272                                                                         field_str = buffer_release( field_buf );
5273                                                                 }
5274
5275                                                                 if( ( order_by = jsonObjectGetKeyConst( onode, "direction" )) ) {
5276                                                                         const char* dir = jsonObjectGetString( order_by );
5277                                                                         if(!strncasecmp( dir, "d", 1 )) {
5278                                                                                 direction = " DESC";
5279                                                                         }
5280                                                                 }
5281                                                         } else {
5282                                                                 field_str = strdup( order_itr->key );
5283                                                                 const char* dir = jsonObjectGetString( onode );
5284                                                                 if( !strncasecmp( dir, "d", 1 )) {
5285                                                                         direction = " DESC";
5286                                                                 } else {
5287                                                                         direction = " ASC";
5288                                                                 }
5289                                                         }
5290
5291                                                         if( first ) {
5292                                                                 first = 0;
5293                                                         } else {
5294                                                                 buffer_add( order_buf, ", " );
5295                                                         }
5296
5297                                                         buffer_add( order_buf, field_str );
5298                                                         free( field_str );
5299
5300                                                         if( direction ) {
5301                                                                 buffer_add( order_buf, direction );
5302                                                         }
5303                                                 } // end while; looping over ORDER BY expressions
5304
5305                                                 jsonIteratorFree( order_itr );
5306
5307                                         } else if( JSON_STRING == snode->type ) {
5308                                                 // We expect a comma-separated list of sort fields.
5309                                                 const char* str = jsonObjectGetString( snode );
5310                                                 if( strchr( str, ';' )) {
5311                                                         // No semicolons allowed.  It is theoretically possible for a
5312                                                         // legitimate semicolon to occur within quotes, but it's not likely
5313                                                         // to occur in practice in the context of an ORDER BY list.
5314                                                         osrfLogError( OSRF_LOG_MARK, "%s: Possible attempt at SOL injection -- "
5315                                                                 "semicolon found in ORDER BY list: \"%s\"", modulename, str );
5316                                                         if( ctx ) {
5317                                                                 osrfAppSessionStatus(
5318                                                                         ctx->session,
5319                                                                         OSRF_STATUS_INTERNALSERVERERROR,
5320                                                                         "osrfMethodException",
5321                                                                         ctx->request,
5322                                                                         "Possible attempt at SOL injection -- "
5323                                                                                 "semicolon found in ORDER BY list"
5324                                                                 );
5325                                                         }
5326                                                         jsonIteratorFree( class_itr );
5327                                                         buffer_free( order_buf );
5328                                                         buffer_free( sql_buf );
5329                                                         if( defaultselhash )
5330                                                                 jsonObjectFree( defaultselhash );
5331                                                         clear_query_stack();
5332                                                         return NULL;
5333                                                 }
5334                                                 buffer_add( order_buf, str );
5335                                                 break;
5336                                         }
5337
5338                                 } // end while; looping over order_by classes
5339
5340                                 jsonIteratorFree( class_itr );
5341                                 order_by_list = buffer_release( order_buf );
5342
5343                         } else {
5344                                 osrfLogWarning( OSRF_LOG_MARK,
5345                                         "\"order_by\" object in a query is not a JSON_HASH or JSON_ARRAY;"
5346                                         "no ORDER BY generated" );
5347                         }
5348
5349                         if( order_by_list && *order_by_list ) {
5350                                 OSRF_BUFFER_ADD( sql_buf, " ORDER BY " );
5351                                 OSRF_BUFFER_ADD( sql_buf, order_by_list );
5352                         }
5353
5354                         free( order_by_list );
5355                 }
5356
5357                 const jsonObject* limit = jsonObjectGetKeyConst( rest_of_query, "limit" );
5358                 if( limit ) {
5359                         const char* str = jsonObjectGetString( limit );
5360                         buffer_fadd(
5361                                 sql_buf,
5362                                 " LIMIT %d",
5363                                 atoi(str)
5364                         );
5365                 }
5366
5367                 const jsonObject* offset = jsonObjectGetKeyConst( rest_of_query, "offset" );
5368                 if( offset ) {
5369                         const char* str = jsonObjectGetString( offset );
5370                         buffer_fadd(
5371                                 sql_buf,
5372                                 " OFFSET %d",
5373                                 atoi( str )
5374                         );
5375                 }
5376         }
5377
5378         if( defaultselhash )
5379                 jsonObjectFree( defaultselhash );
5380         clear_query_stack();
5381
5382         OSRF_BUFFER_ADD_CHAR( sql_buf, ';' );
5383         return buffer_release( sql_buf );
5384 }
5385
5386 int doJSONSearch ( osrfMethodContext* ctx ) {
5387         if(osrfMethodVerifyContext( ctx )) {
5388                 osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
5389                 return -1;
5390         }
5391
5392         osrfLogDebug( OSRF_LOG_MARK, "Received query request" );
5393
5394         int err = 0;
5395
5396         jsonObject* hash = jsonObjectGetIndex( ctx->params, 0 );
5397
5398         int flags = 0;
5399
5400         if( obj_is_true( jsonObjectGetKeyConst( hash, "distinct" )))
5401                 flags |= SELECT_DISTINCT;
5402
5403         if( obj_is_true( jsonObjectGetKeyConst( hash, "no_i18n" )))
5404                 flags |= DISABLE_I18N;
5405
5406         osrfLogDebug( OSRF_LOG_MARK, "Building SQL ..." );
5407         clear_query_stack();       // a possibly needless precaution
5408         char* sql = buildQuery( ctx, hash, flags );
5409         clear_query_stack();
5410
5411         if( !sql ) {
5412                 err = -1;
5413                 return err;
5414         }
5415
5416         osrfLogDebug( OSRF_LOG_MARK, "%s SQL =  %s", modulename, sql );
5417
5418         // XXX for now...
5419         dbhandle = writehandle;
5420
5421         dbi_result result = dbi_conn_query( dbhandle, sql );
5422
5423         if( result ) {
5424                 osrfLogDebug( OSRF_LOG_MARK, "Query returned with no errors" );
5425
5426                 if( dbi_result_first_row( result )) {
5427                         /* JSONify the result */
5428                         osrfLogDebug( OSRF_LOG_MARK, "Query returned at least one row" );
5429
5430                         do {
5431                                 jsonObject* return_val = oilsMakeJSONFromResult( result );
5432                                 osrfAppRespond( ctx, return_val );
5433                                 jsonObjectFree( return_val );
5434                         } while( dbi_result_next_row( result ));
5435
5436                 } else {
5437                         osrfLogDebug( OSRF_LOG_MARK, "%s returned no results for query %s", modulename, sql );
5438                 }
5439
5440                 osrfAppRespondComplete( ctx, NULL );
5441
5442                 /* clean up the query */
5443                 dbi_result_free( result );
5444
5445         } else {
5446                 err = -1;
5447                 const char* msg;
5448                 int errnum = dbi_conn_error( dbhandle, &msg );
5449                 osrfLogError( OSRF_LOG_MARK, "%s: Error with query [%s]: %d %s",
5450                         modulename, sql, errnum, msg ? msg : "(No description available)" );
5451                 osrfAppSessionStatus(
5452                         ctx->session,
5453                         OSRF_STATUS_INTERNALSERVERERROR,
5454                         "osrfMethodException",
5455                         ctx->request,
5456                         "Severe query error -- see error log for more details"
5457                 );
5458                 if( !oilsIsDBConnected( dbhandle ))
5459                         osrfAppSessionPanic( ctx->session );
5460         }
5461
5462         free( sql );
5463         return err;
5464 }
5465
5466 // The last parameter, err, is used to report an error condition by updating an int owned by
5467 // the calling code.
5468
5469 // In case of an error, we set *err to -1.  If there is no error, *err is left unchanged.
5470 // It is the responsibility of the calling code to initialize *err before the
5471 // call, so that it will be able to make sense of the result.
5472
5473 // Note also that we return NULL if and only if we set *err to -1.  So the err parameter is
5474 // redundant anyway.
5475 static jsonObject* doFieldmapperSearch( osrfMethodContext* ctx, osrfHash* class_meta,
5476                 jsonObject* where_hash, jsonObject* query_hash, int* err ) {
5477
5478         // XXX for now...
5479         dbhandle = writehandle;
5480
5481         char* core_class = osrfHashGet( class_meta, "classname" );
5482         char* pkey = osrfHashGet( class_meta, "primarykey" );
5483
5484         const jsonObject* _tmp;
5485
5486         char* sql = buildSELECT( where_hash, query_hash, class_meta, ctx );
5487         if( !sql ) {
5488                 osrfLogDebug( OSRF_LOG_MARK, "Problem building query, returning NULL" );
5489                 *err = -1;
5490                 return NULL;
5491         }
5492
5493         osrfLogDebug( OSRF_LOG_MARK, "%s SQL =  %s", modulename, sql );
5494
5495         dbi_result result = dbi_conn_query( dbhandle, sql );
5496         if( NULL == result ) {
5497                 const char* msg;
5498                 int errnum = dbi_conn_error( dbhandle, &msg );
5499                 osrfLogError(OSRF_LOG_MARK, "%s: Error retrieving %s with query [%s]: %d %s",
5500                         modulename, osrfHashGet( class_meta, "fieldmapper" ), sql, errnum,
5501                         msg ? msg : "(No description available)" );
5502                 if( !oilsIsDBConnected( dbhandle ))
5503                         osrfAppSessionPanic( ctx->session );
5504                 osrfAppSessionStatus(
5505                         ctx->session,
5506                         OSRF_STATUS_INTERNALSERVERERROR,
5507                         "osrfMethodException",
5508                         ctx->request,
5509                         "Severe query error -- see error log for more details"
5510                 );
5511                 *err = -1;
5512                 free( sql );
5513                 return NULL;
5514
5515         } else {
5516                 osrfLogDebug( OSRF_LOG_MARK, "Query returned with no errors" );
5517         }
5518
5519         jsonObject* res_list = jsonNewObjectType( JSON_ARRAY );
5520         jsonObject* row_obj = NULL;
5521
5522         if( dbi_result_first_row( result )) {
5523
5524                 // Convert each row to a JSON_ARRAY of column values, and enclose those objects
5525                 // in a JSON_ARRAY of rows.  If two or more rows have the same key value, then
5526                 // eliminate the duplicates.
5527                 osrfLogDebug( OSRF_LOG_MARK, "Query returned at least one row" );
5528                 osrfHash* dedup = osrfNewHash();
5529                 do {
5530                         row_obj = oilsMakeFieldmapperFromResult( result, class_meta );
5531                         char* pkey_val = oilsFMGetString( row_obj, pkey );
5532                         if( osrfHashGet( dedup, pkey_val ) ) {
5533                                 jsonObjectFree( row_obj );
5534                                 free( pkey_val );
5535                         } else {
5536                                 osrfHashSet( dedup, pkey_val, pkey_val );
5537                                 jsonObjectPush( res_list, row_obj );
5538                         }
5539                 } while( dbi_result_next_row( result ));
5540                 osrfHashFree( dedup );
5541
5542         } else {
5543                 osrfLogDebug( OSRF_LOG_MARK, "%s returned no results for query %s",
5544                         modulename, sql );
5545         }
5546
5547         /* clean up the query */
5548         dbi_result_free( result );
5549         free( sql );
5550
5551         // If we're asked to flesh, and there's anything to flesh, then flesh it
5552         // (but not for PCRUD, lest the user to bypass permissions by fleshing
5553         // something that he has no permission to look at).
5554         if( res_list->size && query_hash && ! enforce_pcrud ) {
5555                 _tmp = jsonObjectGetKeyConst( query_hash, "flesh" );
5556                 if( _tmp ) {
5557                         // Get the flesh depth
5558                         int flesh_depth = (int) jsonObjectGetNumber( _tmp );
5559                         if( flesh_depth == -1 || flesh_depth > max_flesh_depth )
5560                                 flesh_depth = max_flesh_depth;
5561
5562                         // We need a non-zero flesh depth, and a list of fields to flesh
5563                         const jsonObject* temp_blob = jsonObjectGetKeyConst( query_hash, "flesh_fields" );
5564                         if( temp_blob && flesh_depth > 0 ) {
5565
5566                                 jsonObject* flesh_blob = jsonObjectClone( temp_blob );
5567                                 const jsonObject* flesh_fields = jsonObjectGetKeyConst( flesh_blob, core_class );
5568
5569                                 osrfStringArray* link_fields = NULL;
5570                                 osrfHash* links = osrfHashGet( class_meta, "links" );
5571
5572                                 // Make an osrfStringArray of the names of fields to be fleshed
5573                                 if( flesh_fields ) {
5574                                         if( flesh_fields->size == 1 ) {
5575                                                 const char* _t = jsonObjectGetString(
5576                                                         jsonObjectGetIndex( flesh_fields, 0 ) );
5577                                                 if( !strcmp( _t, "*" ))
5578                                                         link_fields = osrfHashKeys( links );
5579                                         }
5580
5581                                         if( !link_fields ) {
5582                                                 jsonObject* _f;
5583                                                 link_fields = osrfNewStringArray( 1 );
5584                                                 jsonIterator* _i = jsonNewIterator( flesh_fields );
5585                                                 while ((_f = jsonIteratorNext( _i ))) {
5586                                                         osrfStringArrayAdd( link_fields, jsonObjectGetString( _f ) );
5587                                                 }
5588                                                 jsonIteratorFree( _i );
5589                                         }
5590                                 }
5591
5592                                 osrfHash* fields = osrfHashGet( class_meta, "fields" );
5593
5594                                 // Iterate over the JSON_ARRAY of rows
5595                                 jsonObject* cur;
5596                                 unsigned long res_idx = 0;
5597                                 while((cur = jsonObjectGetIndex( res_list, res_idx++ ) )) {
5598
5599                                         int i = 0;
5600                                         const char* link_field;
5601
5602                                         // Iterate over the list of fleshable fields
5603                                         while( (link_field = osrfStringArrayGetString(link_fields, i++)) ) {
5604
5605                                                 osrfLogDebug( OSRF_LOG_MARK, "Starting to flesh %s", link_field );
5606
5607                                                 osrfHash* kid_link = osrfHashGet( links, link_field );
5608                                                 if( !kid_link )
5609                                                         continue;     // Not a link field; skip it
5610
5611                                                 osrfHash* field = osrfHashGet( fields, link_field );
5612                                                 if( !field )
5613                                                         continue;     // Not a field at all; skip it (IDL is ill-formed)
5614
5615                                                 osrfHash* kid_idl = osrfHashGet( oilsIDL(),
5616                                                         osrfHashGet( kid_link, "class" ));
5617                                                 if( !kid_idl )
5618                                                         continue;   // The class it links to doesn't exist; skip it
5619
5620                                                 const char* reltype = osrfHashGet( kid_link, "reltype" );
5621                                                 if( !reltype )
5622                                                         continue;   // No reltype; skip it (IDL is ill-formed)
5623
5624                                                 osrfHash* value_field = field;
5625
5626                                                 if(    !strcmp( reltype, "has_many" )
5627                                                         || !strcmp( reltype, "might_have" ) ) { // has_many or might_have
5628                                                         value_field = osrfHashGet(
5629                                                                 fields, osrfHashGet( class_meta, "primarykey" ) );
5630                                                 }
5631
5632                                                 osrfStringArray* link_map = osrfHashGet( kid_link, "map" );
5633
5634                                                 if( link_map->size > 0 ) {
5635                                                         jsonObject* _kid_key = jsonNewObjectType( JSON_ARRAY );
5636                                                         jsonObjectPush(
5637                                                                 _kid_key,
5638                                                                 jsonNewObject( osrfStringArrayGetString( link_map, 0 ) )
5639                                                         );
5640
5641                                                         jsonObjectSetKey(
5642                                                                 flesh_blob,
5643                                                                 osrfHashGet( kid_link, "class" ),
5644                                                                 _kid_key
5645                                                         );
5646                                                 };
5647
5648                                                 osrfLogDebug(
5649                                                         OSRF_LOG_MARK,
5650                                                         "Link field: %s, remote class: %s, fkey: %s, reltype: %s",
5651                                                         osrfHashGet( kid_link, "field" ),
5652                                                         osrfHashGet( kid_link, "class" ),
5653                                                         osrfHashGet( kid_link, "key" ),
5654                                                         osrfHashGet( kid_link, "reltype" )
5655                                                 );
5656
5657                                                 const char* search_key = jsonObjectGetString(
5658                                                         jsonObjectGetIndex( cur,
5659                                                                 atoi( osrfHashGet( value_field, "array_position" ) )
5660                                                         )
5661                                                 );
5662
5663                                                 if( !search_key ) {
5664                                                         osrfLogDebug( OSRF_LOG_MARK, "Nothing to search for!" );
5665                                                         continue;
5666                                                 }
5667
5668                                                 osrfLogDebug( OSRF_LOG_MARK, "Creating param objects..." );
5669
5670                                                 // construct WHERE clause
5671                                                 jsonObject* where_clause  = jsonNewObjectType( JSON_HASH );
5672                                                 jsonObjectSetKey(
5673                                                         where_clause,
5674                                                         osrfHashGet( kid_link, "key" ),
5675                                                         jsonNewObject( search_key )
5676                                                 );
5677
5678                                                 // construct the rest of the query, mostly
5679                                                 // by copying pieces of the previous level of query
5680                                                 jsonObject* rest_of_query = jsonNewObjectType( JSON_HASH );
5681                                                 jsonObjectSetKey( rest_of_query, "flesh",
5682                                                         jsonNewNumberObject( flesh_depth - 1 + link_map->size )
5683                                                 );
5684
5685                                                 if( flesh_blob )
5686                                                         jsonObjectSetKey( rest_of_query, "flesh_fields",
5687                                                                 jsonObjectClone( flesh_blob ));
5688
5689                                                 if( jsonObjectGetKeyConst( query_hash, "order_by" )) {
5690                                                         jsonObjectSetKey( rest_of_query, "order_by",
5691                                                                 jsonObjectClone( jsonObjectGetKeyConst( query_hash, "order_by" ))
5692                                                         );
5693                                                 }
5694
5695                                                 if( jsonObjectGetKeyConst( query_hash, "select" )) {
5696                                                         jsonObjectSetKey( rest_of_query, "select",
5697                                                                 jsonObjectClone( jsonObjectGetKeyConst( query_hash, "select" ))
5698                                                         );
5699                                                 }
5700
5701                                                 // do the query, recursively, to expand the fleshable field
5702                                                 jsonObject* kids = doFieldmapperSearch( ctx, kid_idl,
5703                                                         where_clause, rest_of_query, err );
5704
5705                                                 jsonObjectFree( where_clause );
5706                                                 jsonObjectFree( rest_of_query );
5707
5708                                                 if( *err ) {
5709                                                         osrfStringArrayFree( link_fields );
5710                                                         jsonObjectFree( res_list );
5711                                                         jsonObjectFree( flesh_blob );
5712                                                         return NULL;
5713                                                 }
5714
5715                                                 osrfLogDebug( OSRF_LOG_MARK, "Search for %s return %d linked objects",
5716                                                         osrfHashGet( kid_link, "class" ), kids->size );
5717
5718                                                 // Traverse the result set
5719                                                 jsonObject* X = NULL;
5720                                                 if( link_map->size > 0 && kids->size > 0 ) {
5721                                                         X = kids;
5722                                                         kids = jsonNewObjectType( JSON_ARRAY );
5723
5724                                                         jsonObject* _k_node;
5725                                                         unsigned long res_idx = 0;
5726                                                         while((_k_node = jsonObjectGetIndex( X, res_idx++ ) )) {
5727                                                                 jsonObjectPush(
5728                                                                         kids,
5729                                                                         jsonObjectClone(
5730                                                                                 jsonObjectGetIndex(
5731                                                                                         _k_node,
5732                                                                                         (unsigned long) atoi(
5733                                                                                                 osrfHashGet(
5734                                                                                                         osrfHashGet(
5735                                                                                                                 osrfHashGet(
5736                                                                                                                         osrfHashGet(
5737                                                                                                                                 oilsIDL(),
5738                                                                                                                                 osrfHashGet( kid_link, "class" )
5739                                                                                                                         ),
5740                                                                                                                         "fields"
5741                                                                                                                 ),
5742                                                                                                                 osrfStringArrayGetString( link_map, 0 )
5743                                                                                                         ),
5744                                                                                                         "array_position"
5745                                                                                                 )
5746                                                                                         )
5747                                                                                 )
5748                                                                         )
5749                                                                 );
5750                                                         } // end while loop traversing X
5751                                                 }
5752
5753                                                 if(    !strcmp( osrfHashGet( kid_link, "reltype" ), "has_a" )
5754                                                         || !strcmp( osrfHashGet( kid_link, "reltype" ), "might_have" )) {
5755                                                         osrfLogDebug(OSRF_LOG_MARK, "Storing fleshed objects in %s",
5756                                                                 osrfHashGet( kid_link, "field" ));
5757                                                         jsonObjectSetIndex(
5758                                                                 cur,
5759                                                                 (unsigned long) atoi( osrfHashGet( field, "array_position" ) ),
5760                                                                 jsonObjectClone( jsonObjectGetIndex( kids, 0 ))
5761                                                         );
5762                                                 }
5763
5764                                                 if( !strcmp( osrfHashGet( kid_link, "reltype" ), "has_many" )) {
5765                                                         // has_many
5766                                                         osrfLogDebug( OSRF_LOG_MARK, "Storing fleshed objects in %s",
5767                                                                 osrfHashGet( kid_link, "field" ) );
5768                                                         jsonObjectSetIndex(
5769                                                                 cur,
5770                                                                 (unsigned long) atoi( osrfHashGet( field, "array_position" ) ),
5771                                                                 jsonObjectClone( kids )
5772                                                         );
5773                                                 }
5774
5775                                                 if( X ) {
5776                                                         jsonObjectFree( kids );
5777                                                         kids = X;
5778                                                 }
5779
5780                                                 jsonObjectFree( kids );
5781
5782                                                 osrfLogDebug( OSRF_LOG_MARK, "Fleshing of %s complete",
5783                                                         osrfHashGet( kid_link, "field" ) );
5784                                                 osrfLogDebug( OSRF_LOG_MARK, "%s", jsonObjectToJSON( cur ));
5785
5786                                         } // end while loop traversing list of fleshable fields
5787                                 } // end while loop traversing res_list
5788                                 jsonObjectFree( flesh_blob );
5789                                 osrfStringArrayFree( link_fields );
5790                         }
5791                 }
5792         }
5793
5794         return res_list;
5795 }
5796
5797
5798 int doUpdate( osrfMethodContext* ctx ) {
5799         if( osrfMethodVerifyContext( ctx )) {
5800                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
5801                 return -1;
5802         }
5803
5804         if( enforce_pcrud )
5805                 timeout_needs_resetting = 1;
5806
5807         osrfHash* meta = osrfHashGet( (osrfHash*) ctx->method->userData, "class" );
5808
5809         jsonObject* target = NULL;
5810         if( enforce_pcrud )
5811                 target = jsonObjectGetIndex( ctx->params, 1 );
5812         else
5813                 target = jsonObjectGetIndex( ctx->params, 0 );
5814
5815         if(!verifyObjectClass( ctx, target )) {
5816                 osrfAppRespondComplete( ctx, NULL );
5817                 return -1;
5818         }
5819
5820         if( getXactId( ctx ) == NULL ) {
5821                 osrfAppSessionStatus(
5822                         ctx->session,
5823                         OSRF_STATUS_BADREQUEST,
5824                         "osrfMethodException",
5825                         ctx->request,
5826                         "No active transaction -- required for UPDATE"
5827                 );
5828                 osrfAppRespondComplete( ctx, NULL );
5829                 return -1;
5830         }
5831
5832         // The following test is harmless but redundant.  If a class is
5833         // readonly, we don't register an update method for it.
5834         if( str_is_true( osrfHashGet( meta, "readonly" ) ) ) {
5835                 osrfAppSessionStatus(
5836                         ctx->session,
5837                         OSRF_STATUS_BADREQUEST,
5838                         "osrfMethodException",
5839                         ctx->request,
5840                         "Cannot UPDATE readonly class"
5841                 );
5842                 osrfAppRespondComplete( ctx, NULL );
5843                 return -1;
5844         }
5845
5846         const char* trans_id = getXactId( ctx );
5847
5848         // Set the last_xact_id
5849         int index = oilsIDL_ntop( target->classname, "last_xact_id" );
5850         if( index > -1 ) {
5851                 osrfLogDebug( OSRF_LOG_MARK, "Setting last_xact_id to %s on %s at position %d",
5852                                 trans_id, target->classname, index );
5853                 jsonObjectSetIndex( target, index, jsonNewObject( trans_id ));
5854         }
5855
5856         char* pkey = osrfHashGet( meta, "primarykey" );
5857         osrfHash* fields = osrfHashGet( meta, "fields" );
5858
5859         char* id = oilsFMGetString( target, pkey );
5860
5861         osrfLogDebug(
5862                 OSRF_LOG_MARK,
5863                 "%s updating %s object with %s = %s",
5864                 modulename,
5865                 osrfHashGet( meta, "fieldmapper" ),
5866                 pkey,
5867                 id
5868         );
5869
5870         dbhandle = writehandle;
5871         growing_buffer* sql = buffer_init( 128 );
5872         buffer_fadd( sql,"UPDATE %s SET", osrfHashGet( meta, "tablename" ));
5873
5874         int first = 1;
5875         osrfHash* field_def = NULL;
5876         osrfHashIterator* field_itr = osrfNewHashIterator( fields );
5877         while( ( field_def = osrfHashIteratorNext( field_itr ) ) ) {
5878
5879                 // Skip virtual fields, and the primary key
5880                 if( str_is_true( osrfHashGet( field_def, "virtual") ) )
5881                         continue;
5882
5883                 const char* field_name = osrfHashIteratorKey( field_itr );
5884                 if( ! strcmp( field_name, pkey ) )
5885                         continue;
5886
5887                 const jsonObject* field_object = oilsFMGetObject( target, field_name );
5888
5889                 int value_is_numeric = 0;    // boolean
5890                 char* value;
5891                 if( field_object && field_object->classname ) {
5892                         value = oilsFMGetString(
5893                                 field_object,
5894                                 (char*) oilsIDLFindPath( "/%s/primarykey", field_object->classname )
5895                         );
5896                 } else if( field_object && JSON_BOOL == field_object->type ) {
5897                         if( jsonBoolIsTrue( field_object ) )
5898                                 value = strdup( "t" );
5899                         else
5900                                 value = strdup( "f" );
5901                 } else {
5902                         value = jsonObjectToSimpleString( field_object );
5903                         if( field_object && JSON_NUMBER == field_object->type )
5904                                 value_is_numeric = 1;
5905                 }
5906
5907                 osrfLogDebug( OSRF_LOG_MARK, "Updating %s object with %s = %s",
5908                                 osrfHashGet( meta, "fieldmapper" ), field_name, value);
5909
5910                 if( !field_object || field_object->type == JSON_NULL ) {
5911                         if( !( !( strcmp( osrfHashGet( meta, "classname" ), "au" ) )
5912                                         && !( strcmp( field_name, "passwd" ) )) ) { // arg at the special case!
5913                                 if( first )
5914                                         first = 0;
5915                                 else
5916                                         OSRF_BUFFER_ADD_CHAR( sql, ',' );
5917                                 buffer_fadd( sql, " %s = NULL", field_name );
5918                         }
5919
5920                 } else if( value_is_numeric || !strcmp( get_primitive( field_def ), "number") ) {
5921                         if( first )
5922                                 first = 0;
5923                         else
5924                                 OSRF_BUFFER_ADD_CHAR( sql, ',' );
5925
5926                         const char* numtype = get_datatype( field_def );
5927                         if( !strncmp( numtype, "INT", 3 ) ) {
5928                                 buffer_fadd( sql, " %s = %ld", field_name, atol( value ) );
5929                         } else if( !strcmp( numtype, "NUMERIC" ) ) {
5930                                 buffer_fadd( sql, " %s = %f", field_name, atof( value ) );
5931                         } else {
5932                                 // Must really be intended as a string, so quote it
5933                                 if( dbi_conn_quote_string( dbhandle, &value )) {
5934                                         buffer_fadd( sql, " %s = %s", field_name, value );
5935                                 } else {
5936                                         osrfLogError( OSRF_LOG_MARK, "%s: Error quoting string [%s]",
5937                                                 modulename, value );
5938                                         osrfAppSessionStatus(
5939                                                 ctx->session,
5940                                                 OSRF_STATUS_INTERNALSERVERERROR,
5941                                                 "osrfMethodException",
5942                                                 ctx->request,
5943                                                 "Error quoting string -- please see the error log for more details"
5944                                         );
5945                                         free( value );
5946                                         free( id );
5947                                         osrfHashIteratorFree( field_itr );
5948                                         buffer_free( sql );
5949                                         osrfAppRespondComplete( ctx, NULL );
5950                                         return -1;
5951                                 }
5952                         }
5953
5954                         osrfLogDebug( OSRF_LOG_MARK, "%s is of type %s", field_name, numtype );
5955
5956                 } else {
5957                         if( dbi_conn_quote_string( dbhandle, &value ) ) {
5958                                 if( first )
5959                                         first = 0;
5960                                 else
5961                                         OSRF_BUFFER_ADD_CHAR( sql, ',' );
5962                                 buffer_fadd( sql, " %s = %s", field_name, value );
5963                         } else {
5964                                 osrfLogError( OSRF_LOG_MARK, "%s: Error quoting string [%s]", modulename, value );
5965                                 osrfAppSessionStatus(
5966                                         ctx->session,
5967                                         OSRF_STATUS_INTERNALSERVERERROR,
5968                                         "osrfMethodException",
5969                                         ctx->request,
5970                                         "Error quoting string -- please see the error log for more details"
5971                                 );
5972                                 free( value );
5973                                 free( id );
5974                                 osrfHashIteratorFree( field_itr );
5975                                 buffer_free( sql );
5976                                 osrfAppRespondComplete( ctx, NULL );
5977                                 return -1;
5978                         }
5979                 }
5980
5981                 free( value );
5982
5983         } // end while
5984
5985         osrfHashIteratorFree( field_itr );
5986
5987         jsonObject* obj = jsonNewObject( id );
5988
5989         if( strcmp( get_primitive( osrfHashGet( osrfHashGet(meta, "fields"), pkey )), "number" ))
5990                 dbi_conn_quote_string( dbhandle, &id );
5991
5992         buffer_fadd( sql, " WHERE %s = %s;", pkey, id );
5993
5994         char* query = buffer_release( sql );
5995         osrfLogDebug( OSRF_LOG_MARK, "%s: Update SQL [%s]", modulename, query );
5996
5997         dbi_result result = dbi_conn_query( dbhandle, query );
5998         free( query );
5999
6000         int rc = 0;
6001         if( !result ) {
6002                 jsonObjectFree( obj );
6003                 obj = jsonNewObject( NULL );
6004                 const char* msg;
6005                 int errnum = dbi_conn_error( dbhandle, &msg );
6006                 osrfLogError(
6007                         OSRF_LOG_MARK,
6008                         "%s ERROR updating %s object with %s = %s: %d %s",
6009                         modulename,
6010                         osrfHashGet( meta, "fieldmapper" ),
6011                         pkey,
6012                         id,
6013                         errnum,
6014                         msg ? msg : "(No description available)"
6015                 );
6016                 osrfAppSessionStatus(
6017                         ctx->session,
6018                         OSRF_STATUS_INTERNALSERVERERROR,
6019                         "osrfMethodException",
6020                         ctx->request,
6021                         "Error in updating a row -- please see the error log for more details"
6022                 );
6023                 if( !oilsIsDBConnected( dbhandle ))
6024                         osrfAppSessionPanic( ctx->session );
6025                 rc = -1;
6026         } else
6027                 dbi_result_free( result );
6028
6029         free( id );
6030         osrfAppRespondComplete( ctx, obj );
6031         jsonObjectFree( obj );
6032         return rc;
6033 }
6034
6035 int doDelete( osrfMethodContext* ctx ) {
6036         if( osrfMethodVerifyContext( ctx )) {
6037                 osrfLogError( OSRF_LOG_MARK, "Invalid method context" );
6038                 return -1;
6039         }
6040
6041         if( enforce_pcrud )
6042                 timeout_needs_resetting = 1;
6043
6044         osrfHash* meta = osrfHashGet( (osrfHash*) ctx->method->userData, "class" );
6045
6046         if( getXactId( ctx ) == NULL ) {
6047                 osrfAppSessionStatus(
6048                         ctx->session,
6049                         OSRF_STATUS_BADREQUEST,
6050                         "osrfMethodException",
6051                         ctx->request,
6052                         "No active transaction -- required for DELETE"
6053                 );
6054                 osrfAppRespondComplete( ctx, NULL );
6055                 return -1;
6056         }
6057
6058         // The following test is harmless but redundant.  If a class is
6059         // readonly, we don't register a delete method for it.
6060         if( str_is_true( osrfHashGet( meta, "readonly" ) ) ) {
6061                 osrfAppSessionStatus(
6062                         ctx->session,
6063                         OSRF_STATUS_BADREQUEST,
6064                         "osrfMethodException",
6065                         ctx->request,
6066                         "Cannot DELETE readonly class"
6067                 );
6068                 osrfAppRespondComplete( ctx, NULL );
6069                 return -1;
6070         }
6071
6072         dbhandle = writehandle;
6073
6074         char* pkey = osrfHashGet( meta, "primarykey" );
6075
6076         int _obj_pos = 0;
6077         if( enforce_pcrud )
6078                 _obj_pos = 1;
6079
6080         char* id;
6081         if( jsonObjectGetIndex( ctx->params, _obj_pos )->classname ) {
6082                 if( !verifyObjectClass( ctx, jsonObjectGetIndex( ctx->params, _obj_pos ))) {
6083                         osrfAppRespondComplete( ctx, NULL );
6084                         return -1;
6085                 }
6086
6087                 id = oilsFMGetString( jsonObjectGetIndex(ctx->params, _obj_pos), pkey );
6088         } else {
6089                 if( enforce_pcrud && !verifyObjectPCRUD( ctx, NULL, 1 )) {
6090                         osrfAppRespondComplete( ctx, NULL );
6091                         return -1;
6092                 }
6093                 id = jsonObjectToSimpleString( jsonObjectGetIndex( ctx->params, _obj_pos ));
6094         }
6095
6096         osrfLogDebug(
6097                 OSRF_LOG_MARK,
6098                 "%s deleting %s object with %s = %s",
6099                 modulename,
6100                 osrfHashGet( meta, "fieldmapper" ),
6101                 pkey,
6102                 id
6103         );
6104
6105         jsonObject* obj = jsonNewObject( id );
6106
6107         if( strcmp( get_primitive( osrfHashGet( osrfHashGet(meta, "fields"), pkey ) ), "number" ) )
6108                 dbi_conn_quote_string( writehandle, &id );
6109
6110         dbi_result result = dbi_conn_queryf( writehandle, "DELETE FROM %s WHERE %s = %s;",
6111                 osrfHashGet( meta, "tablename" ), pkey, id );
6112
6113         int rc = 0;
6114         if( !result ) {
6115                 rc = -1;
6116                 jsonObjectFree( obj );
6117                 obj = jsonNewObject( NULL );
6118                 const char* msg;
6119                 int errnum = dbi_conn_error( writehandle, &msg );
6120                 osrfLogError(
6121                         OSRF_LOG_MARK,
6122                         "%s ERROR deleting %s object with %s = %s: %d %s",
6123                         modulename,
6124                         osrfHashGet( meta, "fieldmapper" ),
6125                         pkey,
6126                         id,
6127                         errnum,
6128                         msg ? msg : "(No description available)"
6129                 );
6130                 osrfAppSessionStatus(
6131                         ctx->session,
6132                         OSRF_STATUS_INTERNALSERVERERROR,
6133                         "osrfMethodException",
6134                         ctx->request,
6135                         "Error in deleting a row -- please see the error log for more details"
6136                 );
6137                 if( !oilsIsDBConnected( writehandle ))
6138                         osrfAppSessionPanic( ctx->session );
6139         } else
6140                 dbi_result_free( result );
6141
6142         free( id );
6143
6144         osrfAppRespondComplete( ctx, obj );
6145         jsonObjectFree( obj );
6146         return rc;
6147 }
6148
6149 /**
6150         @brief Translate a row returned from the database into a jsonObject of type JSON_ARRAY.
6151         @param result An iterator for a result set; we only look at the current row.
6152         @param @meta Pointer to the class metadata for the core class.
6153         @return Pointer to the resulting jsonObject if successful; otherwise NULL.
6154
6155         If a column is not defined in the IDL, or if it has no array_position defined for it in
6156         the IDL, or if it is defined as virtual, ignore it.
6157
6158         Otherwise, translate the column value into a jsonObject of type JSON_NULL, JSON_NUMBER,
6159         or JSON_STRING.  Then insert this jsonObject into the JSON_ARRAY according to its
6160         array_position in the IDL.
6161
6162         A field defined in the IDL but not represented in the returned row will leave a hole
6163         in the JSON_ARRAY.  In effect it will be treated as a null value.
6164
6165         In the resulting JSON_ARRAY, the field values appear in the sequence defined by the IDL,
6166         regardless of their sequence in the SELECT statement.  The JSON_ARRAY is assigned the
6167         classname corresponding to the @a meta argument.
6168
6169         The calling code is responsible for freeing the the resulting jsonObject by calling
6170         jsonObjectFree().
6171 */
6172 static jsonObject* oilsMakeFieldmapperFromResult( dbi_result result, osrfHash* meta) {
6173         if( !( result && meta )) return NULL;
6174
6175         jsonObject* object = jsonNewObjectType( JSON_ARRAY );
6176         jsonObjectSetClass( object, osrfHashGet( meta, "classname" ));
6177         osrfLogInternal( OSRF_LOG_MARK, "Setting object class to %s ", object->classname );
6178
6179         osrfHash* fields = osrfHashGet( meta, "fields" );
6180
6181         int columnIndex = 1;
6182         const char* columnName;
6183
6184         /* cycle through the columns in the row returned from the database */
6185         while( (columnName = dbi_result_get_field_name( result, columnIndex )) ) {
6186
6187                 osrfLogInternal( OSRF_LOG_MARK, "Looking for column named [%s]...", (char*) columnName );
6188
6189                 int fmIndex = -1;  // Will be set to the IDL's sequence number for this field
6190
6191                 /* determine the field type and storage attributes */
6192                 unsigned short type = dbi_result_get_field_type_idx( result, columnIndex );
6193                 int attr            = dbi_result_get_field_attribs_idx( result, columnIndex );
6194
6195                 // Fetch the IDL's sequence number for the field.  If the field isn't in the IDL,
6196                 // or if it has no sequence number there, or if it's virtual, skip it.
6197                 osrfHash* _f = osrfHashGet( fields, (char*) columnName );
6198                 if( _f ) {
6199
6200                         if( str_is_true( osrfHashGet( _f, "virtual" )))
6201                                 continue;   // skip this column: IDL says it's virtual
6202
6203                         const char* pos = (char*) osrfHashGet( _f, "array_position" );
6204                         if( !pos )      // IDL has no sequence number for it.  This shouldn't happen,
6205                                 continue;    // since we assign sequence numbers dynamically as we load the IDL.
6206
6207                         fmIndex = atoi( pos );
6208                         osrfLogInternal( OSRF_LOG_MARK, "... Found column at position [%s]...", pos );
6209                 } else {
6210                         continue;     // This field is not defined in the IDL
6211                 }
6212
6213                 // Stuff the column value into a slot in the JSON_ARRAY, indexed according to the
6214                 // sequence number from the IDL (which is likely to be different from the sequence
6215                 // of columns in the SELECT clause).
6216                 if( dbi_result_field_is_null_idx( result, columnIndex )) {
6217                         jsonObjectSetIndex( object, fmIndex, jsonNewObject( NULL ));
6218                 } else {
6219
6220                         switch( type ) {
6221
6222                                 case DBI_TYPE_INTEGER :
6223
6224                                         if( attr & DBI_INTEGER_SIZE8 )
6225                                                 jsonObjectSetIndex( object, fmIndex,
6226                                                         jsonNewNumberObject(
6227                                                                 dbi_result_get_longlong_idx( result, columnIndex )));
6228                                         else
6229                                                 jsonObjectSetIndex( object, fmIndex,
6230                                                         jsonNewNumberObject( dbi_result_get_int_idx( result, columnIndex )));
6231
6232                                         break;
6233
6234                                 case DBI_TYPE_DECIMAL :
6235                                         jsonObjectSetIndex( object, fmIndex,
6236                                                         jsonNewNumberObject( dbi_result_get_double_idx(result, columnIndex )));
6237                                         break;
6238
6239                                 case DBI_TYPE_STRING :
6240
6241                                         jsonObjectSetIndex(
6242                                                 object,
6243                                                 fmIndex,
6244                                                 jsonNewObject( dbi_result_get_string_idx( result, columnIndex ))
6245                                         );
6246
6247                                         break;
6248
6249                                 case DBI_TYPE_DATETIME : {
6250
6251                                         char dt_string[ 256 ] = "";
6252                                         struct tm gmdt;
6253
6254                                         // Fetch the date column as a time_t
6255                                         time_t _tmp_dt = dbi_result_get_datetime_idx( result, columnIndex );
6256
6257                                         // Translate the time_t to a human-readable string
6258                                         if( !( attr & DBI_DATETIME_DATE )) {
6259                                                 gmtime_r( &_tmp_dt, &gmdt );
6260                                                 strftime( dt_string, sizeof( dt_string ), "%T", &gmdt );
6261                                         } else if( !( attr & DBI_DATETIME_TIME )) {
6262                                                 localtime_r( &_tmp_dt, &gmdt );
6263                                                 strftime( dt_string, sizeof( dt_string ), "%F", &gmdt );
6264                                         } else {
6265                                                 localtime_r( &_tmp_dt, &gmdt );
6266                                                 strftime( dt_string, sizeof( dt_string ), "%FT%T%z", &gmdt );
6267                                         }
6268
6269                                         jsonObjectSetIndex( object, fmIndex, jsonNewObject( dt_string ));
6270
6271                                         break;
6272                                 }
6273                                 case DBI_TYPE_BINARY :
6274                                         osrfLogError( OSRF_LOG_MARK,
6275                                                 "Can't do binary at column %s : index %d", columnName, columnIndex );
6276                         } // End switch
6277                 }
6278                 ++columnIndex;
6279         } // End while
6280
6281         return object;
6282 }
6283
6284 static jsonObject* oilsMakeJSONFromResult( dbi_result result ) {
6285         if( !result ) return NULL;
6286
6287         jsonObject* object = jsonNewObject( NULL );
6288
6289         time_t _tmp_dt;
6290         char dt_string[ 256 ];
6291         struct tm gmdt;
6292
6293         int fmIndex;
6294         int columnIndex = 1;
6295         int attr;
6296         unsigned short type;
6297         const char* columnName;
6298
6299         /* cycle through the column list */
6300         while(( columnName = dbi_result_get_field_name( result, columnIndex ))) {
6301
6302                 osrfLogInternal( OSRF_LOG_MARK, "Looking for column named [%s]...", (char*) columnName );
6303
6304                 fmIndex = -1; // reset the position
6305
6306                 /* determine the field type and storage attributes */
6307                 type = dbi_result_get_field_type_idx( result, columnIndex );
6308                 attr = dbi_result_get_field_attribs_idx( result, columnIndex );
6309
6310                 if( dbi_result_field_is_null_idx( result, columnIndex )) {
6311                         jsonObjectSetKey( object, columnName, jsonNewObject( NULL ));
6312                 } else {
6313
6314                         switch( type ) {
6315
6316                                 case DBI_TYPE_INTEGER :
6317
6318                                         if( attr & DBI_INTEGER_SIZE8 )
6319                                                 jsonObjectSetKey( object, columnName,
6320                                                                 jsonNewNumberObject( dbi_result_get_longlong_idx(
6321                                                                                 result, columnIndex )) );
6322                                         else
6323                                                 jsonObjectSetKey( object, columnName, jsonNewNumberObject(
6324                                                                 dbi_result_get_int_idx( result, columnIndex )) );
6325                                         break;
6326
6327                                 case DBI_TYPE_DECIMAL :
6328                                         jsonObjectSetKey( object, columnName, jsonNewNumberObject(
6329                                                 dbi_result_get_double_idx( result, columnIndex )) );
6330                                         break;
6331
6332                                 case DBI_TYPE_STRING :
6333                                         jsonObjectSetKey( object, columnName,
6334                                                 jsonNewObject( dbi_result_get_string_idx( result, columnIndex )));
6335                                         break;
6336
6337                                 case DBI_TYPE_DATETIME :
6338
6339                                         memset( dt_string, '\0', sizeof( dt_string ));
6340                                         memset( &gmdt, '\0', sizeof( gmdt ));
6341
6342                                         _tmp_dt = dbi_result_get_datetime_idx( result, columnIndex );
6343
6344                                         if( !( attr & DBI_DATETIME_DATE )) {
6345                                                 gmtime_r( &_tmp_dt, &gmdt );
6346                                                 strftime( dt_string, sizeof( dt_string ), "%T", &gmdt );
6347                                         } else if( !( attr & DBI_DATETIME_TIME )) {
6348                                                 localtime_r( &_tmp_dt, &gmdt );
6349                                                 strftime( dt_string, sizeof( dt_string ), "%F", &gmdt );
6350                                         } else {
6351                                                 localtime_r( &_tmp_dt, &gmdt );
6352                                                 strftime( dt_string, sizeof( dt_string ), "%FT%T%z", &gmdt );
6353                                         }
6354
6355                                         jsonObjectSetKey( object, columnName, jsonNewObject( dt_string ));
6356                                         break;
6357
6358                                 case DBI_TYPE_BINARY :
6359                                         osrfLogError( OSRF_LOG_MARK,
6360                                                 "Can't do binary at column %s : index %d", columnName, columnIndex );
6361                         }
6362                 }
6363                 ++columnIndex;
6364         } // end while loop traversing result
6365
6366         return object;
6367 }
6368
// Report whether a string spells "true", ignoring case.
// Returns 1 if so; returns 0 for any other string, and for a NULL pointer.
int str_is_true( const char* str ) {
	return ( str != NULL && 0 == strcasecmp( str, "true" ) ) ? 1 : 0;
}
6376
6377 // Interpret a jsonObject as true or false
6378 static int obj_is_true( const jsonObject* obj ) {
6379         if( !obj )
6380                 return 0;
6381         else switch( obj->type )
6382         {
6383                 case JSON_BOOL :
6384                         if( obj->value.b )
6385                                 return 1;
6386                         else
6387                                 return 0;
6388                 case JSON_STRING :
6389                         if( strcasecmp( obj->value.s, "true" ) )
6390                                 return 0;
6391                         else
6392                                 return 1;
6393                 case JSON_NUMBER :          // Support 1/0 for perl's sake
6394                         if( jsonObjectGetNumber( obj ) == 1.0 )
6395                                 return 1;
6396                         else
6397                                 return 0;
6398                 default :
6399                         return 0;
6400         }
6401 }
6402
// Map the numeric type code of a jsonObject to a printable name, for use
// in error messages.  Codes outside the known range yield "(unrecognized)".
static const char* json_type( int code ) {
	// Indexed by type code
	static const char* const type_names[] = {
		"JSON_HASH",
		"JSON_ARRAY",
		"JSON_STRING",
		"JSON_NUMBER",
		"JSON_NULL",
		"JSON_BOOL"
	};
	const int name_count = (int) ( sizeof( type_names ) / sizeof( type_names[ 0 ] ) );

	if( code < 0 || code >= name_count )
		return "(unrecognized)";

	return type_names[ code ];
}
6424
6425 // Extract the "primitive" attribute from an IDL field definition.
6426 // If we haven't initialized the app, then we must be running in
6427 // some kind of testbed.  In that case, default to "string".
6428 static const char* get_primitive( osrfHash* field ) {
6429         const char* s = osrfHashGet( field, "primitive" );
6430         if( !s ) {
6431                 if( child_initialized )
6432                         osrfLogError(
6433                                 OSRF_LOG_MARK,
6434                                 "%s ERROR No \"datatype\" attribute for field \"%s\"",
6435                                 modulename,
6436                                 osrfHashGet( field, "name" )
6437                         );
6438
6439                 s = "string";
6440         }
6441         return s;
6442 }
6443
6444 // Extract the "datatype" attribute from an IDL field definition.
6445 // If we haven't initialized the app, then we must be running in
6446 // some kind of testbed.  In that case, default to to NUMERIC,
6447 // since we look at the datatype only for numbers.
6448 static const char* get_datatype( osrfHash* field ) {
6449         const char* s = osrfHashGet( field, "datatype" );
6450         if( !s ) {
6451                 if( child_initialized )
6452                         osrfLogError(
6453                                 OSRF_LOG_MARK,
6454                                 "%s ERROR No \"datatype\" attribute for field \"%s\"",
6455                                 modulename,
6456                                 osrfHashGet( field, "name" )
6457                         );
6458                 else
6459                         s = "NUMERIC";
6460         }
6461         return s;
6462 }
6463
/**
	@brief Determine whether a string is potentially a valid SQL identifier.
	@param s The identifier to be tested.
	@return 1 if the input string is potentially a valid SQL identifier, or 0 if not.

	Purpose: to prevent certain kinds of SQL injection.  To that end we don't necessarily
	need to follow all the rules exactly, such as requiring that the first character not
	be a digit.

	We allow leading and trailing white space.  In between, we do not allow punctuation
	(except for underscores, dollar signs, and periods as in schema.identifier), control
	characters, or embedded white space.  A NULL pointer, an empty string, or a string
	consisting entirely of white space is not a valid identifier.

	More pedantically we should allow quoted identifiers containing arbitrary characters, but
	for the foreseeable future such quoted identifiers are not likely to be an issue.
*/
int is_identifier( const char* s) {
	if( !s )
		return 0;

	// Skip leading white space
	while( isspace( (unsigned char) *s ) )
		++s;

	if( !*s )
		return 0;   // Nothing but white space?  Not okay.

	// Check each character until we reach white space or
	// end-of-string.  Letters, digits, underscores, and
	// dollar signs are okay. With the exception of periods
	// (as in schema.identifier), control characters and other
	// punctuation characters are not okay.  Anything else
	// is okay -- it could for example be part of a multibyte
	// UTF8 character such as a letter with diacritical marks,
	// and those are allowed.
	for( ; *s && ! isspace( (unsigned char) *s ); ++s ) {
		const unsigned char c = (unsigned char) *s;

		if( isalnum( c ) || '.' == c || '_' == c || '$' == c )
			continue;   // Fine; keep going

		if( ispunct( c ) || iscntrl( c ) )
			return 0;
	}

	// If we found any white space in the above loop,
	// the rest had better be all white space.

	while( isspace( (unsigned char) *s ) )
		++s;

	if( *s )
		return 0;   // White space was embedded within non-white space

	return 1;
}
6521
/**
	@brief Decide whether a character string is acceptable as a comparison operator.
	@param op The candidate comparison operator.
	@return 1 if the string is acceptable as a comparison operator, or 0 if not.

	This is not a real validation of the operator.  We only reject strings containing a
	semicolon or white space, in order to block certain kinds of SQL injection.  A few
	specific multi-word operators (SIMILAR TO, IS DISTINCT FROM, IS NOT DISTINCT FROM) are
	accepted despite their embedded white space.  If a string passes this test but still
	isn't a valid operator, then the database will complain.

	We deliberately don't compare against a short list of approved operators, because we
	want to allow custom operators like ">100*", which at this writing would be difficult
	or impossible to express otherwise in a JSON query.
*/
int is_good_operator( const char* op ) {
	// The only operators allowed to contain white space
	static const char* const multiword_ops[] = {
		"similar to",
		"is distinct from",
		"is not distinct from"
	};
	const int multiword_count = (int) ( sizeof( multiword_ops ) / sizeof( multiword_ops[ 0 ] ) );

	if( NULL == op )
		return 0;   // Sanity check

	const char* p;
	for( p = op; *p != '\0'; ++p ) {
		if( ';' == *p )
			return 0;

		if( !isspace( (unsigned char) *p ) )
			continue;

		// We hit white space: the operator as a whole must be one
		// of the approved multi-word operators, or it is rejected.
		int i;
		for( i = 0; i < multiword_count; ++i ) {
			if( 0 == strcasecmp( op, multiword_ops[ i ] ) )
				return 1;
		}
		return 0;
	}

	return 1;
}
6559
6560 /**
6561         @name Query Frame Management
6562
6563         The following machinery supports a stack of query frames for use by SELECT().
6564
6565         A query frame caches information about one level of a SELECT query.  When we enter
6566         a subquery, we push another query frame onto the stack, and pop it off when we leave.
6567
6568         The query frame stores information about the core class, and about any joined classes
6569         in the FROM clause.
6570
6571         The main purpose is to map table aliases to classes and tables, so that a query can
6572         join to the same table more than once.  A secondary goal is to reduce the number of
6573         lookups in the IDL by caching the results.
6574 */
6575 /*@{*/
6576
6577 #define STATIC_CLASS_INFO_COUNT 3
6578
6579 static ClassInfo static_class_info[ STATIC_CLASS_INFO_COUNT ];
6580
6581 /**
6582         @brief Allocate a ClassInfo as raw memory.
6583         @return Pointer to the newly allocated ClassInfo.
6584
6585         Except for the in_use flag, which is used only by the allocation and deallocation
6586         logic, we don't initialize the ClassInfo here.
6587 */
6588 static ClassInfo* allocate_class_info( void ) {
6589         // In order to reduce the number of mallocs and frees, we return a static
6590         // instance of ClassInfo, if we can find one that we're not already using.
6591         // We rely on the fact that the compiler will implicitly initialize the
6592         // static instances so that in_use == 0.
6593
6594         int i;
6595         for( i = 0; i < STATIC_CLASS_INFO_COUNT; ++i ) {
6596                 if( ! static_class_info[ i ].in_use ) {
6597                         static_class_info[ i ].in_use = 1;
6598                         return static_class_info + i;
6599                 }
6600         }
6601
6602         // The static ones are all in use.  Malloc one.
6603
6604         return safe_malloc( sizeof( ClassInfo ) );
6605 }
6606
6607 /**
6608         @brief Free any malloc'd memory owned by a ClassInfo, returning it to a pristine state.
6609         @param info Pointer to the ClassInfo to be cleared.
6610 */
6611 static void clear_class_info( ClassInfo* info ) {
6612         // Sanity check
6613         if( ! info )
6614                 return;
6615
6616         // Free any malloc'd strings
6617
6618         if( info->alias != info->alias_store )
6619                 free( info->alias );
6620
6621         if( info->class_name != info->class_name_store )
6622                 free( info->class_name );
6623
6624         free( info->source_def );
6625
6626         info->alias = info->class_name = info->source_def = NULL;
6627         info->next = NULL;
6628 }
6629
6630 /**
6631         @brief Free a ClassInfo and everything it owns.
6632         @param info Pointer to the ClassInfo to be freed.
6633 */
6634 static void free_class_info( ClassInfo* info ) {
6635         // Sanity check
6636         if( ! info )
6637                 return;
6638
6639         clear_class_info( info );
6640
6641         // If it's one of the static instances, just mark it as not in use
6642
6643         int i;
6644         for( i = 0; i < STATIC_CLASS_INFO_COUNT; ++i ) {
6645                 if( info == static_class_info + i ) {
6646                         static_class_info[ i ].in_use = 0;
6647                         return;
6648                 }
6649         }
6650
6651         // Otherwise it must have been malloc'd, so free it
6652
6653         free( info );
6654 }
6655
6656 /**
6657         @brief Populate an already-allocated ClassInfo.
6658         @param info Pointer to the ClassInfo to be populated.
6659         @param alias Alias for the class.  If it is NULL, or an empty string, use the class
6660         name for an alias.
6661         @param class Name of the class.
6662         @return Zero if successful, or 1 if not.
6663
6664         Populate the ClassInfo with copies of the alias and class name, and with pointers to
6665         the relevant portions of the IDL for the specified class.
6666 */
6667 static int build_class_info( ClassInfo* info, const char* alias, const char* class ) {
6668         // Sanity checks
6669         if( ! info ){
6670                 osrfLogError( OSRF_LOG_MARK,
6671                                           "%s ERROR: No ClassInfo available to populate", modulename );
6672                 info->alias = info->class_name = info->source_def = NULL;
6673                 info->class_def = info->fields = info->links = NULL;
6674                 return 1;
6675         }
6676
6677         if( ! class ) {
6678                 osrfLogError( OSRF_LOG_MARK,
6679                                           "%s ERROR: No class name provided for lookup", modulename );
6680                 info->alias = info->class_name = info->source_def = NULL;
6681                 info->class_def = info->fields = info->links = NULL;
6682                 return 1;
6683         }
6684
6685         // Alias defaults to class name if not supplied
6686         if( ! alias || ! alias[ 0 ] )
6687                 alias = class;
6688
6689         // Look up class info in the IDL
6690         osrfHash* class_def = osrfHashGet( oilsIDL(), class );
6691         if( ! class_def ) {
6692                 osrfLogError( OSRF_LOG_MARK,
6693                                           "%s ERROR: Class %s not defined in IDL", modulename, class );
6694                 info->alias = info->class_name = info->source_def = NULL;
6695                 info->class_def = info->fields = info->links = NULL;
6696                 return 1;
6697         } else if( str_is_true( osrfHashGet( class_def, "virtual" ) ) ) {
6698                 osrfLogError( OSRF_LOG_MARK,
6699                                           "%s ERROR: Class %s is defined as virtual", modulename, class );
6700                 info->alias = info->class_name = info->source_def = NULL;
6701                 info->class_def = info->fields = info->links = NULL;
6702                 return 1;
6703         }
6704
6705         osrfHash* links = osrfHashGet( class_def, "links" );
6706         if( ! links ) {
6707                 osrfLogError( OSRF_LOG_MARK,
6708                                           "%s ERROR: No links defined in IDL for class %s", modulename, class );
6709                 info->alias = info->class_name = info->source_def = NULL;
6710                 info->class_def = info->fields = info->links = NULL;
6711                 return 1;
6712         }
6713
6714         osrfHash* fields = osrfHashGet( class_def, "fields" );
6715         if( ! fields ) {
6716                 osrfLogError( OSRF_LOG_MARK,
6717                                           "%s ERROR: No fields defined in IDL for class %s", modulename, class );
6718                 info->alias = info->class_name = info->source_def = NULL;
6719                 info->class_def = info->fields = info->links = NULL;
6720                 return 1;
6721         }
6722
6723         char* source_def = oilsGetRelation( class_def );
6724         if( ! source_def )
6725                 return 1;
6726
6727         // We got everything we need, so populate the ClassInfo
6728         if( strlen( alias ) > ALIAS_STORE_SIZE )
6729                 info->alias = strdup( alias );
6730         else {
6731                 strcpy( info->alias_store, alias );
6732                 info->alias = info->alias_store;
6733         }
6734
6735         if( strlen( class ) > CLASS_NAME_STORE_SIZE )
6736                 info->class_name = strdup( class );
6737         else {
6738                 strcpy( info->class_name_store, class );
6739                 info->class_name = info->class_name_store;
6740         }
6741
6742         info->source_def = source_def;
6743
6744         info->class_def = class_def;
6745         info->links     = links;
6746         info->fields    = fields;
6747
6748         return 0;
6749 }
6750
6751 #define STATIC_FRAME_COUNT 3
6752
6753 static QueryFrame static_frame[ STATIC_FRAME_COUNT ];
6754
6755 /**
6756         @brief Allocate a QueryFrame as raw memory.
6757         @return Pointer to the newly allocated QueryFrame.
6758
6759         Except for the in_use flag, which is used only by the allocation and deallocation
6760         logic, we don't initialize the QueryFrame here.
6761 */
6762 static QueryFrame* allocate_frame( void ) {
6763         // In order to reduce the number of mallocs and frees, we return a static
6764         // instance of QueryFrame, if we can find one that we're not already using.
6765         // We rely on the fact that the compiler will implicitly initialize the
6766         // static instances so that in_use == 0.
6767
6768         int i;
6769         for( i = 0; i < STATIC_FRAME_COUNT; ++i ) {
6770                 if( ! static_frame[ i ].in_use ) {
6771                         static_frame[ i ].in_use = 1;
6772                         return static_frame + i;
6773                 }
6774         }
6775
6776         // The static ones are all in use.  Malloc one.
6777
6778         return safe_malloc( sizeof( QueryFrame ) );
6779 }
6780
6781 /**
6782         @brief Free a QueryFrame, and all the memory it owns.
6783         @param frame Pointer to the QueryFrame to be freed.
6784 */
6785 static void free_query_frame( QueryFrame* frame ) {
6786         // Sanity check
6787         if( ! frame )
6788                 return;
6789
6790         clear_class_info( &frame->core );
6791
6792         // Free the join list
6793         ClassInfo* temp;
6794         ClassInfo* info = frame->join_list;
6795         while( info ) {
6796                 temp = info->next;
6797                 free_class_info( info );
6798                 info = temp;
6799         }
6800
6801         frame->join_list = NULL;
6802         frame->next = NULL;
6803
6804         // If the frame is a static instance, just mark it as unused
6805         int i;
6806         for( i = 0; i < STATIC_FRAME_COUNT; ++i ) {
6807                 if( frame == static_frame + i ) {
6808                         static_frame[ i ].in_use = 0;
6809                         return;
6810                 }
6811         }
6812
6813         // Otherwise it must have been malloc'd, so free it
6814
6815         free( frame );
6816 }
6817
6818 /**
6819         @brief Search a given QueryFrame for a specified alias.
6820         @param frame Pointer to the QueryFrame to be searched.
6821         @param target The alias for which to search.
6822         @return Pointer to the ClassInfo for the specified alias, if found; otherwise NULL.
6823 */
6824 static ClassInfo* search_alias_in_frame( QueryFrame* frame, const char* target ) {
6825         if( ! frame || ! target ) {
6826                 return NULL;
6827         }
6828
6829         ClassInfo* found_class = NULL;
6830
6831         if( !strcmp( target, frame->core.alias ) )
6832                 return &(frame->core);
6833         else {
6834                 ClassInfo* curr_class = frame->join_list;
6835                 while( curr_class ) {
6836                         if( strcmp( target, curr_class->alias ) )
6837                                 curr_class = curr_class->next;
6838                         else {
6839                                 found_class = curr_class;
6840                                 break;
6841                         }
6842                 }
6843         }
6844
6845         return found_class;
6846 }
6847
6848 /**
6849         @brief Push a new (blank) QueryFrame onto the stack.
6850 */
6851 static void push_query_frame( void ) {
6852         QueryFrame* frame = allocate_frame();
6853         frame->join_list = NULL;
6854         frame->next = curr_query;
6855
6856         // Initialize the ClassInfo for the core class
6857         ClassInfo* core = &frame->core;
6858         core->alias = core->class_name = core->source_def = NULL;
6859         core->class_def = core->fields = core->links = NULL;
6860
6861         curr_query = frame;
6862 }
6863
6864 /**
6865         @brief Pop a QueryFrame off the stack and destroy it.
6866 */
6867 static void pop_query_frame( void ) {
6868         // Sanity check
6869         if( ! curr_query )
6870                 return;
6871
6872         QueryFrame* popped = curr_query;
6873         curr_query = popped->next;
6874
6875         free_query_frame( popped );
6876 }
6877
6878 /**
6879         @brief Populate the ClassInfo for the core class.
6880         @param alias Alias for the core class.  If it is NULL or an empty string, we use the
6881         class name as an alias.
6882         @param class_name Name of the core class.
6883         @return Zero if successful, or 1 if not.
6884
6885         Populate the ClassInfo of the core class with copies of the alias and class name, and
6886         with pointers to the relevant portions of the IDL for the core class.
6887 */
6888 static int add_query_core( const char* alias, const char* class_name ) {
6889
6890         // Sanity checks
6891         if( ! curr_query ) {
6892                 osrfLogError( OSRF_LOG_MARK,
6893                                           "%s ERROR: No QueryFrame available for class %s", modulename, class_name );
6894                 return 1;
6895         } else if( curr_query->core.alias ) {
6896                 osrfLogError( OSRF_LOG_MARK,
6897                                           "%s ERROR: Core class %s already populated as %s",
6898                                           modulename, curr_query->core.class_name, curr_query->core.alias );
6899                 return 1;
6900         }
6901
6902         build_class_info( &curr_query->core, alias, class_name );
6903         if( curr_query->core.alias )
6904                 return 0;
6905         else {
6906                 osrfLogError( OSRF_LOG_MARK,
6907                                           "%s ERROR: Unable to look up core class %s", modulename, class_name );
6908                 return 1;
6909         }
6910 }
6911
6912 /**
6913         @brief Search the current QueryFrame for a specified alias.
6914         @param target The alias for which to search.
6915         @return A pointer to the corresponding ClassInfo, if found; otherwise NULL.
6916 */
6917 static inline ClassInfo* search_alias( const char* target ) {
6918         return search_alias_in_frame( curr_query, target );
6919 }
6920
6921 /**
6922         @brief Search all levels of query for a specified alias, starting with the current query.
6923         @param target The alias for which to search.
6924         @return A pointer to the corresponding ClassInfo, if found; otherwise NULL.
6925 */
6926 static ClassInfo* search_all_alias( const char* target ) {
6927         ClassInfo* found_class = NULL;
6928         QueryFrame* curr_frame = curr_query;
6929
6930         while( curr_frame ) {
6931                 if(( found_class = search_alias_in_frame( curr_frame, target ) ))
6932                         break;
6933                 else
6934                         curr_frame = curr_frame->next;
6935         }
6936
6937         return found_class;
6938 }
6939
6940 /**
6941         @brief Add a class to the list of classes joined to the current query.
6942         @param alias Alias of the class to be added.  If it is NULL or an empty string, we use
6943         the class name as an alias.
6944         @param classname The name of the class to be added.
6945         @return A pointer to the ClassInfo for the added class, if successful; otherwise NULL.
6946 */
6947 static ClassInfo* add_joined_class( const char* alias, const char* classname ) {
6948
6949         if( ! classname || ! *classname ) {    // sanity check
6950                 osrfLogError( OSRF_LOG_MARK, "Can't join a class with no class name" );
6951                 return NULL;
6952         }
6953
6954         if( ! alias )
6955                 alias = classname;
6956
6957         const ClassInfo* conflict = search_alias( alias );
6958         if( conflict ) {
6959                 osrfLogError( OSRF_LOG_MARK,
6960                                           "%s ERROR: Table alias \"%s\" conflicts with class \"%s\"",
6961                                           modulename, alias, conflict->class_name );
6962                 return NULL;
6963         }
6964
6965         ClassInfo* info = allocate_class_info();
6966
6967         if( build_class_info( info, alias, classname ) ) {
6968                 free_class_info( info );
6969                 return NULL;
6970         }
6971
6972         // Add the new ClassInfo to the join list of the current QueryFrame
6973         info->next = curr_query->join_list;
6974         curr_query->join_list = info;
6975
6976         return info;
6977 }
6978
6979 /**
6980         @brief Destroy all nodes on the query stack.
6981 */
6982 static void clear_query_stack( void ) {
6983         while( curr_query )
6984                 pop_query_frame();
6985 }
6986
6987 /*@}*/