1 package OpenSRF::MultiSession;
2 use OpenSRF::AppSession;
3 use OpenSRF::Utils::Logger;
4 use Time::HiRes qw/time usleep/;
6 my $log = 'OpenSRF::Utils::Logger';
10 $class = ref($class) || $class;
12 my $self = bless {@_} => $class;
14 $self->{api_level} = 1 if (!defined($self->{api_level}));
15 $self->{session_hash_function} = \&_dummy_session_hash_function
16 if (!defined($self->{session_hash_function}));
19 $self->session_cap($self->{cap});
20 $self->request_cap($self->{cap});
23 if (!$self->session_cap) {
24 # XXX make adaptive the default once the logic is in place
27 $self->session_cap(10);
29 if (!$self->request_cap) {
30 # XXX make adaptive the default once the logic is in place
33 $self->request_cap(10);
36 $self->{sessions} = [];
37 $self->{running} = [];
38 $self->{completed} = [];
41 for ( 1 .. $self->session_cap) {
42 push @{ $self->{sessions} },
43 OpenSRF::AppSession->create(
48 #print "Creating connection ".$self->{sessions}->[-1]->session_id." ...\n";
49 $log->debug("Creating connection ".$self->{sessions}->[-1]->session_id." ...");
55 sub _dummy_session_hash_function {
57 $self->{_dummy_hash_counter} = 0 if (!exists($self->{_dummy_hash_counter}));
58 $self->{_dummy_hash_counter}++;
59 return ( $self->{_dummy_hash_counter} % $self->session_cap ) - 1;
64 $_->connect for (@{$self->{sessions}});
69 $_->finish for (@{$self->{sessions}});
74 $_->disconnect for (@{$self->{sessions}});
77 sub session_hash_function {
79 my $session_hash_function = shift;
80 return unless (ref $self);
82 $self->{session_hash_function} = $session_hash_function if (defined $session_hash_function);
83 return $self->{session_hash_function};
88 my $failure_handler = shift;
89 return unless (ref $self);
91 $self->{failure_handler} = $failure_handler if (defined $failure_handler);
92 return $self->{failure_handler};
97 my $success_handler = shift;
98 return unless (ref $self);
100 $self->{success_handler} = $success_handler if (defined $success_handler);
101 return $self->{success_handler};
107 return unless (ref $self);
109 $self->{session_cap} = $cap if (defined $cap);
110 return $self->{session_cap};
116 return unless (ref $self);
118 $self->{request_cap} = $cap if (defined $cap);
119 return $self->{request_cap};
125 return unless (ref $self);
127 $self->{adaptive} = $adapt if (defined $adapt);
128 return $self->{adaptive};
134 return unless (ref $self);
138 $count ||= scalar @{$self->{completed}};
141 if (defined $count) {
142 return () unless (@{$self->{completed}});
143 return splice @{$self->{completed}}, 0, $count;
146 return scalar @{$self->{completed}};
152 return unless (ref $self);
156 $count ||= scalar @{$self->{failed}};
159 if (defined $count) {
160 return () unless (@{$self->{failed}});
161 return splice @{$self->{failed}}, 0, $count;
164 return scalar @{$self->{failed}};
169 return unless (ref $self);
170 return scalar(@{ $self->{running} });
179 if ($self->running < $self->request_cap ) {
180 my $index = $self->session_hash_function->($self, $method, @params);
181 my $ses = $self->{sessions}->[$index];
183 #print "Running $method using session ".$ses->session_id."\n";
185 my $req = $ses->request( $method, @params );
187 push @{ $self->{running} },
194 $log->debug("Making request [$method] ".$self->running."...");
197 } elsif (!$self->adaptive) {
198 #print "Oops. Too many running: ".$self->running."\n";
200 return $self->request($method => @params);
202 # XXX do addaptive stuff ...
211 while ($self->running) {
215 while(!$self->session_reap) {
226 while ( my $req = shift @{ $self->{running} } ) {
227 if ($req->{req}->complete) {
228 #print "Currently running: ".$self->running."\n";
230 $req->{response} = [$req->{req}->recv];
233 $req->{duration} = $req->{end} - $req->{start};
235 #print "Completed ".$req->{meth}." in session ".$req->{req}->session->session_id." [$req->{duration}]\n";
237 if ($req->{req}->failed) {
238 #print "ARG!!!! Failed command $req->{meth} in session ".$req->{req}->session->session_id."\n";
239 $req->{error} = $req->{req}->failed;
240 push @{ $self->{failed} }, $req;
242 push @{ $self->{completed} }, $req;
248 my $handler = $req->{error} ? $self->failure_handler : $self->success_handler;
250 $handler->($self, $req) if ($handler);
254 #print "Still running ".$req->{meth}." in session ".$req->{req}->session->session_id."\n";
258 push @{ $self->{running} }, @running;