get_expire method
[OpenSRF.git] / src / perlmods / OpenSRF / Application / Persist.pm
1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
4
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
9 use JSON;
10 use DBI;
11
12 use vars qw/$dbh $log $default_expire_time/;
13
14 sub initialize {
15         $log = 'OpenSRF::Utils::Logger';
16
17         $sc = OpenSRF::Utils::SettingsClient->new;
18
19         my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
20         unless ($dbfile) {
21                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
22         }
23
24         my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25         $init_dbh->{AutoCommit} = 1;
26         $init_dbh->{RaiseError} = 0;
27
28         $init_dbh->do( <<"      SQL" );
29                 CREATE TABLE storage (
30                         id      INTEGER PRIMARY KEY,
31                         name_id INTEGER,
32                         value   TEXT
33                 );
34         SQL
35
36         $init_dbh->do( <<"      SQL" );
37                 CREATE TABLE store_name (
38                         id      INTEGER PRIMARY KEY,
39                         name    TEXT UNIQUE
40                 );
41         SQL
42
43         $init_dbh->do( <<"      SQL" );
44                 CREATE TABLE store_expire (
45                         id              INTEGER PRIMARY KEY,
46                         atime           INTEGER,
47                         expire_interval INTEGER
48                 );
49         SQL
50
51 }
52
53 sub child_init {
54         my $sc = OpenSRF::Utils::SettingsClient->new;
55
56         $default_expire_time = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'default_expire_time' );
57         $default_expire_time ||= 300;
58
59         my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
60         unless ($dbfile) {
61                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
62         }
63
64         $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65         $dbh->{AutoCommit} = 1;
66         $dbh->{RaiseError} = 0;
67
68 }
69
70 sub create_store {
71         my $self = shift;
72         my $client = shift;
73
74         my $name = shift || '';
75
76         try {
77         
78                 my $continue = 0;
79                 try {
80                         _get_name_id($name);
81
82                 } catch Error with { 
83                         $continue++;
84                 };
85
86                 throw OpenSRF::EX::WARN ("Duplicate key:  object name [$name] already exists!  " . $dbh->errstr)
87                         unless ($continue);
88
89                 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
90                 $sth->execute($name);
91                 $sth->finish;
92
93                 unless ($name) {
94                         my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
95                         $name = 'AUTOGENERATED!!'.$last_id;
96                         $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
97                 }
98
99                 _flush_by_name($name);
100                 return $name;
101         } catch Error with {
102                 return undef;
103         };
104 }
105 __PACKAGE__->register_method(
106         api_name => 'opensrf.persist.slot.create',
107         method => 'create_store',
108         argc => 1,
109 );
110
111
112 sub create_expirable_store {
113         my $self = shift;
114         my $client = shift;
115         my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
116         my $time = shift || $default_expire_time;
117
118         try {
119                 ($name) = $self->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
120                 return undef unless $name;
121
122                 $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
123                 return $name;
124         } catch Error with {
125                 return undef;
126         };
127
128 }
129 __PACKAGE__->register_method(
130         api_name => 'opensrf.persist.slot.create_expirable',
131         method => 'create_expirable_store',
132         argc => 2,
133 );
134
135 sub _update_expire_atime {
136         my $id = shift;
137         $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
138 }
139
140 sub set_expire_interval {
141         my $self = shift;
142         my $client = shift;
143         my $slot = shift;
144         my $new_interval = shift;
145
146         try {
147                 my $etime = interval_to_seconds($new_interval);
148                 my $sid = _get_name_id($slot);
149
150                 $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
151                 return 0 if ($etime == 0);
152
153                 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
154                 return $etime;
155         } 
156 }
157 __PACKAGE__->register_method(
158         api_name => 'opensrf.persist.slot.set_expire',
159         method => 'set_expire_interval',
160         argc => 2,
161 );
162
163 sub get_expire_interval {
164         my $self = shift;
165         my $client = shift;
166         my $slot = shift;
167
168         my $sid = _get_name_id($slot);
169         my ($int) = $dbh->selectrow_array('SELECT expire_interval FROM store_expire WHERE id = ?;',{},$sid);
170         return undef unless ($int);
171
172         my ($future) = $dbh->selectrow_array('SELECT atime + expire_interval FROM store_expire WHERE id = ?;',{},$sid);
173         return $future - time();
174 }
175 __PACKAGE__->register_method(
176         api_name => 'opensrf.persist.slot.get_expire',
177         method => 'get_expire_interval',
178         argc => 2,
179 );
180
181
182 sub _sweep_expired_slots {
183         return if (shift());
184
185         my $expired_slots = $dbh->selectcol_arrayref(<<"        SQL", {}, time() );
186                 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
187         SQL
188
189         return unless ($expired_slots);
190
191         $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
192         for my $id (@$expired_slots) {
193                 _flush_by_name(_get_id_name($id), 1);
194         }
195 }
196
197 sub add_item {
198         my $self = shift;
199         my $client = shift;
200
201         my $name = shift or do {
202                 throw OpenSRF::EX::WARN ("No name specified!");
203         };
204
205         my $value = shift || '';
206
207         try {
208                 my $name_id = _get_name_id($name);
209         
210                 if ($self->api_name =~ /object/) {
211                         $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
212                 }
213
214                 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
215
216                 _flush_by_name($name);
217
218                 return $name;
219         } catch Error with {
220                 return undef;
221         };
222 }
223 __PACKAGE__->register_method(
224         api_name => 'opensrf.persist.object.set',
225         method => 'add_item',
226         argc => 2,
227 );
228 __PACKAGE__->register_method(
229         api_name => 'opensrf.persist.queue.push',
230         method => 'add_item',
231         argc => 2,
232 );
233 __PACKAGE__->register_method(
234         api_name => 'opensrf.persist.stack.push',
235         method => 'add_item',
236         argc => 2,
237 );
238
239 sub _get_id_name {
240         my $name = shift or do {
241                 throw OpenSRF::EX::WARN ("No slot id specified!");
242         };
243
244
245         my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
246
247         if (!ref($name_id) || !defined($name_id->[0])) {
248                 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
249         }
250
251         return $name_id->[0];
252 }
253
254 sub _get_name_id {
255         my $name = shift or do {
256                 throw OpenSRF::EX::WARN ("No slot name specified!");
257         };
258
259
260         my $name_id = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
261
262         if (!ref($name_id) || !defined($name_id->[0])) {
263                 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
264         }
265
266         return $name_id->[0];
267 }
268
269 sub destroy_store {
270         my $self = shift;
271         my $client = shift;
272
273         my $name = shift;
274
275         my $problem = 0;
276         try {
277                 my $name_id = _get_name_id($name);
278         
279                 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
280                 $dbh->do("DELETE FROM store_name WHERE id = ?;", {}, $name_id);
281                 $dbh->do("DELETE FROM store_expire WHERE id = ?;", {}, $name_id);
282
283                 _sweep_expired_slots();
284                 return $name;
285         } catch Error with {
286                 return undef;
287         };
288
289 }
290 __PACKAGE__->register_method(
291         api_name => 'opensrf.persist.slot.destroy',
292         method => 'destroy_store',
293         argc => 1,
294 );
295
296 sub _flush_by_name {
297         my $name = shift;
298         my $no_sweep = shift;
299  
300         my $name_id = _get_name_id($name);
301
302         unless ($no_sweep) {
303                 _update_expire_atime($name);
304                 _sweep_expired_slots();
305         }
306         
307         if ($name =~ /^AUTOGENERATED!!/) {
308                 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
309                 if (!ref($count) || $$count[0] == 0) {
310                         $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
311                 }
312         }
313 }
314         
315 sub pop_queue {
316         my $self = shift;
317         my $client = shift;
318
319         my $name = shift or do {
320                 throw OpenSRF::EX::WARN ("No queue name specified!");
321         };
322
323         try {
324                 my $name_id = _get_name_id($name);
325
326                 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
327                 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
328
329                 _flush_by_name($name);
330
331                 return JSON->JSON2perl( $value->[1] );
332         } catch Error with {
333                 #my $e = shift;
334                 #return $e;
335                 return undef;
336         };
337 }
338 __PACKAGE__->register_method(
339         api_name => 'opensrf.persist.queue.peek',
340         method => 'pop_queue',
341         argc => 1,
342 );
343 __PACKAGE__->register_method(
344         api_name => 'opensrf.persist.queue.pop',
345         method => 'pop_queue',
346         argc => 1,
347 );
348
349
350 sub peek_slot {
351         my $self = shift;
352         my $client = shift;
353
354         my $name = shift or do {
355                 throw OpenSRF::EX::WARN ("No slot name specified!");
356         };
357         my $name_id = _get_name_id($name);
358
359         my $order = 'ASC';
360         $order = 'DESC' if ($self->api_name =~ /stack/o);
361         
362         my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
363
364         $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
365
366         _flush_by_name($name);
367         return undef;
368 }
369 __PACKAGE__->register_method(
370         api_name => 'opensrf.persist.queue.peek.all',
371         method => 'peek_slot',
372         argc => 1,
373         stream => 1,
374 );
375 __PACKAGE__->register_method(
376         api_name => 'opensrf.persist.stack.peek.all',
377         method => 'peek_slot',
378         argc => 1,
379         stream => 1,
380 );
381
382
383 sub store_size {
384         my $self = shift;
385         my $client = shift;
386
387         my $name = shift or do {
388                 throw OpenSRF::EX::WARN ("No queue name specified!");
389         };
390         my $name_id = _get_name_id($name);
391
392         my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
393
394         return JSON->JSON2perl( $value->[0] );
395 }
396 __PACKAGE__->register_method(
397         api_name => 'opensrf.persist.queue.size',
398         method => 'shift_stack',
399         argc => 1,
400 );
401 __PACKAGE__->register_method(
402         api_name => 'opensrf.persist.stack.size',
403         method => 'shift_stack',
404         argc => 1,
405 );
406 __PACKAGE__->register_method(
407         api_name => 'opensrf.persist.object.size',
408         method => 'shift_stack',
409         argc => 1,
410 );
411
412 sub store_depth {
413         my $self = shift;
414         my $client = shift;
415
416         my $name = shift or do {
417                 throw OpenSRF::EX::WARN ("No queue name specified!");
418         };
419         my $name_id = _get_name_id($name);
420
421         my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
422
423         return JSON->JSON2perl( $value->[0] );
424 }
425 __PACKAGE__->register_method(
426         api_name => 'opensrf.persist.queue.length',
427         method => 'shift_stack',
428         argc => 1,
429 );
430 __PACKAGE__->register_method(
431         api_name => 'opensrf.persist.stack.depth',
432         method => 'shift_stack',
433         argc => 1,
434 );
435
436 sub shift_stack {
437         my $self = shift;
438         my $client = shift;
439
440         my $name = shift or do {
441                 throw OpenSRF::EX::WARN ("No slot name specified!");
442         };
443
444         try {
445                 my $name_id = _get_name_id($name);
446
447                 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
448                 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
449
450                 _flush_by_name($name);
451
452                 return JSON->JSON2perl( $value->[1] );
453         } catch Error with {
454                 my $e = shift;
455                 return undef;
456         };
457 }
458 __PACKAGE__->register_method(
459         api_name => 'opensrf.persist.stack.peek',
460         method => 'shift_stack',
461         argc => 1,
462 );
463 __PACKAGE__->register_method(
464         api_name => 'opensrf.persist.stack.pop',
465         method => 'shift_stack',
466         argc => 1,
467 );
468
469 sub get_object {
470         my $self = shift;
471         my $client = shift;
472
473         my $name = shift or do {
474                 throw OpenSRF::EX::WARN ("No object name specified!");
475         };
476
477         try {
478                 my $name_id = _get_name_id($name);
479
480                 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
481                 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
482
483                 _flush_by_name($name);
484
485                 return JSON->JSON2perl( $value->[1] );
486         } catch Error with {
487                 return undef;
488         };
489 }
490 __PACKAGE__->register_method(
491         api_name => 'opensrf.persist.object.peek',
492         method => 'shift_stack',
493         argc => 1,
494 );
495 __PACKAGE__->register_method(
496         api_name => 'opensrf.persist.object.get',
497         method => 'shift_stack',
498         argc => 1,
499 );
500
501 1;