1 package OpenSRF::MultiSession;
2 use OpenSRF::AppSession;
3 use OpenSRF::Utils::Logger;
4 use Time::HiRes qw/time usleep/;
# The logger is referred to by package name and invoked through class-method
# calls such as $log->debug(...) elsewhere in this file.
6 my $log = 'OpenSRF::Utils::Logger';
# Constructor body: blesses the caller's key/value arguments into an object,
# fills in defaults, and creates the pool of application sessions.
# NOTE(review): the enclosing `sub` header, the unpacking of $class, several
# closing braces and the arguments to create() are not present in the lines
# shown here -- confirm against the complete file before changing anything.

# Accept either a class name or an existing instance as the invocant.
10 $class = ref($class) || $class;
# Every key/value pair in @_ becomes a field of the new object.
12 my $self = bless {@_} => $class;
# api_level defaults to 1 when the caller did not define it.
14 $self->{api_level} = 1 if (!defined($self->{api_level}));
# Without a caller-supplied hash function, fall back to the counter-based one.
15 $self->{session_hash_function} = \&_dummy_session_hash_function
16 if (!defined($self->{session_hash_function}));
# The generic 'cap' field seeds whichever of the two caps has no true value yet.
19 $self->session_cap($self->{cap}) if (!$self->session_cap);
20 $self->request_cap($self->{cap}) if (!$self->request_cap);
# If the session cap is still unset (or zero), use a fixed limit of 10.
23 if (!$self->session_cap) {
24 # XXX make adaptive the default once the logic is in place
27 $self->session_cap(10);
# Same fallback of 10 for the request cap.
29 if (!$self->request_cap) {
30 # XXX make adaptive the default once the logic is in place
33 $self->request_cap(10);
# Bookkeeping lists: the session pool, in-flight requests, finished requests.
# NOTE(review): $self->{failed} is used by other code in this file but is not
# initialised in the lines shown here -- verify it is set up in the full file.
36 $self->{sessions} = [];
37 $self->{running} = [];
38 $self->{completed} = [];
# Create session_cap sessions, logging the id of each newly pushed session.
41 for ( 1 .. $self->session_cap) {
42 push @{ $self->{sessions} },
43 OpenSRF::AppSession->create(
48 #print "Creating connection ".$self->{sessions}->[-1]->session_id." ...\n";
49 $log->debug("Creating connection ".$self->{sessions}->[-1]->session_id." ...");
# Default session-selection function. Returns a per-object counter that starts
# at 1 and grows by one on every call (post-increment, so the first call
# returns 1). The request code in this file reduces the returned value modulo
# session_cap to pick a session index.
# NOTE(review): $self is not declared in the lines shown; presumably it is
# shifted off @_ on a line that is missing here -- confirm.
55 sub _dummy_session_hash_function {
# Lazily create the counter the first time this object is used.
57 $self->{_dummy_hash_counter} = 1 if (!exists($self->{_dummy_hash_counter}));
58 return $self->{_dummy_hash_counter}++;
# Walk the session pool and call connect on every session that does not
# already report itself as connected.
# NOTE(review): the enclosing sub header and closing braces are not shown.
63 for my $ses (@{$self->{sessions}}) {
64 $ses->connect unless ($ses->connected);
# Call finish on every session in the pool.
# NOTE(review): the enclosing sub header is not shown in these lines.
70 $_->finish for (@{$self->{sessions}});
# Call disconnect on every session in the pool.
# NOTE(review): the enclosing sub header is not shown in these lines.
75 $_->disconnect for (@{$self->{sessions}});
# Combined getter/setter for the code reference used to choose a session.
# With a defined argument the stored function is replaced; the current
# function is returned either way.
# NOTE(review): $self is presumably shifted off @_ on a line missing here.
78 sub session_hash_function {
80 my $session_hash_function = shift;
# Only meaningful on an instance; returns nothing for a non-reference invocant.
81 return unless (ref $self);
83 $self->{session_hash_function} = $session_hash_function if (defined $session_hash_function);
84 return $self->{session_hash_function};
# Getter/setter for the failure handler field. A defined argument replaces
# the stored value; the stored value is returned in both cases. The reaping
# code in this file invokes this handler as $handler->($self, $req) for
# requests that carry an error.
# NOTE(review): the sub header and the unpacking of $self are not shown.
89 my $failure_handler = shift;
# Returns nothing when the invocant is not a reference.
90 return unless (ref $self);
92 $self->{failure_handler} = $failure_handler if (defined $failure_handler);
93 return $self->{failure_handler};
# Getter/setter for the success handler field. A defined argument replaces
# the stored value; the stored value is returned in both cases. The reaping
# code in this file invokes this handler as $handler->($self, $req) for
# requests that carry no error.
# NOTE(review): the sub header and the unpacking of $self are not shown.
98 my $success_handler = shift;
# Returns nothing when the invocant is not a reference.
99 return unless (ref $self);
101 $self->{success_handler} = $success_handler if (defined $success_handler);
102 return $self->{success_handler};
# Getter/setter for the session_cap field, which the constructor code uses as
# the number of sessions to create and the request code uses as the modulus
# when picking a session.
# NOTE(review): the sub header and the declarations of $self and $cap are not
# shown in these lines.
# Returns nothing when the invocant is not a reference.
108 return unless (ref $self);
# Only a defined argument overwrites the stored cap.
110 $self->{session_cap} = $cap if (defined $cap);
111 return $self->{session_cap};
# Getter/setter for the request_cap field, the limit the request code compares
# against the number of in-flight requests before dispatching another one.
# NOTE(review): the sub header and the declarations of $self and $cap are not
# shown in these lines.
# Returns nothing when the invocant is not a reference.
117 return unless (ref $self);
# Only a defined argument overwrites the stored cap.
119 $self->{request_cap} = $cap if (defined $cap);
120 return $self->{request_cap};
# Getter/setter for the adaptive flag consulted by the request code when the
# request cap has been reached.
# NOTE(review): the sub header and the declarations of $self and $adapt are
# not shown in these lines.
# Returns nothing when the invocant is not a reference.
126 return unless (ref $self);
# Only a defined argument overwrites the stored flag.
128 $self->{adaptive} = $adapt if (defined $adapt);
129 return $self->{adaptive};
# Access to the list of completed requests: either removes and returns up to
# $count entries from the front of the list, or returns how many entries the
# list holds.
# NOTE(review): the sub header and the declarations of $self and $count are
# not shown in these lines.
# Returns nothing when the invocant is not a reference.
135 return unless (ref $self);
# A false $count defaults to the full length of the list.
# NOTE(review): if this assignment runs unconditionally, $count is always
# defined afterwards and the final `return scalar` is never reached. A guard
# around it may be among the lines missing here -- confirm in the full file.
139 $count ||= scalar @{$self->{completed}};
142 if (defined $count) {
# Nothing queued: return an empty list.
143 return () unless (@{$self->{completed}});
# splice removes the returned entries from the stored list.
144 return splice @{$self->{completed}}, 0, $count;
# Otherwise report the number of completed requests still queued.
147 return scalar @{$self->{completed}};
# Access to the list of failed requests: either removes and returns up to
# $count entries from the front of the list, or returns how many entries the
# list holds.
# NOTE(review): the sub header and the declarations of $self and $count are
# not shown in these lines.
# Returns nothing when the invocant is not a reference.
153 return unless (ref $self);
# A false $count defaults to the full length of the list.
# NOTE(review): if this assignment runs unconditionally, $count is always
# defined afterwards and the final `return scalar` is never reached. A guard
# around it may be among the lines missing here -- confirm in the full file.
157 $count ||= scalar @{$self->{failed}};
160 if (defined $count) {
# Nothing queued: return an empty list.
161 return () unless (@{$self->{failed}});
# splice removes the returned entries from the stored list.
162 return splice @{$self->{failed}}, 0, $count;
# Otherwise report the number of failed requests still queued.
165 return scalar @{$self->{failed}};
# Number of requests currently held in the running list.
# NOTE(review): the sub header and the unpacking of $self are not shown.
# Returns nothing when the invocant is not a reference.
170 return unless (ref $self);
171 return scalar(@{ $self->{running} });
# Dispatch one method call on a session from the pool, subject to request_cap.
# NOTE(review): the sub header, argument unpacking ($self, $hash_param,
# $method, @params), the rest of the push statement and the closing braces
# are not shown in these lines -- confirm against the complete file.
# NOTE(review): the condition under which $hash_param is set from $method is
# on a line that is missing here.
181 $hash_param = $method;
# Only start a new request while fewer than request_cap are in flight.
188 if ($self->running < $self->request_cap ) {
# Ask the hash function for a number; $hash_param is passed only when defined.
189 my $index = $self->session_hash_function->($self, (defined $hash_param ? $hash_param : ()), $method, @params);
# Reduce that number modulo session_cap to select a session from the pool.
190 my $ses = $self->{sessions}->[$index % $self->session_cap];
192 #print "Running $method using session ".$ses->session_id."\n";
194 my $req = $ses->request( $method, @params );
196 push @{ $self->{running} },
203 $log->debug("Making request [$method] ".$self->running."...");
# At the cap and not in adaptive mode: the call is re-issued recursively with
# the same arguments.
# NOTE(review): whatever frees capacity before the retry (lines 207-209 of
# the original) is only partly shown here.
206 } elsif (!$self->adaptive) {
207 #print "Oops. Too many running: ".$self->running."\n";
209 return $self->request((defined $hash_param ? $hash_param : ()), $method => @params);
211 # XXX do adaptive stuff ...
# Wait loop over in-flight requests.
# NOTE(review): the sub header, the declaration of $count, both loop bodies
# and the closing braces are not shown in these lines.
221 $count = $self->running;
# Outer loop keeps going while any request is still in the running list.
222 while ($self->running) {
# Inner loop repeats while session_reap reports zero and requests remain.
227 while(($count = $self->session_reap) == 0 && $self->running) {
# Sweep the running list once: collect finished requests, record their
# results, requeue the rest, then invoke the matching handler per finished
# request.
# NOTE(review): the sub header, the declarations of @done and @running, the
# pushes onto those arrays, the else branches, the closing braces and the
# return value are not shown in these lines -- confirm in the complete file.

# Drain the running list one record at a time.
239 while ( my $req = shift @{ $self->{running} } ) {
240 if ($req->{req}->complete) {
241 #print "Currently running: ".$self->running."\n";
# Store everything recv returns, plus the request's duration, on the record.
243 $req->{response} = [ $req->{req}->recv ];
244 $req->{duration} = $req->{req}->duration;
246 #print "Completed ".$req->{meth}." in session ".$req->{req}->session->session_id." [$req->{duration}]\n";
# A failed request keeps its failure value in {error} and goes to the failed list.
248 if ($req->{req}->failed) {
249 #print "ARG!!!! Failed command $req->{meth} in session ".$req->{req}->session->session_id."\n";
250 $req->{error} = $req->{req}->failed;
251 push @{ $self->{failed} }, $req;
# NOTE(review): presumably this push sits in an else branch that is missing here.
253 push @{ $self->{completed} }, $req;
259 #$log->debug("Still running ".$req->{meth}." in session ".$req->{req}->session->session_id);
# Put the requests collected in @running back on the running list.
263 push @{ $self->{running} }, @running;
# Records with an error go to the failure handler, the rest to the success
# handler; a handler is only called when one is set.
265 for my $req ( @done ) {
266 my $handler = $req->{error} ? $self->failure_handler : $self->success_handler;
267 $handler->($self, $req) if ($handler);
# Remove every key from the request record.
270 delete $$req{$_} for (keys %$req);
# Counts of finished and still-running requests seen in this sweep.
274 my $complete = scalar @done;
275 my $incomplete = scalar @running;
277 #$log->debug("Still running $incomplete, completed $complete");