From d51b115edfc68fe1ba6ddf862857c685afabcff4 Mon Sep 17 00:00:00 2001 From: erickson Date: Mon, 13 Dec 2010 14:53:47 +0000 Subject: [PATCH] improved select/read/write fault tolerance; cleaner and more efficient child process idle/active list management; improved logging git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@2124 9efc2488-bf62-4759-914b-345cdb29e865 --- src/python/osrf/server.py | 219 +++++++++++++++++++++++--------------- 1 file changed, 134 insertions(+), 85 deletions(-) diff --git a/src/python/osrf/server.py b/src/python/osrf/server.py index 97b4114..0255b91 100644 --- a/src/python/osrf/server.py +++ b/src/python/osrf/server.py @@ -1,5 +1,5 @@ # ----------------------------------------------------------------------- -# Copyright (C) 2008 Equinox Software, Inc. +# Copyright (C) 2008-2010 Equinox Software, Inc. # Bill Erickson # # This program is free software; you can redistribute it and/or @@ -42,10 +42,12 @@ class Controller(object): self.keepalive = 0 # how long to wait for subsequent, stateful requests self.active_list = [] # list of active children self.idle_list = [] # list of idle children + self.pid_map = {} # map of pid -> child object for faster access # Global status socketpair. All children relay their # availability info to the parent through this socketpair. self.read_status, self.write_status = socket.socketpair() + self.read_status.setblocking(0) def load_app(self): settings = osrf.set.get('activeapps.%s' % self.service) @@ -54,7 +56,7 @@ class Controller(object): def cleanup(self): ''' Closes management sockets, kills children, reaps children, exits ''' - osrf.log.log_info("Shutting down...") + osrf.log.log_info("server: shutting down...") self.cleanup_routers() self.read_status.shutdown(socket.SHUT_RDWR) @@ -67,8 +69,8 @@ class Controller(object): child.write_data.shutdown(socket.SHUT_RDWR) child.read_data.close() child.write_data.close() + os.kill(child.pid, signal.SIGKILL) - os.kill(0, signal.SIGKILL) self.reap_children(True) os._exit(0) @@ -101,90 +103,95 @@ class Controller(object): self.register_routers() try: - osrf.log.log_debug("entering main server loop...") + osrf.log.log_internal("server: entering main server loop...") + while True: # main server loop self.reap_children() self.check_status() data = self.osrf_handle.recv(-1).to_xml() + child = None + + if len(self.idle_list) > 0: + child = self.idle_list.pop() + self.active_list.append(child) + osrf.log.log_internal("server: sending data to available child %d" % child.pid) - if self.try_avail_child(data): - continue + elif self.num_children < self.max_children: + child = self.spawn_child(True) + osrf.log.log_internal("server: sending data to new child %d" % child.pid) - if self.try_new_child(data): - continue + else: + osrf.log.log_warn("server: no children available, waiting...") + child = self.check_status(True) - self.try_wait_child() + self.write_child(child, data) except KeyboardInterrupt: + osrf.log.log_info("server: exiting with keyboard interrupt") + + except Exception, e: + osrf.log.log_error("server: exiting with exception: %s" % e.message) + + finally: self.cleanup() - #except Exception, e: - #osrf.log.log_error("server exiting with exception: %s" % e.message) - #self.cleanup() - def try_avail_child(self, data): - ''' Trys to send current request data to an available child process ''' + def write_child(self, child, data): + ''' Sends data to the child process ''' + + try: + child.write_data.sendall(data) - if len(self.idle_list) == 0: + except Exception, e: + osrf.log.log_error("server: error sending data to child %d: %s" % (child.pid, str(e))) + self.cleanup_child(child.pid, True) return False - child = self.idle_list.pop(0) # remove from idle list - osrf.log.log_internal("sending data to available child %d" % child.pid) - self.write_child(child, data) - self.active_list.insert(0, child) # add to active list return True - def try_new_child(self, data): - ''' Tries to spawn a new child to send request data to ''' - - osrf.log.log_debug("try_new_child: service=%s num_children=%s max_children=%s" % (self.service, self.num_children, self.max_children)) - if self.num_children < self.max_children: - osrf.log.log_internal("spawning new child to handle data") - child = self.spawn_child(True) - self.write_child(child, data) - return True - return False - def try_wait_child(self, data): - ''' Waits for a child to become available ''' - - osrf.log.log_warn("No children available, waiting...") - child = self.check_status(True) - self.write_child(child, data) + def check_status(self, wait=False): + ''' Checks to see if any children have indicated they are done with + their current request. If wait is true, wait indefinitely + for a child to be free. ''' + ret_child = None - def write_child(self, child, data): - ''' Sends data to the child process ''' - # Do we need to watch for sigpipe, etc? - child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data) - + if wait: + self.read_status.setblocking(1) - def check_status(self, block=False): - ''' Checks to see if any children have indicated they are done with - their current request. If block is true, this will wait - indefinitely for a child to be free. ''' + while True: + pid = None - pid = None - if block: - pid = self.read_status.recv(SIZE_PAD) - else: try: - self.read_status.setblocking(0) pid = self.read_status.recv(SIZE_PAD) + except socket.error, e: - if e.args[0] != errno.EAGAIN: - raise e - self.read_status.setblocking(1) - - if pid: - pid = int(pid) - child = [c for c in self.active_list if c.pid == pid][0] - self.active_list.remove(child) - self.idle_list.insert(0, child) - return child + if e.args[0] == errno.EAGAIN: + break # no data left to read in nonblocking mode - return None + osrf.log.log_error("server: child status check failed: %s" % str(e)) + if not wait or ret_child: + break + + finally: + if wait and ret_child: + # we've received a status update from at least + # 1 child. No need to block anymore. + self.read_status.setblocking(0) + + if pid: + child = self.pid_map[int(pid)] + osrf.log.log_internal("server: child process %d reporting for duty" % child.pid) + if wait and ret_child is None: + # caller is waiting for a free child, leave it in the active list + ret_child = child + else: + self.active_list.remove(child) + self.idle_list.append(child) + + return ret_child def reap_children(self, done=False): @@ -202,26 +209,36 @@ class Controller(object): self.spawn_children() return - osrf.log.log_debug("reaping child %d" % pid) + osrf.log.log_internal("server: cleaning up child %d" % pid) self.num_children -= 1 - - # locate the child in the active or idle list and remove it - # Note: typically, a dead child will be in the active list, since - # exiting children do not send a cleanup status to the controller - - child = [c for c in self.active_list if c.pid == pid] - if len(child) > 0: - self.active_list.remove(child[0]) - else: - child = [c for c in self.idle_list if c.pid == pid] - self.idle_list.remove(child[0]) + self.cleanup_child(pid) except OSError: return + + def cleanup_child(self, pid, kill=False): + + if kill: + os.kill(pid, signal.SIGKILL) + + # locate the child in the active or idle list and remove it + # Note: typically, a dead child will be in the active list, since + # exiting children do not send a cleanup status to the controller + + try: + self.active_list.pop(self.active_list.index(self.pid_map[pid])) + except: + try: + self.idle_list.pop(self.active_list.index(self.pid_map[pid])) + except: + pass + + del self.pid_map[pid] + + def spawn_children(self): ''' Launches up to min_children child processes ''' - osrf.log.log_debug("spawn_children: service=%s num_children=%s min_children=%s" % (self.service, self.num_children, self.min_children)) while self.num_children < self.min_children: self.spawn_child() @@ -232,20 +249,21 @@ class Controller(object): child.read_data, child.write_data = socket.socketpair() child.pid = os.fork() - if child.pid: + if child.pid: # parent process self.num_children += 1 + self.pid_map[child.pid] = child if active: - self.active_list.insert(0, child) + self.active_list.append(child) else: - self.idle_list.insert(0, child) - osrf.log.log_debug("service %s spawned child %d : %d total" % (self.service, child.pid, self.num_children)) + self.idle_list.append(child) + osrf.log.log_internal("server: %s spawned child %d : %d total" % (self.service, child.pid, self.num_children)) return child else: child.pid = os.getpid() child.init() child.run() osrf.net.get_network_handle().disconnect() - osrf.log.log_internal("child exiting...") + osrf.log.log_internal("server: child exiting...") os._exit(0) def register_routers(self): @@ -270,7 +288,7 @@ class Controller(object): def register_router(self, target): ''' Registers with a single router ''' - osrf.log.log_info("registering with router %s" % target) + osrf.log.log_info("server: registering with router %s" % target) self.routers.append(target) reg_msg = osrf.net.NetworkMessage( @@ -286,7 +304,7 @@ class Controller(object): ''' Un-registers with all connected routers ''' for target in self.routers: - osrf.log.log_info("un-registering with router %s" % target) + osrf.log.log_info("server: un-registering with router %s" % target) unreg_msg = osrf.net.NetworkMessage( recipient = target, body = 'un-registering...', @@ -310,17 +328,46 @@ class Child(object): ''' Loops, processing data, until max_requests is reached ''' while True: + try: - size = int(self.read_data.recv(SIZE_PAD) or 0) - data = self.read_data.recv(size) - osrf.log.log_internal("recv'd data " + data) + + self.read_data.setblocking(1) + data = '' + + while True: # read all the data from the socket + + buf = None + try: + buf = self.read_data.recv(2048) + except socket.error, e: + if e.args[0] == errno.EAGAIN: + break + osrf.log.log_error("server: child data read failed: %s" % str(e)) + osrf.app.Application.application.child_exit() + return + + if buf is None or buf == '': + break + + data += buf + self.read_data.setblocking(0) + + osrf.log.log_internal("server: child received message: " + data) + osrf.net.get_network_handle().flush_inbound_data() session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data)) self.keepalive_loop(session) + self.num_requests += 1 + + osrf.log.log_internal("server: child done processing message") + if self.num_requests == self.controller.max_requests: break + + # tell the parent we're done w/ this request session self.send_status() + except KeyboardInterrupt: pass @@ -335,12 +382,14 @@ class Child(object): status = session.wait(keepalive) if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED: - osrf.log.log_internal("client sent disconnect, exiting keepalive") + osrf.log.log_internal("server: client sent disconnect, exiting keepalive") break if status is None: # no msg received before keepalive timeout expired - osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive)); + osrf.log.log_info( + "server: no request was received in %d seconds from %s, exiting stateful session" % ( + session.remote_id, int(keepalive))); session.send_status( session.thread, -- 2.43.2