1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
12 use vars qw/$dbh $log $default_expire_time/;
# One-time storage initialisation.
# NOTE(review): this excerpt is sparse -- the enclosing sub header, the guard
# around the PANIC throw, and the heredoc bodies/terminators are not visible.
# Visible behaviour: remember the logger class name, read the SQLite file path
# from the settings client, open a handle on that file, and issue CREATE TABLE
# statements for the storage, store_name and store_expire tables.
15 $log = 'OpenSRF::Utils::Logger';
# NOTE(review): $sc is assigned without `my` and is not named in the
# `use vars` list at the top of the file.
17 $sc = OpenSRF::Utils::SettingsClient->new;
19 my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
# NOTE(review): presumably guarded by a check on $dbfile on a line not shown -- confirm.
21 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
# The result of connect() is not checked before it is used below.
24 my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25 $init_dbh->{AutoCommit} = 1;
# RaiseError is switched off, so a failing do() below does not die here --
# presumably so that re-running against an existing database is harmless; confirm.
26 $init_dbh->{RaiseError} = 0;
28 $init_dbh->do( <<" SQL" );
29 CREATE TABLE storage (
30 id INTEGER PRIMARY KEY,
36 $init_dbh->do( <<" SQL" );
37 CREATE TABLE store_name (
38 id INTEGER PRIMARY KEY,
43 $init_dbh->do( <<" SQL" );
44 CREATE TABLE store_expire (
45 id INTEGER PRIMARY KEY,
47 expire_interval INTEGER
# Per-process setup of the shared package globals.
# NOTE(review): sparse excerpt -- the enclosing sub header and the guard around
# the PANIC throw are not visible.
# Visible behaviour: load default_expire_time from the persist app settings and
# open the package-wide $dbh handle on the configured SQLite file.
54 my $sc = OpenSRF::Utils::SettingsClient->new;
56 $default_expire_time = $sc->config_value( apps => persist => app_settings => 'default_expire_time' );
# Any false setting (unset, 0, '') is replaced by 300. The value is later fed
# to interval_to_seconds() by create_expirable_store.
57 $default_expire_time ||= 300;
59 my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
# NOTE(review): presumably guarded by a check on $dbfile on a line not shown -- confirm.
61 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
# The result of connect() is not checked before the attribute assignments.
64 $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65 $dbh->{AutoCommit} = 1;
# With RaiseError off, callers have to inspect return values / $dbh->err themselves.
66 $dbh->{RaiseError} = 0;
# Slot creation (presumably the body of the handler registered as 'create_store').
# NOTE(review): sparse excerpt -- the sub header, the duplicate-name check and
# the execute/return statements are not visible.
# Visible behaviour: the requested name defaults to the empty string, a WARN
# exception reports an already-existing name, a row is inserted into store_name,
# and an 'AUTOGENERATED!!<id>' name is written back using the new row id.
74 my $name = shift || '';
# The throw is the head of a longer statement; its condition is on an elided line.
85 throw OpenSRF::EX::WARN ("Duplicate key: object name [$name] already exists! " . $dbh->errstr)
88 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
93 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
94 $name = 'AUTOGENERATED!!'.$last_id;
# NOTE(review): this UPDATE interpolates its values into the SQL text, unlike the
# placeholder style used by the other statements; both values are built locally here.
95 $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
# Publish the 'create_store' handler under the API name 'opensrf.persist.slot.create'.
100 __PACKAGE__->register_method(
101 api_name => 'opensrf.persist.slot.create',
102 method => 'create_store',
# create_expirable_store: create a named slot and attach an expiry record to it.
# NOTE(review): sparse excerpt -- the argument unpacking for $self/$client, the
# try/catch scaffolding that defines $e, and the definition of $atime are not
# visible here.
107 sub create_expirable_store {
# A name is mandatory: a missing or false name raises InvalidArg.
110 my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
# The lifetime falls back to the package-wide $default_expire_time.
111 my $time = shift || $default_expire_time;
# Slot creation itself is delegated to the registered slot.create method.
114 __PACKAGE__->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
# Matches the 'Duplicate key' prefix of the WARN message raised on slot creation.
117 if ($e->message =~ /^Duplicate key/o) {
122 my $name_id = _get_name_id($name);
# Presumably converts the requested lifetime to a number of seconds -- see OpenSRF::Utils.
124 my $etime = interval_to_seconds($time);
# Record slot id, access time and lifetime; _sweep_expired_slots compares
# atime + expire_interval against the current time.
126 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$name_id,$atime,$etime);
# Publish create_expirable_store under 'opensrf.persist.slot.create_expirable'.
130 __PACKAGE__->register_method(
131 api_name => 'opensrf.persist.slot.create_expirable',
132 method => 'create_expirable_store',
# _update_expire_atime: stamp the store_expire row for $id with the current time,
# which pushes back the moment at which the sweep treats the slot as expired.
# NOTE(review): the statement that sets $id is not visible in this excerpt.
136 sub _update_expire_atime {
138 $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
# _sweep_expired_slots: purge the contents of every expirable slot whose
# lifetime has elapsed.
#
# A slot counts as expired when (atime + expire_interval) <= now according to
# store_expire. All storage rows belonging to expired slots are deleted in one
# statement, then each slot is handed to _flush_by_name() with the no-sweep
# flag set so that the flush does not re-enter this routine.
#
# Uses the package-wide $dbh handle. Returns nothing.
#
# NOTE(review): the excerpt this was reviewed from omits some lines between the
# sub header and the query; confirm nothing from them needs to be carried over.
sub _sweep_expired_slots {
    # selectcol_arrayref() hands back ONE array reference (undef on failure).
    # Keep it in a scalar and dereference it: storing it in an array yields a
    # one-element list holding the reference, which is never empty and would
    # be bound to the DELETE and passed to _get_id_name() as a bogus id.
    my $expired_slots = $dbh->selectcol_arrayref(
        'SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;',
        {},
        time(),
    );

    # Nothing to do when the query failed or no slot has expired.
    return unless ($expired_slots && @$expired_slots);

    # One placeholder per expired slot id.
    my $placeholders = join(',', ('?') x @$expired_slots);
    $dbh->do(
        "DELETE FROM storage WHERE name_id IN ($placeholders);",
        {},
        @$expired_slots,
    );

    for my $id (@$expired_slots) {
        # The second argument suppresses the sweep inside _flush_by_name.
        _flush_by_name(_get_id_name($id), 1);
    }

    return;
}
# Store a value in a slot (presumably the body of the handler registered as 'add_item').
# NOTE(review): sparse excerpt -- the sub header, the unpacking of $self and the
# closing lines are not visible.
# Visible behaviour: require a slot name, resolve it to its id and append the
# JSON-encoded value to storage. When the call arrived under an api_name
# containing 'object', the slot's existing rows are deleted first.
160 my $name = shift or do {
161 throw OpenSRF::EX::WARN ("No name specified!");
# NOTE(review): `|| ''` turns every false value (undef, 0, '0') into the empty
# string before it is stored -- confirm that is intended for 0.
163 my $value = shift || '';
165 my $name_id = _get_name_id($name);
167 if ($self->api_name =~ /object/) {
168 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
171 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
# Report 0 when the handle carries an error after the statement(s) above.
173 return 0 if ($dbh->err);
# Publish 'add_item' under three API names; the handler tells them apart by
# matching its api_name against /object/.
# Object slots: set the value.
176 __PACKAGE__->register_method(
177 api_name => 'opensrf.persist.object.set',
178 method => 'add_item',
# Queues: push a value.
181 __PACKAGE__->register_method(
182 api_name => 'opensrf.persist.queue.push',
183 method => 'add_item',
# Stacks: push a value.
186 __PACKAGE__->register_method(
187 api_name => 'opensrf.persist.stack.push',
188 method => 'add_item',
# Translate a slot id into its slot name (presumably the body of _get_id_name,
# which the sweep calls with an id -- the sub header is not visible here).
# Throws WARN when no id is given or when no store_name row matches it.
# NOTE: the incoming id is held in a variable called $name and the looked-up
# name in one called $name_id -- the names are swapped relative to their content.
193 my $name = shift or do {
194 throw OpenSRF::EX::WARN ("No slot id specified!");
198 my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
# Covers both a failed query (no reference returned) and an empty result.
200 if (!ref($name_id) || !defined($name_id->[0])) {
201 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
204 return $name_id->[0];
# Translate a slot name into its numeric id in store_name (presumably the body
# of _get_name_id, used throughout the file -- the sub header is not visible here).
# Throws WARN when no name is given or when the name is unknown.
208 my $name = shift or do {
209 throw OpenSRF::EX::WARN ("No slot name specified!");
213 my $name_id = $dbh->selectcol_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
# Covers both a failed query (no reference returned) and an empty result.
215 if (!ref($name_id) || !defined($name_id->[0])) {
216 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
219 return $name_id->[0];
# Destroy a slot's contents (presumably the body of the handler registered as
# 'destroy_store'; header and argument unpacking are not visible).
# Visible behaviour: delete every storage row of the slot, then let
# _flush_by_name() do its follow-up clean-up for that name.
228 my $name_id = _get_name_id($name);
230 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
231 _flush_by_name($name);
# Publish 'destroy_store' under 'opensrf.persist.slot.destroy'.
234 __PACKAGE__->register_method(
235 api_name => 'opensrf.persist.slot.destroy',
236 method => 'destroy_store',
# Post-operation clean-up for a slot (presumably the body of _flush_by_name;
# the sub header and the unpacking of $name are not visible).
# Visible behaviour: run the expiry sweep unless the caller passed a true
# no-sweep flag, then drop the store_name row of an auto-named slot once it
# holds no storage rows.
242 my $no_sweep = shift;
244 _sweep_expired_slots() unless ($no_sweep);
# Only slots whose name carries the auto-generated prefix are removed here.
246 if ($name =~ /^AUTOGENERATED!!/) {
247 my $name_id = _get_name_id($name);
248 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
# A failed count query is treated the same as an empty slot.
249 if (!ref($count) || $$count[0] == 0) {
250 $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
# Take the oldest entry of a queue (presumably the body of the handler
# registered as 'pop_queue'; header and unpacking of $self are not visible).
# Visible behaviour: fetch the row with the lowest id for the slot, delete it
# unless the call arrived under an api_name ending in 'peek', run the slot
# clean-up, and return the JSON-decoded value.
259 my $name = shift or do {
260 throw OpenSRF::EX::WARN ("No queue name specified!");
262 my $name_id = _get_name_id($name);
# NOTE(review): no visible check for an empty queue -- $value is then expected
# to be undefined and the lines below run with undefined fields; confirm.
264 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
265 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
267 _flush_by_name($name);
269 return JSON->JSON2perl( $value->[1] );
# Publish 'pop_queue' twice; the handler keeps the row when its api_name ends in 'peek'.
# Non-destructive read of the queue head.
271 __PACKAGE__->register_method(
272 api_name => 'opensrf.persist.queue.peek',
273 method => 'pop_queue',
# Destructive read of the queue head.
276 __PACKAGE__->register_method(
277 api_name => 'opensrf.persist.queue.pop',
278 method => 'pop_queue',
# Stream every value of a slot without removing anything (presumably the body
# of the handler registered as 'peek_slot'; the header and the declaration of
# $order with its default are not visible).
287 my $name = shift or do {
288 throw OpenSRF::EX::WARN ("No slot name specified!");
290 my $name_id = _get_name_id($name);
# Stack calls read newest-first; other calls keep whatever $order was set to above.
293 $order = 'DESC' if ($self->api_name =~ /stack/o);
# $order is interpolated into the SQL; in the visible code it is only assigned the literal 'DESC'.
295 my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
# One response per stored value, JSON-decoded.
297 $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
# Publish 'peek_slot' for queues and for stacks; the handler matches /stack/
# in its api_name to pick the ordering.
301 __PACKAGE__->register_method(
302 api_name => 'opensrf.persist.queue.peek.all',
303 method => 'peek_slot',
307 __PACKAGE__->register_method(
308 api_name => 'opensrf.persist.stack.peek.all',
309 method => 'peek_slot',
# Report the total size of a slot's contents: the sum of LENGTH(value) over all
# of its storage rows, i.e. the size of the stored JSON text.
# NOTE(review): the sub header is not visible, so the handler's name cannot be
# confirmed from this excerpt.
319 my $name = shift or do {
320 throw OpenSRF::EX::WARN ("No queue name specified!");
322 my $name_id = _get_name_id($name);
324 my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
# The aggregate is passed through the JSON decoder before being returned.
326 return JSON->JSON2perl( $value->[0] );
# Size queries for queues, stacks and objects.
# NOTE(review): all three are bound to method 'shift_stack', the same handler
# name used by the stack.peek / stack.pop registrations further down. Verify
# that a size call is not dispatched to code that reads or removes an entry.
328 __PACKAGE__->register_method(
329 api_name => 'opensrf.persist.queue.size',
330 method => 'shift_stack',
333 __PACKAGE__->register_method(
334 api_name => 'opensrf.persist.stack.size',
335 method => 'shift_stack',
338 __PACKAGE__->register_method(
339 api_name => 'opensrf.persist.object.size',
340 method => 'shift_stack',
# Report how many entries a slot holds: COUNT(*) over its storage rows.
# NOTE(review): the sub header is not visible, so the handler's name cannot be
# confirmed from this excerpt.
348 my $name = shift or do {
349 throw OpenSRF::EX::WARN ("No queue name specified!");
351 my $name_id = _get_name_id($name);
353 my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
# The count is passed through the JSON decoder before being returned.
355 return JSON->JSON2perl( $value->[0] );
# Entry-count queries for queues and stacks.
# NOTE(review): both are bound to method 'shift_stack', the same handler name
# used by the stack.peek / stack.pop registrations. Verify that a length/depth
# call is not dispatched to code that reads or removes an entry.
357 __PACKAGE__->register_method(
358 api_name => 'opensrf.persist.queue.length',
359 method => 'shift_stack',
362 __PACKAGE__->register_method(
363 api_name => 'opensrf.persist.stack.depth',
364 method => 'shift_stack',
# Take the newest entry of a stack (presumably the body of the handler
# registered as 'shift_stack'; header and unpacking of $self are not visible).
# Visible behaviour: fetch the row with the highest id for the slot, delete it
# unless the call arrived under an api_name ending in 'peek', run the slot
# clean-up, and return the JSON-decoded value.
372 my $name = shift or do {
373 throw OpenSRF::EX::WARN ("No queue name specified!");
375 my $name_id = _get_name_id($name);
# NOTE(review): no visible check for an empty stack before $value is dereferenced -- confirm.
377 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
378 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
380 _flush_by_name($name);
382 return JSON->JSON2perl( $value->[1] );
# Publish 'shift_stack' twice; the handler keeps the row when its api_name ends in 'peek'.
# Non-destructive read of the stack top.
384 __PACKAGE__->register_method(
385 api_name => 'opensrf.persist.stack.peek',
386 method => 'shift_stack',
# Destructive read of the stack top.
389 __PACKAGE__->register_method(
390 api_name => 'opensrf.persist.stack.pop',
391 method => 'shift_stack',
# Read an object slot.
# NOTE(review): the sub header is not visible, so the handler's name cannot be
# confirmed from this excerpt.
# Visible behaviour: fetch the newest row of the slot; unless the call arrived
# under an api_name ending in 'peek', delete ALL rows of the slot (the DELETE
# is keyed on name_id, the first selected column); run the slot clean-up and
# return the JSON-decoded value.
399 my $name = shift or do {
400 throw OpenSRF::EX::WARN ("No object name specified!");
403 my $name_id = _get_name_id($name);
405 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
# NOTE(review): under an api_name such as 'opensrf.persist.object.get' this
# removes the stored object as a side effect of reading it -- confirm intent.
406 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
408 _flush_by_name($name);
410 return JSON->JSON2perl( $value->[1] );
# Object read APIs.
# NOTE(review): both are bound to method 'shift_stack', the same handler name
# used by the stack.peek / stack.pop registrations -- verify this is the
# intended handler for object slots.
412 __PACKAGE__->register_method(
413 api_name => 'opensrf.persist.object.peek',
414 method => 'shift_stack',
417 __PACKAGE__->register_method(
418 api_name => 'opensrf.persist.object.get',
419 method => 'shift_stack',