]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/python/osrf/server.py
Typos: s/wait_for_child/try_wait_child/ and s/min_childen/min_children/
[OpenSRF.git] / src / python / osrf / server.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2008  Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
18 # 02110-1301, USA
19 # -----------------------------------------------------------------------
20
21 import os, sys, threading, fcntl, socket, errno, signal, time
22 import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app, osrf.const
23
24
25 # used to define the size of the PID/size leader in 
26 # status and data messages passed to and from children
27 SIZE_PAD = 12
28
29 class Controller(object):
30     ''' 
31         OpenSRF forking request server.  
32     '''
33
34     def __init__(self, service):
35         self.service = service # service name
36         self.max_requests = 0 # max child requests
37         self.max_children = 0 # max num of child processes
38         self.min_children = 0 # min num of child processes
39         self.num_children = 0 # current num children
40         self.osrf_handle = None # xmpp handle
41         self.routers = [] # list of registered routers
42         self.keepalive = 0 # how long to wait for subsequent, stateful requests
43         self.active_list = [] # list of active children
44         self.idle_list = [] # list of idle children
45
46         # Global status socketpair.  All children relay their 
47         # availability info to the parent through this socketpair. 
48         self.read_status, self.write_status = socket.socketpair()
49
50     def load_app(self):
51         settings = osrf.set.get('activeapps.%s' % self.service)
52         
53
54     def cleanup(self):
55         ''' Closes management sockets, kills children, reaps children, exits '''
56
57         osrf.log.log_info("Shutting down...")
58         self.cleanup_routers()
59
60         self.read_status.shutdown(socket.SHUT_RDWR)
61         self.write_status.shutdown(socket.SHUT_RDWR)
62         self.read_status.close()
63         self.write_status.close()
64
65         for child in self.idle_list + self.active_list:
66             child.read_data.shutdown(socket.SHUT_RDWR)
67             child.write_data.shutdown(socket.SHUT_RDWR)
68             child.read_data.close()
69             child.write_data.close()
70
71         os.kill(0, signal.SIGKILL)
72         self.reap_children(True)
73         os._exit(0)
74
75
76     def handle_signals(self):
77         ''' Installs SIGINT and SIGTERM handlers '''
78         def handler(signum, frame):
79             self.cleanup()
80         signal.signal(signal.SIGINT, handler)
81         signal.signal(signal.SIGTERM, handler)
82
83
84     def run(self):
85
86         osrf.net.get_network_handle().disconnect()
87         osrf.net.clear_network_handle()
88         self.spawn_children()
89         self.handle_signals()
90
91         time.sleep(.5) # give children a chance to connect before we start taking data
92         self.osrf_handle = osrf.system.System.net_connect(
93             resource = '%s_listener' % self.service,
94             service = self.service
95         )
96
97         # clear the recv callback so inbound messages do not filter through the opensrf stack
98         self.osrf_handle.receive_callback = None
99
100         # connect to our listening routers
101         self.register_routers()
102
103         try:
104             osrf.log.log_debug("entering main server loop...")
105             while True: # main server loop
106
107                 self.reap_children()
108                 self.check_status()
109                 data = self.osrf_handle.recv(-1).to_xml()
110
111                 if self.try_avail_child(data):
112                     continue
113
114                 if self.try_new_child(data):
115                     continue
116
117                 self.try_wait_child()
118
119         except KeyboardInterrupt:
120             self.cleanup()
121         #except Exception, e: 
122             #osrf.log.log_error("server exiting with exception: %s" % e.message)
123             #self.cleanup()
124                 
125
126     def try_avail_child(self, data):
127         ''' Trys to send current request data to an available child process '''
128
129         if len(self.idle_list) == 0:
130             return False
131
132         child = self.idle_list.pop(0) # remove from idle list
133         osrf.log.log_internal("sending data to available child %d" % child.pid)
134         self.write_child(child, data)
135         self.active_list.insert(0, child) # add to active list
136         return True
137
138     def try_new_child(self, data):
139         ''' Tries to spawn a new child to send request data to '''
140
141         osrf.log.log_debug("try_new_child: service=%s num_children=%s max_children=%s" % (self.service, self.num_children, self.max_children))
142         if self.num_children < self.max_children:
143             osrf.log.log_internal("spawning new child to handle data")
144             child = self.spawn_child(True)
145             self.write_child(child, data)
146             return True
147         return False
148
149     def try_wait_child(self, data):
150         ''' Waits for a child to become available '''
151
152         osrf.log.log_warn("No children available, waiting...")
153         child = self.check_status(True)
154         self.write_child(child, data)
155
156
157     def write_child(self, child, data):
158         ''' Sends data to the child process '''
159         # Do we need to watch for sigpipe, etc?
160         child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
161
162
163     def check_status(self, block=False):
164         ''' Checks to see if any children have indicated they are done with 
165             their current request.  If block is true, this will wait 
166             indefinitely for a child to be free. '''
167
168         pid = None
169         if block:
170             pid = self.read_status.recv(SIZE_PAD)
171         else:
172             try:
173                 self.read_status.setblocking(0)
174                 pid = self.read_status.recv(SIZE_PAD)
175             except socket.error, e:
176                 if e.args[0] != errno.EAGAIN:
177                     raise e
178             self.read_status.setblocking(1)
179                 
180         if pid:
181             pid = int(pid)
182             child = [c for c in self.active_list if c.pid == pid][0]
183             self.active_list.remove(child)
184             self.idle_list.insert(0, child)
185             return child
186
187         return None
188         
189
190     def reap_children(self, done=False):
191         ''' Uses waitpid() to reap the children.  If necessary, new children are spawned '''
192
193         options = 0
194         if not done: 
195             options = os.WNOHANG 
196
197         while True:
198             try:
199                 (pid, status) = os.waitpid(0, options)
200                 if pid == 0:
201                     if not done:
202                         self.spawn_children()
203                     return
204
205                 osrf.log.log_debug("reaping child %d" % pid)
206                 self.num_children -= 1
207
208                 # locate the child in the active or idle list and remove it
209                 # Note: typically, a dead child will be in the active list, since 
210                 # exiting children do not send a cleanup status to the controller
211
212                 child = [c for c in self.active_list if c.pid == pid]
213                 if len(child) > 0:
214                     self.active_list.remove(child[0])
215                 else:
216                     child = [c for c in self.idle_list if c.pid == pid]
217                     self.idle_list.remove(child[0])
218
219             except OSError:
220                 return
221         
222     def spawn_children(self):
223         ''' Launches up to min_children child processes '''
224         osrf.log.log_debug("spawn_children: service=%s num_children=%s min_children=%s" % (self.service, self.num_children, self.min_children))
225         while self.num_children < self.min_children:
226             self.spawn_child()
227
228     def spawn_child(self, active=False):
229         ''' Spawns a new child process '''
230
231         child = Child(self)
232         child.read_data, child.write_data = socket.socketpair()
233         child.pid = os.fork()
234
235         if child.pid:
236             self.num_children += 1
237             if active:
238                 self.active_list.insert(0, child)
239             else:
240                 self.idle_list.insert(0, child)
241             osrf.log.log_debug("service %s spawned child %d : %d total" % (self.service, child.pid, self.num_children))
242             return child
243         else:
244             child.pid = os.getpid()
245             child.init()
246             child.run()
247             osrf.net.get_network_handle().disconnect()
248             osrf.log.log_internal("child exiting...")
249             os._exit(0)
250
251     def register_routers(self):
252         ''' Registers this application instance with all configured routers '''
253         routers = osrf.conf.get('routers.router')
254
255         if not isinstance(routers, list):
256             routers = [routers]
257
258         for router in routers:
259             if isinstance(router, dict):
260                 if not 'services' in router or \
261                         self.service in router['services']['service']:
262                     target = "%s@%s/router" % (router['name'], router['domain'])
263                     self.register_router(target)
264             else:
265                 router_name = osrf.conf.get('router_name')
266                 target = "%s@%s/router" % (router_name, router)
267                 self.register_router(target)
268
269
270     def register_router(self, target):
271         ''' Registers with a single router '''
272
273         osrf.log.log_info("registering with router %s" % target)
274         self.routers.append(target)
275
276         reg_msg = osrf.net.NetworkMessage(
277             recipient = target,
278             body = 'registering...',
279             router_command = 'register',
280             router_class = self.service
281         )
282
283         self.osrf_handle.send(reg_msg)
284
285     def cleanup_routers(self):
286         ''' Un-registers with all connected routers '''
287
288         for target in self.routers:
289             osrf.log.log_info("un-registering with router %s" % target)
290             unreg_msg = osrf.net.NetworkMessage(
291                 recipient = target,
292                 body = 'un-registering...',
293                 router_command = 'unregister',
294                 router_class = self.service
295             )
296             self.osrf_handle.send(unreg_msg)
297         
298
299 class Child(object):
300     ''' Models a single child process '''
301
302     def __init__(self, controller):
303         self.controller = controller # our Controller object
304         self.num_requests = 0 # how many requests we've served so far
305         self.read_data = None # the child reads data from the controller on this socket
306         self.write_data = None # the controller sends data to the child on this socket 
307         self.pid = 0 # my process id
308
309     def run(self):
310         ''' Loops, processing data, until max_requests is reached '''
311
312         while True:
313             try:
314                 size = int(self.read_data.recv(SIZE_PAD) or 0)
315                 data = self.read_data.recv(size)
316                 osrf.log.log_internal("recv'd data " + data)
317                 osrf.net.get_network_handle().flush_inbound_data()
318                 session = osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
319                 self.keepalive_loop(session)
320                 self.num_requests += 1
321                 if self.num_requests == self.controller.max_requests:
322                     break
323                 self.send_status()
324             except KeyboardInterrupt:
325                 pass
326
327         # run the exit handler
328         osrf.app.Application.application.child_exit()
329
330     def keepalive_loop(self, session):
331         keepalive = self.controller.keepalive
332
333         while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
334
335             status = session.wait(keepalive)
336
337             if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
338                 osrf.log.log_internal("client sent disconnect, exiting keepalive")
339                 break
340
341             if status is None: # no msg received before keepalive timeout expired
342
343                 osrf.log.log_info("No request was received in %d seconds, exiting stateful session" % int(keepalive));
344
345                 session.send_status(
346                     session.thread, 
347                     osrf.net_obj.NetworkObject.osrfConnectStatus({   
348                         'status' : 'Disconnected on timeout',
349                         'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
350                     })
351                 )
352
353                 break
354
355         session.cleanup()
356         return
357
358     def send_status(self):
359         ''' Informs the controller that we are done processing this request '''
360         fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
361         try:
362             self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
363         finally:
364             fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
365
366     def init(self):
367         ''' Connects the opensrf xmpp handle '''
368         osrf.net.clear_network_handle()
369         osrf.system.System.net_connect(
370             resource = '%s_drone' % self.controller.service, 
371             service = self.controller.service
372         )
373         osrf.app.Application.application.child_init()
374