2 Implements an OpenSRF forking request server
4 # -----------------------------------------------------------------------
5 # Copyright (C) 2008-2010 Equinox Software, Inc.
6 # Bill Erickson <erickson@esilibrary.com>
8 # This program is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU General Public License
10 # as published by the Free Software Foundation; either version 2
11 # of the License, or (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 # -----------------------------------------------------------------------
24 import os, sys, fcntl, socket, errno, signal, time
25 import osrf.log, osrf.conf, osrf.net, osrf.system
26 import osrf.stack, osrf.app, osrf.const
29 # used to define the size of the PID/size leader in
30 # status and data messages passed to and from children
class Controller(object):
    '''
    OpenSRF forking request server: a parent process that hands inbound
    requests to a pool of forked child processes.
    '''

    def __init__(self, service):
        '''Initialize the Controller object.

        service -- name of the OpenSRF service this controller runs
        '''
        self.service = service

        # Per-child request limit and pool-size settings/counters; all
        # start at zero.
        (self.max_requests, self.max_children,
         self.min_children, self.num_children) = (0, 0, 0, 0)

        self.osrf_handle = None  # xmpp handle, assigned once the service connects
        self.routers = []        # router targets this instance registered with
        self.keepalive = 0       # how long to wait for subsequent, stateful requests

        # Busy children, waiting children, and a pid -> child index so a
        # status report can be mapped back to its child quickly.
        self.active_list, self.idle_list, self.pid_map = [], [], {}

        # A single socketpair shared by all children: each child reports
        # its availability to the parent through it.  The parent's read
        # end starts out nonblocking.
        status_pair = socket.socketpair()
        self.read_status = status_pair[0]
        self.write_status = status_pair[1]
        self.read_status.setblocking(0)
