]> git.evergreen-ils.org Git - Evergreen.git/blob - OpenSRF/src/perlmods/OpenSRF/MultiSession.pm
optional hash function parameter; minor debuging
[Evergreen.git] / OpenSRF / src / perlmods / OpenSRF / MultiSession.pm
1 package OpenSRF::MultiSession;
2 use OpenSRF::AppSession;
3 use OpenSRF::Utils::Logger;
4 use Time::HiRes qw/time usleep/;
5
6 my $log = 'OpenSRF::Utils::Logger';
7
8 sub new {
9         my $class = shift;
10         $class = ref($class) || $class;
11
12         my $self = bless {@_} => $class;
13
14         $self->{api_level} = 1 if (!defined($self->{api_level}));
15         $self->{session_hash_function} = \&_dummy_session_hash_function
16                 if (!defined($self->{session_hash_function}));
17
18         if ($self->{cap}) {
19                 $self->session_cap($self->{cap}) if (!$self->session_cap);
20                 $self->request_cap($self->{cap}) if (!$self->request_cap);
21         }
22
23         if (!$self->session_cap) {
24                 # XXX make adaptive the default once the logic is in place
25                 #$self->adaptive(1);
26
27                 $self->session_cap(10);
28         }
29         if (!$self->request_cap) {
30                 # XXX make adaptive the default once the logic is in place
31                 #$self->adaptive(1);
32
33                 $self->request_cap(10);
34         }
35
36         $self->{sessions} = [];
37         $self->{running} = [];
38         $self->{completed} = [];
39         $self->{failed} = [];
40
41         for ( 1 .. $self->session_cap) {
42                 push @{ $self->{sessions} },
43                         OpenSRF::AppSession->create(
44                                 $self->{app},
45                                 $self->{api_level},
46                                 1
47                         );
48                 #print "Creating connection ".$self->{sessions}->[-1]->session_id." ...\n";
49                 $log->debug("Creating connection ".$self->{sessions}->[-1]->session_id." ...");
50         }
51
52         return $self;
53 }
54
55 sub _dummy_session_hash_function {
56         my $self = shift;
57         $self->{_dummy_hash_counter} = 1 if (!exists($self->{_dummy_hash_counter}));
58         return $self->{_dummy_hash_counter}++;
59 }
60
61 sub connect {
62         my $self = shift;
63         $_->connect for (@{$self->{sessions}});
64 }
65
66 sub finish {
67         my $self = shift;
68         $_->finish for (@{$self->{sessions}});
69 }
70
71 sub disconnect {
72         my $self = shift;
73         $_->disconnect for (@{$self->{sessions}});
74 }
75
76 sub session_hash_function {
77         my $self = shift;
78         my $session_hash_function = shift;
79         return unless (ref $self);
80
81         $self->{session_hash_function} = $session_hash_function if (defined $session_hash_function);
82         return $self->{session_hash_function};
83 }
84
85 sub failure_handler {
86         my $self = shift;
87         my $failure_handler = shift;
88         return unless (ref $self);
89
90         $self->{failure_handler} = $failure_handler if (defined $failure_handler);
91         return $self->{failure_handler};
92 }
93
94 sub success_handler {
95         my $self = shift;
96         my $success_handler = shift;
97         return unless (ref $self);
98
99         $self->{success_handler} = $success_handler if (defined $success_handler);
100         return $self->{success_handler};
101 }
102
103 sub session_cap {
104         my $self = shift;
105         my $cap = shift;
106         return unless (ref $self);
107
108         $self->{session_cap} = $cap if (defined $cap);
109         return $self->{session_cap};
110 }
111
112 sub request_cap {
113         my $self = shift;
114         my $cap = shift;
115         return unless (ref $self);
116
117         $self->{request_cap} = $cap if (defined $cap);
118         return $self->{request_cap};
119 }
120
121 sub adaptive {
122         my $self = shift;
123         my $adapt = shift;
124         return unless (ref $self);
125
126         $self->{adaptive} = $adapt if (defined $adapt);
127         return $self->{adaptive};
128 }
129
130 sub completed {
131         my $self = shift;
132         my $count = shift;
133         return unless (ref $self);
134
135
136         if (wantarray) {
137                 $count ||= scalar @{$self->{completed}}; 
138         }
139
140         if (defined $count) {
141                 return () unless (@{$self->{completed}});
142                 return splice @{$self->{completed}}, 0, $count;
143         }
144
145         return scalar @{$self->{completed}};
146 }
147
148 sub failed {
149         my $self = shift;
150         my $count = shift;
151         return unless (ref $self);
152
153
154         if (wantarray) {
155                 $count ||= scalar @{$self->{failed}}; 
156         }
157
158         if (defined $count) {
159                 return () unless (@{$self->{failed}});
160                 return splice @{$self->{failed}}, 0, $count;
161         }
162
163         return scalar @{$self->{failed}};
164 }
165
166 sub running {
167         my $self = shift;
168         return unless (ref $self);
169         return scalar(@{ $self->{running} });
170 }
171
172
173 sub request {
174         my $self = shift;
175         my $hash_param;
176
177         my $method = shift;
178         if (ref $method) {
179                 $hash_param = $method;
180                 $method = shift;
181         }
182
183         my @params = @_;
184
185         $self->session_reap;
186         if ($self->running < $self->request_cap ) {
187                 my $index = $self->session_hash_function->($self, (defined $hash_param ? $hash_param : ()), $method, @params);
188                 my $ses = $self->{sessions}->[$index % $self->session_cap]; 
189
190                 #print "Running $method using session ".$ses->session_id."\n";
191
192                 my $req = $ses->request( $method, @params );
193
194                 push @{ $self->{running} },
195                         { req => $req,
196                           meth => $method,
197                           params => [@params]
198                         };
199
200                 $log->debug("Making request [$method] ".$self->running."...");
201
202                 return $req;
203         } elsif (!$self->adaptive) {
204                 #print "Oops.  Too many running: ".$self->running."\n";
205                 $self->session_wait;
206                 return $self->request((defined $hash_param ? $hash_param : ()), $method => @params);
207         } else {
208                 # XXX do addaptive stuff ...
209         }
210 }
211
212 sub session_wait {
213         my $self = shift;
214         my $all = shift;
215
216         my $count;
217         if ($all) {
218                 $count = $self->running;
219                 while ($self->running) {
220                         $self->session_reap;
221                 }
222                 return $count;
223         } else {
224                 while(($count = $self->session_reap) > 0) {
225                         usleep 100;
226                 }
227                 return $count;
228         }
229 }
230
231 sub session_reap {
232         my $self = shift;
233
234         my @done;
235         my @running;
236         while ( my $req = shift @{ $self->{running} } ) {
237                 if ($req->{req}->complete) {
238                         #print "Currently running: ".$self->running."\n";
239
240                         $req->{response} = [ $req->{req}->recv ];
241                         $req->{duration} = $req->{req}->duration;
242
243                         #print "Completed ".$req->{meth}." in session ".$req->{req}->session->session_id." [$req->{duration}]\n";
244
245                         if ($req->{req}->failed) {
246                                 #print "ARG!!!! Failed command $req->{meth} in session ".$req->{req}->session->session_id."\n";
247                                 $req->{error} = $req->{req}->failed;
248                                 push @{ $self->{failed} }, $req;
249                         } else {
250                                 push @{ $self->{completed} }, $req;
251                         }
252
253                         push @done, $req;
254
255                 } else {
256                         #$log->debug("Still running ".$req->{meth}." in session ".$req->{req}->session->session_id);
257                         push @running, $req;
258                 }
259         }
260         push @{ $self->{running} }, @running;
261
262         for my $req ( @done ) {
263                 my $handler = $req->{error} ? $self->failure_handler : $self->success_handler;
264                 $handler->($self, $req) if ($handler);
265
266                 $req->{req}->finish;
267                 delete $$req{$_} for (keys %$req);
268
269         }
270
271         my $complete = scalar @done;
272         my $incomplete = scalar @running;
273
274         #$log->debug("Still running $incomplete, completed $complete");
275
276         return $complete;
277 }
278
279 1;
280