630b7dd4422e929c38276ebd551ef8563fe9e883
[OpenSRF.git] / src / python / osrf / server.py
1 # -----------------------------------------------------------------------
2 # Copyright (C) 2008  Equinox Software, Inc.
3 # Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
18 # 02110-1301, USA
19 # -----------------------------------------------------------------------
20
21 import os, sys, threading, logging, fcntl, socket, errno, signal, time
22 import osrf.log, osrf.net, osrf.system, osrf.stack
23
24
25 # used to define the size of the PID/size leader in 
26 # status and data messages passed to and from children
27 SIZE_PAD = 12
28
29 class Controller(object):
30     ''' 
31         OpenSRF forking request server.  
32     '''
33
34     def __init__(self, service):
35         self.service = service
36         self.max_requests = 0 # max child requests
37         self.max_children = 0 # max num of child processes
38         self.min_childen = 0 # min num of child processes
39         self.num_children = 0 # current num children
40         self.child_idx = 0 # current index into the children array
41         self.children = [] # list of children
42         self.osrf_handle = None # xmpp handle
43
44         # Global status socketpair.  All children relay their 
45         # availability info to the parent through this socketpair. 
46         self.read_status, self.write_status = socket.socketpair()
47
48
49     def cleanup(self):
50         ''' Closes management sockets, kills children, reaps children, exits '''
51
52         self.read_status.shutdown(socket.SHUT_RDWR)
53         self.write_status.shutdown(socket.SHUT_RDWR)
54         self.read_status.close()
55         self.write_status.close()
56
57         for child in self.children:
58             child.read_data.shutdown(socket.SHUT_RDWR)
59             child.write_data.shutdown(socket.SHUT_RDWR)
60             child.read_data.close()
61             child.write_data.close()
62
63         os.kill(0, signal.SIGKILL)
64         self.reap_children(True)
65         os._exit(0)
66
67
68     def handle_signals(self):
69         ''' Installs SIGINT and SIGTERM handlers '''
70         def handler(signum, frame):
71             self.cleanup()
72         signal.signal(signal.SIGINT, handler)
73         signal.signal(signal.SIGTERM, handler)
74
75
76     def run(self):
77
78         osrf.net.clear_network_handle()
79         self.spawn_children()
80         self.handle_signals()
81
82         time.sleep(.5) # give children a chance to connect before we start taking data
83         self.osrf_handle = osrf.system.System.net_connect(resource = '%s_listener' % self.service)
84
85         # clear the recv callback so inbound messages do not filter through the opensrf stack
86         self.osrf_handle.receive_callback = None
87
88         try:
89             while True: # main server loop
90
91                 self.reap_children()
92                 self.check_status()
93                 data = self.osrf_handle.recv(-1).to_xml()
94
95                 if self.try_avail_child(data):
96                     continue
97
98                 if self.try_new_child(data):
99                     continue
100
101                 self.wait_for_child()
102
103         except KeyboardInterrupt:
104             self.cleanup()
105         #except Exception, e: 
106             #osrf.log.log_error("server exiting with exception: %s" % e.message)
107             #self.cleanup()
108                 
109
110     def try_avail_child(self, data):
111         ''' Trys to send current request data to an available child process '''
112         ctr = 0
113         while ctr < self.num_children:
114
115             if self.child_idx >= self.num_children:
116                 self.child_idx = 0
117             child = self.children[self.child_idx]
118
119             if child.available:
120                 osrf.log.log_internal("sending data to available child")
121                 self.write_child(child, data)
122                 return True
123
124             ctr += 1
125             self.child_idx += 1
126         return False
127
128     def try_new_child(self, data):
129         ''' Tries to spawn a new child to send request data to '''
130         if self.num_children < self.max_children:
131             osrf.log.log_internal("spawning new child to handle data")
132             child = self.spawn_child()
133             self.write_child(child, data)
134             return True
135         return False
136
137     def try_wait_child(self, data):
138         ''' Waits for a child to become available '''
139         osrf.log.log_warn("No children available, waiting...")
140         child = self.check_status(True)
141         self.write_child(child, data)
142
143
144     def write_child(self, child, data):
145         ''' Sends data to the child process '''
146         child.available = False
147         child.write_data.sendall(str(len(data)).rjust(SIZE_PAD) + data)
148         self.child_idx += 1
149
150
151     def check_status(self, block=False):
152         ''' Checks to see if any children have indicated they are done with 
153             their current request.  If block is true, this will wait 
154             indefinitely for a child to be free. '''
155
156         pid = None
157         child = None
158         if block:
159             pid = self.read_status.recv(SIZE_PAD)
160         else:
161             try:
162                 self.read_status.setblocking(0)
163                 pid = self.read_status.recv(SIZE_PAD)
164             except socket.error, e:
165                 if e.args[0] != errno.EAGAIN:
166                     raise e
167             self.read_status.setblocking(1)
168                 
169         if pid:
170             pid = int(pid)
171             child = [c for c in self.children if c.pid == pid][0]
172             child.available = True
173
174         return child
175         
176
177     def reap_children(self, done=False):
178         ''' Uses waitpid() to reap the children.  If necessary, new children are spawned '''
179         options = 0
180         if not done: 
181             options = os.WNOHANG 
182
183         while True:
184             try:
185                 (pid, status) = os.waitpid(0, options)
186                 if pid == 0:
187                     if not done:
188                         self.spawn_children()
189                     return
190                 osrf.log.log_debug("reaping child %d" % pid)
191                 self.num_children -= 1
192                 self.children = [c for c in self.children if c.pid != pid]
193             except OSError:
194                 return
195         
196     def spawn_children(self):
197         ''' Launches up to min_children child processes '''
198         while self.num_children < self.min_children:
199             self.spawn_child()
200
201     def spawn_child(self):
202         ''' Spawns a new child process '''
203
204         child = Child(self)
205         child.read_data, child.write_data = socket.socketpair()
206         child.pid = os.fork()
207
208         if child.pid:
209             self.num_children += 1
210             self.children.append(child)
211             osrf.log.log_debug("spawned child %d : %d total" % (child.pid, self.num_children))
212             return child
213         else:
214             child.pid = os.getpid()
215             child.init()
216             child.run()
217             os._exit(0)
218
219 class Child(object):
220     ''' Models a single child process '''
221
222     def __init__(self, controller):
223         self.controller = controller # our Controller object
224         self.num_requests = 0 # how many requests we've served so far
225         self.read_data = None # the child reads data from the controller on this socket
226         self.write_data = None # the controller sends data to the child on this socket 
227         self.available = True # true if this child is not currently serving a request
228         self.pid = 0 # my process id
229
230
231     def run(self):
232         ''' Loops, processing data, until max_requests is reached '''
233         while True:
234             try:
235                 size = int(self.read_data.recv(SIZE_PAD))
236                 data = self.read_data.recv(size)
237                 osrf.log.log_internal("recv'd data " + data)
238                 osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
239                 self.num_requests += 1
240                 if self.num_requests == self.controller.max_requests:
241                     break
242                 self.send_status()
243             except KeyboardInterrupt:
244                 pass
245
246     def send_status(self):
247         ''' Informs the controller that we are done processing this request '''
248         fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
249         try:
250             self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
251         finally:
252             fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
253
254     def init(self):
255         ''' Connects the opensrf xmpp handle '''
256         osrf.net.clear_network_handle()
257         osrf.system.System.net_connect(resource = '%s_drone' % self.controller.service)
258