]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/server.py
call cleanup instead of running the death callback directory when server session...
[OpenSRF.git] / src / python / osrf / server.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2008  Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
18 # 02110-1301, USA
19 # -----------------------------------------------------------------------
20
21 import os, sys, threading, fcntl, socket, errno, signal, time
22 import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
23
24
25 # used to define the size of the PID/size leader in 
26 # status and data messages passed to and from children
27 SIZE_PAD = 12
28
29 class Controller(object):
30     ''' 
31         OpenSRF forking request server.  
32     '''
33
34     def __init__(self, service):
35         self.service = service # service name
36         self.application = None # the application we're serving
37         self.max_requests = 0 # max child requests
38         self.max_children = 0 # max num of child processes
39         self.min_childen = 0 # min num of child processes
40         self.num_children = 0 # current num children
41         self.child_idx = 0 # current index into the children array
42         self.children = [] # list of children
43         self.osrf_handle = None # xmpp handle
44         self.routers = [] # list of registered routers
45         self.keepalive = 0 # how long to wait for subsequent, stateful requests
46
47         # Global status socketpair.  All children relay their 
48         # availability info to the parent through this socketpair. 
49         self.read_status, self.write_status = socket.socketpair()
50
51     def load_app(self):
52         settings = osrf.set.get('activeapps.%s' % self.service)
53         
54
55     def cleanup(self):
56         ''' Closes management sockets, kills children, reaps children, exits '''
57
58         osrf.log.log_info("Shutting down...")
59         self.cleanup_routers()
60
61         self.read_status.shutdown(socket.SHUT_RDWR)
62         self.write_status.shutdown(socket.SHUT_RDWR)
63         self.read_status.close()
64         self.write_status.close()
65
66         for child in self.children:
67             child.read_data.shutdown(socket.SHUT_RDWR)
68             child.write_data.shutdown(socket.SHUT_RDWR)
69             child.read_data.close()
70             child.write_data.close()
71
72         os.kill(0, signal.SIGKILL)
73         self.reap_children(True)
74         os._exit(0)
75
76
77     def handle_signals(self):
78         ''' Installs SIGINT and SIGTERM handlers '''
79         def handler(signum, frame):
80             self.cleanup()
81         signal.signal(signal.SIGINT, handler)
82         signal.signal(signal.SIGTERM, handler)
83
84
85     def run(self):
86
87         osrf.net.get_network_handle().disconnect()
88         osrf.net.clear_network_handle()
89         self.spawn_children()
90         self.handle_signals()
91
92         time.sleep(.5) # give children a chance to connect before we start taking data
93         self.osrf_handle = osrf.system.System.net_connect(
94             resource = '%s_listener' % self.service,
95             service = self.service
96         )
97
98         # clear the recv callback so inbound messages do not filter through the opensrf stack
99         self.osrf_handle.receive_callback = None
100
101         # connect to our listening routers
102         self.register_routers()
103
104         try:
105             osrf.log.log_debug("entering main server loop...")
106             while True: # main server loop
107
108                 self.reap_children()
109                 self.check_status()
110                 data = self.osrf_handle.recv(-1).to_xml()
111
112                 if self.try_avail_child(data):
113                     continue
114
115                 if self.try_new_child(data):
116                     continue
117
118                 self.wait_for_child()
119
120         except KeyboardInterrupt:
121             self.cleanup()
122         #except Exception, e: 
123             #osrf.log.log_error("server exiting with exception: %s" % e.message)
124             #self.cleanup()
125                 
126
127     def try_avail_child(self, data):
128         ''' Trys to send current request data to an available child process '''
129         ctr = 0
130         while ctr < self.num_children:
131
132             if self.child_idx >= self.num_children:
133                 self.child_idx = 0
134             child = self.children[self.child_idx]
135
136             if child.available:
137                 osrf.log.log_internal("sending data to available child")
138                 self.write_child(child, data)
139                 return True
140
141             ctr += 1
142             self.child_idx += 1
143         return False
144
145     def try_new_child(self, data):
146         ''' Tries to spawn a new child to send request data to '''
147         if self.num_children < self.max_children:
148             osrf.log.log_internal("spawning new child to handle data")
149             child = self.spawn_child()
150             self.write_child(child, data)
151             return True
152         return False
153
154     def try_wait_child(self, data):
155         ''' Waits for a child to become available '''
156         osrf.log.log_warn("No children available, waiting...")
157         child = self.check_status(True)
158         self.write_child(child, data)
159
160
161     def write_child(self, child, data):
162         ''' Sends data to the child process '''
163         child.available = False
164         child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
165         self.child_idx += 1
166
167
168     def check_status(self, block=False):
169         ''' Checks to see if any children have indicated they are done with 
170             their current request.  If block is true, this will wait 
171             indefinitely for a child to be free. '''
172
173         pid = None
174         child = None
175         if block:
176             pid = self.read_status.recv(SIZE_PAD)
177         else:
178             try:
179                 self.read_status.setblocking(0)
180                 pid = self.read_status.recv(SIZE_PAD)
181             except socket.error, e:
182                 if e.args[0] != errno.EAGAIN:
183                     raise e
184             self.read_status.setblocking(1)
185                 
186         if pid:
187             pid = int(pid)
188             child = [c for c in self.children if c.pid == pid][0]
189             child.available = True
190
191         return child
192         
193
194     def reap_children(self, done=False):
195         ''' Uses waitpid() to reap the children.  If necessary, new children are spawned '''
196         options = 0
197         if not done: 
198             options = os.WNOHANG 
199
200         while True:
201             try:
202                 (pid, status) = os.waitpid(0, options)
203                 if pid == 0:
204                     if not done:
205                         self.spawn_children()
206                     return
207                 osrf.log.log_debug("reaping child %d" % pid)
208                 self.num_children -= 1
209                 self.children = [c for c in self.children if c.pid != pid]
210             except OSError:
211                 return
212         
213     def spawn_children(self):
214         ''' Launches up to min_children child processes '''
215         while self.num_children < self.min_children:
216             self.spawn_child()
217
218     def spawn_child(self):
219         ''' Spawns a new child process '''
220
221         child = Child(self)
222         child.read_data, child.write_data = socket.socketpair()
223         child.pid = os.fork()
224
225         if child.pid:
226             self.num_children += 1
227             self.children.append(child)
228             osrf.log.log_debug("spawned child %d : %d total" % (child.pid, self.num_children))
229             return child
230         else:
231             child.pid = os.getpid()
232             child.init()
233             child.run()
234             osrf.net.get_network_handle().disconnect()
235             osrf.log.log_internal("child exiting...")
236             os._exit(0)
237
238     def register_routers(self):
239         ''' Registers this application instance with all configured routers '''
240         routers = osrf.conf.get('routers.router')
241
242         if not isinstance(routers, list):
243             routers = [routers]
244
245         for router in routers:
246             if isinstance(router, dict):
247                 if not 'services' in router or \
248                         self.service in router['services']['service']:
249                     target = "%s@%s/router" % (router['name'], router['domain'])
250                     self.register_router(target)
251             else:
252                 router_name = osrf.conf.get('router_name')
253                 target = "%s@%s/router" % (router_name, router)
254                 self.register_router(target)
255
256
257     def register_router(self, target):
258         ''' Registers with a single router '''
259         osrf.log.log_info("registering with router %s" % target)
260         self.routers.append(target)
261
262         reg_msg = osrf.net.NetworkMessage(
263             recipient = target,
264             body = 'registering...',
265             router_command = 'register',
266             router_class = self.service
267         )
268
269         self.osrf_handle.send(reg_msg)
270
271     def cleanup_routers(self):
272         ''' Un-registers with all connected routers '''
273         for target in self.routers:
274             osrf.log.log_info("un-registering with router %s" % target)
275             unreg_msg = osrf.net.NetworkMessage(
276                 recipient = target,
277                 body = 'un-registering...',
278                 router_command = 'unregister',
279                 router_class = self.service
280             )
281             self.osrf_handle.send(unreg_msg)
282         
283
284 class Child(object):
285     ''' Models a single child process '''
286
287     def __init__(self, controller):
288         self.controller = controller # our Controller object
289         self.num_requests = 0 # how many requests we've served so far
290         self.read_data = None # the child reads data from the controller on this socket
291         self.write_data = None # the controller sends data to the child on this socket 
292         self.available = True # true if this child is not currently serving a request
293         self.pid = 0 # my process id
294
295
296     def run(self):
297         ''' Loops, processing data, until max_requests is reached '''
298         while True:
299             try:
300                 size = int(self.read_data.recv(SIZE_PAD) or 0)
301                 data = self.read_data.recv(size)
302                 osrf.log.log_internal("recv'd data " + data)
303                 osrf.net.get_network_handle().flush_inbound_data()
304                 session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
305                 self.keepalive_loop(session)
306                 self.num_requests += 1
307                 if self.num_requests == self.controller.max_requests:
308                     break
309                 self.send_status()
310             except KeyboardInterrupt:
311                 pass
312         # run the exit handler
313         osrf.app.Application.application.child_exit()
314
315     def keepalive_loop(self, session):
316         keepalive = self.controller.keepalive
317
318         while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
319
320             status = session.wait(keepalive)
321
322             if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
323                 osrf.log.log_internal("client sent disconnect, exiting keepalive")
324                 break
325
326             if status is None: # no msg received before keepalive timeout expired
327
328                 osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
329
330                 session.send_status(
331                     session.thread, 
332                     osrf.net_obj.NetworkObject.osrfConnectStatus({   
333                         'status' : 'Disconnected on timeout',
334                         'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
335                     })
336                 )
337
338                 break
339
340         session.cleanup()
341         return
342
343     def send_status(self):
344         ''' Informs the controller that we are done processing this request '''
345         fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
346         try:
347             self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
348         finally:
349             fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
350
351     def init(self):
352         ''' Connects the opensrf xmpp handle '''
353         osrf.net.clear_network_handle()
354         osrf.system.System.net_connect(
355             resource = '%s_drone' % self.controller.service, 
356             service = self.controller.service
357         )
358         osrf.app.Application.application.child_init()
359