1 package OpenSRF::UnixServer;
2 use strict; use warnings;
4 use OpenSRF::EX qw(:try);
5 use OpenSRF::Utils::Logger qw(:level $logger);
6 use OpenSRF::Transport::PeerHandle;
7 use OpenSRF::Application;
8 use OpenSRF::AppSession;
9 use OpenSRF::DomainObject::oilsResponse qw/:status/;
11 use OpenSRF::Utils::SettingsClient;
12 use Time::HiRes qw(time);
14 use vars qw/@ISA $app/;
15 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
21 sub DESTROY { confess "Dying $$"; }
25 All inbound messages are passed on to the UnixServer for processing.
26 We take the data, close the Unix socket, and pass the data on to our abstract
29 Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections.
30 So when you pass data down the Unix socket to us, we have been preforked and waiting
31 to disperse new data among us.
35 sub app { return $app; }
40 my( $class, $app1 ) = @_;
42 throw OpenSRF::EX::InvalidArg( "UnixServer requires an app name to run" );
45 my $self = bless( {}, $class );
46 # my $client = OpenSRF::Utils::SettingsClient->new();
47 # if( $client->config_value("server_type") !~ /fork/i ||
48 # OpenSRF::Utils::Config->current->bootstrap->settings_config ) {
49 # warn "Calling hooks for non-prefork\n";
50 # $self->configure_hook();
51 # $self->child_init_hook();
58 =head2 process_request()
60 Takes the incoming data, closes the Unix socket and hands the data untouched
61 to the abstract process() method. This method is implemented in our subclasses.
69 while( $d = <STDIN> ) { $data .= $d; }
75 if( ! $data or ! defined( $data ) or $data eq "" ) {
76 close($self->{server}->{client});
77 $logger->debug("Unix child received empty data from socket", ERROR);
83 if( ! close( $self->{server}->{client} ) ) {
84 $logger->debug( "Error closing Unix socket: $!", ERROR );
87 my $app = $self->app();
88 $logger->transport( "UnixServer for $app received $data", INTERNAL );
90 # --------------------------------------------------------------
91 # Drop all data from the socket before coninuting to process
92 # --------------------------------------------------------------
93 my $ph = OpenSRF::Transport::PeerHandle->retrieve;
94 if(!$ph->flush_socket()) {
95 $logger->error("We received a request ".
96 "and we are no longer connected to the jabber network. ".
97 "We will go away and drop this request: $data");
101 my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
103 if(!ref($app_session)) {
104 $logger->transport( "Did not receive AppSession from transport handler, returning...", WARN );
109 if($app_session->stateless and $app_session->state != $app_session->CONNECTED()){
110 $logger->debug("Exiting keepalive for stateless session / orig = $orig");
111 $app_session->kill_me;
117 my $client = OpenSRF::Utils::SettingsClient->new();
118 my $keepalive = $client->config_value("apps", $self->app(), "keepalive");
121 while( $app_session and
122 $app_session->state and
123 $app_session->state != $app_session->DISCONNECTED() and
124 $app_session->find( $app_session->session_id ) ) {
128 $logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL );
129 $app_session->queue_wait( $keepalive );
130 $logger->debug( "after queue wait $keepalive", INTERNAL );
133 if( ($after - $before) >= $keepalive ) {
135 my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
136 status => "Disconnected on timeout",
137 statusCode => STATUS_TIMEOUT);
138 $app_session->status($res);
139 $app_session->state( $app_session->DISCONNECTED() );
146 while( $app_session && $app_session->queue_wait(0) ) {
147 $logger->debug( "Looping on zombies " . $x++ , DEBUG);
150 $logger->debug( "Timed out, disconnected, or auth failed" );
151 $app_session->kill_me if ($app_session);
160 my $app = $self->app();
161 $logger->set_service($app);
163 $0 = "OpenSRF master [$app]";
165 system("rm -f /tmp/opensrf_unix_$app*");
167 my $client = OpenSRF::Utils::SettingsClient->new();
168 $logger->transport("Max Req: " . $client->config_value("apps", $app, "unix_config", "max_requests" ), INFO );
170 my $min_servers = $client->config_value("apps", $app, "unix_config", "min_children" );
171 my $max_servers = $client->config_value("apps", $app, "unix_config", "max_children" );
172 my $min_spare = $client->config_value("apps", $app, "unix_config", "min_spare_children" );
173 my $max_spare = $client->config_value("apps", $app, "unix_config", "max_spare_children" );
174 my $max_requests = $client->config_value("apps", $app, "unix_config", "max_requests" );
175 my $log_file = join("/", $client->config_value("dirs", "log"),
176 $client->config_value("apps", $app, "unix_config", "unix_log" ));
177 my $port = join("/", $client->config_value("dirs", "sock"),
178 $client->config_value("apps", $app, "unix_config", "unix_sock" ));
179 my $pid_file = join("/", $client->config_value("dirs", "pid"),
180 $client->config_value("apps", $app, "unix_config", "unix_pid" ));
182 my $file = "/tmp/" . "opensrf_unix_$app"."_" . time . rand( $$ ) . "_$$";
183 my $file_string = "min_servers $min_servers\nmax_servers $max_servers\n" .
184 "min_spare_servers $min_spare\nmax_spare_servers $max_spare\n" .
185 "max_requests $max_requests\nlog_file $log_file\nproto unix\n" .
186 "port $port\npid_file $pid_file\nlog_level 3\n";
188 open F, "> $file" or die "Can't open $file : $!";
189 print F $file_string;
192 $self->run( 'conf_file' => $file );
199 my $app = $self->app;
202 OpenSRF::System->bootstrap_client( client_name => "system_client" );
204 $logger->debug( "Setting application implementaion for $app", DEBUG );
205 my $client = OpenSRF::Utils::SettingsClient->new();
206 my $imp = $client->config_value("apps", $app, "implementation");
207 OpenSRF::Application::server_class($app);
208 OpenSRF::Application->application_implementation( $imp );
209 JSON->register_class_hint( name => $imp, hint => $app, type => "hash" );
210 OpenSRF::Application->application_implementation->initialize()
211 if (OpenSRF::Application->application_implementation->can('initialize'));
213 if( $client->config_value("server_type") !~ /fork/i ) {
214 $self->child_init_hook();
217 my $con = OpenSRF::Transport::PeerHandle->retrieve;
222 return OpenSRF::Application->application_implementation;
225 sub child_init_hook {
227 $0 =~ s/master/drone/g;
229 if ($ENV{OPENSRF_PROFILE}) {
232 eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;";
234 $logger->debug("Could not load Devel::Profiler: $@",ERROR);
236 $0 .= ' [PROFILING]';
237 $logger->debug("Running under Devel::Profiler", INFO);
243 # $logger->transport(
244 # "Creating PeerHandle from UnixServer child_init_hook", INTERNAL );
245 OpenSRF::Transport::PeerHandle->construct( $self->app() );
246 $logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL );
248 OpenSRF::Application->application_implementation->child_init
249 if (OpenSRF::Application->application_implementation->can('child_init'));
251 return OpenSRF::Transport::PeerHandle->retrieve;
254 sub child_finish_hook {
255 $logger->debug("attempting to call child exit handler...");
256 OpenSRF::Application->application_implementation->child_exit
257 if (OpenSRF::Application->application_implementation->can('child_exit'));