From 75a9906d5a5e90c60c8e0614e0c71796c511ec18 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Fri, 28 Feb 2014 12:44:11 -0500 Subject: [PATCH] LP#1612771: implement C max_chunk_size server support Signed-off-by: Bill Erickson Signed-off-by: Galen Charlton --- src/libopensrf/osrf_application.c | 117 ++++++++++++++++++++++++------ src/libopensrf/osrf_message.c | 8 +- 2 files changed, 101 insertions(+), 24 deletions(-) diff --git a/src/libopensrf/osrf_application.c b/src/libopensrf/osrf_application.c index b79a778..cbd9371 100644 --- a/src/libopensrf/osrf_application.c +++ b/src/libopensrf/osrf_application.c @@ -733,29 +733,100 @@ static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int "Adding responses to stash for method %s", ctx->method->name ); if( data ) { - // If you want to flush the intput buffers for every output message, - // this is the place to do it. - //osrf_app_session_queue_wait( ctx->session, 0, NULL ); - - // Create an OSRF message - osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 ); - osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK ); - osrf_message_set_result( msg, data ); - - // Serialize the OSRF message into JSON text - char* json = jsonObjectToJSON( osrfMessageToJSON( msg )); - osrfMessageFree( msg ); - - // If the new message would overflow the buffer, flush the output buffer first - int len_so_far = buffer_length( ctx->session->outbuf ); - if( len_so_far && (strlen( json ) + len_so_far + 3 >= ctx->method->max_bundle_size )) { - if( flush_responses( ctx->session, ctx->session->outbuf )) - return -1; - } - - // Append the JSON text to the output buffer - append_msg( ctx->session->outbuf, json ); - free( json ); + char* data_str = jsonObjectToJSON(data); // free me (below) + size_t data_size = strlen(data_str); + size_t chunk_size = ctx->method->max_chunk_size; + + if (chunk_size > 0 && chunk_size < data_size) { + // chunking -- response message exceeds max message size. + // break it up into chunks for partial delivery + + int i; + for (i = 0; i < data_size; i += chunk_size) { + + osrfMessage* msg = + osrf_message_init(RESULT, ctx->request, 1); + osrf_message_set_status_info(msg, + "osrfResultPartial", + "Partial Response", + OSRF_STATUS_PARTIAL + ); + + // see how long this chunk is. If this is the last + // chunk, it will likely be less than chunk_size + int partial_size = strlen(&data_str[i]); + if (partial_size > chunk_size) + partial_size = chunk_size; + + // substr(data_str, i, partial_size) + char partial_buf[partial_size + 1]; + memcpy(partial_buf, &data_str[i], partial_size); + partial_buf[partial_size] = '\0'; + + // package the partial chunk as a JSON string object + jsonObject * partial_obj = jsonNewObject(partial_buf); + osrf_message_set_result(msg, partial_obj); + jsonObjectFree(partial_obj); + + // package the osrf message within an array then + // serialize to json for delivery + jsonObject* arr = jsonNewObject(NULL); + + // msg json freed when arr is freed + jsonObjectPush(arr, osrfMessageToJSON(msg)); + char* json = jsonObjectToJSON(arr); + + osrfSendTransportPayload(ctx->session, json); + osrfMessageFree(msg); + jsonObjectFree(arr); + free(json); + } + + // all chunks sent; send the final partial-complete msg + osrfMessage* msg = + osrf_message_init(RESULT, ctx->request, 1); + osrf_message_set_status_info(msg, + "osrfResultPartialComplete", + "Partial Response Finalized", + OSRF_STATUS_NOCONTENT + ); + + jsonObject* arr = jsonNewObject(NULL); + jsonObjectPush(arr, osrfMessageToJSON(msg)); + char* json = jsonObjectToJSON(arr); + osrfSendTransportPayload(ctx->session, json); + osrfMessageFree(msg); + jsonObjectFree(arr); + free(json); + + + } else { + + // bundling -- message body (may be) too small for single + // delivery. prepare message for bundling. + + // Create an OSRF message + osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 ); + osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK ); + osrf_message_set_result( msg, data ); + + // Serialize the OSRF message into JSON text + char* json = jsonObjectToJSON( osrfMessageToJSON( msg )); + osrfMessageFree( msg ); + + // If the new message would overflow the buffer, flush the output buffer first + int len_so_far = buffer_length( ctx->session->outbuf ); + if( len_so_far && (strlen( json ) + len_so_far + 3 >= ctx->method->max_bundle_size )) { + if( flush_responses( ctx->session, ctx->session->outbuf )) + return -1; + } + + // Append the JSON text to the output buffer + append_msg( ctx->session->outbuf, json ); + free( json ); + } + + free(data_str); } if(complete) { diff --git a/src/libopensrf/osrf_message.c b/src/libopensrf/osrf_message.c index 9de5e0f..e7aae08 100644 --- a/src/libopensrf/osrf_message.c +++ b/src/libopensrf/osrf_message.c @@ -489,7 +489,13 @@ jsonObject* osrfMessageToJSON( const osrfMessage* msg ) { case RESULT: jsonObjectSetKey(json, "type", jsonNewObject("RESULT")); payload = jsonNewObject(NULL); - jsonObjectSetClass(payload,"osrfResult"); + char* cname = "osrfResult"; + if (msg->status_code == OSRF_STATUS_PARTIAL) { + cname = "osrfResultPartial"; + } else if (msg->status_code == OSRF_STATUS_NOCONTENT) { + cname = "osrfResultPartialComplete"; + } + jsonObjectSetClass(payload, cname); jsonObjectSetKey(payload, "status", jsonNewObject(msg->status_text)); snprintf(sc, sizeof(sc), "%d", msg->status_code); jsonObjectSetKey(payload, "statusCode", jsonNewObject(sc)); -- 2.43.2