]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/perlmods/lib/OpenILS/Application/Vandelay.pm
LP#1171984 Vandelay authority record matching
[Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Vandelay.pm
1 package OpenILS::Application::Vandelay;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5 use Unicode::Normalize;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::AppSession;
8 use OpenSRF::Utils::SettingsClient;
9 use OpenSRF::Utils::Cache;
10 use OpenILS::Utils::Fieldmapper;
11 use OpenILS::Utils::CStoreEditor qw/:funcs/;
12 use OpenILS::Utils::Normalize qw/clean_marc/;
13 use MARC::Batch;
14 use MARC::Record;
15 use MARC::File::XML ( BinaryEncoding => 'UTF-8' );
16 use Time::HiRes qw(time);
17 use OpenSRF::Utils::Logger qw/$logger/;
18 use MIME::Base64;
19 use XML::LibXML;
20 use OpenILS::Const qw/:const/;
21 use OpenILS::Application::AppUtils;
22 use OpenILS::Application::Cat::BibCommon;
23 use OpenILS::Application::Cat::AuthCommon;
24 use OpenILS::Application::Cat::AssetCommon;
25 use OpenILS::Application::Cat::Merge;
26 my $U = 'OpenILS::Application::AppUtils';
27
28 # A list of LDR/06 values from http://loc.gov/marc
29 my %record_types = (
30         a => 'bib',
31         c => 'bib',
32         d => 'bib',
33         e => 'bib',
34         f => 'bib',
35         g => 'bib',
36         i => 'bib',
37         j => 'bib',
38         k => 'bib',
39         m => 'bib',
40         o => 'bib',
41         p => 'bib',
42         r => 'bib',
43         t => 'bib',
44         u => 'holdings',
45         v => 'holdings',
46         x => 'holdings',
47         y => 'holdings',
48         z => 'auth',
49       ' ' => 'bib',
50 );
51
52 sub initialize {}
53 sub child_init {}
54
55 # --------------------------------------------------------------------------------
56 # Biblio ingest
57
58 sub create_bib_queue {
59     my $self = shift;
60     my $client = shift;
61     my $auth = shift;
62     my $name = shift;
63     my $owner = shift;
64     my $type = shift;
65     my $match_set = shift;
66     my $import_def = shift;
67     my $match_bucket = shift;
68
69     my $e = new_editor(authtoken => $auth, xact => 1);
70
71     return $e->die_event unless $e->checkauth;
72     return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
73     $owner ||= $e->requestor->id;
74
75     if ($e->search_vandelay_bib_queue( {name => $name, owner => $owner, queue_type => $type})->[0]) {
76         $e->rollback;
77         return OpenILS::Event->new('BIB_QUEUE_EXISTS') 
78     }
79
80     my $queue = new Fieldmapper::vandelay::bib_queue();
81     $queue->name( $name );
82     $queue->owner( $owner );
83     $queue->queue_type( $type ) if ($type);
84     $queue->item_attr_def( $import_def ) if ($import_def);
85     $queue->match_set($match_set) if $match_set;
86     $queue->match_bucket($match_bucket) if $match_bucket;
87
88     my $new_q = $e->create_vandelay_bib_queue( $queue );
89     return $e->die_event unless ($new_q);
90     $e->commit;
91
92     return $new_q;
93 }
94 __PACKAGE__->register_method(  
95     api_name   => "open-ils.vandelay.bib_queue.create",
96     method     => "create_bib_queue",
97     api_level  => 1,
98     argc       => 4,
99 );                      
100
101
102 sub create_auth_queue {
103     my $self = shift;
104     my $client = shift;
105     my $auth = shift;
106     my $name = shift;
107     my $owner = shift;
108     my $type = shift;
109     my $match_set = shift;
110
111     my $e = new_editor(authtoken => $auth, xact => 1);
112
113     return $e->die_event unless $e->checkauth;
114     return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
115     $owner ||= $e->requestor->id;
116
117     if ($e->search_vandelay_authority_queue({name => $name, owner => $owner, queue_type => $type})->[0]) {
118         $e->rollback;
119         return OpenILS::Event->new('AUTH_QUEUE_EXISTS') 
120     }
121
122     my $queue = new Fieldmapper::vandelay::authority_queue();
123     $queue->name( $name );
124     $queue->owner( $owner );
125     $queue->queue_type( $type ) if ($type);
126     $queue->match_set($match_set) if $match_set;
127
128     my $new_q = $e->create_vandelay_authority_queue( $queue );
129     $e->die_event unless ($new_q);
130     $e->commit;
131
132     return $new_q;
133 }
134 __PACKAGE__->register_method(  
135     api_name   => "open-ils.vandelay.authority_queue.create",
136     method     => "create_auth_queue",
137     api_level  => 1,
138     argc       => 3,
139 );                      
140
141 sub add_record_to_bib_queue {
142     my $self = shift;
143     my $client = shift;
144     my $auth = shift;
145     my $queue = shift;
146     my $marc = shift;
147     my $purpose = shift;
148     my $bib_source = shift;
149
150     my $e = new_editor(authtoken => $auth, xact => 1);
151
152     $queue = $e->retrieve_vandelay_bib_queue($queue);
153
154     return $e->die_event unless $e->checkauth;
155     return $e->die_event unless
156         ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
157          $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
158
159     my $new_rec = _add_bib_rec($e, $marc, $queue->id, $purpose, $bib_source);
160
161     return $e->die_event unless ($new_rec);
162     $e->commit;
163     return $new_rec;
164 }
165 __PACKAGE__->register_method(  
166     api_name   => "open-ils.vandelay.queued_bib_record.create",
167     method     => "add_record_to_bib_queue",
168     api_level  => 1,
169     argc       => 3,
170 );                      
171
172 sub _add_bib_rec {
173     my $e = shift;
174     my $marc = shift;
175     my $queue = shift;
176     my $purpose = shift;
177     my $bib_source = shift;
178
179     my $rec = new Fieldmapper::vandelay::queued_bib_record();
180     $rec->marc( $marc );
181     $rec->queue( $queue );
182     $rec->purpose( $purpose ) if ($purpose);
183     $rec->bib_source($bib_source);
184
185     return $e->create_vandelay_queued_bib_record( $rec, {timeout => 600} );
186 }
187
188 sub add_record_to_authority_queue {
189     my $self = shift;
190     my $client = shift;
191     my $auth = shift;
192     my $queue = shift;
193     my $marc = shift;
194     my $purpose = shift;
195
196     my $e = new_editor(authtoken => $auth, xact => 1);
197
198     $queue = $e->retrieve_vandelay_authority_queue($queue);
199
200     return $e->die_event unless $e->checkauth;
201     return $e->die_event unless
202         ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
203          $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
204
205     my $new_rec = _add_auth_rec($e, $marc, $queue->id, $purpose);
206
207     return $e->die_event unless ($new_rec);
208     $e->commit;
209     return $new_rec;
210 }
211 __PACKAGE__->register_method(
212     api_name   => "open-ils.vandelay.queued_authority_record.create",
213     method     => "add_record_to_authority_queue",
214     api_level  => 1,
215     argc       => 3,
216 );
217
218 sub _add_auth_rec {
219     my $e = shift;
220     my $marc = shift;
221     my $queue = shift;
222     my $purpose = shift;
223
224     my $rec = new Fieldmapper::vandelay::queued_authority_record();
225     $rec->marc( $marc );
226     $rec->queue( $queue );
227     $rec->purpose( $purpose ) if ($purpose);
228
229     return $e->create_vandelay_queued_authority_record( $rec, {timeout => 600} );
230 }
231
232 sub process_spool {
233     my $self = shift;
234     my $client = shift;
235     my $auth = shift;
236     my $fingerprint = shift || '';
237     my $queue_id = shift;
238     my $purpose = shift;
239     my $filename = shift;
240     my $bib_source = shift;
241
242     my $e = new_editor(authtoken => $auth, xact => 1);
243     return $e->die_event unless $e->checkauth;
244
245     my $queue;
246     my $type = $self->{record_type};
247
248     if($type eq 'bib') {
249         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
250     } else {
251         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
252     }
253
254     my $evt = check_queue_perms($e, $type, $queue);
255     return $evt if ($evt);
256
257     my $cache = new OpenSRF::Utils::Cache();
258
259     if($fingerprint) {
260         my $data = $cache->get_cache('vandelay_import_spool_' . $fingerprint);
261         $purpose = $data->{purpose};
262         $filename = $data->{path};
263         $bib_source = $data->{bib_source};
264     }
265
266     unless(-r $filename) {
267         $logger->error("unable to read MARC file $filename");
268         return -1; # make this an event XXX
269     }
270
271     $logger->info("vandelay spooling $fingerprint purpose=$purpose file=$filename");
272
273     my $marctype = 'USMARC'; 
274
275     open F, $filename;
276     $marctype = 'XML' if (getc(F) =~ /^\D/o);
277     close F;
278
279     my $batch = new MARC::Batch ($marctype, $filename);
280     $batch->strict_off;
281
282     my $response_scale = 10;
283     my $count = 0;
284     my $r = -1;
285     while (try { $r = $batch->next } otherwise { $r = -1 }) {
286         if ($r == -1) {
287             $logger->warn("Processing of record $count in set $filename failed.  Skipping this record");
288             $count++;
289         }
290
291         $logger->info("processing record $count");
292
293         try {
294             my $xml = clean_marc($r);
295
296             my $qrec;
297             # Check the leader to ensure we've got something resembling the expected
298             # Allow spaces to give records the benefit of the doubt
299             my $ldr_type = substr($r->leader(), 6, 1);
300             if ($type eq 'bib' && ($record_types{$ldr_type}) eq 'bib' || $ldr_type eq ' ') {
301                 $qrec = _add_bib_rec( $e, $xml, $queue_id, $purpose, $bib_source ) or return $e->die_event;
302             } elsif ($type eq 'auth' && ($record_types{$ldr_type}) eq 'auth' || $ldr_type eq ' ') {
303                 $qrec = _add_auth_rec( $e, $xml, $queue_id, $purpose ) or return $e->die_event;
304             } else {
305                 # I don't know how to handle this type; rock on
306                 $logger->error("In process_spool(), type was $type and leader type was $ldr_type ; not currently supported");
307                 next;
308             }
309
310             if($self->api_name =~ /stream_results/ and $qrec) {
311                 $client->respond($qrec->id)
312             } else {
313                 $client->respond($count) if (++$count % $response_scale) == 0;
314                 $response_scale *= 10 if ($count == ($response_scale * 10));
315             }
316         } catch Error with {
317             my $error = shift;
318             $logger->warn("Encountered a bad record at Vandelay ingest: ".$error);
319         }
320     }
321
322     $e->commit;
323     unlink($filename);
324     $cache->delete_cache('vandelay_import_spool_' . $fingerprint) if $fingerprint;
325     return $count;
326 }
327
328 __PACKAGE__->register_method(  
329     api_name    => "open-ils.vandelay.bib.process_spool",
330     method      => "process_spool",
331     api_level   => 1,
332     argc        => 3,
333     max_chunk_size => 0,
334     record_type => 'bib'
335 );                      
336 __PACKAGE__->register_method(  
337     api_name    => "open-ils.vandelay.auth.process_spool",
338     method      => "process_spool",
339     api_level   => 1,
340     argc        => 3,
341     max_chunk_size => 0,
342     record_type => 'auth'
343 );                      
344
345 __PACKAGE__->register_method(  
346     api_name    => "open-ils.vandelay.bib.process_spool.stream_results",
347     method      => "process_spool",
348     api_level   => 1,
349     argc        => 3,
350     stream      => 1,
351     max_chunk_size => 0,
352     record_type => 'bib'
353 );                      
354 __PACKAGE__->register_method(  
355     api_name    => "open-ils.vandelay.auth.process_spool.stream_results",
356     method      => "process_spool",
357     api_level   => 1,
358     argc        => 3,
359     stream      => 1,
360     max_chunk_size => 0,
361     record_type => 'auth'
362 );
363
364 __PACKAGE__->register_method(  
365     api_name    => "open-ils.vandelay.bib_queue.records.retrieve",
366     method      => 'retrieve_queued_records',
367     api_level   => 1,
368     argc        => 2,
369     stream      => 1,
370     record_type => 'bib'
371 );
372 __PACKAGE__->register_method(
373     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.print",
374     method      => 'retrieve_queued_records',
375     api_level   => 1,
376     argc        => 2,
377     stream      => 1,
378     record_type => 'bib'
379 );
380 __PACKAGE__->register_method(
381     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.csv",
382     method      => 'retrieve_queued_records',
383     api_level   => 1,
384     argc        => 2,
385     stream      => 1,
386     record_type => 'bib'
387 );
388 __PACKAGE__->register_method(
389     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.email",
390     method      => 'retrieve_queued_records',
391     api_level   => 1,
392     argc        => 2,
393     stream      => 1,
394     record_type => 'bib'
395 );
396
397 __PACKAGE__->register_method(  
398     api_name    => "open-ils.vandelay.auth_queue.records.retrieve",
399     method      => 'retrieve_queued_records',
400     api_level   => 1,
401     argc        => 2,
402     stream      => 1,
403     record_type => 'auth'
404 );
405 __PACKAGE__->register_method(
406     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.print",
407     method      => 'retrieve_queued_records',
408     api_level   => 1,
409     argc        => 2,
410     stream      => 1,
411     record_type => 'auth'
412 );
413 __PACKAGE__->register_method(
414     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.csv",
415     method      => 'retrieve_queued_records',
416     api_level   => 1,
417     argc        => 2,
418     stream      => 1,
419     record_type => 'auth'
420 );
421 __PACKAGE__->register_method(
422     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.email",
423     method      => 'retrieve_queued_records',
424     api_level   => 1,
425     argc        => 2,
426     stream      => 1,
427     record_type => 'auth'
428 );
429
430 __PACKAGE__->register_method(  
431     api_name    => "open-ils.vandelay.bib_queue.records.matches.retrieve",
432     method      => 'retrieve_queued_records',
433     api_level   => 1,
434     argc        => 2,
435     stream      => 1,
436     record_type => 'bib',
437     signature   => {
438         desc => q/Only retrieve queued bib records that have matches against existing records/
439     }
440 );
441 __PACKAGE__->register_method(  
442     api_name    => "open-ils.vandelay.auth_queue.records.matches.retrieve",
443     method      => 'retrieve_queued_records',
444     api_level   => 1,
445     argc        => 2,
446     stream      => 1,
447     record_type => 'auth',
448     signature   => {
449         desc => q/Only retrieve queued authority records that have matches against existing records/
450     }
451 );
452
453 sub retrieve_queued_records {
454     my($self, $conn, $auth, $queue_id, $options) = @_;
455
456     $options ||= {};
457     my $limit = $$options{limit} || 20;
458     my $offset = $$options{offset} || 0;
459     my $type = $self->{record_type};
460
461     my $e = new_editor(authtoken => $auth, xact => 1);
462     return $e->die_event unless $e->checkauth;
463
464     my $queue;
465     if($type eq 'bib') {
466         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
467     } else {
468         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
469     }
470     my $evt = check_queue_perms($e, $type, $queue);
471     return $evt if ($evt);
472
473     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
474     my $mclass = $type eq 'bib' ? 'vbm' : 'vam';
475     my $query = {
476         select => {
477             $class => ['id'],
478             $mclass => [{
479                 column => 'eg_record', 
480                 transform => 'min',
481                 aggregate => 1
482             }]
483         },
484         from => $class,
485         where => {queue => $queue_id},
486         distinct => 1,
487         limit => $limit,
488         offset => $offset,
489     };
490     if($self->api_name =~ /export/) {
491         delete $query->{limit};
492         delete $query->{offset};
493     }
494
495     $query->{where}->{import_time} = undef if $$options{non_imported};
496
497     if($$options{with_import_error}) {
498
499         $query->{from} = {$class => {vii => {type => 'left'}}};
500         $query->{where}->{'-or'} = [
501             {'+vqbr' => {import_error => {'!=' => undef}}},
502             {'+vii' => {import_error => {'!=' => undef}}}
503         ];
504
505     } else {
506         
507         if($$options{with_rec_import_error}) {
508             $query->{where}->{import_error} = {'!=' => undef};
509
510         } elsif( $$options{with_item_import_error} and $type eq 'bib') {
511
512             $query->{from} = {$class => 'vii'};
513             $query->{where}->{'+vii'} = {import_error => {'!=' => undef}};
514         }
515     }
516
517     if($self->api_name =~ /matches/) {
518         # find only records that have matches
519         $query->{from} = {$class => {$mclass => {type => 'right'}}};
520     } else {
521         # join to mclass for sorting (see below)
522         $query->{from} = {$class => {$mclass => {type => 'left'}}};
523     }
524
525     # order by the matched bib records to group like queued records
526     $query->{order_by} = [
527         {class => $mclass, field => 'eg_record', transform => 'min'},
528         {class => $class, field => 'id'} 
529     ];
530
531     my $record_ids = $e->json_query($query);
532
533     my $retrieve = ($type eq 'bib') ? 
534         'retrieve_vandelay_queued_bib_record' : 'retrieve_vandelay_queued_authority_record';
535     my $search = ($type eq 'bib') ? 
536         'search_vandelay_queued_bib_record' : 'search_vandelay_queued_authority_record';
537
538     if ($self->api_name =~ /export/) {
539         my $rec_list = $e->$search({id => [map { $_->{id} } @$record_ids]}, {substream => 1});
540         if ($self->api_name =~ /print/) {
541
542             $e->rollback;
543             return $U->fire_object_event(
544                 undef,
545                 'vandelay.queued_'.$type.'_record.print',
546                 $rec_list,
547                 $e->requestor->ws_ou
548             );
549
550         } elsif ($self->api_name =~ /csv/) {
551
552             $e->rollback;
553             return $U->fire_object_event(
554                 undef,
555                 'vandelay.queued_'.$type.'_record.csv',
556                 $rec_list,
557                 $e->requestor->ws_ou
558             );
559
560         } elsif ($self->api_name =~ /email/) {
561
562             $conn->respond_complete(1);
563
564             for my $rec (@$rec_list) {
565                 $U->create_events_for_hook(
566                     'vandelay.queued_'.$type.'_record.email',
567                     $rec,
568                     $e->requestor->home_ou,
569                     undef,
570                     undef,
571                     1
572                 );
573             }
574
575         }
576     } else {
577         for my $rec_id (@$record_ids) {
578             my $flesh = ['attributes', 'matches'];
579             push(@$flesh, 'import_items') if $$options{flesh_import_items};
580             my $params = {flesh => 1, flesh_fields => {$class => $flesh}};
581             my $rec = $e->$retrieve([$rec_id->{id}, $params]);
582             $rec->clear_marc if $$options{clear_marc};
583             $conn->respond($rec);
584         }
585     }
586
587     $e->rollback;
588     return undef;
589 }
590
591 __PACKAGE__->register_method(  
592     api_name    => 'open-ils.vandelay.import_item.queue.retrieve',
593     method      => 'retrieve_queue_import_items',
594     api_level   => 1,
595     argc        => 2,
596     stream      => 1,
597     authoritative => 1,
598     signature => q/
599         Returns Import Item (vii) objects for the selected queue.
600         Filter options:
601             with_import_error : only return items that failed to import
602     /
603 );
604 __PACKAGE__->register_method(
605     api_name    => 'open-ils.vandelay.import_item.queue.export.print',
606     method      => 'retrieve_queue_import_items',
607     api_level   => 1,
608     argc        => 2,
609     stream      => 1,
610     authoritative => 1,
611     signature => q/
612         Returns template-generated printable output of Import Item (vii) objects for the selected queue.
613         Filter options:
614             with_import_error : only return items that failed to import
615     /
616 );
617 __PACKAGE__->register_method(
618     api_name    => 'open-ils.vandelay.import_item.queue.export.csv',
619     method      => 'retrieve_queue_import_items',
620     api_level   => 1,
621     argc        => 2,
622     stream      => 1,
623     authoritative => 1,
624     signature => q/
625         Returns template-generated CSV output of Import Item (vii) objects for the selected queue.
626         Filter options:
627             with_import_error : only return items that failed to import
628     /
629 );
630 __PACKAGE__->register_method(
631     api_name    => 'open-ils.vandelay.import_item.queue.export.email',
632     method      => 'retrieve_queue_import_items',
633     api_level   => 1,
634     argc        => 2,
635     stream      => 1,
636     authoritative => 1,
637     signature => q/
638         Emails template-generated output of Import Item (vii) objects for the selected queue.
639         Filter options:
640             with_import_error : only return items that failed to import
641     /
642 );
643
644 sub retrieve_queue_import_items {
645     my($self, $conn, $auth, $q_id, $options) = @_;
646
647     $options ||= {};
648     my $limit = $$options{limit} || 20;
649     my $offset = $$options{offset} || 0;
650
651     my $e = new_editor(authtoken => $auth);
652     return $e->event unless $e->checkauth;
653
654     my $queue = $e->retrieve_vandelay_bib_queue($q_id) or return $e->event;
655     my $evt = check_queue_perms($e, 'bib', $queue);
656     return $evt if $evt;
657
658     my $query = {
659         select => {vii => ['id']},
660         from => {
661             vii => {
662                 vqbr => {
663                     join => {
664                         'vbq' => {
665                             field => 'id',
666                             fkey => 'queue',
667                             filter => {id => $q_id}
668                         }
669                     }
670                 }
671             }
672         },
673         order_by => {'vii' => ['record','id']},
674         limit => $limit,
675         offset => $offset
676     };
677     if($self->api_name =~ /export/) {
678         delete $query->{limit};
679         delete $query->{offset};
680     }
681
682     $query->{where} = {'+vii' => {import_error => {'!=' => undef}}}
683         if $$options{with_import_error};
684
685     my $items = $e->json_query($query);
686     my $item_list = $e->search_vandelay_import_item({id => [map { $_->{id} } @$items]});
687     if ($self->api_name =~ /export/) {
688         if ($self->api_name =~ /print/) {
689
690             return $U->fire_object_event(
691                 undef,
692                 'vandelay.import_items.print',
693                 $item_list,
694                 $e->requestor->ws_ou
695             );
696
697         } elsif ($self->api_name =~ /csv/) {
698
699             return $U->fire_object_event(
700                 undef,
701                 'vandelay.import_items.csv',
702                 $item_list,
703                 $e->requestor->ws_ou
704             );
705
706         } elsif ($self->api_name =~ /email/) {
707
708             $conn->respond_complete(1);
709
710             for my $item (@$item_list) {
711                 $U->create_events_for_hook(
712                     'vandelay.import_items.email',
713                     $item,
714                     $e->requestor->home_ou,
715                     undef,
716                     undef,
717                     1
718                 );
719             }
720
721         }
722     } else {
723         for my $item (@$item_list) {
724             $conn->respond($item);
725         }
726     }
727
728     return undef;
729 }
730
731 sub check_queue_perms {
732     my($e, $type, $queue) = @_;
733     if ($type eq 'bib') {
734         return $e->die_event unless
735             ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
736              $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
737     } else {
738         return $e->die_event unless
739             ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
740              $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
741     }
742
743     return undef;
744 }
745
746 __PACKAGE__->register_method(  
747     api_name    => "open-ils.vandelay.bib_record.list.import",
748     method      => 'import_record_list',
749     api_level   => 1,
750     argc        => 2,
751     stream      => 1,
752     record_type => 'bib'
753 );
754
755 __PACKAGE__->register_method(  
756     api_name    => "open-ils.vandelay.auth_record.list.import",
757     method      => 'import_record_list',
758     api_level   => 1,
759     argc        => 2,
760     stream      => 1,
761     record_type => 'auth'
762 );
763
764 sub import_record_list {
765     my($self, $conn, $auth, $rec_ids, $args) = @_;
766     my $e = new_editor(authtoken => $auth, xact => 1);
767     return $e->die_event unless $e->checkauth;
768     $args ||= {};
769     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $args);
770     try {$e->rollback} otherwise {}; 
771     return $err if $err;
772     return {complete => 1};
773 }
774
775
776 __PACKAGE__->register_method(  
777     api_name    => "open-ils.vandelay.bib_queue.import",
778     method      => 'import_queue',
779     api_level   => 1,
780     argc        => 2,
781     stream      => 1,
782     max_chunk_size => 0,
783     record_type => 'bib',
784     signature => {
785         desc => q/
786             Attempts to import all non-imported records for the selected queue.
787             Will also attempt import of all non-imported items.
788         /
789     }
790 );
791
792 __PACKAGE__->register_method(  
793     api_name    => "open-ils.vandelay.auth_queue.import",
794     method      => 'import_queue',
795     api_level   => 1,
796     argc        => 2,
797     stream      => 1,
798     max_chunk_size => 0,
799     record_type => 'auth'
800 );
801
802 sub import_queue {
803     my($self, $conn, $auth, $q_id, $options) = @_;
804     my $e = new_editor(authtoken => $auth, xact => 1);
805     return $e->die_event unless $e->checkauth;
806     $options ||= {};
807     my $type = $self->{record_type};
808     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
809
810     # First, collect the not-yet-imported records
811     my $query = {queue => $q_id, import_time => undef};
812     my $search = ($type eq 'bib') ? 
813         'search_vandelay_queued_bib_record' : 
814         'search_vandelay_queued_authority_record';
815     my $rec_ids = $e->$search($query, {idlist => 1});
816
817     # Now add any imported records that have un-imported items
818
819     if($type eq 'bib') {
820         my $item_recs = $e->json_query({
821             select => {vqbr => ['id']},
822             from => {vqbr => 'vii'},
823             where => {
824                 '+vqbr' => {
825                     queue => $q_id,
826                     import_time => {'!=' => undef}
827                 },
828                 '+vii' => {import_time => undef}
829             },
830             distinct => 1
831         });
832         push(@$rec_ids, map {$_->{id}} @$item_recs);
833     }
834
835     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $options);
836     try {$e->rollback} otherwise {}; # only using this to make the read authoritative -- don't die from it
837     return $err if $err;
838     return {complete => 1};
839 }
840
841 # returns a list of queued record IDs for a given queue that 
842 # have at least one entry in the match table
843 # XXX DEPRECATED?
844 sub queued_records_with_matches {
845     my($e, $type, $q_id, $limit, $offset, $filter) = @_;
846
847     my $match_class = 'vbm';
848     my $rec_class = 'vqbr';
849     if($type eq 'auth') {
850         $match_class = 'vam';
851          $rec_class = 'vqar';
852     }
853
854     $filter ||= {};
855     $filter->{queue} = $q_id;
856
857     my $query = {
858         distinct => 1, 
859         select => {$match_class => ['queued_record']}, 
860         from => {
861             $match_class => {
862                 $rec_class => {
863                     field => 'id',
864                     fkey => 'queued_record',
865                     filter => $filter,
866                 }
867             }
868         }
869     };        
870
871     if($limit or defined $offset) {
872         $limit ||= 20;
873         $offset ||= 0;
874         $query->{limit} = $limit;
875         $query->{offset} = $offset;
876     }
877
878     my $data = $e->json_query($query);
879     return [ map {$_->{queued_record}} @$data ];
880 }
881
882
883 # cache of import item org unit settings.  
884 # used in apply_import_item_defaults() below, 
885 # but reset on each call to import_record_list_impl()
886 my %item_defaults_cache;
887
888 sub import_record_list_impl {
889     my($self, $conn, $rec_ids, $requestor, $args) = @_;
890
891     my $overlay_map = $args->{overlay_map} || {};
892     my $type = $self->{record_type};
893     my %queues;
894     %item_defaults_cache = ();
895
896     my $report_args = {
897         progress => 1,
898         step => 1,
899         conn => $conn,
900         total => scalar(@$rec_ids),
901         report_all => $$args{report_all}
902     };
903
904     $conn->max_chunk_count(1) if $$args{report_all};
905
906     my $auto_overlay_exact = $$args{auto_overlay_exact};
907     my $auto_overlay_1match = $$args{auto_overlay_1match};
908     my $auto_overlay_best = $$args{auto_overlay_best_match};
909     my $match_quality_ratio = $$args{match_quality_ratio};
910     my $merge_profile = $$args{merge_profile};
911     my $ft_merge_profile = $$args{fall_through_merge_profile};
912     my $bib_source = $$args{bib_source};
913     my $import_no_match = $$args{import_no_match};
914     my $strip_grps = $$args{strip_field_groups}; # bib-only
915
916     my $overlay_func = 'vandelay.overlay_bib_record';
917     my $auto_overlay_func = 'vandelay.auto_overlay_bib_record';
918     my $auto_overlay_best_func = 'vandelay.auto_overlay_bib_record_with_best';
919     my $retrieve_func = 'retrieve_vandelay_queued_bib_record';
920     my $update_func = 'update_vandelay_queued_bib_record';
921     my $search_func = 'search_vandelay_queued_bib_record';
922     my $retrieve_queue_func = 'retrieve_vandelay_bib_queue';
923     my $update_queue_func = 'update_vandelay_bib_queue';
924     my $delete_queue_func = 'delete_vandelay_bib_queue';
925     my $rec_class = 'vqbr';
926
927     my $editor = new_editor();
928
929     my %bib_sources;
930     my $sources = $editor->search_config_bib_source({id => {'!=' => undef}});
931     $bib_sources{$_->id} = $_->source for @$sources;
932
933     if($type eq 'auth') {
934         $overlay_func =~ s/bib/authority/o; 
935         $auto_overlay_func =~ s/bib/authority/o; 
936         $auto_overlay_best_func =~ s/bib/authority/o;
937         $retrieve_func =~ s/bib/authority/o;
938         $retrieve_queue_func =~ s/bib/authority/o;
939         $update_queue_func =~ s/bib/authority/o;
940         $update_func =~ s/bib/authority/o;
941         $search_func =~ s/bib/authority/o;
942         $delete_queue_func =~ s/bib/authority/o;
943         $rec_class = 'vqar';
944     }
945
946     my $new_rec_perm_cache;
947     my @success_rec_ids;
948     for my $rec_id (@$rec_ids) {
949
950         my $error = 0;
951         my $overlay_target = $overlay_map->{$rec_id};
952
953         my $e = new_editor(xact => 1);
954         $e->requestor($requestor);
955
956         $$report_args{e} = $e;
957         $$report_args{evt} = undef;
958         $$report_args{import_error} = undef;
959         $$report_args{no_import} = 0;
960
961         my $rec = $e->$retrieve_func([
962             $rec_id,
963             {   flesh => 1,
964                 flesh_fields => { $rec_class => ['matches']},
965             }
966         ]);
967
968         unless($rec) {
969             $$report_args{evt} = $e->event;
970             finish_rec_import_attempt($report_args);
971             next;
972         }
973
974         if($rec->import_time) {
975             # if the record is already imported, that means it may have 
976             # un-imported copies.  Add to success list for later processing.
977             push(@success_rec_ids, $rec_id);
978             $e->rollback;
979             next;
980         }
981
982         $$report_args{rec} = $rec;
983         $queues{$rec->queue} = 1;
984
985         my $record;
986         my $imported = 0;
987
988         if ($type eq 'bib') {
989             # strip configured / selected MARC tags from inbound records
990
991             my $marcdoc = XML::LibXML->new->parse_string($rec->marc);
992             $rec->marc($U->strip_marc_fields($e, $marcdoc, $strip_grps));
993
994             # Set the imported record's 905$u, so
995             # editor/creator/edit_date are set correctly.
996             $marcdoc = XML::LibXML->new->parse_string($rec->marc);
997             $rec->marc($U->set_marc_905u($marcdoc, $requestor->usrname));
998
999             unless ($e->$update_func($rec)) {
1000                 $$report_args{evt} = $e->die_event;
1001                 finish_rec_import_attempt($report_args);
1002                 next;
1003             }
1004         }
1005
1006         if(defined $overlay_target) {
1007             # Caller chose an explicit overlay target
1008
1009             my $res = $e->json_query(
1010                 {
1011                     from => [
1012                         $overlay_func,
1013                         $rec_id,
1014                         $overlay_target, 
1015                         $merge_profile
1016                     ]
1017                 }
1018             );
1019
1020             if($res and ($res = $res->[0])) {
1021
1022                 if($res->{$overlay_func} eq 't') {
1023                     $logger->info("vl: $type direct overlay succeeded for queued rec ".
1024                         "$rec_id and overlay target $overlay_target");
1025                     $imported = 1;
1026                     $rec->imported_as($overlay_target);
1027                 }
1028
1029             } else {
1030                 $error = 1;
1031                 $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1032             }
1033
1034         } else {
1035
1036             if($auto_overlay_1match) { # overlay if there is exactly 1 match
1037
1038                 my %match_recs = map { $_->eg_record => 1 } @{$rec->matches};
1039
1040                 if( scalar(keys %match_recs) == 1) { # all matches point to the same record
1041
1042                     ($imported, $error, $rec) = try_auto_overlay(
1043                         $e, $type,
1044                         $report_args, 
1045                         $auto_overlay_best_func,
1046                         $retrieve_func,
1047                         $rec_class,
1048                         $rec_id, 
1049                         $match_quality_ratio, 
1050                         $merge_profile, 
1051                         $ft_merge_profile
1052                     );
1053                 }
1054             }
1055
1056             if(!$imported and !$error and $auto_overlay_exact and scalar(@{$rec->matches}) == 1 ) {
1057                 
1058                 # caller says to overlay if there is an /exact/ match
1059                 # $auto_overlay_func only proceeds and returns true on exact matches
1060
1061                 my $res = $e->json_query(
1062                     {
1063                         from => [
1064                             $auto_overlay_func,
1065                             $rec_id,
1066                             $merge_profile
1067                         ]
1068                     }
1069                 );
1070
1071                 if($res and ($res = $res->[0])) {
1072
1073                     if($res->{$auto_overlay_func} eq 't') {
1074                         $logger->info("vl: $type auto-overlay succeeded for queued rec $rec_id");
1075                         $imported = 1;
1076
1077                         # re-fetch the record to pick up the imported_as value from the DB
1078                         $$report_args{rec} = $rec = $e->$retrieve_func([
1079                             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1080
1081                     } else {
1082                         $logger->info("vl: $type auto-overlay failed for queued rec $rec_id");
1083                     }
1084
1085                 } else {
1086                     $error = 1;
1087                     $logger->error("vl: Error attempting overlay with func=$auto_overlay_func, profile=$merge_profile, record=$rec_id");
1088                 }
1089             }
1090
1091             if(!$imported and !$error and $auto_overlay_best and scalar(@{$rec->matches}) > 0 ) {
1092                 # caller says to overlay the best match
1093
1094                 ($imported, $error, $rec) = try_auto_overlay(
1095                     $e, $type,
1096                     $report_args, 
1097                     $auto_overlay_best_func,
1098                     $retrieve_func,
1099                     $rec_class,
1100                     $rec_id, 
1101                     $match_quality_ratio, 
1102                     $merge_profile, 
1103                     $ft_merge_profile
1104                 );
1105             }
1106
1107             if(!$imported and !$error and $import_no_match and scalar(@{$rec->matches}) == 0) {
1108             
1109                 # No overlay / merge occurred.  Do a traditional record import by creating a new record
1110
1111                 if (!$new_rec_perm_cache) {
1112                     $new_rec_perm_cache = {};
1113
1114                     # all users creating new records are required to have the basic permission.
1115                     # if the client requests, we can enforce extra permissions for creating new records.
1116                     # for speed, check the permissions the first time then cache the result.
1117
1118                     my $perm = ($type eq 'bib') ? 'IMPORT_MARC' : 'IMPORT_AUTHORITY_MARC';
1119                     my $xperm = $$args{new_rec_perm};
1120                     my $rec_ou = $e->requestor->ws_ou;
1121
1122                     $new_rec_perm_cache->{evt} = $e->die_event
1123                         if !$e->allowed($perm, $rec_ou) || ($xperm and !$e->allowed($xperm, $rec_ou));
1124                 }
1125
1126                 if ($new_rec_perm_cache->{evt}) {
1127
1128                     # a cached event won't roll back the transaction (a la die_event), but
1129                     # the transaction will get rolled back in finish_rec_import_attempt() below
1130                     $$report_args{evt} = $new_rec_perm_cache->{evt};
1131                     $$report_args{import_error} = 'import.record.perm_failure';
1132
1133                 } else { # perm checks succeeded
1134
1135                     $logger->info("vl: creating new $type record for queued record $rec_id");
1136
1137                     if ($type eq 'bib') {
1138
1139                         $record = OpenILS::Application::Cat::BibCommon->biblio_record_xml_import(
1140                             $e, $rec->marc, $bib_sources{$rec->bib_source}, undef, 1);
1141
1142                     } else { # authority record
1143
1144                         $record = OpenILS::Application::Cat::AuthCommon->import_authority_record($e, $rec->marc); #$source);
1145                     }
1146
1147                     if($U->event_code($record)) {
1148                         $$report_args{import_error} = 'import.duplicate.tcn' 
1149                             if $record->{textcode} eq 'OPEN_TCN_NOT_FOUND';
1150                         $$report_args{evt} = $record;
1151
1152                     } else {
1153
1154                         $logger->info("vl: successfully imported new $type record");
1155                         $rec->imported_as($record->id);
1156                         $imported = 1;
1157                     }
1158                 }
1159             }
1160         }
1161
1162         if($imported) {
1163
1164             $rec->import_time('now');
1165             $rec->clear_import_error;
1166             $rec->clear_error_detail;
1167
1168             if($e->$update_func($rec)) {
1169
1170                 if($type eq 'bib') {
1171
1172                     # see if this record is linked from an acq record.
1173                     my $li = $e->search_acq_lineitem(
1174                         {queued_record => $rec->id, state => {'!=' => 'canceled'}})->[0];
1175
1176                     if ($li) { 
1177                         # if so, update the acq lineitem to point to the imported record
1178                         $li->eg_bib_id($rec->imported_as);
1179                         $$report_args{evt} = $e->die_event unless $e->update_acq_lineitem($li);
1180                     }
1181                 }
1182
1183                 push @success_rec_ids, $rec_id;
1184                 finish_rec_import_attempt($report_args);
1185
1186             } else {
1187                 $imported = 0;
1188             }
1189         }
1190
1191         if(!$imported) {
1192             $logger->info("vl: record $rec_id was not imported");
1193             $$report_args{evt} = $e->event unless $$report_args{evt};
1194             $$report_args{no_import} = 1;
1195             finish_rec_import_attempt($report_args);
1196         }
1197     }
1198
1199     # see if we need to mark any queues as complete
1200     for my $q_id (keys %queues) {
1201
1202         my $e = new_editor(xact => 1);
1203         my $remaining = $e->$search_func(
1204             [{queue => $q_id, import_time => undef}, {limit =>1}], {idlist => 1});
1205
1206         unless(@$remaining) {
1207             my $queue = $e->$retrieve_queue_func($q_id);
1208             unless($U->is_true($queue->complete)) {
1209                 $queue->complete('t');
1210                 $e->$update_queue_func($queue) or return $e->die_event;
1211                 $e->commit;
1212                 next;
1213             }
1214         } 
1215         $e->rollback;
1216     }
1217
1218     # import the copies
1219     import_record_asset_list_impl($conn, \@success_rec_ids, $requestor, $args) if @success_rec_ids;
1220
1221     $conn->respond({total => $$report_args{total}, progress => $$report_args{progress}});
1222     return undef;
1223 }
1224
1225
1226 sub try_auto_overlay {
1227     my $e = shift;
1228     my $type = shift;
1229     my $report_args = shift;
1230     my $overlay_func  = shift;
1231     my $retrieve_func = shift; 
1232     my $rec_class = shift;
1233     my $rec_id  = shift;
1234     my $match_quality_ratio = shift;
1235     my $merge_profile  = shift;
1236     my $ft_merge_profile = shift;
1237
1238     my $imported = 0;
1239     my $error = 0;
1240
1241     # Find the best match and overlay if the quality ratio allows it.
1242     my $res = $e->json_query(
1243         {
1244             from => [
1245                 $overlay_func,
1246                 $rec_id, 
1247                 $merge_profile,
1248                 $match_quality_ratio
1249             ]
1250         }
1251     );
1252
1253     if($res and ($res = $res->[0])) {
1254
1255         if($res->{$overlay_func} eq 't') {
1256
1257             # first attempt succeeded
1258             $imported = 1;
1259
1260         } else {
1261
1262             # quality-limited merge failed with insufficient quality.  If there is a 
1263             # fall-through merge profile, re-do the merge with the alternate profile
1264             # and no quality restriction.
1265
1266             if($ft_merge_profile and $match_quality_ratio > 0) {
1267
1268                 $logger->info("vl: $type auto-merge failed with profile $merge_profile; ".
1269                     "re-merging with fall-through profile $ft_merge_profile");
1270
1271                 my $res = $e->json_query(
1272                     {
1273                         from => [
1274                             $overlay_func,
1275                             $rec_id, 
1276                             $ft_merge_profile,
1277                             0 # minimum quality not required
1278                         ]
1279                     }
1280                 );
1281
1282                 if($res and ($res = $res->[0])) {
1283
1284                     if($res->{$overlay_func} eq 't') {
1285
1286                         # second attempt succeeded
1287                         $imported = 1;
1288
1289                     } else {
1290
1291                         # failed to merge on second attempt
1292                         $logger->info("vl: $type auto-merge with fall-through failed for queued rec $rec_id");
1293                     }
1294                 } else {
1295                     
1296                     # second attempt died 
1297                     $error = 1;
1298                     $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1299                 }
1300
1301             } else { 
1302
1303                 # failed to merge on first attempt, no fall-through was provided
1304                 $$report_args{import_error} = 'overlay.record.quality' if $match_quality_ratio > 0;
1305                 $logger->info("vl: $type auto-merge failed for queued rec $rec_id");
1306             }
1307         }
1308
1309     } else {
1310
1311         # first attempt died 
1312         $error = 1;
1313         $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1314     }
1315
1316     if($imported) {
1317
1318         # at least 1 of the attempts succeeded
1319         $logger->info("vl: $type auto-merge succeeded for queued rec $rec_id");
1320
1321         # re-fetch the record to pick up the imported_as value from the DB
1322         $$report_args{rec} = $e->$retrieve_func([
1323             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1324     }
1325
1326     return ($imported, $error, $$report_args{rec});
1327 }
1328
1329
1330 # tracks any import errors, commits the current xact, responds to the client
1331 sub finish_rec_import_attempt {
1332     my $args = shift;
1333     my $evt = $$args{evt};
1334     my $rec = $$args{rec};
1335     my $e = $$args{e};
1336
1337     my $error = $$args{import_error};
1338     $error = 'general.unknown' if $evt and not $error;
1339
1340     # error tracking
1341     if($rec) {
1342
1343         if($error or $evt) {
1344             # failed import
1345             # since an error occurred, there's no guarantee the transaction wasn't 
1346             # rolled back.  force a rollback and create a new editor.
1347             $e->rollback;
1348             $e = new_editor(xact => 1);
1349             $rec->import_error($error);
1350
1351             if($evt) {
1352                 my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1353                 $rec->error_detail($detail);
1354             }
1355
1356             my $method = 'update_vandelay_queued_bib_record';
1357             $method =~ s/bib/authority/ if $$args{type} eq 'auth';
1358             $e->$method($rec) and $e->commit or $e->rollback;
1359
1360         } else {
1361             # commit the successful import
1362             $e->commit;
1363         }
1364
1365     } else {
1366         # requested queued record was not found
1367         $e->rollback;
1368     }
1369         
1370     # respond to client
1371     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1372         $$args{conn}->respond({
1373             total => $$args{total}, 
1374             progress => $$args{progress}, 
1375             imported => ($rec) ? $rec->id : undef,
1376             import_error => $error,
1377             no_import => $$args{no_import},
1378             err_event => $evt
1379         });
1380         $$args{step} *= 2 unless $$args{step} == 256;
1381     }
1382
1383     $$args{progress}++;
1384 }
1385
1386
1387
1388
1389
1390 __PACKAGE__->register_method(  
1391     api_name    => "open-ils.vandelay.bib_queue.owner.retrieve",
1392     method      => 'owner_queue_retrieve',
1393     api_level   => 1,
1394     argc        => 2,
1395     stream      => 1,
1396     record_type => 'bib'
1397 );
1398 __PACKAGE__->register_method(  
1399     api_name    => "open-ils.vandelay.authority_queue.owner.retrieve",
1400     method      => 'owner_queue_retrieve',
1401     api_level   => 1,
1402     argc        => 2,
1403     stream      => 1,
1404     record_type => 'auth'
1405 );
1406
1407 sub owner_queue_retrieve {
1408     my($self, $conn, $auth, $owner_id, $filters) = @_;
1409     my $e = new_editor(authtoken => $auth, xact => 1);
1410     return $e->die_event unless $e->checkauth;
1411     $owner_id = $e->requestor->id; # XXX add support for viewing other's queues?
1412     my $queues;
1413     $filters ||= {};
1414     my $search = {owner => $owner_id};
1415     $search->{$_} = $filters->{$_} for keys %$filters;
1416
1417     if($self->{record_type} eq 'bib') {
1418         $queues = $e->search_vandelay_bib_queue(
1419             [$search, {order_by => {vbq => 'evergreen.lowercase(name)'}}]);
1420     } else {
1421         $queues = $e->search_vandelay_authority_queue(
1422             [$search, {order_by => {vaq => 'evergreen.lowercase(name)'}}]);
1423     }
1424     $conn->respond($_) for @$queues;
1425     $e->rollback;
1426     return undef;
1427 }
1428
1429 __PACKAGE__->register_method(  
1430     api_name    => "open-ils.vandelay.bib_queue.delete",
1431     method      => "delete_queue",
1432     api_level   => 1,
1433     argc        => 2,
1434     record_type => 'bib'
1435 );            
1436 __PACKAGE__->register_method(  
1437     api_name    => "open-ils.vandelay.auth_queue.delete",
1438     method      => "delete_queue",
1439     api_level   => 1,
1440     argc        => 2,
1441     record_type => 'auth'
1442 );  
1443
1444 sub delete_queue {
1445     my($self, $conn, $auth, $q_id) = @_;
1446     my $e = new_editor(xact => 1, authtoken => $auth);
1447     return $e->die_event unless $e->checkauth;
1448     if($self->{record_type} eq 'bib') {
1449         return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
1450         my $queue = $e->retrieve_vandelay_bib_queue($q_id)
1451             or return $e->die_event;
1452         $e->delete_vandelay_bib_queue($queue)
1453             or return $e->die_event;
1454     } else {
1455            return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
1456         my $queue = $e->retrieve_vandelay_authority_queue($q_id)
1457             or return $e->die_event;
1458         $e->delete_vandelay_authority_queue($queue)
1459             or return $e->die_event;
1460     }
1461     $e->commit;
1462     return 1;
1463 }
1464
1465
1466 __PACKAGE__->register_method(  
1467     api_name    => "open-ils.vandelay.queued_bib_record.html",
1468     method      => 'queued_record_html',
1469     api_level   => 1,
1470     argc        => 2,
1471     stream      => 1,
1472     record_type => 'bib'
1473 );
1474 __PACKAGE__->register_method(  
1475     api_name    => "open-ils.vandelay.queued_authority_record.html",
1476     method      => 'queued_record_html',
1477     api_level   => 1,
1478     argc        => 2,
1479     stream      => 1,
1480     record_type => 'auth'
1481 );
1482
1483 sub queued_record_html {
1484     my($self, $conn, $auth, $rec_id) = @_;
1485     my $e = new_editor(xact=>1,authtoken => $auth);
1486     return $e->die_event unless $e->checkauth;
1487     my $rec;
1488     if($self->{record_type} eq 'bib') {
1489         $rec = $e->retrieve_vandelay_queued_bib_record($rec_id)
1490             or return $e->die_event;
1491     } else {
1492         $rec = $e->retrieve_vandelay_queued_authority_record($rec_id)
1493             or return $e->die_event;
1494     }
1495
1496     $e->rollback;
1497     return $U->simplereq(
1498         'open-ils.search',
1499         'open-ils.search.biblio.record.html', undef, 1, $rec->marc);
1500 }
1501
1502
1503 __PACKAGE__->register_method(  
1504     api_name    => "open-ils.vandelay.bib_queue.summary.retrieve", 
1505     method      => 'retrieve_queue_summary',
1506     api_level   => 1,
1507     argc        => 2,
1508     stream      => 1,
1509     record_type => 'bib'
1510 );
1511 __PACKAGE__->register_method(  
1512     api_name    => "open-ils.vandelay.auth_queue.summary.retrieve",
1513     method      => 'retrieve_queue_summary',
1514     api_level   => 1,
1515     argc        => 2,
1516     stream      => 1,
1517     record_type => 'auth'
1518 );
1519
1520 sub retrieve_queue_summary {
1521     my($self, $conn, $auth, $queue_id) = @_;
1522     my $e = new_editor(xact=>1, authtoken => $auth);
1523     return $e->die_event unless $e->checkauth;
1524
1525     my $queue;
1526     my $type = $self->{record_type};
1527     if($type eq 'bib') {
1528         $queue = $e->retrieve_vandelay_bib_queue($queue_id)
1529             or return $e->die_event;
1530     } else {
1531         $queue = $e->retrieve_vandelay_authority_queue($queue_id)
1532             or return $e->die_event;
1533     }
1534
1535     my $evt = check_queue_perms($e, $type, $queue);
1536     return $evt if $evt;
1537
1538     my $search = 'search_vandelay_queued_bib_record';
1539     $search =~ s/bib/authority/ if $type ne 'bib';
1540
1541     my $summary = {
1542         queue => $queue,
1543         total => scalar(@{$e->$search({queue => $queue_id}, {idlist=>1})}),
1544         imported => scalar(@{$e->$search({queue => $queue_id, import_time => {'!=' => undef}}, {idlist=>1})}),
1545     };
1546
1547     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
1548     $summary->{rec_import_errors} = $e->json_query({
1549         select => {$class => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1550         from => $class,
1551         where => {queue => $queue_id, import_error => {'!=' => undef}}
1552     })->[0]->{count};
1553
1554     if($type eq 'bib') {
1555         
1556         # count of all items attached to records in the queue in question
1557         my $query = {
1558             select => {vii => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1559             from => 'vii',
1560             where => {
1561                 record => {
1562                     in => {
1563                         select => {vqbr => ['id']},
1564                         from => 'vqbr',
1565                         where => {queue => $queue_id}
1566                     }
1567                 }
1568             }
1569         };
1570         $summary->{total_items} = $e->json_query($query)->[0]->{count};
1571
1572         # count of items we attempted to import, but errored, attached to records in the queue in question
1573         $query->{where}->{import_error} = {'!=' => undef};
1574         $summary->{item_import_errors} = $e->json_query($query)->[0]->{count};
1575
1576         # count of items we successfully imported attached to records in the queue in question
1577         delete $query->{where}->{import_error};
1578         $query->{where}->{import_time} = {'!=' => undef};
1579         $summary->{total_items_imported} = $e->json_query($query)->[0]->{count};
1580     }
1581
1582     return $summary;
1583 }
1584
1585 # --------------------------------------------------------------------------------
1586 # Given a list of queued record IDs, imports all items attached to those records
1587 # --------------------------------------------------------------------------------
1588 sub import_record_asset_list_impl {
1589     my($conn, $rec_ids, $requestor, $args) = @_;
1590
1591     my $roe = new_editor(xact=> 1, requestor => $requestor);
1592
1593     # for speed, filter out any records have not been 
1594     # imported or have no import items to load
1595     $rec_ids = $roe->json_query({
1596         select => {vqbr => ['id']},
1597         from => {vqbr => 'vii'},
1598         where => {'+vqbr' => {
1599             id => $rec_ids,
1600             import_time => {'!=' => undef}
1601         }},
1602         distinct => 1
1603     });
1604     $rec_ids = [map {$_->{id}} @$rec_ids];
1605
1606     my $report_args = {
1607         conn => $conn,
1608         total => scalar(@$rec_ids),
1609         step => 1, # how often to respond
1610         progress => 1,
1611         in_count => 0,
1612     };
1613
1614     for my $rec_id (@$rec_ids) {
1615         my $rec = $roe->retrieve_vandelay_queued_bib_record($rec_id);
1616         my $item_ids = $roe->search_vandelay_import_item(
1617             {record => $rec->id, import_error => undef}, 
1618             {idlist=>1}
1619         );
1620
1621         # if any items have no call_number label and a value should be
1622         # applied automatically (via org settings), we want to use the same
1623         # call number label for every copy per org per record.
1624         my $auto_callnumber = {};
1625
1626         my $opp_acq_copy_overlay = $args->{opp_acq_copy_overlay};
1627         my @overlaid_copy_ids;
1628         for my $item_id (@$item_ids) {
1629             my $e = new_editor(requestor => $requestor, xact => 1);
1630             my $item = $e->retrieve_vandelay_import_item($item_id);
1631             my ($copy, $vol, $evt);
1632
1633             $$report_args{e} = $e;
1634             $$report_args{evt} = undef;
1635             $$report_args{import_item} = $item;
1636             $$report_args{import_error} = undef;
1637
1638             if (my $copy_id = $item->internal_id) { # assignment
1639                 # copy matches an existing copy.  Overlay instead of create.
1640
1641                 $logger->info("vl: performing copy overlay for internal_id=$copy_id");
1642
1643                 my $qt = $e->json_query({
1644                     select => {vbq => ['queue_type']},
1645                     from => {vqbr => 'vbq'},
1646                     where => {'+vqbr' => {id => $rec_id}}
1647                 })->[0]->{queue_type};
1648
1649                 if ($qt eq 'acq') {
1650                     # internal_id for ACQ queues refers to acq.lineitem_detail.id
1651                     # pull the real copy id from the acq LID
1652
1653                     my $lid = $e->retrieve_acq_lineitem_detail($copy_id);
1654                     if (!$lid) {
1655                         $$report_args{evt} = $e->die_event;
1656                         respond_with_status($report_args);
1657                         next;
1658                     }
1659                     $copy_id = $lid->eg_copy_id;
1660                     $logger->info("vl: performing ACQ copy overlay for copy $copy_id");
1661                 }
1662
1663                 $copy = $e->search_asset_copy([
1664                     {id => $copy_id, deleted => 'f'},
1665                     {flesh => 1, flesh_fields => {acp => ['call_number']}}
1666                 ])->[0];
1667
1668                 if (!$copy) {
1669                     $$report_args{evt} = $e->die_event;
1670                     respond_with_status($report_args);
1671                     next;
1672                 }
1673
1674                 # prevent update of unrelated copies
1675                 if ($copy->call_number->record != $rec->imported_as) {
1676                     $logger->info("vl: attempt to overlay unrelated copy=$copy_id; rec=".$rec->imported_as);
1677
1678                     $evt = OpenILS::Event->new('INVALID_IMPORT_COPY_ID', 
1679                         note => 'Cannot overlay copies for unlinked bib',
1680                         bre => $rec->imported_as, 
1681                         copy_id => $copy_id
1682                     );
1683                     $$report_args{evt} = $evt;
1684                     respond_with_status($report_args);
1685                     next;
1686                 }
1687             } elsif ($opp_acq_copy_overlay) { # we are going to "opportunistically" overlay received, in-process acq copies
1688                 # recv_time should never be null if the copy status is
1689                 # "In Process", so that is just a double-check
1690                 my $query = [
1691                     {
1692                         "recv_time" => {"!=" => undef},
1693                         "owning_lib" => $item->owning_lib,
1694                         "+acn" => {"record" => $rec->imported_as},
1695                         "+acp" => {"status" => OILS_COPY_STATUS_IN_PROCESS}
1696                     },
1697                     {
1698                         "join" => {
1699                             "acp" => {
1700                                 "join" => "acn"
1701                             }
1702                         },
1703                         "flesh" => 2,
1704                         "flesh_fields" => {
1705                             "acqlid" => ["eg_copy_id"],
1706                             "acp" => ["call_number"]
1707                         }
1708                     }
1709                 ];
1710                 # don't overlay the same copy twice
1711                 $query->[0]{"+acp"}{"id"} = {"not in" => \@overlaid_copy_ids} if @overlaid_copy_ids;
1712                 if (my $acqlid = $e->search_acq_lineitem_detail($query)->[0]) {
1713                     $copy = $acqlid->eg_copy_id;
1714                     push(@overlaid_copy_ids, $copy->id);
1715                 }
1716             }
1717
1718             if ($copy) { # we found a copy to overlay
1719
1720                 # overlaying copies requires an extra permission
1721                 if (!$e->allowed("IMPORT_OVERLAY_COPY", $copy->call_number->owning_lib)) {
1722                     $$report_args{evt} = $e->die_event;
1723                     respond_with_status($report_args);
1724                     next;
1725                 }
1726
1727                 # are we updating the call-number?
1728                 if ($item->call_number and $item->call_number ne $copy->call_number->label) {
1729
1730                     my $count = $e->json_query({
1731                         select => {acp => [{
1732                             alias => 'count', 
1733                             column => 'id', 
1734                             transform => 'count', 
1735                             aggregate => 1
1736                         }]},
1737                         from => 'acp',
1738                         where => {
1739                             deleted => 'f',
1740                             call_number => $copy->call_number->id
1741                         }
1742                     })->[0]->{count};
1743
1744                     if ($count == 1) {
1745
1746                         my $evol = $e->search_asset_call_number({
1747                             id => {'<>' => $copy->call_number->id},
1748                             label => $item->call_number,
1749                             owning_lib => $copy->call_number->owning_lib,
1750                             record => $copy->call_number->record,
1751                             prefix => $copy->call_number->prefix,
1752                             suffix => $copy->call_number->suffix,
1753                             deleted => 'f'
1754                         })->[0];
1755
1756                         if ($evol) {
1757                             # call number for overlayed copy changed to a
1758                             # label already in use by another call number.
1759                             # merge the old CN into the new CN
1760                             
1761                             $logger->info(
1762                                 "vl: moving copy to new call number ".
1763                                 $item->call_number);
1764
1765                             my ($mvol, $err) = 
1766                                 OpenILS::Application::Cat::Merge::merge_volumes(
1767                                     $e, [$copy->call_number], $evol);
1768
1769                             if (!$mvol) {
1770                                 $$report_args{evt} = $err;
1771                                 respond_with_status($report_args);
1772                                 next;
1773                             }
1774
1775                             # update our copy *cough* of the copy to pick up
1776                             # any changes made my merge_volumes
1777                             $copy = $e->retrieve_asset_copy([
1778                                 $copy->id,
1779                                 {flesh => 1, flesh_fields => {acp => ['call_number']}}
1780                             ]);
1781
1782                         } else {
1783                             $logger->info(
1784                                 "vl: updating copy call number label".
1785                                 $item->call_number);
1786
1787                             $copy->call_number->label($item->call_number);
1788                             if (!$e->update_asset_call_number($copy->call_number)) {
1789                                 $$report_args{evt} = $e->die_event;
1790                                 respond_with_status($report_args);
1791                                 next;
1792                             }
1793                         }
1794
1795                     } else {
1796
1797                         # otherwise, move the copy to a new/existing 
1798                         # call-number with the given label/owner
1799                         # note that overlay does not allow the owning_lib 
1800                         # to be changed.  Should it?
1801
1802                         $logger->info("vl: moving copy to new callnumber in copy overlay");
1803
1804                         ($vol, $evt) =
1805                             OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1806                                 $e, $item->call_number, 
1807                                 $copy->call_number->record, 
1808                                 $copy->call_number->owning_lib
1809                             );
1810
1811                         if($evt) {
1812                             $$report_args{evt} = $evt;
1813                             respond_with_status($report_args);
1814                             next;
1815                         }
1816
1817                         $copy->call_number($vol);
1818                     }
1819                 } # cn-update
1820
1821                 # for every field that has a non-'' value, overlay the copy value
1822                 foreach (qw/ barcode location circ_lib status 
1823                     circulate deposit deposit_amount ref holdable 
1824                     price circ_as_type alert_message opac_visible circ_modifier/) {
1825
1826                     my $val = $item->$_();
1827                     $copy->$_($val) if defined $val and $val ne '';
1828                 }
1829
1830                 # de-flesh for update
1831                 $copy->call_number($copy->call_number->id);
1832                 $copy->ischanged(1);
1833
1834                 $evt = OpenILS::Application::Cat::AssetCommon->
1835                     update_fleshed_copies($e, {all => 1}, undef, [$copy]);
1836
1837                 if($evt) {
1838                     $$report_args{evt} = $evt;
1839                     respond_with_status($report_args);
1840                     next;
1841                 }
1842
1843             } else { 
1844
1845                 # Creating a new copy
1846                 $logger->info("vl: creating new copy in import");
1847
1848                 # appply defaults values from org settings as needed
1849                 # if $auto_callnumber is unset, it will be set within
1850                 apply_import_item_defaults($e, $item, $auto_callnumber);
1851
1852                 # --------------------------------------------------------------------------------
1853                 # Find or create the volume
1854                 # --------------------------------------------------------------------------------
1855                 my ($vol, $evt) =
1856                     OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1857                         $e, $item->call_number, $rec->imported_as, $item->owning_lib);
1858
1859                 if($evt) {
1860                     $$report_args{evt} = $evt;
1861                     respond_with_status($report_args);
1862                     next;
1863                 }
1864
1865                 # --------------------------------------------------------------------------------
1866                 # Create the new copy
1867                 # --------------------------------------------------------------------------------
1868                 $copy = Fieldmapper::asset::copy->new;
1869                 $copy->loan_duration(2);
1870                 $copy->fine_level(2);
1871                 $copy->barcode($item->barcode);
1872                 $copy->location($item->location);
1873                 $copy->circ_lib($item->circ_lib || $item->owning_lib);
1874                 $copy->status( defined($item->status) ? $item->status : OILS_COPY_STATUS_IN_PROCESS );
1875                 $copy->circulate($item->circulate);
1876                 $copy->deposit($item->deposit);
1877                 $copy->deposit_amount($item->deposit_amount);
1878                 $copy->ref($item->ref);
1879                 $copy->holdable($item->holdable);
1880                 $copy->price($item->price);
1881                 $copy->circ_as_type($item->circ_as_type);
1882                 $copy->alert_message($item->alert_message);
1883                 $copy->opac_visible($item->opac_visible);
1884                 $copy->circ_modifier($item->circ_modifier);
1885
1886                 # --------------------------------------------------------------------------------
1887                 # Check for dupe barcode
1888                 # --------------------------------------------------------------------------------
1889                 if($evt = OpenILS::Application::Cat::AssetCommon->create_copy($e, $vol, $copy)) {
1890                     $$report_args{evt} = $evt;
1891                     $$report_args{import_error} = 'import.item.duplicate.barcode'
1892                         if $evt->{textcode} eq 'ITEM_BARCODE_EXISTS';
1893                     respond_with_status($report_args);
1894                     next;
1895                 }
1896
1897                 # --------------------------------------------------------------------------------
1898                 # create copy notes
1899                 # --------------------------------------------------------------------------------
1900                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1901                     $e, $copy, '', $item->pub_note, 1) if $item->pub_note;
1902
1903                 if($evt) {
1904                     $$report_args{evt} = $evt;
1905                     respond_with_status($report_args);
1906                     next;
1907                 }
1908
1909                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1910                     $e, $copy, '', $item->priv_note) if $item->priv_note;
1911
1912                 if($evt) {
1913                     $$report_args{evt} = $evt;
1914                     respond_with_status($report_args);
1915                     next;
1916                 }
1917             }
1918
1919             # set the import data on the import item
1920             $item->imported_as($copy->id); # $copy->id is set by create_copy() ^--
1921             $item->import_time('now');
1922
1923             unless($e->update_vandelay_import_item($item)) {
1924                 $$report_args{evt} = $e->die_event;
1925                 respond_with_status($report_args);
1926                 next;
1927             }
1928
1929             # --------------------------------------------------------------------------------
1930             # Item import succeeded
1931             # --------------------------------------------------------------------------------
1932             $e->commit;
1933             $$report_args{in_count}++;
1934             respond_with_status($report_args);
1935             $logger->info("vl: successfully imported item " . $item->barcode);
1936         }
1937     }
1938
1939     $roe->rollback;
1940     return undef;
1941 }
1942
1943 sub apply_import_item_defaults {
1944     my ($e, $item, $auto_cn) = @_;
1945     my $org = $item->owning_lib || $item->circ_lib;
1946     my %c = %item_defaults_cache;  
1947
1948     # fetch and cache the org unit setting value (unless 
1949     # it's already cached) and return the value to the caller
1950     my $set = sub {
1951         my $name = shift;
1952         return $c{$org}{$name} if defined $c{$org}{$name};
1953         my $sname = "vandelay.item.$name";
1954         $c{$org}{$name} = $U->ou_ancestor_setting_value($org, $sname, $e);
1955         $c{$org}{$name} = '' unless defined $c{$org}{$name};
1956         return $c{$org}{$name};
1957     };
1958
1959     if (!$item->barcode) {
1960
1961         if ($set->('barcode.auto')) {
1962
1963             my $pfx = $set->('barcode.prefix') || 'VAN';
1964             my $barcode = $pfx . $item->record . $item->id;
1965
1966             $logger->info("vl: using auto barcode $barcode for ".$item->id);
1967             $item->barcode($barcode);
1968
1969         } else {
1970             $logger->error("vl: no barcode (or defualt) for item ".$item->id);
1971         }
1972     }
1973
1974     if (!$item->call_number) {
1975
1976         if ($set->('call_number.auto')) {
1977
1978             if (!$auto_cn->{$org}) {
1979                 my $pfx = $set->('call_number.prefix') || 'VAN';
1980
1981                 # use the ID of the first item to differentiate this 
1982                 # call number from others linked to the same record
1983                 $auto_cn->{$org} = $pfx . $item->record . $item->id;
1984             }
1985
1986             $logger->info("vl: using auto call number ".$auto_cn->{$org});
1987             $item->call_number($auto_cn->{$org});
1988
1989         } else {
1990             $logger->error("vl: no call number or default for item ".$item->id);
1991         }
1992     }
1993 }
1994
1995
1996 sub respond_with_status {
1997     my $args = shift;
1998     my $e = $$args{e};
1999
2000     #  If the import failed, track the failure reason
2001
2002     my $error = $$args{import_error};
2003     my $evt = $$args{evt};
2004
2005     if($error or $evt) {
2006
2007         my $item = $$args{import_item};
2008         $logger->info("vl: unable to import item " . $item->barcode);
2009
2010         $error ||= 'general.unknown';
2011         $item->import_error($error);
2012
2013         if($evt) {
2014             my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
2015             $item->error_detail($detail);
2016         }
2017
2018         # state of the editor is unknown at this point.  Force a rollback and start over.
2019         $e->rollback;
2020         $e = new_editor(xact => 1);
2021         $e->update_vandelay_import_item($item);
2022         $e->commit;
2023     }
2024
2025     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
2026         $$args{conn}->respond({
2027             total => $$args{total},
2028             progress => $$args{progress},
2029             success_count => $$args{success_count},
2030             err_event => $evt
2031         });
2032         $$args{step} *= 2 unless $$args{step} == 256;
2033     }
2034
2035     $$args{progress}++;
2036 }
2037
2038 __PACKAGE__->register_method(  
2039     api_name    => "open-ils.vandelay.match_set.get_tree",
2040     method      => "match_set_get_tree",
2041     api_level   => 1,
2042     argc        => 2,
2043     signature   => {
2044         desc    => q/For a given vms object, return a tree of match set points
2045                     represented by a vmsp object with recursively fleshed
2046                     children./
2047     }
2048 );
2049
2050 sub match_set_get_tree {
2051     my ($self, $conn, $authtoken, $match_set_id) = @_;
2052
2053     $match_set_id = int($match_set_id) or return;
2054
2055     my $e = new_editor("authtoken" => $authtoken);
2056     $e->checkauth or return $e->die_event;
2057
2058     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
2059         return $e->die_event;
2060
2061     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
2062         return $e->die_event;
2063
2064     my $tree = $e->search_vandelay_match_set_point([
2065         {"match_set" => $match_set_id, "parent" => undef},
2066         {"flesh" => -1, "flesh_fields" => {"vmsp" => ["children"]}}
2067     ]) or return $e->die_event;
2068
2069     return pop @$tree;
2070 }
2071
2072
2073 __PACKAGE__->register_method(
2074     api_name    => "open-ils.vandelay.match_set.update",
2075     method      => "match_set_update_tree",
2076     api_level   => 1,
2077     argc        => 3,
2078     signature   => {
2079         desc => q/Replace any vmsp objects associated with a given (by ID) vms
2080                 with the given objects (recursively fleshed vmsp tree)./
2081     }
2082 );
2083
2084 sub _walk_new_vmsp {
2085     my ($e, $match_set_id, $node, $parent_id) = @_;
2086
2087     my $point = new Fieldmapper::vandelay::match_set_point;
2088     $point->parent($parent_id);
2089     $point->match_set($match_set_id);
2090     $point->$_($node->$_) 
2091         for (qw/bool_op svf tag subfield negate quality heading/);
2092
2093     $e->create_vandelay_match_set_point($point) or return $e->die_event;
2094
2095     $parent_id = $e->data->id;
2096     if ($node->children && @{$node->children}) {
2097         for (@{$node->children}) {
2098             return $e->die_event if
2099                 _walk_new_vmsp($e, $match_set_id, $_, $parent_id);
2100         }
2101     }
2102
2103     return;
2104 }
2105
2106 sub match_set_update_tree {
2107     my ($self, $conn, $authtoken, $match_set_id, $tree) = @_;
2108
2109     my $e = new_editor("xact" => 1, "authtoken" => $authtoken);
2110     $e->checkauth or return $e->die_event;
2111
2112     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
2113         return $e->die_event;
2114
2115     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
2116         return $e->die_event;
2117
2118     my $existing = $e->search_vandelay_match_set_point([
2119         {"match_set" => $match_set_id},
2120         {"order_by" => {"vmsp" => "id DESC"}}
2121     ]) or return $e->die_event;
2122
2123     # delete points, working up from leaf points to the root
2124     while(@$existing) {
2125         for my $point (shift @$existing) {
2126             if( grep {$_->parent eq $point->id} @$existing) {
2127                 push(@$existing, $point);
2128             } else {
2129                 $e->delete_vandelay_match_set_point($point) or return $e->die_event;
2130             }
2131         }
2132     }
2133
2134     _walk_new_vmsp($e, $match_set_id, $tree);
2135
2136     $e->commit or return $e->die_event;
2137 }
2138
2139 __PACKAGE__->register_method(  
2140     api_name    => 'open-ils.vandelay.bib_queue.to_bucket',
2141     method      => 'bib_queue_to_bucket',
2142     api_level   => 1,
2143     argc        => 2,
2144     signature   => {
2145         desc    => q/Add to or create a new bib container (bucket) with the successfully 
2146                     imported contents of a vandelay queue.  Any user that has Vandelay 
2147                     queue create permissions can append or create buckets from his-her own queues./,
2148         params  => [
2149             {desc => 'Authtoken', type => 'string'},
2150             {desc => 'Queue ID', type => 'number'},
2151             {desc => 'Bucket Name', type => 'string'}
2152         ],
2153         return  => {desc => q/
2154             {bucket => $bucket, addcount => number-of-items-added-to-bucket, item_count => total-bucket-item-count} on success,
2155             {add_count => 0} if there is nothing to do, and Event on error/}
2156     }
2157 );
2158
2159 sub bib_queue_to_bucket {
2160     my ($self, $conn, $auth, $q_id, $bucket_name) = @_;
2161
2162     my $e = new_editor(xact => 1, authtoken => $auth);
2163     return $e->die_event unless $e->checkauth;
2164     
2165     my $queue = $e->retrieve_vandelay_bib_queue($q_id)
2166         or return $e->die_event;
2167
2168     return OpenILS::Event->new('BAD_PARAMS', 
2169         note => q/Bucket creator must be queue owner/)
2170         unless $queue->owner == $e->requestor->id;
2171
2172     # find the bib IDs that will go into the bucket
2173     my $bib_ids = $e->json_query({
2174         select => {vqbr => ['imported_as']},
2175         from => 'vqbr',
2176         where => {queue => $q_id, imported_as => {'!=' => undef}}
2177     });
2178
2179     if (!@$bib_ids) { # no records to add
2180         $e->rollback;
2181         return {add_count => 0};
2182     }
2183
2184     # allow user to add to an existing bucket by name
2185     my $bucket = $e->search_container_biblio_record_entry_bucket({
2186         owner => $e->requestor->id, 
2187         name => $bucket_name,
2188         btype => 'vandelay_queue'
2189     })->[0];
2190
2191     # if the bucket does not exist, create a new one
2192     if (!$bucket) { 
2193         $bucket = Fieldmapper::container::biblio_record_entry_bucket->new;
2194         $bucket->name($bucket_name);
2195         $bucket->owner($e->requestor->id);
2196         $bucket->btype('vandelay_queue');
2197
2198         $e->create_container_biblio_record_entry_bucket($bucket)
2199             or return $e->die_event;
2200     }
2201
2202     # create the new bucket items
2203     for my $bib_id ( map {$_->{imported_as}} @$bib_ids ) {
2204         my $item = Fieldmapper::container::biblio_record_entry_bucket_item->new;
2205         $item->target_biblio_record_entry($bib_id);
2206         $item->bucket($bucket->id);
2207         $e->create_container_biblio_record_entry_bucket_item($item)
2208             or return $e->die_event;
2209     }
2210
2211     # re-fetch the bucket to pick up the correct create_time
2212     $bucket = $e->retrieve_container_biblio_record_entry_bucket($bucket->id)
2213         or return $e->die_event;
2214
2215     # get the total count of items in this bucket
2216     my $count = $e->json_query({
2217         select => {cbrebi => [{
2218             aggregate =>  1,
2219             transform => 'count',
2220             alias => 'count',
2221             column => 'id'
2222         }]},
2223         from => 'cbrebi',
2224         where => {bucket => $bucket->id}
2225     })->[0];
2226
2227     $e->commit;
2228
2229     return {
2230         bucket => $bucket, 
2231         add_count => scalar(@$bib_ids), # items added to the bucket
2232         item_count => $count->{count} # total items in buckets
2233     };
2234 }
2235
2236 1;