]> git.evergreen-ils.org Git - OpenSRF.git/blob - src/perlmods/OpenSRF/Application/Persist.pm
e8298298c174e4aac247dff6d8b2263170000d43
[OpenSRF.git] / src / perlmods / OpenSRF / Application / Persist.pm
1 package OpenSRF::Application::Persist;
2 use base qw/OpenSRF::Application/;
3 use OpenSRF::Application;
4
5 use OpenSRF::Utils::SettingsClient;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils qw/:common/;
8 use OpenSRF::Utils::Logger;
9 use JSON;
10 use DBI;
11
12 use vars qw/$dbh $log $default_expire_time/;
13
14 sub initialize {
15         $log = 'OpenSRF::Utils::Logger';
16
17         $sc = OpenSRF::Utils::SettingsClient->new;
18
19         my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
20         unless ($dbfile) {
21                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
22         }
23
24         my $init_dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
25         $init_dbh->{AutoCommit} = 1;
26         $init_dbh->{RaiseError} = 0;
27
28         $init_dbh->do( <<"      SQL" );
29                 CREATE TABLE storage (
30                         id      INTEGER PRIMARY KEY,
31                         name_id INTEGER,
32                         value   TEXT
33                 );
34         SQL
35
36         $init_dbh->do( <<"      SQL" );
37                 CREATE TABLE store_name (
38                         id      INTEGER PRIMARY KEY,
39                         name    TEXT UNIQUE
40                 );
41         SQL
42
43         $init_dbh->do( <<"      SQL" );
44                 CREATE TABLE store_expire (
45                         id              INTEGER PRIMARY KEY,
46                         atime           INTEGER,
47                         expire_interval INTEGER
48                 );
49         SQL
50
51 }
52
53 sub child_init {
54         my $sc = OpenSRF::Utils::SettingsClient->new;
55
56         $default_expire_time = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'default_expire_time' );
57         $default_expire_time ||= 300;
58
59         my $dbfile = $sc->config_value( apps => 'opensrf.persist' => app_settings => 'dbfile');
60         unless ($dbfile) {
61                 throw OpenSRF::EX::PANIC ("Can't find my dbfile for SQLite!");
62         }
63
64         $dbh = DBI->connect("dbi:SQLite:dbname=$dbfile","","");
65         $dbh->{AutoCommit} = 1;
66         $dbh->{RaiseError} = 0;
67
68 }
69
70 sub create_store {
71         my $self = shift;
72         my $client = shift;
73
74         my $name = shift || '';
75
76         try {
77         
78                 my $continue = 0;
79                 try {
80                         _get_name_id($name);
81
82                 } catch Error with { 
83                         $continue++;
84                 };
85
86                 throw OpenSRF::EX::WARN ("Duplicate key:  object name [$name] already exists!  " . $dbh->errstr)
87                         unless ($continue);
88
89                 my $sth = $dbh->prepare("INSERT INTO store_name (name) VALUES (?);");
90                 $sth->execute($name);
91                 $sth->finish;
92
93                 unless ($name) {
94                         my $last_id = $dbh->last_insert_id(undef, undef, 'store_name', 'id');
95                         $name = 'AUTOGENERATED!!'.$last_id;
96                         $dbh->do("UPDATE store_name SET name = '$name' WHERE id = '$last_id';");
97                 }
98
99                 _flush_by_name($name);
100                 return $name;
101         } catch Error with {
102                 return undef;
103         };
104 }
105 __PACKAGE__->register_method(
106         api_name => 'opensrf.persist.slot.create',
107         method => 'create_store',
108         argc => 1,
109 );
110
111
112 sub create_expirable_store {
113         my $self = shift;
114         my $client = shift;
115         my $name = shift || do { throw OpenSRF::EX::InvalidArg ("Expirable slots must be given a name!") };
116         my $time = shift || $default_expire_time;
117
118         try {
119                 ($name) = $self->method_lookup( 'opensrf.persist.slot.create' )->run( $name );
120                 return undef unless $name;
121
122                 $self->method_lookup('opensrf.persist.slot.set_expire')->run($name, $time);
123                 return $name;
124         } catch Error with {
125                 return undef;
126         };
127
128 }
129 __PACKAGE__->register_method(
130         api_name => 'opensrf.persist.slot.create_expirable',
131         method => 'create_expirable_store',
132         argc => 2,
133 );
134
135 sub _update_expire_atime {
136         my $id = shift;
137         $dbh->do('UPDATE store_expire SET atime = ? WHERE id = ?', {}, time(), $id);
138 }
139
140 sub set_expire_interval {
141         my $self = shift;
142         my $client = shift;
143         my $slot = shift;
144         my $new_interval = shift;
145
146         try {
147                 my $etime = interval_to_seconds($new_interval);
148                 my $sid = _get_name_id($slot);
149
150                 $dbh->do('DELETE FROM store_expire where id = ?', {}, $sid);
151                 return 0 if ($etime == 0);
152
153                 $dbh->do('INSERT INTO store_expire (id, atime, expire_interval) VALUES (?,?,?);',{},$sid,time(),$etime);
154                 return $etime;
155         } 
156 }
157 __PACKAGE__->register_method(
158         api_name => 'opensrf.persist.slot.set_expire',
159         method => 'set_expire_interval',
160         argc => 2,
161 );
162
163
164 sub _sweep_expired_slots {
165         return if (shift());
166
167         my $expired_slots = $dbh->selectcol_arrayref(<<"        SQL", {}, time() );
168                 SELECT id FROM store_expire WHERE (atime + expire_interval) <= ?;
169         SQL
170
171         return unless ($expired_slots);
172
173         $dbh->do('DELETE FROM storage WHERE name_id IN ('.join(',', map { '?' } @$expired_slots).');', {}, @$expired_slots);
174         for my $id (@$expired_slots) {
175                 _flush_by_name(_get_id_name($id), 1);
176         }
177 }
178
179 sub add_item {
180         my $self = shift;
181         my $client = shift;
182
183         my $name = shift or do {
184                 throw OpenSRF::EX::WARN ("No name specified!");
185         };
186
187         my $value = shift || '';
188
189         try {
190                 my $name_id = _get_name_id($name);
191         
192                 if ($self->api_name =~ /object/) {
193                         $dbh->do('DELETE FROM storage WHERE name_id = ?;', {}, $name_id);
194                 }
195
196                 $dbh->do('INSERT INTO storage (name_id,value) VALUES (?,?);', {}, $name_id, JSON->perl2JSON($value));
197
198                 _flush_by_name($name);
199
200                 return $name;
201         } catch Error with {
202                 return undef;
203         };
204 }
205 __PACKAGE__->register_method(
206         api_name => 'opensrf.persist.object.set',
207         method => 'add_item',
208         argc => 2,
209 );
210 __PACKAGE__->register_method(
211         api_name => 'opensrf.persist.queue.push',
212         method => 'add_item',
213         argc => 2,
214 );
215 __PACKAGE__->register_method(
216         api_name => 'opensrf.persist.stack.push',
217         method => 'add_item',
218         argc => 2,
219 );
220
221 sub _get_id_name {
222         my $name = shift or do {
223                 throw OpenSRF::EX::WARN ("No slot id specified!");
224         };
225
226
227         my $name_id = $dbh->selectcol_arrayref("SELECT name FROM store_name WHERE id = ?;", {}, $name);
228
229         if (!ref($name_id) || !defined($name_id->[0])) {
230                 throw OpenSRF::EX::WARN ("Slot id [$name] does not exist!");
231         }
232
233         return $name_id->[0];
234 }
235
236 sub _get_name_id {
237         my $name = shift or do {
238                 throw OpenSRF::EX::WARN ("No slot name specified!");
239         };
240
241
242         my $name_id = $dbh->selectrow_arrayref("SELECT id FROM store_name WHERE name = ?;", {}, $name);
243
244         if (!ref($name_id) || !defined($name_id->[0])) {
245                 throw OpenSRF::EX::WARN ("Slot name [$name] does not exist!");
246         }
247
248         return $name_id->[0];
249 }
250
251 sub destroy_store {
252         my $self = shift;
253         my $client = shift;
254
255         my $name = shift;
256
257         my $problem = 0;
258         try {
259                 my $name_id = _get_name_id($name);
260         
261                 $dbh->do("DELETE FROM storage WHERE name_id = ?;", {}, $name_id);
262                 $dbh->do("DELETE FROM store_name WHERE id = ?;", {}, $name_id);
263                 $dbh->do("DELETE FROM store_expire WHERE id = ?;", {}, $name_id);
264
265                 _sweep_expired_slots();
266                 return $name;
267         } catch Error with {
268                 return undef;
269         };
270
271 }
272 __PACKAGE__->register_method(
273         api_name => 'opensrf.persist.slot.destroy',
274         method => 'destroy_store',
275         argc => 1,
276 );
277
278 sub _flush_by_name {
279         my $name = shift;
280         my $no_sweep = shift;
281  
282         my $name_id = _get_name_id($name);
283
284         unless ($no_sweep) {
285                 _update_expire_atime($name);
286                 _sweep_expired_slots();
287         }
288         
289         if ($name =~ /^AUTOGENERATED!!/) {
290                 my $count = $dbh->selectcol_arrayref("SELECT COUNT(*) FROM storage WHERE name_id = ?;", {}, $name_id);
291                 if (!ref($count) || $$count[0] == 0) {
292                         $dbh->do("DELETE FROM store_name WHERE name = ?;", {}, $name);
293                 }
294         }
295 }
296         
297 sub pop_queue {
298         my $self = shift;
299         my $client = shift;
300
301         my $name = shift or do {
302                 throw OpenSRF::EX::WARN ("No queue name specified!");
303         };
304
305         try {
306                 my $name_id = _get_name_id($name);
307
308                 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id ASC LIMIT 1;', {}, $name_id);
309                 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
310
311                 _flush_by_name($name);
312
313                 return JSON->JSON2perl( $value->[1] );
314         } catch Error with {
315                 #my $e = shift;
316                 #return $e;
317                 return undef;
318         };
319 }
320 __PACKAGE__->register_method(
321         api_name => 'opensrf.persist.queue.peek',
322         method => 'pop_queue',
323         argc => 1,
324 );
325 __PACKAGE__->register_method(
326         api_name => 'opensrf.persist.queue.pop',
327         method => 'pop_queue',
328         argc => 1,
329 );
330
331
332 sub peek_slot {
333         my $self = shift;
334         my $client = shift;
335
336         my $name = shift or do {
337                 throw OpenSRF::EX::WARN ("No slot name specified!");
338         };
339         my $name_id = _get_name_id($name);
340
341         my $order = 'ASC';
342         $order = 'DESC' if ($self->api_name =~ /stack/o);
343         
344         my $values = $dbh->selectall_arrayref("SELECT value FROM storage WHERE name_id = ? ORDER BY id $order;", {}, $name_id);
345
346         $client->respond( JSON->JSON2perl( $_->[0] ) ) for (@$values);
347
348         _flush_by_name($name);
349         return undef;
350 }
351 __PACKAGE__->register_method(
352         api_name => 'opensrf.persist.queue.peek.all',
353         method => 'peek_slot',
354         argc => 1,
355         stream => 1,
356 );
357 __PACKAGE__->register_method(
358         api_name => 'opensrf.persist.stack.peek.all',
359         method => 'peek_slot',
360         argc => 1,
361         stream => 1,
362 );
363
364
365 sub store_size {
366         my $self = shift;
367         my $client = shift;
368
369         my $name = shift or do {
370                 throw OpenSRF::EX::WARN ("No queue name specified!");
371         };
372         my $name_id = _get_name_id($name);
373
374         my $value = $dbh->selectcol_arrayref('SELECT SUM(LENGTH(value)) FROM storage WHERE name_id = ?;', {}, $name_id);
375
376         return JSON->JSON2perl( $value->[0] );
377 }
378 __PACKAGE__->register_method(
379         api_name => 'opensrf.persist.queue.size',
380         method => 'shift_stack',
381         argc => 1,
382 );
383 __PACKAGE__->register_method(
384         api_name => 'opensrf.persist.stack.size',
385         method => 'shift_stack',
386         argc => 1,
387 );
388 __PACKAGE__->register_method(
389         api_name => 'opensrf.persist.object.size',
390         method => 'shift_stack',
391         argc => 1,
392 );
393
394 sub store_depth {
395         my $self = shift;
396         my $client = shift;
397
398         my $name = shift or do {
399                 throw OpenSRF::EX::WARN ("No queue name specified!");
400         };
401         my $name_id = _get_name_id($name);
402
403         my $value = $dbh->selectcol_arrayref('SELECT COUNT(*) FROM storage WHERE name_id = ?;', {}, $name_id);
404
405         return JSON->JSON2perl( $value->[0] );
406 }
407 __PACKAGE__->register_method(
408         api_name => 'opensrf.persist.queue.length',
409         method => 'shift_stack',
410         argc => 1,
411 );
412 __PACKAGE__->register_method(
413         api_name => 'opensrf.persist.stack.depth',
414         method => 'shift_stack',
415         argc => 1,
416 );
417
418 sub shift_stack {
419         my $self = shift;
420         my $client = shift;
421
422         my $name = shift or do {
423                 throw OpenSRF::EX::WARN ("No slot name specified!");
424         };
425
426         try {
427                 my $name_id = _get_name_id($name);
428
429                 my $value = $dbh->selectrow_arrayref('SELECT id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
430                 $dbh->do('DELETE FROM storage WHERE id = ?;',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
431
432                 _flush_by_name($name);
433
434                 return JSON->JSON2perl( $value->[1] );
435         } catch Error with {
436                 my $e = shift;
437                 return undef;
438         };
439 }
440 __PACKAGE__->register_method(
441         api_name => 'opensrf.persist.stack.peek',
442         method => 'shift_stack',
443         argc => 1,
444 );
445 __PACKAGE__->register_method(
446         api_name => 'opensrf.persist.stack.pop',
447         method => 'shift_stack',
448         argc => 1,
449 );
450
451 sub get_object {
452         my $self = shift;
453         my $client = shift;
454
455         my $name = shift or do {
456                 throw OpenSRF::EX::WARN ("No object name specified!");
457         };
458
459         try {
460                 my $name_id = _get_name_id($name);
461
462                 my $value = $dbh->selectrow_arrayref('SELECT name_id, value FROM storage WHERE name_id = ? ORDER BY id DESC LIMIT 1;', {}, $name_id);
463                 $dbh->do('DELETE FROM storage WHERE name_id = ?',{}, $value->[0]) unless ($self->api_name =~ /peek$/);
464
465                 _flush_by_name($name);
466
467                 return JSON->JSON2perl( $value->[1] );
468         } catch Error with {
469                 return undef;
470         };
471 }
472 __PACKAGE__->register_method(
473         api_name => 'opensrf.persist.object.peek',
474         method => 'shift_stack',
475         argc => 1,
476 );
477 __PACKAGE__->register_method(
478         api_name => 'opensrf.persist.object.get',
479         method => 'shift_stack',
480         argc => 1,
481 );
482
483 1;