1 package OpenSRF::UnixServer;
2 use strict; use warnings;
4 use OpenSRF::EX qw(:try);
5 use OpenSRF::Utils::Logger qw(:level $logger);
6 use OpenSRF::Transport::PeerHandle;
7 use OpenSRF::Application;
8 use OpenSRF::AppSession;
9 use OpenSRF::DomainObject::oilsResponse qw/:status/;
11 use OpenSRF::Utils::SettingsClient;
12 use Time::HiRes qw(time);
13 use OpenSRF::Utils::JSON;
14 use vars qw/@ISA $app/;
15 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
21 sub DESTROY { confess "Dying $$"; }
25 All inbound messages are passed on to the UnixServer for processing.
26 We take the data, close the Unix socket, and pass the data on to our abstract
29 Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections.
30 So when you pass data down the Unix socket to us, we have been preforked and waiting
31 to disperse new data among us.
35 sub app { return $app; }
40 my( $class, $app1 ) = @_;
42 throw OpenSRF::EX::InvalidArg( "UnixServer requires an app name to run" );
45 my $self = bless( {}, $class );
46 # my $client = OpenSRF::Utils::SettingsClient->new();
47 # if( $client->config_value("server_type") !~ /fork/i ||
48 # OpenSRF::Utils::Config->current->bootstrap->settings_config ) {
49 # warn "Calling hooks for non-prefork\n";
50 # $self->configure_hook();
51 # $self->child_init_hook();
58 =head2 process_request()
60 Takes the incoming data, closes the Unix socket and hands the data untouched
61 to the abstract process() method. This method is implemented in our subclasses.
69 while( $d = <STDIN> ) { $data .= $d; }
74 if( ! $data or ! defined( $data ) or $data eq "" ) {
75 close($self->{server}->{client});
76 $logger->debug("Unix child received empty data from socket", ERROR);
82 if( ! close( $self->{server}->{client} ) ) {
83 $logger->debug( "Error closing Unix socket: $!", ERROR );
86 my $app = $self->app();
87 $logger->transport( "UnixServer for $app received $data", INTERNAL );
89 # --------------------------------------------------------------
90 # Drop all data from the socket before coninuting to process
91 # --------------------------------------------------------------
92 my $ph = OpenSRF::Transport::PeerHandle->retrieve;
93 if(!$ph->flush_socket()) {
94 $logger->error("We received a request ".
95 "and we are no longer connected to the jabber network. ".
96 "We will go away and drop this request: $data");
100 my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
102 if(!ref($app_session)) {
103 $logger->transport( "Did not receive AppSession from transport handler, returning...", WARN );
108 if($app_session->stateless and $app_session->state != $app_session->CONNECTED()){
109 $logger->debug("Exiting keepalive for stateless session / orig = $orig");
110 $app_session->kill_me;
116 my $client = OpenSRF::Utils::SettingsClient->new();
117 my $keepalive = $client->config_value("apps", $self->app(), "keepalive");
120 while( $app_session and
121 $app_session->state and
122 $app_session->state != $app_session->DISCONNECTED() and
123 $app_session->find( $app_session->session_id ) ) {
127 $logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL );
128 $app_session->queue_wait( $keepalive );
129 $logger->debug( "after queue wait $keepalive", INTERNAL );
132 if( ($after - $before) >= $keepalive ) {
134 my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
135 status => "Disconnected on timeout",
136 statusCode => STATUS_TIMEOUT);
137 $app_session->status($res);
138 $app_session->state( $app_session->DISCONNECTED() );
145 while( $app_session && $app_session->queue_wait(0) ) {
146 $logger->debug( "Looping on zombies " . $x++ , DEBUG);
149 $logger->debug( "Timed out, disconnected, or auth failed" );
150 $app_session->kill_me if ($app_session);
159 my $app = $self->app();
160 $logger->set_service($app);
162 $0 = "OpenSRF master [$app]";
164 my $client = OpenSRF::Utils::SettingsClient->new();
165 my @base = ('apps', $app, 'unix_config' );
167 my $min_servers = $client->config_value(@base, 'min_children');
168 my $max_servers = $client->config_value(@base, "max_children" );
169 my $min_spare = $client->config_value(@base, "min_spare_children" );
170 my $max_spare = $client->config_value(@base, "max_spare_children" );
171 my $max_requests = $client->config_value(@base, "max_requests" );
172 # fwiw, these file paths are (obviously) not portable
173 my $log_file = join("/", $client->config_value("dirs", "log"), $client->config_value(@base, "unix_log" ));
174 my $port = join("/", $client->config_value("dirs", "sock"), $client->config_value(@base, "unix_sock" ));
175 my $pid_file = join("/", $client->config_value("dirs", "pid"), $client->config_value(@base, "unix_pid" ));
177 $min_spare ||= $min_servers;
178 $max_spare ||= $max_servers;
179 $max_requests ||= 1000;
181 $logger->info("UnixServer: min=$min_servers, max=$max_servers, min_spare=$min_spare ".
182 "max_spare=$max_spare, max_req=$max_requests, log_file=$log_file, port=$port, pid_file=$pid_file");
185 min_servers => $min_servers,
186 max_servers => $max_servers,
187 min_spare_servers => $min_spare,
188 max_spare_servers => $max_spare,
189 max_requests => $max_requests,
190 log_file => $log_file,
193 pid_file => $pid_file,
201 my $app = $self->app;
204 OpenSRF::System->bootstrap_client( client_name => "system_client" );
206 $logger->debug( "Setting application implementaion for $app", DEBUG );
207 my $client = OpenSRF::Utils::SettingsClient->new();
208 my $imp = $client->config_value("apps", $app, "implementation");
209 OpenSRF::Application::server_class($app);
210 OpenSRF::Application->application_implementation( $imp );
211 OpenSRF::Utils::JSON->register_class_hint( name => $imp, hint => $app, type => "hash" );
212 OpenSRF::Application->application_implementation->initialize()
213 if (OpenSRF::Application->application_implementation->can('initialize'));
215 if( $client->config_value("server_type") !~ /fork/i ) {
216 $self->child_init_hook();
219 my $con = OpenSRF::Transport::PeerHandle->retrieve;
224 return OpenSRF::Application->application_implementation;
227 sub child_init_hook {
229 $0 =~ s/master/drone/g;
231 if ($ENV{OPENSRF_PROFILE}) {
234 eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;";
236 $logger->debug("Could not load Devel::Profiler: $@",ERROR);
238 $0 .= ' [PROFILING]';
239 $logger->debug("Running under Devel::Profiler", INFO);
245 # $logger->transport(
246 # "Creating PeerHandle from UnixServer child_init_hook", INTERNAL );
247 OpenSRF::Transport::PeerHandle->construct( $self->app() );
248 $logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL );
250 OpenSRF::Application->application_implementation->child_init
251 if (OpenSRF::Application->application_implementation->can('child_init'));
253 return OpenSRF::Transport::PeerHandle->retrieve;
256 sub child_finish_hook {
257 $logger->debug("attempting to call child exit handler...");
258 OpenSRF::Application->application_implementation->child_exit
259 if (OpenSRF::Application->application_implementation->can('child_exit'));