implemented the majority of server-side python. still need to add settings server...
[OpenSRF.git] / src / python / osrf / server.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2008  Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
18 # 02110-1301, USA
19 # -----------------------------------------------------------------------
20
21 import os, sys, threading, logging, fcntl, socket, errno, signal, time
22 import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app
23
24
25 # used to define the size of the PID/size leader in 
26 # status and data messages passed to and from children
27 SIZE_PAD = 12
28
29 class Controller(object):
30     ''' 
31         OpenSRF forking request server.  
32     '''
33
34     def __init__(self, service):
35         self.service = service # service name
36         self.application = None
37         self.max_requests = 0 # max child requests
38         self.max_children = 0 # max num of child processes
39         self.min_childen = 0 # min num of child processes
40         self.num_children = 0 # current num children
41         self.child_idx = 0 # current index into the children array
42         self.children = [] # list of children
43         self.osrf_handle = None # xmpp handle
44         self.routers = [] # list of registered routers
45
46         # Global status socketpair.  All children relay their 
47         # availability info to the parent through this socketpair. 
48         self.read_status, self.write_status = socket.socketpair()
49
50     def load_app(self):
51         settings = osrf.set.get('activeapps.%s' % self.service)
52         
53
54     def cleanup(self):
55         ''' Closes management sockets, kills children, reaps children, exits '''
56
57         osrf.log.log_info("Shutting down...")
58         self.cleanup_routers()
59
60         self.read_status.shutdown(socket.SHUT_RDWR)
61         self.write_status.shutdown(socket.SHUT_RDWR)
62         self.read_status.close()
63         self.write_status.close()
64
65         for child in self.children:
66             child.read_data.shutdown(socket.SHUT_RDWR)
67             child.write_data.shutdown(socket.SHUT_RDWR)
68             child.read_data.close()
69             child.write_data.close()
70
71         os.kill(0, signal.SIGKILL)
72         self.reap_children(True)
73         os._exit(0)
74
75
76     def handle_signals(self):
77         ''' Installs SIGINT and SIGTERM handlers '''
78         def handler(signum, frame):
79             self.cleanup()
80         signal.signal(signal.SIGINT, handler)
81         signal.signal(signal.SIGTERM, handler)
82
83
84     def run(self):
85
86         osrf.net.clear_network_handle()
87         self.spawn_children()
88         self.handle_signals()
89
90         time.sleep(.5) # give children a chance to connect before we start taking data
91         self.osrf_handle = osrf.system.System.net_connect(resource = '%s_listener' % self.service)
92
93         # clear the recv callback so inbound messages do not filter through the opensrf stack
94         self.osrf_handle.receive_callback = None
95
96         # connect to our listening routers
97         self.register_routers()
98
99         try:
100             osrf.log.log_debug("entering main server loop...")
101             while True: # main server loop
102
103                 self.reap_children()
104                 self.check_status()
105                 data = self.osrf_handle.recv(-1).to_xml()
106
107                 if self.try_avail_child(data):
108                     continue
109
110                 if self.try_new_child(data):
111                     continue
112
113                 self.wait_for_child()
114
115         except KeyboardInterrupt:
116             self.cleanup()
117         #except Exception, e: 
118             #osrf.log.log_error("server exiting with exception: %s" % e.message)
119             #self.cleanup()
120                 
121
122     def try_avail_child(self, data):
123         ''' Trys to send current request data to an available child process '''
124         ctr = 0
125         while ctr < self.num_children:
126
127             if self.child_idx >= self.num_children:
128                 self.child_idx = 0
129             child = self.children[self.child_idx]
130
131             if child.available:
132                 osrf.log.log_internal("sending data to available child")
133                 self.write_child(child, data)
134                 return True
135
136             ctr += 1
137             self.child_idx += 1
138         return False
139
140     def try_new_child(self, data):
141         ''' Tries to spawn a new child to send request data to '''
142         if self.num_children < self.max_children:
143             osrf.log.log_internal("spawning new child to handle data")
144             child = self.spawn_child()
145             self.write_child(child, data)
146             return True
147         return False
148
149     def try_wait_child(self, data):
150         ''' Waits for a child to become available '''
151         osrf.log.log_warn("No children available, waiting...")
152         child = self.check_status(True)
153         self.write_child(child, data)
154
155
156     def write_child(self, child, data):
157         ''' Sends data to the child process '''
158         child.available = False
159         child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
160         self.child_idx += 1
161
162
163     def check_status(self, block=False):
164         ''' Checks to see if any children have indicated they are done with 
165             their current request.  If block is true, this will wait 
166             indefinitely for a child to be free. '''
167
168         pid = None
169         child = None
170         if block:
171             pid = self.read_status.recv(SIZE_PAD)
172         else:
173             try:
174                 self.read_status.setblocking(0)
175                 pid = self.read_status.recv(SIZE_PAD)
176             except socket.error, e:
177                 if e.args[0] != errno.EAGAIN:
178                     raise e
179             self.read_status.setblocking(1)
180                 
181         if pid:
182             pid = int(pid)
183             child = [c for c in self.children if c.pid == pid][0]
184             child.available = True
185
186         return child
187         
188
189     def reap_children(self, done=False):
190         ''' Uses waitpid() to reap the children.  If necessary, new children are spawned '''
191         options = 0
192         if not done: 
193             options = os.WNOHANG 
194
195         while True:
196             try:
197                 (pid, status) = os.waitpid(0, options)
198                 if pid == 0:
199                     if not done:
200                         self.spawn_children()
201                     return
202                 osrf.log.log_debug("reaping child %d" % pid)
203                 self.num_children -= 1
204                 self.children = [c for c in self.children if c.pid != pid]
205             except OSError:
206                 return
207         
208     def spawn_children(self):
209         ''' Launches up to min_children child processes '''
210         while self.num_children < self.min_children:
211             self.spawn_child()
212
213     def spawn_child(self):
214         ''' Spawns a new child process '''
215
216         child = Child(self)
217         child.read_data, child.write_data = socket.socketpair()
218         child.pid = os.fork()
219
220         if child.pid:
221             self.num_children += 1
222             self.children.append(child)
223             osrf.log.log_debug("spawned child %d : %d total" % (child.pid, self.num_children))
224             return child
225         else:
226             child.pid = os.getpid()
227             child.init()
228             child.run()
229             os._exit(0)
230
231     def register_routers(self):
232         ''' Registers this application instance with all configured routers '''
233         routers = osrf.conf.get('routers.router')
234
235         if not isinstance(routers, list):
236             routers = [routers]
237
238         for router in routers:
239             if isinstance(router, dict):
240                 if not 'services' in router or \
241                         self.service in router['services']['service']:
242                     target = "%s@%s/router" % (router['name'], router['domain'])
243                     self.register_router(target)
244             else:
245                 router_name = osrf.conf.get('router_name')
246                 target = "%s@%s/router" % (router_name, router)
247                 self.register_router(target)
248
249
250     def register_router(self, target):
251         ''' Registers with a single router '''
252         osrf.log.log_info("registering with router %s" % target)
253         self.routers.append(target)
254
255         reg_msg = osrf.net.NetworkMessage(
256             recipient = target,
257             body = 'registering...',
258             router_command = 'register',
259             router_class = self.service
260         )
261
262         self.osrf_handle.send(reg_msg)
263
264     def cleanup_routers(self):
265         ''' Un-registers with all connected routers '''
266         for target in self.routers:
267             osrf.log.log_info("un-registering with router %s" % target)
268             unreg_msg = osrf.net.NetworkMessage(
269                 recipient = target,
270                 body = 'un-registering...',
271                 router_command = 'unregister',
272                 router_class = self.service
273             )
274             self.osrf_handle.send(unreg_msg)
275         
276
277 class Child(object):
278     ''' Models a single child process '''
279
280     def __init__(self, controller):
281         self.controller = controller # our Controller object
282         self.num_requests = 0 # how many requests we've served so far
283         self.read_data = None # the child reads data from the controller on this socket
284         self.write_data = None # the controller sends data to the child on this socket 
285         self.available = True # true if this child is not currently serving a request
286         self.pid = 0 # my process id
287
288
289     def run(self):
290         ''' Loops, processing data, until max_requests is reached '''
291         while True:
292             try:
293                 size = int(self.read_data.recv(SIZE_PAD) or 0)
294                 data = self.read_data.recv(size)
295                 osrf.log.log_internal("recv'd data " + data)
296                 osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
297                 self.num_requests += 1
298                 if self.num_requests == self.controller.max_requests:
299                     break
300                 self.send_status()
301             except KeyboardInterrupt:
302                 pass
303         # run the exit handler
304         osrf.app.Application.application.child_exit()
305
306     def send_status(self):
307         ''' Informs the controller that we are done processing this request '''
308         fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
309         try:
310             self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
311         finally:
312             fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
313
314     def init(self):
315         ''' Connects the opensrf xmpp handle '''
316         osrf.net.clear_network_handle()
317         osrf.system.System.net_connect(resource = '%s_drone' % self.controller.service)
318         osrf.app.Application.application.child_init()
319