]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Application/Persist.pm
a6d640847184bbba4dbf95977ad153073f656607
[OpenSRF.git] / src / perlmods / OpenSRF / Application / Persist.pm
1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
4
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
9 use JSON;
10 use DBI;
11
12 use vars qw/$dbh $log $default_expire_time/;
13
14 sub initialize {
15         $log = 'OpenSRF::Utils::Logger';
16
17         $sc = OpenSRF::Utils::SettingsClient->new;
18
19         my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
20         unless ($dbfile) {
21                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
22         }
23
24         my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25         $init_dbh->{AutoCommit} = 1;
26         $init_dbh->{RaiseError} = 0;
27
28         $init_dbh->do( <<"      SQL" );
29                 CREATE TABLE storage (
30                         id      INTEGER PRIMARY KEY,
31                         name_id INTEGER,
32                         value   TEXT
33                 );
34         SQL
35
36         $init_dbh->do( <<"      SQL" );
37                 CREATE TABLE store_name (
38                         id      INTEGER PRIMARY KEY,
39                         name    TEXT UNIQUE
40                 );
41         SQL
42
43         $init_dbh->do( <<"      SQL" );
44                 CREATE TABLE store_expire (
45                         id              INTEGER PRIMARY KEY,
46                         atime           INTEGER,
47                         expire_interval INTEGER
48                 );
49         SQL
50
51 }
52
53 sub child_init {
54         my $sc = OpenSRF::Utils::SettingsClient->new;
55
56         $default_expire_time = $sc->config_value( apps => persist => app_settings => 'default_expire_time' );
57         $default_expire_time ||= 300;
58
59         my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
60         unless ($dbfile) {
61                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
62         }
63
64         $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65         $dbh->{AutoCommit} = 1;
66         $dbh->{RaiseError} = 0;
67
68 }
69
70 sub create_store {
71         my $self = shift;
72         my $client = shift;
73
74         my $name = shift || '';
75
76         
77         my $continue = 0;
78         try {
79                 _get_name_id($name);
80
81         } catch Error with { 
82                 $continue++;
83         };
84
85         throw OpenSRF::EX::WARN ("Duplicate key:  object name [$name] already exists!  " . $dbh->errstr)
86                 unless ($continue);
87
88         my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
89         $sth->execute($name);
90         $sth->finish;
91
92         unless ($name) {
93                 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
94                 $name = 'AUTOGENERATED!!'.$last_id;
95                 $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
96         }
97
98         _flush_by_name($name);
99         return $name;
100 }
101 __PACKAGE__->register_method(
102         api_name => 'opensrf.persist.slot.create',
103         method => 'create_store',
104         argc => 1,
105 );
106
107
108 sub create_expirable_store {
109         my $self = shift;
110         my $client = shift;
111         my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
112         my $time = shift || $default_expire_time;
113
114         try {
115                 __PACKAGE__->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
116         } catch Error with {
117                 my $e = shift;
118                 if ($e->message =~ /^Duplicate key/o) {
119                         throw $e;
120                 }
121         };
122
123         $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
124
125         return $name;
126 }
127 __PACKAGE__->register_method(
128         api_name => 'opensrf.persist.slot.create_expirable',
129         method => 'create_expirable_store',
130         argc => 2,
131 );
132
133 sub _update_expire_atime {
134         my $id = shift;
135         $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
136 }
137
138 sub set_expire_interval {
139         my $self = shift;
140         my $client = shift;
141         my $slot = shift;
142         my $new_interval = shift;
143
144         my $etime = interval_to_seconds($new_interval);
145         my $sid = _get_name_id($slot);
146
147         $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
148         return $slot if ($etime == 0);
149
150         $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
151         return $slot;
152 }
153 __PACKAGE__->register_method(
154         api_name => 'opensrf.persist.slot.set_expire',
155         method => 'set_expire_interval',
156         argc => 2,
157 );
158
159
160 sub _sweep_expired_slots {
161         return if (shift());
162
163         my $expired_slots = $dbh->selectcol_arrayref(<<"        SQL", {}, time() );
164                 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
165         SQL
166
167         return unless ($expired_slots);
168
169         $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
170         for my $id (@$expired_slots) {
171                 _flush_by_name(_get_id_name($id), 1);
172         }
173 }
174
175 sub add_item {
176         my $self = shift;
177         my $client = shift;
178
179         my $name = shift or do {
180                 throw OpenSRF::EX::WARN ("No name specified!");
181         };
182         my $value = shift || '';
183
184         my $name_id = _get_name_id($name);
185         
186         if ($self->api_name =~ /object/) {
187                 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
188         }
189
190         $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
191
192         _flush_by_name($name);
193
194         return 0 if ($dbh->err);
195         return $name;
196 }
197 __PACKAGE__->register_method(
198         api_name => 'opensrf.persist.object.set',
199         method => 'add_item',
200         argc => 2,
201 );
202 __PACKAGE__->register_method(
203         api_name => 'opensrf.persist.queue.push',
204         method => 'add_item',
205         argc => 2,
206 );
207 __PACKAGE__->register_method(
208         api_name => 'opensrf.persist.stack.push',
209         method => 'add_item',
210         argc => 2,
211 );
212
213 sub _get_id_name {
214         my $name = shift or do {
215                 throw OpenSRF::EX::WARN ("No slot id specified!");
216         };
217
218
219         my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
220
221         if (!ref($name_id) || !defined($name_id->[0])) {
222                 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
223         }
224
225         return $name_id->[0];
226 }
227
228 sub _get_name_id {
229         my $name = shift or do {
230                 throw OpenSRF::EX::WARN ("No slot name specified!");
231         };
232
233
234         my $name_id = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
235
236         if (!ref($name_id) || !defined($name_id->[0])) {
237                 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
238         }
239
240         return $name_id->[0];
241 }
242
243 sub destroy_store {
244         my $self = shift;
245         my $client = shift;
246
247         my $name = shift;
248
249         my $name_id = _get_name_id($name);
250
251         $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
252         $dbh->do("DELETE FROM store_name WHERE id = ?;", {}, $name_id);
253         $dbh->do("DELETE FROM store_expire WHERE id = ?;", {}, $name_id);
254         _sweep_expired_slots();
255
256         return 1;
257 }
258 __PACKAGE__->register_method(
259         api_name => 'opensrf.persist.slot.destroy',
260         method => 'destroy_store',
261         argc => 1,
262 );
263
264 sub _flush_by_name {
265         my $name = shift;
266         my $no_sweep = shift;
267  
268         my $name_id = _get_name_id($name);
269
270         unless ($no_sweep) {
271                 _update_expire_atime($name);
272                 _sweep_expired_slots();
273         }
274         
275         if ($name =~ /^AUTOGENERATED!!/) {
276                 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
277                 if (!ref($count) || $$count[0] == 0) {
278                         $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
279                 }
280         }
281 }
282         
283 sub pop_queue {
284         my $self = shift;
285         my $client = shift;
286
287         my $name = shift or do {
288                 throw OpenSRF::EX::WARN ("No queue name specified!");
289         };
290         my $name_id = _get_name_id($name);
291
292         my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
293         $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
294
295         _flush_by_name($name);
296
297         return JSON->JSON2perl( $value->[1] );
298 }
299 __PACKAGE__->register_method(
300         api_name => 'opensrf.persist.queue.peek',
301         method => 'pop_queue',
302         argc => 1,
303 );
304 __PACKAGE__->register_method(
305         api_name => 'opensrf.persist.queue.pop',
306         method => 'pop_queue',
307         argc => 1,
308 );
309
310
311 sub peek_slot {
312         my $self = shift;
313         my $client = shift;
314
315         my $name = shift or do {
316                 throw OpenSRF::EX::WARN ("No slot name specified!");
317         };
318         my $name_id = _get_name_id($name);
319
320         my $order = 'ASC';
321         $order = 'DESC' if ($self->api_name =~ /stack/o);
322         
323         my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
324
325         $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
326
327         _flush_by_name($name);
328         return undef;
329 }
330 __PACKAGE__->register_method(
331         api_name => 'opensrf.persist.queue.peek.all',
332         method => 'peek_slot',
333         argc => 1,
334         stream => 1,
335 );
336 __PACKAGE__->register_method(
337         api_name => 'opensrf.persist.stack.peek.all',
338         method => 'peek_slot',
339         argc => 1,
340         stream => 1,
341 );
342
343
344 sub store_size {
345         my $self = shift;
346         my $client = shift;
347
348         my $name = shift or do {
349                 throw OpenSRF::EX::WARN ("No queue name specified!");
350         };
351         my $name_id = _get_name_id($name);
352
353         my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
354
355         return JSON->JSON2perl( $value->[0] );
356 }
357 __PACKAGE__->register_method(
358         api_name => 'opensrf.persist.queue.size',
359         method => 'shift_stack',
360         argc => 1,
361 );
362 __PACKAGE__->register_method(
363         api_name => 'opensrf.persist.stack.size',
364         method => 'shift_stack',
365         argc => 1,
366 );
367 __PACKAGE__->register_method(
368         api_name => 'opensrf.persist.object.size',
369         method => 'shift_stack',
370         argc => 1,
371 );
372
373 sub store_depth {
374         my $self = shift;
375         my $client = shift;
376
377         my $name = shift or do {
378                 throw OpenSRF::EX::WARN ("No queue name specified!");
379         };
380         my $name_id = _get_name_id($name);
381
382         my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
383
384         return JSON->JSON2perl( $value->[0] );
385 }
386 __PACKAGE__->register_method(
387         api_name => 'opensrf.persist.queue.length',
388         method => 'shift_stack',
389         argc => 1,
390 );
391 __PACKAGE__->register_method(
392         api_name => 'opensrf.persist.stack.depth',
393         method => 'shift_stack',
394         argc => 1,
395 );
396
397 sub shift_stack {
398         my $self = shift;
399         my $client = shift;
400
401         my $name = shift or do {
402                 throw OpenSRF::EX::WARN ("No queue name specified!");
403         };
404         my $name_id = _get_name_id($name);
405
406         my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
407         $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
408
409         _flush_by_name($name);
410
411         return JSON->JSON2perl( $value->[1] );
412 }
413 __PACKAGE__->register_method(
414         api_name => 'opensrf.persist.stack.peek',
415         method => 'shift_stack',
416         argc => 1,
417 );
418 __PACKAGE__->register_method(
419         api_name => 'opensrf.persist.stack.pop',
420         method => 'shift_stack',
421         argc => 1,
422 );
423
424 sub get_object {
425         my $self = shift;
426         my $client = shift;
427
428         my $name = shift or do {
429                 throw OpenSRF::EX::WARN ("No object name specified!");
430         };
431
432         my $name_id = _get_name_id($name);
433
434         my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
435         $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
436
437         _flush_by_name($name);
438
439         return JSON->JSON2perl( $value->[1] );
440 }
441 __PACKAGE__->register_method(
442         api_name => 'opensrf.persist.object.peek',
443         method => 'shift_stack',
444         argc => 1,
445 );
446 __PACKAGE__->register_method(
447         api_name => 'opensrf.persist.object.get',
448         method => 'shift_stack',
449         argc => 1,
450 );
451
452 1;