0255b916c64c7da4c0432a80fdabaa93ab47ee21
[OpenSRF.git] / src / python / osrf / server.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2008-2010  Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
18 # 02110-1301, USA
19 # -----------------------------------------------------------------------
20
21 import os, sys, threading, fcntl, socket, errno, signal, time
22 import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
23
24
25 # used to define the size of the PID/size leader in 
26 # status and data messages passed to and from children
27 SIZE_PAD = 12
28
29 class Controller(object):
30     ''' 
31         OpenSRF forking request server.  
32     '''
33
34     def __init__(self, service):
35         self.service = service # service name
36         self.max_requests = 0 # max child requests
37         self.max_children = 0 # max num of child processes
38         self.min_children = 0 # min num of child processes
39         self.num_children = 0 # current num children
40         self.osrf_handle = None # xmpp handle
41         self.routers = [] # list of registered routers
42         self.keepalive = 0 # how long to wait for subsequent, stateful requests
43         self.active_list = [] # list of active children
44         self.idle_list = [] # list of idle children
45         self.pid_map = {} # map of pid -> child object for faster access
46
47         # Global status socketpair.  All children relay their 
48         # availability info to the parent through this socketpair. 
49         self.read_status, self.write_status = socket.socketpair()
50         self.read_status.setblocking(0)
51
52     def load_app(self):
53         settings = osrf.set.get('activeapps.%s' % self.service)
54         
55
56     def cleanup(self):
57         ''' Closes management sockets, kills children, reaps children, exits '''
58
59         osrf.log.log_info("server: shutting down...")
60         self.cleanup_routers()
61
62         self.read_status.shutdown(socket.SHUT_RDWR)
63         self.write_status.shutdown(socket.SHUT_RDWR)
64         self.read_status.close()
65         self.write_status.close()
66
67         for child in self.idle_list + self.active_list:
68             child.read_data.shutdown(socket.SHUT_RDWR)
69             child.write_data.shutdown(socket.SHUT_RDWR)
70             child.read_data.close()
71             child.write_data.close()
72             os.kill(child.pid, signal.SIGKILL)
73
74         self.reap_children(True)
75         os._exit(0)
76
77
78     def handle_signals(self):
79         ''' Installs SIGINT and SIGTERM handlers '''
80         def handler(signum, frame):
81             self.cleanup()
82         signal.signal(signal.SIGINT, handler)
83         signal.signal(signal.SIGTERM, handler)
84
85
86     def run(self):
87
88         osrf.net.get_network_handle().disconnect()
89         osrf.net.clear_network_handle()
90         self.spawn_children()
91         self.handle_signals()
92
93         time.sleep(.5) # give children a chance to connect before we start taking data
94         self.osrf_handle = osrf.system.System.net_connect(
95             resource = '%s_listener' % self.service,
96             service = self.service
97         )
98
99         # clear the recv callback so inbound messages do not filter through the opensrf stack
100         self.osrf_handle.receive_callback = None
101
102         # connect to our listening routers
103         self.register_routers()
104
105         try:
106             osrf.log.log_internal("server: entering main server loop...")
107
108             while True: # main server loop
109
110                 self.reap_children()
111                 self.check_status()
112                 data = self.osrf_handle.recv(-1).to_xml()
113                 child = None
114
115                 if len(self.idle_list) > 0:
116                     child = self.idle_list.pop() 
117                     self.active_list.append(child)
118                     osrf.log.log_internal("server: sending data to available child %d" % child.pid)
119
120                 elif self.num_children < self.max_children:
121                     child = self.spawn_child(True)
122                     osrf.log.log_internal("server: sending data to new child %d" % child.pid)
123
124                 else:
125                     osrf.log.log_warn("server: no children available, waiting...")
126                     child = self.check_status(True)
127
128                 self.write_child(child, data)
129
130         except KeyboardInterrupt:
131             osrf.log.log_info("server: exiting with keyboard interrupt")
132
133         except Exception, e: 
134             osrf.log.log_error("server: exiting with exception: %s" % e.message)
135
136         finally:
137             self.cleanup()
138                 
139
140     def write_child(self, child, data):
141         ''' Sends data to the child process '''
142
143         try:
144             child.write_data.sendall(data)
145
146         except Exception, e:
147             osrf.log.log_error("server: error sending data to child %d: %s" % (child.pid, str(e)))
148             self.cleanup_child(child.pid, True)
149             return False
150
151         return True
152
153
154     def check_status(self, wait=False):
155         ''' Checks to see if any children have indicated they are done with 
156             their current request.  If wait is true, wait indefinitely 
157             for a child to be free. '''
158
159         ret_child = None
160
161         if wait:
162             self.read_status.setblocking(1)
163
164         while True:
165             pid = None
166
167             try:
168                 pid = self.read_status.recv(SIZE_PAD)
169
170             except socket.error, e:
171                 if e.args[0] == errno.EAGAIN:
172                     break # no data left to read in nonblocking mode
173
174                 osrf.log.log_error("server: child status check failed: %s" % str(e))
175                 if not wait or ret_child:
176                     break
177
178             finally:
179                 if wait and ret_child:
180                     # we've received a status update from at least 
181                     # 1 child.  No need to block anymore.
182                     self.read_status.setblocking(0)
183
184             if pid:
185                 child = self.pid_map[int(pid)]
186                 osrf.log.log_internal("server: child process %d reporting for duty" % child.pid)
187                 if wait and ret_child is None:
188                     # caller is waiting for a free child, leave it in the active list
189                     ret_child = child
190                 else:
191                     self.active_list.remove(child)
192                     self.idle_list.append(child)
193
194         return ret_child
195         
196
197     def reap_children(self, done=False):
198         ''' Uses waitpid() to reap the children.  If necessary, new children are spawned '''
199
200         options = 0
201         if not done: 
202             options = os.WNOHANG 
203
204         while True:
205             try:
206                 (pid, status) = os.waitpid(0, options)
207                 if pid == 0:
208                     if not done:
209                         self.spawn_children()
210                     return
211
212                 osrf.log.log_internal("server: cleaning up child %d" % pid)
213                 self.num_children -= 1
214                 self.cleanup_child(pid)
215
216             except OSError:
217                 return
218
219     def cleanup_child(self, pid, kill=False):
220
221         if kill:
222             os.kill(pid, signal.SIGKILL)
223
224         # locate the child in the active or idle list and remove it
225         # Note: typically, a dead child will be in the active list, since 
226         # exiting children do not send a cleanup status to the controller
227
228         try:
229             self.active_list.pop(self.active_list.index(self.pid_map[pid]))
230         except:
231             try:
232                 self.idle_list.pop(self.active_list.index(self.pid_map[pid]))
233             except:
234                 pass
235
236         del self.pid_map[pid]
237
238             
239         
240     def spawn_children(self):
241         ''' Launches up to min_children child processes '''
242         while self.num_children < self.min_children:
243             self.spawn_child()
244
245     def spawn_child(self, active=False):
246         ''' Spawns a new child process '''
247
248         child = Child(self)
249         child.read_data, child.write_data = socket.socketpair()
250         child.pid = os.fork()
251
252         if child.pid: # parent process
253             self.num_children += 1
254             self.pid_map[child.pid] = child
255             if active:
256                 self.active_list.append(child)
257             else:
258                 self.idle_list.append(child)
259             osrf.log.log_internal("server: %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
260             return child
261         else:
262             child.pid = os.getpid()
263             child.init()
264             child.run()
265             osrf.net.get_network_handle().disconnect()
266             osrf.log.log_internal("server: child exiting...")
267             os._exit(0)
268
269     def register_routers(self):
270         ''' Registers this application instance with all configured routers '''
271         routers = osrf.conf.get('routers.router')
272
273         if not isinstance(routers, list):
274             routers = [routers]
275
276         for router in routers:
277             if isinstance(router, dict):
278                 if not 'services' in router or \
279                         self.service in router['services']['service']:
280                     target = "%s@%s/router" % (router['name'], router['domain'])
281                     self.register_router(target)
282             else:
283                 router_name = osrf.conf.get('router_name')
284                 target = "%s@%s/router" % (router_name, router)
285                 self.register_router(target)
286
287
288     def register_router(self, target):
289         ''' Registers with a single router '''
290
291         osrf.log.log_info("server: registering with router %s" % target)
292         self.routers.append(target)
293
294         reg_msg = osrf.net.NetworkMessage(
295             recipient = target,
296             body = 'registering...',
297             router_command = 'register',
298             router_class = self.service
299         )
300
301         self.osrf_handle.send(reg_msg)
302
303     def cleanup_routers(self):
304         ''' Un-registers with all connected routers '''
305
306         for target in self.routers:
307             osrf.log.log_info("server: un-registering with router %s" % target)
308             unreg_msg = osrf.net.NetworkMessage(
309                 recipient = target,
310                 body = 'un-registering...',
311                 router_command = 'unregister',
312                 router_class = self.service
313             )
314             self.osrf_handle.send(unreg_msg)
315         
316
317 class Child(object):
318     ''' Models a single child process '''
319
320     def __init__(self, controller):
321         self.controller = controller # our Controller object
322         self.num_requests = 0 # how many requests we've served so far
323         self.read_data = None # the child reads data from the controller on this socket
324         self.write_data = None # the controller sends data to the child on this socket 
325         self.pid = 0 # my process id
326
327     def run(self):
328         ''' Loops, processing data, until max_requests is reached '''
329
330         while True:
331
332             try:
333
334                 self.read_data.setblocking(1)
335                 data = ''
336
337                 while True: # read all the data from the socket
338
339                     buf = None
340                     try:
341                         buf = self.read_data.recv(2048)
342                     except socket.error, e:
343                         if e.args[0] == errno.EAGAIN:
344                             break
345                         osrf.log.log_error("server: child data read failed: %s" % str(e))
346                         osrf.app.Application.application.child_exit()
347                         return
348
349                     if buf is None or buf == '':
350                         break
351
352                     data += buf
353                     self.read_data.setblocking(0)
354
355                 osrf.log.log_internal("server: child received message: " + data)
356
357                 osrf.net.get_network_handle().flush_inbound_data()
358                 session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
359                 self.keepalive_loop(session)
360
361                 self.num_requests += 1
362
363                 osrf.log.log_internal("server: child done processing message")
364
365                 if self.num_requests == self.controller.max_requests:
366                     break
367
368                 # tell the parent we're done w/ this request session
369                 self.send_status()
370
371             except KeyboardInterrupt:
372                 pass
373
374         # run the exit handler
375         osrf.app.Application.application.child_exit()
376
377     def keepalive_loop(self, session):
378         keepalive = self.controller.keepalive
379
380         while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
381
382             status = session.wait(keepalive)
383
384             if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
385                 osrf.log.log_internal("server: client sent disconnect, exiting keepalive")
386                 break
387
388             if status is None: # no msg received before keepalive timeout expired
389
390                 osrf.log.log_info(
391                     "server: no request was received in %d seconds from %s, exiting stateful session" % (
392                     session.remote_id, int(keepalive)));
393
394                 session.send_status(
395                     session.thread, 
396                     osrf.net_obj.NetworkObject.osrfConnectStatus({   
397                         'status' : 'Disconnected on timeout',
398                         'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
399                     })
400                 )
401
402                 break
403
404         session.cleanup()
405         return
406
407     def send_status(self):
408         ''' Informs the controller that we are done processing this request '''
409         fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
410         try:
411             self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
412         finally:
413             fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
414
415     def init(self):
416         ''' Connects the opensrf xmpp handle '''
417         osrf.net.clear_network_handle()
418         osrf.system.System.net_connect(
419             resource = '%s_drone' % self.controller.service, 
420             service = self.controller.service
421         )
422         osrf.app.Application.application.child_init()
423