Clean up Python server implementation, guided by pylint
[OpenSRF.git] / src / python / osrf / server.py
1 """
2 Implements an OpenSRF forking request server
3 """
4 # -----------------------------------------------------------------------
5 # Copyright (C) 2008-2010  Equinox Software, Inc.
6 # Bill Erickson <erickson@esilibrary.com>
7 #
8 # This program is free software; you can redistribute it and/or
9 # modify it under the terms of the GNU General Public License
10 # as published by the Free Software Foundation; either version 2
11 # of the License, or (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program; if not, write to the Free Software
20 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
21 # 02110-1301, USA
22 # -----------------------------------------------------------------------
23
24 import os, sys, fcntl, socket, errno, signal, time
25 import osrf.log, osrf.conf, osrf.net, osrf.system
26 import osrf.stack, osrf.app, osrf.const
27
28
29 # used to define the size of the PID/size leader in 
30 # status and data messages passed to and from children
31 SIZE_PAD = 12
32
33 class Controller(object):
34     ''' 
35         OpenSRF forking request server.  
36     '''
37
38     def __init__(self, service):
39         '''Initialize the Controller object'''
40         self.service = service # service name
41         self.max_requests = 0 # max child requests
42         self.max_children = 0 # max num of child processes
43         self.min_children = 0 # min num of child processes
44         self.num_children = 0 # current num children
45         self.osrf_handle = None # xmpp handle
46         self.routers = [] # list of registered routers
47         self.keepalive = 0 # how long to wait for subsequent, stateful requests
48         self.active_list = [] # list of active children
49         self.idle_list = [] # list of idle children
50         self.pid_map = {} # map of pid -> child object for faster access
51
52         # Global status socketpair.  All children relay their 
53         # availability info to the parent through this socketpair. 
54         self.read_status, self.write_status = socket.socketpair()
55         self.read_status.setblocking(0)
56
57     def cleanup(self):
58         ''' Closes management sockets, kills children, reaps children, exits '''
59
60         osrf.log.log_info("server: shutting down...")
61         self.cleanup_routers()
62
63         self.read_status.shutdown(socket.SHUT_RDWR)
64         self.write_status.shutdown(socket.SHUT_RDWR)
65         self.read_status.close()
66         self.write_status.close()
67
68         for child in self.idle_list + self.active_list:
69             child.read_data.shutdown(socket.SHUT_RDWR)
70             child.write_data.shutdown(socket.SHUT_RDWR)
71             child.read_data.close()
72             child.write_data.close()
73             os.kill(child.pid, signal.SIGKILL)
74
75         self.reap_children(True)
76         os._exit(0)
77
78
79     def handle_signals(self):
80         ''' Installs SIGINT and SIGTERM handlers '''
81
82         def handler(signum, frame):
83             ''' Handler implementation '''
84             self.cleanup()
85
86         signal.signal(signal.SIGINT, handler)
87         signal.signal(signal.SIGTERM, handler)
88
89
90     def run(self):
91         ''' Run the OpenSRF service, spawning and reaping children '''
92
93         osrf.net.get_network_handle().disconnect()
94         osrf.net.clear_network_handle()
95         self.spawn_children()
96         self.handle_signals()
97
98         # give children a chance to connect before we start taking data
99         time.sleep(.5)
100         self.osrf_handle = osrf.system.System.net_connect(
101             resource = '%s_listener' % self.service,
102             service = self.service
103         )
104
105         # clear the recv callback so inbound messages do not filter 
106         # through the opensrf stack
107         self.osrf_handle.receive_callback = None
108
109         # connect to our listening routers
110         self.register_routers()
111
112         try:
113             osrf.log.log_internal("server: entering main server loop...")
114
115             while True: # main server loop
116
117                 self.reap_children()
118                 self.check_status()
119                 data = self.osrf_handle.recv(-1).to_xml()
120                 child = None
121
122                 if len(self.idle_list) > 0:
123                     child = self.idle_list.pop() 
124                     self.active_list.append(child)
125                     osrf.log.log_internal(
126                         "server: sending data to available child %d" % child.pid
127                     )
128
129                 elif self.num_children < self.max_children:
130                     child = self.spawn_child(True)
131                     osrf.log.log_internal(
132                         "server: sending data to new child %d" % child.pid
133                     )
134
135                 else:
136                     osrf.log.log_warn("server: no children available, \
137 waiting... consider increasing max_children for this application higher than \
138 %d in the  OpenSRF configuration if this message occurs frequently" \
139                         % self.max_children)
140                     child = self.check_status(True)
141
142                 self.write_child(child, data)
143
144         except KeyboardInterrupt:
145             osrf.log.log_info("server: exiting with keyboard interrupt")
146
147         except Exception, exc: 
148             osrf.log.log_error(
149                 "server: exiting with exception: %s" % exc.message
150             )
151
152         finally:
153             self.cleanup()
154                 
155
156     def write_child(self, child, data):
157         ''' Sends data to the child process '''
158
159         try:
160             child.write_data.sendall(data)
161
162         except Exception, ex:
163             osrf.log.log_error(
164                 "server: error sending data to child %d: %s" 
165                 % (child.pid, str(ex))
166             )
167             self.cleanup_child(child.pid, True)
168             return False
169
170         return True
171
172
173     def check_status(self, wait=False):
174         ''' Checks to see if any children have indicated they are done with 
175             their current request.  If wait is true, wait indefinitely 
176             for a child to be free. '''
177
178         ret_child = None
179
180         if wait:
181             self.read_status.setblocking(1)
182
183         while True:
184             pid = None
185
186             try:
187                 pid = self.read_status.recv(SIZE_PAD)
188
189             except socket.error, exc:
190                 if exc.args[0] == errno.EAGAIN:
191                     break # no data left to read in nonblocking mode
192
193                 osrf.log.log_error(
194                     "server: child status check failed: %s" % str(exc)
195                 )
196                 if not wait or ret_child:
197                     break
198
199             finally:
200                 if wait and ret_child:
201                     # we've received a status update from at least 
202                     # 1 child.  No need to block anymore.
203                     self.read_status.setblocking(0)
204
205             if pid:
206                 child = self.pid_map[int(pid)]
207                 osrf.log.log_internal(
208                     "server: child process %d reporting for duty" % child.pid
209                 )
210                 if wait and ret_child is None:
211                     # caller is waiting for a free child;
212                     # leave it in the active list
213                     ret_child = child
214                 else:
215                     self.active_list.remove(child)
216                     self.idle_list.append(child)
217
218         return ret_child
219         
220
221     def reap_children(self, done=False):
222         '''
223         Uses waitpid() to reap the children. If necessary, spawns new children.
224         '''
225
226         options = 0
227         if not done: 
228             options = os.WNOHANG 
229
230         while True:
231             try:
232                 (pid, status) = os.waitpid(0, options)
233                 if pid == 0:
234                     if not done:
235                         self.spawn_children()
236                     return
237
238                 osrf.log.log_internal("server: cleaning up child %d" % pid)
239                 self.num_children -= 1
240                 self.cleanup_child(pid)
241
242             except OSError:
243                 return
244
245     def cleanup_child(self, pid, kill=False):
246         '''
247         Removes the child from the active or idle list.
248
249         Kills the process if requested.
250         '''
251
252         if kill:
253             os.kill(pid, signal.SIGKILL)
254
255         # locate the child in the active or idle list and remove it
256         # Note: typically, a dead child will be in the active list, since 
257         # exiting children do not send a cleanup status to the controller
258
259         try:
260             self.active_list.pop(self.active_list.index(self.pid_map[pid]))
261         except:
262             try:
263                 self.idle_list.pop(self.active_list.index(self.pid_map[pid]))
264             except:
265                 pass
266
267         del self.pid_map[pid]
268
269             
270         
271     def spawn_children(self):
272         ''' Launches up to min_children child processes '''
273         while self.num_children < self.min_children:
274             self.spawn_child()
275
276     def spawn_child(self, active=False):
277         ''' Spawns a new child process '''
278
279         child = Child(self)
280         child.read_data, child.write_data = socket.socketpair()
281         child.pid = os.fork()
282         sys.stdin.close()
283         sys.stdin = open(os.devnull, 'r')
284         sys.stdout.close()
285         sys.stdout = open(os.devnull, 'w')
286         sys.stderr.close()
287         sys.stderr = open(os.devnull, 'w')
288
289
290         if child.pid:
291             self.num_children += 1
292             self.pid_map[child.pid] = child
293             if active:
294                 self.active_list.append(child)
295             else:
296                 self.idle_list.append(child)
297             osrf.log.log_internal(
298                 "server: %s spawned child %d : %d total" 
299                 % (self.service, child.pid, self.num_children)
300             )
301             return child
302         else:
303             child.pid = os.getpid()
304             child.init()
305             child.run()
306             osrf.net.get_network_handle().disconnect()
307             osrf.log.log_internal("server: child exiting...")
308             os._exit(0)
309
310     def register_routers(self):
311         ''' Registers this application instance with all configured routers '''
312         routers = osrf.conf.get('routers.router')
313
314         if not isinstance(routers, list):
315             routers = [routers]
316
317         for router in routers:
318             if isinstance(router, dict):
319                 if not 'services' in router or \
320                         self.service in router['services']['service']:
321                     target = "%s@%s/router" % (router['name'], router['domain'])
322                     self.register_router(target)
323             else:
324                 router_name = osrf.conf.get('router_name')
325                 target = "%s@%s/router" % (router_name, router)
326                 self.register_router(target)
327
328
329     def register_router(self, target):
330         ''' Registers with a single router '''
331
332         osrf.log.log_info("server: registering with router %s" % target)
333         self.routers.append(target)
334
335         reg_msg = osrf.net.NetworkMessage(
336             recipient = target,
337             body = 'registering...',
338             router_command = 'register',
339             router_class = self.service
340         )
341
342         self.osrf_handle.send(reg_msg)
343
344     def cleanup_routers(self):
345         ''' Un-registers with all connected routers '''
346
347         for target in self.routers:
348             osrf.log.log_info("server: un-registering with router %s" % target)
349             unreg_msg = osrf.net.NetworkMessage(
350                 recipient = target,
351                 body = 'un-registering...',
352                 router_command = 'unregister',
353                 router_class = self.service
354             )
355             self.osrf_handle.send(unreg_msg)
356         
357
358 class Child(object):
359     ''' Models a single child process '''
360
361     def __init__(self, controller):
362         ''' Initializes child process instance '''
363
364         # our Controller object
365         self.controller = controller
366
367         # how many requests we've served so far
368         self.num_requests = 0
369
370         # the child reads data from the controller on this socket
371         self.read_data = None
372
373         # the controller sends data to the child on this socket 
374         self.write_data = None
375
376         # my process id
377         self.pid = 0
378
379     def run(self):
380         ''' Loops, processing data, until max_requests is reached '''
381
382         while True:
383
384             try:
385
386                 self.read_data.setblocking(1)
387                 data = ''
388
389                 while True: # read all the data from the socket
390
391                     buf = None
392                     try:
393                         buf = self.read_data.recv(2048)
394                     except socket.error, exc:
395                         if exc.args[0] == errno.EAGAIN:
396                             break
397                         osrf.log.log_error(
398                             "server: child data read failed: %s" % str(exc)
399                         )
400                         osrf.app.Application.application.child_exit()
401                         return
402
403                     if buf is None or buf == '':
404                         break
405
406                     data += buf
407                     self.read_data.setblocking(0)
408
409                 osrf.log.log_internal("server: child received message: " + data)
410
411                 osrf.net.get_network_handle().flush_inbound_data()
412                 session = osrf.stack.push(
413                     osrf.net.NetworkMessage.from_xml(data)
414                 )
415                 self.keepalive_loop(session)
416
417                 self.num_requests += 1
418
419                 osrf.log.log_internal("server: child done processing message")
420
421                 if self.num_requests == self.controller.max_requests:
422                     break
423
424                 # tell the parent we're done w/ this request session
425                 self.send_status()
426
427             except KeyboardInterrupt:
428                 pass
429
430         # run the exit handler
431         osrf.app.Application.application.child_exit()
432
433     def keepalive_loop(self, session):
434         '''
435         Keeps session alive while client is connected.
436
437         If timeout occurs, session disconnects and gets cleaned up.
438         '''
439         keepalive = self.controller.keepalive
440
441         while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
442
443             status = session.wait(keepalive)
444
445             if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
446                 osrf.log.log_internal(
447                     "server: client sent disconnect, exiting keepalive"
448                 )
449                 break
450
451             # if no msg received before keepalive timeout expired
452             if status is None:
453
454                 osrf.log.log_info(
455                     "server: no request was received in %d seconds from %s, "
456                     "exiting stateful session"
457                     % (session.remote_id, int(keepalive))
458                 )
459
460                 session.send_status(
461                     session.thread, 
462                     osrf.net_obj.NetworkObject.osrfConnectStatus({   
463                         'status' : 'Disconnected on timeout',
464                         'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
465                     })
466                 )
467
468                 break
469
470         session.cleanup()
471         return
472
473     def send_status(self):
474         ''' Informs the controller that we are done processing this request '''
475         fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
476         try:
477             self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
478         finally:
479             fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
480
481     def init(self):
482         ''' Connects the opensrf xmpp handle '''
483         osrf.net.clear_network_handle()
484         osrf.system.System.net_connect(
485             resource = '%s_drone' % self.controller.service, 
486             service = self.controller.service
487         )
488         osrf.app.Application.application.child_init()
489