1 package OpenSRF::UnixServer;
2 use strict; use warnings;
4 use OpenSRF::EX qw(:try);
5 use OpenSRF::Utils::Logger qw(:level);
6 use OpenSRF::Transport::PeerHandle;
7 use OpenSRF::Application;
8 use OpenSRF::AppSession;
9 use OpenSRF::DomainObject::oilsResponse qw/:status/;
11 use OpenSRF::Utils::SettingsClient;
12 use Time::HiRes qw(time);
14 use vars qw/@ISA $app/;
15 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
21 # XXX Need to add actual logging statements in the code
22 my $logger = "OpenSRF::Utils::Logger";
# Destructor: raises an exception via Carp's confess(), whose message is
# "Dying <pid>" followed by a stack backtrace, whenever an instance is
# destroyed.
# NOTE(review): this looks like a debugging aid for tracing unexpected
# object teardown -- confirm it is still wanted.
sub DESTROY {
    my $message = "Dying $$";
    confess($message);
}
29 All inbound messages are passed on to the UnixServer for processing.
30 We take the data, close the Unix socket, and pass the data on to our abstract process() method.
33 Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections.
34 So by the time you pass data down the Unix socket to us, we have already been preforked and are waiting
35 to distribute the new data among ourselves.
# Accessor for the package-level application name held in $app.
# Any invocant or arguments are ignored, so it can be called as a plain
# function, a class method or an instance method.
sub app {
    my $current_app = $app;
    return $current_app;
}
# Constructor fragment: unpack the class name and the application name
# handed in by the caller.
44 my( $class, $app1 ) = @_;
# Raises OpenSRF::EX::InvalidArg.
# NOTE(review): the guard around this throw is not visible in this view --
# presumably it fires only when $app1 is missing; confirm.
46 throw OpenSRF::EX::InvalidArg( "UnixServer requires an app name to run" );
# The instance itself carries no state: an empty hash blessed into $class.
49 my $self = bless( {}, $class );
# Disabled code kept for reference: it would have run configure_hook() and
# child_init_hook() directly when the configured server_type was not a
# forking one.
50 # my $client = OpenSRF::Utils::SettingsClient->new();
51 # if( $client->config_value("server_type") !~ /fork/i ||
52 # OpenSRF::Utils::Config->current->bootstrap->settings_config ) {
53 # warn "Calling hooks for non-prefork\n";
54 # $self->configure_hook();
55 # $self->child_init_hook();
62 =head2 process_request()
64 Takes the incoming data, closes the Unix socket and hands the data untouched
65 to the abstract process() method. This method is implemented in our subclasses.
# Read everything available on STDIN and accumulate it in $data
# ($d and $data are declared outside the visible lines).
73 while( $d = <STDIN> ) { $data .= $d; }
# Nothing usable arrived: close the client socket and log it.
# Note that `! $data` alone already covers undef and the empty string
# (and is also true for the string "0").
79 if( ! $data or ! defined( $data ) or $data eq "" ) {
80 close($self->{server}->{client});
81 $logger->debug("Unix child received empty data from socket", ERROR);
# Data was received: close the client socket before processing and log
# the OS error if the close fails.
87 if( ! close( $self->{server}->{client} ) ) {
88 $logger->debug( "Error closing Unix socket: $!", ERROR );
# Log the full payload together with the application name.
91 my $app = $self->app();
92 $logger->transport( "UnixServer for $app received $data", INTERNAL );
94 # --------------------------------------------------------------
95 # Drop all data from the socket before continuing to process
96 # --------------------------------------------------------------
# flush_socket() returning false is treated as having lost the jabber
# connection; the request is logged (including its payload) as dropped.
97 my $ph = OpenSRF::Transport::PeerHandle->retrieve;
98 if(!$ph->flush_socket()) {
99 $logger->error("We received a request ".
100 "and we are no longer connected to the jabber network. ".
101 "We will go away and drop this request: $data");
# Hand the raw payload to the transport handler, which is expected to
# return an application session object.
105 my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
# A non-reference return value means no session was produced.
107 if(!ref($app_session)) {
108 $logger->transport( "Did not receive AppSession from transport handler, returning...", WARN );
# A stateless session that is not in the CONNECTED state is killed right
# away instead of entering the keepalive loop below.
# NOTE(review): $orig is assigned outside the visible lines.
113 if($app_session->stateless and $app_session->state != $app_session->CONNECTED()){
114 $logger->debug("Exiting keepalive for stateless session / orig = $orig");
115 $app_session->kill_me;
# The keepalive value is read from this application's settings and passed
# to queue_wait() below.
121 my $client = OpenSRF::Utils::SettingsClient->new();
122 my $keepalive = $client->config_value("apps", $self->app(), "keepalive");
# Keepalive loop: continue while the session exists, has a true state that
# is not DISCONNECTED, and can still be found by its session id.
125 while( $app_session and
126 $app_session->state and
127 $app_session->state != $app_session->DISCONNECTED() and
128 $app_session->find( $app_session->session_id ) ) {
# Wait on the session queue, passing the keepalive value as the argument.
132 $logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL );
133 $app_session->queue_wait( $keepalive );
134 $logger->debug( "after queue wait $keepalive", INTERNAL );
# If the wait lasted at least the keepalive value, send a timeout status
# and mark the session DISCONNECTED, which ends the loop.
# NOTE(review): $before / $after are set outside the visible lines --
# presumably timestamps taken around queue_wait(); confirm.
137 if( ($after - $before) >= $keepalive ) {
139 my $res = OpenSRF::DomainObject::oilsConnectStatus->new(
140 status => "Disconnected on timeout",
141 statusCode => STATUS_TIMEOUT);
142 $app_session->status($res);
143 $app_session->state( $app_session->DISCONNECTED() );
# Keep calling queue_wait(0) for as long as it returns true, counting the
# iterations in $x (declared outside the visible lines).
150 while( $app_session && $app_session->queue_wait(0) ) {
151 $logger->debug( "Looping on zombies " . $x++ , DEBUG);
# Final cleanup: kill the session if one still exists.
154 $logger->debug( "Timed out, disconnected, or auth failed" );
155 $app_session->kill_me if ($app_session);
# Tag the logger with the application name.
164 my $app = $self->app();
165 $logger->set_service($app);
# Rename this process so it shows up as the master for the application.
167 $0 = "OpenSRF master [$app]";
# Remove earlier generated config files for this application from /tmp.
# NOTE(review): $app is interpolated into a shell command line; this is
# only safe while application names are trusted -- confirm.
169 system("rm -f /tmp/opensrf_unix_$app*");
# Pull the per-application server limits and file locations from the
# settings client.
171 my $client = OpenSRF::Utils::SettingsClient->new();
172 $logger->transport("Max Req: " . $client->config_value("apps", $app, "unix_config", "max_requests" ), INFO );
174 my $min_servers = $client->config_value("apps", $app, "unix_config", "min_children" );
175 my $max_servers = $client->config_value("apps", $app, "unix_config", "max_children" );
176 my $min_spare = $client->config_value("apps", $app, "unix_config", "min_spare_children" );
177 my $max_spare = $client->config_value("apps", $app, "unix_config", "max_spare_children" );
178 my $max_requests = $client->config_value("apps", $app, "unix_config", "max_requests" );
# Log, socket and pid paths are each "<configured directory>/<file name>".
179 my $log_file = join("/", $client->config_value("dirs", "log"),
180 $client->config_value("apps", $app, "unix_config", "unix_log" ));
181 my $port = join("/", $client->config_value("dirs", "sock"),
182 $client->config_value("apps", $app, "unix_config", "unix_sock" ));
183 my $pid_file = join("/", $client->config_value("dirs", "pid"),
184 $client->config_value("apps", $app, "unix_config", "unix_pid" ));
# Name of the generated config file: app name, current time (time() is the
# Time::HiRes version imported at the top of the file), a random number
# and the pid.
# NOTE(review): the path lives in /tmp and is built from guessable parts.
186 my $file = "/tmp/" . "opensrf_unix_$app"."_" . time . rand( $$ ) . "_$$";
# Config file contents: one "key value" pair per line, with the protocol
# fixed to "unix" and the log level fixed to 3.
187 my $file_string = "min_servers $min_servers\nmax_servers $max_servers\n" .
188 "min_spare_servers $min_spare\nmax_spare_servers $max_spare\n" .
189 "max_requests $max_requests\nlog_file $log_file\nproto unix\n" .
190 "port $port\npid_file $pid_file\nlog_level 3\n";
# Write the config file; dies with the OS error if it cannot be opened.
# NOTE(review): two-argument open on the bareword handle F.
192 open F, "> $file" or die "Can't open $file : $!";
193 print F $file_string;
# Start the server using the generated config file.
# NOTE(review): run() is not defined in the visible source -- presumably
# inherited from the server base class; confirm.
196 $self->run( 'conf_file' => $file );
# NOTE(review): the enclosing sub header is not visible in this view.
# Server setup: bootstrap a client, then load and initialise the
# application implementation configured for this app.
203 my $app = $self->app;
# Bootstrap a client under the name "system_client".
206 OpenSRF::System->bootstrap_client( client_name => "system_client" );
208 $logger->debug( "Setting application implementaion for $app", DEBUG );
# Look up the implementation configured for this application.
209 my $client = OpenSRF::Utils::SettingsClient->new();
210 my $imp = $client->config_value("apps", $app, "implementation");
# Register the app name as the server class and $imp as the implementation,
# and register $imp with JSON as a hash-type class hinted by the app name.
211 OpenSRF::Application::server_class($app);
212 OpenSRF::Application->application_implementation( $imp );
213 JSON->register_class_hint( name => $imp, hint => $app, type => "hash" );
# initialize() is optional: it is called only when the implementation
# provides it.
214 OpenSRF::Application->application_implementation->initialize()
215 if (OpenSRF::Application->application_implementation->can('initialize'));
# When the configured server_type does not match /fork/i, the child
# initialisation is run here directly.
217 if( $client->config_value("server_type") !~ /fork/i ) {
218 $self->child_init_hook();
# NOTE(review): $con is not used in any visible line.
221 my $con = OpenSRF::Transport::PeerHandle->retrieve;
# Returns the registered application implementation.
226 return OpenSRF::Application->application_implementation;
# NOTE(review): only the header of child_finish_hook is visible here; its
# body is not part of this view.
229 sub child_finish_hook {
# child_init_hook: per-child setup. Renames the process, optionally turns
# on profiling, builds the PeerHandle for this application, runs the
# implementation's child_init() when it has one, and returns the PeerHandle.
233 sub child_init_hook {
# Relabel the process from "master" to "drone".
235 $0 =~ s/master/drone/g;
# Profiling is opt-in through the OPENSRF_PROFILE environment variable.
237 if ($ENV{OPENSRF_PROFILE}) {
# A string eval is used so Devel::Profiler is loaded at run time and only
# on request; $file is set outside the visible lines.
240 eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;";
# NOTE(review): the condition guarding this log line is not visible --
# presumably a check of $@ after the eval; confirm.
242 $logger->debug("Could not load Devel::Profiler: $@",ERROR);
# Mark the process name so profiled processes can be told apart.
244 $0 .= ' [PROFILING]';
245 $logger->debug("Running under Devel::Profiler", INFO);
251 # $logger->transport(
252 # "Creating PeerHandle from UnixServer child_init_hook", INTERNAL );
# Construct the PeerHandle for this application.
253 OpenSRF::Transport::PeerHandle->construct( $self->app() );
254 $logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL );
# child_init() is optional: it is called only when the implementation
# provides it.
256 OpenSRF::Application->application_implementation->child_init
257 if (OpenSRF::Application->application_implementation->can('child_init'));
# Returns the PeerHandle retrieved after construction.
259 return OpenSRF::Transport::PeerHandle->retrieve;