]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Application/Persist.pm
added .slot.set_expire interface for setting/removing/updating the expire time of...
[OpenSRF.git] / src / perlmods / OpenSRF / Application / Persist.pm
1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
4
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
9 use JSON;
10 use DBI;
11
12 use vars qw/$dbh $log $default_expire_time/;
13
14 sub initialize {
15         $log = 'OpenSRF::Utils::Logger';
16
17         $sc = OpenSRF::Utils::SettingsClient->new;
18
19         my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
20         unless ($dbfile) {
21                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
22         }
23
24         my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25         $init_dbh->{AutoCommit} = 1;
26         $init_dbh->{RaiseError} = 0;
27
28         $init_dbh->do( <<"      SQL" );
29                 CREATE TABLE storage (
30                         id      INTEGER PRIMARY KEY,
31                         name_id INTEGER,
32                         value   TEXT
33                 );
34         SQL
35
36         $init_dbh->do( <<"      SQL" );
37                 CREATE TABLE store_name (
38                         id      INTEGER PRIMARY KEY,
39                         name    TEXT UNIQUE
40                 );
41         SQL
42
43         $init_dbh->do( <<"      SQL" );
44                 CREATE TABLE store_expire (
45                         id              INTEGER PRIMARY KEY,
46                         atime           INTEGER,
47                         expire_interval INTEGER
48                 );
49         SQL
50
51 }
52
53 sub child_init {
54         my $sc = OpenSRF::Utils::SettingsClient->new;
55
56         $default_expire_time = $sc->config_value( apps => persist => app_settings => 'default_expire_time' );
57         $default_expire_time ||= 300;
58
59         my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
60         unless ($dbfile) {
61                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
62         }
63
64         $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65         $dbh->{AutoCommit} = 1;
66         $dbh->{RaiseError} = 0;
67
68 }
69
70 sub create_store {
71         my $self = shift;
72         my $client = shift;
73
74         my $name = shift || '';
75
76         
77         my $continue = 0;
78         try {
79                 _get_name_id($name);
80
81         } catch Error with { 
82                 $continue++;
83         };
84
85         throw OpenSRF::EX::WARN ("Duplicate key:  object name [$name] already exists!  " . $dbh->errstr)
86                 unless ($continue);
87
88         my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
89         $sth->execute($name);
90         $sth->finish;
91
92         unless ($name) {
93                 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
94                 $name = 'AUTOGENERATED!!'.$last_id;
95                 $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
96         }
97
98         _flush_by_name($name);
99         return $name;
100 }
101 __PACKAGE__->register_method(
102         api_name => 'opensrf.persist.slot.create',
103         method => 'create_store',
104         argc => 1,
105 );
106
107
108 sub create_expirable_store {
109         my $self = shift;
110         my $client = shift;
111         my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
112         my $time = shift || $default_expire_time;
113
114         try {
115                 __PACKAGE__->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
116         } catch Error with {
117                 my $e = shift;
118                 if ($e->message =~ /^Duplicate key/o) {
119                         throw $e;
120                 }
121         };
122
123         $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
124
125         return $name;
126 }
127 __PACKAGE__->register_method(
128         api_name => 'opensrf.persist.slot.create_expirable',
129         method => 'create_expirable_store',
130         argc => 2,
131 );
132
133 sub _update_expire_atime {
134         my $id = shift;
135         $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
136 }
137
138 sub set_expire_interval {
139         my $self = shift;
140         my $client = shift;
141         my $slot = shift;
142         my $new_interval = shift;
143
144         my $etime = interval_to_seconds($new_interval);
145         my $sid = _get_name_id($slot);
146
147         $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
148         return $slot if ($etime == 0);
149
150         $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
151         return $slot;
152 }
153 __PACKAGE__->register_method(
154         api_name => 'opensrf.persist.slot.set_expire',
155         method => 'set_expire_interval',
156         argc => 2,
157 );
158
159
160 sub _sweep_expired_slots {
161         return if (shift());
162
163         my @expired_slots = $dbh->selectcol_arrayref(<<"        SQL", {}, time() );
164                 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
165         SQL
166
167         return unless (@expired_slots);
168
169         $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @expired_slots).');', {}, @expired_slots);
170         for my $id (@expired_slots) {
171                 _flush_by_name(_get_id_name($id), 1);
172         }
173 }
174
175 sub add_item {
176         my $self = shift;
177         my $client = shift;
178
179         my $name = shift or do {
180                 throw OpenSRF::EX::WARN ("No name specified!");
181         };
182         my $value = shift || '';
183
184         my $name_id = _get_name_id($name);
185         
186         if ($self->api_name =~ /object/) {
187                 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
188         }
189
190         $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
191
192         _flush_by_name($name);
193
194         return 0 if ($dbh->err);
195         return $name;
196 }
197 __PACKAGE__->register_method(
198         api_name => 'opensrf.persist.object.set',
199         method => 'add_item',
200         argc => 2,
201 );
202 __PACKAGE__->register_method(
203         api_name => 'opensrf.persist.queue.push',
204         method => 'add_item',
205         argc => 2,
206 );
207 __PACKAGE__->register_method(
208         api_name => 'opensrf.persist.stack.push',
209         method => 'add_item',
210         argc => 2,
211 );
212
213 sub _get_id_name {
214         my $name = shift or do {
215                 throw OpenSRF::EX::WARN ("No slot id specified!");
216         };
217
218
219         my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
220
221         if (!ref($name_id) || !defined($name_id->[0])) {
222                 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
223         }
224
225         return $name_id->[0];
226 }
227
228 sub _get_name_id {
229         my $name = shift or do {
230                 throw OpenSRF::EX::WARN ("No slot name specified!");
231         };
232
233
234         my $name_id = $dbh->selectcol_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
235
236         if (!ref($name_id) || !defined($name_id->[0])) {
237                 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
238         }
239
240         return $name_id->[0];
241 }
242
243 sub destroy_store {
244         my $self = shift;
245         my $client = shift;
246
247         my $name = shift;
248
249         my $name_id = _get_name_id($name);
250
251         $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
252         _flush_by_name($name);
253
254 }
255 __PACKAGE__->register_method(
256         api_name => 'opensrf.persist.slot.destroy',
257         method => 'destroy_store',
258         argc => 1,
259 );
260
261 sub _flush_by_name {
262         my $name = shift;
263         my $no_sweep = shift;
264  
265         my $name_id = _get_name_id($name);
266
267         unless ($no_sweep) {
268                 _update_expire_atime($name);
269                 _sweep_expired_slots();
270         }
271         
272         if ($name =~ /^AUTOGENERATED!!/) {
273                 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
274                 if (!ref($count) || $$count[0] == 0) {
275                         $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
276                 }
277         }
278 }
279         
280 sub pop_queue {
281         my $self = shift;
282         my $client = shift;
283
284         my $name = shift or do {
285                 throw OpenSRF::EX::WARN ("No queue name specified!");
286         };
287         my $name_id = _get_name_id($name);
288
289         my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
290         $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
291
292         _flush_by_name($name);
293
294         return JSON->JSON2perl( $value->[1] );
295 }
296 __PACKAGE__->register_method(
297         api_name => 'opensrf.persist.queue.peek',
298         method => 'pop_queue',
299         argc => 1,
300 );
301 __PACKAGE__->register_method(
302         api_name => 'opensrf.persist.queue.pop',
303         method => 'pop_queue',
304         argc => 1,
305 );
306
307
308 sub peek_slot {
309         my $self = shift;
310         my $client = shift;
311
312         my $name = shift or do {
313                 throw OpenSRF::EX::WARN ("No slot name specified!");
314         };
315         my $name_id = _get_name_id($name);
316
317         my $order = 'ASC';
318         $order = 'DESC' if ($self->api_name =~ /stack/o);
319         
320         my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
321
322         $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
323
324         _flush_by_name($name);
325         return undef;
326 }
327 __PACKAGE__->register_method(
328         api_name => 'opensrf.persist.queue.peek.all',
329         method => 'peek_slot',
330         argc => 1,
331         stream => 1,
332 );
333 __PACKAGE__->register_method(
334         api_name => 'opensrf.persist.stack.peek.all',
335         method => 'peek_slot',
336         argc => 1,
337         stream => 1,
338 );
339
340
341 sub store_size {
342         my $self = shift;
343         my $client = shift;
344
345         my $name = shift or do {
346                 throw OpenSRF::EX::WARN ("No queue name specified!");
347         };
348         my $name_id = _get_name_id($name);
349
350         my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
351
352         return JSON->JSON2perl( $value->[0] );
353 }
354 __PACKAGE__->register_method(
355         api_name => 'opensrf.persist.queue.size',
356         method => 'shift_stack',
357         argc => 1,
358 );
359 __PACKAGE__->register_method(
360         api_name => 'opensrf.persist.stack.size',
361         method => 'shift_stack',
362         argc => 1,
363 );
364 __PACKAGE__->register_method(
365         api_name => 'opensrf.persist.object.size',
366         method => 'shift_stack',
367         argc => 1,
368 );
369
370 sub store_depth {
371         my $self = shift;
372         my $client = shift;
373
374         my $name = shift or do {
375                 throw OpenSRF::EX::WARN ("No queue name specified!");
376         };
377         my $name_id = _get_name_id($name);
378
379         my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
380
381         return JSON->JSON2perl( $value->[0] );
382 }
383 __PACKAGE__->register_method(
384         api_name => 'opensrf.persist.queue.length',
385         method => 'shift_stack',
386         argc => 1,
387 );
388 __PACKAGE__->register_method(
389         api_name => 'opensrf.persist.stack.depth',
390         method => 'shift_stack',
391         argc => 1,
392 );
393
394 sub shift_stack {
395         my $self = shift;
396         my $client = shift;
397
398         my $name = shift or do {
399                 throw OpenSRF::EX::WARN ("No queue name specified!");
400         };
401         my $name_id = _get_name_id($name);
402
403         my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
404         $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
405
406         _flush_by_name($name);
407
408         return JSON->JSON2perl( $value->[1] );
409 }
410 __PACKAGE__->register_method(
411         api_name => 'opensrf.persist.stack.peek',
412         method => 'shift_stack',
413         argc => 1,
414 );
415 __PACKAGE__->register_method(
416         api_name => 'opensrf.persist.stack.pop',
417         method => 'shift_stack',
418         argc => 1,
419 );
420
421 sub get_object {
422         my $self = shift;
423         my $client = shift;
424
425         my $name = shift or do {
426                 throw OpenSRF::EX::WARN ("No object name specified!");
427         };
428
429         my $name_id = _get_name_id($name);
430
431         my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
432         $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
433
434         _flush_by_name($name);
435
436         return JSON->JSON2perl( $value->[1] );
437 }
438 __PACKAGE__->register_method(
439         api_name => 'opensrf.persist.object.peek',
440         method => 'shift_stack',
441         argc => 1,
442 );
443 __PACKAGE__->register_method(
444         api_name => 'opensrf.persist.object.get',
445         method => 'shift_stack',
446         argc => 1,
447 );
448
449 1;