Implement the chunking of OSRF messages. I.e. bundle multiple
[OpenSRF.git] / src / libopensrf / osrf_application.c
index 836f73b..141d39f 100644 (file)
@@ -48,6 +48,8 @@
 #define OSRF_SYSMETHOD_ECHO_ATOMIC              "opensrf.system.echo.atomic"
 /*@}*/
 
+#define OSRF_MSG_BUFFER_SIZE     10240
+
 /**
        @brief Represent an Application.
 */
@@ -238,13 +240,12 @@ void osrfAppRunExitCode( void ) {
 
        The @a options parameter is zero or more of the following macros, OR'd together:
 
-       - OSRF_METHOD_SYSTEM
-       - OSRF_METHOD_STREAMING
-       - OSRF_METHOD_ATOMIC
-       - OSRF_METHOD_CACHABLE
+       - OSRF_METHOD_SYSTEM        called by static linkage (shouldn't be used here)
+       - OSRF_METHOD_STREAMING     method may return more than one response
+       - OSRF_METHOD_ATOMIC        return all responses collected in a single RESULT message
+       - OSRF_METHOD_CACHABLE      cache results in memcache
 
-       If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of
-       the method.
+       If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of the method.
 */
 int osrfAppRegisterMethod( const char* appName, const char* methodName,
                const char* symbolName, const char* notes, int argc, int options ) {
@@ -325,7 +326,7 @@ static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symb
                methodName = "";  // should never happen
 
        if( options & OSRF_METHOD_ATOMIC ) {
-               // Append ".atomic" to the name, and make the method streaming
+               // Append ".atomic" to the name, and make the method atomic
                char mb[ strlen( methodName ) + 8 ];
                sprintf( mb, "%s.atomic", methodName );
                method->name        = strdup( mb );
@@ -486,7 +487,11 @@ int osrfAppRunMethod( const char* appName, const char* methodName,
                return osrfAppRequestRespondException(
                                ses, reqId, "An unknown server error occurred" );
 
-       return _osrfAppPostProcess( &context, retcode );
+       retcode = _osrfAppPostProcess( &context, retcode );
+
+       if( context.responses )
+               jsonObjectFree( context.responses );
+       return retcode;
 }
 
 /**
@@ -526,25 +531,64 @@ int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data )
 }
 
 /**
+       @brief Send any response messages that have accumulated in the output buffer.
+       @param ses Pointer to the current application session.
+       @param outbuf Pointer to the output buffer.
+       @return Zero if successful, or -1 if not.
+
+       Used only by servers to respond to clients.
+*/
+static int flush_responses( osrfAppSession* ses, growing_buffer* outbuf ) {
+
+       // Collect any inbound traffic on the socket(s).  This doesn't accomplish anything for the
+       // immediate task at hand, but it may help to keep TCP from getting clogged in some cases.
+       osrf_app_session_queue_wait( ses, 0, NULL );
+
+       int rc = 0;
+       if( buffer_length( outbuf ) > 0 ) {    // If there's anything to send...
+               buffer_add_char( outbuf, ']' );    // Close the JSON array
+               if( osrfSendTransportPayload( ses, OSRF_BUFFER_C_STR( ses->outbuf ))) {
+                       osrfLogError( OSRF_LOG_MARK, "Unable to flush response buffer" );
+                       rc = -1;
+               }
+       }
+       buffer_reset( ses->outbuf );
+       return rc;
+}
+
+/**
+       @brief Add a message to an output buffer.
+       @param outbuf Pointer to the output buffer.
+       @param msg Pointer to the message to be added, in the form of a JSON string.
+
+       Since the output buffer is in the form of a JSON array, prepend a left bracket to the
+       first message, and a comma to subsequent ones.
+
+       Used only by servers to respond to clients.
+*/
+static inline void append_msg( growing_buffer* outbuf, const char* msg ) {
+       if( outbuf && msg ) {
+               char prefix = buffer_length( outbuf ) > 0 ? ',' : '[';
+               buffer_add_char( outbuf, prefix );
+               buffer_add( outbuf, msg );
+       }
+}
+
+/**
        @brief Either send or enqueue a response to a client, optionally with a completion notice.
        @param ctx Pointer to the method context.
        @param data Pointer to the response, in the form of a jsonObject.
        @param complete Boolean: if true, we will accompany the RESULT message with a STATUS
        message indicating that the response is complete.
-       @return Zero if successful, or -1 upon error.  The only recognized errors are if either
-       the @a ctx pointer or its method pointer is NULL.
+       @return Zero if successful, or -1 upon error.
 
        For an atomic method, add a copy of the response data to a cache within the method
        context, to be sent later.  In this case the @a complete parameter has no effect,
        because we'll send the STATUS message later when we send the cached results.
 
-       If the method is cachable but not atomic, do nothing, ignoring the results in @a data.
-       Apparently there are no cachable methods at this writing.  If we ever invent some, we
-       may need to revisit this function.
-
-       If the method is neither atomic nor cachable, then send a RESULT message to the client,
-       with the results in @a data.  If @a complete is true, also send a STATUS message to
-       indicate that the response is complete.
+       If the method is not atomic, translate the message into JSON and append it to a buffer,
+       flushing the buffer as needed to avoid overflow.  If @a complete is true, append
+       a STATUS message (as JSON) to the buffer and flush the buffer.
 */
 static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int complete ) {
        if(!(ctx && ctx->method)) return -1;
@@ -560,15 +604,55 @@ static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int
                // Add a copy of the data object to the cache.
                if ( data != NULL )
                        jsonObjectPush( ctx->responses, jsonObjectClone(data) );
-       }
+       } else {
+               osrfLogDebug( OSRF_LOG_MARK,
+                       "Adding responses to stash for method %s", ctx->method->name );
+
+               if( data ) {
+                       // If you want to flush the intput buffers for every output message,
+                       // this is the place to do it.
+                       //osrf_app_session_queue_wait( ctx->session, 0, NULL );
+
+                       // Create an OSRF message
+                       osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 );
+                       osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK );
+                       osrf_message_set_result( msg, data );
+
+                       // Serialize the OSRF message into JSON text
+                       char* json = jsonObjectToJSON( osrfMessageToJSON( msg ));
+                       osrfMessageFree( msg );
+
+                       // If the new message would overflow the buffer, flush the output buffer first
+                       int len_so_far = buffer_length( ctx->session->outbuf );
+                       if( len_so_far && (strlen( json ) + len_so_far >= OSRF_MSG_BUFFER_SIZE - 3) ) {
+                               if( flush_responses( ctx->session, ctx->session->outbuf ))
+                                       return -1;
+                       }
+
+                       // Append the JSON text to the output buffer
+                       append_msg( ctx->session->outbuf, json );
+                       free( json );
+               }
 
-       if( !(ctx->method->options & OSRF_METHOD_ATOMIC ) &&
-                       !(ctx->method->options & OSRF_METHOD_CACHABLE) ) {
+               if(complete) {
+                       // Create a STATUS message
+                       osrfMessage* status_msg = osrf_message_init( STATUS, ctx->request, 1 );
+                       osrf_message_set_status_info( status_msg, "osrfConnectStatus", "Request Complete",
+                               OSRF_STATUS_COMPLETE );
 
-               if(complete)
-                       osrfAppRequestRespondComplete( ctx->session, ctx->request, data );
-               else
-                       osrfAppRequestRespond( ctx->session, ctx->request, data );
+                       // Serialize the STATUS message into JSON text
+                       char* json = jsonObjectToJSON( osrfMessageToJSON( status_msg ));
+                       osrfMessageFree( status_msg );
+
+                       // Add the STATUS message to the output buffer.
+                       // It's short, so don't worry about avoiding overflow.
+                       append_msg( ctx->session->outbuf, json );
+                       free( json );
+
+                       // Flush the output buffer, sending any accumulated messages.
+                       if( flush_responses( ctx->session, ctx->session->outbuf ))
+                               return -1;
+               }
        }
 
        return 0;
@@ -598,8 +682,6 @@ static int _osrfAppPostProcess( osrfMethodContext* ctx, int retcode ) {
                // any responses yet).  Now send them all at once, followed by a STATUS message
                // to say that we're finished.
                osrfAppRequestRespondComplete( ctx->session, ctx->request, ctx->responses );
-               jsonObjectFree(ctx->responses);
-               ctx->responses = NULL;
 
        } else {
                // We have no cached responses to return.