]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Application/Persist.pm
added .create_expirable interface. accepts a Utils style interval, and sweeps at...
[OpenSRF.git] / src / perlmods / OpenSRF / Application / Persist.pm
1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
4
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
9 use JSON;
10 use DBI;
11
12 use vars qw/$dbh $log $default_expire_time/;
13
14 sub initialize {
15         $log = 'OpenSRF::Utils::Logger';
16
17         $sc = OpenSRF::Utils::SettingsClient->new;
18
19         my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
20         unless ($dbfile) {
21                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
22         }
23
24         my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25         $init_dbh->{AutoCommit} = 1;
26         $init_dbh->{RaiseError} = 0;
27
28         $init_dbh->do( <<"      SQL" );
29                 CREATE TABLE storage (
30                         id      INTEGER PRIMARY KEY,
31                         name_id INTEGER,
32                         value   TEXT
33                 );
34         SQL
35
36         $init_dbh->do( <<"      SQL" );
37                 CREATE TABLE store_name (
38                         id      INTEGER PRIMARY KEY,
39                         name    TEXT UNIQUE
40                 );
41         SQL
42
43         $init_dbh->do( <<"      SQL" );
44                 CREATE TABLE store_expire (
45                         id              INTEGER PRIMARY KEY,
46                         atime           INTEGER,
47                         expire_interval INTEGER
48                 );
49         SQL
50
51 }
52
53 sub child_init {
54         my $sc = OpenSRF::Utils::SettingsClient->new;
55
56         $default_expire_time = $sc->config_value( apps => persist => app_settings => 'default_expire_time' );
57         $default_expire_time ||= 300;
58
59         my $dbfile = $sc->config_value( apps => persist => app_settings => 'dbfile');
60         unless ($dbfile) {
61                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
62         }
63
64         $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65         $dbh->{AutoCommit} = 1;
66         $dbh->{RaiseError} = 0;
67
68 }
69
70 sub create_store {
71         my $self = shift;
72         my $client = shift;
73
74         my $name = shift || '';
75
76         
77         my $continue = 0;
78         try {
79                 _get_name_id($name);
80
81         } catch Error with { 
82                 $continue++;
83         };
84
85         throw OpenSRF::EX::WARN ("Duplicate key:  object name [$name] already exists!  " . $dbh->errstr)
86                 unless ($continue);
87
88         my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
89         $sth->execute($name);
90         $sth->finish;
91
92         unless ($name) {
93                 my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
94                 $name = 'AUTOGENERATED!!'.$last_id;
95                 $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
96         }
97
98         return $name;
99 }
100 __PACKAGE__->register_method(
101         api_name => 'opensrf.persist.slot.create',
102         method => 'create_store',
103         argc => 1,
104 );
105
106
107 sub create_expirable_store {
108         my $self = shift;
109         my $client = shift;
110         my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
111         my $time = shift || $default_expire_time;
112
113         try {
114                 __PACKAGE__->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
115         } catch Error with {
116                 my $e = shift;
117                 if ($e->message =~ /^Duplicate key/o) {
118                         throw $e;
119                 }
120         };
121
122         my $name_id = _get_name_id($name);
123         my $atime = time;
124         my $etime = interval_to_seconds($time);
125
126         $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$name_id,$atime,$etime);
127
128         return $name;
129 }
130 __PACKAGE__->register_method(
131         api_name => 'opensrf.persist.slot.create_expirable',
132         method => 'create_expirable_store',
133         argc => 2,
134 );
135
136 sub _update_expire_atime {
137         my $id = shift;
138         $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
139 }
140
141 sub _sweep_expired_slots {
142         return if (shift());
143
144         my @expired_slots = $dbh->selectcol_arrayref(<<"        SQL", {}, time() );
145                 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
146         SQL
147
148         return unless (@expired_slots);
149
150         $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @expired_slots).');', {}, @expired_slots);
151         for my $id (@expired_slots) {
152                 _flush_by_name(_get_id_name($id), 1);
153         }
154 }
155
156 sub add_item {
157         my $self = shift;
158         my $client = shift;
159
160         my $name = shift or do {
161                 throw OpenSRF::EX::WARN ("No name specified!");
162         };
163         my $value = shift || '';
164
165         my $name_id = _get_name_id($name);
166         
167         if ($self->api_name =~ /object/) {
168                 $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
169         }
170
171         $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
172
173         return 0 if ($dbh->err);
174         return $name;
175 }
176 __PACKAGE__->register_method(
177         api_name => 'opensrf.persist.object.set',
178         method => 'add_item',
179         argc => 2,
180 );
181 __PACKAGE__->register_method(
182         api_name => 'opensrf.persist.queue.push',
183         method => 'add_item',
184         argc => 2,
185 );
186 __PACKAGE__->register_method(
187         api_name => 'opensrf.persist.stack.push',
188         method => 'add_item',
189         argc => 2,
190 );
191
192 sub _get_id_name {
193         my $name = shift or do {
194                 throw OpenSRF::EX::WARN ("No slot id specified!");
195         };
196
197
198         my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
199
200         if (!ref($name_id) || !defined($name_id->[0])) {
201                 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
202         }
203
204         return $name_id->[0];
205 }
206
207 sub _get_name_id {
208         my $name = shift or do {
209                 throw OpenSRF::EX::WARN ("No slot name specified!");
210         };
211
212
213         my $name_id = $dbh->selectcol_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
214
215         if (!ref($name_id) || !defined($name_id->[0])) {
216                 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
217         }
218
219         return $name_id->[0];
220 }
221
222 sub destroy_store {
223         my $self = shift;
224         my $client = shift;
225
226         my $name = shift;
227
228         my $name_id = _get_name_id($name);
229
230         $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
231         _flush_by_name($name);
232
233 }
234 __PACKAGE__->register_method(
235         api_name => 'opensrf.persist.slot.destroy',
236         method => 'destroy_store',
237         argc => 1,
238 );
239
240 sub _flush_by_name {
241         my $name = shift;
242         my $no_sweep = shift;
243
244         _sweep_expired_slots() unless ($no_sweep);
245         
246         if ($name =~ /^AUTOGENERATED!!/) {
247                 my $name_id = _get_name_id($name);
248                 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
249                 if (!ref($count) || $$count[0] == 0) {
250                         $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
251                 }
252         }
253 }
254         
255 sub pop_queue {
256         my $self = shift;
257         my $client = shift;
258
259         my $name = shift or do {
260                 throw OpenSRF::EX::WARN ("No queue name specified!");
261         };
262         my $name_id = _get_name_id($name);
263
264         my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
265         $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
266
267         _flush_by_name($name);
268
269         return JSON->JSON2perl( $value->[1] );
270 }
271 __PACKAGE__->register_method(
272         api_name => 'opensrf.persist.queue.peek',
273         method => 'pop_queue',
274         argc => 1,
275 );
276 __PACKAGE__->register_method(
277         api_name => 'opensrf.persist.queue.pop',
278         method => 'pop_queue',
279         argc => 1,
280 );
281
282
283 sub peek_slot {
284         my $self = shift;
285         my $client = shift;
286
287         my $name = shift or do {
288                 throw OpenSRF::EX::WARN ("No slot name specified!");
289         };
290         my $name_id = _get_name_id($name);
291
292         my $order = 'ASC';
293         $order = 'DESC' if ($self->api_name =~ /stack/o);
294         
295         my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
296
297         $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
298
299         return undef;
300 }
301 __PACKAGE__->register_method(
302         api_name => 'opensrf.persist.queue.peek.all',
303         method => 'peek_slot',
304         argc => 1,
305         stream => 1,
306 );
307 __PACKAGE__->register_method(
308         api_name => 'opensrf.persist.stack.peek.all',
309         method => 'peek_slot',
310         argc => 1,
311         stream => 1,
312 );
313
314
315 sub store_size {
316         my $self = shift;
317         my $client = shift;
318
319         my $name = shift or do {
320                 throw OpenSRF::EX::WARN ("No queue name specified!");
321         };
322         my $name_id = _get_name_id($name);
323
324         my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
325
326         return JSON->JSON2perl( $value->[0] );
327 }
328 __PACKAGE__->register_method(
329         api_name => 'opensrf.persist.queue.size',
330         method => 'shift_stack',
331         argc => 1,
332 );
333 __PACKAGE__->register_method(
334         api_name => 'opensrf.persist.stack.size',
335         method => 'shift_stack',
336         argc => 1,
337 );
338 __PACKAGE__->register_method(
339         api_name => 'opensrf.persist.object.size',
340         method => 'shift_stack',
341         argc => 1,
342 );
343
344 sub store_depth {
345         my $self = shift;
346         my $client = shift;
347
348         my $name = shift or do {
349                 throw OpenSRF::EX::WARN ("No queue name specified!");
350         };
351         my $name_id = _get_name_id($name);
352
353         my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
354
355         return JSON->JSON2perl( $value->[0] );
356 }
357 __PACKAGE__->register_method(
358         api_name => 'opensrf.persist.queue.length',
359         method => 'shift_stack',
360         argc => 1,
361 );
362 __PACKAGE__->register_method(
363         api_name => 'opensrf.persist.stack.depth',
364         method => 'shift_stack',
365         argc => 1,
366 );
367
368 sub shift_stack {
369         my $self = shift;
370         my $client = shift;
371
372         my $name = shift or do {
373                 throw OpenSRF::EX::WARN ("No queue name specified!");
374         };
375         my $name_id = _get_name_id($name);
376
377         my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
378         $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
379
380         _flush_by_name($name);
381
382         return JSON->JSON2perl( $value->[1] );
383 }
384 __PACKAGE__->register_method(
385         api_name => 'opensrf.persist.stack.peek',
386         method => 'shift_stack',
387         argc => 1,
388 );
389 __PACKAGE__->register_method(
390         api_name => 'opensrf.persist.stack.pop',
391         method => 'shift_stack',
392         argc => 1,
393 );
394
395 sub get_object {
396         my $self = shift;
397         my $client = shift;
398
399         my $name = shift or do {
400                 throw OpenSRF::EX::WARN ("No object name specified!");
401         };
402
403         my $name_id = _get_name_id($name);
404
405         my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
406         $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
407
408         _flush_by_name($name);
409
410         return JSON->JSON2perl( $value->[1] );
411 }
412 __PACKAGE__->register_method(
413         api_name => 'opensrf.persist.object.peek',
414         method => 'shift_stack',
415         argc => 1,
416 );
417 __PACKAGE__->register_method(
418         api_name => 'opensrf.persist.object.get',
419         method => 'shift_stack',
420         argc => 1,
421 );
422
423 1;