1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
12 use vars qw/$dbh $log $default_expire_time/;
15 $log = 'OpenSRF::Utils::Logger';
# Schema bootstrap (fragment; the enclosing sub header is not part of this
# excerpt). Reads the SQLite file location from the 'persist' application
# settings, opens a dedicated handle, and issues CREATE TABLE for the three
# tables used by this module: storage, store_name and store_expire.
# NOTE(review): $sc is assigned without `my` here and is not listed in the
# `use vars` line visible above -- confirm where it is declared.
17 $sc = OpenSRF::Utils::SettingsClient->new;
19 my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
# NOTE(review): presumably guarded by a check on $dbfile on a line not shown.
21 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
24 my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25 $init_dbh->{AutoCommit} = 1;
# RaiseError is switched off on this handle, so a failing statement does not
# die by itself.
26 $init_dbh->{RaiseError} = 0;
28 $init_dbh->do( <<" SQL" );
29 CREATE TABLE storage (
30 id INTEGER PRIMARY KEY,
36 $init_dbh->do( <<" SQL" );
37 CREATE TABLE store_name (
38 id INTEGER PRIMARY KEY,
43 $init_dbh->do( <<" SQL" );
44 CREATE TABLE store_expire (
45 id INTEGER PRIMARY KEY,
47 expire_interval INTEGER
# Connection setup (fragment; the enclosing sub header is not part of this
# excerpt). Populates the package globals $default_expire_time and $dbh that
# the rest of the module relies on.
54 my $sc = OpenSRF::Utils::SettingsClient->new;
# Expiry default comes from configuration; 300 is used when the setting is
# missing or false.
56 $default_expire_time = $sc->config_value( apps => persist => app_settings => 'default_expire_time' );
57 $default_expire_time ||= 300;
59 my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
# NOTE(review): presumably guarded by a check on $dbfile on a line not shown.
61 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
# Shared handle: autocommit on, RaiseError off (callers inspect err/errstr or
# the returned value instead of catching exceptions).
64 $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65 $dbh->{AutoCommit} = 1;
66 $dbh->{RaiseError} = 0;
# Body of the slot-creation handler registered below as
# 'opensrf.persist.slot.create' (fragment; the `sub` line and several
# statements are not part of this excerpt).
# The slot name is optional; a missing or false name becomes the empty string.
74 my $name = shift || '';
# Reported when the requested name is already taken.
# NOTE(review): the condition guarding this throw is on a line not shown.
85 throw OpenSRF::EX::WARN ("Duplicate key: object name [$name] already exists! " . $dbh->errstr)
88 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
# Auto-generated names are derived from the id of the freshly inserted row
# (presumably only when no name was supplied -- the condition is not visible).
93 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
94 $name = 'AUTOGENERATED!!'.$last_id;
# Values are bound rather than interpolated into the SQL text, matching every
# other statement in this module.
95 $dbh->do('UPDATE store_name SET name = ? WHERE id = ?;', {}, $name, $last_id);
98 _flush_by_name($name);
101 __PACKAGE__->register_method(
102 api_name => 'opensrf.persist.slot.create',
103 method => 'create_store',
# Create a named slot and attach an expiry interval to it.
# Arguments (after those consumed on lines not shown): the slot name, which is
# mandatory, and an optional interval that defaults to $default_expire_time.
108 sub create_expirable_store {
# A missing or false name raises OpenSRF::EX::InvalidArg.
111 my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
112 my $time = shift || $default_expire_time;
# Delegates creation to the plain slot-creation API.
115 __PACKAGE__->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
# Special-cases the "Duplicate key" error raised by slot creation.
# NOTE(review): the surrounding try/catch and the branch body are not visible.
118 if ($e->message =~ /^Duplicate key/o) {
# The expiry itself is installed through the set_expire API.
123 $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
127 __PACKAGE__->register_method(
128 api_name => 'opensrf.persist.slot.create_expirable',
129 method => 'create_expirable_store',
# Stamp the store_expire row whose id equals $id with the current time.
# NOTE(review): $id is presumably read from the argument list on a line not
# shown in this excerpt.
133 sub _update_expire_atime {
135 $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
# Replace the expiry setting of a slot.
# Returns the slot name early when the interval converts to 0 seconds, which
# leaves the slot with no store_expire row.
138 sub set_expire_interval {
142 my $new_interval = shift;
# The interval is converted to seconds by interval_to_seconds.
144 my $etime = interval_to_seconds($new_interval);
# NOTE(review): $slot is read on a line not shown in this excerpt.
145 my $sid = _get_name_id($slot);
# Any previous setting is discarded before the new one is written.
147 $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
148 return $slot if ($etime == 0);
# The new row is keyed by the slot's numeric id and starts its clock now.
150 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
153 __PACKAGE__->register_method(
154 api_name => 'opensrf.persist.slot.set_expire',
155 method => 'set_expire_interval',
# Delete the stored values of every slot whose expiry window has elapsed
# (atime + expire_interval <= now), then run slot clean-up for each of them.
# Takes no arguments that are visible in this excerpt.
160 sub _sweep_expired_slots {
# selectcol_arrayref returns an array REFERENCE (undef on failure). It is
# dereferenced here so that @expired_slots holds the ids themselves; assigning
# the reference directly left a one-element list containing the reference,
# which defeated the empty check and bound the reference as an id.
163 my @expired_slots = @{ $dbh->selectcol_arrayref(<<" SQL", {}, time() ) || [] };
164 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
167 return unless (@expired_slots);
# One placeholder per expired id.
169 $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @expired_slots).');', {}, @expired_slots);
170 for my $id (@expired_slots) {
# The second argument is the no_sweep flag of _flush_by_name, presumably to
# keep it from calling back into this sweep.
171 _flush_by_name(_get_id_name($id), 1);
# Body of the handler registered below as 'add_item' (fragment; the `sub`
# line is not part of this excerpt). Stores one JSON-encoded value in a slot.
# A missing or false slot name raises OpenSRF::EX::WARN.
179 my $name = shift or do {
180 throw OpenSRF::EX::WARN ("No name specified!");
# NOTE(review): any false value (undef, 0, '0', '') is replaced by the empty
# string before it is stored -- confirm that 0 is not a legitimate payload.
182 my $value = shift || '';
184 my $name_id = _get_name_id($name);
# For API names containing "object", existing rows are removed first, so the
# slot is left holding only the value inserted below.
186 if ($self->api_name =~ /object/) {
187 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
190 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
192 _flush_by_name($name);
# Returns 0 when the handle reports an error. This is checked after
# _flush_by_name has run, so it reflects the handle's most recent operation.
194 return 0 if ($dbh->err);
# The same implementation serves the object, queue and stack write APIs.
197 __PACKAGE__->register_method(
198 api_name => 'opensrf.persist.object.set',
199 method => 'add_item',
202 __PACKAGE__->register_method(
203 api_name => 'opensrf.persist.queue.push',
204 method => 'add_item',
207 __PACKAGE__->register_method(
208 api_name => 'opensrf.persist.stack.push',
209 method => 'add_item',
# Slot id -> slot name lookup (fragment; the `sub` line is not part of this
# excerpt). Despite the variable names, $name holds the numeric slot id here
# and $name_id receives the column of matching names.
# A missing or false id raises OpenSRF::EX::WARN.
214 my $name = shift or do {
215 throw OpenSRF::EX::WARN ("No slot id specified!");
219 my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
# A failed query (non-reference result) or an empty result both count as
# "slot does not exist".
221 if (!ref($name_id) || !defined($name_id->[0])) {
222 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
# Returns the first matching name.
225 return $name_id->[0];
# Slot name -> numeric slot id lookup (fragment; the `sub` line is not part of
# this excerpt).
# A missing or false name raises OpenSRF::EX::WARN.
229 my $name = shift or do {
230 throw OpenSRF::EX::WARN ("No slot name specified!");
234 my $name_id = $dbh->selectcol_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
# A failed query (non-reference result) or an empty result both count as
# "slot does not exist".
236 if (!ref($name_id) || !defined($name_id->[0])) {
237 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
# Returns the first matching id.
240 return $name_id->[0];
# Body of the handler registered below as 'destroy_store' (fragment; the
# `sub` line and the line reading $name are not part of this excerpt).
# Removes every stored value of the slot, then runs the common slot clean-up.
249 my $name_id = _get_name_id($name);
251 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
252 _flush_by_name($name);
255 __PACKAGE__->register_method(
256 api_name => 'opensrf.persist.slot.destroy',
257 method => 'destroy_store',
# Common slot clean-up run after each operation (fragment; the `sub` line and
# the line reading $name are not part of this excerpt).
# $no_sweep is the optional second argument.
# NOTE(review): it presumably guards the two maintenance calls below; the
# guard itself is on a line not shown.
263 my $no_sweep = shift;
265 my $name_id = _get_name_id($name);
# store_expire.id holds the slot's numeric id (set_expire_interval inserts the
# value returned by _get_name_id), so the access time has to be refreshed by
# id. Passing the slot name matched no row and never refreshed it.
268 _update_expire_atime($name_id);
269 _sweep_expired_slots();
# Slots with auto-generated names are removed from store_name once they hold
# no values (or when the count query fails).
272 if ($name =~ /^AUTOGENERATED!!/) {
273 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
274 if (!ref($count) || $$count[0] == 0) {
275 $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
# Body of the handler registered below as 'pop_queue' (fragment; the `sub`
# line is not part of this excerpt). Returns the oldest value of a slot.
# A missing or false name raises OpenSRF::EX::WARN.
284 my $name = shift or do {
285 throw OpenSRF::EX::WARN ("No queue name specified!");
287 my $name_id = _get_name_id($name);
# Lowest storage id first, i.e. first in, first out.
289 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
# The row is removed unless the API name ends in "peek".
290 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
292 _flush_by_name($name);
# The stored JSON text is decoded before it is returned.
294 return JSON->JSON2perl( $value->[1] );
296 __PACKAGE__->register_method(
297 api_name => 'opensrf.persist.queue.peek',
298 method => 'pop_queue',
301 __PACKAGE__->register_method(
302 api_name => 'opensrf.persist.queue.pop',
303 method => 'pop_queue',
# Body of the handler registered below as 'peek_slot' (fragment; the `sub`
# line is not part of this excerpt). Streams every value of a slot to the
# client; no DELETE is issued in the lines visible here.
# A missing or false name raises OpenSRF::EX::WARN.
312 my $name = shift or do {
313 throw OpenSRF::EX::WARN ("No slot name specified!");
315 my $name_id = _get_name_id($name);
# API names containing "stack" get descending id order.
# NOTE(review): the declaration and initial value of $order are on a line not
# shown.
318 $order = 'DESC' if ($self->api_name =~ /stack/o);
320 my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
# Each row is decoded from JSON and sent as its own response.
322 $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
324 _flush_by_name($name);
327 __PACKAGE__->register_method(
328 api_name => 'opensrf.persist.queue.peek.all',
329 method => 'peek_slot',
333 __PACKAGE__->register_method(
334 api_name => 'opensrf.persist.stack.peek.all',
335 method => 'peek_slot',
# Size query (fragment; the `sub` line, and therefore the sub's name, is not
# part of this excerpt). Returns the summed length of all values stored in a
# slot, as computed by SUM(LENGTH(value)).
# A missing or false name raises OpenSRF::EX::WARN.
345 my $name = shift or do {
346 throw OpenSRF::EX::WARN ("No queue name specified!");
348 my $name_id = _get_name_id($name);
350 my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
352 return JSON->JSON2perl( $value->[0] );
# NOTE(review): all three *.size registrations below name 'shift_stack' as
# their method, the same target used for opensrf.persist.stack.peek/pop
# elsewhere in this file. Confirm they are not meant to dispatch to the sub
# whose body appears above.
354 __PACKAGE__->register_method(
355 api_name => 'opensrf.persist.queue.size',
356 method => 'shift_stack',
359 __PACKAGE__->register_method(
360 api_name => 'opensrf.persist.stack.size',
361 method => 'shift_stack',
364 __PACKAGE__->register_method(
365 api_name => 'opensrf.persist.object.size',
366 method => 'shift_stack',
# Element-count query (fragment; the `sub` line, and therefore the sub's name,
# is not part of this excerpt). Returns the number of rows stored for a slot.
# A missing or false name raises OpenSRF::EX::WARN.
374 my $name = shift or do {
375 throw OpenSRF::EX::WARN ("No queue name specified!");
377 my $name_id = _get_name_id($name);
379 my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
381 return JSON->JSON2perl( $value->[0] );
# NOTE(review): both registrations below name 'shift_stack' as their method,
# the same target used for opensrf.persist.stack.peek/pop elsewhere in this
# file. Confirm they are not meant to dispatch to the sub whose body appears
# above.
383 __PACKAGE__->register_method(
384 api_name => 'opensrf.persist.queue.length',
385 method => 'shift_stack',
388 __PACKAGE__->register_method(
389 api_name => 'opensrf.persist.stack.depth',
390 method => 'shift_stack',
# Body of the handler registered below as 'shift_stack' (fragment; the `sub`
# line is not part of this excerpt). Returns the newest value of a slot.
# A missing or false name raises OpenSRF::EX::WARN.
398 my $name = shift or do {
399 throw OpenSRF::EX::WARN ("No queue name specified!");
401 my $name_id = _get_name_id($name);
# Highest storage id first, i.e. last in, first out.
403 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
# The row is removed unless the API name ends in "peek".
404 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
406 _flush_by_name($name);
# The stored JSON text is decoded before it is returned.
408 return JSON->JSON2perl( $value->[1] );
410 __PACKAGE__->register_method(
411 api_name => 'opensrf.persist.stack.peek',
412 method => 'shift_stack',
415 __PACKAGE__->register_method(
416 api_name => 'opensrf.persist.stack.pop',
417 method => 'shift_stack',
# Object read (fragment; the `sub` line, and therefore the sub's name, is not
# part of this excerpt). Returns the most recently stored value of a slot.
# A missing or false name raises OpenSRF::EX::WARN.
425 my $name = shift or do {
426 throw OpenSRF::EX::WARN ("No object name specified!");
429 my $name_id = _get_name_id($name);
431 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
# Unless the API name ends in "peek", every row of the slot is deleted (the
# match is on name_id, not on a single row id).
# NOTE(review): this means an API name ending in "get" would also empty the
# slot -- confirm that is intended.
432 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
434 _flush_by_name($name);
# The stored JSON text is decoded before it is returned.
436 return JSON->JSON2perl( $value->[1] );
# NOTE(review): both registrations below name 'shift_stack' as their method,
# the same target used for opensrf.persist.stack.peek/pop elsewhere in this
# file. Confirm they are not meant to dispatch to the sub whose body appears
# above.
438 __PACKAGE__->register_method(
439 api_name => 'opensrf.persist.object.peek',
440 method => 'shift_stack',
443 __PACKAGE__->register_method(
444 api_name => 'opensrf.persist.object.get',
445 method => 'shift_stack',