1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
# Package-level globals shared by the handlers in this file:
#   $dbh                 - database handle (assigned from DBI->connect further down)
#   $log                 - holds the logger class name as a string
#   $default_expire_time - expiry fallback (assigned from the settings client further down)
12 use vars qw/$dbh $log $default_expire_time/;
15 $log = 'OpenSRF::Utils::Logger';
# One-time database setup (fragment: the enclosing sub header, the guard around
# the PANIC throw, and the heredoc bodies/terminators are on lines not shown).
# Reads the SQLite file path from the opensrf.persist app settings, opens a
# separate connection ($init_dbh) with AutoCommit on and RaiseError off, and
# issues CREATE TABLE statements for storage, store_name and store_expire.
# NOTE(review): $sc is assigned without `my` here and is not listed in the
# `use vars` declaration above - confirm it is declared on a line not shown.
# NOTE(review): with RaiseError off, a failing CREATE TABLE (e.g. the table
# already exists) does not die - presumably intentional so setup can be re-run;
# confirm.
17 $sc = OpenSRF::Utils::SettingsClient->new;
19 my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
21 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
24 my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25 $init_dbh->{AutoCommit} = 1;
26 $init_dbh->{RaiseError} = 0;
28 $init_dbh->do( <<" SQL" );
29 CREATE TABLE storage (
30 id INTEGER PRIMARY KEY,
36 $init_dbh->do( <<" SQL" );
37 CREATE TABLE store_name (
38 id INTEGER PRIMARY KEY,
43 $init_dbh->do( <<" SQL" );
44 CREATE TABLE store_expire (
45 id INTEGER PRIMARY KEY,
47 expire_interval INTEGER
# Runtime setup (fragment: the enclosing sub header is on a line not shown).
# Loads the default expiry and opens the shared package-level $dbh that every
# handler below uses.
54 my $sc = OpenSRF::Utils::SettingsClient->new;
# Fall back to 300 when the setting is missing; because ||= tests truthiness,
# a configured value of 0 also becomes 300.
56 $default_expire_time = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'default_expire_time' );
57 $default_expire_time ||= 300;
59 my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
# NOTE(review): presumably guarded by a check on $dbfile on a line not shown.
61 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
# Same connection settings as the setup connection: AutoCommit on, RaiseError off.
64 $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65 $dbh->{AutoCommit} = 1;
66 $dbh->{RaiseError} = 0;
# Slot creation handler (fragment: the sub header, the try/catch scaffolding and
# the conditionals between these statements are on lines not shown).
# Takes an optional slot name; an omitted or false name becomes ''.
74 my $name = shift || '';
# Raised when the requested name is already present; the condition that
# triggers it is on a line not shown.
86 throw OpenSRF::EX::WARN ("Duplicate key: object name [$name] already exists! " . $dbh->errstr)
# Insert the store_name row.
89 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
# Derive a unique name from the new row id and write it back.
# NOTE(review): presumably this path runs only when no name was supplied -
# the guarding conditional is not visible here.
94 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
95 $name = 'AUTOGENERATED!!'.$last_id;
# Bind the values instead of interpolating them into the SQL text, matching
# every other statement in this file.
96 $dbh->do("UPDATE store_name SET name = ? WHERE id = ?;", {}, $name, $last_id);
99 _flush_by_name($name);
# Expose the handler as opensrf.persist.slot.create.
105 __PACKAGE__->register_method(
106 api_name => 'opensrf.persist.slot.create',
107 method => 'create_store',
# Create a named slot and attach an expiry to it.
# Arguments (after the leading ones unpacked on lines not shown):
#   name - required; a missing or false name throws OpenSRF::EX::InvalidArg
#   time - expiry interval; a missing or false value (including 0) falls back
#          to $default_expire_time
# Returns undef when slot creation yields no name; the success return value is
# on a line not shown.
112 sub create_expirable_store {
115 my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
116 my $time = shift || $default_expire_time;
# Delegate creation to the registered slot.create method.
119 $name = $self->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
120 return undef unless $name;
# Record the expiry through the registered slot.set_expire method.
122 $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
# Expose the handler as opensrf.persist.slot.create_expirable.
129 __PACKAGE__->register_method(
130 api_name => 'opensrf.persist.slot.create_expirable',
131 method => 'create_expirable_store',
# Set atime to the current time on the store_expire row whose id matches $id.
# $id is presumably unpacked from the argument list on a line not shown; note
# that the match is on store_expire.id, not on a slot name.
135 sub _update_expire_atime {
137 $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
# Replace the expiry setting for a slot.
# The slot name ($slot) is presumably unpacked on a line not shown; the new
# interval is converted to seconds with interval_to_seconds().
140 sub set_expire_interval {
144 my $new_interval = shift;
147 my $etime = interval_to_seconds($new_interval);
148 my $sid = _get_name_id($slot);
# Always drop the existing expiry row first; an interval of zero seconds then
# returns 0 without inserting a replacement, leaving the slot with no expiry row.
150 $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
151 return 0 if ($etime == 0);
# Otherwise store the new interval, with atime set to the current time.
153 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
# Expose the handler as opensrf.persist.slot.set_expire.
157 __PACKAGE__->register_method(
158 api_name => 'opensrf.persist.slot.set_expire',
159 method => 'set_expire_interval',
# Remove the stored values of every slot whose expiry has passed.
# Selects the store_expire ids where atime + expire_interval is at or before the
# current time, deletes all storage rows for those ids in one statement (one
# placeholder per id), then calls _flush_by_name on each slot's name with the
# second argument set to 1.
# NOTE(review): the early return only triggers when the select yields a false
# value; whether an empty result set reaches the DELETE with an empty IN ()
# list depends on what selectcol_arrayref returns for no rows - confirm.
164 sub _sweep_expired_slots {
167 my $expired_slots = $dbh->selectcol_arrayref(<<" SQL", {}, time() );
168 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
171 return unless ($expired_slots);
173 $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
174 for my $id (@$expired_slots) {
175 _flush_by_name(_get_id_name($id), 1);
# Value insertion handler (fragment: the sub header and closing lines are not
# shown). Requires a slot name and throws OpenSRF::EX::WARN without one.
183 my $name = shift or do {
184 throw OpenSRF::EX::WARN ("No name specified!");
# A missing or false value (undef, 0, '') is stored as the empty string.
187 my $value = shift || '';
190 my $name_id = _get_name_id($name);
# When invoked under an api name containing "object", existing rows for the
# slot are deleted first, so the slot holds only the newly inserted value.
192 if ($self->api_name =~ /object/) {
193 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
# The value is stored JSON-encoded.
196 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
198 _flush_by_name($name);
# The same handler is registered under three api names; the api name selects
# the replace-vs-append behaviour above.
205 __PACKAGE__->register_method(
206 api_name => 'opensrf.persist.object.set',
207 method => 'add_item',
210 __PACKAGE__->register_method(
211 api_name => 'opensrf.persist.queue.push',
212 method => 'add_item',
215 __PACKAGE__->register_method(
216 api_name => 'opensrf.persist.stack.push',
217 method => 'add_item',
# Id-to-name lookup (fragment: the sub header is not shown; presumably this is
# the _get_id_name helper called elsewhere in the file).
# Despite the variable names, $name holds the slot id passed in and $name_id
# holds the query result, whose first element is the slot name.
# Throws OpenSRF::EX::WARN when no id is given or no matching row exists.
222 my $name = shift or do {
223 throw OpenSRF::EX::WARN ("No slot id specified!");
227 my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
229 if (!ref($name_id) || !defined($name_id->[0])) {
230 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
233 return $name_id->[0];
# Name-to-id lookup (fragment: the sub header is not shown; presumably this is
# the _get_name_id helper called throughout the file).
# Returns the store_name id for the given slot name.
# Throws OpenSRF::EX::WARN when no name is given or no matching row exists.
237 my $name = shift or do {
238 throw OpenSRF::EX::WARN ("No slot name specified!");
242 my $name_id = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
244 if (!ref($name_id) || !defined($name_id->[0])) {
245 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
248 return $name_id->[0];
# Slot removal handler (fragment: the sub header and argument unpacking are not
# shown). Resolves the slot name to its id, then deletes the slot's stored
# values, its name row and its expiry row.
259 my $name_id = _get_name_id($name);
261 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
262 $dbh->do("DELETE FROM store_name WHERE id = ?;", {}, $name_id);
263 $dbh->do("DELETE FROM store_expire WHERE id = ?;", {}, $name_id);
# Sweep other expired slots as part of the same call.
265 _sweep_expired_slots();
# Expose the handler as opensrf.persist.slot.destroy.
272 __PACKAGE__->register_method(
273 api_name => 'opensrf.persist.slot.destroy',
274 method => 'destroy_store',
# Post-access housekeeping for a slot (fragment: the sub header, the unpacking
# of $name and the conditional around the atime/sweep calls are not shown;
# presumably this is the _flush_by_name helper).
280 my $no_sweep = shift;
282 my $name_id = _get_name_id($name);
# NOTE(review): _update_expire_atime matches on store_expire.id, yet it is
# handed the slot name here while $name_id is available - looks like the atime
# is never refreshed; confirm against the helper's argument handling.
285 _update_expire_atime($name);
286 _sweep_expired_slots();
# Slots with an autogenerated name are removed from store_name once they hold
# no stored values.
289 if ($name =~ /^AUTOGENERATED!!/) {
290 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
291 if (!ref($count) || $$count[0] == 0) {
292 $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
# Queue read handler (fragment: the sub header and closing lines are not shown).
# Requires a queue name and throws OpenSRF::EX::WARN without one.
301 my $name = shift or do {
302 throw OpenSRF::EX::WARN ("No queue name specified!");
306 my $name_id = _get_name_id($name);
# Fetch the row with the lowest id for this slot (oldest first). The row is
# deleted unless the api name ends in "peek".
# NOTE(review): no guard for an empty slot is visible - $value would be undef
# and is still dereferenced below; confirm.
308 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
309 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
311 _flush_by_name($name);
# The stored JSON text is decoded before being returned.
313 return JSON->JSON2perl( $value->[1] );
# Registered under both the non-destructive (peek) and destructive (pop) names.
320 __PACKAGE__->register_method(
321 api_name => 'opensrf.persist.queue.peek',
322 method => 'pop_queue',
325 __PACKAGE__->register_method(
326 api_name => 'opensrf.persist.queue.pop',
327 method => 'pop_queue',
# Read-all handler (fragment: the sub header, the initial value of $order and
# the closing lines are not shown). Requires a slot name and throws
# OpenSRF::EX::WARN without one.
336 my $name = shift or do {
337 throw OpenSRF::EX::WARN ("No slot name specified!");
339 my $name_id = _get_name_id($name);
# Stack api names read in descending id order; $order is interpolated into the
# SQL text but is only ever set inside this handler.
342 $order = 'DESC' if ($self->api_name =~ /stack/o);
344 my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
# Each stored value is JSON-decoded and sent as its own response; nothing is
# deleted.
346 $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
348 _flush_by_name($name);
351 __PACKAGE__->register_method(
352 api_name => 'opensrf.persist.queue.peek.all',
353 method => 'peek_slot',
357 __PACKAGE__->register_method(
358 api_name => 'opensrf.persist.stack.peek.all',
359 method => 'peek_slot',
# Size handler (fragment: the sub header is not shown). Requires a slot name
# and throws OpenSRF::EX::WARN without one.
369 my $name = shift or do {
370 throw OpenSRF::EX::WARN ("No queue name specified!");
372 my $name_id = _get_name_id($name);
# Sum of LENGTH(value) over the slot's rows, i.e. the total length of the
# stored JSON text; the result is passed through the JSON decoder on return.
374 my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
376 return JSON->JSON2perl( $value->[0] );
# NOTE(review): all three size api names are registered with method
# 'shift_stack', the same method name used for stack.pop/stack.peek - looks
# like a copy/paste slip; confirm the name of the enclosing sub.
378 __PACKAGE__->register_method(
379 api_name => 'opensrf.persist.queue.size',
380 method => 'shift_stack',
383 __PACKAGE__->register_method(
384 api_name => 'opensrf.persist.stack.size',
385 method => 'shift_stack',
388 __PACKAGE__->register_method(
389 api_name => 'opensrf.persist.object.size',
390 method => 'shift_stack',
# Element-count handler (fragment: the sub header is not shown). Requires a
# slot name and throws OpenSRF::EX::WARN without one.
398 my $name = shift or do {
399 throw OpenSRF::EX::WARN ("No queue name specified!");
401 my $name_id = _get_name_id($name);
# Number of rows stored for the slot; the result is passed through the JSON
# decoder on return.
403 my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
405 return JSON->JSON2perl( $value->[0] );
# NOTE(review): both api names are registered with method 'shift_stack', the
# same method name used for stack.pop/stack.peek - looks like a copy/paste
# slip; confirm the name of the enclosing sub.
407 __PACKAGE__->register_method(
408 api_name => 'opensrf.persist.queue.length',
409 method => 'shift_stack',
412 __PACKAGE__->register_method(
413 api_name => 'opensrf.persist.stack.depth',
414 method => 'shift_stack',
# Stack read handler (fragment: the sub header and closing lines are not shown).
# Requires a slot name and throws OpenSRF::EX::WARN without one.
422 my $name = shift or do {
423 throw OpenSRF::EX::WARN ("No slot name specified!");
427 my $name_id = _get_name_id($name);
# Fetch the row with the highest id for this slot (newest first). The row is
# deleted unless the api name ends in "peek".
# NOTE(review): no guard for an empty slot is visible - $value would be undef
# and is still dereferenced below; confirm.
429 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
430 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
432 _flush_by_name($name);
# The stored JSON text is decoded before being returned.
434 return JSON->JSON2perl( $value->[1] );
# Registered under both the non-destructive (peek) and destructive (pop) names.
440 __PACKAGE__->register_method(
441 api_name => 'opensrf.persist.stack.peek',
442 method => 'shift_stack',
445 __PACKAGE__->register_method(
446 api_name => 'opensrf.persist.stack.pop',
447 method => 'shift_stack',
# Object read handler (fragment: the sub header and closing lines are not
# shown). Requires an object name and throws OpenSRF::EX::WARN without one.
455 my $name = shift or do {
456 throw OpenSRF::EX::WARN ("No object name specified!");
460 my $name_id = _get_name_id($name);
# Fetch the newest row for the slot. Unless the api name ends in "peek", every
# row for that name_id is then deleted - so a call under the object.get name
# registered below clears the slot.
462 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
463 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
465 _flush_by_name($name);
# The stored JSON text is decoded before being returned.
467 return JSON->JSON2perl( $value->[1] );
# NOTE(review): both object api names are registered with method
# 'shift_stack', whose visible body deletes by row id rather than by name_id -
# looks like a copy/paste slip; confirm the name of the enclosing sub.
472 __PACKAGE__->register_method(
473 api_name => 'opensrf.persist.object.peek',
474 method => 'shift_stack',
477 __PACKAGE__->register_method(
478 api_name => 'opensrf.persist.object.get',
479 method => 'shift_stack',