# Controller shutdown path. Its `def` line (original line 57) is missing
# from this listing; the docstring below names the steps. The final exit
# call, if any, is also not visible here.
58 ''' Closes management sockets, kills children, reaps children, exits '''
60 osrf.log.log_info("server: shutting down...")
# Send an 'unregister' command to every router recorded at registration.
61 self.cleanup_routers()
# Close both ends of the shared child-status socketpair.
63 self.read_status.shutdown(socket.SHUT_RDWR)
64 self.write_status.shutdown(socket.SHUT_RDWR)
65 self.read_status.close()
66 self.write_status.close()
# For every child, idle or busy: close this process's copies of its data
# socketpair, then SIGKILL it (no graceful stop is attempted).
68 for child in self.idle_list + self.active_list:
69 child.read_data.shutdown(socket.SHUT_RDWR)
70 child.write_data.shutdown(socket.SHUT_RDWR)
71 child.read_data.close()
72 child.write_data.close()
73 os.kill(child.pid, signal.SIGKILL)
# Collect the killed children. NOTE(review): the waitpid options used for
# done=True are computed on lines missing from this listing; presumably
# this waits until the children have exited -- confirm.
75 self.reap_children(True)
79 def handle_signals(self):
80 ''' Installs SIGINT and SIGTERM handlers '''
# One shared handler is installed for both signals. NOTE(review): its body
# (original lines 84-85) is missing from this listing, so what it does on
# delivery cannot be confirmed here; presumably it runs the controller's
# shutdown path -- verify.
82 def handler(signum, frame):
83 ''' Handler implementation '''
# signal.signal() replaces whatever handler was previously installed.
86 signal.signal(signal.SIGINT, handler)
87 signal.signal(signal.SIGTERM, handler)
# Main loop of the parent process. Its `def` line (original 90) and a
# number of statements inside it are missing from this listing; comments
# below describe only the visible lines.
91 ''' Run the OpenSRF service, spawning and reaping children '''
# Drop the network handle this process already holds; the parent opens
# its own '<service>_listener' connection further down.
93 osrf.net.get_network_handle().disconnect()
94 osrf.net.clear_network_handle()
98 # give children a chance to connect before we start taking data
100 self.osrf_handle = osrf.system.System.net_connect(
101 resource = '%s_listener' % self.service,
102 service = self.service
105 # clear the recv callback so inbound messages do not filter
106 # through the opensrf stack
107 self.osrf_handle.receive_callback = None
109 # connect to our listening routers
110 self.register_routers()
113 osrf.log.log_internal("server: entering main server loop...")
115 while True: # main server loop
# Wait for the next inbound message (recv(-1) presumably means "no
# timeout" -- confirm in osrf.net) and forward it to a child as raw XML.
119 data = self.osrf_handle.recv(-1).to_xml()
# Choose a child. Reuse an idle one first; pop() takes the most recently
# idled child. Otherwise fork a new active child while under max_children.
# Otherwise block in check_status(True) until a busy child reports free.
122 if len(self.idle_list) > 0:
123 child = self.idle_list.pop()
124 self.active_list.append(child)
125 osrf.log.log_internal(
126 "server: sending data to available child %d" % child.pid
129 elif self.num_children < self.max_children:
130 child = self.spawn_child(True)
131 osrf.log.log_internal(
132 "server: sending data to new child %d" % child.pid
# All max_children are busy: warn, then wait for one to free up.
136 osrf.log.log_warn("server: no children available, \
137 waiting... consider increasing max_children for this application higher than \
138 %d in the OpenSRF configuration if this message occurs frequently" \
140 child = self.check_status(True)
# NOTE(review): if the send fails, write_child() kills that child, and the
# message in `data` is not visibly retried here.
142 self.write_child(child, data)
144 except KeyboardInterrupt:
145 osrf.log.log_info("server: exiting with keyboard interrupt")
# NOTE(review): `except X, e` is Python 2-only syntax. BaseException.message
# has been deprecated since Python 2.6 (PEP 352), and it is empty when the
# exception was raised with zero or several arguments. str(exc) always
# carries the text.
147 except Exception, exc:
149 "server: exiting with exception: %s" % exc.message
156 def write_child(self, child, data):
157 ''' Sends data to the child process '''
# sendall() keeps writing until every byte is sent, or raises.
160 child.write_data.sendall(data)
# On any send failure: log it, then SIGKILL the child and drop it from the
# active/idle lists and pid_map via cleanup_child(kill=True).
# NOTE(review): num_children is not decremented here; reap_children()
# does that when it later collects this pid. It also calls cleanup_child()
# a second time for the same pid, so cleanup_child must tolerate a pid
# that is no longer in pid_map.
162 except Exception, ex:
164 "server: error sending data to child %d: %s"
165 % (child.pid, str(ex))
167 self.cleanup_child(child.pid, True)
173 def check_status(self, wait=False):
174 ''' Checks to see if any children have indicated they are done with
175 their current request. If wait is true, wait indefinitely
176 for a child to be free. '''
# Children report by writing their pid right-justified to SIZE_PAD
# characters (Child.send_status); each recv() below consumes one record.
# run() calls this with wait=True and uses the return value as the child
# to hand work to.
# Switch to blocking reads; presumably guarded by `if wait` on the missing
# lines 177-180.
181 self.read_status.setblocking(1)
187 pid = self.read_status.recv(SIZE_PAD)
# NOTE(review): recv(SIZE_PAD) on a stream socket may in principle return
# fewer than SIZE_PAD bytes, and it returns '' once the write side is
# closed. Either case would make int(pid) below misparse or raise
# ValueError unless the missing lines guard against it.
189 except socket.error, exc:
190 if exc.args[0] == errno.EAGAIN:
191 break # no data left to read in nonblocking mode
194 "server: child status check failed: %s" % str(exc)
# Other socket errors are logged. The statement this condition guards
# (original line 197) is not visible here.
196 if not wait or ret_child:
200 if wait and ret_child:
201 # we've received a status update from at least
202 # 1 child. No need to block anymore.
203 self.read_status.setblocking(0)
# Map the reported pid back to its Child object. NOTE(review): raises
# KeyError if that child was already removed from pid_map (e.g. killed by
# cleanup_child) before its status record was read, unless guarded on
# lines not shown.
206 child = self.pid_map[int(pid)]
207 osrf.log.log_internal(
208 "server: child process %d reporting for duty" % child.pid
210 if wait and ret_child is None:
211 # caller is waiting for a free child;
212 # leave it in the active list
# Otherwise (presumably the `else` of the check above), move the reporting
# child from the active list to the idle list.
215 self.active_list.remove(child)
216 self.idle_list.append(child)
221 def reap_children(self, done=False):
223 Uses waitpid() to reap the children. If necessary, spawns new children.
# The docstring delimiters and the loop around this waitpid() are on lines
# missing from this listing. waitpid(0, ...) waits for any child whose
# process group matches this process's. `options` is computed on missing
# lines (presumably os.WNOHANG unless `done` is set) -- confirm.
232 (pid, status) = os.waitpid(0, options)
# Refill the pool up to min_children. The condition guarding this call is
# not visible here (presumably "no child was reaped and not done").
235 self.spawn_children()
# A child exited: update the count and drop it from the bookkeeping.
238 osrf.log.log_internal("server: cleaning up child %d" % pid)
239 self.num_children -= 1
240 self.cleanup_child(pid)
def cleanup_child(self, pid, kill=False):
    '''
    Removes the child from the active or idle list.

    Kills the process if requested.

    pid  -- process id of the child to clean up
    kill -- if true, send SIGKILL to the process before forgetting it

    The same pid can be cleaned up more than once. For example,
    write_child() kills and cleans up a child whose socket failed, and
    reap_children() cleans it up again once waitpid() collects it. A pid
    that is no longer tracked is therefore ignored instead of raising
    KeyError.
    '''
    if kill:
        os.kill(pid, signal.SIGKILL)

    child = self.pid_map.pop(pid, None)
    if child is None:
        return  # already cleaned up

    # Note: typically, a dead child will be in the active list, since
    # exiting children do not send a cleanup status to the controller.
    # Each list is searched on its own so an idle child is found in (and
    # removed from) the idle list.
    for pool in (self.active_list, self.idle_list):
        if child in pool:
            pool.remove(child)
            break
271 def spawn_children(self):
272 ''' Launches up to min_children child processes '''
# NOTE(review): the loop body (original line 274) is missing from this
# listing. It must increase num_children on every pass, or this loop never
# ends. spawn_child() does increment num_children, so presumably the body
# is `self.spawn_child()`, which files the new child as idle -- confirm.
273 while self.num_children < self.min_children:
276 def spawn_child(self, active=False):
277 ''' Spawns a new child process '''
# NOTE(review): the statement creating `child` (original lines 278-279,
# presumably `child = Child(self)`) is missing from this listing.
# Private socketpair the parent uses to hand request data to this child.
# It is created before fork(), so both processes hold both ends.
280 child.read_data, child.write_data = socket.socketpair()
# After fork() this code runs in both processes: child.pid holds the new
# pid in the parent and 0 in the child.
281 child.pid = os.fork()
# Point stdin/stdout/stderr at os.devnull. NOTE(review): as listed, these
# assignments come before any parent/child branch (which presumably starts
# on missing lines 288-290). If so, they also replace the parent's own
# stdio on every spawn -- confirm against the full source.
283 sys.stdin = open(os.devnull, 'r')
285 sys.stdout = open(os.devnull, 'w')
287 sys.stderr = open(os.devnull, 'w')
# Parent side: count and index the new child, then file it as active or
# idle. Active means the caller is about to hand it work, as run() does
# via spawn_child(True). run() uses the return value, so this branch must
# return `child`; the return itself is not visible here.
291 self.num_children += 1
292 self.pid_map[child.pid] = child
294 self.active_list.append(child)
296 self.idle_list.append(child)
297 osrf.log.log_internal(
298 "server: %s spawned child %d : %d total"
299 % (self.service, child.pid, self.num_children)
# Child side: record this process's real pid. The calls between here and
# the disconnect below (original 304-305) are missing from this listing.
# NOTE(review): the child's exit call (original 308) is not visible. The
# child must exit here rather than return into the parent's main loop.
303 child.pid = os.getpid()
306 osrf.net.get_network_handle().disconnect()
307 osrf.log.log_internal("server: child exiting...")
def register_routers(self):
    '''
    Registers this application instance with all configured routers.

    Each 'routers.router' entry is either a dict or a bare domain:
      - A dict has 'name' and 'domain' and optionally 'services'.
        'services' restricts which services register with that router.
      - A bare domain is paired with the global 'router_name' setting.
    '''

    def as_list(value):
        # Repeatable config elements come back as a bare value when only
        # one is configured; wrap it so callers always get a list. This
        # matters for the services check below: `in` on a bare string is
        # a substring test, so service 'foo' would match a router that
        # only lists 'foobar'.
        if isinstance(value, list):
            return value
        return [value]

    for router in as_list(osrf.conf.get('routers.router')):
        if isinstance(router, dict):
            if 'services' not in router or \
                    self.service in as_list(router['services']['service']):
                target = "%s@%s/router" % (router['name'], router['domain'])
                self.register_router(target)
        else:
            router_name = osrf.conf.get('router_name')
            target = "%s@%s/router" % (router_name, router)
            self.register_router(target)
329 def register_router(self, target):
330 ''' Registers with a single router '''
332 osrf.log.log_info("server: registering with router %s" % target)
# Remember the target so cleanup_routers() can unregister from it later.
333 self.routers.append(target)
# Build a router 'register' command naming this service as the router
# class. NOTE(review): original line 336 is missing here; presumably it
# addresses the message to `target` -- confirm.
335 reg_msg = osrf.net.NetworkMessage(
337 body = 'registering...',
338 router_command = 'register',
339 router_class = self.service
# Sent over the listener handle assigned in the main run method, which
# calls register_routers() only after connecting.
342 self.osrf_handle.send(reg_msg)
344 def cleanup_routers(self):
345 ''' Un-registers with all connected routers '''
# Mirror of register_router(): send an 'unregister' command for this
# service to every router target recorded in self.routers. The list is not
# cleared here, so a second call would send the commands again.
347 for target in self.routers:
348 osrf.log.log_info("server: un-registering with router %s" % target)
# NOTE(review): original line 350 is missing here; presumably it
# addresses the message to `target` -- confirm.
349 unreg_msg = osrf.net.NetworkMessage(
351 body = 'un-registering...',
352 router_command = 'unregister',
353 router_class = self.service
355 self.osrf_handle.send(unreg_msg)
# Child class: models one forked worker process. Its `class` line
# (original 358) is missing from this listing.
359 ''' Models a single child process '''
361 def __init__(self, controller):
362 ''' Initializes child process instance '''
364 # our Controller object
365 self.controller = controller
367 # how many requests we've served so far
368 self.num_requests = 0
# Both data sockets are assigned later by Controller.spawn_child() from a
# single socketpair created before fork().
370 # the child reads data from the controller on this socket
371 self.read_data = None
373 # the controller sends data to the child on this socket
374 self.write_data = None
# NOTE(review): `pid` is assigned by Controller.spawn_child(): from
# fork() in the parent, and from os.getpid() in the child. Original lines
# 375-377 are missing, so whether it is also initialized here cannot be
# seen.
# Request-serving loop of a forked child. Its `def` line (original 378-379)
# is missing from this listing, as are many statements inside it; comments
# below describe only the lines that are visible.
380 ''' Loops, processing data, until max_requests is reached '''
# The first read waits for the parent to send a request.
386 self.read_data.setblocking(1)
389 while True: # read all the data from the socket
393 buf = self.read_data.recv(2048)
394 except socket.error, exc:
# EAGAIN: nothing more to read right now. This is expected only after the
# socket has been switched to nonblocking mode.
395 if exc.args[0] == errno.EAGAIN:
398 "server: child data read failed: %s" % str(exc)
# Any other read error: run the application's child exit hook.
400 osrf.app.Application.application.child_exit()
# An empty read means the parent closed its end of the socketpair.
# (socket.recv never returns None, so the `is None` test is redundant.)
403 if buf is None or buf == '':
# Switch to nonblocking reads; presumably so the read loop ends (via
# EAGAIN) once the pending data has been drained.
# NOTE(review): data messages carry no visible length prefix. If a request
# arrives in several segments, stopping at the first EAGAIN could hand a
# truncated XML document to from_xml(). Confirm against the missing lines
# 404-406.
407 self.read_data.setblocking(0)
409 osrf.log.log_internal("server: child received message: " + data)
# Presumably discards anything already queued on this child's own network
# handle before the new request is processed -- confirm in osrf.net.
411 osrf.net.get_network_handle().flush_inbound_data()
# Parse the request XML and push it through the OpenSRF stack. Then keep
# the resulting session open for any follow-up stateful requests.
412 session = osrf.stack.push(
413 osrf.net.NetworkMessage.from_xml(data)
415 self.keepalive_loop(session)
417 self.num_requests += 1
419 osrf.log.log_internal("server: child done processing message")
# NOTE(review): num_requests is incremented before this test. If
# max_requests is left at the Controller constructor default of 0, the
# equality never holds. Presumably 0 means "no limit" unless configuration
# sets a value.
421 if self.num_requests == self.controller.max_requests:
424 # tell the parent we're done w/ this request session
427 except KeyboardInterrupt:
430 # run the exit handler
431 osrf.app.Application.application.child_exit()
433 def keepalive_loop(self, session):
435 Keeps session alive while client is connected.
437 If timeout occurs, session disconnects and gets cleaned up.
# (The docstring quotes for the two lines above are on missing lines.)
# While the session stays CONNECTED, wait up to `keepalive` (seconds, per
# the log message below) for the client's next message.
439 keepalive = self.controller.keepalive
441 while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
443 status = session.wait(keepalive)
# Client explicitly disconnected: log and leave the loop (the exit
# statement itself is not visible here).
445 if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
446 osrf.log.log_internal(
447 "server: client sent disconnect, exiting keepalive"
451 # if no msg received before keepalive timeout expired
# NOTE(review): the operands of the format below are in the wrong order.
# "%d" receives session.remote_id and "%s" receives int(keepalive).
# Unless remote_id happens to be an integer, the `%` raises TypeError
# ("%d format: a number is required") on exactly this timeout path.
# It should read `% (int(keepalive), session.remote_id)`.
455 "server: no request was received in %d seconds from %s, "
456 "exiting stateful session"
457 % (session.remote_id, int(keepalive))
# A connect-status object reporting the timeout (presumably sent to the
# client by the missing lines 458-461).
# NOTE(review): `osrf.net_obj` is not among this file's visible imports.
# The attribute lookup only works if some other module has already
# imported osrf.net_obj.
462 osrf.net_obj.NetworkObject.osrfConnectStatus({
463 'status' : 'Disconnected on timeout',
464 'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
473 def send_status(self):
474 ''' Informs the controller that we are done processing this request '''
# Every child writes to the controller's single write_status socket. An
# exclusive lockf is held around the write, presumably so records from
# concurrent children do not interleave.
475 fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
# One record is this child's pid, right-justified to SIZE_PAD characters.
# Controller.check_status() reads SIZE_PAD bytes per record.
# NOTE(review): rjust() never truncates, so this relies on SIZE_PAD being
# at least as wide as the largest pid.
477 self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
# NOTE(review): original lines 476 and 478 are missing. Unless they form
# a try/finally, an exception from sendall() would leave the lock held.
479 fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
# Per-child connection setup. Its `def` line (original 481) is missing
# from this listing.
482 ''' Connects the opensrf xmpp handle '''
# Presumably clears any network handle inherited from the parent. Then
# connect this child's own handle under the '<service>_drone' resource.
483 osrf.net.clear_network_handle()
484 osrf.system.System.net_connect(
485 resource = '%s_drone' % self.controller.service,
486 service = self.controller.service
# Finally run the application's per-child initialization hook.
488 osrf.app.Application.application.child_init()