1 # -----------------------------------------------------------------------
2 # Copyright (C) 2008 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # -----------------------------------------------------------------------
21 import os, sys, threading, fcntl, socket, errno, signal, time
22 import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
25 # used to define the size of the PID/size leader in
26 # status and data messages passed to and from children
29 class Controller(object):
31 OpenSRF forking request server.
34 def __init__(self, service):
35 self.service = service # service name
36 self.max_requests = 0 # max child requests
37 self.max_children = 0 # max num of child processes
38 self.min_children = 0 # min num of child processes
39 self.num_children = 0 # current num children
40 self.osrf_handle = None # xmpp handle
41 self.routers = [] # list of registered routers
42 self.keepalive = 0 # how long to wait for subsequent, stateful requests
43 self.active_list = [] # list of active children
44 self.idle_list = [] # list of idle children
46 # Global status socketpair. All children relay their
47 # availability info to the parent through this socketpair.
48 self.read_status, self.write_status = socket.socketpair()
51 settings = osrf.set.get('activeapps.%s' % self.service)
55 ''' Closes management sockets, kills children, reaps children, exits '''
57 osrf.log.log_info("Shutting down...")
58 self.cleanup_routers()
60 self.read_status.shutdown(socket.SHUT_RDWR)
61 self.write_status.shutdown(socket.SHUT_RDWR)
62 self.read_status.close()
63 self.write_status.close()
65 for child in self.idle_list + self.active_list:
66 child.read_data.shutdown(socket.SHUT_RDWR)
67 child.write_data.shutdown(socket.SHUT_RDWR)
68 child.read_data.close()
69 child.write_data.close()
71 os.kill(0, signal.SIGKILL)
72 self.reap_children(True)
76 def handle_signals(self):
77 ''' Installs SIGINT and SIGTERM handlers '''
78 def handler(signum, frame):
80 signal.signal(signal.SIGINT, handler)
81 signal.signal(signal.SIGTERM, handler)
86 osrf.net.get_network_handle().disconnect()
87 osrf.net.clear_network_handle()
91 time.sleep(.5) # give children a chance to connect before we start taking data
92 self.osrf_handle = osrf.system.System.net_connect(
93 resource = '%s_listener' % self.service,
94 service = self.service
97 # clear the recv callback so inbound messages do not filter through the opensrf stack
98 self.osrf_handle.receive_callback = None
100 # connect to our listening routers
101 self.register_routers()
104 osrf.log.log_debug("entering main server loop...")
105 while True: # main server loop
109 data = self.osrf_handle.recv(-1).to_xml()
111 if self.try_avail_child(data):
114 if self.try_new_child(data):
117 self.try_wait_child()
119 except KeyboardInterrupt:
121 #except Exception, e:
122 #osrf.log.log_error("server exiting with exception: %s" % e.message)
126 def try_avail_child(self, data):
127 ''' Trys to send current request data to an available child process '''
129 if len(self.idle_list) == 0:
132 child = self.idle_list.pop(0) # remove from idle list
133 osrf.log.log_internal("sending data to available child %d" % child.pid)
134 self.write_child(child, data)
135 self.active_list.insert(0, child) # add to active list
138 def try_new_child(self, data):
139 ''' Tries to spawn a new child to send request data to '''
141 osrf.log.log_debug("try_new_child: service=%s num_children=%s max_children=%s" % (self.service, self.num_children, self.max_children))
142 if self.num_children < self.max_children:
143 osrf.log.log_internal("spawning new child to handle data")
144 child = self.spawn_child(True)
145 self.write_child(child, data)
149 def try_wait_child(self, data):
150 ''' Waits for a child to become available '''
152 osrf.log.log_warn("No children available, waiting...")
153 child = self.check_status(True)
154 self.write_child(child, data)
157 def write_child(self, child, data):
158 ''' Sends data to the child process '''
159 # Do we need to watch for sigpipe, etc?
160 child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
163 def check_status(self, block=False):
164 ''' Checks to see if any children have indicated they are done with
165 their current request. If block is true, this will wait
166 indefinitely for a child to be free. '''
170 pid = self.read_status.recv(SIZE_PAD)
173 self.read_status.setblocking(0)
174 pid = self.read_status.recv(SIZE_PAD)
175 except socket.error, e:
176 if e.args[0] != errno.EAGAIN:
178 self.read_status.setblocking(1)
182 child = [c for c in self.active_list if c.pid == pid][0]
183 self.active_list.remove(child)
184 self.idle_list.insert(0, child)
190 def reap_children(self, done=False):
191 ''' Uses waitpid() to reap the children. If necessary, new children are spawned '''
199 (pid, status) = os.waitpid(0, options)
202 self.spawn_children()
205 osrf.log.log_debug("reaping child %d" % pid)
206 self.num_children -= 1
208 # locate the child in the active or idle list and remove it
209 # Note: typically, a dead child will be in the active list, since
210 # exiting children do not send a cleanup status to the controller
212 child = [c for c in self.active_list if c.pid == pid]
214 self.active_list.remove(child[0])
216 child = [c for c in self.idle_list if c.pid == pid]
217 self.idle_list.remove(child[0])
222 def spawn_children(self):
223 ''' Launches up to min_children child processes '''
224 osrf.log.log_debug("spawn_children: service=%s num_children=%s min_children=%s" % (self.service, self.num_children, self.min_children))
225 while self.num_children < self.min_children:
228 def spawn_child(self, active=False):
229 ''' Spawns a new child process '''
232 child.read_data, child.write_data = socket.socketpair()
233 child.pid = os.fork()
236 self.num_children += 1
238 self.active_list.insert(0, child)
240 self.idle_list.insert(0, child)
241 osrf.log.log_debug("service %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
244 child.pid = os.getpid()
247 osrf.net.get_network_handle().disconnect()
248 osrf.log.log_internal("child exiting...")
251 def register_routers(self):
252 ''' Registers this application instance with all configured routers '''
253 routers = osrf.conf.get('routers.router')
255 if not isinstance(routers, list):
258 for router in routers:
259 if isinstance(router, dict):
260 if not 'services' in router or \
261 self.service in router['services']['service']:
262 target = "%s@%s/router" % (router['name'], router['domain'])
263 self.register_router(target)
265 router_name = osrf.conf.get('router_name')
266 target = "%s@%s/router" % (router_name, router)
267 self.register_router(target)
270 def register_router(self, target):
271 ''' Registers with a single router '''
273 osrf.log.log_info("registering with router %s" % target)
274 self.routers.append(target)
276 reg_msg = osrf.net.NetworkMessage(
278 body = 'registering...',
279 router_command = 'register',
280 router_class = self.service
283 self.osrf_handle.send(reg_msg)
285 def cleanup_routers(self):
286 ''' Un-registers with all connected routers '''
288 for target in self.routers:
289 osrf.log.log_info("un-registering with router %s" % target)
290 unreg_msg = osrf.net.NetworkMessage(
292 body = 'un-registering...',
293 router_command = 'unregister',
294 router_class = self.service
296 self.osrf_handle.send(unreg_msg)
300 ''' Models a single child process '''
302 def __init__(self, controller):
303 self.controller = controller # our Controller object
304 self.num_requests = 0 # how many requests we've served so far
305 self.read_data = None # the child reads data from the controller on this socket
306 self.write_data = None # the controller sends data to the child on this socket
307 self.pid = 0 # my process id
310 ''' Loops, processing data, until max_requests is reached '''
314 size = int(self.read_data.recv(SIZE_PAD) or 0)
315 data = self.read_data.recv(size)
316 osrf.log.log_internal("recv'd data " + data)
317 osrf.net.get_network_handle().flush_inbound_data()
318 session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
319 self.keepalive_loop(session)
320 self.num_requests += 1
321 if self.num_requests == self.controller.max_requests:
324 except KeyboardInterrupt:
327 # run the exit handler
328 osrf.app.Application.application.child_exit()
330 def keepalive_loop(self, session):
331 keepalive = self.controller.keepalive
333 while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
335 status = session.wait(keepalive)
337 if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
338 osrf.log.log_internal("client sent disconnect, exiting keepalive")
341 if status is None: # no msg received before keepalive timeout expired
343 osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
347 osrf.net_obj.NetworkObject.osrfConnectStatus({
348 'status' : 'Disconnected on timeout',
349 'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
358 def send_status(self):
359 ''' Informs the controller that we are done processing this request '''
360 fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
362 self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
364 fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
367 ''' Connects the opensrf xmpp handle '''
368 osrf.net.clear_network_handle()
369 osrf.system.System.net_connect(
370 resource = '%s_drone' % self.controller.service,
371 service = self.controller.service
373 osrf.app.Application.application.child_init()