]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/UnixServer.pm
added a flush_socket method to the jabber client code to allow
[OpenSRF.git] / src / perlmods / OpenSRF / UnixServer.pm
1 package OpenSRF::UnixServer;
2 use strict; use warnings;
3 use base qw/OpenSRF/;
4 use OpenSRF::EX qw(:try);
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Transport::PeerHandle;
7 use OpenSRF::Application;
8 use OpenSRF::AppSession;
9 use OpenSRF::DomainObject::oilsResponse qw/:status/;
10 use OpenSRF::System;
11 use OpenSRF::Utils::SettingsClient;
12 use Time::HiRes qw(time);
13 use JSON;
14 use vars qw/@ISA $app/;
15 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
16 use Carp;
17
18 use IO::Socket::INET;
19 use IO::Socket::UNIX;
20
21 # XXX Need to add actual logging statements in the code
22 my $logger = "OpenSRF::Utils::Logger";
23
# Debugging aid: emit a full stack trace (via Carp::confess) tagged with the
# current process id whenever an instance of this class is destroyed.
sub DESTROY {
    confess "Dying $$";
}
25
26
27 =head1 What am I
28
29 All inbound messages are passed on to the UnixServer for processing.
30 We take the data, close the Unix socket, and pass the data on to our abstract
31 'process()' method.  
32
33 Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections.
34 So when you pass data down the Unix socket to us, we have been preforked and waiting
35 to disperse new data among us.
36
37 =cut
38
# Accessor for the package-level application name stored in $app
# (assigned by new()).  Any arguments, including the invocant, are ignored.
sub app {
    return $app;
}
40
# new( $class, $app1 )
#
# Constructor.  Records $app1 in the package-level $app (so it is shared by
# every instance in this process) and returns an empty hash blessed into
# $class.
#
# Throws OpenSRF::EX::InvalidArg when $app1 is missing or otherwise false.
#
# No configure/child-init hooks are invoked here.
sub new {
    my ( $class, $app1 ) = @_;

    OpenSRF::EX::InvalidArg->throw("UnixServer requires an app name to run")
        unless $app1;

    $app = $app1;

    return bless( {}, $class );
}
61
=head2 process_request()

Reads the complete request from STDIN, closes the client socket, and hands
the raw data to OpenSRF::Transport->handler() for this application.  When a
session object comes back, we keep waiting on its queue until the session is
disconnected, can no longer be found, or a wait lasts at least as long as the
configured keepalive interval.

=cut

