1 package OpenSRF::UnixServer;
2 use strict; use warnings;
4 use OpenSRF::EX qw(:try);
5 use OpenSRF::Utils::Logger qw(:level $logger);
6 use OpenSRF::Transport::PeerHandle;
7 use OpenSRF::Application;
8 use OpenSRF::AppSession;
9 use OpenSRF::DomainObject::oilsResponse qw/:status/;
11 use OpenSRF::Utils::SettingsClient;
12 use Time::HiRes qw(time);
13 use OpenSRF::Utils::JSON;
14 use vars qw/@ISA $app/;
15 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
17 use FreezeThaw qw/thaw/;
# Destructor hook: whenever it runs it raises a full stack trace through
# confess(), tagged with the id of the current process ($$).
sub DESTROY {
    confess("Dying $$");
}
26 All inbound messages are passed on to the UnixServer for processing.
27 We take the data, close the Unix socket, and pass the data on to our abstract
30 Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections.
31 So when you pass data down the Unix socket to us, we have already been preforked and are waiting
32 to distribute the new data among ourselves.
# Accessor: hands back whatever is currently stored in the package-level
# $app variable.
sub app {
    return $app;
}
# Constructor fragment: the enclosing sub header and some statements are not
# visible in this listing. Takes the class name and an application name.
41 my( $class, $app1 ) = @_;
# NOTE(review): presumably guarded by a check that $app1 is missing, on a line
# not shown here -- confirm before relying on this being conditional.
43 throw OpenSRF::EX::InvalidArg( "UnixServer requires an app name to run" );
# The object starts out as an empty hash blessed into $class.
46 my $self = bless( {}, $class );
# Disabled code below: it used to call configure_hook() and child_init_hook()
# directly for the non-prefork case.
47 # my $client = OpenSRF::Utils::SettingsClient->new();
48 # if( $client->config_value("server_type") !~ /fork/i ||
49 # OpenSRF::Utils::Config->current->bootstrap->settings_config ) {
50 # warn "Calling hooks for non-prefork\n";
51 # $self->configure_hook();
52 # $self->child_init_hook();
59 =head2 process_request()
61 Takes the incoming data, closes the Unix socket and hands the data untouched
62 to the abstract process() method. This method is implemented in our subclasses.
# Fragment of the request handler (presumably process_request(), per the POD
# heading that precedes it). The sub header, several statements and the closing
# braces are missing from this listing, so the nesting of the visible lines is
# not fully known.
#
# Read STDIN line by line until EOF, accumulating everything in $data.
70 while( $d = <STDIN> ) { $data .= $d; }
# Nothing usable arrived (undef, empty string, or any other false value):
# close the client socket and log it.
75 if( ! $data or ! defined( $data ) or $data eq "" ) {
76 close($self->{server}->{client});
77 $logger->debug("Unix child received empty data from socket", ERROR);
# Close the client socket; a failed close is only logged.
83 if( ! close( $self->{server}->{client} ) ) {
84 $logger->debug( "Error closing Unix socket: $!", ERROR );
87 my $app = $self->app();
88 $logger->transport( "UnixServer for $app received $data", INTERNAL );
90 # --------------------------------------------------------------
91 # Drop all data from the socket before continuing to process
92 # --------------------------------------------------------------
93 my $ph = OpenSRF::Transport::PeerHandle->retrieve;
# A false return from flush_socket() is treated as a lost network connection;
# the error log states that this request is being dropped.
94 if(!$ph->flush_socket()) {
95 $logger->error("We received a request ".
96 "and we are no longer connected to the jabber network. ".
97 "We will go away and drop this request: $data");
# Deserialize the payload with FreezeThaw's thaw(); the list assignment keeps
# only the first thawed value.
101 ($data) = thaw($data);
# Hand the thawed message to the transport handler for this application.
102 my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
# Anything that is not a reference means no AppSession came back.
104 if(!ref($app_session)) {
105 $logger->transport( "Did not receive AppSession from transport handler, returning...", WARN );
# A stateless session that is not in the CONNECTED state is killed here.
# $orig is assigned on a line not visible in this listing.
110 if($app_session->stateless and $app_session->state != $app_session->CONNECTED()){
111 $logger->debug("Exiting keepalive for stateless session / orig = $orig");
112 $app_session->kill_me;
# Per-application keepalive value, read from settings at apps/<app>/keepalive.
118 my $client = OpenSRF::Utils::SettingsClient->new();
119 my $keepalive = $client->config_value("apps", $self->app(), "keepalive");
# Keep servicing the session while it exists, has a true state, is not
# DISCONNECTED, and can still be found by its session id.
122 while( $app_session and
123 $app_session->state and
124 $app_session->state != $app_session->DISCONNECTED() and
125 $app_session->find( $app_session->session_id ) ) {
129 $logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL );
130 $app_session->queue_wait( $keepalive );
131 $logger->debug( "after queue wait $keepalive", INTERNAL );
# $before and $after are assigned on lines not shown -- presumably time()
# readings taken around queue_wait (TODO confirm). When the wait lasted at least
# $keepalive, send a timeout status to the session and mark it DISCONNECTED.
134 if( ($after - $before) >= $keepalive ) {
136 my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
137 status => "Disconnected on timeout",
138 statusCode => STATUS_TIMEOUT);
139 $app_session->status($res);
140 $app_session->state( $app_session->DISCONNECTED() );
# Keep calling queue_wait(0) for as long as it returns true; $x (declared on a
# line not shown) counts the iterations for the log message.
147 while( $app_session && $app_session->queue_wait(0) ) {
148 $logger->debug( "Looping on zombies " . $x++ , DEBUG);
# Final cleanup: log, then kill the session if one still exists.
151 $logger->debug( "Timed out, disconnected, or authentication failed" );
152 $app_session->kill_me if ($app_session);
# Server start-up fragment: the enclosing sub header and the call that receives
# the option pairs at the bottom are not visible in this listing.
#
# Tag the logger with the application name and set the process title.
161 my $app = $self->app();
162 $logger->set_service($app);
164 $0 = "OpenSRF master [$app]";
# All tuning values below are read from settings under apps/<app>/unix_config.
166 my $client = OpenSRF::Utils::SettingsClient->new();
167 my @base = ('apps', $app, 'unix_config' );
169 my $min_servers = $client->config_value(@base, 'min_children');
170 my $max_servers = $client->config_value(@base, "max_children" );
171 my $min_spare = $client->config_value(@base, "min_spare_children" );
172 my $max_spare = $client->config_value(@base, "max_spare_children" );
173 my $max_requests = $client->config_value(@base, "max_requests" );
# Each path is "<configured directory>/<configured file name>". Note that $port
# holds the socket file path built from dirs/sock and unix_sock.
174 # fwiw, these file paths are (obviously) not portable
175 my $log_file = join("/", $client->config_value("dirs", "log"), $client->config_value(@base, "unix_log" ));
176 my $port = join("/", $client->config_value("dirs", "sock"), $client->config_value(@base, "unix_sock" ));
177 my $pid_file = join("/", $client->config_value("dirs", "pid"), $client->config_value(@base, "unix_pid" ));
# Fallbacks for values that are unset (or otherwise false): the spare limits
# default to the corresponding server limits, max_requests defaults to 1000.
179 $min_spare ||= $min_servers;
180 $max_spare ||= $max_servers;
181 $max_requests ||= 1000;
183 $logger->info("UnixServer: min=$min_servers, max=$max_servers, min_spare=$min_spare ".
184 "max_spare=$max_spare, max_req=$max_requests, log_file=$log_file, port=$port, pid_file=$pid_file");
# Named options for a call whose opening line is not shown here
# (NOTE(review): presumably the pre-forking server's run/configure call).
187 min_servers => $min_servers,
188 max_servers => $max_servers,
189 min_spare_servers => $min_spare,
190 max_spare_servers => $max_spare,
191 max_requests => $max_requests,
192 log_file => $log_file,
195 pid_file => $pid_file,
# Configuration fragment (the enclosing sub header is not visible; presumably
# configure_hook -- TODO confirm). Bootstraps a client connection, registers the
# application's implementation class and returns that implementation.
203 my $app = $self->app;
206 OpenSRF::System->bootstrap_client( client_name => "system_client" );
208 $logger->debug( "Setting application implementation for $app", DEBUG );
# The implementation class name comes from settings at apps/<app>/implementation.
209 my $client = OpenSRF::Utils::SettingsClient->new();
210 my $imp = $client->config_value("apps", $app, "implementation");
# Note: server_class is invoked as a plain function (::), not as a class method.
211 OpenSRF::Application::server_class($app);
212 OpenSRF::Application->application_implementation( $imp );
213 OpenSRF::Utils::JSON->register_class_hint( name => $imp, hint => $app, type => "hash" );
# Give the implementation a chance to initialize itself, if it provides the method.
214 OpenSRF::Application->application_implementation->initialize()
215 if (OpenSRF::Application->application_implementation->can('initialize'));
# When the configured server_type does not match /fork/i, the child init hook is
# run directly from here.
217 if( $client->config_value("server_type") !~ /fork/i ) {
218 $self->child_init_hook();
221 my $con = OpenSRF::Transport::PeerHandle->retrieve;
226 return OpenSRF::Application->application_implementation;
# Per-child initialization: retitles the process, optionally enables profiling,
# constructs the PeerHandle for this application and runs the implementation's
# child_init, if any. Returns the PeerHandle.
# Several interior lines (including where $self and $file are set) and the
# closing brace are missing from this listing.
229 sub child_init_hook {
# Retitle the process: every "master" in $0 becomes "drone".
231 $0 =~ s/master/drone/g;
# Profiling is opt-in through the OPENSRF_PROFILE environment variable.
233 if ($ENV{OPENSRF_PROFILE}) {
# String eval so Devel::Profiler is only loaded on demand; output is written
# under /tmp. $file is assigned on a line not shown here.
236 eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;";
# NOTE(review): the next lines presumably sit in failure/success branches of a
# check on $@ that is not visible -- confirm.
238 $logger->debug("Could not load Devel::Profiler: $@",ERROR);
240 $0 .= ' [PROFILING]';
241 $logger->debug("Running under Devel::Profiler", INFO);
247 # $logger->transport(
248 # "Creating PeerHandle from UnixServer child_init_hook", INTERNAL );
249 OpenSRF::Transport::PeerHandle->construct( $self->app() );
250 $logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL );
# Let the application implementation run its own child_init, if it defines one.
252 OpenSRF::Application->application_implementation->child_init
253 if (OpenSRF::Application->application_implementation->can('child_init'));
255 return OpenSRF::Transport::PeerHandle->retrieve;
# Child shutdown hook: logs the attempt, then calls the application
# implementation's child_exit method when the implementation provides one.
# (The closing brace of this sub lies beyond the visible listing.)
258 sub child_finish_hook {
259 $logger->debug("attempting to call child exit handler...");
260 OpenSRF::Application->application_implementation->child_exit
261 if (OpenSRF::Application->application_implementation->can('child_exit'));