]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Application/Persist.pm
removing expire times
[OpenSRF.git] / src / perlmods / OpenSRF / Application / Persist.pm
1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
4
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
9 use JSON;
10 use DBI;
11
12 use vars qw/$dbh $log $default_expire_time/;
13
14 sub initialize {
15         $log = 'OpenSRF::Utils::Logger';
16
17         $sc = OpenSRF::Utils::SettingsClient->new;
18
19         my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
20         unless ($dbfile) {
21                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
22         }
23
24         my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25         $init_dbh->{AutoCommit} = 1;
26         $init_dbh->{RaiseError} = 0;
27
28         $init_dbh->do( <<"      SQL" );
29                 CREATE TABLE storage (
30                         id      INTEGER PRIMARY KEY,
31                         name_id INTEGER,
32                         value   TEXT
33                 );
34         SQL
35
36         $init_dbh->do( <<"      SQL" );
37                 CREATE TABLE store_name (
38                         id      INTEGER PRIMARY KEY,
39                         name    TEXT UNIQUE
40                 );
41         SQL
42
43         $init_dbh->do( <<"      SQL" );
44                 CREATE TABLE store_expire (
45                         id              INTEGER PRIMARY KEY,
46                         atime           INTEGER,
47                         expire_interval INTEGER
48                 );
49         SQL
50
51 }
52
53 sub child_init {
54         my $sc = OpenSRF::Utils::SettingsClient->new;
55
56         $default_expire_time = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'default_expire_time' );
57         $default_expire_time ||= 300;
58
59         my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
60         unless ($dbfile) {
61                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
62         }
63
64         $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65         $dbh->{AutoCommit} = 1;
66         $dbh->{RaiseError} = 0;
67
68 }
69
70 sub create_store {
71         my $self = shift;
72         my $client = shift;
73
74         my $name = shift || '';
75
76         try {
77         
78                 my $continue = 0;
79                 try {
80                         _get_name_id($name);
81
82                 } catch Error with { 
83                         $continue++;
84                 };
85
86                 throw OpenSRF::EX::WARN ("Duplicate key:  object name [$name] already exists!  " . $dbh->errstr)
87                         unless ($continue);
88
89                 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
90                 $sth->execute($name);
91                 $sth->finish;
92
93                 unless ($name) {
94                         my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
95                         $name = 'AUTOGENERATED!!'.$last_id;
96                         $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
97                 }
98
99                 _flush_by_name($name);
100                 return $name;
101         } catch Error with {
102                 return undef;
103         };
104 }
105 __PACKAGE__->register_method(
106         api_name => 'opensrf.persist.slot.create',
107         method => 'create_store',
108         argc => 1,
109 );
110
111
112 sub create_expirable_store {
113         my $self = shift;
114         my $client = shift;
115         my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
116         my $time = shift || $default_expire_time;
117
118         try {
119                 ($name) = $self->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
120                 return undef unless $name;
121
122                 $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
123                 return $name;
124         } catch Error with {
125                 return undef;
126         };
127
128 }
129 __PACKAGE__->register_method(
130         api_name => 'opensrf.persist.slot.create_expirable',
131         method => 'create_expirable_store',
132         argc => 2,
133 );
134
135 sub _update_expire_atime {
136         my $id = shift;
137         $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
138 }
139
140 sub set_expire_interval {
141         my $self = shift;
142         my $client = shift;
143         my $slot = shift;
144         my $new_interval = shift;
145
146         try {
147                 my $etime = interval_to_seconds($new_interval);
148                 my $sid = _get_name_id($slot);
149
150                 $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
151                 return 0 if ($etime == 0);
152
153                 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
154                 return $etime;
155         } 
156 }
157 __PACKAGE__->register_method(
158         api_name => 'opensrf.persist.slot.set_expire',
159         method => 'set_expire_interval',
160         argc => 2,
161 );
162
163 sub find_slot {
164         my $self = shift;
165         my $client = shift;
166         my $slot = shift;
167
168         my $sid = _get_name_id($slot);
169         return $slot if ($sid);
170         return undef;
171 }
172 __PACKAGE__->register_method(
173         api_name => 'opensrf.persist.slot.find',
174         method => 'find_slot',
175         argc => 2,
176 );
177
178 sub get_expire_interval {
179         my $self = shift;
180         my $client = shift;
181         my $slot = shift;
182
183         my $sid = _get_name_id($slot);
184         my ($int) = $dbh->selectrow_array('SELECT expire_interval FROM store_expire WHERE id = ?;',{},$sid);
185         return undef unless ($int);
186
187         my ($future) = $dbh->selectrow_array('SELECT atime + expire_interval FROM store_expire WHERE id = ?;',{},$sid);
188         return $future - time();
189 }
190 __PACKAGE__->register_method(
191         api_name => 'opensrf.persist.slot.get_expire',
192         method => 'get_expire_interval',
193         argc => 2,
194 );
195
196
197 sub _sweep_expired_slots {
198         return if (shift());
199
200         my $expired_slots = $dbh->selectcol_arrayref(<<"        SQL", {}, time() );
201                 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
202         SQL
203
204         return unless ($expired_slots);
205
206         $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
207         $dbh->do('DELETE FROM store_expire WHERE id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
208         for my $id (@$expired_slots) {
209                 _flush_by_name(_get_id_name($id), 1);
210         }
211 }
212
213 sub add_item {
214         my $self = shift;
215         my $client = shift;
216
217         my $name = shift or do {
218                 throw OpenSRF::EX::WARN ("No name specified!");
219         };
220
221         my $value = shift || '';
222
223         try {
224                 my $name_id = _get_name_id($name);
225         
226                 if ($self->api_name =~ /object/) {
227                         $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
228                 }
229
230                 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
231
232                 _flush_by_name($name);
233
234                 return $name;
235         } catch Error with {
236                 return undef;
237         };
238 }
239 __PACKAGE__->register_method(
240         api_name => 'opensrf.persist.object.set',
241         method => 'add_item',
242         argc => 2,
243 );
244 __PACKAGE__->register_method(
245         api_name => 'opensrf.persist.queue.push',
246         method => 'add_item',
247         argc => 2,
248 );
249 __PACKAGE__->register_method(
250         api_name => 'opensrf.persist.stack.push',
251         method => 'add_item',
252         argc => 2,
253 );
254
255 sub _get_id_name {
256         my $name = shift or do {
257                 throw OpenSRF::EX::WARN ("No slot id specified!");
258         };
259
260
261         my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
262
263         if (!ref($name_id) || !defined($name_id->[0])) {
264                 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
265         }
266
267         return $name_id->[0];
268 }
269
270 sub _get_name_id {
271         my $name = shift or do {
272                 throw OpenSRF::EX::WARN ("No slot name specified!");
273         };
274
275
276         my $name_id = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
277
278         if (!ref($name_id) || !defined($name_id->[0])) {
279                 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
280         }
281
282         return $name_id->[0];
283 }
284
285 sub destroy_store {
286         my $self = shift;
287         my $client = shift;
288
289         my $name = shift;
290
291         my $problem = 0;
292         try {
293                 my $name_id = _get_name_id($name);
294         
295                 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
296                 $dbh->do("DELETE FROM store_name WHERE id = ?;", {}, $name_id);
297                 $dbh->do("DELETE FROM store_expire WHERE id = ?;", {}, $name_id);
298
299                 _sweep_expired_slots();
300                 return $name;
301         } catch Error with {
302                 return undef;
303         };
304
305 }
306 __PACKAGE__->register_method(
307         api_name => 'opensrf.persist.slot.destroy',
308         method => 'destroy_store',
309         argc => 1,
310 );
311
312 sub _flush_by_name {
313         my $name = shift;
314         my $no_sweep = shift;
315  
316         my $name_id = _get_name_id($name);
317
318         unless ($no_sweep) {
319                 _update_expire_atime($name);
320                 _sweep_expired_slots();
321         }
322         
323         if ($name =~ /^AUTOGENERATED!!/) {
324                 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
325                 if (!ref($count) || $$count[0] == 0) {
326                         $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
327                 }
328         }
329 }
330         
331 sub pop_queue {
332         my $self = shift;
333         my $client = shift;
334
335         my $name = shift or do {
336                 throw OpenSRF::EX::WARN ("No queue name specified!");
337         };
338
339         try {
340                 my $name_id = _get_name_id($name);
341
342                 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
343                 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
344
345                 _flush_by_name($name);
346
347                 return JSON->JSON2perl( $value->[1] );
348         } catch Error with {
349                 #my $e = shift;
350                 #return $e;
351                 return undef;
352         };
353 }
354 __PACKAGE__->register_method(
355         api_name => 'opensrf.persist.queue.peek',
356         method => 'pop_queue',
357         argc => 1,
358 );
359 __PACKAGE__->register_method(
360         api_name => 'opensrf.persist.queue.pop',
361         method => 'pop_queue',
362         argc => 1,
363 );
364
365
366 sub peek_slot {
367         my $self = shift;
368         my $client = shift;
369
370         my $name = shift or do {
371                 throw OpenSRF::EX::WARN ("No slot name specified!");
372         };
373         my $name_id = _get_name_id($name);
374
375         my $order = 'ASC';
376         $order = 'DESC' if ($self->api_name =~ /stack/o);
377         
378         my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
379
380         $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
381
382         _flush_by_name($name);
383         return undef;
384 }
385 __PACKAGE__->register_method(
386         api_name => 'opensrf.persist.queue.peek.all',
387         method => 'peek_slot',
388         argc => 1,
389         stream => 1,
390 );
391 __PACKAGE__->register_method(
392         api_name => 'opensrf.persist.stack.peek.all',
393         method => 'peek_slot',
394         argc => 1,
395         stream => 1,
396 );
397
398
399 sub store_size {
400         my $self = shift;
401         my $client = shift;
402
403         my $name = shift or do {
404                 throw OpenSRF::EX::WARN ("No queue name specified!");
405         };
406         my $name_id = _get_name_id($name);
407
408         my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
409
410         return JSON->JSON2perl( $value->[0] );
411 }
412 __PACKAGE__->register_method(
413         api_name => 'opensrf.persist.queue.size',
414         method => 'shift_stack',
415         argc => 1,
416 );
417 __PACKAGE__->register_method(
418         api_name => 'opensrf.persist.stack.size',
419         method => 'shift_stack',
420         argc => 1,
421 );
422 __PACKAGE__->register_method(
423         api_name => 'opensrf.persist.object.size',
424         method => 'shift_stack',
425         argc => 1,
426 );
427
428 sub store_depth {
429         my $self = shift;
430         my $client = shift;
431
432         my $name = shift or do {
433                 throw OpenSRF::EX::WARN ("No queue name specified!");
434         };
435         my $name_id = _get_name_id($name);
436
437         my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
438
439         return JSON->JSON2perl( $value->[0] );
440 }
441 __PACKAGE__->register_method(
442         api_name => 'opensrf.persist.queue.length',
443         method => 'shift_stack',
444         argc => 1,
445 );
446 __PACKAGE__->register_method(
447         api_name => 'opensrf.persist.stack.depth',
448         method => 'shift_stack',
449         argc => 1,
450 );
451
452 sub shift_stack {
453         my $self = shift;
454         my $client = shift;
455
456         my $name = shift or do {
457                 throw OpenSRF::EX::WARN ("No slot name specified!");
458         };
459
460         try {
461                 my $name_id = _get_name_id($name);
462
463                 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
464                 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
465
466                 _flush_by_name($name);
467
468                 return JSON->JSON2perl( $value->[1] );
469         } catch Error with {
470                 my $e = shift;
471                 return undef;
472         };
473 }
474 __PACKAGE__->register_method(
475         api_name => 'opensrf.persist.stack.peek',
476         method => 'shift_stack',
477         argc => 1,
478 );
479 __PACKAGE__->register_method(
480         api_name => 'opensrf.persist.stack.pop',
481         method => 'shift_stack',
482         argc => 1,
483 );
484
485 sub get_object {
486         my $self = shift;
487         my $client = shift;
488
489         my $name = shift or do {
490                 throw OpenSRF::EX::WARN ("No object name specified!");
491         };
492
493         try {
494                 my $name_id = _get_name_id($name);
495
496                 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
497                 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
498
499                 _flush_by_name($name);
500
501                 return JSON->JSON2perl( $value->[1] );
502         } catch Error with {
503                 return undef;
504         };
505 }
506 __PACKAGE__->register_method(
507         api_name => 'opensrf.persist.object.peek',
508         method => 'shift_stack',
509         argc => 1,
510 );
511 __PACKAGE__->register_method(
512         api_name => 'opensrf.persist.object.get',
513         method => 'shift_stack',
514         argc => 1,
515 );
516
517 1;