1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
12 use vars qw/$dbh $log $default_expire_time/;
15 $log = 'OpenSRF::Utils::Logger';
17 $sc = OpenSRF::Utils::SettingsClient->new;
19 my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
21 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
24 my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25 $init_dbh->{AutoCommit} = 1;
26 $init_dbh->{RaiseError} = 0;
28 $init_dbh->do( <<" SQL" );
29 CREATE TABLE storage (
30 id INTEGER PRIMARY KEY,
36 $init_dbh->do( <<" SQL" );
37 CREATE TABLE store_name (
38 id INTEGER PRIMARY KEY,
43 $init_dbh->do( <<" SQL" );
44 CREATE TABLE store_expire (
45 id INTEGER PRIMARY KEY,
47 expire_interval INTEGER
54 my $sc = OpenSRF::Utils::SettingsClient->new;
56 $default_expire_time = $sc->config_value( apps => persist => app_settings => 'default_expire_time' );
57 $default_expire_time ||= 300;
59 my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
61 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
64 $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65 $dbh->{AutoCommit} = 1;
66 $dbh->{RaiseError} = 0;
74 my $name = shift || '';
85 throw OpenSRF::EX::WARN ("Duplicate key: object name [$name] already exists! " . $dbh->errstr)
88 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
93 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
94 $name = 'AUTOGENERATED!!'.$last_id;
95 $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
98 _flush_by_name($name);
101 __PACKAGE__->register_method(
102 api_name => 'opensrf.persist.slot.create',
103 method => 'create_store',
108 sub create_expirable_store {
111 my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
112 my $time = shift || $default_expire_time;
115 __PACKAGE__->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
118 if ($e->message =~ /^Duplicate key/o) {
123 $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
127 __PACKAGE__->register_method(
128 api_name => 'opensrf.persist.slot.create_expirable',
129 method => 'create_expirable_store',
133 sub _update_expire_atime {
135 $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
138 sub set_expire_interval {
142 my $new_interval = shift;
144 my $etime = interval_to_seconds($new_interval);
145 my $sid = _get_name_id($slot);
147 $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
148 return $slot if ($etime == 0);
150 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
153 __PACKAGE__->register_method(
154 api_name => 'opensrf.persist.slot.set_expire',
155 method => 'set_expire_interval',
160 sub _sweep_expired_slots {
163 my $expired_slots = $dbh->selectcol_arrayref(<<" SQL", {}, time() );
164 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
167 return unless ($expired_slots);
169 $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
170 for my $id (@$expired_slots) {
171 _flush_by_name(_get_id_name($id), 1);
179 my $name = shift or do {
180 throw OpenSRF::EX::WARN ("No name specified!");
182 my $value = shift || '';
184 my $name_id = _get_name_id($name);
186 if ($self->api_name =~ /object/) {
187 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
190 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
192 _flush_by_name($name);
194 return 0 if ($dbh->err);
197 __PACKAGE__->register_method(
198 api_name => 'opensrf.persist.object.set',
199 method => 'add_item',
202 __PACKAGE__->register_method(
203 api_name => 'opensrf.persist.queue.push',
204 method => 'add_item',
207 __PACKAGE__->register_method(
208 api_name => 'opensrf.persist.stack.push',
209 method => 'add_item',
214 my $name = shift or do {
215 throw OpenSRF::EX::WARN ("No slot id specified!");
219 my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
221 if (!ref($name_id) || !defined($name_id->[0])) {
222 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
225 return $name_id->[0];
229 my $name = shift or do {
230 throw OpenSRF::EX::WARN ("No slot name specified!");
234 my $name_id = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
236 if (!ref($name_id) || !defined($name_id->[0])) {
237 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
240 return $name_id->[0];
249 my $name_id = _get_name_id($name);
251 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
252 $dbh->do("DELETE FROM store_name WHERE id = ?;", {}, $name_id);
253 $dbh->do("DELETE FROM store_expire WHERE id = ?;", {}, $name_id);
254 _sweep_expired_slots();
258 __PACKAGE__->register_method(
259 api_name => 'opensrf.persist.slot.destroy',
260 method => 'destroy_store',
266 my $no_sweep = shift;
268 my $name_id = _get_name_id($name);
271 _update_expire_atime($name);
272 _sweep_expired_slots();
275 if ($name =~ /^AUTOGENERATED!!/) {
276 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
277 if (!ref($count) || $$count[0] == 0) {
278 $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
287 my $name = shift or do {
288 throw OpenSRF::EX::WARN ("No queue name specified!");
290 my $name_id = _get_name_id($name);
292 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
293 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
295 _flush_by_name($name);
297 return JSON->JSON2perl( $value->[1] );
299 __PACKAGE__->register_method(
300 api_name => 'opensrf.persist.queue.peek',
301 method => 'pop_queue',
304 __PACKAGE__->register_method(
305 api_name => 'opensrf.persist.queue.pop',
306 method => 'pop_queue',
315 my $name = shift or do {
316 throw OpenSRF::EX::WARN ("No slot name specified!");
318 my $name_id = _get_name_id($name);
321 $order = 'DESC' if ($self->api_name =~ /stack/o);
323 my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
325 $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
327 _flush_by_name($name);
330 __PACKAGE__->register_method(
331 api_name => 'opensrf.persist.queue.peek.all',
332 method => 'peek_slot',
336 __PACKAGE__->register_method(
337 api_name => 'opensrf.persist.stack.peek.all',
338 method => 'peek_slot',
348 my $name = shift or do {
349 throw OpenSRF::EX::WARN ("No queue name specified!");
351 my $name_id = _get_name_id($name);
353 my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
355 return JSON->JSON2perl( $value->[0] );
357 __PACKAGE__->register_method(
358 api_name => 'opensrf.persist.queue.size',
359 method => 'shift_stack',
362 __PACKAGE__->register_method(
363 api_name => 'opensrf.persist.stack.size',
364 method => 'shift_stack',
367 __PACKAGE__->register_method(
368 api_name => 'opensrf.persist.object.size',
369 method => 'shift_stack',
377 my $name = shift or do {
378 throw OpenSRF::EX::WARN ("No queue name specified!");
380 my $name_id = _get_name_id($name);
382 my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
384 return JSON->JSON2perl( $value->[0] );
386 __PACKAGE__->register_method(
387 api_name => 'opensrf.persist.queue.length',
388 method => 'shift_stack',
391 __PACKAGE__->register_method(
392 api_name => 'opensrf.persist.stack.depth',
393 method => 'shift_stack',
401 my $name = shift or do {
402 throw OpenSRF::EX::WARN ("No queue name specified!");
404 my $name_id = _get_name_id($name);
406 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
407 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
409 _flush_by_name($name);
411 return JSON->JSON2perl( $value->[1] );
413 __PACKAGE__->register_method(
414 api_name => 'opensrf.persist.stack.peek',
415 method => 'shift_stack',
418 __PACKAGE__->register_method(
419 api_name => 'opensrf.persist.stack.pop',
420 method => 'shift_stack',
428 my $name = shift or do {
429 throw OpenSRF::EX::WARN ("No object name specified!");
432 my $name_id = _get_name_id($name);
434 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
435 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
437 _flush_by_name($name);
439 return JSON->JSON2perl( $value->[1] );
441 __PACKAGE__->register_method(
442 api_name => 'opensrf.persist.object.peek',
443 method => 'shift_stack',
446 __PACKAGE__->register_method(
447 api_name => 'opensrf.persist.object.get',
448 method => 'shift_stack',