similar to recent opensrf-c changes, keep active and idle child procs in separate...
[OpenSRF.git] / src / python / osrf / server.py
index a1ca9b9..ea89756 100644 (file)
@@ -33,16 +33,15 @@ class Controller(object):
 
     def __init__(self, service):
         self.service = service # service name
-        self.application = None # the application we're serving
         self.max_requests = 0 # max child requests
         self.max_children = 0 # max num of child processes
         self.min_childen = 0 # min num of child processes
         self.num_children = 0 # current num children
-        self.child_idx = 0 # current index into the children array
-        self.children = [] # list of children
         self.osrf_handle = None # xmpp handle
         self.routers = [] # list of registered routers
         self.keepalive = 0 # how long to wait for subsequent, stateful requests
+        self.active_list = [] # list of active children
+        self.idle_list = [] # list of idle children
 
         # Global status socketpair.  All children relay their 
         # availability info to the parent through this socketpair. 
@@ -63,7 +62,7 @@ class Controller(object):
         self.read_status.close()
         self.write_status.close()
 
-        for child in self.children:
+        for child in self.idle_list + self.active_list:
             child.read_data.shutdown(socket.SHUT_RDWR)
             child.write_data.shutdown(socket.SHUT_RDWR)
             child.read_data.close()
@@ -126,33 +125,29 @@ class Controller(object):
 
     def try_avail_child(self, data):
         ''' Trys to send current request data to an available child process '''
-        ctr = 0
-        while ctr < self.num_children:
 
-            if self.child_idx >= self.num_children:
-                self.child_idx = 0
-            child = self.children[self.child_idx]
+        if len(self.idle_list) == 0:
+            return False
 
-            if child.available:
-                osrf.log.log_internal("sending data to available child")
-                self.write_child(child, data)
-                return True
-
-            ctr += 1
-            self.child_idx += 1
-        return False
+        child = self.idle_list.pop(0) # remove from idle list
+        osrf.log.log_internal("sending data to available child %d" % child.pid)
+        self.write_child(child, data)
+        self.active_list.insert(0, child) # add to active list
+        return True
 
     def try_new_child(self, data):
         ''' Tries to spawn a new child to send request data to '''
+
         if self.num_children < self.max_children:
             osrf.log.log_internal("spawning new child to handle data")
-            child = self.spawn_child()
+            child = self.spawn_child(True)
             self.write_child(child, data)
             return True
         return False
 
     def try_wait_child(self, data):
         ''' Waits for a child to become available '''
+
         osrf.log.log_warn("No children available, waiting...")
         child = self.check_status(True)
         self.write_child(child, data)
@@ -160,9 +155,8 @@ class Controller(object):
 
     def write_child(self, child, data):
         ''' Sends data to the child process '''
-        child.available = False
+        # Do we need to watch for sigpipe, etc?
         child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
-        self.child_idx += 1
 
 
     def check_status(self, block=False):
@@ -171,7 +165,6 @@ class Controller(object):
             indefinitely for a child to be free. '''
 
         pid = None
-        child = None
         if block:
             pid = self.read_status.recv(SIZE_PAD)
         else:
@@ -185,14 +178,17 @@ class Controller(object):
                 
         if pid:
             pid = int(pid)
-            child = [c for c in self.children if c.pid == pid][0]
-            child.available = True
+            child = [c for c in self.active_list if c.pid == pid][0]
+            self.active_list.remove(child)
+            self.idle_list.insert(0, child)
+            return child
 
-        return child
+        return None
         
 
     def reap_children(self, done=False):
         ''' Uses waitpid() to reap the children.  If necessary, new children are spawned '''
+
         options = 0
         if not done: 
             options = os.WNOHANG 
@@ -204,9 +200,21 @@ class Controller(object):
                     if not done:
                         self.spawn_children()
                     return
+
                 osrf.log.log_debug("reaping child %d" % pid)
                 self.num_children -= 1
-                self.children = [c for c in self.children if c.pid != pid]
+
+                # locate the child in the active or idle list and remove it
+                # Note: typically, a dead child will be in the active list, since 
+                # exiting children do not send a cleanup status to the controller
+
+                child = [c for c in self.active_list if c.pid == pid]
+                if len(child) > 0:
+                    self.active_list.remove(child[0])
+                else:
+                    child = [c for c in self.idle_list if c.pid == pid]
+                    self.idle_list.remove(child[0])
+
             except OSError:
                 return
         
@@ -215,7 +223,7 @@ class Controller(object):
         while self.num_children < self.min_children:
             self.spawn_child()
 
-    def spawn_child(self):
+    def spawn_child(self, active=False):
         ''' Spawns a new child process '''
 
         child = Child(self)
@@ -224,7 +232,10 @@ class Controller(object):
 
         if child.pid:
             self.num_children += 1
-            self.children.append(child)
+            if active:
+                self.active_list.insert(0, child)
+            else:
+                self.idle_list.insert(0, child)
             osrf.log.log_debug("spawned child %d : %d total" % (child.pid, self.num_children))
             return child
         else:
@@ -256,6 +267,7 @@ class Controller(object):
 
     def register_router(self, target):
         ''' Registers with a single router '''
+
         osrf.log.log_info("registering with router %s" % target)
         self.routers.append(target)
 
@@ -270,6 +282,7 @@ class Controller(object):
 
     def cleanup_routers(self):
         ''' Un-registers with all connected routers '''
+
         for target in self.routers:
             osrf.log.log_info("un-registering with router %s" % target)
             unreg_msg = osrf.net.NetworkMessage(
@@ -289,12 +302,11 @@ class Child(object):
         self.num_requests = 0 # how many requests we've served so far
         self.read_data = None # the child reads data from the controller on this socket
         self.write_data = None # the controller sends data to the child on this socket 
-        self.available = True # true if this child is not currently serving a request
         self.pid = 0 # my process id
 
-
     def run(self):
         ''' Loops, processing data, until max_requests is reached '''
+
         while True:
             try:
                 size = int(self.read_data.recv(SIZE_PAD) or 0)
@@ -309,6 +321,7 @@ class Child(object):
                 self.send_status()
             except KeyboardInterrupt:
                 pass
+
         # run the exit handler
         osrf.app.Application.application.child_exit()