e1960272170ad575f74aeb5b7b3dc3dafc1eb2ed
[OpenSRF.git] / src / perl / lib / OpenSRF / MultiSession.pm
1 package OpenSRF::MultiSession;
2 use OpenSRF::AppSession;
3 use OpenSRF::Utils::Logger;
4 use Time::HiRes qw/time usleep/;
5
6 my $log = 'OpenSRF::Utils::Logger';
7
# Constructor.  The argument list is a flat set of key/value options which
# become the initial fields of the object.  Options consulted here:
#
#   app                   - first argument given to OpenSRF::AppSession->create
#   api_level             - second argument to create; defaults to 1
#   session_hash_function - coderef used to pick a session for each request;
#                           defaults to \&_dummy_session_hash_function
#   cap                   - when true, seeds session_cap and request_cap if
#                           either of them is not already set
#   session_cap           - how many sessions to create (falls back to 10)
#   request_cap           - limit on concurrently running requests
#                           (falls back to 10)
#
# One session per unit of session_cap is created up front and stored in
# $self->{sessions}; the running/completed/failed queues start out empty.
# Returns the new object.
sub new {
        my $proto = shift;
        my $class = ref($proto) || $proto;

        my $self = bless { @_ }, $class;

        $self->{api_level} = 1
                unless (defined $self->{api_level});
        $self->{session_hash_function} = \&_dummy_session_hash_function
                unless (defined $self->{session_hash_function});

        # A single 'cap' option fills in whichever of the two caps is missing.
        if ($self->{cap}) {
                $self->session_cap($self->{cap}) unless ($self->session_cap);
                $self->request_cap($self->{cap}) unless ($self->request_cap);
        }

        # XXX make adaptive the default once the logic is in place
        #     (i.e. call $self->adaptive(1) here instead of using fixed caps)
        $self->session_cap(10) unless ($self->session_cap);
        $self->request_cap(10) unless ($self->request_cap);

        @{$self}{qw/sessions running completed failed/} = ([], [], [], []);

        foreach my $slot (1 .. $self->session_cap) {
                # The third argument is a literal 1; its meaning is defined by
                # OpenSRF::AppSession->create.
                push @{ $self->{sessions} },
                        OpenSRF::AppSession->create(
                                $self->{app},
                                $self->{api_level},
                                1
                        );
                $log->debug("Creating connection ".$self->{sessions}->[-1]->session_id." ...");
        }

        return $self;
}
54
# Default session hash function: ignores the request details and hands out
# an ever-increasing counter kept in $self->{_dummy_hash_counter}.  The first
# call returns 1.  request() reduces the value modulo the session count, so
# this amounts to round-robin session selection.
sub _dummy_session_hash_function {
        my ($self) = @_;

        exists $self->{_dummy_hash_counter}
                or $self->{_dummy_hash_counter} = 1;

        # Post-increment: return the current ticket, store the next one.
        my $ticket = $self->{_dummy_hash_counter}++;
        return $ticket;
}
60
# Call connect on every pooled session that does not already report itself
# as connected.
sub connect {
        my ($self) = @_;
        foreach my $session (@{ $self->{sessions} }) {
                next if ($session->connected);
                $session->connect;
        }
}
67
# Call finish on every pooled session, in pool order.
sub finish {
        my ($self) = @_;
        foreach (@{ $self->{sessions} }) {
                $_->finish;
        }
}
72
# Call disconnect on every pooled session, in pool order.
sub disconnect {
        my ($self) = @_;
        foreach (@{ $self->{sessions} }) {
                $_->disconnect;
        }
}
77
# Get/set the coderef that request() calls to choose a session.
# A defined argument is stored first; the current value is always returned.
# Returns nothing when the invocant is not a reference (e.g. a class name).
sub session_hash_function {
        my ($self, $session_hash_function) = @_;
        ref $self or return;

        if (defined $session_hash_function) {
                $self->{session_hash_function} = $session_hash_function;
        }
        return $self->{session_hash_function};
}
86
# Get/set the callback that session_reap() invokes, as
# $handler->($self, $req), for a finished request that carries an error.
# A defined argument is stored first; the current value is always returned.
# Returns nothing when the invocant is not a reference (e.g. a class name).
sub failure_handler {
        my ($self, $failure_handler) = @_;
        ref $self or return;

        if (defined $failure_handler) {
                $self->{failure_handler} = $failure_handler;
        }
        return $self->{failure_handler};
}
95
# Get/set the callback that session_reap() invokes, as
# $handler->($self, $req), for a finished request that carries no error.
# A defined argument is stored first; the current value is always returned.
# Returns nothing when the invocant is not a reference (e.g. a class name).
sub success_handler {
        my ($self, $success_handler) = @_;
        ref $self or return;

        if (defined $success_handler) {
                $self->{success_handler} = $success_handler;
        }
        return $self->{success_handler};
}
104
# Get/set the session cap.  new() creates this many sessions and request()
# reduces the session hash modulo this value.  Changing it after
# construction does not create or remove sessions.
# A defined argument is stored first; the current value is always returned.
# Returns nothing when the invocant is not a reference (e.g. a class name).
sub session_cap {
        my ($self, $cap) = @_;
        ref $self or return;

        if (defined $cap) {
                $self->{session_cap} = $cap;
        }
        return $self->{session_cap};
}
113
# Get/set the request cap: request() only dispatches immediately while the
# number of running requests is below this value.
# A defined argument is stored first; the current value is always returned.
# Returns nothing when the invocant is not a reference (e.g. a class name).
sub request_cap {
        my ($self, $cap) = @_;
        ref $self or return;

        if (defined $cap) {
                $self->{request_cap} = $cap;
        }
        return $self->{request_cap};
}
122
# Get/set the 'adaptive' flag consulted by request() when the request cap
# has been reached.
# A defined argument is stored first; the current value is always returned.
# Returns nothing when the invocant is not a reference (e.g. a class name).
sub adaptive {
        my ($self, $adapt) = @_;
        ref $self or return;

        if (defined $adapt) {
                $self->{adaptive} = $adapt;
        }
        return $self->{adaptive};
}
131
# Inspect or drain the queue of completed request entries.
#
#   scalar context, no count  - returns the number of queued entries
#   list context, no count    - removes and returns every queued entry
#                               (a false count is treated as "all" here)
#   with a defined count      - removes up to that many entries from the
#                               front of the queue and returns them; in
#                               scalar context this yields whatever splice
#                               returns in scalar context
#
# When a count applies and the queue is empty, an empty list is returned.
# Returns nothing when the invocant is not a reference.
sub completed {
        my ($self, $count) = @_;
        return unless (ref $self);

        my $queue = $self->{completed};

        # In list context the caller wants entries, not a tally.
        $count ||= scalar @$queue if (wantarray);

        return scalar @$queue unless (defined $count);

        return () unless (@$queue);
        return splice @$queue, 0, $count;
}
149
# Inspect or drain the queue of failed request entries.
#
#   scalar context, no count  - returns the number of queued entries
#   list context, no count    - removes and returns every queued entry
#                               (a false count is treated as "all" here)
#   with a defined count      - removes up to that many entries from the
#                               front of the queue and returns them; in
#                               scalar context this yields whatever splice
#                               returns in scalar context
#
# When a count applies and the queue is empty, an empty list is returned.
# Returns nothing when the invocant is not a reference.
sub failed {
        my ($self, $count) = @_;
        return unless (ref $self);

        my $queue = $self->{failed};

        # In list context the caller wants entries, not a tally.
        $count ||= scalar @$queue if (wantarray);

        return scalar @$queue unless (defined $count);

        return () unless (@$queue);
        return splice @$queue, 0, $count;
}
167
# Number of request entries currently tracked as running.
# Returns nothing when the invocant is not a reference.
sub running {
        my ($self) = @_;
        ref $self or return;

        my $in_flight = @{ $self->{running} };
        return $in_flight;
}
173
174
# Dispatch a request on one of the pooled sessions, throttled by request_cap.
#
# Call forms:
#   $multi->request($method, @params)
#   $multi->request($hash_param, $method, @params)
#
# When the first argument is a reference it is taken as $hash_param.  It is
# passed to the session hash function ahead of the method name and recorded
# in the tracking entry, but it is not sent with the request itself.
#
# The session hash function is called as
#   $code->($self, [$hash_param,] $method, @params)
# and its result, reduced modulo the number of usable sessions, selects the
# session.
#
# If the number of running requests has reached request_cap, this blocks in
# session_wait() until a slot frees up.
#
# Returns the request object produced by the chosen session's request().
sub request {
        my $self = shift;
        my $hash_param;

        my $method = shift;
        if (ref $method) {
                $hash_param = $method;
                $method = shift;
        }

        my @params = @_;
        my @hash_args = defined $hash_param ? ($hash_param) : ();

        $self->session_reap;

        # Throttle: wait for a slot whenever the cap has been reached.
        # XXX adaptive mode has no logic of its own yet, so it is throttled
        # the same way; previously an adaptive object at its cap silently
        # dropped the request and returned no request object.
        # The check on running() keeps a request_cap below 1 from waiting
        # forever when there is nothing left to wait for.
        while ($self->running && $self->running >= $self->request_cap) {
                $self->session_wait;
                $self->session_reap;
        }

        my $index = $self->session_hash_function->($self, @hash_args, $method, @params);

        # Never index past the sessions that actually exist: session_cap can
        # be changed after construction, but the pool is only built in new().
        my $pool_size = scalar @{ $self->{sessions} };
        my $slots = $self->session_cap;
        $slots = $pool_size if ($slots < 1 || $slots > $pool_size);

        my $ses = $self->{sessions}->[$index % $slots];

        #print "Running $method using session ".$ses->session_id."\n";

        my $req = $ses->request( $method, @params );

        push @{ $self->{running} },
                { req => $req,
                  meth => $method,
                  hash => $hash_param,
                  params => [@params]
                };

        $log->debug("Making request [$method] ".$self->running."...");

        return $req;
}
214
# Block until running requests have been reaped.
#
#   $multi->session_wait       - wait until at least one request is reaped
#                                (or nothing is running); returns the number
#                                reaped by the last session_reap() call
#   $multi->session_wait(1)    - wait until nothing is running any more;
#                                returns the number of requests that were
#                                running when the call was made
#
# In both modes session_reap() is tried before blocking on the transport, so
# requests that can already be reaped do not wait for further socket data.
sub session_wait {
        my $self = shift;
        my $all = shift;
        my $xmpp = OpenSRF::Transport::PeerHandle->retrieve;

        my $count;
        if ($all) {
                $count = $self->running;

                # Reap first: blocking before the first reap could wait on the
                # socket indefinitely if no further data arrives.
                $self->session_reap;

                while ($self->running) {
                        # block on the xmpp socket until data arrives
                        $xmpp->process(-1);
                        $self->session_reap;
                }
                return $count;
        }

        while (($count = $self->session_reap) == 0 && $self->running) {
                # block on the xmpp socket until data arrives
                $xmpp->process(-1);
        }
        return $count;
}
237
# Sweep the running queue once and retire every request that reports itself
# complete.
#
# For each completed request the tracking entry gains 'response' (everything
# returned by recv, as an array ref) and 'duration'.  If the request reports
# failure, the entry also gains 'error' and is queued on 'failed'; otherwise
# it is queued on 'completed'.  Requests that are not complete go back on the
# running queue in their original order.
#
# After the sweep, each retired entry is passed to the failure or success
# handler (if one is set) as $handler->($self, $entry), its request is
# finished, and then every key of the entry is deleted.  The hashes left on
# the completed/failed queues are therefore empty once this returns; handlers
# are the place to capture results.
#
# Returns the number of requests retired by this call.
sub session_reap {
        my ($self) = @_;

        my (@finished, @pending);

        while ( my $entry = shift @{ $self->{running} } ) {
                unless ($entry->{req}->complete) {
                        push @pending, $entry;
                        next;
                }

                $entry->{response} = [ $entry->{req}->recv ];
                $entry->{duration} = $entry->{req}->duration;

                if ($entry->{req}->failed) {
                        $entry->{error} = $entry->{req}->failed;
                        push @{ $self->{failed} }, $entry;
                } else {
                        push @{ $self->{completed} }, $entry;
                }

                push @finished, $entry;
        }

        # Put the still-outstanding requests back before any handler runs.
        push @{ $self->{running} }, @pending;

        foreach my $entry (@finished) {
                my $handler = $entry->{error}
                        ? $self->failure_handler
                        : $self->success_handler;
                $handler->($self, $entry) if ($handler);

                $entry->{req}->finish;

                # Empty the entry in place; the queues keep the same hash ref.
                delete $entry->{$_} foreach (keys %$entry);
        }

        return scalar @finished;
}
285
286 1;
287