sub process_request {
    my $self = shift;

    # Accumulate everything that can be read from STDIN.
    my $data;
    while ( my $chunk = <STDIN> ) {
        $data .= $chunk;
    }

    # Append '*' to the process name while a request is in flight
    # (presumably so busy children stand out in process listings).  Every
    # path that returns from this sub puts the original name back.
    my $orig = $0;
    $0 = "$0*";

    # Nothing usable came in (undef, empty string, or "0"): close up and bail.
    unless ($data) {
        close( $self->{server}->{client} );
        $logger->debug( "Unix child received empty data from socket", ERROR );
        $0 = $orig;
        return;
    }

    close( $self->{server}->{client} )
        or $logger->debug( "Error closing Unix socket: $!", ERROR );

    my $app = $self->app();
    $logger->transport( "UnixServer for $app received $data", INTERNAL );

    # --------------------------------------------------------------
    # Drop all data from the socket before continuing to process.
    # A false return from flush_socket() is treated as a lost network
    # connection: the request is dropped and this process exits.
    # --------------------------------------------------------------
    my $peer = OpenSRF::Transport::PeerHandle->retrieve;
    unless ( $peer->flush_socket() ) {
        $logger->error( "We received a request "
              . "and we are no longer connected to the jabber network. "
              . "We will go away and drop this request: $data" );
        exit;
    }

    my $app_session = OpenSRF::Transport->handler( $self->app(), $data );

    # The handler did not give us a session object; nothing more to do.
    unless ( ref($app_session) ) {
        $logger->transport(
            "Did not receive AppSession from transport handler, returning...",
            WARN );
        $0 = $orig;
        return;
    }

    # A stateless session that is not in the CONNECTED state is torn down
    # immediately instead of entering the keepalive loop.
    if (    $app_session->stateless
        and $app_session->state != $app_session->CONNECTED() )
    {
        $logger->debug("Exiting keepalive for stateless session / orig = $orig");
        $app_session->kill_me;
        $0 = $orig;
        return;
    }

    my $keepalive = OpenSRF::Utils::SettingsClient->new()
      ->config_value( "apps", $self->app(), "keepalive" );

    # Keep waiting on the session until it disconnects, can no longer be
    # found, or a single wait takes at least $keepalive seconds.
    while ( $app_session
        and $app_session->state
        and $app_session->state != $app_session->DISCONNECTED()
        and $app_session->find( $app_session->session_id ) )
    {
        my $wait_began = time;
        $logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL );
        $app_session->queue_wait($keepalive);
        $logger->debug( "after queue wait $keepalive", INTERNAL );
        my $waited = time() - $wait_began;

        if ( $waited >= $keepalive ) {

            # The wait used up the whole keepalive window: report the
            # timeout on the session and mark it disconnected.
            my $timeout_status = OpenSRF::DomainObject::oilsConnectStatus->new(
                status     => "Disconnected on timeout",
                statusCode => STATUS_TIMEOUT
            );
            $app_session->status($timeout_status);
            $app_session->state( $app_session->DISCONNECTED() );
            last;
        }
    }

    # Keep calling queue_wait(0) until it returns false, so nothing that is
    # still queued is left behind.
    my $x = 0;
    while ( $app_session && $app_session->queue_wait(0) ) {
        $logger->debug( "Looping on zombies " . $x++, DEBUG );
    }

    $logger->debug("Timed out, disconnected, or auth failed");
    $app_session->kill_me if ($app_session);

    $0 = $orig;
}
159
160
# serve()
#
# Entry point for the master process of an application:
#   1. names the process "OpenSRF master [<app>]",
#   2. removes leftover /tmp/opensrf_unix_<app>* config files,
#   3. reads the app's "unix_config" settings via the SettingsClient,
#   4. writes them to a temporary server config file under /tmp,
#   5. hands that file to $self->run( conf_file => ... ), and
#   6. deletes the file once run() returns.
#
# Dies if the temporary config file cannot be opened, written, or closed.
sub serve {
    my ($self) = @_;

    my $app = $self->app();
    $logger->set_service($app);

    $0 = "OpenSRF master [$app]";

    # Clean up stale config files without going through a shell, so the
    # application name is never interpreted as shell syntax.  Failures are
    # ignored, matching the best-effort semantics of "rm -f".
    my @stale = glob("/tmp/opensrf_unix_$app*");
    unlink @stale if @stale;

    my $client = OpenSRF::Utils::SettingsClient->new();

    my $max_requests = $client->config_value( "apps", $app, "unix_config", "max_requests" );
    $logger->transport( "Max Req: " . $max_requests, INFO );

    my $min_servers = $client->config_value( "apps", $app, "unix_config", "min_children" );
    my $max_servers = $client->config_value( "apps", $app, "unix_config", "max_children" );
    my $min_spare   = $client->config_value( "apps", $app, "unix_config", "min_spare_children" );
    my $max_spare   = $client->config_value( "apps", $app, "unix_config", "max_spare_children" );

    # Each path is "<configured directory>/<per-app file name>".
    my $log_file = join( "/",
        $client->config_value( "dirs", "log" ),
        $client->config_value( "apps", $app, "unix_config", "unix_log" ) );
    my $port = join( "/",
        $client->config_value( "dirs", "sock" ),
        $client->config_value( "apps", $app, "unix_config", "unix_sock" ) );
    my $pid_file = join( "/",
        $client->config_value( "dirs", "pid" ),
        $client->config_value( "apps", $app, "unix_config", "unix_pid" ) );

    # The name keeps the "opensrf_unix_<app>" prefix so that the cleanup
    # above finds it on the next start if we never reach the unlink below.
    my $file = "/tmp/" . "opensrf_unix_$app" . "_" . time . rand($$) . "_$$";
    my $file_string =
        "min_servers $min_servers\nmax_servers $max_servers\n"
      . "min_spare_servers $min_spare\nmax_spare_servers $max_spare\n"
      . "max_requests $max_requests\nlog_file $log_file\nproto unix\n"
      . "port $port\npid_file $pid_file\nlog_level 3\n";

    # Three-argument open with a lexical handle; write errors are checked
    # because buffered output may only fail at print or close time.
    open( my $conf_fh, '>', $file ) or die "Can't open $file : $!";
    print {$conf_fh} $file_string or die "Can't write to $file : $!";
    close($conf_fh) or die "Can't close $file : $!";

    $self->run( 'conf_file' => $file );
    unlink($file);

}
200
# configure_hook()
#
# Prepares the application implementation for this server:
#   * bootstraps a client named "system_client",
#   * looks up the configured implementation class for the app and
#     registers it with OpenSRF::Application and with JSON,
#   * calls the implementation's initialize() when it provides one,
#   * runs child_init_hook() directly when the configured server_type does
#     not match /fork/i,
#   * disconnects the current PeerHandle, if there is one.
#
# Returns whatever OpenSRF::Application->application_implementation returns.
sub configure_hook {
    my ($self) = @_;
    my $app = $self->app;

    # boot a client
    OpenSRF::System->bootstrap_client( client_name => "system_client" );

    $logger->debug( "Setting application implementaion for $app", DEBUG );

    my $settings = OpenSRF::Utils::SettingsClient->new();
    my $imp      = $settings->config_value( "apps", $app, "implementation" );

    # NOTE(review): server_class is called as a plain function, so $app is
    # its first argument rather than a class name -- kept as-is; confirm
    # against OpenSRF::Application.
    OpenSRF::Application::server_class($app);
    OpenSRF::Application->application_implementation($imp);
    JSON->register_class_hint( name => $imp, hint => $app, type => "hash" );

    if ( OpenSRF::Application->application_implementation->can('initialize') ) {
        OpenSRF::Application->application_implementation->initialize();
    }

    unless ( $settings->config_value("server_type") =~ /fork/i ) {
        $self->child_init_hook();
    }

    my $peer = OpenSRF::Transport::PeerHandle->retrieve;
    $peer->disconnect if $peer;

    return OpenSRF::Application->application_implementation;
}
228
# child_finish_hook()
#
# Intentionally does no work.  Returns the invocant; nothing in this file
# relies on the return value.
sub child_finish_hook {
    my ($self) = @_;
    return $self;
}
232
# child_init_hook()
#
# Per-child setup:
#   * renames the process from "master" to "drone",
#   * when the OPENSRF_PROFILE environment variable is set, tries to load
#     Devel::Profiler with an output file named after the process,
#   * constructs the PeerHandle for this application,
#   * calls the implementation's child_init() when it provides one.
#
# Returns the current OpenSRF::Transport::PeerHandle.
sub child_init_hook {
    my $self = shift;

    $0 =~ s/master/drone/g;

    if ( $ENV{OPENSRF_PROFILE} ) {

        # Build a filesystem-friendly tag from the process name: every
        # non-word character becomes an underscore.
        ( my $file = $0 ) =~ s/\W/_/g;

        # String eval so that Devel::Profiler is only loaded on demand and
        # a missing module is reported instead of being fatal.
        eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;";

        if ($@) {
            $logger->debug( "Could not load Devel::Profiler: $@", ERROR );
        }
        else {
            $0 .= ' [PROFILING]';
            $logger->debug( "Running under Devel::Profiler", INFO );
        }
    }

    OpenSRF::Transport::PeerHandle->construct( $self->app() );
    $logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL );

    if ( OpenSRF::Application->application_implementation->can('child_init') ) {
        OpenSRF::Application->application_implementation->child_init;
    }

    return OpenSRF::Transport::PeerHandle->retrieve;
}
261
262 1;
263