]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/perlmods/lib/OpenILS/Application/Vandelay.pm
LP1209291 vandelay item import defaults
[working/Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Vandelay.pm
1 package OpenILS::Application::Vandelay;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5 use Unicode::Normalize;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::AppSession;
8 use OpenSRF::Utils::SettingsClient;
9 use OpenSRF::Utils::Cache;
10 use OpenILS::Utils::Fieldmapper;
11 use OpenILS::Utils::CStoreEditor qw/:funcs/;
12 use OpenILS::Utils::Normalize qw/clean_marc/;
13 use MARC::Batch;
14 use MARC::Record;
15 use MARC::File::XML ( BinaryEncoding => 'UTF-8' );
16 use Time::HiRes qw(time);
17 use OpenSRF::Utils::Logger qw/$logger/;
18 use MIME::Base64;
19 use XML::LibXML;
20 use OpenILS::Const qw/:const/;
21 use OpenILS::Application::AppUtils;
22 use OpenILS::Application::Cat::BibCommon;
23 use OpenILS::Application::Cat::AuthCommon;
24 use OpenILS::Application::Cat::AssetCommon;
25 my $U = 'OpenILS::Application::AppUtils';
26
27 # A list of LDR/06 values from http://loc.gov/marc
28 my %record_types = (
29         a => 'bib',
30         c => 'bib',
31         d => 'bib',
32         e => 'bib',
33         f => 'bib',
34         g => 'bib',
35         i => 'bib',
36         j => 'bib',
37         k => 'bib',
38         m => 'bib',
39         o => 'bib',
40         p => 'bib',
41         r => 'bib',
42         t => 'bib',
43         u => 'holdings',
44         v => 'holdings',
45         x => 'holdings',
46         y => 'holdings',
47         z => 'auth',
48       ' ' => 'bib',
49 );
50
51 sub initialize {}
52 sub child_init {}
53
54 # --------------------------------------------------------------------------------
55 # Biblio ingest
56
57 sub create_bib_queue {
58     my $self = shift;
59     my $client = shift;
60     my $auth = shift;
61     my $name = shift;
62     my $owner = shift;
63     my $type = shift;
64     my $match_set = shift;
65     my $import_def = shift;
66     my $match_bucket = shift;
67
68     my $e = new_editor(authtoken => $auth, xact => 1);
69
70     return $e->die_event unless $e->checkauth;
71     return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
72     $owner ||= $e->requestor->id;
73
74     if ($e->search_vandelay_bib_queue( {name => $name, owner => $owner, queue_type => $type})->[0]) {
75         $e->rollback;
76         return OpenILS::Event->new('BIB_QUEUE_EXISTS') 
77     }
78
79     my $queue = new Fieldmapper::vandelay::bib_queue();
80     $queue->name( $name );
81     $queue->owner( $owner );
82     $queue->queue_type( $type ) if ($type);
83     $queue->item_attr_def( $import_def ) if ($import_def);
84     $queue->match_set($match_set) if $match_set;
85     $queue->match_bucket($match_bucket) if $match_bucket;
86
87     my $new_q = $e->create_vandelay_bib_queue( $queue );
88     return $e->die_event unless ($new_q);
89     $e->commit;
90
91     return $new_q;
92 }
93 __PACKAGE__->register_method(  
94     api_name   => "open-ils.vandelay.bib_queue.create",
95     method     => "create_bib_queue",
96     api_level  => 1,
97     argc       => 4,
98 );                      
99
100
101 sub create_auth_queue {
102     my $self = shift;
103     my $client = shift;
104     my $auth = shift;
105     my $name = shift;
106     my $owner = shift;
107     my $type = shift;
108     my $match_set = shift;
109
110     my $e = new_editor(authtoken => $auth, xact => 1);
111
112     return $e->die_event unless $e->checkauth;
113     return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
114     $owner ||= $e->requestor->id;
115
116     if ($e->search_vandelay_authority_queue({name => $name, owner => $owner, queue_type => $type})->[0]) {
117         $e->rollback;
118         return OpenILS::Event->new('AUTH_QUEUE_EXISTS') 
119     }
120
121     my $queue = new Fieldmapper::vandelay::authority_queue();
122     $queue->name( $name );
123     $queue->owner( $owner );
124     $queue->queue_type( $type ) if ($type);
125
126     my $new_q = $e->create_vandelay_authority_queue( $queue );
127     $e->die_event unless ($new_q);
128     $e->commit;
129
130     return $new_q;
131 }
132 __PACKAGE__->register_method(  
133     api_name   => "open-ils.vandelay.authority_queue.create",
134     method     => "create_auth_queue",
135     api_level  => 1,
136     argc       => 3,
137 );                      
138
139 sub add_record_to_bib_queue {
140     my $self = shift;
141     my $client = shift;
142     my $auth = shift;
143     my $queue = shift;
144     my $marc = shift;
145     my $purpose = shift;
146     my $bib_source = shift;
147
148     my $e = new_editor(authtoken => $auth, xact => 1);
149
150     $queue = $e->retrieve_vandelay_bib_queue($queue);
151
152     return $e->die_event unless $e->checkauth;
153     return $e->die_event unless
154         ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
155          $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
156
157     my $new_rec = _add_bib_rec($e, $marc, $queue->id, $purpose, $bib_source);
158
159     return $e->die_event unless ($new_rec);
160     $e->commit;
161     return $new_rec;
162 }
163 __PACKAGE__->register_method(  
164     api_name   => "open-ils.vandelay.queued_bib_record.create",
165     method     => "add_record_to_bib_queue",
166     api_level  => 1,
167     argc       => 3,
168 );                      
169
170 sub _add_bib_rec {
171     my $e = shift;
172     my $marc = shift;
173     my $queue = shift;
174     my $purpose = shift;
175     my $bib_source = shift;
176
177     my $rec = new Fieldmapper::vandelay::queued_bib_record();
178     $rec->marc( $marc );
179     $rec->queue( $queue );
180     $rec->purpose( $purpose ) if ($purpose);
181     $rec->bib_source($bib_source);
182
183     return $e->create_vandelay_queued_bib_record( $rec );
184 }
185
186 sub add_record_to_authority_queue {
187     my $self = shift;
188     my $client = shift;
189     my $auth = shift;
190     my $queue = shift;
191     my $marc = shift;
192     my $purpose = shift;
193
194     my $e = new_editor(authtoken => $auth, xact => 1);
195
196     $queue = $e->retrieve_vandelay_authority_queue($queue);
197
198     return $e->die_event unless $e->checkauth;
199     return $e->die_event unless
200         ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
201          $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
202
203     my $new_rec = _add_auth_rec($e, $marc, $queue->id, $purpose);
204
205     return $e->die_event unless ($new_rec);
206     $e->commit;
207     return $new_rec;
208 }
209 __PACKAGE__->register_method(
210     api_name   => "open-ils.vandelay.queued_authority_record.create",
211     method     => "add_record_to_authority_queue",
212     api_level  => 1,
213     argc       => 3,
214 );
215
216 sub _add_auth_rec {
217     my $e = shift;
218     my $marc = shift;
219     my $queue = shift;
220     my $purpose = shift;
221
222     my $rec = new Fieldmapper::vandelay::queued_authority_record();
223     $rec->marc( $marc );
224     $rec->queue( $queue );
225     $rec->purpose( $purpose ) if ($purpose);
226
227     return $e->create_vandelay_queued_authority_record( $rec );
228 }
229
230 sub process_spool {
231     my $self = shift;
232     my $client = shift;
233     my $auth = shift;
234     my $fingerprint = shift || '';
235     my $queue_id = shift;
236     my $purpose = shift;
237     my $filename = shift;
238     my $bib_source = shift;
239
240     my $e = new_editor(authtoken => $auth, xact => 1);
241     return $e->die_event unless $e->checkauth;
242
243     my $queue;
244     my $type = $self->{record_type};
245
246     if($type eq 'bib') {
247         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
248     } else {
249         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
250     }
251
252     my $evt = check_queue_perms($e, $type, $queue);
253     return $evt if ($evt);
254
255     my $cache = new OpenSRF::Utils::Cache();
256
257     if($fingerprint) {
258         my $data = $cache->get_cache('vandelay_import_spool_' . $fingerprint);
259         $purpose = $data->{purpose};
260         $filename = $data->{path};
261         $bib_source = $data->{bib_source};
262     }
263
264     unless(-r $filename) {
265         $logger->error("unable to read MARC file $filename");
266         return -1; # make this an event XXX
267     }
268
269     $logger->info("vandelay spooling $fingerprint purpose=$purpose file=$filename");
270
271     my $marctype = 'USMARC'; 
272
273     open F, $filename;
274     $marctype = 'XML' if (getc(F) =~ /^\D/o);
275     close F;
276
277     my $batch = new MARC::Batch ($marctype, $filename);
278     $batch->strict_off;
279
280     my $response_scale = 10;
281     my $count = 0;
282     my $r = -1;
283     while (try { $r = $batch->next } otherwise { $r = -1 }) {
284         if ($r == -1) {
285             $logger->warn("Processing of record $count in set $filename failed.  Skipping this record");
286             $count++;
287         }
288
289         $logger->info("processing record $count");
290
291         try {
292             my $xml = clean_marc($r);
293
294             my $qrec;
295             # Check the leader to ensure we've got something resembling the expected
296             # Allow spaces to give records the benefit of the doubt
297             my $ldr_type = substr($r->leader(), 6, 1);
298             if ($type eq 'bib' && ($record_types{$ldr_type}) eq 'bib' || $ldr_type eq ' ') {
299                 $qrec = _add_bib_rec( $e, $xml, $queue_id, $purpose, $bib_source ) or return $e->die_event;
300             } elsif ($type eq 'auth' && ($record_types{$ldr_type}) eq 'auth' || $ldr_type eq ' ') {
301                 $qrec = _add_auth_rec( $e, $xml, $queue_id, $purpose ) or return $e->die_event;
302             } else {
303                 # I don't know how to handle this type; rock on
304                 $logger->error("In process_spool(), type was $type and leader type was $ldr_type ; not currently supported");
305                 next;
306             }
307
308             if($self->api_name =~ /stream_results/ and $qrec) {
309                 $client->respond($qrec->id)
310             } else {
311                 $client->respond($count) if (++$count % $response_scale) == 0;
312                 $response_scale *= 10 if ($count == ($response_scale * 10));
313             }
314         } catch Error with {
315             my $error = shift;
316             $logger->warn("Encountered a bad record at Vandelay ingest: ".$error);
317         }
318     }
319
320     $e->commit;
321     unlink($filename);
322     $cache->delete_cache('vandelay_import_spool_' . $fingerprint) if $fingerprint;
323     return $count;
324 }
325
326 __PACKAGE__->register_method(  
327     api_name    => "open-ils.vandelay.bib.process_spool",
328     method      => "process_spool",
329     api_level   => 1,
330     argc        => 3,
331     max_chunk_size => 0,
332     record_type => 'bib'
333 );                      
334 __PACKAGE__->register_method(  
335     api_name    => "open-ils.vandelay.auth.process_spool",
336     method      => "process_spool",
337     api_level   => 1,
338     argc        => 3,
339     max_chunk_size => 0,
340     record_type => 'auth'
341 );                      
342
343 __PACKAGE__->register_method(  
344     api_name    => "open-ils.vandelay.bib.process_spool.stream_results",
345     method      => "process_spool",
346     api_level   => 1,
347     argc        => 3,
348     stream      => 1,
349     max_chunk_size => 0,
350     record_type => 'bib'
351 );                      
352 __PACKAGE__->register_method(  
353     api_name    => "open-ils.vandelay.auth.process_spool.stream_results",
354     method      => "process_spool",
355     api_level   => 1,
356     argc        => 3,
357     stream      => 1,
358     max_chunk_size => 0,
359     record_type => 'auth'
360 );
361
362 __PACKAGE__->register_method(  
363     api_name    => "open-ils.vandelay.bib_queue.records.retrieve",
364     method      => 'retrieve_queued_records',
365     api_level   => 1,
366     argc        => 2,
367     stream      => 1,
368     record_type => 'bib'
369 );
370 __PACKAGE__->register_method(
371     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.print",
372     method      => 'retrieve_queued_records',
373     api_level   => 1,
374     argc        => 2,
375     stream      => 1,
376     record_type => 'bib'
377 );
378 __PACKAGE__->register_method(
379     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.csv",
380     method      => 'retrieve_queued_records',
381     api_level   => 1,
382     argc        => 2,
383     stream      => 1,
384     record_type => 'bib'
385 );
386 __PACKAGE__->register_method(
387     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.email",
388     method      => 'retrieve_queued_records',
389     api_level   => 1,
390     argc        => 2,
391     stream      => 1,
392     record_type => 'bib'
393 );
394
395 __PACKAGE__->register_method(  
396     api_name    => "open-ils.vandelay.auth_queue.records.retrieve",
397     method      => 'retrieve_queued_records',
398     api_level   => 1,
399     argc        => 2,
400     stream      => 1,
401     record_type => 'auth'
402 );
403 __PACKAGE__->register_method(
404     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.print",
405     method      => 'retrieve_queued_records',
406     api_level   => 1,
407     argc        => 2,
408     stream      => 1,
409     record_type => 'auth'
410 );
411 __PACKAGE__->register_method(
412     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.csv",
413     method      => 'retrieve_queued_records',
414     api_level   => 1,
415     argc        => 2,
416     stream      => 1,
417     record_type => 'auth'
418 );
419 __PACKAGE__->register_method(
420     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.email",
421     method      => 'retrieve_queued_records',
422     api_level   => 1,
423     argc        => 2,
424     stream      => 1,
425     record_type => 'auth'
426 );
427
428 __PACKAGE__->register_method(  
429     api_name    => "open-ils.vandelay.bib_queue.records.matches.retrieve",
430     method      => 'retrieve_queued_records',
431     api_level   => 1,
432     argc        => 2,
433     stream      => 1,
434     record_type => 'bib',
435     signature   => {
436         desc => q/Only retrieve queued bib records that have matches against existing records/
437     }
438 );
439 __PACKAGE__->register_method(  
440     api_name    => "open-ils.vandelay.auth_queue.records.matches.retrieve",
441     method      => 'retrieve_queued_records',
442     api_level   => 1,
443     argc        => 2,
444     stream      => 1,
445     record_type => 'auth',
446     signature   => {
447         desc => q/Only retrieve queued authority records that have matches against existing records/
448     }
449 );
450
451 sub retrieve_queued_records {
452     my($self, $conn, $auth, $queue_id, $options) = @_;
453
454     $options ||= {};
455     my $limit = $$options{limit} || 20;
456     my $offset = $$options{offset} || 0;
457     my $type = $self->{record_type};
458
459     my $e = new_editor(authtoken => $auth, xact => 1);
460     return $e->die_event unless $e->checkauth;
461
462     my $queue;
463     if($type eq 'bib') {
464         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
465     } else {
466         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
467     }
468     my $evt = check_queue_perms($e, $type, $queue);
469     return $evt if ($evt);
470
471     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
472     my $mclass = $type eq 'bib' ? 'vbm' : 'vam';
473     my $query = {
474         select => {
475             $class => ['id'],
476             $mclass => [{
477                 column => 'eg_record', 
478                 transform => 'min',
479                 aggregate => 1
480             }]
481         },
482         from => $class,
483         where => {queue => $queue_id},
484         distinct => 1,
485         limit => $limit,
486         offset => $offset,
487     };
488     if($self->api_name =~ /export/) {
489         delete $query->{limit};
490         delete $query->{offset};
491     }
492
493     $query->{where}->{import_time} = undef if $$options{non_imported};
494
495     if($$options{with_import_error}) {
496
497         $query->{from} = {$class => {vii => {type => 'left'}}};
498         $query->{where}->{'-or'} = [
499             {'+vqbr' => {import_error => {'!=' => undef}}},
500             {'+vii' => {import_error => {'!=' => undef}}}
501         ];
502
503     } else {
504         
505         if($$options{with_rec_import_error}) {
506             $query->{where}->{import_error} = {'!=' => undef};
507
508         } elsif( $$options{with_item_import_error} and $type eq 'bib') {
509
510             $query->{from} = {$class => 'vii'};
511             $query->{where}->{'+vii'} = {import_error => {'!=' => undef}};
512         }
513     }
514
515     if($self->api_name =~ /matches/) {
516         # find only records that have matches
517         $query->{from} = {$class => {$mclass => {type => 'right'}}};
518     } else {
519         # join to mclass for sorting (see below)
520         $query->{from} = {$class => {$mclass => {type => 'left'}}};
521     }
522
523     # order by the matched bib records to group like queued records
524     $query->{order_by} = [
525         {class => $mclass, field => 'eg_record', transform => 'min'},
526         {class => $class, field => 'id'} 
527     ];
528
529     my $record_ids = $e->json_query($query);
530
531     my $retrieve = ($type eq 'bib') ? 
532         'retrieve_vandelay_queued_bib_record' : 'retrieve_vandelay_queued_authority_record';
533     my $search = ($type eq 'bib') ? 
534         'search_vandelay_queued_bib_record' : 'search_vandelay_queued_authority_record';
535
536     if ($self->api_name =~ /export/) {
537         my $rec_list = $e->$search({id => [map { $_->{id} } @$record_ids]}, {substream => 1});
538         if ($self->api_name =~ /print/) {
539
540             $e->rollback;
541             return $U->fire_object_event(
542                 undef,
543                 'vandelay.queued_'.$type.'_record.print',
544                 $rec_list,
545                 $e->requestor->ws_ou
546             );
547
548         } elsif ($self->api_name =~ /csv/) {
549
550             $e->rollback;
551             return $U->fire_object_event(
552                 undef,
553                 'vandelay.queued_'.$type.'_record.csv',
554                 $rec_list,
555                 $e->requestor->ws_ou
556             );
557
558         } elsif ($self->api_name =~ /email/) {
559
560             $conn->respond_complete(1);
561
562             for my $rec (@$rec_list) {
563                 $U->create_events_for_hook(
564                     'vandelay.queued_'.$type.'_record.email',
565                     $rec,
566                     $e->requestor->home_ou,
567                     undef,
568                     undef,
569                     1
570                 );
571             }
572
573         }
574     } else {
575         for my $rec_id (@$record_ids) {
576             my $flesh = ['attributes', 'matches'];
577             push(@$flesh, 'import_items') if $$options{flesh_import_items};
578             my $params = {flesh => 1, flesh_fields => {$class => $flesh}};
579             my $rec = $e->$retrieve([$rec_id->{id}, $params]);
580             $rec->clear_marc if $$options{clear_marc};
581             $conn->respond($rec);
582         }
583     }
584
585     $e->rollback;
586     return undef;
587 }
588
589 __PACKAGE__->register_method(  
590     api_name    => 'open-ils.vandelay.import_item.queue.retrieve',
591     method      => 'retrieve_queue_import_items',
592     api_level   => 1,
593     argc        => 2,
594     stream      => 1,
595     authoritative => 1,
596     signature => q/
597         Returns Import Item (vii) objects for the selected queue.
598         Filter options:
599             with_import_error : only return items that failed to import
600     /
601 );
602 __PACKAGE__->register_method(
603     api_name    => 'open-ils.vandelay.import_item.queue.export.print',
604     method      => 'retrieve_queue_import_items',
605     api_level   => 1,
606     argc        => 2,
607     stream      => 1,
608     authoritative => 1,
609     signature => q/
610         Returns template-generated printable output of Import Item (vii) objects for the selected queue.
611         Filter options:
612             with_import_error : only return items that failed to import
613     /
614 );
615 __PACKAGE__->register_method(
616     api_name    => 'open-ils.vandelay.import_item.queue.export.csv',
617     method      => 'retrieve_queue_import_items',
618     api_level   => 1,
619     argc        => 2,
620     stream      => 1,
621     authoritative => 1,
622     signature => q/
623         Returns template-generated CSV output of Import Item (vii) objects for the selected queue.
624         Filter options:
625             with_import_error : only return items that failed to import
626     /
627 );
628 __PACKAGE__->register_method(
629     api_name    => 'open-ils.vandelay.import_item.queue.export.email',
630     method      => 'retrieve_queue_import_items',
631     api_level   => 1,
632     argc        => 2,
633     stream      => 1,
634     authoritative => 1,
635     signature => q/
636         Emails template-generated output of Import Item (vii) objects for the selected queue.
637         Filter options:
638             with_import_error : only return items that failed to import
639     /
640 );
641
642 sub retrieve_queue_import_items {
643     my($self, $conn, $auth, $q_id, $options) = @_;
644
645     $options ||= {};
646     my $limit = $$options{limit} || 20;
647     my $offset = $$options{offset} || 0;
648
649     my $e = new_editor(authtoken => $auth);
650     return $e->event unless $e->checkauth;
651
652     my $queue = $e->retrieve_vandelay_bib_queue($q_id) or return $e->event;
653     my $evt = check_queue_perms($e, 'bib', $queue);
654     return $evt if $evt;
655
656     my $query = {
657         select => {vii => ['id']},
658         from => {
659             vii => {
660                 vqbr => {
661                     join => {
662                         'vbq' => {
663                             field => 'id',
664                             fkey => 'queue',
665                             filter => {id => $q_id}
666                         }
667                     }
668                 }
669             }
670         },
671         order_by => {'vii' => ['record','id']},
672         limit => $limit,
673         offset => $offset
674     };
675     if($self->api_name =~ /export/) {
676         delete $query->{limit};
677         delete $query->{offset};
678     }
679
680     $query->{where} = {'+vii' => {import_error => {'!=' => undef}}}
681         if $$options{with_import_error};
682
683     my $items = $e->json_query($query);
684     my $item_list = $e->search_vandelay_import_item({id => [map { $_->{id} } @$items]});
685     if ($self->api_name =~ /export/) {
686         if ($self->api_name =~ /print/) {
687
688             return $U->fire_object_event(
689                 undef,
690                 'vandelay.import_items.print',
691                 $item_list,
692                 $e->requestor->ws_ou
693             );
694
695         } elsif ($self->api_name =~ /csv/) {
696
697             return $U->fire_object_event(
698                 undef,
699                 'vandelay.import_items.csv',
700                 $item_list,
701                 $e->requestor->ws_ou
702             );
703
704         } elsif ($self->api_name =~ /email/) {
705
706             $conn->respond_complete(1);
707
708             for my $item (@$item_list) {
709                 $U->create_events_for_hook(
710                     'vandelay.import_items.email',
711                     $item,
712                     $e->requestor->home_ou,
713                     undef,
714                     undef,
715                     1
716                 );
717             }
718
719         }
720     } else {
721         for my $item (@$item_list) {
722             $conn->respond($item);
723         }
724     }
725
726     return undef;
727 }
728
729 sub check_queue_perms {
730     my($e, $type, $queue) = @_;
731     if ($type eq 'bib') {
732         return $e->die_event unless
733             ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
734              $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
735     } else {
736         return $e->die_event unless
737             ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
738              $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
739     }
740
741     return undef;
742 }
743
744 __PACKAGE__->register_method(  
745     api_name    => "open-ils.vandelay.bib_record.list.import",
746     method      => 'import_record_list',
747     api_level   => 1,
748     argc        => 2,
749     stream      => 1,
750     record_type => 'bib'
751 );
752
753 __PACKAGE__->register_method(  
754     api_name    => "open-ils.vandelay.auth_record.list.import",
755     method      => 'import_record_list',
756     api_level   => 1,
757     argc        => 2,
758     stream      => 1,
759     record_type => 'auth'
760 );
761
762 sub import_record_list {
763     my($self, $conn, $auth, $rec_ids, $args) = @_;
764     my $e = new_editor(authtoken => $auth, xact => 1);
765     return $e->die_event unless $e->checkauth;
766     $args ||= {};
767     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $args);
768     try {$e->rollback} otherwise {}; 
769     return $err if $err;
770     return {complete => 1};
771 }
772
773
774 __PACKAGE__->register_method(  
775     api_name    => "open-ils.vandelay.bib_queue.import",
776     method      => 'import_queue',
777     api_level   => 1,
778     argc        => 2,
779     stream      => 1,
780     max_chunk_size => 0,
781     record_type => 'bib',
782     signature => {
783         desc => q/
784             Attempts to import all non-imported records for the selected queue.
785             Will also attempt import of all non-imported items.
786         /
787     }
788 );
789
790 __PACKAGE__->register_method(  
791     api_name    => "open-ils.vandelay.auth_queue.import",
792     method      => 'import_queue',
793     api_level   => 1,
794     argc        => 2,
795     stream      => 1,
796     max_chunk_size => 0,
797     record_type => 'auth'
798 );
799
800 sub import_queue {
801     my($self, $conn, $auth, $q_id, $options) = @_;
802     my $e = new_editor(authtoken => $auth, xact => 1);
803     return $e->die_event unless $e->checkauth;
804     $options ||= {};
805     my $type = $self->{record_type};
806     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
807
808     # First, collect the not-yet-imported records
809     my $query = {queue => $q_id, import_time => undef};
810     my $search = ($type eq 'bib') ? 
811         'search_vandelay_queued_bib_record' : 
812         'search_vandelay_queued_authority_record';
813     my $rec_ids = $e->$search($query, {idlist => 1});
814
815     # Now add any imported records that have un-imported items
816
817     if($type eq 'bib') {
818         my $item_recs = $e->json_query({
819             select => {vqbr => ['id']},
820             from => {vqbr => 'vii'},
821             where => {
822                 '+vqbr' => {
823                     queue => $q_id,
824                     import_time => {'!=' => undef}
825                 },
826                 '+vii' => {import_time => undef}
827             },
828             distinct => 1
829         });
830         push(@$rec_ids, map {$_->{id}} @$item_recs);
831     }
832
833     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $options);
834     try {$e->rollback} otherwise {}; # only using this to make the read authoritative -- don't die from it
835     return $err if $err;
836     return {complete => 1};
837 }
838
839 # returns a list of queued record IDs for a given queue that 
840 # have at least one entry in the match table
841 # XXX DEPRECATED?
842 sub queued_records_with_matches {
843     my($e, $type, $q_id, $limit, $offset, $filter) = @_;
844
845     my $match_class = 'vbm';
846     my $rec_class = 'vqbr';
847     if($type eq 'auth') {
848         $match_class = 'vam';
849          $rec_class = 'vqar';
850     }
851
852     $filter ||= {};
853     $filter->{queue} = $q_id;
854
855     my $query = {
856         distinct => 1, 
857         select => {$match_class => ['queued_record']}, 
858         from => {
859             $match_class => {
860                 $rec_class => {
861                     field => 'id',
862                     fkey => 'queued_record',
863                     filter => $filter,
864                 }
865             }
866         }
867     };        
868
869     if($limit or defined $offset) {
870         $limit ||= 20;
871         $offset ||= 0;
872         $query->{limit} = $limit;
873         $query->{offset} = $offset;
874     }
875
876     my $data = $e->json_query($query);
877     return [ map {$_->{queued_record}} @$data ];
878 }
879
880
881 # cache of import item org unit settings.  
882 # used in apply_import_item_defaults() below, 
883 # but reset on each call to import_record_list_impl()
884 my %item_defaults_cache;
885
886 sub import_record_list_impl {
887     my($self, $conn, $rec_ids, $requestor, $args) = @_;
888
889     my $overlay_map = $args->{overlay_map} || {};
890     my $type = $self->{record_type};
891     my %queues;
892     %item_defaults_cache = ();
893
894     my $report_args = {
895         progress => 1,
896         step => 1,
897         conn => $conn,
898         total => scalar(@$rec_ids),
899         report_all => $$args{report_all}
900     };
901
902     $conn->max_chunk_count(1) if $$args{report_all};
903
904     my $auto_overlay_exact = $$args{auto_overlay_exact};
905     my $auto_overlay_1match = $$args{auto_overlay_1match};
906     my $auto_overlay_best = $$args{auto_overlay_best_match};
907     my $match_quality_ratio = $$args{match_quality_ratio};
908     my $merge_profile = $$args{merge_profile};
909     my $ft_merge_profile = $$args{fall_through_merge_profile};
910     my $bib_source = $$args{bib_source};
911     my $import_no_match = $$args{import_no_match};
912     my $strip_grps = $$args{strip_field_groups}; # bib-only
913
914     my $overlay_func = 'vandelay.overlay_bib_record';
915     my $auto_overlay_func = 'vandelay.auto_overlay_bib_record';
916     my $auto_overlay_best_func = 'vandelay.auto_overlay_bib_record_with_best'; # XXX bib-only
917     my $retrieve_func = 'retrieve_vandelay_queued_bib_record';
918     my $update_func = 'update_vandelay_queued_bib_record';
919     my $search_func = 'search_vandelay_queued_bib_record';
920     my $retrieve_queue_func = 'retrieve_vandelay_bib_queue';
921     my $update_queue_func = 'update_vandelay_bib_queue';
922     my $delete_queue_func = 'delete_vandelay_bib_queue';
923     my $rec_class = 'vqbr';
924
925     my $editor = new_editor();
926
927     my %bib_sources;
928     my $sources = $editor->search_config_bib_source({id => {'!=' => undef}});
929     $bib_sources{$_->id} = $_->source for @$sources;
930
931     if($type eq 'auth') {
932         $overlay_func =~ s/bib/auth/o;
933         $auto_overlay_func = s/bib/auth/o;
934         $retrieve_func =~ s/bib/authority/o;
935         $retrieve_queue_func =~ s/bib/authority/o;
936         $update_queue_func =~ s/bib/authority/o;
937         $update_func =~ s/bib/authority/o;
938         $search_func =~ s/bib/authority/o;
939         $delete_queue_func =~ s/bib/authority/o;
940         $rec_class = 'vqar';
941     }
942
943     my $new_rec_perm_cache;
944     my @success_rec_ids;
945     for my $rec_id (@$rec_ids) {
946
947         my $error = 0;
948         my $overlay_target = $overlay_map->{$rec_id};
949
950         my $e = new_editor(xact => 1);
951         $e->requestor($requestor);
952
953         $$report_args{e} = $e;
954         $$report_args{evt} = undef;
955         $$report_args{import_error} = undef;
956         $$report_args{no_import} = 0;
957
958         my $rec = $e->$retrieve_func([
959             $rec_id,
960             {   flesh => 1,
961                 flesh_fields => { $rec_class => ['matches']},
962             }
963         ]);
964
965         unless($rec) {
966             $$report_args{evt} = $e->event;
967             finish_rec_import_attempt($report_args);
968             next;
969         }
970
971         if($rec->import_time) {
972             # if the record is already imported, that means it may have 
973             # un-imported copies.  Add to success list for later processing.
974             push(@success_rec_ids, $rec_id);
975             $e->rollback;
976             next;
977         }
978
979         $$report_args{rec} = $rec;
980         $queues{$rec->queue} = 1;
981
982         my $record;
983         my $imported = 0;
984
985         if ($type eq 'bib') {
986             # strip configured / selected MARC tags from inbound records
987
988             my $marcdoc = XML::LibXML->new->parse_string($rec->marc);
989             $rec->marc($U->strip_marc_fields($e, $marcdoc, $strip_grps));
990
991             unless ($e->$update_func($rec)) {
992                 $$report_args{evt} = $e->die_event;
993                 finish_rec_import_attempt($report_args);
994                 next;
995             }
996         }
997
998         if(defined $overlay_target) {
999             # Caller chose an explicit overlay target
1000
1001             my $res = $e->json_query(
1002                 {
1003                     from => [
1004                         $overlay_func,
1005                         $rec_id,
1006                         $overlay_target, 
1007                         $merge_profile
1008                     ]
1009                 }
1010             );
1011
1012             if($res and ($res = $res->[0])) {
1013
1014                 if($res->{$overlay_func} eq 't') {
1015                     $logger->info("vl: $type direct overlay succeeded for queued rec ".
1016                         "$rec_id and overlay target $overlay_target");
1017                     $imported = 1;
1018                 }
1019
1020             } else {
1021                 $error = 1;
1022                 $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1023             }
1024
1025         } else {
1026
1027             if($auto_overlay_1match) { # overlay if there is exactly 1 match
1028
1029                 my %match_recs = map { $_->eg_record => 1 } @{$rec->matches};
1030
1031                 if( scalar(keys %match_recs) == 1) { # all matches point to the same record
1032
1033                     ($imported, $error, $rec) = try_auto_overlay(
1034                         $e, $type,
1035                         $report_args, 
1036                         $auto_overlay_best_func,
1037                         $retrieve_func,
1038                         $rec_class,
1039                         $rec_id, 
1040                         $match_quality_ratio, 
1041                         $merge_profile, 
1042                         $ft_merge_profile
1043                     );
1044                 }
1045             }
1046
1047             if(!$imported and !$error and $auto_overlay_exact and scalar(@{$rec->matches}) == 1 ) {
1048                 
1049                 # caller says to overlay if there is an /exact/ match
1050                 # $auto_overlay_func only proceeds and returns true on exact matches
1051
1052                 my $res = $e->json_query(
1053                     {
1054                         from => [
1055                             $auto_overlay_func,
1056                             $rec_id,
1057                             $merge_profile
1058                         ]
1059                     }
1060                 );
1061
1062                 if($res and ($res = $res->[0])) {
1063
1064                     if($res->{$auto_overlay_func} eq 't') {
1065                         $logger->info("vl: $type auto-overlay succeeded for queued rec $rec_id");
1066                         $imported = 1;
1067
1068                         # re-fetch the record to pick up the imported_as value from the DB
1069                         $$report_args{rec} = $rec = $e->$retrieve_func([
1070                             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1071
1072                     } else {
1073                         $logger->info("vl: $type auto-overlay failed for queued rec $rec_id");
1074                     }
1075
1076                 } else {
1077                     $error = 1;
1078                     $logger->error("vl: Error attempting overlay with func=$auto_overlay_func, profile=$merge_profile, record=$rec_id");
1079                 }
1080             }
1081
1082             if(!$imported and !$error and $auto_overlay_best and scalar(@{$rec->matches}) > 0 ) {
1083                 # caller says to overlay the best match
1084
1085                 ($imported, $error, $rec) = try_auto_overlay(
1086                     $e, $type,
1087                     $report_args, 
1088                     $auto_overlay_best_func,
1089                     $retrieve_func,
1090                     $rec_class,
1091                     $rec_id, 
1092                     $match_quality_ratio, 
1093                     $merge_profile, 
1094                     $ft_merge_profile
1095                 );
1096             }
1097
1098             if(!$imported and !$error and $import_no_match and scalar(@{$rec->matches}) == 0) {
1099             
1100                 # No overlay / merge occurred.  Do a traditional record import by creating a new record
1101
1102                 if (!$new_rec_perm_cache) {
1103                     $new_rec_perm_cache = {};
1104
1105                     # all users creating new records are required to have the basic permission.
1106                     # if the client requests, we can enforce extra permissions for creating new records.
1107                     # for speed, check the permissions the first time then cache the result.
1108
1109                     my $perm = ($type eq 'bib') ? 'IMPORT_MARC' : 'IMPORT_AUTHORITY_MARC';
1110                     my $xperm = $$args{new_rec_perm};
1111                     my $rec_ou = $e->requestor->ws_ou;
1112
1113                     $new_rec_perm_cache->{evt} = $e->die_event
1114                         if !$e->allowed($perm, $rec_ou) || ($xperm and !$e->allowed($xperm, $rec_ou));
1115                 }
1116
1117                 if ($new_rec_perm_cache->{evt}) {
1118
1119                     # a cached event won't roll back the transaction (a la die_event), but
1120                     # the transaction will get rolled back in finish_rec_import_attempt() below
1121                     $$report_args{evt} = $new_rec_perm_cache->{evt};
1122                     $$report_args{import_error} = 'import.record.perm_failure';
1123
1124                 } else { # perm checks succeeded
1125
1126                     $logger->info("vl: creating new $type record for queued record $rec_id");
1127
1128                     if ($type eq 'bib') {
1129
1130                         $record = OpenILS::Application::Cat::BibCommon->biblio_record_xml_import(
1131                             $e, $rec->marc, $bib_sources{$rec->bib_source}, undef, 1);
1132
1133                     } else { # authority record
1134
1135                         $record = OpenILS::Application::Cat::AuthCommon->import_authority_record($e, $rec->marc); #$source);
1136                     }
1137
1138                     if($U->event_code($record)) {
1139                         $$report_args{import_error} = 'import.duplicate.tcn' 
1140                             if $record->{textcode} eq 'OPEN_TCN_NOT_FOUND';
1141                         $$report_args{evt} = $record;
1142
1143                     } else {
1144
1145                         $logger->info("vl: successfully imported new $type record");
1146                         $rec->imported_as($record->id);
1147                         $imported = 1;
1148                     }
1149                 }
1150             }
1151         }
1152
1153         if($imported) {
1154
1155             $rec->import_time('now');
1156             $rec->clear_import_error;
1157             $rec->clear_error_detail;
1158
1159             if($e->$update_func($rec)) {
1160
1161                 if($type eq 'bib') {
1162
1163                     # see if this record is linked from an acq record.
1164                     my $li = $e->search_acq_lineitem(
1165                         {queued_record => $rec->id, state => {'!=' => 'canceled'}})->[0];
1166
1167                     if ($li) { 
1168                         # if so, update the acq lineitem to point to the imported record
1169                         $li->eg_bib_id($rec->imported_as);
1170                         $$report_args{evt} = $e->die_event unless $e->update_acq_lineitem($li);
1171                     }
1172                 }
1173
1174                 push @success_rec_ids, $rec_id;
1175                 finish_rec_import_attempt($report_args);
1176
1177             } else {
1178                 $imported = 0;
1179             }
1180         }
1181
1182         if(!$imported) {
1183             $logger->info("vl: record $rec_id was not imported");
1184             $$report_args{evt} = $e->event unless $$report_args{evt};
1185             $$report_args{no_import} = 1;
1186             finish_rec_import_attempt($report_args);
1187         }
1188     }
1189
1190     # see if we need to mark any queues as complete
1191     for my $q_id (keys %queues) {
1192
1193         my $e = new_editor(xact => 1);
1194         my $remaining = $e->$search_func(
1195             [{queue => $q_id, import_time => undef}, {limit =>1}], {idlist => 1});
1196
1197         unless(@$remaining) {
1198             my $queue = $e->$retrieve_queue_func($q_id);
1199             unless($U->is_true($queue->complete)) {
1200                 $queue->complete('t');
1201                 $e->$update_queue_func($queue) or return $e->die_event;
1202                 $e->commit;
1203                 next;
1204             }
1205         } 
1206         $e->rollback;
1207     }
1208
1209     # import the copies
1210     import_record_asset_list_impl($conn, \@success_rec_ids, $requestor) if @success_rec_ids;
1211
1212     $conn->respond({total => $$report_args{total}, progress => $$report_args{progress}});
1213     return undef;
1214 }
1215
1216
1217 sub try_auto_overlay {
1218     my $e = shift;
1219     my $type = shift;
1220     my $report_args = shift;
1221     my $overlay_func  = shift;
1222     my $retrieve_func = shift; 
1223     my $rec_class = shift;
1224     my $rec_id  = shift;
1225     my $match_quality_ratio = shift;
1226     my $merge_profile  = shift;
1227     my $ft_merge_profile = shift;
1228
1229     my $imported = 0;
1230     my $error = 0;
1231
1232     # Find the best match and overlay if the quality ratio allows it.
1233     my $res = $e->json_query(
1234         {
1235             from => [
1236                 $overlay_func,
1237                 $rec_id, 
1238                 $merge_profile,
1239                 $match_quality_ratio
1240             ]
1241         }
1242     );
1243
1244     if($res and ($res = $res->[0])) {
1245
1246         if($res->{$overlay_func} eq 't') {
1247
1248             # first attempt succeeded
1249             $imported = 1;
1250
1251         } else {
1252
1253             # quality-limited merge failed with insufficient quality.  If there is a 
1254             # fall-through merge profile, re-do the merge with the alternate profile
1255             # and no quality restriction.
1256
1257             if($ft_merge_profile and $match_quality_ratio > 0) {
1258
1259                 $logger->info("vl: $type auto-merge failed with profile $merge_profile; ".
1260                     "re-merging with fall-through profile $ft_merge_profile");
1261
1262                 my $res = $e->json_query(
1263                     {
1264                         from => [
1265                             $overlay_func,
1266                             $rec_id, 
1267                             $ft_merge_profile,
1268                             0 # minimum quality not required
1269                         ]
1270                     }
1271                 );
1272
1273                 if($res and ($res = $res->[0])) {
1274
1275                     if($res->{$overlay_func} eq 't') {
1276
1277                         # second attempt succeeded
1278                         $imported = 1;
1279
1280                     } else {
1281
1282                         # failed to merge on second attempt
1283                         $logger->info("vl: $type auto-merge with fall-through failed for queued rec $rec_id");
1284                     }
1285                 } else {
1286                     
1287                     # second attempt died 
1288                     $error = 1;
1289                     $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1290                 }
1291
1292             } else { 
1293
1294                 # failed to merge on first attempt, no fall-through was provided
1295                 $$report_args{import_error} = 'overlay.record.quality' if $match_quality_ratio > 0;
1296                 $logger->info("vl: $type auto-merge failed for queued rec $rec_id");
1297             }
1298         }
1299
1300     } else {
1301
1302         # first attempt died 
1303         $error = 1;
1304         $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1305     }
1306
1307     if($imported) {
1308
1309         # at least 1 of the attempts succeeded
1310         $logger->info("vl: $type auto-merge succeeded for queued rec $rec_id");
1311
1312         # re-fetch the record to pick up the imported_as value from the DB
1313         $$report_args{rec} = $e->$retrieve_func([
1314             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1315     }
1316
1317     return ($imported, $error, $$report_args{rec});
1318 }
1319
1320
1321 # tracks any import errors, commits the current xact, responds to the client
1322 sub finish_rec_import_attempt {
1323     my $args = shift;
1324     my $evt = $$args{evt};
1325     my $rec = $$args{rec};
1326     my $e = $$args{e};
1327
1328     my $error = $$args{import_error};
1329     $error = 'general.unknown' if $evt and not $error;
1330
1331     # error tracking
1332     if($rec) {
1333
1334         if($error or $evt) {
1335             # failed import
1336             # since an error occurred, there's no guarantee the transaction wasn't 
1337             # rolled back.  force a rollback and create a new editor.
1338             $e->rollback;
1339             $e = new_editor(xact => 1);
1340             $rec->import_error($error);
1341
1342             if($evt) {
1343                 my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1344                 $rec->error_detail($detail);
1345             }
1346
1347             my $method = 'update_vandelay_queued_bib_record';
1348             $method =~ s/bib/authority/ if $$args{type} eq 'auth';
1349             $e->$method($rec) and $e->commit or $e->rollback;
1350
1351         } else {
1352             # commit the successful import
1353             $e->commit;
1354         }
1355
1356     } else {
1357         # requested queued record was not found
1358         $e->rollback;
1359     }
1360         
1361     # respond to client
1362     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1363         $$args{conn}->respond({
1364             total => $$args{total}, 
1365             progress => $$args{progress}, 
1366             imported => ($rec) ? $rec->id : undef,
1367             import_error => $error,
1368             no_import => $$args{no_import},
1369             err_event => $evt
1370         });
1371         $$args{step} *= 2 unless $$args{step} == 256;
1372     }
1373
1374     $$args{progress}++;
1375 }
1376
1377
1378
1379
1380
1381 __PACKAGE__->register_method(  
1382     api_name    => "open-ils.vandelay.bib_queue.owner.retrieve",
1383     method      => 'owner_queue_retrieve',
1384     api_level   => 1,
1385     argc        => 2,
1386     stream      => 1,
1387     record_type => 'bib'
1388 );
1389 __PACKAGE__->register_method(  
1390     api_name    => "open-ils.vandelay.authority_queue.owner.retrieve",
1391     method      => 'owner_queue_retrieve',
1392     api_level   => 1,
1393     argc        => 2,
1394     stream      => 1,
1395     record_type => 'auth'
1396 );
1397
1398 sub owner_queue_retrieve {
1399     my($self, $conn, $auth, $owner_id, $filters) = @_;
1400     my $e = new_editor(authtoken => $auth, xact => 1);
1401     return $e->die_event unless $e->checkauth;
1402     $owner_id = $e->requestor->id; # XXX add support for viewing other's queues?
1403     my $queues;
1404     $filters ||= {};
1405     my $search = {owner => $owner_id};
1406     $search->{$_} = $filters->{$_} for keys %$filters;
1407
1408     if($self->{record_type} eq 'bib') {
1409         $queues = $e->search_vandelay_bib_queue(
1410             [$search, {order_by => {vbq => 'evergreen.lowercase(name)'}}]);
1411     } else {
1412         $queues = $e->search_vandelay_authority_queue(
1413             [$search, {order_by => {vaq => 'evergreen.lowercase(name)'}}]);
1414     }
1415     $conn->respond($_) for @$queues;
1416     $e->rollback;
1417     return undef;
1418 }
1419
1420 __PACKAGE__->register_method(  
1421     api_name    => "open-ils.vandelay.bib_queue.delete",
1422     method      => "delete_queue",
1423     api_level   => 1,
1424     argc        => 2,
1425     record_type => 'bib'
1426 );            
1427 __PACKAGE__->register_method(  
1428     api_name    => "open-ils.vandelay.auth_queue.delete",
1429     method      => "delete_queue",
1430     api_level   => 1,
1431     argc        => 2,
1432     record_type => 'auth'
1433 );  
1434
1435 sub delete_queue {
1436     my($self, $conn, $auth, $q_id) = @_;
1437     my $e = new_editor(xact => 1, authtoken => $auth);
1438     return $e->die_event unless $e->checkauth;
1439     if($self->{record_type} eq 'bib') {
1440         return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
1441         my $queue = $e->retrieve_vandelay_bib_queue($q_id)
1442             or return $e->die_event;
1443         $e->delete_vandelay_bib_queue($queue)
1444             or return $e->die_event;
1445     } else {
1446            return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
1447         my $queue = $e->retrieve_vandelay_authority_queue($q_id)
1448             or return $e->die_event;
1449         $e->delete_vandelay_authority_queue($queue)
1450             or return $e->die_event;
1451     }
1452     $e->commit;
1453     return 1;
1454 }
1455
1456
1457 __PACKAGE__->register_method(  
1458     api_name    => "open-ils.vandelay.queued_bib_record.html",
1459     method      => 'queued_record_html',
1460     api_level   => 1,
1461     argc        => 2,
1462     stream      => 1,
1463     record_type => 'bib'
1464 );
1465 __PACKAGE__->register_method(  
1466     api_name    => "open-ils.vandelay.queued_authority_record.html",
1467     method      => 'queued_record_html',
1468     api_level   => 1,
1469     argc        => 2,
1470     stream      => 1,
1471     record_type => 'auth'
1472 );
1473
1474 sub queued_record_html {
1475     my($self, $conn, $auth, $rec_id) = @_;
1476     my $e = new_editor(xact=>1,authtoken => $auth);
1477     return $e->die_event unless $e->checkauth;
1478     my $rec;
1479     if($self->{record_type} eq 'bib') {
1480         $rec = $e->retrieve_vandelay_queued_bib_record($rec_id)
1481             or return $e->die_event;
1482     } else {
1483         $rec = $e->retrieve_vandelay_queued_authority_record($rec_id)
1484             or return $e->die_event;
1485     }
1486
1487     $e->rollback;
1488     return $U->simplereq(
1489         'open-ils.search',
1490         'open-ils.search.biblio.record.html', undef, 1, $rec->marc);
1491 }
1492
1493
1494 __PACKAGE__->register_method(  
1495     api_name    => "open-ils.vandelay.bib_queue.summary.retrieve", 
1496     method      => 'retrieve_queue_summary',
1497     api_level   => 1,
1498     argc        => 2,
1499     stream      => 1,
1500     record_type => 'bib'
1501 );
1502 __PACKAGE__->register_method(  
1503     api_name    => "open-ils.vandelay.auth_queue.summary.retrieve",
1504     method      => 'retrieve_queue_summary',
1505     api_level   => 1,
1506     argc        => 2,
1507     stream      => 1,
1508     record_type => 'auth'
1509 );
1510
1511 sub retrieve_queue_summary {
1512     my($self, $conn, $auth, $queue_id) = @_;
1513     my $e = new_editor(xact=>1, authtoken => $auth);
1514     return $e->die_event unless $e->checkauth;
1515
1516     my $queue;
1517     my $type = $self->{record_type};
1518     if($type eq 'bib') {
1519         $queue = $e->retrieve_vandelay_bib_queue($queue_id)
1520             or return $e->die_event;
1521     } else {
1522         $queue = $e->retrieve_vandelay_authority_queue($queue_id)
1523             or return $e->die_event;
1524     }
1525
1526     my $evt = check_queue_perms($e, $type, $queue);
1527     return $evt if $evt;
1528
1529     my $search = 'search_vandelay_queued_bib_record';
1530     $search =~ s/bib/authority/ if $type ne 'bib';
1531
1532     my $summary = {
1533         queue => $queue,
1534         total => scalar(@{$e->$search({queue => $queue_id}, {idlist=>1})}),
1535         imported => scalar(@{$e->$search({queue => $queue_id, import_time => {'!=' => undef}}, {idlist=>1})}),
1536     };
1537
1538     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
1539     $summary->{rec_import_errors} = $e->json_query({
1540         select => {$class => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1541         from => $class,
1542         where => {queue => $queue_id, import_error => {'!=' => undef}}
1543     })->[0]->{count};
1544
1545     if($type eq 'bib') {
1546         
1547         # count of all items attached to records in the queue in question
1548         my $query = {
1549             select => {vii => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1550             from => 'vii',
1551             where => {
1552                 record => {
1553                     in => {
1554                         select => {vqbr => ['id']},
1555                         from => 'vqbr',
1556                         where => {queue => $queue_id}
1557                     }
1558                 }
1559             }
1560         };
1561         $summary->{total_items} = $e->json_query($query)->[0]->{count};
1562
1563         # count of items we attempted to import, but errored, attached to records in the queue in question
1564         $query->{where}->{import_error} = {'!=' => undef};
1565         $summary->{item_import_errors} = $e->json_query($query)->[0]->{count};
1566
1567         # count of items we successfully imported attached to records in the queue in question
1568         delete $query->{where}->{import_error};
1569         $query->{where}->{import_time} = {'!=' => undef};
1570         $summary->{total_items_imported} = $e->json_query($query)->[0]->{count};
1571     }
1572
1573     return $summary;
1574 }
1575
1576 # --------------------------------------------------------------------------------
1577 # Given a list of queued record IDs, imports all items attached to those records
1578 # --------------------------------------------------------------------------------
1579 sub import_record_asset_list_impl {
1580     my($conn, $rec_ids, $requestor) = @_;
1581
1582     my $roe = new_editor(xact=> 1, requestor => $requestor);
1583
1584     # for speed, filter out any records have not been 
1585     # imported or have no import items to load
1586     $rec_ids = $roe->json_query({
1587         select => {vqbr => ['id']},
1588         from => {vqbr => 'vii'},
1589         where => {'+vqbr' => {
1590             id => $rec_ids,
1591             import_time => {'!=' => undef}
1592         }},
1593         distinct => 1
1594     });
1595     $rec_ids = [map {$_->{id}} @$rec_ids];
1596
1597     my $report_args = {
1598         conn => $conn,
1599         total => scalar(@$rec_ids),
1600         step => 1, # how often to respond
1601         progress => 1,
1602         in_count => 0,
1603     };
1604
1605     for my $rec_id (@$rec_ids) {
1606         my $rec = $roe->retrieve_vandelay_queued_bib_record($rec_id);
1607         my $item_ids = $roe->search_vandelay_import_item(
1608             {record => $rec->id, import_error => undef}, 
1609             {idlist=>1}
1610         );
1611
1612         # if any items have no call_number label and a value should be
1613         # applied automatically (via org settings), we want to use the same
1614         # call number label for every copy per org per record.
1615         my $auto_callnumber = {};
1616
1617         for my $item_id (@$item_ids) {
1618             my $e = new_editor(requestor => $requestor, xact => 1);
1619             my $item = $e->retrieve_vandelay_import_item($item_id);
1620             my ($copy, $vol, $evt);
1621
1622             $$report_args{e} = $e;
1623             $$report_args{evt} = undef;
1624             $$report_args{import_item} = $item;
1625             $$report_args{import_error} = undef;
1626
1627             if (my $copy_id = $item->internal_id) { # assignment
1628                 # copy matches an existing copy.  Overlay instead of create.
1629
1630                 $logger->info("vl: performing copy overlay for internal_id=$copy_id");
1631
1632                 my $qt = $e->json_query({
1633                     select => {vbq => ['queue_type']},
1634                     from => {vqbr => 'vbq'},
1635                     where => {'+vqbr' => {id => $rec_id}}
1636                 })->[0]->{queue_type};
1637
1638                 if ($qt eq 'acq') {
1639                     # internal_id for ACQ queues refers to acq.lineitem_detail.id
1640                     # pull the real copy id from the acq LID
1641
1642                     my $lid = $e->retrieve_acq_lineitem_detail($copy_id);
1643                     if (!$lid) {
1644                         $$report_args{evt} = $e->die_event;
1645                         respond_with_status($report_args);
1646                         next;
1647                     }
1648                     $copy_id = $lid->eg_copy_id;
1649                     $logger->info("vl: performing ACQ copy overlay for copy $copy_id");
1650                 }
1651
1652                 $copy = $e->search_asset_copy([
1653                     {id => $copy_id, deleted => 'f'},
1654                     {flesh => 1, flesh_fields => {acp => ['call_number']}}
1655                 ])->[0];
1656
1657                 if (!$copy) {
1658                     $$report_args{evt} = $e->die_event;
1659                     respond_with_status($report_args);
1660                     next;
1661                 }
1662
1663                 # prevent update of unrelated copies
1664                 if ($copy->call_number->record != $rec->imported_as) {
1665                     $logger->info("vl: attempt to overlay unrelated copy=$copy_id; rec=".$rec->imported_as);
1666
1667                     $evt = OpenILS::Event->new('INVALID_IMPORT_COPY_ID', 
1668                         note => 'Cannot overlay copies for unlinked bib',
1669                         bre => $rec->imported_as, 
1670                         copy_id => $copy_id
1671                     );
1672                     $$report_args{evt} = $evt;
1673                     respond_with_status($report_args);
1674                     next;
1675                 }
1676
1677                 # overlaying copies requires an extra permission
1678                 if (!$e->allowed("IMPORT_OVERLAY_COPY", $copy->call_number->owning_lib)) {
1679                     $$report_args{evt} = $e->die_event;
1680                     respond_with_status($report_args);
1681                     next;
1682                 }
1683
1684                 # are we updating the call-number?
1685                 if ($item->call_number and $item->call_number ne $copy->call_number->label) {
1686
1687                     my $count = $e->json_query({
1688                         select => {acp => [{
1689                             alias => 'count', 
1690                             column => 'id', 
1691                             transform => 'count', 
1692                             aggregate => 1
1693                         }]},
1694                         from => 'acp',
1695                         where => {
1696                             deleted => 'f',
1697                             call_number => $copy->call_number->id
1698                         }
1699                     })->[0]->{count};
1700
1701                     if ($count == 1) {
1702                         # if this is the only copy attached to this 
1703                         # callnumber, just update the callnumber
1704
1705                         $logger->info("vl: updating callnumber label in copy overlay");
1706
1707                         $copy->call_number->label($item->call_number);
1708                         if (!$e->update_asset_call_number($copy->call_number)) {
1709                             $$report_args{evt} = $e->die_event;
1710                             respond_with_status($report_args);
1711                             next;
1712                         }
1713
1714                     } else {
1715
1716                         # otherwise, move the copy to a new/existing 
1717                         # call-number with the given label/owner
1718                         # note that overlay does not allow the owning_lib 
1719                         # to be changed.  Should it?
1720
1721                         $logger->info("vl: moving copy to new callnumber in copy overlay");
1722
1723                         ($vol, $evt) =
1724                             OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1725                                 $e, $item->call_number, 
1726                                 $copy->call_number->record, 
1727                                 $copy->call_number->owning_lib
1728                             );
1729
1730                         if($evt) {
1731                             $$report_args{evt} = $evt;
1732                             respond_with_status($report_args);
1733                             next;
1734                         }
1735
1736                         $copy->call_number($vol);
1737                     }
1738                 } # cn-update
1739
1740                 # for every field that has a non-'' value, overlay the copy value
1741                 foreach (qw/ barcode location circ_lib status 
1742                     circulate deposit deposit_amount ref holdable 
1743                     price circ_as_type alert_message opac_visible circ_modifier/) {
1744
1745                     my $val = $item->$_();
1746                     $copy->$_($val) if $val and $val ne '';
1747                 }
1748
1749                 # de-flesh for update
1750                 $copy->call_number($copy->call_number->id);
1751                 $copy->ischanged(1);
1752
1753                 $evt = OpenILS::Application::Cat::AssetCommon->
1754                     update_fleshed_copies($e, {all => 1}, undef, [$copy]);
1755
1756                 if($evt) {
1757                     $$report_args{evt} = $evt;
1758                     respond_with_status($report_args);
1759                     next;
1760                 }
1761
1762             } else { 
1763
1764                 # Creating a new copy
1765                 $logger->info("vl: creating new copy in import");
1766
1767                 # appply defaults values from org settings as needed
1768                 # if $auto_callnumber is unset, it will be set within
1769                 apply_import_item_defaults($e, $item, $auto_callnumber);
1770
1771                 # --------------------------------------------------------------------------------
1772                 # Find or create the volume
1773                 # --------------------------------------------------------------------------------
1774                 my ($vol, $evt) =
1775                     OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1776                         $e, $item->call_number, $rec->imported_as, $item->owning_lib);
1777
1778                 if($evt) {
1779                     $$report_args{evt} = $evt;
1780                     respond_with_status($report_args);
1781                     next;
1782                 }
1783
1784                 # --------------------------------------------------------------------------------
1785                 # Create the new copy
1786                 # --------------------------------------------------------------------------------
1787                 $copy = Fieldmapper::asset::copy->new;
1788                 $copy->loan_duration(2);
1789                 $copy->fine_level(2);
1790                 $copy->barcode($item->barcode);
1791                 $copy->location($item->location);
1792                 $copy->circ_lib($item->circ_lib || $item->owning_lib);
1793                 $copy->status( defined($item->status) ? $item->status : OILS_COPY_STATUS_IN_PROCESS );
1794                 $copy->circulate($item->circulate);
1795                 $copy->deposit($item->deposit);
1796                 $copy->deposit_amount($item->deposit_amount);
1797                 $copy->ref($item->ref);
1798                 $copy->holdable($item->holdable);
1799                 $copy->price($item->price);
1800                 $copy->circ_as_type($item->circ_as_type);
1801                 $copy->alert_message($item->alert_message);
1802                 $copy->opac_visible($item->opac_visible);
1803                 $copy->circ_modifier($item->circ_modifier);
1804
1805                 # --------------------------------------------------------------------------------
1806                 # Check for dupe barcode
1807                 # --------------------------------------------------------------------------------
1808                 if($evt = OpenILS::Application::Cat::AssetCommon->create_copy($e, $vol, $copy)) {
1809                     $$report_args{evt} = $evt;
1810                     $$report_args{import_error} = 'import.item.duplicate.barcode'
1811                         if $evt->{textcode} eq 'ITEM_BARCODE_EXISTS';
1812                     respond_with_status($report_args);
1813                     next;
1814                 }
1815
1816                 # --------------------------------------------------------------------------------
1817                 # create copy notes
1818                 # --------------------------------------------------------------------------------
1819                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1820                     $e, $copy, '', $item->pub_note, 1) if $item->pub_note;
1821
1822                 if($evt) {
1823                     $$report_args{evt} = $evt;
1824                     respond_with_status($report_args);
1825                     next;
1826                 }
1827
1828                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1829                     $e, $copy, '', $item->priv_note) if $item->priv_note;
1830
1831                 if($evt) {
1832                     $$report_args{evt} = $evt;
1833                     respond_with_status($report_args);
1834                     next;
1835                 }
1836             }
1837
1838             # set the import data on the import item
1839             $item->imported_as($copy->id); # $copy->id is set by create_copy() ^--
1840             $item->import_time('now');
1841
1842             unless($e->update_vandelay_import_item($item)) {
1843                 $$report_args{evt} = $e->die_event;
1844                 respond_with_status($report_args);
1845                 next;
1846             }
1847
1848             # --------------------------------------------------------------------------------
1849             # Item import succeeded
1850             # --------------------------------------------------------------------------------
1851             $e->commit;
1852             $$report_args{in_count}++;
1853             respond_with_status($report_args);
1854             $logger->info("vl: successfully imported item " . $item->barcode);
1855         }
1856     }
1857
1858     $roe->rollback;
1859     return undef;
1860 }
1861
1862 sub apply_import_item_defaults {
1863     my ($e, $item, $auto_cn) = @_;
1864     my $org = $item->owning_lib || $item->circ_lib;
1865     my %c = %item_defaults_cache;  
1866
1867     # fetch and cache the org unit setting value (unless 
1868     # it's already cached) and return the value to the caller
1869     my $set = sub {
1870         my $name = shift;
1871         return $c{$org}{$name} if defined $c{$org}{$name};
1872         my $sname = "vandelay.item.$name";
1873         $c{$org}{$name} = $U->ou_ancestor_setting_value($org, $sname, $e);
1874         $c{$org}{$name} = '' unless defined $c{$org}{$name};
1875         return $c{$org}{$name};
1876     };
1877
1878     if (!$item->barcode) {
1879
1880         if ($set->('barcode.auto')) {
1881
1882             my $pfx = $set->('barcode.prefix') || 'VAN';
1883             my $barcode = $pfx . $item->record . $item->id;
1884
1885             $logger->info("vl: using auto barcode $barcode for ".$item->id);
1886             $item->barcode($barcode);
1887
1888         } else {
1889             $logger->error("vl: no barcode (or defualt) for item ".$item->id);
1890         }
1891     }
1892
1893     if (!$item->call_number) {
1894
1895         if ($set->('call_number.auto')) {
1896
1897             if (!$auto_cn->{$org}) {
1898                 my $pfx = $set->('call_number.prefix') || 'VAN';
1899
1900                 # use the ID of the first item to differentiate this 
1901                 # call number from others linked to the same record
1902                 $auto_cn->{$org} = $pfx . $item->record . $item->id;
1903             }
1904
1905             $logger->info("vl: using auto call number ".$auto_cn->{$org});
1906             $item->call_number($auto_cn->{$org});
1907
1908         } else {
1909             $logger->error("vl: no call number or default for item ".$item->id);
1910         }
1911     }
1912 }
1913
1914
1915 sub respond_with_status {
1916     my $args = shift;
1917     my $e = $$args{e};
1918
1919     #  If the import failed, track the failure reason
1920
1921     my $error = $$args{import_error};
1922     my $evt = $$args{evt};
1923
1924     if($error or $evt) {
1925
1926         my $item = $$args{import_item};
1927         $logger->info("vl: unable to import item " . $item->barcode);
1928
1929         $error ||= 'general.unknown';
1930         $item->import_error($error);
1931
1932         if($evt) {
1933             my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1934             $item->error_detail($detail);
1935         }
1936
1937         # state of the editor is unknown at this point.  Force a rollback and start over.
1938         $e->rollback;
1939         $e = new_editor(xact => 1);
1940         $e->update_vandelay_import_item($item);
1941         $e->commit;
1942     }
1943
1944     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1945         $$args{conn}->respond({
1946             total => $$args{total},
1947             progress => $$args{progress},
1948             success_count => $$args{success_count},
1949             err_event => $evt
1950         });
1951         $$args{step} *= 2 unless $$args{step} == 256;
1952     }
1953
1954     $$args{progress}++;
1955 }
1956
1957 __PACKAGE__->register_method(  
1958     api_name    => "open-ils.vandelay.match_set.get_tree",
1959     method      => "match_set_get_tree",
1960     api_level   => 1,
1961     argc        => 2,
1962     signature   => {
1963         desc    => q/For a given vms object, return a tree of match set points
1964                     represented by a vmsp object with recursively fleshed
1965                     children./
1966     }
1967 );
1968
1969 sub match_set_get_tree {
1970     my ($self, $conn, $authtoken, $match_set_id) = @_;
1971
1972     $match_set_id = int($match_set_id) or return;
1973
1974     my $e = new_editor("authtoken" => $authtoken);
1975     $e->checkauth or return $e->die_event;
1976
1977     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
1978         return $e->die_event;
1979
1980     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
1981         return $e->die_event;
1982
1983     my $tree = $e->search_vandelay_match_set_point([
1984         {"match_set" => $match_set_id, "parent" => undef},
1985         {"flesh" => -1, "flesh_fields" => {"vmsp" => ["children"]}}
1986     ]) or return $e->die_event;
1987
1988     return pop @$tree;
1989 }
1990
1991
1992 __PACKAGE__->register_method(
1993     api_name    => "open-ils.vandelay.match_set.update",
1994     method      => "match_set_update_tree",
1995     api_level   => 1,
1996     argc        => 3,
1997     signature   => {
1998         desc => q/Replace any vmsp objects associated with a given (by ID) vms
1999                 with the given objects (recursively fleshed vmsp tree)./
2000     }
2001 );
2002
2003 sub _walk_new_vmsp {
2004     my ($e, $match_set_id, $node, $parent_id) = @_;
2005
2006     my $point = new Fieldmapper::vandelay::match_set_point;
2007     $point->parent($parent_id);
2008     $point->match_set($match_set_id);
2009     $point->$_($node->$_) for (qw/bool_op svf tag subfield negate quality/);
2010
2011     $e->create_vandelay_match_set_point($point) or return $e->die_event;
2012
2013     $parent_id = $e->data->id;
2014     if ($node->children && @{$node->children}) {
2015         for (@{$node->children}) {
2016             return $e->die_event if
2017                 _walk_new_vmsp($e, $match_set_id, $_, $parent_id);
2018         }
2019     }
2020
2021     return;
2022 }
2023
2024 sub match_set_update_tree {
2025     my ($self, $conn, $authtoken, $match_set_id, $tree) = @_;
2026
2027     my $e = new_editor("xact" => 1, "authtoken" => $authtoken);
2028     $e->checkauth or return $e->die_event;
2029
2030     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
2031         return $e->die_event;
2032
2033     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
2034         return $e->die_event;
2035
2036     my $existing = $e->search_vandelay_match_set_point([
2037         {"match_set" => $match_set_id},
2038         {"order_by" => {"vmsp" => "id DESC"}}
2039     ]) or return $e->die_event;
2040
2041     # delete points, working up from leaf points to the root
2042     while(@$existing) {
2043         for my $point (shift @$existing) {
2044             if( grep {$_->parent eq $point->id} @$existing) {
2045                 push(@$existing, $point);
2046             } else {
2047                 $e->delete_vandelay_match_set_point($point) or return $e->die_event;
2048             }
2049         }
2050     }
2051
2052     _walk_new_vmsp($e, $match_set_id, $tree);
2053
2054     $e->commit or return $e->die_event;
2055 }
2056
2057 __PACKAGE__->register_method(  
2058     api_name    => 'open-ils.vandelay.bib_queue.to_bucket',
2059     method      => 'bib_queue_to_bucket',
2060     api_level   => 1,
2061     argc        => 2,
2062     signature   => {
2063         desc    => q/Add to or create a new bib container (bucket) with the successfully 
2064                     imported contents of a vandelay queue.  Any user that has Vandelay 
2065                     queue create permissions can append or create buckets from his-her own queues./,
2066         params  => [
2067             {desc => 'Authtoken', type => 'string'},
2068             {desc => 'Queue ID', type => 'number'},
2069             {desc => 'Bucket Name', type => 'string'}
2070         ],
2071         return  => {desc => q/
2072             {bucket => $bucket, addcount => number-of-items-added-to-bucket, item_count => total-bucket-item-count} on success,
2073             {add_count => 0} if there is nothing to do, and Event on error/}
2074     }
2075 );
2076
2077 sub bib_queue_to_bucket {
2078     my ($self, $conn, $auth, $q_id, $bucket_name) = @_;
2079
2080     my $e = new_editor(xact => 1, authtoken => $auth);
2081     return $e->die_event unless $e->checkauth;
2082     
2083     my $queue = $e->retrieve_vandelay_bib_queue($q_id)
2084         or return $e->die_event;
2085
2086     return OpenILS::Event->new('BAD_PARAMS', 
2087         note => q/Bucket creator must be queue owner/)
2088         unless $queue->owner == $e->requestor->id;
2089
2090     # find the bib IDs that will go into the bucket
2091     my $bib_ids = $e->json_query({
2092         select => {vqbr => ['imported_as']},
2093         from => 'vqbr',
2094         where => {queue => $q_id, imported_as => {'!=' => undef}}
2095     });
2096
2097     if (!@$bib_ids) { # no records to add
2098         $e->rollback;
2099         return {add_count => 0};
2100     }
2101
2102     # allow user to add to an existing bucket by name
2103     my $bucket = $e->search_container_biblio_record_entry_bucket({
2104         owner => $e->requestor->id, 
2105         name => $bucket_name,
2106         btype => 'vandelay_queue'
2107     })->[0];
2108
2109     # if the bucket does not exist, create a new one
2110     if (!$bucket) { 
2111         $bucket = Fieldmapper::container::biblio_record_entry_bucket->new;
2112         $bucket->name($bucket_name);
2113         $bucket->owner($e->requestor->id);
2114         $bucket->btype('vandelay_queue');
2115
2116         $e->create_container_biblio_record_entry_bucket($bucket)
2117             or return $e->die_event;
2118     }
2119
2120     # create the new bucket items
2121     for my $bib_id ( map {$_->{imported_as}} @$bib_ids ) {
2122         my $item = Fieldmapper::container::biblio_record_entry_bucket_item->new;
2123         $item->target_biblio_record_entry($bib_id);
2124         $item->bucket($bucket->id);
2125         $e->create_container_biblio_record_entry_bucket_item($item)
2126             or return $e->die_event;
2127     }
2128
2129     # re-fetch the bucket to pick up the correct create_time
2130     $bucket = $e->retrieve_container_biblio_record_entry_bucket($bucket->id)
2131         or return $e->die_event;
2132
2133     # get the total count of items in this bucket
2134     my $count = $e->json_query({
2135         select => {cbrebi => [{
2136             aggregate =>  1,
2137             transform => 'count',
2138             alias => 'count',
2139             column => 'id'
2140         }]},
2141         from => 'cbrebi',
2142         where => {bucket => $bucket->id}
2143     })->[0];
2144
2145     $e->commit;
2146
2147     return {
2148         bucket => $bucket, 
2149         add_count => scalar(@$bib_ids), # items added to the bucket
2150         item_count => $count->{count} # total items in buckets
2151     };
2152 }
2153
2154 1;