]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/MultiSession.pm
MultiSession, YAY
[OpenSRF.git] / src / perlmods / OpenSRF / MultiSession.pm
package OpenSRF::MultiSession;
# Fans requests for one application out over a pool of OpenSRF::AppSession
# objects while limiting how many requests are in flight at once.
use strict;
use warnings;
use OpenSRF::AppSession;
use OpenSRF::Utils::Logger;
use Time::HiRes qw/time usleep/;

# Logger class name; log methods are invoked on it as class methods
# (e.g. $log->debug(...)).
my $log = 'OpenSRF::Utils::Logger';
7
# Constructor.  Takes a flat key/value list that becomes the object hash.
#
# Keys read here:
#   app                   - passed as the first argument to AppSession->create
#   api_level             - passed as the second argument; defaults to 1
#   cap                   - if true, used for both session_cap and request_cap
#   session_cap           - number of sessions to create; defaults to 10
#   request_cap           - in-flight request limit; defaults to 10
#   session_hash_function - code ref picking a session index per request;
#                           defaults to the built-in round-robin chooser
#
# Any other keys (failure_handler, success_handler, adaptive, ...) are kept
# in the object as given and are visible through the matching accessors.
#
# Returns the new object with its session pool already created.
sub new {
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self  = bless { @_ }, $class;

    $self->{api_level} = 1
        unless defined $self->{api_level};

    unless (defined $self->{session_hash_function}) {
        $self->{session_hash_function} = \&_dummy_session_hash_function;
    }

    # A single "cap" option drives both limits.
    if ($self->{cap}) {
        $self->session_cap($self->{cap});
        $self->request_cap($self->{cap});
    }

    # XXX adaptive mode should become the default for unset caps once its
    # logic exists; until then fall back to fixed limits.
    $self->session_cap(10) unless $self->session_cap;
    $self->request_cap(10) unless $self->request_cap;

    $self->{sessions}  = [];
    $self->{running}   = [];
    $self->{completed} = [];
    $self->{failed}    = [];

    my $pool = $self->{sessions};
    for my $slot (1 .. $self->session_cap) {
        push @$pool,
            OpenSRF::AppSession->create($self->{app}, $self->{api_level}, 1);
        $log->debug("Creating connection ".$pool->[-1]->session_id." ...");
    }

    return $self;
}
54
# Default session chooser: plain round-robin over the session pool.
#
# Called as a session_hash_function, i.e. with the MultiSession object as
# the first argument (any further arguments are ignored).
#
# Returns an index in the range 0 .. session_cap - 1.  The previous
# implementation returned -1 on every session_cap-th call and only worked
# through Perl's negative array indexing; the sessions are still selected
# in the same order (first, second, ..., last, first, ...).
#
# The cursor kept in $self->{_dummy_hash_counter} is always stored reduced
# modulo session_cap, so it stays bounded no matter how many requests are
# made and remains valid if session_cap is lowered between calls.
sub _dummy_session_hash_function {
    my $self = shift;

    my $cap   = $self->session_cap;
    my $index = ($self->{_dummy_hash_counter} || 0) % $cap;

    $self->{_dummy_hash_counter} = $index + 1;

    return $index;
}
61
# Calls connect on every session in the pool, in pool order.
sub connect {
    my ($self) = @_;
    foreach my $session (@{ $self->{sessions} }) {
        $session->connect;
    }
}
66
# Calls finish on every session in the pool, in pool order.
sub finish {
    my ($self) = @_;
    foreach my $session (@{ $self->{sessions} }) {
        $session->finish;
    }
}
71
# Calls disconnect on every session in the pool, in pool order.
sub disconnect {
    my ($self) = @_;
    foreach my $session (@{ $self->{sessions} }) {
        $session->disconnect;
    }
}
76
# Get/set the code ref that picks a session for each request.  request()
# invokes it as ($self, $method, @params) and uses the result as an index
# into the session pool.
#
# With a defined argument the stored value is replaced first.  Returns the
# current value, or nothing when invoked on a class name instead of an
# object.
sub session_hash_function {
    my ($self, $chooser) = @_;
    return unless ref $self;

    if (defined $chooser) {
        $self->{session_hash_function} = $chooser;
    }

    return $self->{session_hash_function};
}
85
# Get/set the callback that session_reap invokes as ($self, $req) for a
# finished request whose record carries an error.
#
# With a defined argument the stored value is replaced first.  Returns the
# current value, or nothing when invoked on a class name instead of an
# object.
sub failure_handler {
    my ($self, $callback) = @_;
    return unless ref $self;

    if (defined $callback) {
        $self->{failure_handler} = $callback;
    }

    return $self->{failure_handler};
}
94
# Get/set the callback that session_reap invokes as ($self, $req) for a
# finished request whose record carries no error.
#
# With a defined argument the stored value is replaced first.  Returns the
# current value, or nothing when invoked on a class name instead of an
# object.
sub success_handler {
    my ($self, $callback) = @_;
    return unless ref $self;

    if (defined $callback) {
        $self->{success_handler} = $callback;
    }

    return $self->{success_handler};
}
103
# Get/set the session cap: the number of sessions new() creates and the
# modulus used by the default round-robin chooser.
#
# With a defined argument (0 included) the stored value is replaced first.
# Returns the current value, or nothing when invoked on a class name
# instead of an object.
sub session_cap {
    my ($self, $limit) = @_;
    return unless ref $self;

    if (defined $limit) {
        $self->{session_cap} = $limit;
    }

    return $self->{session_cap};
}
112
# Get/set the request cap: request() only submits while fewer than this
# many requests are being tracked as running.
#
# With a defined argument (0 included) the stored value is replaced first.
# Returns the current value, or nothing when invoked on a class name
# instead of an object.
sub request_cap {
    my ($self, $limit) = @_;
    return unless ref $self;

    if (defined $limit) {
        $self->{request_cap} = $limit;
    }

    return $self->{request_cap};
}
121
# Get/set the adaptive flag, which request() consults once the request cap
# has been reached.
#
# With a defined argument (0 included) the stored value is replaced first.
# Returns the current value, or nothing when invoked on a class name
# instead of an object.
sub adaptive {
    my ($self, $flag) = @_;
    return unless ref $self;

    if (defined $flag) {
        $self->{adaptive} = $flag;
    }

    return $self->{adaptive};
}
130
# Inspect or drain the list of successfully completed request records.
#
#   scalar context, no count  -> number of records waiting; nothing removed
#   list context, no count    -> removes and returns every waiting record
#                                (a false count such as 0 is treated the
#                                same way)
#   with a count              -> removes up to that many records from the
#                                front; in scalar context this yields the
#                                last record removed (splice semantics)
#
# When a drain is requested but nothing is waiting, an empty list is
# returned.  Returns nothing when invoked on a class name.
sub completed {
    my ($self, $count) = @_;
    return unless ref $self;

    my $finished = $self->{completed};

    # In list context a missing or false count means "everything".
    $count ||= scalar @$finished if wantarray;

    return scalar @$finished unless defined $count;
    return () unless @$finished;
    return splice @$finished, 0, $count;
}
148
# Inspect or drain the list of failed request records.
#
#   scalar context, no count  -> number of records waiting; nothing removed
#   list context, no count    -> removes and returns every waiting record
#                                (a false count such as 0 is treated the
#                                same way)
#   with a count              -> removes up to that many records from the
#                                front; in scalar context this yields the
#                                last record removed (splice semantics)
#
# When a drain is requested but nothing is waiting, an empty list is
# returned.  Returns nothing when invoked on a class name.
sub failed {
    my ($self, $count) = @_;
    return unless ref $self;

    my $broken = $self->{failed};

    # In list context a missing or false count means "everything".
    $count ||= scalar @$broken if wantarray;

    return scalar @$broken unless defined $count;
    return () unless @$broken;
    return splice @$broken, 0, $count;
}
166
# Returns how many request records are currently tracked as running, or
# nothing when invoked on a class name instead of an object.
sub running {
    my ($self) = @_;
    return unless ref $self;

    my $in_flight = $self->{running};
    return scalar(@$in_flight);
}
172
173
# Submit $method with @params on one of the pooled sessions.
#
# If request_cap requests are already running, this blocks (through
# session_wait) until at least one of them has been reaped, then submits.
# The adaptive flag does not change this yet: no adaptive strategy is
# implemented, and the old empty adaptive branch silently dropped the
# request and returned no request object.
#
# The session is chosen by calling session_hash_function as
# ($self, $method, @params) and using the result as an index into the
# session pool.
#
# Returns the object returned by the session's request() call.  A record
# holding that object, the method name, a copy of the params and the start
# time is appended to the running list for session_reap to pick up.
#
# Dies if the cap can never be satisfied (it is reached while nothing is
# running) or if the chooser's index does not select a session.
sub request {
    my $self = shift;
    my $method = shift;
    my @params = @_;

    # A loop rather than recursion: handlers fired while waiting may submit
    # requests of their own, so the cap is re-checked after every wait.
    while ($self->running >= $self->request_cap) {
        die "Cannot issue [$method]: request_cap leaves no room for any request"
            unless $self->running;
        $self->session_wait;
    }

    my $index = $self->session_hash_function->($self, $method, @params);
    my $ses = $self->{sessions}->[$index];
    die "Cannot issue [$method]: session_hash_function did not select a session"
        unless $ses;

    my $req = $ses->request( $method, @params );

    push @{ $self->{running} },
        { req    => $req,
          meth   => $method,
          params => [@params],
          start  => time,
        };

    $log->debug("Making request [$method] ".$self->running."...");

    return $req;
}
205
# Block until running requests have been reaped.
#
#   $all true  - keep reaping until no request is running
#   otherwise  - return as soon as one session_reap pass has reaped at
#                least one request
#
# Returns immediately when nothing is running; the single-reap mode used to
# poll forever in that case because session_reap can never report progress
# on an empty running list.  Both modes now pause briefly (usleep 10)
# after a pass that reaped nothing; the $all mode used to spin without
# sleeping.
#
# Returns nothing.
sub session_wait {
    my $self = shift;
    my $all = shift;

    while ($self->running) {
        my $reaped = $self->session_reap;

        if ($reaped) {
            last unless $all;
        } else {
            usleep 10;
        }
    }

    return;
}
220
# Make one pass over the running requests and retire the finished ones.
#
# For every record whose request object reports complete:
#   - the values returned by recv are stored as an array ref in {response}
#   - {end} and {duration} are filled in from the clock and {start}
#   - if the request object reports failed, that value is stored in {error}
#     and the record goes onto the failed list; otherwise it goes onto the
#     completed list
#   - finish is called on the request object, which is then dropped from
#     the record
#   - failure_handler (when {error} is true) or success_handler, if set,
#     is called as ($self, $record)
#
# Unfinished records are put straight back on the running list instead of
# being parked in a local array until the end of the pass.  That way
# running() stays accurate while handlers execute (so a handler that calls
# request() cannot overshoot request_cap), and an exception thrown by a
# handler no longer causes still-running requests to be forgotten.
#
# Only the records that were running when the pass began are examined;
# requests submitted by handlers during the pass wait for the next one.
#
# Returns the number of requests retired during this pass.
sub session_reap {
    my $self = shift;

    my $done    = 0;
    my $queue   = $self->{running} ||= [];
    my $pending = scalar @$queue;

    while ($pending-- > 0) {
        # A handler may have triggered a nested reap that emptied the queue.
        my $req = shift @$queue;
        last unless $req;

        if (!$req->{req}->complete) {
            push @$queue, $req;
            next;
        }

        $req->{response} = [$req->{req}->recv];

        $req->{end} = time;
        $req->{duration} = $req->{end} - $req->{start};

        if ($req->{req}->failed) {
            $req->{error} = $req->{req}->failed;
            push @{ $self->{failed} }, $req;
        } else {
            push @{ $self->{completed} }, $req;
        }

        $req->{req}->finish;
        delete $req->{req};

        my $handler = $req->{error} ? $self->failure_handler : $self->success_handler;

        $handler->($self, $req) if ($handler);

        $done++;
    }

    return $done;
}
261
262 1;
263