1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
12 use vars qw/$dbh $log $default_expire_time/;
# $log holds the logger class name as a plain string.
15 $log = 'OpenSRF::Utils::Logger';
# Schema bootstrap (fragment: the enclosing sub header, guards, column lists
# and heredoc terminators are not visible in this listing).
# Looks up the SQLite file path under apps/opensrf.persist/app_settings/dbfile
# and opens a separate handle ($init_dbh) used only for the CREATE TABLE calls.
17 $sc = OpenSRF::Utils::SettingsClient->new;
19 my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
# NOTE(review): the condition guarding this throw is not visible here;
# presumably it only fires when $dbfile is empty -- confirm.
21 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
24 my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
# AutoCommit on, RaiseError off: a failing statement does not die.
# NOTE(review): presumably this is what lets the CREATE TABLE calls below be
# re-run against an existing database file -- confirm.
25 $init_dbh->{AutoCommit} = 1;
26 $init_dbh->{RaiseError} = 0;
# Three tables are created: storage, store_name and store_expire.
28 $init_dbh->do( <<" SQL" );
29 CREATE TABLE storage (
30 id INTEGER PRIMARY KEY,
36 $init_dbh->do( <<" SQL" );
37 CREATE TABLE store_name (
38 id INTEGER PRIMARY KEY,
43 $init_dbh->do( <<" SQL" );
44 CREATE TABLE store_expire (
45 id INTEGER PRIMARY KEY,
47 expire_interval INTEGER
# Connection setup (fragment: the enclosing sub header is not visible).
# Populates the package variables $default_expire_time and $dbh.
54 my $sc = OpenSRF::Utils::SettingsClient->new;
# Expiry default comes from app_settings; any false value (unset, 0, '')
# falls back to 300.
56 $default_expire_time = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'default_expire_time' );
57 $default_expire_time ||= 300;
59 my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
# NOTE(review): the condition guarding this throw is not visible here;
# presumably it only fires when $dbfile is empty -- confirm.
61 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
# Shared handle used by every method in this package.
# RaiseError is off, so DBI errors do not die.
64 $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65 $dbh->{AutoCommit} = 1;
66 $dbh->{RaiseError} = 0;
# create_store (fragment: the sub header and several guard lines are not
# visible in this listing). Inserts a row into store_name for a new slot.
# The name argument defaults to ''.
# NOTE(review): presumably the 'AUTOGENERATED!!<id>' rename below only runs
# when no name was supplied -- confirm against the missing lines.
74 my $name = shift || '';
# NOTE(review): the condition guarding this throw is not visible here;
# presumably it fires when the name is already present -- confirm.
86 throw OpenSRF::EX::WARN ("Duplicate key: object name [$name] already exists! " . $dbh->errstr)
89 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
# Row id of the store_name row just inserted; used to build the generated name.
94 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
95 $name = 'AUTOGENERATED!!'.$last_id;
# Bind the values rather than interpolating them into the SQL text, in line
# with every other statement in this package.
96 $dbh->do("UPDATE store_name SET name = ? WHERE id = ?;", {}, $name, $last_id);
99 _flush_by_name($name);
# Publish create_store as opensrf.persist.slot.create.
105 __PACKAGE__->register_method(
106 api_name => 'opensrf.persist.slot.create',
107 method => 'create_store',
# create_expirable_store: create a named slot and attach an expiry to it.
# (Fragment: some lines, including the closing brace, are not visible.)
#   name - required; a false value throws OpenSRF::EX::InvalidArg
#   time - optional expiry interval; falls back to $default_expire_time
112 sub create_expirable_store {
115 my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
116 my $time = shift || $default_expire_time;
# Delegate to the registered slot.create method; give up if no name comes back.
119 ($name) = $self->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
120 return undef unless $name;
# Then record the expiry through the registered slot.set_expire method.
122 $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
# Publish create_expirable_store as opensrf.persist.slot.create_expirable.
129 __PACKAGE__->register_method(
130 api_name => 'opensrf.persist.slot.create_expirable',
131 method => 'create_expirable_store',
# _update_expire_atime: stamp the store_expire row whose id equals $id with
# the current time().
# (Fragment: the line that sets $id is not visible -- presumably the first
# argument; confirm.)
135 sub _update_expire_atime {
137 $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
# set_expire_interval: replace the expiry record for a slot.
# (Fragment: the lines that set $slot and the closing brace are not visible.)
140 sub set_expire_interval {
144 my $new_interval = shift;
# NOTE(review): interval_to_seconds is not defined in the visible lines;
# presumably imported via OpenSRF::Utils qw/:common/ -- confirm.
147 my $etime = interval_to_seconds($new_interval);
148 my $sid = _get_name_id($slot);
# Always drop any existing expiry row first. An interval of 0 seconds means
# no new row is written and 0 is returned.
150 $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
151 return 0 if ($etime == 0);
# store_expire.id is the slot's store_name id; atime starts at the current time.
153 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
# Publish set_expire_interval as opensrf.persist.slot.set_expire.
157 __PACKAGE__->register_method(
158 api_name => 'opensrf.persist.slot.set_expire',
159 method => 'set_expire_interval',
# get_expire_interval: report the seconds left before a slot expires.
# (Fragment: the lines that set $slot and the closing brace are not visible.)
# Returns undef when the slot has no expiry row or a false interval.
163 sub get_expire_interval {
168 my $sid = _get_name_id($slot);
169 my ($int) = $dbh->selectrow_array('SELECT expire_interval FROM store_expire WHERE id = ?;',{},$sid);
170 return undef unless ($int);
# Expiry moment is atime + expire_interval; subtract the current time.
# The result can be zero or negative if the slot is already due.
172 my ($future) = $dbh->selectrow_array('SELECT atime + expire_interval FROM store_expire WHERE id = ?;',{},$sid);
173 return $future - time();
# Publish get_expire_interval as opensrf.persist.slot.get_expire.
175 __PACKAGE__->register_method(
176 api_name => 'opensrf.persist.slot.get_expire',
177 method => 'get_expire_interval',
# _sweep_expired_slots: find every store_expire row whose atime plus
# expire_interval is at or before the current time, delete the storage rows
# of those slots, then call _flush_by_name on each with the no-sweep flag set.
# (Fragment: the heredoc terminator and closing braces are not visible.)
# NOTE(review): the truthiness check on $expired_slots does not catch an
# empty array reference, so with nothing expired the DELETE still runs with
# an empty IN () list -- confirm that is intended.
182 sub _sweep_expired_slots {
185 my $expired_slots = $dbh->selectcol_arrayref(<<" SQL", {}, time() );
186 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
189 return unless ($expired_slots);
191 $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
192 for my $id (@$expired_slots) {
193 _flush_by_name(_get_id_name($id), 1);
# add_item (fragment: the sub header and closing braces are not visible).
# Stores one JSON-encoded value under the named slot. A false name throws
# OpenSRF::EX::WARN; a false value is stored as ''.
201 my $name = shift or do {
202 throw OpenSRF::EX::WARN ("No name specified!");
205 my $value = shift || '';
208 my $name_id = _get_name_id($name);
# When called through an api_name containing "object", existing rows for the
# slot are deleted first, so the slot holds a single value.
210 if ($self->api_name =~ /object/) {
211 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
214 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
216 _flush_by_name($name);
# add_item is published under three api names; the api_name check above
# selects the object behaviour.
223 __PACKAGE__->register_method(
224 api_name => 'opensrf.persist.object.set',
225 method => 'add_item',
228 __PACKAGE__->register_method(
229 api_name => 'opensrf.persist.queue.push',
230 method => 'add_item',
233 __PACKAGE__->register_method(
234 api_name => 'opensrf.persist.stack.push',
235 method => 'add_item',
# Slot id -> slot name lookup (fragment: the sub header is not visible;
# presumably this is _get_id_name, which other code calls with an id -- confirm).
# Despite the variable names, $name carries the numeric slot id and $name_id
# receives the column of matching names.
# Throws OpenSRF::EX::WARN when no id is given or no row matches.
240 my $name = shift or do {
241 throw OpenSRF::EX::WARN ("No slot id specified!");
245 my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
247 if (!ref($name_id) || !defined($name_id->[0])) {
248 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
251 return $name_id->[0];
# Slot name -> slot id lookup (fragment: the sub header is not visible;
# presumably this is _get_name_id -- confirm).
# Returns the id column of the store_name row whose name matches.
# Throws OpenSRF::EX::WARN when no name is given or no row matches.
255 my $name = shift or do {
256 throw OpenSRF::EX::WARN ("No slot name specified!");
260 my $name_id = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
262 if (!ref($name_id) || !defined($name_id->[0])) {
263 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
266 return $name_id->[0];
# destroy_store (fragment: the sub header and argument handling are not visible).
# Removes a slot completely: its stored values, its name row and its expiry
# row, then sweeps any other slots that have expired.
277 my $name_id = _get_name_id($name);
279 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
280 $dbh->do("DELETE FROM store_name WHERE id = ?;", {}, $name_id);
281 $dbh->do("DELETE FROM store_expire WHERE id = ?;", {}, $name_id);
283 _sweep_expired_slots();
# Publish destroy_store as opensrf.persist.slot.destroy.
290 __PACKAGE__->register_method(
291 api_name => 'opensrf.persist.slot.destroy',
292 method => 'destroy_store',
# Post-access housekeeping for a slot (fragment: the sub header, the line
# that sets $name and the closing braces are not visible; presumably this is
# _flush_by_name -- confirm).
# NOTE(review): presumably the atime refresh and sweep below are skipped when
# $no_sweep is true; the guard line is not visible -- confirm.
298 my $no_sweep = shift;
300 my $name_id = _get_name_id($name);
# _update_expire_atime matches on store_expire.id, which holds the numeric
# slot id, so it must receive $name_id. Passing the slot name never matched
# the intended row and left the access time stale.
303 _update_expire_atime($name_id);
304 _sweep_expired_slots();
# Auto-named slots are dropped from store_name once they hold no values.
307 if ($name =~ /^AUTOGENERATED!!/) {
308 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
309 if (!ref($count) || $$count[0] == 0) {
310 $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
# pop_queue (fragment: the sub header and closing braces are not visible).
# Returns the oldest value in the slot (lowest storage id), decoded from JSON.
# The row is deleted unless the api_name ends in "peek".
# Throws OpenSRF::EX::WARN when no name is given.
319 my $name = shift or do {
320 throw OpenSRF::EX::WARN ("No queue name specified!");
324 my $name_id = _get_name_id($name);
326 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
# NOTE(review): no check for an empty slot is visible; $value would then be
# undef -- confirm how callers expect that case to behave.
327 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
329 _flush_by_name($name);
331 return JSON->JSON2perl( $value->[1] );
# pop_queue backs both the non-destructive peek and the destructive pop.
338 __PACKAGE__->register_method(
339 api_name => 'opensrf.persist.queue.peek',
340 method => 'pop_queue',
343 __PACKAGE__->register_method(
344 api_name => 'opensrf.persist.queue.pop',
345 method => 'pop_queue',
# peek_slot (fragment: the sub header, the initial value of $order and the
# closing braces are not visible).
# Streams every value in the slot to the client, one respond() per row,
# without deleting anything. Throws OpenSRF::EX::WARN when no name is given.
354 my $name = shift or do {
355 throw OpenSRF::EX::WARN ("No slot name specified!");
357 my $name_id = _get_name_id($name);
# Stack api names read newest-first.
# NOTE(review): presumably $order defaults to ascending -- confirm.
360 $order = 'DESC' if ($self->api_name =~ /stack/o);
362 my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
364 $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
366 _flush_by_name($name);
# peek_slot is published for both queues and stacks.
369 __PACKAGE__->register_method(
370 api_name => 'opensrf.persist.queue.peek.all',
371 method => 'peek_slot',
375 __PACKAGE__->register_method(
376 api_name => 'opensrf.persist.stack.peek.all',
377 method => 'peek_slot',
# Slot size lookup (fragment: the sub header, and therefore the sub's real
# name, is not visible). Returns the summed LENGTH() of every stored value
# in the slot. Throws OpenSRF::EX::WARN when no name is given.
387 my $name = shift or do {
388 throw OpenSRF::EX::WARN ("No queue name specified!");
390 my $name_id = _get_name_id($name);
392 my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
394 return JSON->JSON2perl( $value->[0] );
# NOTE(review): these *.size registrations name 'shift_stack' as the method,
# while the body above is the one that computes a size. This looks like a
# copy/paste slip that would route the size calls to the stack pop/peek code
# -- confirm against the sub name on the missing header line.
396 __PACKAGE__->register_method(
397 api_name => 'opensrf.persist.queue.size',
398 method => 'shift_stack',
401 __PACKAGE__->register_method(
402 api_name => 'opensrf.persist.stack.size',
403 method => 'shift_stack',
406 __PACKAGE__->register_method(
407 api_name => 'opensrf.persist.object.size',
408 method => 'shift_stack',
# Slot item count lookup (fragment: the sub header, and therefore the sub's
# real name, is not visible). Returns COUNT(*) of the storage rows in the
# slot. Throws OpenSRF::EX::WARN when no name is given.
416 my $name = shift or do {
417 throw OpenSRF::EX::WARN ("No queue name specified!");
419 my $name_id = _get_name_id($name);
421 my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
423 return JSON->JSON2perl( $value->[0] );
# NOTE(review): the length/depth registrations name 'shift_stack' as the
# method, while the body above is the one that counts rows. This looks like
# a copy/paste slip -- confirm against the sub name on the missing header line.
425 __PACKAGE__->register_method(
426 api_name => 'opensrf.persist.queue.length',
427 method => 'shift_stack',
430 __PACKAGE__->register_method(
431 api_name => 'opensrf.persist.stack.depth',
432 method => 'shift_stack',
# shift_stack (fragment: the sub header and closing braces are not visible).
# Returns the newest value in the slot (highest storage id), decoded from
# JSON. The row is deleted unless the api_name ends in "peek".
# Throws OpenSRF::EX::WARN when no name is given.
440 my $name = shift or do {
441 throw OpenSRF::EX::WARN ("No slot name specified!");
445 my $name_id = _get_name_id($name);
447 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
# NOTE(review): no check for an empty slot is visible; $value would then be
# undef -- confirm how callers expect that case to behave.
448 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
450 _flush_by_name($name);
452 return JSON->JSON2perl( $value->[1] );
# shift_stack backs both the non-destructive peek and the destructive pop.
458 __PACKAGE__->register_method(
459 api_name => 'opensrf.persist.stack.peek',
460 method => 'shift_stack',
463 __PACKAGE__->register_method(
464 api_name => 'opensrf.persist.stack.pop',
465 method => 'shift_stack',
# Object fetch (fragment: the sub header, and therefore the sub's real name,
# is not visible). Returns the most recently stored value for the slot,
# decoded from JSON. Unless the api_name ends in "peek", every storage row
# for the slot is deleted (the DELETE is keyed on name_id, not on the row id).
# Throws OpenSRF::EX::WARN when no name is given.
473 my $name = shift or do {
474 throw OpenSRF::EX::WARN ("No object name specified!");
478 my $name_id = _get_name_id($name);
480 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
481 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
483 _flush_by_name($name);
485 return JSON->JSON2perl( $value->[1] );
# NOTE(review): the object.peek and object.get registrations name
# 'shift_stack' as the method, while the body above is the one that fetches
# an object. This looks like a copy/paste slip -- confirm against the sub
# name on the missing header line.
490 __PACKAGE__->register_method(
491 api_name => 'opensrf.persist.object.peek',
492 method => 'shift_stack',
495 __PACKAGE__->register_method(
496 api_name => 'opensrf.persist.object.get',
497 method => 'shift_stack',