From f8dac6be690c103a29133695fee2c092ee8993c4 Mon Sep 17 00:00:00 2001 From: erickson Date: Tue, 19 May 2009 14:12:40 +0000 Subject: [PATCH] added final bits for stateful sessions (drone keepalive) git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1703 9efc2488-bf62-4759-914b-345cdb29e865 --- src/python/osrf/server.py | 35 +++++++++++++++++++++++++++++++++-- src/python/osrf/ses.py | 3 +++ src/python/osrf/stack.py | 4 +++- 3 files changed, 39 insertions(+), 3 deletions(-) diff --git a/src/python/osrf/server.py b/src/python/osrf/server.py index 10f4d66..6f2307d 100644 --- a/src/python/osrf/server.py +++ b/src/python/osrf/server.py @@ -19,7 +19,7 @@ # ----------------------------------------------------------------------- import os, sys, threading, logging, fcntl, socket, errno, signal, time -import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app +import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const # used to define the size of the PID/size leader in @@ -42,6 +42,7 @@ class Controller(object): self.children = [] # list of children self.osrf_handle = None # xmpp handle self.routers = [] # list of registered routers + self.keepalive = 0 # how long to wait for subsequent, stateful requests # Global status socketpair. All children relay their # availability info to the parent through this socketpair. @@ -296,7 +297,8 @@ class Child(object): size = int(self.read_data.recv(SIZE_PAD) or 0) data = self.read_data.recv(size) osrf.log.log_internal("recv'd data " + data) - osrf.stack.push(osrf.net.NetworkMessage.from_xml(data)) + session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data)) + self.keepalive_loop(session) self.num_requests += 1 if self.num_requests == self.controller.max_requests: break @@ -306,6 +308,35 @@ class Child(object): # run the exit handler osrf.app.Application.application.child_exit() + def keepalive_loop(self, session): + keepalive = self.controller.keepalive + + while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED: + + start = time.time() + session.wait(keepalive) + end = time.time() + + if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED: + osrf.log.log_internal("client sent disconnect, exiting keepalive") + break + + if (end - start) >= keepalive: # exceeded keepalive timeout + + osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive)); + + session.send_status( + session.thread, + osrf.net_obj.NetworkObject.osrfConnectStatus({ + 'status' : 'Disconnected on timeout', + 'statusCode': osrf.const.OSRF_STATUS_TIMEOUT + }) + ) + + break + + return + def send_status(self): ''' Informs the controller that we are done processing this request ''' fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX) diff --git a/src/python/osrf/ses.py b/src/python/osrf/ses.py index 63c3a35..09182a0 100644 --- a/src/python/osrf/ses.py +++ b/src/python/osrf/ses.py @@ -46,6 +46,7 @@ class Session(object): self.thread = None self.service = None + @staticmethod def find_or_create(thread): if thread in Session.session_cache: @@ -114,6 +115,7 @@ class ClientSession(Session): # cache this session in the global session cache Session.session_cache[self.thread] = self + def reset_request_timeout(self, rid): req = self.find_request(rid) if req: @@ -334,6 +336,7 @@ class ServerSession(Session): def __init__(self, thread): Session.__init__(self) self.thread = thread + Session.session_cache[thread] = self def send_status(self, thread_trace, payload): self.send( diff --git a/src/python/osrf/stack.py b/src/python/osrf/stack.py index 3426ada..417b431 100644 --- a/src/python/osrf/stack.py +++ b/src/python/osrf/stack.py @@ -36,6 +36,8 @@ def push(net_msg): if isinstance(ses, osrf.ses.ServerSession): osrf.log.log_info("Message processing duration %f" % duration) + return ses + def handle_message(session, message): osrf.log.log_internal("handle_message(): processing message of " @@ -104,7 +106,7 @@ def handle_server(session, message): if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT: osrf.log.log_debug("server received CONNECT from %s" % session.remote_id) - session.state == osrf.const.OSRF_APP_SESSION_CONNECTED + session.state = osrf.const.OSRF_APP_SESSION_CONNECTED session.send_connect_ok(message.threadTrace()) return -- 2.43.2