1 # -----------------------------------------------------------------------
2 # Copyright (C) 2008-2010 Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # -----------------------------------------------------------------------
21 import os, sys, threading, fcntl, socket, errno, signal, time
22 import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
25 # used to define the size of the PID/size leader in
26 # status and data messages passed to and from children
29 class Controller(object):
31 OpenSRF forking request server.
def __init__(self, service):
    ''' Sets up the bookkeeping for a forking server that handles one service.

        service -- name of the service this controller listens for
    '''
    self.service = service  # service name
    self.max_requests = 0   # max child requests
    self.max_children = 0   # max num of child processes
    self.min_children = 0   # min num of child processes
    self.num_children = 0   # current num children
    self.osrf_handle = None # xmpp handle
    self.routers = []       # list of registered routers
    self.keepalive = 0      # how long to wait for subsequent, stateful requests
    self.active_list = []   # list of active children
    self.idle_list = []     # list of idle children
    self.pid_map = {}       # map of pid -> child object for faster access

    # Global status socketpair.  All children relay their
    # availability info to the parent through this socketpair.
    self.read_status, self.write_status = socket.socketpair()

    # The parent polls for status updates, so its reading end is
    # non-blocking by default.
    self.read_status.setblocking(0)
53 settings = osrf.set.get('activeapps.%s' % self.service)
57 ''' Closes management sockets, kills children, reaps children, exits '''
59 osrf.log.log_info("server: shutting down...")
60 self.cleanup_routers()
62 self.read_status.shutdown(socket.SHUT_RDWR)
63 self.write_status.shutdown(socket.SHUT_RDWR)
64 self.read_status.close()
65 self.write_status.close()
67 for child in self.idle_list + self.active_list:
68 child.read_data.shutdown(socket.SHUT_RDWR)
69 child.write_data.shutdown(socket.SHUT_RDWR)
70 child.read_data.close()
71 child.write_data.close()
72 os.kill(child.pid, signal.SIGKILL)
74 self.reap_children(True)
def handle_signals(self):
    ''' Installs SIGINT and SIGTERM handlers.

        The same handler is registered for both signals.
    '''
    def handler(signum, frame):
        # NOTE(review): the handler body is absent from the text under
        # review.  Calling self.cleanup() is an assumption based on the
        # shutdown routine documented elsewhere in this class -- confirm
        # the method name before merging.
        self.cleanup()

    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handler)
