1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
9 use OpenSRF::Utils::JSON;
12 use vars qw/$dbh $log $default_expire_time/;
15 $log = 'OpenSRF::Utils::Logger';
17 $sc = OpenSRF::Utils::SettingsClient->new;
19 my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
21 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
24 my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25 $init_dbh->{AutoCommit} = 1;
26 $init_dbh->{RaiseError} = 0;
28 $init_dbh->do( <<" SQL" );
29 CREATE TABLE storage (
30 id INTEGER PRIMARY KEY,
36 $init_dbh->do( <<" SQL" );
37 CREATE TABLE store_name (
38 id INTEGER PRIMARY KEY,
43 $init_dbh->do( <<" SQL" );
44 CREATE TABLE store_expire (
45 id INTEGER PRIMARY KEY,
47 expire_interval INTEGER
54 my $sc = OpenSRF::Utils::SettingsClient->new;
56 $default_expire_time = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'default_expire_time' );
57 $default_expire_time ||= 300;
59 my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
61 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
64 $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65 $dbh->{AutoCommit} = 1;
66 $dbh->{RaiseError} = 0;
74 my $name = shift || '';
86 throw OpenSRF::EX::WARN ("Duplicate key: object name [$name] already exists! " . $dbh->errstr)
89 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
94 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
95 $name = 'AUTOGENERATED!!'.$last_id;
96 $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
99 _flush_by_name($name);
105 __PACKAGE__->register_method(
106 api_name => 'opensrf.persist.slot.create',
107 method => 'create_store',
112 sub create_expirable_store {
115 my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
116 my $time = shift || $default_expire_time;
119 ($name) = $self->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
120 return undef unless $name;
122 $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
129 __PACKAGE__->register_method(
130 api_name => 'opensrf.persist.slot.create_expirable',
131 method => 'create_expirable_store',
135 sub _update_expire_atime {
137 $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
140 sub set_expire_interval {
144 my $new_interval = shift;
147 my $etime = interval_to_seconds($new_interval);
148 my $sid = _get_name_id($slot);
150 $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
151 return 0 if ($etime == 0);
153 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
157 __PACKAGE__->register_method(
158 api_name => 'opensrf.persist.slot.set_expire',
159 method => 'set_expire_interval',
168 my $sid = _get_name_id($slot);
169 return $slot if ($sid);
172 __PACKAGE__->register_method(
173 api_name => 'opensrf.persist.slot.find',
174 method => 'find_slot',
178 sub get_expire_interval {
183 my $sid = _get_name_id($slot);
184 my ($int) = $dbh->selectrow_array('SELECT expire_interval FROM store_expire WHERE id = ?;',{},$sid);
185 return undef unless ($int);
187 my ($future) = $dbh->selectrow_array('SELECT atime + expire_interval FROM store_expire WHERE id = ?;',{},$sid);
188 return $future - time();
190 __PACKAGE__->register_method(
191 api_name => 'opensrf.persist.slot.get_expire',
192 method => 'get_expire_interval',
197 sub _sweep_expired_slots {
200 my $expired_slots = $dbh->selectcol_arrayref(<<" SQL", {}, time() );
201 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
204 return unless ($expired_slots);
206 $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
207 $dbh->do('DELETE FROM store_expire WHERE id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
208 for my $id (@$expired_slots) {
209 _flush_by_name(_get_id_name($id), 1);
217 my $name = shift or do {
218 throw OpenSRF::EX::WARN ("No name specified!");
221 my $value = shift || '';
224 my $name_id = _get_name_id($name);
226 if ($self->api_name =~ /object/) {
227 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
230 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, OpenSRF::Utils::JSON->perl2JSON($value));
232 _flush_by_name($name);
239 __PACKAGE__->register_method(
240 api_name => 'opensrf.persist.object.set',
241 method => 'add_item',
244 __PACKAGE__->register_method(
245 api_name => 'opensrf.persist.queue.push',
246 method => 'add_item',
249 __PACKAGE__->register_method(
250 api_name => 'opensrf.persist.stack.push',
251 method => 'add_item',
256 my $name = shift or do {
257 throw OpenSRF::EX::WARN ("No slot id specified!");
261 my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
263 if (!ref($name_id) || !defined($name_id->[0])) {
264 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
267 return $name_id->[0];
271 my $name = shift or do {
272 throw OpenSRF::EX::WARN ("No slot name specified!");
276 my $name_id = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
278 if (!ref($name_id) || !defined($name_id->[0])) {
279 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
282 return $name_id->[0];
293 my $name_id = _get_name_id($name);
295 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
296 $dbh->do("DELETE FROM store_name WHERE id = ?;", {}, $name_id);
297 $dbh->do("DELETE FROM store_expire WHERE id = ?;", {}, $name_id);
299 _sweep_expired_slots();
306 __PACKAGE__->register_method(
307 api_name => 'opensrf.persist.slot.destroy',
308 method => 'destroy_store',
314 my $no_sweep = shift;
316 my $name_id = _get_name_id($name);
319 _update_expire_atime($name);
320 _sweep_expired_slots();
323 if ($name =~ /^AUTOGENERATED!!/) {
324 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
325 if (!ref($count) || $$count[0] == 0) {
326 $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
335 my $name = shift or do {
336 throw OpenSRF::EX::WARN ("No queue name specified!");
340 my $name_id = _get_name_id($name);
342 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
343 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
345 _flush_by_name($name);
347 return OpenSRF::Utils::JSON->JSON2perl( $value->[1] );
354 __PACKAGE__->register_method(
355 api_name => 'opensrf.persist.queue.peek',
356 method => 'pop_queue',
359 __PACKAGE__->register_method(
360 api_name => 'opensrf.persist.queue.pop',
361 method => 'pop_queue',
370 my $name = shift or do {
371 throw OpenSRF::EX::WARN ("No slot name specified!");
373 my $name_id = _get_name_id($name);
376 $order = 'DESC' if ($self->api_name =~ /stack/o);
378 my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
380 $client->respond( OpenSRF::Utils::JSON->JSON2perl( $_->[0] ) ) for (@$values);
382 _flush_by_name($name);
385 __PACKAGE__->register_method(
386 api_name => 'opensrf.persist.queue.peek.all',
387 method => 'peek_slot',
391 __PACKAGE__->register_method(
392 api_name => 'opensrf.persist.stack.peek.all',
393 method => 'peek_slot',
403 my $name = shift or do {
404 throw OpenSRF::EX::WARN ("No queue name specified!");
406 my $name_id = _get_name_id($name);
408 my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
410 return OpenSRF::Utils::JSON->JSON2perl( $value->[0] );
412 __PACKAGE__->register_method(
413 api_name => 'opensrf.persist.queue.size',
414 method => 'shift_stack',
417 __PACKAGE__->register_method(
418 api_name => 'opensrf.persist.stack.size',
419 method => 'shift_stack',
422 __PACKAGE__->register_method(
423 api_name => 'opensrf.persist.object.size',
424 method => 'shift_stack',
432 my $name = shift or do {
433 throw OpenSRF::EX::WARN ("No queue name specified!");
435 my $name_id = _get_name_id($name);
437 my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
439 return OpenSRF::Utils::JSON->JSON2perl( $value->[0] );
441 __PACKAGE__->register_method(
442 api_name => 'opensrf.persist.queue.length',
443 method => 'shift_stack',
446 __PACKAGE__->register_method(
447 api_name => 'opensrf.persist.stack.depth',
448 method => 'shift_stack',
456 my $name = shift or do {
457 throw OpenSRF::EX::WARN ("No slot name specified!");
461 my $name_id = _get_name_id($name);
463 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
464 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
466 _flush_by_name($name);
468 return OpenSRF::Utils::JSON->JSON2perl( $value->[1] );
474 __PACKAGE__->register_method(
475 api_name => 'opensrf.persist.stack.peek',
476 method => 'shift_stack',
479 __PACKAGE__->register_method(
480 api_name => 'opensrf.persist.stack.pop',
481 method => 'shift_stack',
489 my $name = shift or do {
490 throw OpenSRF::EX::WARN ("No object name specified!");
494 my $name_id = _get_name_id($name);
496 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
497 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
499 _flush_by_name($name);
501 return OpenSRF::Utils::JSON->JSON2perl( $value->[1] );
506 __PACKAGE__->register_method(
507 api_name => 'opensrf.persist.object.peek',
508 method => 'shift_stack',
511 __PACKAGE__->register_method(
512 api_name => 'opensrf.persist.object.get',
513 method => 'shift_stack',