1 package OpenSRF::MultiSession;
2 use OpenSRF::AppSession;
3 use OpenSRF::Utils::Logger;
4 use Time::HiRes qw/time usleep/;
6 my $log = 'OpenSRF::Utils::Logger';
10 $class = ref($class) || $class;
12 my $self = bless {@_} => $class;
14 $self->{api_level} = 1 if (!defined($self->{api_level}));
15 $self->{session_hash_function} = \&_dummy_session_hash_function
16 if (!defined($self->{session_hash_function}));
19 $self->session_cap($self->{cap});
20 $self->request_cap($self->{cap});
23 if (!$self->session_cap) {
24 # XXX make adaptive the default once the logic is in place
27 $self->session_cap(10);
29 if (!$self->request_cap) {
30 # XXX make adaptive the default once the logic is in place
33 $self->request_cap(10);
36 $self->{sessions} = [];
37 $self->{running} = [];
38 $self->{completed} = [];
41 for ( 1 .. $self->session_cap) {
42 push @{ $self->{sessions} },
43 OpenSRF::AppSession->create(
48 #print "Creating connection ".$self->{sessions}->[-1]->session_id." ...\n";
49 $log->debug("Creating connection ".$self->{sessions}->[-1]->session_id." ...");
55 sub _dummy_session_hash_function {
57 $self->{_dummy_hash_counter} = 1 if (!exists($self->{_dummy_hash_counter}));
58 return $self->{_dummy_hash_counter}++;
63 $_->connect for (@{$self->{sessions}});
68 $_->finish for (@{$self->{sessions}});
73 $_->disconnect for (@{$self->{sessions}});
76 sub session_hash_function {
78 my $session_hash_function = shift;
79 return unless (ref $self);
81 $self->{session_hash_function} = $session_hash_function if (defined $session_hash_function);
82 return $self->{session_hash_function};
87 my $failure_handler = shift;
88 return unless (ref $self);
90 $self->{failure_handler} = $failure_handler if (defined $failure_handler);
91 return $self->{failure_handler};
96 my $success_handler = shift;
97 return unless (ref $self);
99 $self->{success_handler} = $success_handler if (defined $success_handler);
100 return $self->{success_handler};
106 return unless (ref $self);
108 $self->{session_cap} = $cap if (defined $cap);
109 return $self->{session_cap};
115 return unless (ref $self);
117 $self->{request_cap} = $cap if (defined $cap);
118 return $self->{request_cap};
124 return unless (ref $self);
126 $self->{adaptive} = $adapt if (defined $adapt);
127 return $self->{adaptive};
133 return unless (ref $self);
137 $count ||= scalar @{$self->{completed}};
140 if (defined $count) {
141 return () unless (@{$self->{completed}});
142 return splice @{$self->{completed}}, 0, $count;
145 return scalar @{$self->{completed}};
151 return unless (ref $self);
155 $count ||= scalar @{$self->{failed}};
158 if (defined $count) {
159 return () unless (@{$self->{failed}});
160 return splice @{$self->{failed}}, 0, $count;
163 return scalar @{$self->{failed}};
168 return unless (ref $self);
169 return scalar(@{ $self->{running} });
178 if ($self->running < $self->request_cap ) {
179 my $index = $self->session_hash_function->($self, $method, @params);
180 my $ses = $self->{sessions}->[$index % $self->session_cap];
182 #print "Running $method using session ".$ses->session_id."\n";
184 my $req = $ses->request( $method, @params );
186 push @{ $self->{running} },
193 $log->debug("Making request [$method] ".$self->running."...");
196 } elsif (!$self->adaptive) {
197 #print "Oops. Too many running: ".$self->running."\n";
199 return $self->request($method => @params);
201 # XXX do addaptive stuff ...
210 while ($self->running) {
214 while(!$self->session_reap) {
225 while ( my $req = shift @{ $self->{running} } ) {
226 if ($req->{req}->complete) {
227 #print "Currently running: ".$self->running."\n";
229 $req->{response} = [$req->{req}->recv];
232 $req->{duration} = $req->{end} - $req->{start};
234 #print "Completed ".$req->{meth}." in session ".$req->{req}->session->session_id." [$req->{duration}]\n";
236 if ($req->{req}->failed) {
237 #print "ARG!!!! Failed command $req->{meth} in session ".$req->{req}->session->session_id."\n";
238 $req->{error} = $req->{req}->failed;
239 push @{ $self->{failed} }, $req;
241 push @{ $self->{completed} }, $req;
247 my $handler = $req->{error} ? $self->failure_handler : $self->success_handler;
249 $handler->($self, $req) if ($handler);
253 #print "Still running ".$req->{meth}." in session ".$req->{req}->session->session_id."\n";
257 push @{ $self->{running} }, @running;