]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/perlmods/lib/OpenILS/Application/Vandelay.pm
576623128f113ab392d3aa130eba94ea945bfa6e
[working/Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Vandelay.pm
1 package OpenILS::Application::Vandelay;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5 use Unicode::Normalize;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::AppSession;
8 use OpenSRF::Utils::SettingsClient;
9 use OpenSRF::Utils::Cache;
10 use OpenILS::Utils::Fieldmapper;
11 use OpenILS::Utils::CStoreEditor qw/:funcs/;
12 use OpenILS::Utils::Normalize qw/clean_marc/;
13 use MARC::Batch;
14 use MARC::Record;
15 use MARC::File::XML ( BinaryEncoding => 'UTF-8' );
16 use Time::HiRes qw(time);
17 use OpenSRF::Utils::Logger qw/$logger/;
18 use MIME::Base64;
19 use XML::LibXML;
20 use OpenILS::Const qw/:const/;
21 use OpenILS::Application::AppUtils;
22 use OpenILS::Application::Cat::BibCommon;
23 use OpenILS::Application::Cat::AuthCommon;
24 use OpenILS::Application::Cat::AssetCommon;
25 my $U = 'OpenILS::Application::AppUtils';
26
27 # A list of LDR/06 values from http://loc.gov/marc
28 my %record_types = (
29         a => 'bib',
30         c => 'bib',
31         d => 'bib',
32         e => 'bib',
33         f => 'bib',
34         g => 'bib',
35         i => 'bib',
36         j => 'bib',
37         k => 'bib',
38         m => 'bib',
39         o => 'bib',
40         p => 'bib',
41         r => 'bib',
42         t => 'bib',
43         u => 'holdings',
44         v => 'holdings',
45         x => 'holdings',
46         y => 'holdings',
47         z => 'auth',
48       ' ' => 'bib',
49 );
50
51 sub initialize {}
52 sub child_init {}
53
54 # --------------------------------------------------------------------------------
55 # Biblio ingest
56
57 sub create_bib_queue {
58     my $self = shift;
59     my $client = shift;
60     my $auth = shift;
61     my $name = shift;
62     my $owner = shift;
63     my $type = shift;
64     my $match_set = shift;
65     my $import_def = shift;
66     my $match_bucket = shift;
67
68     my $e = new_editor(authtoken => $auth, xact => 1);
69
70     return $e->die_event unless $e->checkauth;
71     return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
72     $owner ||= $e->requestor->id;
73
74     if ($e->search_vandelay_bib_queue( {name => $name, owner => $owner, queue_type => $type})->[0]) {
75         $e->rollback;
76         return OpenILS::Event->new('BIB_QUEUE_EXISTS') 
77     }
78
79     my $queue = new Fieldmapper::vandelay::bib_queue();
80     $queue->name( $name );
81     $queue->owner( $owner );
82     $queue->queue_type( $type ) if ($type);
83     $queue->item_attr_def( $import_def ) if ($import_def);
84     $queue->match_set($match_set) if $match_set;
85     $queue->match_bucket($match_bucket) if $match_bucket;
86
87     my $new_q = $e->create_vandelay_bib_queue( $queue );
88     return $e->die_event unless ($new_q);
89     $e->commit;
90
91     return $new_q;
92 }
93 __PACKAGE__->register_method(  
94     api_name   => "open-ils.vandelay.bib_queue.create",
95     method     => "create_bib_queue",
96     api_level  => 1,
97     argc       => 4,
98 );                      
99
100
101 sub create_auth_queue {
102     my $self = shift;
103     my $client = shift;
104     my $auth = shift;
105     my $name = shift;
106     my $owner = shift;
107     my $type = shift;
108     my $match_set = shift;
109
110     my $e = new_editor(authtoken => $auth, xact => 1);
111
112     return $e->die_event unless $e->checkauth;
113     return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
114     $owner ||= $e->requestor->id;
115
116     if ($e->search_vandelay_authority_queue({name => $name, owner => $owner, queue_type => $type})->[0]) {
117         $e->rollback;
118         return OpenILS::Event->new('AUTH_QUEUE_EXISTS') 
119     }
120
121     my $queue = new Fieldmapper::vandelay::authority_queue();
122     $queue->name( $name );
123     $queue->owner( $owner );
124     $queue->queue_type( $type ) if ($type);
125
126     my $new_q = $e->create_vandelay_authority_queue( $queue );
127     $e->die_event unless ($new_q);
128     $e->commit;
129
130     return $new_q;
131 }
132 __PACKAGE__->register_method(  
133     api_name   => "open-ils.vandelay.authority_queue.create",
134     method     => "create_auth_queue",
135     api_level  => 1,
136     argc       => 3,
137 );                      
138
139 sub add_record_to_bib_queue {
140     my $self = shift;
141     my $client = shift;
142     my $auth = shift;
143     my $queue = shift;
144     my $marc = shift;
145     my $purpose = shift;
146     my $bib_source = shift;
147
148     my $e = new_editor(authtoken => $auth, xact => 1);
149
150     $queue = $e->retrieve_vandelay_bib_queue($queue);
151
152     return $e->die_event unless $e->checkauth;
153     return $e->die_event unless
154         ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
155          $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
156
157     my $new_rec = _add_bib_rec($e, $marc, $queue->id, $purpose, $bib_source);
158
159     return $e->die_event unless ($new_rec);
160     $e->commit;
161     return $new_rec;
162 }
163 __PACKAGE__->register_method(  
164     api_name   => "open-ils.vandelay.queued_bib_record.create",
165     method     => "add_record_to_bib_queue",
166     api_level  => 1,
167     argc       => 3,
168 );                      
169
170 sub _add_bib_rec {
171     my $e = shift;
172     my $marc = shift;
173     my $queue = shift;
174     my $purpose = shift;
175     my $bib_source = shift;
176
177     my $rec = new Fieldmapper::vandelay::queued_bib_record();
178     $rec->marc( $marc );
179     $rec->queue( $queue );
180     $rec->purpose( $purpose ) if ($purpose);
181     $rec->bib_source($bib_source);
182
183     return $e->create_vandelay_queued_bib_record( $rec );
184 }
185
186 sub add_record_to_authority_queue {
187     my $self = shift;
188     my $client = shift;
189     my $auth = shift;
190     my $queue = shift;
191     my $marc = shift;
192     my $purpose = shift;
193
194     my $e = new_editor(authtoken => $auth, xact => 1);
195
196     $queue = $e->retrieve_vandelay_authority_queue($queue);
197
198     return $e->die_event unless $e->checkauth;
199     return $e->die_event unless
200         ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
201          $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
202
203     my $new_rec = _add_auth_rec($e, $marc, $queue->id, $purpose);
204
205     return $e->die_event unless ($new_rec);
206     $e->commit;
207     return $new_rec;
208 }
209 __PACKAGE__->register_method(
210     api_name   => "open-ils.vandelay.queued_authority_record.create",
211     method     => "add_record_to_authority_queue",
212     api_level  => 1,
213     argc       => 3,
214 );
215
216 sub _add_auth_rec {
217     my $e = shift;
218     my $marc = shift;
219     my $queue = shift;
220     my $purpose = shift;
221
222     my $rec = new Fieldmapper::vandelay::queued_authority_record();
223     $rec->marc( $marc );
224     $rec->queue( $queue );
225     $rec->purpose( $purpose ) if ($purpose);
226
227     return $e->create_vandelay_queued_authority_record( $rec );
228 }
229
230 sub process_spool {
231     my $self = shift;
232     my $client = shift;
233     my $auth = shift;
234     my $fingerprint = shift || '';
235     my $queue_id = shift;
236     my $purpose = shift;
237     my $filename = shift;
238     my $bib_source = shift;
239
240     my $e = new_editor(authtoken => $auth, xact => 1);
241     return $e->die_event unless $e->checkauth;
242
243     my $queue;
244     my $type = $self->{record_type};
245
246     if($type eq 'bib') {
247         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
248     } else {
249         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
250     }
251
252     my $evt = check_queue_perms($e, $type, $queue);
253     return $evt if ($evt);
254
255     my $cache = new OpenSRF::Utils::Cache();
256
257     if($fingerprint) {
258         my $data = $cache->get_cache('vandelay_import_spool_' . $fingerprint);
259         $purpose = $data->{purpose};
260         $filename = $data->{path};
261         $bib_source = $data->{bib_source};
262     }
263
264     unless(-r $filename) {
265         $logger->error("unable to read MARC file $filename");
266         return -1; # make this an event XXX
267     }
268
269     $logger->info("vandelay spooling $fingerprint purpose=$purpose file=$filename");
270
271     my $marctype = 'USMARC'; 
272
273     open F, $filename;
274     $marctype = 'XML' if (getc(F) =~ /^\D/o);
275     close F;
276
277     my $batch = new MARC::Batch ($marctype, $filename);
278     $batch->strict_off;
279
280     my $response_scale = 10;
281     my $count = 0;
282     my $r = -1;
283     while (try { $r = $batch->next } otherwise { $r = -1 }) {
284         if ($r == -1) {
285             $logger->warn("Processing of record $count in set $filename failed.  Skipping this record");
286             $count++;
287         }
288
289         $logger->info("processing record $count");
290
291         try {
292             my $xml = clean_marc($r);
293
294             my $qrec;
295             # Check the leader to ensure we've got something resembling the expected
296             # Allow spaces to give records the benefit of the doubt
297             my $ldr_type = substr($r->leader(), 6, 1);
298             if ($type eq 'bib' && ($record_types{$ldr_type}) eq 'bib' || $ldr_type eq ' ') {
299                 $qrec = _add_bib_rec( $e, $xml, $queue_id, $purpose, $bib_source ) or return $e->die_event;
300             } elsif ($type eq 'auth' && ($record_types{$ldr_type}) eq 'auth' || $ldr_type eq ' ') {
301                 $qrec = _add_auth_rec( $e, $xml, $queue_id, $purpose ) or return $e->die_event;
302             } else {
303                 # I don't know how to handle this type; rock on
304                 $logger->error("In process_spool(), type was $type and leader type was $ldr_type ; not currently supported");
305                 next;
306             }
307
308             if($self->api_name =~ /stream_results/ and $qrec) {
309                 $client->respond($qrec->id)
310             } else {
311                 $client->respond($count) if (++$count % $response_scale) == 0;
312                 $response_scale *= 10 if ($count == ($response_scale * 10));
313             }
314         } catch Error with {
315             my $error = shift;
316             $logger->warn("Encountered a bad record at Vandelay ingest: ".$error);
317         }
318     }
319
320     $e->commit;
321     unlink($filename);
322     $cache->delete_cache('vandelay_import_spool_' . $fingerprint) if $fingerprint;
323     return $count;
324 }
325
326 __PACKAGE__->register_method(  
327     api_name    => "open-ils.vandelay.bib.process_spool",
328     method      => "process_spool",
329     api_level   => 1,
330     argc        => 3,
331     max_chunk_size => 0,
332     record_type => 'bib'
333 );                      
334 __PACKAGE__->register_method(  
335     api_name    => "open-ils.vandelay.auth.process_spool",
336     method      => "process_spool",
337     api_level   => 1,
338     argc        => 3,
339     max_chunk_size => 0,
340     record_type => 'auth'
341 );                      
342
343 __PACKAGE__->register_method(  
344     api_name    => "open-ils.vandelay.bib.process_spool.stream_results",
345     method      => "process_spool",
346     api_level   => 1,
347     argc        => 3,
348     stream      => 1,
349     max_chunk_size => 0,
350     record_type => 'bib'
351 );                      
352 __PACKAGE__->register_method(  
353     api_name    => "open-ils.vandelay.auth.process_spool.stream_results",
354     method      => "process_spool",
355     api_level   => 1,
356     argc        => 3,
357     stream      => 1,
358     max_chunk_size => 0,
359     record_type => 'auth'
360 );
361
362 __PACKAGE__->register_method(  
363     api_name    => "open-ils.vandelay.bib_queue.records.retrieve",
364     method      => 'retrieve_queued_records',
365     api_level   => 1,
366     argc        => 2,
367     stream      => 1,
368     record_type => 'bib'
369 );
370 __PACKAGE__->register_method(
371     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.print",
372     method      => 'retrieve_queued_records',
373     api_level   => 1,
374     argc        => 2,
375     stream      => 1,
376     record_type => 'bib'
377 );
378 __PACKAGE__->register_method(
379     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.csv",
380     method      => 'retrieve_queued_records',
381     api_level   => 1,
382     argc        => 2,
383     stream      => 1,
384     record_type => 'bib'
385 );
386 __PACKAGE__->register_method(
387     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.email",
388     method      => 'retrieve_queued_records',
389     api_level   => 1,
390     argc        => 2,
391     stream      => 1,
392     record_type => 'bib'
393 );
394
395 __PACKAGE__->register_method(  
396     api_name    => "open-ils.vandelay.auth_queue.records.retrieve",
397     method      => 'retrieve_queued_records',
398     api_level   => 1,
399     argc        => 2,
400     stream      => 1,
401     record_type => 'auth'
402 );
403 __PACKAGE__->register_method(
404     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.print",
405     method      => 'retrieve_queued_records',
406     api_level   => 1,
407     argc        => 2,
408     stream      => 1,
409     record_type => 'auth'
410 );
411 __PACKAGE__->register_method(
412     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.csv",
413     method      => 'retrieve_queued_records',
414     api_level   => 1,
415     argc        => 2,
416     stream      => 1,
417     record_type => 'auth'
418 );
419 __PACKAGE__->register_method(
420     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.email",
421     method      => 'retrieve_queued_records',
422     api_level   => 1,
423     argc        => 2,
424     stream      => 1,
425     record_type => 'auth'
426 );
427
428 __PACKAGE__->register_method(  
429     api_name    => "open-ils.vandelay.bib_queue.records.matches.retrieve",
430     method      => 'retrieve_queued_records',
431     api_level   => 1,
432     argc        => 2,
433     stream      => 1,
434     record_type => 'bib',
435     signature   => {
436         desc => q/Only retrieve queued bib records that have matches against existing records/
437     }
438 );
439 __PACKAGE__->register_method(  
440     api_name    => "open-ils.vandelay.auth_queue.records.matches.retrieve",
441     method      => 'retrieve_queued_records',
442     api_level   => 1,
443     argc        => 2,
444     stream      => 1,
445     record_type => 'auth',
446     signature   => {
447         desc => q/Only retrieve queued authority records that have matches against existing records/
448     }
449 );
450
451 sub retrieve_queued_records {
452     my($self, $conn, $auth, $queue_id, $options) = @_;
453
454     $options ||= {};
455     my $limit = $$options{limit} || 20;
456     my $offset = $$options{offset} || 0;
457     my $type = $self->{record_type};
458
459     my $e = new_editor(authtoken => $auth, xact => 1);
460     return $e->die_event unless $e->checkauth;
461
462     my $queue;
463     if($type eq 'bib') {
464         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
465     } else {
466         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
467     }
468     my $evt = check_queue_perms($e, $type, $queue);
469     return $evt if ($evt);
470
471     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
472     my $mclass = $type eq 'bib' ? 'vbm' : 'vam';
473     my $query = {
474         select => {
475             $class => ['id'],
476             $mclass => [{
477                 column => 'eg_record', 
478                 transform => 'min',
479                 aggregate => 1
480             }]
481         },
482         from => $class,
483         where => {queue => $queue_id},
484         distinct => 1,
485         limit => $limit,
486         offset => $offset,
487     };
488     if($self->api_name =~ /export/) {
489         delete $query->{limit};
490         delete $query->{offset};
491     }
492
493     $query->{where}->{import_time} = undef if $$options{non_imported};
494
495     if($$options{with_import_error}) {
496
497         $query->{from} = {$class => {vii => {type => 'left'}}};
498         $query->{where}->{'-or'} = [
499             {'+vqbr' => {import_error => {'!=' => undef}}},
500             {'+vii' => {import_error => {'!=' => undef}}}
501         ];
502
503     } else {
504         
505         if($$options{with_rec_import_error}) {
506             $query->{where}->{import_error} = {'!=' => undef};
507
508         } elsif( $$options{with_item_import_error} and $type eq 'bib') {
509
510             $query->{from} = {$class => 'vii'};
511             $query->{where}->{'+vii'} = {import_error => {'!=' => undef}};
512         }
513     }
514
515     if($self->api_name =~ /matches/) {
516         # find only records that have matches
517         $query->{from} = {$class => {$mclass => {type => 'right'}}};
518     } else {
519         # join to mclass for sorting (see below)
520         $query->{from} = {$class => {$mclass => {type => 'left'}}};
521     }
522
523     # order by the matched bib records to group like queued records
524     $query->{order_by} = [
525         {class => $mclass, field => 'eg_record', transform => 'min'},
526         {class => $class, field => 'id'} 
527     ];
528
529     my $record_ids = $e->json_query($query);
530
531     my $retrieve = ($type eq 'bib') ? 
532         'retrieve_vandelay_queued_bib_record' : 'retrieve_vandelay_queued_authority_record';
533     my $search = ($type eq 'bib') ? 
534         'search_vandelay_queued_bib_record' : 'search_vandelay_queued_authority_record';
535
536     if ($self->api_name =~ /export/) {
537         my $rec_list = $e->$search({id => [map { $_->{id} } @$record_ids]}, {substream => 1});
538         if ($self->api_name =~ /print/) {
539
540             $e->rollback;
541             return $U->fire_object_event(
542                 undef,
543                 'vandelay.queued_'.$type.'_record.print',
544                 $rec_list,
545                 $e->requestor->ws_ou
546             );
547
548         } elsif ($self->api_name =~ /csv/) {
549
550             $e->rollback;
551             return $U->fire_object_event(
552                 undef,
553                 'vandelay.queued_'.$type.'_record.csv',
554                 $rec_list,
555                 $e->requestor->ws_ou
556             );
557
558         } elsif ($self->api_name =~ /email/) {
559
560             $conn->respond_complete(1);
561
562             for my $rec (@$rec_list) {
563                 $U->create_events_for_hook(
564                     'vandelay.queued_'.$type.'_record.email',
565                     $rec,
566                     $e->requestor->home_ou,
567                     undef,
568                     undef,
569                     1
570                 );
571             }
572
573         }
574     } else {
575         for my $rec_id (@$record_ids) {
576             my $flesh = ['attributes', 'matches'];
577             push(@$flesh, 'import_items') if $$options{flesh_import_items};
578             my $params = {flesh => 1, flesh_fields => {$class => $flesh}};
579             my $rec = $e->$retrieve([$rec_id->{id}, $params]);
580             $rec->clear_marc if $$options{clear_marc};
581             $conn->respond($rec);
582         }
583     }
584
585     $e->rollback;
586     return undef;
587 }
588
589 __PACKAGE__->register_method(  
590     api_name    => 'open-ils.vandelay.import_item.queue.retrieve',
591     method      => 'retrieve_queue_import_items',
592     api_level   => 1,
593     argc        => 2,
594     stream      => 1,
595     authoritative => 1,
596     signature => q/
597         Returns Import Item (vii) objects for the selected queue.
598         Filter options:
599             with_import_error : only return items that failed to import
600     /
601 );
602 __PACKAGE__->register_method(
603     api_name    => 'open-ils.vandelay.import_item.queue.export.print',
604     method      => 'retrieve_queue_import_items',
605     api_level   => 1,
606     argc        => 2,
607     stream      => 1,
608     authoritative => 1,
609     signature => q/
610         Returns template-generated printable output of Import Item (vii) objects for the selected queue.
611         Filter options:
612             with_import_error : only return items that failed to import
613     /
614 );
615 __PACKAGE__->register_method(
616     api_name    => 'open-ils.vandelay.import_item.queue.export.csv',
617     method      => 'retrieve_queue_import_items',
618     api_level   => 1,
619     argc        => 2,
620     stream      => 1,
621     authoritative => 1,
622     signature => q/
623         Returns template-generated CSV output of Import Item (vii) objects for the selected queue.
624         Filter options:
625             with_import_error : only return items that failed to import
626     /
627 );
628 __PACKAGE__->register_method(
629     api_name    => 'open-ils.vandelay.import_item.queue.export.email',
630     method      => 'retrieve_queue_import_items',
631     api_level   => 1,
632     argc        => 2,
633     stream      => 1,
634     authoritative => 1,
635     signature => q/
636         Emails template-generated output of Import Item (vii) objects for the selected queue.
637         Filter options:
638             with_import_error : only return items that failed to import
639     /
640 );
641
642 sub retrieve_queue_import_items {
643     my($self, $conn, $auth, $q_id, $options) = @_;
644
645     $options ||= {};
646     my $limit = $$options{limit} || 20;
647     my $offset = $$options{offset} || 0;
648
649     my $e = new_editor(authtoken => $auth);
650     return $e->event unless $e->checkauth;
651
652     my $queue = $e->retrieve_vandelay_bib_queue($q_id) or return $e->event;
653     my $evt = check_queue_perms($e, 'bib', $queue);
654     return $evt if $evt;
655
656     my $query = {
657         select => {vii => ['id']},
658         from => {
659             vii => {
660                 vqbr => {
661                     join => {
662                         'vbq' => {
663                             field => 'id',
664                             fkey => 'queue',
665                             filter => {id => $q_id}
666                         }
667                     }
668                 }
669             }
670         },
671         order_by => {'vii' => ['record','id']},
672         limit => $limit,
673         offset => $offset
674     };
675     if($self->api_name =~ /export/) {
676         delete $query->{limit};
677         delete $query->{offset};
678     }
679
680     $query->{where} = {'+vii' => {import_error => {'!=' => undef}}}
681         if $$options{with_import_error};
682
683     my $items = $e->json_query($query);
684     my $item_list = $e->search_vandelay_import_item({id => [map { $_->{id} } @$items]});
685     if ($self->api_name =~ /export/) {
686         if ($self->api_name =~ /print/) {
687
688             return $U->fire_object_event(
689                 undef,
690                 'vandelay.import_items.print',
691                 $item_list,
692                 $e->requestor->ws_ou
693             );
694
695         } elsif ($self->api_name =~ /csv/) {
696
697             return $U->fire_object_event(
698                 undef,
699                 'vandelay.import_items.csv',
700                 $item_list,
701                 $e->requestor->ws_ou
702             );
703
704         } elsif ($self->api_name =~ /email/) {
705
706             $conn->respond_complete(1);
707
708             for my $item (@$item_list) {
709                 $U->create_events_for_hook(
710                     'vandelay.import_items.email',
711                     $item,
712                     $e->requestor->home_ou,
713                     undef,
714                     undef,
715                     1
716                 );
717             }
718
719         }
720     } else {
721         for my $item (@$item_list) {
722             $conn->respond($item);
723         }
724     }
725
726     return undef;
727 }
728
729 sub check_queue_perms {
730     my($e, $type, $queue) = @_;
731     if ($type eq 'bib') {
732         return $e->die_event unless
733             ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
734              $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
735     } else {
736         return $e->die_event unless
737             ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
738              $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
739     }
740
741     return undef;
742 }
743
744 __PACKAGE__->register_method(  
745     api_name    => "open-ils.vandelay.bib_record.list.import",
746     method      => 'import_record_list',
747     api_level   => 1,
748     argc        => 2,
749     stream      => 1,
750     record_type => 'bib'
751 );
752
753 __PACKAGE__->register_method(  
754     api_name    => "open-ils.vandelay.auth_record.list.import",
755     method      => 'import_record_list',
756     api_level   => 1,
757     argc        => 2,
758     stream      => 1,
759     record_type => 'auth'
760 );
761
762 sub import_record_list {
763     my($self, $conn, $auth, $rec_ids, $args) = @_;
764     my $e = new_editor(authtoken => $auth, xact => 1);
765     return $e->die_event unless $e->checkauth;
766     $args ||= {};
767     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $args);
768     try {$e->rollback} otherwise {}; 
769     return $err if $err;
770     return {complete => 1};
771 }
772
773
774 __PACKAGE__->register_method(  
775     api_name    => "open-ils.vandelay.bib_queue.import",
776     method      => 'import_queue',
777     api_level   => 1,
778     argc        => 2,
779     stream      => 1,
780     max_chunk_size => 0,
781     record_type => 'bib',
782     signature => {
783         desc => q/
784             Attempts to import all non-imported records for the selected queue.
785             Will also attempt import of all non-imported items.
786         /
787     }
788 );
789
790 __PACKAGE__->register_method(  
791     api_name    => "open-ils.vandelay.auth_queue.import",
792     method      => 'import_queue',
793     api_level   => 1,
794     argc        => 2,
795     stream      => 1,
796     max_chunk_size => 0,
797     record_type => 'auth'
798 );
799
800 sub import_queue {
801     my($self, $conn, $auth, $q_id, $options) = @_;
802     my $e = new_editor(authtoken => $auth, xact => 1);
803     return $e->die_event unless $e->checkauth;
804     $options ||= {};
805     my $type = $self->{record_type};
806     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
807
808     # First, collect the not-yet-imported records
809     my $query = {queue => $q_id, import_time => undef};
810     my $search = ($type eq 'bib') ? 
811         'search_vandelay_queued_bib_record' : 
812         'search_vandelay_queued_authority_record';
813     my $rec_ids = $e->$search($query, {idlist => 1});
814
815     # Now add any imported records that have un-imported items
816
817     if($type eq 'bib') {
818         my $item_recs = $e->json_query({
819             select => {vqbr => ['id']},
820             from => {vqbr => 'vii'},
821             where => {
822                 '+vqbr' => {
823                     queue => $q_id,
824                     import_time => {'!=' => undef}
825                 },
826                 '+vii' => {import_time => undef}
827             },
828             distinct => 1
829         });
830         push(@$rec_ids, map {$_->{id}} @$item_recs);
831     }
832
833     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $options);
834     try {$e->rollback} otherwise {}; # only using this to make the read authoritative -- don't die from it
835     return $err if $err;
836     return {complete => 1};
837 }
838
839 # returns a list of queued record IDs for a given queue that 
840 # have at least one entry in the match table
841 # XXX DEPRECATED?
842 sub queued_records_with_matches {
843     my($e, $type, $q_id, $limit, $offset, $filter) = @_;
844
845     my $match_class = 'vbm';
846     my $rec_class = 'vqbr';
847     if($type eq 'auth') {
848         $match_class = 'vam';
849          $rec_class = 'vqar';
850     }
851
852     $filter ||= {};
853     $filter->{queue} = $q_id;
854
855     my $query = {
856         distinct => 1, 
857         select => {$match_class => ['queued_record']}, 
858         from => {
859             $match_class => {
860                 $rec_class => {
861                     field => 'id',
862                     fkey => 'queued_record',
863                     filter => $filter,
864                 }
865             }
866         }
867     };        
868
869     if($limit or defined $offset) {
870         $limit ||= 20;
871         $offset ||= 0;
872         $query->{limit} = $limit;
873         $query->{offset} = $offset;
874     }
875
876     my $data = $e->json_query($query);
877     return [ map {$_->{queued_record}} @$data ];
878 }
879
880
881 # cache of import item org unit settings.  
882 # used in apply_import_item_defaults() below, 
883 # but reset on each call to import_record_list_impl()
884 my %item_defaults_cache;
885
886 sub import_record_list_impl {
887     my($self, $conn, $rec_ids, $requestor, $args) = @_;
888
889     my $overlay_map = $args->{overlay_map} || {};
890     my $type = $self->{record_type};
891     my %queues;
892     %item_defaults_cache = ();
893
894     my $report_args = {
895         progress => 1,
896         step => 1,
897         conn => $conn,
898         total => scalar(@$rec_ids),
899         report_all => $$args{report_all}
900     };
901
902     $conn->max_chunk_count(1) if $$args{report_all};
903
904     my $auto_overlay_exact = $$args{auto_overlay_exact};
905     my $auto_overlay_1match = $$args{auto_overlay_1match};
906     my $auto_overlay_best = $$args{auto_overlay_best_match};
907     my $match_quality_ratio = $$args{match_quality_ratio};
908     my $merge_profile = $$args{merge_profile};
909     my $ft_merge_profile = $$args{fall_through_merge_profile};
910     my $bib_source = $$args{bib_source};
911     my $import_no_match = $$args{import_no_match};
912     my $strip_grps = $$args{strip_field_groups}; # bib-only
913
914     my $overlay_func = 'vandelay.overlay_bib_record';
915     my $auto_overlay_func = 'vandelay.auto_overlay_bib_record';
916     my $auto_overlay_best_func = 'vandelay.auto_overlay_bib_record_with_best'; # XXX bib-only
917     my $retrieve_func = 'retrieve_vandelay_queued_bib_record';
918     my $update_func = 'update_vandelay_queued_bib_record';
919     my $search_func = 'search_vandelay_queued_bib_record';
920     my $retrieve_queue_func = 'retrieve_vandelay_bib_queue';
921     my $update_queue_func = 'update_vandelay_bib_queue';
922     my $delete_queue_func = 'delete_vandelay_bib_queue';
923     my $rec_class = 'vqbr';
924
925     my $editor = new_editor();
926
927     my %bib_sources;
928     my $sources = $editor->search_config_bib_source({id => {'!=' => undef}});
929     $bib_sources{$_->id} = $_->source for @$sources;
930
931     if($type eq 'auth') {
932         $overlay_func =~ s/bib/auth/o;
933         $auto_overlay_func = s/bib/auth/o;
934         $retrieve_func =~ s/bib/authority/o;
935         $retrieve_queue_func =~ s/bib/authority/o;
936         $update_queue_func =~ s/bib/authority/o;
937         $update_func =~ s/bib/authority/o;
938         $search_func =~ s/bib/authority/o;
939         $delete_queue_func =~ s/bib/authority/o;
940         $rec_class = 'vqar';
941     }
942
943     my $new_rec_perm_cache;
944     my @success_rec_ids;
945     for my $rec_id (@$rec_ids) {
946
947         my $error = 0;
948         my $overlay_target = $overlay_map->{$rec_id};
949
950         my $e = new_editor(xact => 1);
951         $e->requestor($requestor);
952
953         $$report_args{e} = $e;
954         $$report_args{evt} = undef;
955         $$report_args{import_error} = undef;
956         $$report_args{no_import} = 0;
957
958         my $rec = $e->$retrieve_func([
959             $rec_id,
960             {   flesh => 1,
961                 flesh_fields => { $rec_class => ['matches']},
962             }
963         ]);
964
965         unless($rec) {
966             $$report_args{evt} = $e->event;
967             finish_rec_import_attempt($report_args);
968             next;
969         }
970
971         if($rec->import_time) {
972             # if the record is already imported, that means it may have 
973             # un-imported copies.  Add to success list for later processing.
974             push(@success_rec_ids, $rec_id);
975             $e->rollback;
976             next;
977         }
978
979         $$report_args{rec} = $rec;
980         $queues{$rec->queue} = 1;
981
982         my $record;
983         my $imported = 0;
984
985         if ($type eq 'bib') {
986             # strip configured / selected MARC tags from inbound records
987
988             my $marcdoc = XML::LibXML->new->parse_string($rec->marc);
989             $rec->marc($U->strip_marc_fields($e, $marcdoc, $strip_grps));
990
991             unless ($e->$update_func($rec)) {
992                 $$report_args{evt} = $e->die_event;
993                 finish_rec_import_attempt($report_args);
994                 next;
995             }
996         }
997
998         if(defined $overlay_target) {
999             # Caller chose an explicit overlay target
1000
1001             my $res = $e->json_query(
1002                 {
1003                     from => [
1004                         $overlay_func,
1005                         $rec_id,
1006                         $overlay_target, 
1007                         $merge_profile
1008                     ]
1009                 }
1010             );
1011
1012             if($res and ($res = $res->[0])) {
1013
1014                 if($res->{$overlay_func} eq 't') {
1015                     $logger->info("vl: $type direct overlay succeeded for queued rec ".
1016                         "$rec_id and overlay target $overlay_target");
1017                     $imported = 1;
1018                     $rec->imported_as($overlay_target);
1019                 }
1020
1021             } else {
1022                 $error = 1;
1023                 $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1024             }
1025
1026         } else {
1027
1028             if($auto_overlay_1match) { # overlay if there is exactly 1 match
1029
1030                 my %match_recs = map { $_->eg_record => 1 } @{$rec->matches};
1031
1032                 if( scalar(keys %match_recs) == 1) { # all matches point to the same record
1033
1034                     ($imported, $error, $rec) = try_auto_overlay(
1035                         $e, $type,
1036                         $report_args, 
1037                         $auto_overlay_best_func,
1038                         $retrieve_func,
1039                         $rec_class,
1040                         $rec_id, 
1041                         $match_quality_ratio, 
1042                         $merge_profile, 
1043                         $ft_merge_profile
1044                     );
1045                 }
1046             }
1047
1048             if(!$imported and !$error and $auto_overlay_exact and scalar(@{$rec->matches}) == 1 ) {
1049                 
1050                 # caller says to overlay if there is an /exact/ match
1051                 # $auto_overlay_func only proceeds and returns true on exact matches
1052
1053                 my $res = $e->json_query(
1054                     {
1055                         from => [
1056                             $auto_overlay_func,
1057                             $rec_id,
1058                             $merge_profile
1059                         ]
1060                     }
1061                 );
1062
1063                 if($res and ($res = $res->[0])) {
1064
1065                     if($res->{$auto_overlay_func} eq 't') {
1066                         $logger->info("vl: $type auto-overlay succeeded for queued rec $rec_id");
1067                         $imported = 1;
1068
1069                         # re-fetch the record to pick up the imported_as value from the DB
1070                         $$report_args{rec} = $rec = $e->$retrieve_func([
1071                             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1072
1073                     } else {
1074                         $logger->info("vl: $type auto-overlay failed for queued rec $rec_id");
1075                     }
1076
1077                 } else {
1078                     $error = 1;
1079                     $logger->error("vl: Error attempting overlay with func=$auto_overlay_func, profile=$merge_profile, record=$rec_id");
1080                 }
1081             }
1082
1083             if(!$imported and !$error and $auto_overlay_best and scalar(@{$rec->matches}) > 0 ) {
1084                 # caller says to overlay the best match
1085
1086                 ($imported, $error, $rec) = try_auto_overlay(
1087                     $e, $type,
1088                     $report_args, 
1089                     $auto_overlay_best_func,
1090                     $retrieve_func,
1091                     $rec_class,
1092                     $rec_id, 
1093                     $match_quality_ratio, 
1094                     $merge_profile, 
1095                     $ft_merge_profile
1096                 );
1097             }
1098
1099             if(!$imported and !$error and $import_no_match and scalar(@{$rec->matches}) == 0) {
1100             
1101                 # No overlay / merge occurred.  Do a traditional record import by creating a new record
1102
1103                 if (!$new_rec_perm_cache) {
1104                     $new_rec_perm_cache = {};
1105
1106                     # all users creating new records are required to have the basic permission.
1107                     # if the client requests, we can enforce extra permissions for creating new records.
1108                     # for speed, check the permissions the first time then cache the result.
1109
1110                     my $perm = ($type eq 'bib') ? 'IMPORT_MARC' : 'IMPORT_AUTHORITY_MARC';
1111                     my $xperm = $$args{new_rec_perm};
1112                     my $rec_ou = $e->requestor->ws_ou;
1113
1114                     $new_rec_perm_cache->{evt} = $e->die_event
1115                         if !$e->allowed($perm, $rec_ou) || ($xperm and !$e->allowed($xperm, $rec_ou));
1116                 }
1117
1118                 if ($new_rec_perm_cache->{evt}) {
1119
1120                     # a cached event won't roll back the transaction (a la die_event), but
1121                     # the transaction will get rolled back in finish_rec_import_attempt() below
1122                     $$report_args{evt} = $new_rec_perm_cache->{evt};
1123                     $$report_args{import_error} = 'import.record.perm_failure';
1124
1125                 } else { # perm checks succeeded
1126
1127                     $logger->info("vl: creating new $type record for queued record $rec_id");
1128
1129                     if ($type eq 'bib') {
1130
1131                         $record = OpenILS::Application::Cat::BibCommon->biblio_record_xml_import(
1132                             $e, $rec->marc, $bib_sources{$rec->bib_source}, undef, 1);
1133
1134                     } else { # authority record
1135
1136                         $record = OpenILS::Application::Cat::AuthCommon->import_authority_record($e, $rec->marc); #$source);
1137                     }
1138
1139                     if($U->event_code($record)) {
1140                         $$report_args{import_error} = 'import.duplicate.tcn' 
1141                             if $record->{textcode} eq 'OPEN_TCN_NOT_FOUND';
1142                         $$report_args{evt} = $record;
1143
1144                     } else {
1145
1146                         $logger->info("vl: successfully imported new $type record");
1147                         $rec->imported_as($record->id);
1148                         $imported = 1;
1149                     }
1150                 }
1151             }
1152         }
1153
1154         if($imported) {
1155
1156             $rec->import_time('now');
1157             $rec->clear_import_error;
1158             $rec->clear_error_detail;
1159
1160             if($e->$update_func($rec)) {
1161
1162                 if($type eq 'bib') {
1163
1164                     # see if this record is linked from an acq record.
1165                     my $li = $e->search_acq_lineitem(
1166                         {queued_record => $rec->id, state => {'!=' => 'canceled'}})->[0];
1167
1168                     if ($li) { 
1169                         # if so, update the acq lineitem to point to the imported record
1170                         $li->eg_bib_id($rec->imported_as);
1171                         $$report_args{evt} = $e->die_event unless $e->update_acq_lineitem($li);
1172                     }
1173                 }
1174
1175                 push @success_rec_ids, $rec_id;
1176                 finish_rec_import_attempt($report_args);
1177
1178             } else {
1179                 $imported = 0;
1180             }
1181         }
1182
1183         if(!$imported) {
1184             $logger->info("vl: record $rec_id was not imported");
1185             $$report_args{evt} = $e->event unless $$report_args{evt};
1186             $$report_args{no_import} = 1;
1187             finish_rec_import_attempt($report_args);
1188         }
1189     }
1190
1191     # see if we need to mark any queues as complete
1192     for my $q_id (keys %queues) {
1193
1194         my $e = new_editor(xact => 1);
1195         my $remaining = $e->$search_func(
1196             [{queue => $q_id, import_time => undef}, {limit =>1}], {idlist => 1});
1197
1198         unless(@$remaining) {
1199             my $queue = $e->$retrieve_queue_func($q_id);
1200             unless($U->is_true($queue->complete)) {
1201                 $queue->complete('t');
1202                 $e->$update_queue_func($queue) or return $e->die_event;
1203                 $e->commit;
1204                 next;
1205             }
1206         } 
1207         $e->rollback;
1208     }
1209
1210     # import the copies
1211     import_record_asset_list_impl($conn, \@success_rec_ids, $requestor, $args) if @success_rec_ids;
1212
1213     $conn->respond({total => $$report_args{total}, progress => $$report_args{progress}});
1214     return undef;
1215 }
1216
1217
1218 sub try_auto_overlay {
1219     my $e = shift;
1220     my $type = shift;
1221     my $report_args = shift;
1222     my $overlay_func  = shift;
1223     my $retrieve_func = shift; 
1224     my $rec_class = shift;
1225     my $rec_id  = shift;
1226     my $match_quality_ratio = shift;
1227     my $merge_profile  = shift;
1228     my $ft_merge_profile = shift;
1229
1230     my $imported = 0;
1231     my $error = 0;
1232
1233     # Find the best match and overlay if the quality ratio allows it.
1234     my $res = $e->json_query(
1235         {
1236             from => [
1237                 $overlay_func,
1238                 $rec_id, 
1239                 $merge_profile,
1240                 $match_quality_ratio
1241             ]
1242         }
1243     );
1244
1245     if($res and ($res = $res->[0])) {
1246
1247         if($res->{$overlay_func} eq 't') {
1248
1249             # first attempt succeeded
1250             $imported = 1;
1251
1252         } else {
1253
1254             # quality-limited merge failed with insufficient quality.  If there is a 
1255             # fall-through merge profile, re-do the merge with the alternate profile
1256             # and no quality restriction.
1257
1258             if($ft_merge_profile and $match_quality_ratio > 0) {
1259
1260                 $logger->info("vl: $type auto-merge failed with profile $merge_profile; ".
1261                     "re-merging with fall-through profile $ft_merge_profile");
1262
1263                 my $res = $e->json_query(
1264                     {
1265                         from => [
1266                             $overlay_func,
1267                             $rec_id, 
1268                             $ft_merge_profile,
1269                             0 # minimum quality not required
1270                         ]
1271                     }
1272                 );
1273
1274                 if($res and ($res = $res->[0])) {
1275
1276                     if($res->{$overlay_func} eq 't') {
1277
1278                         # second attempt succeeded
1279                         $imported = 1;
1280
1281                     } else {
1282
1283                         # failed to merge on second attempt
1284                         $logger->info("vl: $type auto-merge with fall-through failed for queued rec $rec_id");
1285                     }
1286                 } else {
1287                     
1288                     # second attempt died 
1289                     $error = 1;
1290                     $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1291                 }
1292
1293             } else { 
1294
1295                 # failed to merge on first attempt, no fall-through was provided
1296                 $$report_args{import_error} = 'overlay.record.quality' if $match_quality_ratio > 0;
1297                 $logger->info("vl: $type auto-merge failed for queued rec $rec_id");
1298             }
1299         }
1300
1301     } else {
1302
1303         # first attempt died 
1304         $error = 1;
1305         $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1306     }
1307
1308     if($imported) {
1309
1310         # at least 1 of the attempts succeeded
1311         $logger->info("vl: $type auto-merge succeeded for queued rec $rec_id");
1312
1313         # re-fetch the record to pick up the imported_as value from the DB
1314         $$report_args{rec} = $e->$retrieve_func([
1315             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1316     }
1317
1318     return ($imported, $error, $$report_args{rec});
1319 }
1320
1321
1322 # tracks any import errors, commits the current xact, responds to the client
1323 sub finish_rec_import_attempt {
1324     my $args = shift;
1325     my $evt = $$args{evt};
1326     my $rec = $$args{rec};
1327     my $e = $$args{e};
1328
1329     my $error = $$args{import_error};
1330     $error = 'general.unknown' if $evt and not $error;
1331
1332     # error tracking
1333     if($rec) {
1334
1335         if($error or $evt) {
1336             # failed import
1337             # since an error occurred, there's no guarantee the transaction wasn't 
1338             # rolled back.  force a rollback and create a new editor.
1339             $e->rollback;
1340             $e = new_editor(xact => 1);
1341             $rec->import_error($error);
1342
1343             if($evt) {
1344                 my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1345                 $rec->error_detail($detail);
1346             }
1347
1348             my $method = 'update_vandelay_queued_bib_record';
1349             $method =~ s/bib/authority/ if $$args{type} eq 'auth';
1350             $e->$method($rec) and $e->commit or $e->rollback;
1351
1352         } else {
1353             # commit the successful import
1354             $e->commit;
1355         }
1356
1357     } else {
1358         # requested queued record was not found
1359         $e->rollback;
1360     }
1361         
1362     # respond to client
1363     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1364         $$args{conn}->respond({
1365             total => $$args{total}, 
1366             progress => $$args{progress}, 
1367             imported => ($rec) ? $rec->id : undef,
1368             import_error => $error,
1369             no_import => $$args{no_import},
1370             err_event => $evt
1371         });
1372         $$args{step} *= 2 unless $$args{step} == 256;
1373     }
1374
1375     $$args{progress}++;
1376 }
1377
1378
1379
1380
1381
1382 __PACKAGE__->register_method(  
1383     api_name    => "open-ils.vandelay.bib_queue.owner.retrieve",
1384     method      => 'owner_queue_retrieve',
1385     api_level   => 1,
1386     argc        => 2,
1387     stream      => 1,
1388     record_type => 'bib'
1389 );
1390 __PACKAGE__->register_method(  
1391     api_name    => "open-ils.vandelay.authority_queue.owner.retrieve",
1392     method      => 'owner_queue_retrieve',
1393     api_level   => 1,
1394     argc        => 2,
1395     stream      => 1,
1396     record_type => 'auth'
1397 );
1398
1399 sub owner_queue_retrieve {
1400     my($self, $conn, $auth, $owner_id, $filters) = @_;
1401     my $e = new_editor(authtoken => $auth, xact => 1);
1402     return $e->die_event unless $e->checkauth;
1403     $owner_id = $e->requestor->id; # XXX add support for viewing other's queues?
1404     my $queues;
1405     $filters ||= {};
1406     my $search = {owner => $owner_id};
1407     $search->{$_} = $filters->{$_} for keys %$filters;
1408
1409     if($self->{record_type} eq 'bib') {
1410         $queues = $e->search_vandelay_bib_queue(
1411             [$search, {order_by => {vbq => 'evergreen.lowercase(name)'}}]);
1412     } else {
1413         $queues = $e->search_vandelay_authority_queue(
1414             [$search, {order_by => {vaq => 'evergreen.lowercase(name)'}}]);
1415     }
1416     $conn->respond($_) for @$queues;
1417     $e->rollback;
1418     return undef;
1419 }
1420
1421 __PACKAGE__->register_method(  
1422     api_name    => "open-ils.vandelay.bib_queue.delete",
1423     method      => "delete_queue",
1424     api_level   => 1,
1425     argc        => 2,
1426     record_type => 'bib'
1427 );            
1428 __PACKAGE__->register_method(  
1429     api_name    => "open-ils.vandelay.auth_queue.delete",
1430     method      => "delete_queue",
1431     api_level   => 1,
1432     argc        => 2,
1433     record_type => 'auth'
1434 );  
1435
1436 sub delete_queue {
1437     my($self, $conn, $auth, $q_id) = @_;
1438     my $e = new_editor(xact => 1, authtoken => $auth);
1439     return $e->die_event unless $e->checkauth;
1440     if($self->{record_type} eq 'bib') {
1441         return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
1442         my $queue = $e->retrieve_vandelay_bib_queue($q_id)
1443             or return $e->die_event;
1444         $e->delete_vandelay_bib_queue($queue)
1445             or return $e->die_event;
1446     } else {
1447            return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
1448         my $queue = $e->retrieve_vandelay_authority_queue($q_id)
1449             or return $e->die_event;
1450         $e->delete_vandelay_authority_queue($queue)
1451             or return $e->die_event;
1452     }
1453     $e->commit;
1454     return 1;
1455 }
1456
1457
1458 __PACKAGE__->register_method(  
1459     api_name    => "open-ils.vandelay.queued_bib_record.html",
1460     method      => 'queued_record_html',
1461     api_level   => 1,
1462     argc        => 2,
1463     stream      => 1,
1464     record_type => 'bib'
1465 );
1466 __PACKAGE__->register_method(  
1467     api_name    => "open-ils.vandelay.queued_authority_record.html",
1468     method      => 'queued_record_html',
1469     api_level   => 1,
1470     argc        => 2,
1471     stream      => 1,
1472     record_type => 'auth'
1473 );
1474
1475 sub queued_record_html {
1476     my($self, $conn, $auth, $rec_id) = @_;
1477     my $e = new_editor(xact=>1,authtoken => $auth);
1478     return $e->die_event unless $e->checkauth;
1479     my $rec;
1480     if($self->{record_type} eq 'bib') {
1481         $rec = $e->retrieve_vandelay_queued_bib_record($rec_id)
1482             or return $e->die_event;
1483     } else {
1484         $rec = $e->retrieve_vandelay_queued_authority_record($rec_id)
1485             or return $e->die_event;
1486     }
1487
1488     $e->rollback;
1489     return $U->simplereq(
1490         'open-ils.search',
1491         'open-ils.search.biblio.record.html', undef, 1, $rec->marc);
1492 }
1493
1494
1495 __PACKAGE__->register_method(  
1496     api_name    => "open-ils.vandelay.bib_queue.summary.retrieve", 
1497     method      => 'retrieve_queue_summary',
1498     api_level   => 1,
1499     argc        => 2,
1500     stream      => 1,
1501     record_type => 'bib'
1502 );
1503 __PACKAGE__->register_method(  
1504     api_name    => "open-ils.vandelay.auth_queue.summary.retrieve",
1505     method      => 'retrieve_queue_summary',
1506     api_level   => 1,
1507     argc        => 2,
1508     stream      => 1,
1509     record_type => 'auth'
1510 );
1511
1512 sub retrieve_queue_summary {
1513     my($self, $conn, $auth, $queue_id) = @_;
1514     my $e = new_editor(xact=>1, authtoken => $auth);
1515     return $e->die_event unless $e->checkauth;
1516
1517     my $queue;
1518     my $type = $self->{record_type};
1519     if($type eq 'bib') {
1520         $queue = $e->retrieve_vandelay_bib_queue($queue_id)
1521             or return $e->die_event;
1522     } else {
1523         $queue = $e->retrieve_vandelay_authority_queue($queue_id)
1524             or return $e->die_event;
1525     }
1526
1527     my $evt = check_queue_perms($e, $type, $queue);
1528     return $evt if $evt;
1529
1530     my $search = 'search_vandelay_queued_bib_record';
1531     $search =~ s/bib/authority/ if $type ne 'bib';
1532
1533     my $summary = {
1534         queue => $queue,
1535         total => scalar(@{$e->$search({queue => $queue_id}, {idlist=>1})}),
1536         imported => scalar(@{$e->$search({queue => $queue_id, import_time => {'!=' => undef}}, {idlist=>1})}),
1537     };
1538
1539     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
1540     $summary->{rec_import_errors} = $e->json_query({
1541         select => {$class => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1542         from => $class,
1543         where => {queue => $queue_id, import_error => {'!=' => undef}}
1544     })->[0]->{count};
1545
1546     if($type eq 'bib') {
1547         
1548         # count of all items attached to records in the queue in question
1549         my $query = {
1550             select => {vii => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1551             from => 'vii',
1552             where => {
1553                 record => {
1554                     in => {
1555                         select => {vqbr => ['id']},
1556                         from => 'vqbr',
1557                         where => {queue => $queue_id}
1558                     }
1559                 }
1560             }
1561         };
1562         $summary->{total_items} = $e->json_query($query)->[0]->{count};
1563
1564         # count of items we attempted to import, but errored, attached to records in the queue in question
1565         $query->{where}->{import_error} = {'!=' => undef};
1566         $summary->{item_import_errors} = $e->json_query($query)->[0]->{count};
1567
1568         # count of items we successfully imported attached to records in the queue in question
1569         delete $query->{where}->{import_error};
1570         $query->{where}->{import_time} = {'!=' => undef};
1571         $summary->{total_items_imported} = $e->json_query($query)->[0]->{count};
1572     }
1573
1574     return $summary;
1575 }
1576
1577 # --------------------------------------------------------------------------------
1578 # Given a list of queued record IDs, imports all items attached to those records
1579 # --------------------------------------------------------------------------------
1580 sub import_record_asset_list_impl {
1581     my($conn, $rec_ids, $requestor, $args) = @_;
1582
1583     my $roe = new_editor(xact=> 1, requestor => $requestor);
1584
1585     # for speed, filter out any records have not been 
1586     # imported or have no import items to load
1587     $rec_ids = $roe->json_query({
1588         select => {vqbr => ['id']},
1589         from => {vqbr => 'vii'},
1590         where => {'+vqbr' => {
1591             id => $rec_ids,
1592             import_time => {'!=' => undef}
1593         }},
1594         distinct => 1
1595     });
1596     $rec_ids = [map {$_->{id}} @$rec_ids];
1597
1598     my $report_args = {
1599         conn => $conn,
1600         total => scalar(@$rec_ids),
1601         step => 1, # how often to respond
1602         progress => 1,
1603         in_count => 0,
1604     };
1605
1606     for my $rec_id (@$rec_ids) {
1607         my $rec = $roe->retrieve_vandelay_queued_bib_record($rec_id);
1608         my $item_ids = $roe->search_vandelay_import_item(
1609             {record => $rec->id, import_error => undef}, 
1610             {idlist=>1}
1611         );
1612
1613         # if any items have no call_number label and a value should be
1614         # applied automatically (via org settings), we want to use the same
1615         # call number label for every copy per org per record.
1616         my $auto_callnumber = {};
1617
1618         my $opp_acq_copy_overlay = $args->{opp_acq_copy_overlay};
1619         my @overlaid_copy_ids;
1620         for my $item_id (@$item_ids) {
1621             my $e = new_editor(requestor => $requestor, xact => 1);
1622             my $item = $e->retrieve_vandelay_import_item($item_id);
1623             my ($copy, $vol, $evt);
1624
1625             $$report_args{e} = $e;
1626             $$report_args{evt} = undef;
1627             $$report_args{import_item} = $item;
1628             $$report_args{import_error} = undef;
1629
1630             if (my $copy_id = $item->internal_id) { # assignment
1631                 # copy matches an existing copy.  Overlay instead of create.
1632
1633                 $logger->info("vl: performing copy overlay for internal_id=$copy_id");
1634
1635                 my $qt = $e->json_query({
1636                     select => {vbq => ['queue_type']},
1637                     from => {vqbr => 'vbq'},
1638                     where => {'+vqbr' => {id => $rec_id}}
1639                 })->[0]->{queue_type};
1640
1641                 if ($qt eq 'acq') {
1642                     # internal_id for ACQ queues refers to acq.lineitem_detail.id
1643                     # pull the real copy id from the acq LID
1644
1645                     my $lid = $e->retrieve_acq_lineitem_detail($copy_id);
1646                     if (!$lid) {
1647                         $$report_args{evt} = $e->die_event;
1648                         respond_with_status($report_args);
1649                         next;
1650                     }
1651                     $copy_id = $lid->eg_copy_id;
1652                     $logger->info("vl: performing ACQ copy overlay for copy $copy_id");
1653                 }
1654
1655                 $copy = $e->search_asset_copy([
1656                     {id => $copy_id, deleted => 'f'},
1657                     {flesh => 1, flesh_fields => {acp => ['call_number']}}
1658                 ])->[0];
1659
1660                 if (!$copy) {
1661                     $$report_args{evt} = $e->die_event;
1662                     respond_with_status($report_args);
1663                     next;
1664                 }
1665
1666                 # prevent update of unrelated copies
1667                 if ($copy->call_number->record != $rec->imported_as) {
1668                     $logger->info("vl: attempt to overlay unrelated copy=$copy_id; rec=".$rec->imported_as);
1669
1670                     $evt = OpenILS::Event->new('INVALID_IMPORT_COPY_ID', 
1671                         note => 'Cannot overlay copies for unlinked bib',
1672                         bre => $rec->imported_as, 
1673                         copy_id => $copy_id
1674                     );
1675                     $$report_args{evt} = $evt;
1676                     respond_with_status($report_args);
1677                     next;
1678                 }
1679             } elsif ($opp_acq_copy_overlay) { # we are going to "opportunistically" overlay received, in-process acq copies
1680                 # recv_time should never be null if the copy status is
1681                 # "In Process", so that is just a double-check
1682                 my $query = [
1683                     {
1684                         "recv_time" => {"!=" => undef},
1685                         "owning_lib" => $item->owning_lib,
1686                         "+acn" => {"record" => $rec->imported_as},
1687                         "+acp" => {"status" => OILS_COPY_STATUS_IN_PROCESS}
1688                     },
1689                     {
1690                         "join" => {
1691                             "acp" => {
1692                                 "join" => "acn"
1693                             }
1694                         },
1695                         "flesh" => 2,
1696                         "flesh_fields" => {
1697                             "acqlid" => ["eg_copy_id"],
1698                             "acp" => ["call_number"]
1699                         }
1700                     }
1701                 ];
1702                 # don't overlay the same copy twice
1703                 $query->[0]{"+acp"}{"id"} = {"not in" => \@overlaid_copy_ids} if @overlaid_copy_ids;
1704                 if (my $acqlid = $e->search_acq_lineitem_detail($query)->[0]) {
1705                     $copy = $acqlid->eg_copy_id;
1706                     push(@overlaid_copy_ids, $copy->id);
1707                 }
1708             }
1709
1710             if ($copy) { # we found a copy to overlay
1711
1712                 # overlaying copies requires an extra permission
1713                 if (!$e->allowed("IMPORT_OVERLAY_COPY", $copy->call_number->owning_lib)) {
1714                     $$report_args{evt} = $e->die_event;
1715                     respond_with_status($report_args);
1716                     next;
1717                 }
1718
1719                 # are we updating the call-number?
1720                 if ($item->call_number and $item->call_number ne $copy->call_number->label) {
1721
1722                     my $count = $e->json_query({
1723                         select => {acp => [{
1724                             alias => 'count', 
1725                             column => 'id', 
1726                             transform => 'count', 
1727                             aggregate => 1
1728                         }]},
1729                         from => 'acp',
1730                         where => {
1731                             deleted => 'f',
1732                             call_number => $copy->call_number->id
1733                         }
1734                     })->[0]->{count};
1735
1736                     if ($count == 1) {
1737                         # if this is the only copy attached to this 
1738                         # callnumber, just update the callnumber
1739
1740                         $logger->info("vl: updating callnumber label in copy overlay");
1741
1742                         $copy->call_number->label($item->call_number);
1743                         if (!$e->update_asset_call_number($copy->call_number)) {
1744                             $$report_args{evt} = $e->die_event;
1745                             respond_with_status($report_args);
1746                             next;
1747                         }
1748
1749                     } else {
1750
1751                         # otherwise, move the copy to a new/existing 
1752                         # call-number with the given label/owner
1753                         # note that overlay does not allow the owning_lib 
1754                         # to be changed.  Should it?
1755
1756                         $logger->info("vl: moving copy to new callnumber in copy overlay");
1757
1758                         ($vol, $evt) =
1759                             OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1760                                 $e, $item->call_number, 
1761                                 $copy->call_number->record, 
1762                                 $copy->call_number->owning_lib
1763                             );
1764
1765                         if($evt) {
1766                             $$report_args{evt} = $evt;
1767                             respond_with_status($report_args);
1768                             next;
1769                         }
1770
1771                         $copy->call_number($vol);
1772                     }
1773                 } # cn-update
1774
1775                 # for every field that has a non-'' value, overlay the copy value
1776                 foreach (qw/ barcode location circ_lib status 
1777                     circulate deposit deposit_amount ref holdable 
1778                     price circ_as_type alert_message opac_visible circ_modifier/) {
1779
1780                     my $val = $item->$_();
1781                     $copy->$_($val) if defined $val and $val ne '';
1782                 }
1783
1784                 # de-flesh for update
1785                 $copy->call_number($copy->call_number->id);
1786                 $copy->ischanged(1);
1787
1788                 $evt = OpenILS::Application::Cat::AssetCommon->
1789                     update_fleshed_copies($e, {all => 1}, undef, [$copy]);
1790
1791                 if($evt) {
1792                     $$report_args{evt} = $evt;
1793                     respond_with_status($report_args);
1794                     next;
1795                 }
1796
1797             } else { 
1798
1799                 # Creating a new copy
1800                 $logger->info("vl: creating new copy in import");
1801
1802                 # appply defaults values from org settings as needed
1803                 # if $auto_callnumber is unset, it will be set within
1804                 apply_import_item_defaults($e, $item, $auto_callnumber);
1805
1806                 # --------------------------------------------------------------------------------
1807                 # Find or create the volume
1808                 # --------------------------------------------------------------------------------
1809                 my ($vol, $evt) =
1810                     OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1811                         $e, $item->call_number, $rec->imported_as, $item->owning_lib);
1812
1813                 if($evt) {
1814                     $$report_args{evt} = $evt;
1815                     respond_with_status($report_args);
1816                     next;
1817                 }
1818
1819                 # --------------------------------------------------------------------------------
1820                 # Create the new copy
1821                 # --------------------------------------------------------------------------------
1822                 $copy = Fieldmapper::asset::copy->new;
1823                 $copy->loan_duration(2);
1824                 $copy->fine_level(2);
1825                 $copy->barcode($item->barcode);
1826                 $copy->location($item->location);
1827                 $copy->circ_lib($item->circ_lib || $item->owning_lib);
1828                 $copy->status( defined($item->status) ? $item->status : OILS_COPY_STATUS_IN_PROCESS );
1829                 $copy->circulate($item->circulate);
1830                 $copy->deposit($item->deposit);
1831                 $copy->deposit_amount($item->deposit_amount);
1832                 $copy->ref($item->ref);
1833                 $copy->holdable($item->holdable);
1834                 $copy->price($item->price);
1835                 $copy->circ_as_type($item->circ_as_type);
1836                 $copy->alert_message($item->alert_message);
1837                 $copy->opac_visible($item->opac_visible);
1838                 $copy->circ_modifier($item->circ_modifier);
1839
1840                 # --------------------------------------------------------------------------------
1841                 # Check for dupe barcode
1842                 # --------------------------------------------------------------------------------
1843                 if($evt = OpenILS::Application::Cat::AssetCommon->create_copy($e, $vol, $copy)) {
1844                     $$report_args{evt} = $evt;
1845                     $$report_args{import_error} = 'import.item.duplicate.barcode'
1846                         if $evt->{textcode} eq 'ITEM_BARCODE_EXISTS';
1847                     respond_with_status($report_args);
1848                     next;
1849                 }
1850
1851                 # --------------------------------------------------------------------------------
1852                 # create copy notes
1853                 # --------------------------------------------------------------------------------
1854                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1855                     $e, $copy, '', $item->pub_note, 1) if $item->pub_note;
1856
1857                 if($evt) {
1858                     $$report_args{evt} = $evt;
1859                     respond_with_status($report_args);
1860                     next;
1861                 }
1862
1863                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1864                     $e, $copy, '', $item->priv_note) if $item->priv_note;
1865
1866                 if($evt) {
1867                     $$report_args{evt} = $evt;
1868                     respond_with_status($report_args);
1869                     next;
1870                 }
1871             }
1872
1873             # set the import data on the import item
1874             $item->imported_as($copy->id); # $copy->id is set by create_copy() ^--
1875             $item->import_time('now');
1876
1877             unless($e->update_vandelay_import_item($item)) {
1878                 $$report_args{evt} = $e->die_event;
1879                 respond_with_status($report_args);
1880                 next;
1881             }
1882
1883             # --------------------------------------------------------------------------------
1884             # Item import succeeded
1885             # --------------------------------------------------------------------------------
1886             $e->commit;
1887             $$report_args{in_count}++;
1888             respond_with_status($report_args);
1889             $logger->info("vl: successfully imported item " . $item->barcode);
1890         }
1891     }
1892
1893     $roe->rollback;
1894     return undef;
1895 }
1896
1897 sub apply_import_item_defaults {
1898     my ($e, $item, $auto_cn) = @_;
1899     my $org = $item->owning_lib || $item->circ_lib;
1900     my %c = %item_defaults_cache;  
1901
1902     # fetch and cache the org unit setting value (unless 
1903     # it's already cached) and return the value to the caller
1904     my $set = sub {
1905         my $name = shift;
1906         return $c{$org}{$name} if defined $c{$org}{$name};
1907         my $sname = "vandelay.item.$name";
1908         $c{$org}{$name} = $U->ou_ancestor_setting_value($org, $sname, $e);
1909         $c{$org}{$name} = '' unless defined $c{$org}{$name};
1910         return $c{$org}{$name};
1911     };
1912
1913     if (!$item->barcode) {
1914
1915         if ($set->('barcode.auto')) {
1916
1917             my $pfx = $set->('barcode.prefix') || 'VAN';
1918             my $barcode = $pfx . $item->record . $item->id;
1919
1920             $logger->info("vl: using auto barcode $barcode for ".$item->id);
1921             $item->barcode($barcode);
1922
1923         } else {
1924             $logger->error("vl: no barcode (or defualt) for item ".$item->id);
1925         }
1926     }
1927
1928     if (!$item->call_number) {
1929
1930         if ($set->('call_number.auto')) {
1931
1932             if (!$auto_cn->{$org}) {
1933                 my $pfx = $set->('call_number.prefix') || 'VAN';
1934
1935                 # use the ID of the first item to differentiate this 
1936                 # call number from others linked to the same record
1937                 $auto_cn->{$org} = $pfx . $item->record . $item->id;
1938             }
1939
1940             $logger->info("vl: using auto call number ".$auto_cn->{$org});
1941             $item->call_number($auto_cn->{$org});
1942
1943         } else {
1944             $logger->error("vl: no call number or default for item ".$item->id);
1945         }
1946     }
1947 }
1948
1949
1950 sub respond_with_status {
1951     my $args = shift;
1952     my $e = $$args{e};
1953
1954     #  If the import failed, track the failure reason
1955
1956     my $error = $$args{import_error};
1957     my $evt = $$args{evt};
1958
1959     if($error or $evt) {
1960
1961         my $item = $$args{import_item};
1962         $logger->info("vl: unable to import item " . $item->barcode);
1963
1964         $error ||= 'general.unknown';
1965         $item->import_error($error);
1966
1967         if($evt) {
1968             my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1969             $item->error_detail($detail);
1970         }
1971
1972         # state of the editor is unknown at this point.  Force a rollback and start over.
1973         $e->rollback;
1974         $e = new_editor(xact => 1);
1975         $e->update_vandelay_import_item($item);
1976         $e->commit;
1977     }
1978
1979     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1980         $$args{conn}->respond({
1981             total => $$args{total},
1982             progress => $$args{progress},
1983             success_count => $$args{success_count},
1984             err_event => $evt
1985         });
1986         $$args{step} *= 2 unless $$args{step} == 256;
1987     }
1988
1989     $$args{progress}++;
1990 }
1991
1992 __PACKAGE__->register_method(  
1993     api_name    => "open-ils.vandelay.match_set.get_tree",
1994     method      => "match_set_get_tree",
1995     api_level   => 1,
1996     argc        => 2,
1997     signature   => {
1998         desc    => q/For a given vms object, return a tree of match set points
1999                     represented by a vmsp object with recursively fleshed
2000                     children./
2001     }
2002 );
2003
2004 sub match_set_get_tree {
2005     my ($self, $conn, $authtoken, $match_set_id) = @_;
2006
2007     $match_set_id = int($match_set_id) or return;
2008
2009     my $e = new_editor("authtoken" => $authtoken);
2010     $e->checkauth or return $e->die_event;
2011
2012     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
2013         return $e->die_event;
2014
2015     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
2016         return $e->die_event;
2017
2018     my $tree = $e->search_vandelay_match_set_point([
2019         {"match_set" => $match_set_id, "parent" => undef},
2020         {"flesh" => -1, "flesh_fields" => {"vmsp" => ["children"]}}
2021     ]) or return $e->die_event;
2022
2023     return pop @$tree;
2024 }
2025
2026
2027 __PACKAGE__->register_method(
2028     api_name    => "open-ils.vandelay.match_set.update",
2029     method      => "match_set_update_tree",
2030     api_level   => 1,
2031     argc        => 3,
2032     signature   => {
2033         desc => q/Replace any vmsp objects associated with a given (by ID) vms
2034                 with the given objects (recursively fleshed vmsp tree)./
2035     }
2036 );
2037
2038 sub _walk_new_vmsp {
2039     my ($e, $match_set_id, $node, $parent_id) = @_;
2040
2041     my $point = new Fieldmapper::vandelay::match_set_point;
2042     $point->parent($parent_id);
2043     $point->match_set($match_set_id);
2044     $point->$_($node->$_) for (qw/bool_op svf tag subfield negate quality/);
2045
2046     $e->create_vandelay_match_set_point($point) or return $e->die_event;
2047
2048     $parent_id = $e->data->id;
2049     if ($node->children && @{$node->children}) {
2050         for (@{$node->children}) {
2051             return $e->die_event if
2052                 _walk_new_vmsp($e, $match_set_id, $_, $parent_id);
2053         }
2054     }
2055
2056     return;
2057 }
2058
2059 sub match_set_update_tree {
2060     my ($self, $conn, $authtoken, $match_set_id, $tree) = @_;
2061
2062     my $e = new_editor("xact" => 1, "authtoken" => $authtoken);
2063     $e->checkauth or return $e->die_event;
2064
2065     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
2066         return $e->die_event;
2067
2068     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
2069         return $e->die_event;
2070
2071     my $existing = $e->search_vandelay_match_set_point([
2072         {"match_set" => $match_set_id},
2073         {"order_by" => {"vmsp" => "id DESC"}}
2074     ]) or return $e->die_event;
2075
2076     # delete points, working up from leaf points to the root
2077     while(@$existing) {
2078         for my $point (shift @$existing) {
2079             if( grep {$_->parent eq $point->id} @$existing) {
2080                 push(@$existing, $point);
2081             } else {
2082                 $e->delete_vandelay_match_set_point($point) or return $e->die_event;
2083             }
2084         }
2085     }
2086
2087     _walk_new_vmsp($e, $match_set_id, $tree);
2088
2089     $e->commit or return $e->die_event;
2090 }
2091
2092 __PACKAGE__->register_method(  
2093     api_name    => 'open-ils.vandelay.bib_queue.to_bucket',
2094     method      => 'bib_queue_to_bucket',
2095     api_level   => 1,
2096     argc        => 2,
2097     signature   => {
2098         desc    => q/Add to or create a new bib container (bucket) with the successfully 
2099                     imported contents of a vandelay queue.  Any user that has Vandelay 
2100                     queue create permissions can append or create buckets from his-her own queues./,
2101         params  => [
2102             {desc => 'Authtoken', type => 'string'},
2103             {desc => 'Queue ID', type => 'number'},
2104             {desc => 'Bucket Name', type => 'string'}
2105         ],
2106         return  => {desc => q/
2107             {bucket => $bucket, addcount => number-of-items-added-to-bucket, item_count => total-bucket-item-count} on success,
2108             {add_count => 0} if there is nothing to do, and Event on error/}
2109     }
2110 );
2111
2112 sub bib_queue_to_bucket {
2113     my ($self, $conn, $auth, $q_id, $bucket_name) = @_;
2114
2115     my $e = new_editor(xact => 1, authtoken => $auth);
2116     return $e->die_event unless $e->checkauth;
2117     
2118     my $queue = $e->retrieve_vandelay_bib_queue($q_id)
2119         or return $e->die_event;
2120
2121     return OpenILS::Event->new('BAD_PARAMS', 
2122         note => q/Bucket creator must be queue owner/)
2123         unless $queue->owner == $e->requestor->id;
2124
2125     # find the bib IDs that will go into the bucket
2126     my $bib_ids = $e->json_query({
2127         select => {vqbr => ['imported_as']},
2128         from => 'vqbr',
2129         where => {queue => $q_id, imported_as => {'!=' => undef}}
2130     });
2131
2132     if (!@$bib_ids) { # no records to add
2133         $e->rollback;
2134         return {add_count => 0};
2135     }
2136
2137     # allow user to add to an existing bucket by name
2138     my $bucket = $e->search_container_biblio_record_entry_bucket({
2139         owner => $e->requestor->id, 
2140         name => $bucket_name,
2141         btype => 'vandelay_queue'
2142     })->[0];
2143
2144     # if the bucket does not exist, create a new one
2145     if (!$bucket) { 
2146         $bucket = Fieldmapper::container::biblio_record_entry_bucket->new;
2147         $bucket->name($bucket_name);
2148         $bucket->owner($e->requestor->id);
2149         $bucket->btype('vandelay_queue');
2150
2151         $e->create_container_biblio_record_entry_bucket($bucket)
2152             or return $e->die_event;
2153     }
2154
2155     # create the new bucket items
2156     for my $bib_id ( map {$_->{imported_as}} @$bib_ids ) {
2157         my $item = Fieldmapper::container::biblio_record_entry_bucket_item->new;
2158         $item->target_biblio_record_entry($bib_id);
2159         $item->bucket($bucket->id);
2160         $e->create_container_biblio_record_entry_bucket_item($item)
2161             or return $e->die_event;
2162     }
2163
2164     # re-fetch the bucket to pick up the correct create_time
2165     $bucket = $e->retrieve_container_biblio_record_entry_bucket($bucket->id)
2166         or return $e->die_event;
2167
2168     # get the total count of items in this bucket
2169     my $count = $e->json_query({
2170         select => {cbrebi => [{
2171             aggregate =>  1,
2172             transform => 'count',
2173             alias => 'count',
2174             column => 'id'
2175         }]},
2176         from => 'cbrebi',
2177         where => {bucket => $bucket->id}
2178     })->[0];
2179
2180     $e->commit;
2181
2182     return {
2183         bucket => $bucket, 
2184         add_count => scalar(@$bib_ids), # items added to the bucket
2185         item_count => $count->{count} # total items in buckets
2186     };
2187 }
2188
2189 1;