88 osrf.net.get_network_handle().disconnect()
89 osrf.net.clear_network_handle()
93 time.sleep(.5) # give children a chance to connect before we start taking data
94 self.osrf_handle = osrf.system.System.net_connect(
95 resource = '%s_listener' % self.service,
96 service = self.service
99 # clear the recv callback so inbound messages do not filter through the opensrf stack
100 self.osrf_handle.receive_callback = None
102 # connect to our listening routers
103 self.register_routers()
106 osrf.log.log_internal("server: entering main server loop...")
108 while True: # main server loop
112 data = self.osrf_handle.recv(-1).to_xml()
115 if len(self.idle_list) > 0:
116 child = self.idle_list.pop()
117 self.active_list.append(child)
118 osrf.log.log_internal("server: sending data to available child %d" % child.pid)
120 elif self.num_children < self.max_children:
121 child = self.spawn_child(True)
122 osrf.log.log_internal("server: sending data to new child %d" % child.pid)
125 osrf.log.log_warn("server: no children available, waiting...")
126 child = self.check_status(True)
128 self.write_child(child, data)
130 except KeyboardInterrupt:
131 osrf.log.log_info("server: exiting with keyboard interrupt")
134 osrf.log.log_error("server: exiting with exception: %s" % e.message)
def write_child(self, child, data):
    ''' Sends data to the child process.

        child -- child object; the message is written to child.write_data
        data  -- serialized message to hand to the child

        Returns True when the data was written.  On failure the error is
        logged, the child is killed and removed, and False is returned.
    '''
    try:
        child.write_data.sendall(data)

    # NOTE(review): the except clause and both return statements are
    # missing from the text under review.  The exception class and the
    # boolean results are reconstructed -- confirm against the original.
    except Exception as e:
        osrf.log.log_error("server: error sending data to child %d: %s" % (child.pid, str(e)))
        self.cleanup_child(child.pid, True)
        return False

    return True
def check_status(self, wait=False):
    ''' Checks to see if any children have indicated they are done with
        their current request.  If wait is true, wait indefinitely
        for a child to be free.

        wait -- when true, block until at least one child reports in

        Returns the first child that reported in when wait is true (that
        child is left in the active list for the caller to use); returns
        None otherwise.  Every other reporting child is moved from the
        active list to the idle list.
    '''
    # NOTE(review): the loop scaffolding (initial values, while/try lines,
    # the final return) is missing from the text under review and has been
    # reconstructed around the visible statements -- confirm.
    ret_child = None

    if wait:
        self.read_status.setblocking(1)

    while True:
        pid = None

        try:
            pid = self.read_status.recv(SIZE_PAD)

        except socket.error as e:
            if e.args[0] == errno.EAGAIN:
                break # no data left to read in nonblocking mode

            osrf.log.log_error("server: child status check failed: %s" % str(e))
            if not wait or ret_child:
                break

        if not pid:
            continue

        child = self.pid_map.get(int(pid))
        if child is None:
            # The child was removed (killed or reaped) after it queued
            # this status message; there is nothing left to update.
            osrf.log.log_warn("server: ignoring status from unknown child %d" % int(pid))
            continue

        osrf.log.log_internal("server: child process %d reporting for duty" % child.pid)

        if wait and ret_child is None:
            # caller is waiting for a free child, leave it in the active list
            ret_child = child

            # we've received a status update from at least
            # 1 child.  No need to block anymore.  Switching here, rather
            # than on the following pass, keeps the next recv() from
            # blocking until a second child reports in.
            self.read_status.setblocking(0)

        else:
            self.active_list.remove(child)
            self.idle_list.append(child)

    return ret_child
def reap_children(self, done=False):
    ''' Uses waitpid() to reap the children.  If necessary, new children are spawned.

        done -- when true, wait for children to exit (blocking) and do
                not spawn replacements; when false, only collect children
                that have already exited
    '''
    # NOTE(review): only the waitpid() call, the spawn_children() call and
    # the three bookkeeping statements are visible in the text under
    # review.  The options value, loop, branches and except clause are
    # reconstructed -- confirm against the original.
    options = 0 if done else os.WNOHANG

    while True:
        try:
            (pid, status) = os.waitpid(0, options)

            if pid == 0:
                # children exist, but none of them has exited
                if not done:
                    self.spawn_children()
                return

            osrf.log.log_internal("server: cleaning up child %d" % pid)
            self.num_children -= 1
            self.cleanup_child(pid)

        except OSError:
            # waitpid() fails once there are no children left to wait for
            return
def cleanup_child(self, pid, kill=False):
    ''' Removes a child from the controller's bookkeeping.

        pid  -- process id of the child to forget
        kill -- when true, send the process SIGKILL first

        A pid that is no longer tracked is ignored, so a child that was
        killed after a failed write can safely be reaped later.
    '''
    if kill:
        os.kill(pid, signal.SIGKILL)

    child = self.pid_map.pop(pid, None)
    if child is None:
        return

    # locate the child in the active or idle list and remove it
    # Note: typically, a dead child will be in the active list, since
    # exiting children do not send a cleanup status to the controller
    for pool in (self.active_list, self.idle_list):
        if child in pool:
            # Search and removal use the same list; looking the position
            # up in one list and popping from the other never removes an
            # idle child.
            pool.remove(child)
            break
def spawn_children(self):
    ''' Launches up to min_children child processes '''
    while self.num_children < self.min_children:
        # NOTE(review): the loop body is missing from the text under
        # review.  spawn_child() is the call that raises num_children in
        # the parent, so it is restored here -- confirm.
        self.spawn_child()
def spawn_child(self, active=False):
    ''' Spawns a new child process.

        active -- when true, the new child is filed in the active list
                  (the caller is about to hand it work); otherwise it is
                  filed in the idle list

        In the parent, returns the new child object.  The forked child
        process does not return from this method; it exits when it is
        done.
    '''
    # NOTE(review): the construction of the child object, the
    # `if active:` / `else:` lines, the parent's return, and the child's
    # init()/run() calls and final exit are missing from the text under
    # review.  They are reconstructed here -- confirm the class and
    # method names against the original.
    child = Child(self)
    child.read_data, child.write_data = socket.socketpair()
    child.pid = os.fork()

    if child.pid: # parent process
        self.num_children += 1
        self.pid_map[child.pid] = child
        if active:
            self.active_list.append(child)
        else:
            self.idle_list.append(child)
        osrf.log.log_internal("server: %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
        return child

    # Child process.  Whatever happens below, the child must exit here:
    # letting an exception unwind would drop the child into the
    # controller's own main-loop error handling.
    exit_code = 1
    try:
        child.pid = os.getpid()
        child.init()
        child.run()
        osrf.net.get_network_handle().disconnect()
        osrf.log.log_internal("server: child exiting...")
        exit_code = 0
    except Exception as e:
        osrf.log.log_error("server: child %d exiting with exception: %s" % (child.pid, str(e)))
    finally:
        os._exit(exit_code)
def register_routers(self):
    ''' Registers this application instance with all configured routers.

        Each configured router is either a dict with 'name' and 'domain'
        (and optionally a 'services' restriction) or a bare domain value,
        in which case the global 'router_name' setting supplies the name.
    '''
    routers = osrf.conf.get('routers.router')

    # a lone entry is not wrapped in a list
    if not isinstance(routers, list):
        routers = [routers]

    for router in routers:
        if isinstance(router, dict):
            if 'services' in router:
                services = router['services']['service']
                # NOTE(review): assumes a lone service comes back as a
                # bare value, the same way a lone router does.  Wrapping
                # it avoids an accidental substring match against a
                # string -- confirm against the config loader.
                if not isinstance(services, list):
                    services = [services]
                if self.service not in services:
                    continue
            target = "%s@%s/router" % (router['name'], router['domain'])
        else:
            router_name = osrf.conf.get('router_name')
            target = "%s@%s/router" % (router_name, router)

        self.register_router(target)
def register_router(self, target):
    ''' Registers with a single router.

        target -- address of the router to register with

        The target is remembered in self.routers so that it can be
        un-registered later.
    '''
    osrf.log.log_info("server: registering with router %s" % target)
    self.routers.append(target)

    reg_msg = osrf.net.NetworkMessage(
        # NOTE(review): the argument that addresses the message is
        # missing from the text under review; `recipient` is assumed --
        # confirm the keyword against osrf.net.NetworkMessage.
        recipient = target,
        body = 'registering...',
        router_command = 'register',
        router_class = self.service
    )

    self.osrf_handle.send(reg_msg)
def cleanup_routers(self):
    ''' Un-registers with all connected routers.

        Sends one 'unregister' command to every target recorded in
        self.routers.
    '''
    for target in self.routers:
        osrf.log.log_info("server: un-registering with router %s" % target)
        unreg_msg = osrf.net.NetworkMessage(
            # NOTE(review): the argument that addresses the message is
            # missing from the text under review; `recipient` is
            # assumed -- confirm against osrf.net.NetworkMessage.
            recipient = target,
            body = 'un-registering...',
            router_command = 'unregister',
            router_class = self.service
        )
        self.osrf_handle.send(unreg_msg)
318 ''' Models a single child process '''
def __init__(self, controller):
    ''' Creates the record for one child process.

        controller -- the Controller object that owns this child
    '''
    self.controller = controller # our Controller object
    self.num_requests = 0        # how many requests we've served so far
    self.read_data = None        # the child reads data from the controller on this socket
    self.write_data = None       # the controller sends data to the child on this socket
    self.pid = 0                 # my process id
328 ''' Loops, processing data, until max_requests is reached '''
334 self.read_data.setblocking(1)
337 while True: # read all the data from the socket
341 buf = self.read_data.recv(2048)
342 except socket.error, e:
343 if e.args[0] == errno.EAGAIN:
345 osrf.log.log_error("server: child data read failed: %s" % str(e))
346 osrf.app.Application.application.child_exit()
349 if buf is None or buf == '':
353 self.read_data.setblocking(0)
355 osrf.log.log_internal("server: child received message: " + data)
357 osrf.net.get_network_handle().flush_inbound_data()
358 session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
359 self.keepalive_loop(session)
361 self.num_requests += 1
363 osrf.log.log_internal("server: child done processing message")
365 if self.num_requests == self.controller.max_requests:
368 # tell the parent we're done w/ this request session
371 except KeyboardInterrupt:
374 # run the exit handler
375 osrf.app.Application.application.child_exit()
def keepalive_loop(self, session):
    ''' Keeps serving a session for as long as it stays connected.

        session -- the session returned for the message just processed

        Waits up to the controller's keepalive interval for each further
        request.  The loop ends when the client disconnects or when no
        request arrives before the interval expires.
    '''
    keepalive = self.controller.keepalive

    while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:

        status = session.wait(keepalive)

        if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
            osrf.log.log_internal("server: client sent disconnect, exiting keepalive")
            break

        if status is None: # no msg received before keepalive timeout expired

            # NOTE(review): the call that receives this message is missing
            # from the text under review; log_info is assumed -- confirm.
            # The arguments are ordered to match the format string:
            # %d takes the seconds, %s takes the remote id.
            osrf.log.log_info(
                "server: no request was received in %d seconds from %s, exiting stateful session" % (
                int(keepalive), session.remote_id))

            # NOTE(review): the call wrapping the status object below is
            # also missing; session.send_status(session.thread, ...) is
            # assumed -- confirm.
            session.send_status(
                session.thread,
                osrf.net_obj.NetworkObject.osrfConnectStatus({
                    'status' : 'Disconnected on timeout',
                    'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
                })
            )

            break

    # NOTE(review): the statements after the loop are missing from the
    # text under review; releasing the session here is assumed -- confirm.
    session.cleanup()
    return
def send_status(self):
    ''' Informs the controller that we are done processing this request.

        Writes this child's pid, right-justified to SIZE_PAD characters,
        to the controller's shared status socket.
    '''
    status_fd = self.controller.write_status.fileno()

    # Every child writes to the same socket; presumably the lock keeps
    # one child's pid record from interleaving with another's.
    fcntl.lockf(status_fd, fcntl.LOCK_EX)
    try:
        self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
    finally:
        # release the lock even if the write fails
        fcntl.lockf(status_fd, fcntl.LOCK_UN)
416 ''' Connects the opensrf xmpp handle '''
417 osrf.net.clear_network_handle()
418 osrf.system.System.net_connect(
419 resource = '%s_drone' % self.controller.service,
420 service = self.controller.service
422 osrf.app.Application.application.child_init()