]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/perlmods/lib/OpenILS/Application/Vandelay.pm
Vandelay : sort queued records by matches
[working/Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Vandelay.pm
1 package OpenILS::Application::Vandelay;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5 use Unicode::Normalize;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::AppSession;
8 use OpenSRF::Utils::SettingsClient;
9 use OpenSRF::Utils::Cache;
10 use OpenILS::Utils::Fieldmapper;
11 use OpenILS::Utils::CStoreEditor qw/:funcs/;
12 use OpenILS::Utils::Normalize qw/clean_marc/;
13 use MARC::Batch;
14 use MARC::Record;
15 use MARC::File::XML ( BinaryEncoding => 'UTF-8' );
16 use Time::HiRes qw(time);
17 use OpenSRF::Utils::Logger qw/$logger/;
18 use MIME::Base64;
19 use XML::LibXML;
20 use OpenILS::Const qw/:const/;
21 use OpenILS::Application::AppUtils;
22 use OpenILS::Application::Cat::BibCommon;
23 use OpenILS::Application::Cat::AuthCommon;
24 use OpenILS::Application::Cat::AssetCommon;
25 my $U = 'OpenILS::Application::AppUtils';
26
27 # A list of LDR/06 values from http://loc.gov/marc
28 my %record_types = (
29         a => 'bib',
30         c => 'bib',
31         d => 'bib',
32         e => 'bib',
33         f => 'bib',
34         g => 'bib',
35         i => 'bib',
36         j => 'bib',
37         k => 'bib',
38         m => 'bib',
39         o => 'bib',
40         p => 'bib',
41         r => 'bib',
42         t => 'bib',
43         u => 'holdings',
44         v => 'holdings',
45         x => 'holdings',
46         y => 'holdings',
47         z => 'auth',
48       ' ' => 'bib',
49 );
50
51 sub initialize {}
52 sub child_init {}
53
54 # --------------------------------------------------------------------------------
55 # Biblio ingest
56
57 sub create_bib_queue {
58     my $self = shift;
59     my $client = shift;
60     my $auth = shift;
61     my $name = shift;
62     my $owner = shift;
63     my $type = shift;
64     my $match_set = shift;
65     my $import_def = shift;
66     my $match_bucket = shift;
67
68     my $e = new_editor(authtoken => $auth, xact => 1);
69
70     return $e->die_event unless $e->checkauth;
71     return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
72     $owner ||= $e->requestor->id;
73
74     if ($e->search_vandelay_bib_queue( {name => $name, owner => $owner, queue_type => $type})->[0]) {
75         $e->rollback;
76         return OpenILS::Event->new('BIB_QUEUE_EXISTS') 
77     }
78
79     my $queue = new Fieldmapper::vandelay::bib_queue();
80     $queue->name( $name );
81     $queue->owner( $owner );
82     $queue->queue_type( $type ) if ($type);
83     $queue->item_attr_def( $import_def ) if ($import_def);
84     $queue->match_set($match_set) if $match_set;
85     $queue->match_bucket($match_bucket) if $match_bucket;
86
87     my $new_q = $e->create_vandelay_bib_queue( $queue );
88     return $e->die_event unless ($new_q);
89     $e->commit;
90
91     return $new_q;
92 }
93 __PACKAGE__->register_method(  
94     api_name   => "open-ils.vandelay.bib_queue.create",
95     method     => "create_bib_queue",
96     api_level  => 1,
97     argc       => 4,
98 );                      
99
100
101 sub create_auth_queue {
102     my $self = shift;
103     my $client = shift;
104     my $auth = shift;
105     my $name = shift;
106     my $owner = shift;
107     my $type = shift;
108     my $match_set = shift;
109
110     my $e = new_editor(authtoken => $auth, xact => 1);
111
112     return $e->die_event unless $e->checkauth;
113     return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
114     $owner ||= $e->requestor->id;
115
116     if ($e->search_vandelay_authority_queue({name => $name, owner => $owner, queue_type => $type})->[0]) {
117         $e->rollback;
118         return OpenILS::Event->new('AUTH_QUEUE_EXISTS') 
119     }
120
121     my $queue = new Fieldmapper::vandelay::authority_queue();
122     $queue->name( $name );
123     $queue->owner( $owner );
124     $queue->queue_type( $type ) if ($type);
125
126     my $new_q = $e->create_vandelay_authority_queue( $queue );
127     $e->die_event unless ($new_q);
128     $e->commit;
129
130     return $new_q;
131 }
132 __PACKAGE__->register_method(  
133     api_name   => "open-ils.vandelay.authority_queue.create",
134     method     => "create_auth_queue",
135     api_level  => 1,
136     argc       => 3,
137 );                      
138
139 sub add_record_to_bib_queue {
140     my $self = shift;
141     my $client = shift;
142     my $auth = shift;
143     my $queue = shift;
144     my $marc = shift;
145     my $purpose = shift;
146     my $bib_source = shift;
147
148     my $e = new_editor(authtoken => $auth, xact => 1);
149
150     $queue = $e->retrieve_vandelay_bib_queue($queue);
151
152     return $e->die_event unless $e->checkauth;
153     return $e->die_event unless
154         ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
155          $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
156
157     my $new_rec = _add_bib_rec($e, $marc, $queue->id, $purpose, $bib_source);
158
159     return $e->die_event unless ($new_rec);
160     $e->commit;
161     return $new_rec;
162 }
163 __PACKAGE__->register_method(  
164     api_name   => "open-ils.vandelay.queued_bib_record.create",
165     method     => "add_record_to_bib_queue",
166     api_level  => 1,
167     argc       => 3,
168 );                      
169
170 sub _add_bib_rec {
171     my $e = shift;
172     my $marc = shift;
173     my $queue = shift;
174     my $purpose = shift;
175     my $bib_source = shift;
176
177     my $rec = new Fieldmapper::vandelay::queued_bib_record();
178     $rec->marc( $marc );
179     $rec->queue( $queue );
180     $rec->purpose( $purpose ) if ($purpose);
181     $rec->bib_source($bib_source);
182
183     return $e->create_vandelay_queued_bib_record( $rec );
184 }
185
186 sub add_record_to_authority_queue {
187     my $self = shift;
188     my $client = shift;
189     my $auth = shift;
190     my $queue = shift;
191     my $marc = shift;
192     my $purpose = shift;
193
194     my $e = new_editor(authtoken => $auth, xact => 1);
195
196     $queue = $e->retrieve_vandelay_authority_queue($queue);
197
198     return $e->die_event unless $e->checkauth;
199     return $e->die_event unless
200         ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
201          $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
202
203     my $new_rec = _add_auth_rec($e, $marc, $queue->id, $purpose);
204
205     return $e->die_event unless ($new_rec);
206     $e->commit;
207     return $new_rec;
208 }
209 __PACKAGE__->register_method(
210     api_name   => "open-ils.vandelay.queued_authority_record.create",
211     method     => "add_record_to_authority_queue",
212     api_level  => 1,
213     argc       => 3,
214 );
215
216 sub _add_auth_rec {
217     my $e = shift;
218     my $marc = shift;
219     my $queue = shift;
220     my $purpose = shift;
221
222     my $rec = new Fieldmapper::vandelay::queued_authority_record();
223     $rec->marc( $marc );
224     $rec->queue( $queue );
225     $rec->purpose( $purpose ) if ($purpose);
226
227     return $e->create_vandelay_queued_authority_record( $rec );
228 }
229
230 sub process_spool {
231     my $self = shift;
232     my $client = shift;
233     my $auth = shift;
234     my $fingerprint = shift || '';
235     my $queue_id = shift;
236     my $purpose = shift;
237     my $filename = shift;
238     my $bib_source = shift;
239
240     my $e = new_editor(authtoken => $auth, xact => 1);
241     return $e->die_event unless $e->checkauth;
242
243     my $queue;
244     my $type = $self->{record_type};
245
246     if($type eq 'bib') {
247         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
248     } else {
249         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
250     }
251
252     my $evt = check_queue_perms($e, $type, $queue);
253     return $evt if ($evt);
254
255     my $cache = new OpenSRF::Utils::Cache();
256
257     if($fingerprint) {
258         my $data = $cache->get_cache('vandelay_import_spool_' . $fingerprint);
259         $purpose = $data->{purpose};
260         $filename = $data->{path};
261         $bib_source = $data->{bib_source};
262     }
263
264     unless(-r $filename) {
265         $logger->error("unable to read MARC file $filename");
266         return -1; # make this an event XXX
267     }
268
269     $logger->info("vandelay spooling $fingerprint purpose=$purpose file=$filename");
270
271     my $marctype = 'USMARC'; 
272
273     open F, $filename;
274     $marctype = 'XML' if (getc(F) =~ /^\D/o);
275     close F;
276
277     my $batch = new MARC::Batch ($marctype, $filename);
278     $batch->strict_off;
279
280     my $response_scale = 10;
281     my $count = 0;
282     my $r = -1;
283     while (try { $r = $batch->next } otherwise { $r = -1 }) {
284         if ($r == -1) {
285             $logger->warn("Processing of record $count in set $filename failed.  Skipping this record");
286             $count++;
287         }
288
289         $logger->info("processing record $count");
290
291         try {
292             my $xml = clean_marc($r);
293
294             my $qrec;
295             # Check the leader to ensure we've got something resembling the expected
296             # Allow spaces to give records the benefit of the doubt
297             my $ldr_type = substr($r->leader(), 6, 1);
298             if ($type eq 'bib' && ($record_types{$ldr_type}) eq 'bib' || $ldr_type eq ' ') {
299                 $qrec = _add_bib_rec( $e, $xml, $queue_id, $purpose, $bib_source ) or return $e->die_event;
300             } elsif ($type eq 'auth' && ($record_types{$ldr_type}) eq 'auth' || $ldr_type eq ' ') {
301                 $qrec = _add_auth_rec( $e, $xml, $queue_id, $purpose ) or return $e->die_event;
302             } else {
303                 # I don't know how to handle this type; rock on
304                 $logger->error("In process_spool(), type was $type and leader type was $ldr_type ; not currently supported");
305                 next;
306             }
307
308             if($self->api_name =~ /stream_results/ and $qrec) {
309                 $client->respond($qrec->id)
310             } else {
311                 $client->respond($count) if (++$count % $response_scale) == 0;
312                 $response_scale *= 10 if ($count == ($response_scale * 10));
313             }
314         } catch Error with {
315             my $error = shift;
316             $logger->warn("Encountered a bad record at Vandelay ingest: ".$error);
317         }
318     }
319
320     $e->commit;
321     unlink($filename);
322     $cache->delete_cache('vandelay_import_spool_' . $fingerprint) if $fingerprint;
323     return $count;
324 }
325
326 __PACKAGE__->register_method(  
327     api_name    => "open-ils.vandelay.bib.process_spool",
328     method      => "process_spool",
329     api_level   => 1,
330     argc        => 3,
331     max_chunk_size => 0,
332     record_type => 'bib'
333 );                      
334 __PACKAGE__->register_method(  
335     api_name    => "open-ils.vandelay.auth.process_spool",
336     method      => "process_spool",
337     api_level   => 1,
338     argc        => 3,
339     max_chunk_size => 0,
340     record_type => 'auth'
341 );                      
342
343 __PACKAGE__->register_method(  
344     api_name    => "open-ils.vandelay.bib.process_spool.stream_results",
345     method      => "process_spool",
346     api_level   => 1,
347     argc        => 3,
348     stream      => 1,
349     max_chunk_size => 0,
350     record_type => 'bib'
351 );                      
352 __PACKAGE__->register_method(  
353     api_name    => "open-ils.vandelay.auth.process_spool.stream_results",
354     method      => "process_spool",
355     api_level   => 1,
356     argc        => 3,
357     stream      => 1,
358     max_chunk_size => 0,
359     record_type => 'auth'
360 );
361
362 __PACKAGE__->register_method(  
363     api_name    => "open-ils.vandelay.bib_queue.records.retrieve",
364     method      => 'retrieve_queued_records',
365     api_level   => 1,
366     argc        => 2,
367     stream      => 1,
368     record_type => 'bib'
369 );
370 __PACKAGE__->register_method(
371     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.print",
372     method      => 'retrieve_queued_records',
373     api_level   => 1,
374     argc        => 2,
375     stream      => 1,
376     record_type => 'bib'
377 );
378 __PACKAGE__->register_method(
379     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.csv",
380     method      => 'retrieve_queued_records',
381     api_level   => 1,
382     argc        => 2,
383     stream      => 1,
384     record_type => 'bib'
385 );
386 __PACKAGE__->register_method(
387     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.email",
388     method      => 'retrieve_queued_records',
389     api_level   => 1,
390     argc        => 2,
391     stream      => 1,
392     record_type => 'bib'
393 );
394
395 __PACKAGE__->register_method(  
396     api_name    => "open-ils.vandelay.auth_queue.records.retrieve",
397     method      => 'retrieve_queued_records',
398     api_level   => 1,
399     argc        => 2,
400     stream      => 1,
401     record_type => 'auth'
402 );
403 __PACKAGE__->register_method(
404     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.print",
405     method      => 'retrieve_queued_records',
406     api_level   => 1,
407     argc        => 2,
408     stream      => 1,
409     record_type => 'auth'
410 );
411 __PACKAGE__->register_method(
412     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.csv",
413     method      => 'retrieve_queued_records',
414     api_level   => 1,
415     argc        => 2,
416     stream      => 1,
417     record_type => 'auth'
418 );
419 __PACKAGE__->register_method(
420     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.email",
421     method      => 'retrieve_queued_records',
422     api_level   => 1,
423     argc        => 2,
424     stream      => 1,
425     record_type => 'auth'
426 );
427
428 __PACKAGE__->register_method(  
429     api_name    => "open-ils.vandelay.bib_queue.records.matches.retrieve",
430     method      => 'retrieve_queued_records',
431     api_level   => 1,
432     argc        => 2,
433     stream      => 1,
434     record_type => 'bib',
435     signature   => {
436         desc => q/Only retrieve queued bib records that have matches against existing records/
437     }
438 );
439 __PACKAGE__->register_method(  
440     api_name    => "open-ils.vandelay.auth_queue.records.matches.retrieve",
441     method      => 'retrieve_queued_records',
442     api_level   => 1,
443     argc        => 2,
444     stream      => 1,
445     record_type => 'auth',
446     signature   => {
447         desc => q/Only retrieve queued authority records that have matches against existing records/
448     }
449 );
450
451 sub retrieve_queued_records {
452     my($self, $conn, $auth, $queue_id, $options) = @_;
453
454     $options ||= {};
455     my $limit = $$options{limit} || 20;
456     my $offset = $$options{offset} || 0;
457     my $type = $self->{record_type};
458
459     my $e = new_editor(authtoken => $auth, xact => 1);
460     return $e->die_event unless $e->checkauth;
461
462     my $queue;
463     if($type eq 'bib') {
464         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
465     } else {
466         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
467     }
468     my $evt = check_queue_perms($e, $type, $queue);
469     return $evt if ($evt);
470
471     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
472     my $mclass = $type eq 'bib' ? 'vbm' : 'vam';
473     my $query = {
474         select => {
475             $class => ['id'],
476             $mclass => [{
477                 column => 'eg_record', 
478                 transform => 'min',
479                 aggregate => 1
480             }]
481         },
482         from => $class,
483         where => {queue => $queue_id},
484         distinct => 1,
485         limit => $limit,
486         offset => $offset,
487     };
488     if($self->api_name =~ /export/) {
489         delete $query->{limit};
490         delete $query->{offset};
491     }
492
493     $query->{where}->{import_time} = undef if $$options{non_imported};
494
495     if($$options{with_import_error}) {
496
497         $query->{from} = {$class => {vii => {type => 'left'}}};
498         $query->{where}->{'-or'} = [
499             {'+vqbr' => {import_error => {'!=' => undef}}},
500             {'+vii' => {import_error => {'!=' => undef}}}
501         ];
502
503     } else {
504         
505         if($$options{with_rec_import_error}) {
506             $query->{where}->{import_error} = {'!=' => undef};
507
508         } elsif( $$options{with_item_import_error} and $type eq 'bib') {
509
510             $query->{from} = {$class => 'vii'};
511             $query->{where}->{'+vii'} = {import_error => {'!=' => undef}};
512         }
513     }
514
515     if($self->api_name =~ /matches/) {
516         # find only records that have matches
517         $query->{from} = {$class => {$mclass => {type => 'right'}}};
518     } else {
519         # join to mclass for sorting (see below)
520         $query->{from} = {$class => {$mclass => {type => 'left'}}};
521     }
522
523     # order by the matched bib records to group like queued records
524     $query->{order_by} = [
525         {class => $mclass, field => 'eg_record', transform => 'min'},
526         {class => $class, field => 'id'} 
527     ];
528
529     my $record_ids = $e->json_query($query);
530
531     my $retrieve = ($type eq 'bib') ? 
532         'retrieve_vandelay_queued_bib_record' : 'retrieve_vandelay_queued_authority_record';
533     my $search = ($type eq 'bib') ? 
534         'search_vandelay_queued_bib_record' : 'search_vandelay_queued_authority_record';
535
536     if ($self->api_name =~ /export/) {
537         my $rec_list = $e->$search({id => [map { $_->{id} } @$record_ids]}, {substream => 1});
538         if ($self->api_name =~ /print/) {
539
540             $e->rollback;
541             return $U->fire_object_event(
542                 undef,
543                 'vandelay.queued_'.$type.'_record.print',
544                 $rec_list,
545                 $e->requestor->ws_ou
546             );
547
548         } elsif ($self->api_name =~ /csv/) {
549
550             $e->rollback;
551             return $U->fire_object_event(
552                 undef,
553                 'vandelay.queued_'.$type.'_record.csv',
554                 $rec_list,
555                 $e->requestor->ws_ou
556             );
557
558         } elsif ($self->api_name =~ /email/) {
559
560             $conn->respond_complete(1);
561
562             for my $rec (@$rec_list) {
563                 $U->create_events_for_hook(
564                     'vandelay.queued_'.$type.'_record.email',
565                     $rec,
566                     $e->requestor->home_ou,
567                     undef,
568                     undef,
569                     1
570                 );
571             }
572
573         }
574     } else {
575         for my $rec_id (@$record_ids) {
576             my $flesh = ['attributes', 'matches'];
577             push(@$flesh, 'import_items') if $$options{flesh_import_items};
578             my $params = {flesh => 1, flesh_fields => {$class => $flesh}};
579             my $rec = $e->$retrieve([$rec_id->{id}, $params]);
580             $rec->clear_marc if $$options{clear_marc};
581             $conn->respond($rec);
582         }
583     }
584
585     $e->rollback;
586     return undef;
587 }
588
589 __PACKAGE__->register_method(  
590     api_name    => 'open-ils.vandelay.import_item.queue.retrieve',
591     method      => 'retrieve_queue_import_items',
592     api_level   => 1,
593     argc        => 2,
594     stream      => 1,
595     authoritative => 1,
596     signature => q/
597         Returns Import Item (vii) objects for the selected queue.
598         Filter options:
599             with_import_error : only return items that failed to import
600     /
601 );
602 __PACKAGE__->register_method(
603     api_name    => 'open-ils.vandelay.import_item.queue.export.print',
604     method      => 'retrieve_queue_import_items',
605     api_level   => 1,
606     argc        => 2,
607     stream      => 1,
608     authoritative => 1,
609     signature => q/
610         Returns template-generated printable output of Import Item (vii) objects for the selected queue.
611         Filter options:
612             with_import_error : only return items that failed to import
613     /
614 );
615 __PACKAGE__->register_method(
616     api_name    => 'open-ils.vandelay.import_item.queue.export.csv',
617     method      => 'retrieve_queue_import_items',
618     api_level   => 1,
619     argc        => 2,
620     stream      => 1,
621     authoritative => 1,
622     signature => q/
623         Returns template-generated CSV output of Import Item (vii) objects for the selected queue.
624         Filter options:
625             with_import_error : only return items that failed to import
626     /
627 );
628 __PACKAGE__->register_method(
629     api_name    => 'open-ils.vandelay.import_item.queue.export.email',
630     method      => 'retrieve_queue_import_items',
631     api_level   => 1,
632     argc        => 2,
633     stream      => 1,
634     authoritative => 1,
635     signature => q/
636         Emails template-generated output of Import Item (vii) objects for the selected queue.
637         Filter options:
638             with_import_error : only return items that failed to import
639     /
640 );
641
642 sub retrieve_queue_import_items {
643     my($self, $conn, $auth, $q_id, $options) = @_;
644
645     $options ||= {};
646     my $limit = $$options{limit} || 20;
647     my $offset = $$options{offset} || 0;
648
649     my $e = new_editor(authtoken => $auth);
650     return $e->event unless $e->checkauth;
651
652     my $queue = $e->retrieve_vandelay_bib_queue($q_id) or return $e->event;
653     my $evt = check_queue_perms($e, 'bib', $queue);
654     return $evt if $evt;
655
656     my $query = {
657         select => {vii => ['id']},
658         from => {
659             vii => {
660                 vqbr => {
661                     join => {
662                         'vbq' => {
663                             field => 'id',
664                             fkey => 'queue',
665                             filter => {id => $q_id}
666                         }
667                     }
668                 }
669             }
670         },
671         order_by => {'vii' => ['record','id']},
672         limit => $limit,
673         offset => $offset
674     };
675     if($self->api_name =~ /export/) {
676         delete $query->{limit};
677         delete $query->{offset};
678     }
679
680     $query->{where} = {'+vii' => {import_error => {'!=' => undef}}}
681         if $$options{with_import_error};
682
683     my $items = $e->json_query($query);
684     my $item_list = $e->search_vandelay_import_item({id => [map { $_->{id} } @$items]});
685     if ($self->api_name =~ /export/) {
686         if ($self->api_name =~ /print/) {
687
688             return $U->fire_object_event(
689                 undef,
690                 'vandelay.import_items.print',
691                 $item_list,
692                 $e->requestor->ws_ou
693             );
694
695         } elsif ($self->api_name =~ /csv/) {
696
697             return $U->fire_object_event(
698                 undef,
699                 'vandelay.import_items.csv',
700                 $item_list,
701                 $e->requestor->ws_ou
702             );
703
704         } elsif ($self->api_name =~ /email/) {
705
706             $conn->respond_complete(1);
707
708             for my $item (@$item_list) {
709                 $U->create_events_for_hook(
710                     'vandelay.import_items.email',
711                     $item,
712                     $e->requestor->home_ou,
713                     undef,
714                     undef,
715                     1
716                 );
717             }
718
719         }
720     } else {
721         for my $item (@$item_list) {
722             $conn->respond($item);
723         }
724     }
725
726     return undef;
727 }
728
729 sub check_queue_perms {
730     my($e, $type, $queue) = @_;
731     if ($type eq 'bib') {
732         return $e->die_event unless
733             ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
734              $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
735     } else {
736         return $e->die_event unless
737             ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
738              $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
739     }
740
741     return undef;
742 }
743
744 __PACKAGE__->register_method(  
745     api_name    => "open-ils.vandelay.bib_record.list.import",
746     method      => 'import_record_list',
747     api_level   => 1,
748     argc        => 2,
749     stream      => 1,
750     record_type => 'bib'
751 );
752
753 __PACKAGE__->register_method(  
754     api_name    => "open-ils.vandelay.auth_record.list.import",
755     method      => 'import_record_list',
756     api_level   => 1,
757     argc        => 2,
758     stream      => 1,
759     record_type => 'auth'
760 );
761
762 sub import_record_list {
763     my($self, $conn, $auth, $rec_ids, $args) = @_;
764     my $e = new_editor(authtoken => $auth, xact => 1);
765     return $e->die_event unless $e->checkauth;
766     $args ||= {};
767     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $args);
768     try {$e->rollback} otherwise {}; 
769     return $err if $err;
770     return {complete => 1};
771 }
772
773
774 __PACKAGE__->register_method(  
775     api_name    => "open-ils.vandelay.bib_queue.import",
776     method      => 'import_queue',
777     api_level   => 1,
778     argc        => 2,
779     stream      => 1,
780     max_chunk_size => 0,
781     record_type => 'bib',
782     signature => {
783         desc => q/
784             Attempts to import all non-imported records for the selected queue.
785             Will also attempt import of all non-imported items.
786         /
787     }
788 );
789
790 __PACKAGE__->register_method(  
791     api_name    => "open-ils.vandelay.auth_queue.import",
792     method      => 'import_queue',
793     api_level   => 1,
794     argc        => 2,
795     stream      => 1,
796     max_chunk_size => 0,
797     record_type => 'auth'
798 );
799
800 sub import_queue {
801     my($self, $conn, $auth, $q_id, $options) = @_;
802     my $e = new_editor(authtoken => $auth, xact => 1);
803     return $e->die_event unless $e->checkauth;
804     $options ||= {};
805     my $type = $self->{record_type};
806     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
807
808     # First, collect the not-yet-imported records
809     my $query = {queue => $q_id, import_time => undef};
810     my $search = ($type eq 'bib') ? 
811         'search_vandelay_queued_bib_record' : 
812         'search_vandelay_queued_authority_record';
813     my $rec_ids = $e->$search($query, {idlist => 1});
814
815     # Now add any imported records that have un-imported items
816
817     if($type eq 'bib') {
818         my $item_recs = $e->json_query({
819             select => {vqbr => ['id']},
820             from => {vqbr => 'vii'},
821             where => {
822                 '+vqbr' => {
823                     queue => $q_id,
824                     import_time => {'!=' => undef}
825                 },
826                 '+vii' => {import_time => undef}
827             },
828             distinct => 1
829         });
830         push(@$rec_ids, map {$_->{id}} @$item_recs);
831     }
832
833     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $options);
834     try {$e->rollback} otherwise {}; # only using this to make the read authoritative -- don't die from it
835     return $err if $err;
836     return {complete => 1};
837 }
838
839 # returns a list of queued record IDs for a given queue that 
840 # have at least one entry in the match table
841 # XXX DEPRECATED?
842 sub queued_records_with_matches {
843     my($e, $type, $q_id, $limit, $offset, $filter) = @_;
844
845     my $match_class = 'vbm';
846     my $rec_class = 'vqbr';
847     if($type eq 'auth') {
848         $match_class = 'vam';
849          $rec_class = 'vqar';
850     }
851
852     $filter ||= {};
853     $filter->{queue} = $q_id;
854
855     my $query = {
856         distinct => 1, 
857         select => {$match_class => ['queued_record']}, 
858         from => {
859             $match_class => {
860                 $rec_class => {
861                     field => 'id',
862                     fkey => 'queued_record',
863                     filter => $filter,
864                 }
865             }
866         }
867     };        
868
869     if($limit or defined $offset) {
870         $limit ||= 20;
871         $offset ||= 0;
872         $query->{limit} = $limit;
873         $query->{offset} = $offset;
874     }
875
876     my $data = $e->json_query($query);
877     return [ map {$_->{queued_record}} @$data ];
878 }
879
880
881 sub import_record_list_impl {
882     my($self, $conn, $rec_ids, $requestor, $args) = @_;
883
884     my $overlay_map = $args->{overlay_map} || {};
885     my $type = $self->{record_type};
886     my %queues;
887
888     my $report_args = {
889         progress => 1,
890         step => 1,
891         conn => $conn,
892         total => scalar(@$rec_ids),
893         report_all => $$args{report_all}
894     };
895
896     $conn->max_chunk_count(1) if $$args{report_all};
897
898     my $auto_overlay_exact = $$args{auto_overlay_exact};
899     my $auto_overlay_1match = $$args{auto_overlay_1match};
900     my $auto_overlay_best = $$args{auto_overlay_best_match};
901     my $match_quality_ratio = $$args{match_quality_ratio};
902     my $merge_profile = $$args{merge_profile};
903     my $ft_merge_profile = $$args{fall_through_merge_profile};
904     my $bib_source = $$args{bib_source};
905     my $import_no_match = $$args{import_no_match};
906     my $strip_grps = $$args{strip_field_groups}; # bib-only
907
908     my $overlay_func = 'vandelay.overlay_bib_record';
909     my $auto_overlay_func = 'vandelay.auto_overlay_bib_record';
910     my $auto_overlay_best_func = 'vandelay.auto_overlay_bib_record_with_best'; # XXX bib-only
911     my $retrieve_func = 'retrieve_vandelay_queued_bib_record';
912     my $update_func = 'update_vandelay_queued_bib_record';
913     my $search_func = 'search_vandelay_queued_bib_record';
914     my $retrieve_queue_func = 'retrieve_vandelay_bib_queue';
915     my $update_queue_func = 'update_vandelay_bib_queue';
916     my $delete_queue_func = 'delete_vandelay_bib_queue';
917     my $rec_class = 'vqbr';
918
919     my $editor = new_editor();
920
921     my %bib_sources;
922     my $sources = $editor->search_config_bib_source({id => {'!=' => undef}});
923     $bib_sources{$_->id} = $_->source for @$sources;
924
925     if($type eq 'auth') {
926         $overlay_func =~ s/bib/auth/o;
927         $auto_overlay_func = s/bib/auth/o;
928         $retrieve_func =~ s/bib/authority/o;
929         $retrieve_queue_func =~ s/bib/authority/o;
930         $update_queue_func =~ s/bib/authority/o;
931         $update_func =~ s/bib/authority/o;
932         $search_func =~ s/bib/authority/o;
933         $delete_queue_func =~ s/bib/authority/o;
934         $rec_class = 'vqar';
935     }
936
937     my $new_rec_perm_cache;
938     my @success_rec_ids;
939     for my $rec_id (@$rec_ids) {
940
941         my $error = 0;
942         my $overlay_target = $overlay_map->{$rec_id};
943
944         my $e = new_editor(xact => 1);
945         $e->requestor($requestor);
946
947         $$report_args{e} = $e;
948         $$report_args{evt} = undef;
949         $$report_args{import_error} = undef;
950         $$report_args{no_import} = 0;
951
952         my $rec = $e->$retrieve_func([
953             $rec_id,
954             {   flesh => 1,
955                 flesh_fields => { $rec_class => ['matches']},
956             }
957         ]);
958
959         unless($rec) {
960             $$report_args{evt} = $e->event;
961             finish_rec_import_attempt($report_args);
962             next;
963         }
964
965         if($rec->import_time) {
966             # if the record is already imported, that means it may have 
967             # un-imported copies.  Add to success list for later processing.
968             push(@success_rec_ids, $rec_id);
969             $e->rollback;
970             next;
971         }
972
973         $$report_args{rec} = $rec;
974         $queues{$rec->queue} = 1;
975
976         my $record;
977         my $imported = 0;
978
979         if ($type eq 'bib') {
980             # strip configured / selected MARC tags from inbound records
981
982             my $marcdoc = XML::LibXML->new->parse_string($rec->marc);
983             $rec->marc($U->strip_marc_fields($e, $marcdoc, $strip_grps));
984
985             unless ($e->$update_func($rec)) {
986                 $$report_args{evt} = $e->die_event;
987                 finish_rec_import_attempt($report_args);
988                 next;
989             }
990         }
991
992         if(defined $overlay_target) {
993             # Caller chose an explicit overlay target
994
995             my $res = $e->json_query(
996                 {
997                     from => [
998                         $overlay_func,
999                         $rec_id,
1000                         $overlay_target, 
1001                         $merge_profile
1002                     ]
1003                 }
1004             );
1005
1006             if($res and ($res = $res->[0])) {
1007
1008                 if($res->{$overlay_func} eq 't') {
1009                     $logger->info("vl: $type direct overlay succeeded for queued rec ".
1010                         "$rec_id and overlay target $overlay_target");
1011                     $imported = 1;
1012                 }
1013
1014             } else {
1015                 $error = 1;
1016                 $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1017             }
1018
1019         } else {
1020
1021             if($auto_overlay_1match) { # overlay if there is exactly 1 match
1022
1023                 my %match_recs = map { $_->eg_record => 1 } @{$rec->matches};
1024
1025                 if( scalar(keys %match_recs) == 1) { # all matches point to the same record
1026
1027                     ($imported, $error, $rec) = try_auto_overlay(
1028                         $e, $type,
1029                         $report_args, 
1030                         $auto_overlay_best_func,
1031                         $retrieve_func,
1032                         $rec_class,
1033                         $rec_id, 
1034                         $match_quality_ratio, 
1035                         $merge_profile, 
1036                         $ft_merge_profile
1037                     );
1038                 }
1039             }
1040
1041             if(!$imported and !$error and $auto_overlay_exact and scalar(@{$rec->matches}) == 1 ) {
1042                 
1043                 # caller says to overlay if there is an /exact/ match
1044                 # $auto_overlay_func only proceeds and returns true on exact matches
1045
1046                 my $res = $e->json_query(
1047                     {
1048                         from => [
1049                             $auto_overlay_func,
1050                             $rec_id,
1051                             $merge_profile
1052                         ]
1053                     }
1054                 );
1055
1056                 if($res and ($res = $res->[0])) {
1057
1058                     if($res->{$auto_overlay_func} eq 't') {
1059                         $logger->info("vl: $type auto-overlay succeeded for queued rec $rec_id");
1060                         $imported = 1;
1061
1062                         # re-fetch the record to pick up the imported_as value from the DB
1063                         $$report_args{rec} = $rec = $e->$retrieve_func([
1064                             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1065
1066                     } else {
1067                         $logger->info("vl: $type auto-overlay failed for queued rec $rec_id");
1068                     }
1069
1070                 } else {
1071                     $error = 1;
1072                     $logger->error("vl: Error attempting overlay with func=$auto_overlay_func, profile=$merge_profile, record=$rec_id");
1073                 }
1074             }
1075
1076             if(!$imported and !$error and $auto_overlay_best and scalar(@{$rec->matches}) > 0 ) {
1077                 # caller says to overlay the best match
1078
1079                 ($imported, $error, $rec) = try_auto_overlay(
1080                     $e, $type,
1081                     $report_args, 
1082                     $auto_overlay_best_func,
1083                     $retrieve_func,
1084                     $rec_class,
1085                     $rec_id, 
1086                     $match_quality_ratio, 
1087                     $merge_profile, 
1088                     $ft_merge_profile
1089                 );
1090             }
1091
1092             if(!$imported and !$error and $import_no_match and scalar(@{$rec->matches}) == 0) {
1093             
1094                 # No overlay / merge occurred.  Do a traditional record import by creating a new record
1095
1096                 if (!$new_rec_perm_cache) {
1097                     $new_rec_perm_cache = {};
1098
1099                     # all users creating new records are required to have the basic permission.
1100                     # if the client requests, we can enforce extra permissions for creating new records.
1101                     # for speed, check the permissions the first time then cache the result.
1102
1103                     my $perm = ($type eq 'bib') ? 'IMPORT_MARC' : 'IMPORT_AUTHORITY_MARC';
1104                     my $xperm = $$args{new_rec_perm};
1105                     my $rec_ou = $e->requestor->ws_ou;
1106
1107                     $new_rec_perm_cache->{evt} = $e->die_event
1108                         if !$e->allowed($perm, $rec_ou) || ($xperm and !$e->allowed($xperm, $rec_ou));
1109                 }
1110
1111                 if ($new_rec_perm_cache->{evt}) {
1112
1113                     # a cached event won't roll back the transaction (a la die_event), but
1114                     # the transaction will get rolled back in finish_rec_import_attempt() below
1115                     $$report_args{evt} = $new_rec_perm_cache->{evt};
1116                     $$report_args{import_error} = 'import.record.perm_failure';
1117
1118                 } else { # perm checks succeeded
1119
1120                     $logger->info("vl: creating new $type record for queued record $rec_id");
1121
1122                     if ($type eq 'bib') {
1123
1124                         $record = OpenILS::Application::Cat::BibCommon->biblio_record_xml_import(
1125                             $e, $rec->marc, $bib_sources{$rec->bib_source}, undef, 1);
1126
1127                     } else { # authority record
1128
1129                         $record = OpenILS::Application::Cat::AuthCommon->import_authority_record($e, $rec->marc); #$source);
1130                     }
1131
1132                     if($U->event_code($record)) {
1133                         $$report_args{import_error} = 'import.duplicate.tcn' 
1134                             if $record->{textcode} eq 'OPEN_TCN_NOT_FOUND';
1135                         $$report_args{evt} = $record;
1136
1137                     } else {
1138
1139                         $logger->info("vl: successfully imported new $type record");
1140                         $rec->imported_as($record->id);
1141                         $imported = 1;
1142                     }
1143                 }
1144             }
1145         }
1146
1147         if($imported) {
1148
1149             $rec->import_time('now');
1150             $rec->clear_import_error;
1151             $rec->clear_error_detail;
1152
1153             if($e->$update_func($rec)) {
1154
1155                 if($type eq 'bib') {
1156
1157                     # see if this record is linked from an acq record.
1158                     my $li = $e->search_acq_lineitem(
1159                         {queued_record => $rec->id, state => {'!=' => 'canceled'}})->[0];
1160
1161                     if ($li) { 
1162                         # if so, update the acq lineitem to point to the imported record
1163                         $li->eg_bib_id($rec->imported_as);
1164                         $$report_args{evt} = $e->die_event unless $e->update_acq_lineitem($li);
1165                     }
1166                 }
1167
1168                 push @success_rec_ids, $rec_id;
1169                 finish_rec_import_attempt($report_args);
1170
1171             } else {
1172                 $imported = 0;
1173             }
1174         }
1175
1176         if(!$imported) {
1177             $logger->info("vl: record $rec_id was not imported");
1178             $$report_args{evt} = $e->event unless $$report_args{evt};
1179             $$report_args{no_import} = 1;
1180             finish_rec_import_attempt($report_args);
1181         }
1182     }
1183
1184     # see if we need to mark any queues as complete
1185     for my $q_id (keys %queues) {
1186
1187         my $e = new_editor(xact => 1);
1188         my $remaining = $e->$search_func(
1189             [{queue => $q_id, import_time => undef}, {limit =>1}], {idlist => 1});
1190
1191         unless(@$remaining) {
1192             my $queue = $e->$retrieve_queue_func($q_id);
1193             unless($U->is_true($queue->complete)) {
1194                 $queue->complete('t');
1195                 $e->$update_queue_func($queue) or return $e->die_event;
1196                 $e->commit;
1197                 next;
1198             }
1199         } 
1200         $e->rollback;
1201     }
1202
1203     # import the copies
1204     import_record_asset_list_impl($conn, \@success_rec_ids, $requestor) if @success_rec_ids;
1205
1206     $conn->respond({total => $$report_args{total}, progress => $$report_args{progress}});
1207     return undef;
1208 }
1209
1210
1211 sub try_auto_overlay {
1212     my $e = shift;
1213     my $type = shift;
1214     my $report_args = shift;
1215     my $overlay_func  = shift;
1216     my $retrieve_func = shift; 
1217     my $rec_class = shift;
1218     my $rec_id  = shift;
1219     my $match_quality_ratio = shift;
1220     my $merge_profile  = shift;
1221     my $ft_merge_profile = shift;
1222
1223     my $imported = 0;
1224     my $error = 0;
1225
1226     # Find the best match and overlay if the quality ratio allows it.
1227     my $res = $e->json_query(
1228         {
1229             from => [
1230                 $overlay_func,
1231                 $rec_id, 
1232                 $merge_profile,
1233                 $match_quality_ratio
1234             ]
1235         }
1236     );
1237
1238     if($res and ($res = $res->[0])) {
1239
1240         if($res->{$overlay_func} eq 't') {
1241
1242             # first attempt succeeded
1243             $imported = 1;
1244
1245         } else {
1246
1247             # quality-limited merge failed with insufficient quality.  If there is a 
1248             # fall-through merge profile, re-do the merge with the alternate profile
1249             # and no quality restriction.
1250
1251             if($ft_merge_profile and $match_quality_ratio > 0) {
1252
1253                 $logger->info("vl: $type auto-merge failed with profile $merge_profile; ".
1254                     "re-merging with fall-through profile $ft_merge_profile");
1255
1256                 my $res = $e->json_query(
1257                     {
1258                         from => [
1259                             $overlay_func,
1260                             $rec_id, 
1261                             $ft_merge_profile,
1262                             0 # minimum quality not required
1263                         ]
1264                     }
1265                 );
1266
1267                 if($res and ($res = $res->[0])) {
1268
1269                     if($res->{$overlay_func} eq 't') {
1270
1271                         # second attempt succeeded
1272                         $imported = 1;
1273
1274                     } else {
1275
1276                         # failed to merge on second attempt
1277                         $logger->info("vl: $type auto-merge with fall-through failed for queued rec $rec_id");
1278                     }
1279                 } else {
1280                     
1281                     # second attempt died 
1282                     $error = 1;
1283                     $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1284                 }
1285
1286             } else { 
1287
1288                 # failed to merge on first attempt, no fall-through was provided
1289                 $$report_args{import_error} = 'overlay.record.quality' if $match_quality_ratio > 0;
1290                 $logger->info("vl: $type auto-merge failed for queued rec $rec_id");
1291             }
1292         }
1293
1294     } else {
1295
1296         # first attempt died 
1297         $error = 1;
1298         $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1299     }
1300
1301     if($imported) {
1302
1303         # at least 1 of the attempts succeeded
1304         $logger->info("vl: $type auto-merge succeeded for queued rec $rec_id");
1305
1306         # re-fetch the record to pick up the imported_as value from the DB
1307         $$report_args{rec} = $e->$retrieve_func([
1308             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1309     }
1310
1311     return ($imported, $error, $$report_args{rec});
1312 }
1313
1314
1315 # tracks any import errors, commits the current xact, responds to the client
1316 sub finish_rec_import_attempt {
1317     my $args = shift;
1318     my $evt = $$args{evt};
1319     my $rec = $$args{rec};
1320     my $e = $$args{e};
1321
1322     my $error = $$args{import_error};
1323     $error = 'general.unknown' if $evt and not $error;
1324
1325     # error tracking
1326     if($rec) {
1327
1328         if($error or $evt) {
1329             # failed import
1330             # since an error occurred, there's no guarantee the transaction wasn't 
1331             # rolled back.  force a rollback and create a new editor.
1332             $e->rollback;
1333             $e = new_editor(xact => 1);
1334             $rec->import_error($error);
1335
1336             if($evt) {
1337                 my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1338                 $rec->error_detail($detail);
1339             }
1340
1341             my $method = 'update_vandelay_queued_bib_record';
1342             $method =~ s/bib/authority/ if $$args{type} eq 'auth';
1343             $e->$method($rec) and $e->commit or $e->rollback;
1344
1345         } else {
1346             # commit the successful import
1347             $e->commit;
1348         }
1349
1350     } else {
1351         # requested queued record was not found
1352         $e->rollback;
1353     }
1354         
1355     # respond to client
1356     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1357         $$args{conn}->respond({
1358             total => $$args{total}, 
1359             progress => $$args{progress}, 
1360             imported => ($rec) ? $rec->id : undef,
1361             import_error => $error,
1362             no_import => $$args{no_import},
1363             err_event => $evt
1364         });
1365         $$args{step} *= 2 unless $$args{step} == 256;
1366     }
1367
1368     $$args{progress}++;
1369 }
1370
1371
1372
1373
1374
1375 __PACKAGE__->register_method(  
1376     api_name    => "open-ils.vandelay.bib_queue.owner.retrieve",
1377     method      => 'owner_queue_retrieve',
1378     api_level   => 1,
1379     argc        => 2,
1380     stream      => 1,
1381     record_type => 'bib'
1382 );
1383 __PACKAGE__->register_method(  
1384     api_name    => "open-ils.vandelay.authority_queue.owner.retrieve",
1385     method      => 'owner_queue_retrieve',
1386     api_level   => 1,
1387     argc        => 2,
1388     stream      => 1,
1389     record_type => 'auth'
1390 );
1391
1392 sub owner_queue_retrieve {
1393     my($self, $conn, $auth, $owner_id, $filters) = @_;
1394     my $e = new_editor(authtoken => $auth, xact => 1);
1395     return $e->die_event unless $e->checkauth;
1396     $owner_id = $e->requestor->id; # XXX add support for viewing other's queues?
1397     my $queues;
1398     $filters ||= {};
1399     my $search = {owner => $owner_id};
1400     $search->{$_} = $filters->{$_} for keys %$filters;
1401
1402     if($self->{record_type} eq 'bib') {
1403         $queues = $e->search_vandelay_bib_queue(
1404             [$search, {order_by => {vbq => 'evergreen.lowercase(name)'}}]);
1405     } else {
1406         $queues = $e->search_vandelay_authority_queue(
1407             [$search, {order_by => {vaq => 'evergreen.lowercase(name)'}}]);
1408     }
1409     $conn->respond($_) for @$queues;
1410     $e->rollback;
1411     return undef;
1412 }
1413
1414 __PACKAGE__->register_method(  
1415     api_name    => "open-ils.vandelay.bib_queue.delete",
1416     method      => "delete_queue",
1417     api_level   => 1,
1418     argc        => 2,
1419     record_type => 'bib'
1420 );            
1421 __PACKAGE__->register_method(  
1422     api_name    => "open-ils.vandelay.auth_queue.delete",
1423     method      => "delete_queue",
1424     api_level   => 1,
1425     argc        => 2,
1426     record_type => 'auth'
1427 );  
1428
1429 sub delete_queue {
1430     my($self, $conn, $auth, $q_id) = @_;
1431     my $e = new_editor(xact => 1, authtoken => $auth);
1432     return $e->die_event unless $e->checkauth;
1433     if($self->{record_type} eq 'bib') {
1434         return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
1435         my $queue = $e->retrieve_vandelay_bib_queue($q_id)
1436             or return $e->die_event;
1437         $e->delete_vandelay_bib_queue($queue)
1438             or return $e->die_event;
1439     } else {
1440            return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
1441         my $queue = $e->retrieve_vandelay_authority_queue($q_id)
1442             or return $e->die_event;
1443         $e->delete_vandelay_authority_queue($queue)
1444             or return $e->die_event;
1445     }
1446     $e->commit;
1447     return 1;
1448 }
1449
1450
1451 __PACKAGE__->register_method(  
1452     api_name    => "open-ils.vandelay.queued_bib_record.html",
1453     method      => 'queued_record_html',
1454     api_level   => 1,
1455     argc        => 2,
1456     stream      => 1,
1457     record_type => 'bib'
1458 );
1459 __PACKAGE__->register_method(  
1460     api_name    => "open-ils.vandelay.queued_authority_record.html",
1461     method      => 'queued_record_html',
1462     api_level   => 1,
1463     argc        => 2,
1464     stream      => 1,
1465     record_type => 'auth'
1466 );
1467
1468 sub queued_record_html {
1469     my($self, $conn, $auth, $rec_id) = @_;
1470     my $e = new_editor(xact=>1,authtoken => $auth);
1471     return $e->die_event unless $e->checkauth;
1472     my $rec;
1473     if($self->{record_type} eq 'bib') {
1474         $rec = $e->retrieve_vandelay_queued_bib_record($rec_id)
1475             or return $e->die_event;
1476     } else {
1477         $rec = $e->retrieve_vandelay_queued_authority_record($rec_id)
1478             or return $e->die_event;
1479     }
1480
1481     $e->rollback;
1482     return $U->simplereq(
1483         'open-ils.search',
1484         'open-ils.search.biblio.record.html', undef, 1, $rec->marc);
1485 }
1486
1487
1488 __PACKAGE__->register_method(  
1489     api_name    => "open-ils.vandelay.bib_queue.summary.retrieve", 
1490     method      => 'retrieve_queue_summary',
1491     api_level   => 1,
1492     argc        => 2,
1493     stream      => 1,
1494     record_type => 'bib'
1495 );
1496 __PACKAGE__->register_method(  
1497     api_name    => "open-ils.vandelay.auth_queue.summary.retrieve",
1498     method      => 'retrieve_queue_summary',
1499     api_level   => 1,
1500     argc        => 2,
1501     stream      => 1,
1502     record_type => 'auth'
1503 );
1504
1505 sub retrieve_queue_summary {
1506     my($self, $conn, $auth, $queue_id) = @_;
1507     my $e = new_editor(xact=>1, authtoken => $auth);
1508     return $e->die_event unless $e->checkauth;
1509
1510     my $queue;
1511     my $type = $self->{record_type};
1512     if($type eq 'bib') {
1513         $queue = $e->retrieve_vandelay_bib_queue($queue_id)
1514             or return $e->die_event;
1515     } else {
1516         $queue = $e->retrieve_vandelay_authority_queue($queue_id)
1517             or return $e->die_event;
1518     }
1519
1520     my $evt = check_queue_perms($e, $type, $queue);
1521     return $evt if $evt;
1522
1523     my $search = 'search_vandelay_queued_bib_record';
1524     $search =~ s/bib/authority/ if $type ne 'bib';
1525
1526     my $summary = {
1527         queue => $queue,
1528         total => scalar(@{$e->$search({queue => $queue_id}, {idlist=>1})}),
1529         imported => scalar(@{$e->$search({queue => $queue_id, import_time => {'!=' => undef}}, {idlist=>1})}),
1530     };
1531
1532     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
1533     $summary->{rec_import_errors} = $e->json_query({
1534         select => {$class => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1535         from => $class,
1536         where => {queue => $queue_id, import_error => {'!=' => undef}}
1537     })->[0]->{count};
1538
1539     if($type eq 'bib') {
1540         
1541         # count of all items attached to records in the queue in question
1542         my $query = {
1543             select => {vii => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1544             from => 'vii',
1545             where => {
1546                 record => {
1547                     in => {
1548                         select => {vqbr => ['id']},
1549                         from => 'vqbr',
1550                         where => {queue => $queue_id}
1551                     }
1552                 }
1553             }
1554         };
1555         $summary->{total_items} = $e->json_query($query)->[0]->{count};
1556
1557         # count of items we attempted to import, but errored, attached to records in the queue in question
1558         $query->{where}->{import_error} = {'!=' => undef};
1559         $summary->{item_import_errors} = $e->json_query($query)->[0]->{count};
1560
1561         # count of items we successfully imported attached to records in the queue in question
1562         delete $query->{where}->{import_error};
1563         $query->{where}->{import_time} = {'!=' => undef};
1564         $summary->{total_items_imported} = $e->json_query($query)->[0]->{count};
1565     }
1566
1567     return $summary;
1568 }
1569
1570 # --------------------------------------------------------------------------------
1571 # Given a list of queued record IDs, imports all items attached to those records
1572 # --------------------------------------------------------------------------------
1573 sub import_record_asset_list_impl {
1574     my($conn, $rec_ids, $requestor) = @_;
1575
1576     my $roe = new_editor(xact=> 1, requestor => $requestor);
1577
1578     # for speed, filter out any records have not been 
1579     # imported or have no import items to load
1580     $rec_ids = $roe->json_query({
1581         select => {vqbr => ['id']},
1582         from => {vqbr => 'vii'},
1583         where => {'+vqbr' => {
1584             id => $rec_ids,
1585             import_time => {'!=' => undef}
1586         }},
1587         distinct => 1
1588     });
1589     $rec_ids = [map {$_->{id}} @$rec_ids];
1590
1591     my $report_args = {
1592         conn => $conn,
1593         total => scalar(@$rec_ids),
1594         step => 1, # how often to respond
1595         progress => 1,
1596         in_count => 0,
1597     };
1598
1599     for my $rec_id (@$rec_ids) {
1600         my $rec = $roe->retrieve_vandelay_queued_bib_record($rec_id);
1601         my $item_ids = $roe->search_vandelay_import_item(
1602             {record => $rec->id, import_error => undef}, 
1603             {idlist=>1}
1604         );
1605
1606         for my $item_id (@$item_ids) {
1607             my $e = new_editor(requestor => $requestor, xact => 1);
1608             my $item = $e->retrieve_vandelay_import_item($item_id);
1609             my ($copy, $vol, $evt);
1610
1611             $$report_args{import_item} = $item;
1612             $$report_args{e} = $e;
1613             $$report_args{import_error} = undef;
1614             $$report_args{evt} = undef;
1615
1616             if (my $copy_id = $item->internal_id) { # assignment
1617                 # copy matches an existing copy.  Overlay instead of create.
1618
1619                 $logger->info("vl: performing copy overlay for internal_id=$copy_id");
1620
1621                 my $qt = $e->json_query({
1622                     select => {vbq => ['queue_type']},
1623                     from => {vqbr => 'vbq'},
1624                     where => {'+vqbr' => {id => $rec_id}}
1625                 })->[0]->{queue_type};
1626
1627                 if ($qt eq 'acq') {
1628                     # internal_id for ACQ queues refers to acq.lineitem_detail.id
1629                     # pull the real copy id from the acq LID
1630
1631                     my $lid = $e->retrieve_acq_lineitem_detail($copy_id);
1632                     if (!$lid) {
1633                         $$report_args{evt} = $e->die_event;
1634                         respond_with_status($report_args);
1635                         next;
1636                     }
1637                     $copy_id = $lid->eg_copy_id;
1638                     $logger->info("vl: performing ACQ copy overlay for copy $copy_id");
1639                 }
1640
1641                 $copy = $e->search_asset_copy([
1642                     {id => $copy_id, deleted => 'f'},
1643                     {flesh => 1, flesh_fields => {acp => ['call_number']}}
1644                 ])->[0];
1645
1646                 if (!$copy) {
1647                     $$report_args{evt} = $e->die_event;
1648                     respond_with_status($report_args);
1649                     next;
1650                 }
1651
1652                 # prevent update of unrelated copies
1653                 if ($copy->call_number->record != $rec->imported_as) {
1654                     $logger->info("vl: attempt to overlay unrelated copy=$copy_id; rec=".$rec->imported_as);
1655
1656                     $evt = OpenILS::Event->new('INVALID_IMPORT_COPY_ID', 
1657                         note => 'Cannot overlay copies for unlinked bib',
1658                         bre => $rec->imported_as, 
1659                         copy_id => $copy_id
1660                     );
1661                     $$report_args{evt} = $evt;
1662                     respond_with_status($report_args);
1663                     next;
1664                 }
1665
1666                 # overlaying copies requires an extra permission
1667                 if (!$e->allowed("IMPORT_OVERLAY_COPY", $copy->call_number->owning_lib)) {
1668                     $$report_args{evt} = $e->die_event;
1669                     respond_with_status($report_args);
1670                     next;
1671                 }
1672
1673                 # are we updating the call-number?
1674                 if ($item->call_number and $item->call_number ne $copy->call_number->label) {
1675
1676                     my $count = $e->json_query({
1677                         select => {acp => [{
1678                             alias => 'count', 
1679                             column => 'id', 
1680                             transform => 'count', 
1681                             aggregate => 1
1682                         }]},
1683                         from => 'acp',
1684                         where => {
1685                             deleted => 'f',
1686                             call_number => $copy->call_number->id
1687                         }
1688                     })->[0]->{count};
1689
1690                     if ($count == 1) {
1691                         # if this is the only copy attached to this 
1692                         # callnumber, just update the callnumber
1693
1694                         $logger->info("vl: updating callnumber label in copy overlay");
1695
1696                         $copy->call_number->label($item->call_number);
1697                         if (!$e->update_asset_call_number($copy->call_number)) {
1698                             $$report_args{evt} = $e->die_event;
1699                             respond_with_status($report_args);
1700                             next;
1701                         }
1702
1703                     } else {
1704
1705                         # otherwise, move the copy to a new/existing 
1706                         # call-number with the given label/owner
1707                         # note that overlay does not allow the owning_lib 
1708                         # to be changed.  Should it?
1709
1710                         $logger->info("vl: moving copy to new callnumber in copy overlay");
1711
1712                         ($vol, $evt) =
1713                             OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1714                                 $e, $item->call_number, 
1715                                 $copy->call_number->record, 
1716                                 $copy->call_number->owning_lib
1717                             );
1718
1719                         if($evt) {
1720                             $$report_args{evt} = $evt;
1721                             respond_with_status($report_args);
1722                             next;
1723                         }
1724
1725                         $copy->call_number($vol);
1726                     }
1727                 } # cn-update
1728
1729                 # for every field that has a non-'' value, overlay the copy value
1730                 foreach (qw/ barcode location circ_lib status 
1731                     circulate deposit deposit_amount ref holdable 
1732                     price circ_as_type alert_message opac_visible circ_modifier/) {
1733
1734                     my $val = $item->$_();
1735                     $copy->$_($val) if $val and $val ne '';
1736                 }
1737
1738                 # de-flesh for update
1739                 $copy->call_number($copy->call_number->id);
1740                 $copy->ischanged(1);
1741
1742                 $evt = OpenILS::Application::Cat::AssetCommon->
1743                     update_fleshed_copies($e, {all => 1}, undef, [$copy]);
1744
1745                 if($evt) {
1746                     $$report_args{evt} = $evt;
1747                     respond_with_status($report_args);
1748                     next;
1749                 }
1750
1751             } else { 
1752
1753                 # Creating a new copy
1754                 $logger->info("vl: creating new copy in import");
1755
1756                 # --------------------------------------------------------------------------------
1757                 # Find or create the volume
1758                 # --------------------------------------------------------------------------------
1759                 my ($vol, $evt) =
1760                     OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1761                         $e, $item->call_number, $rec->imported_as, $item->owning_lib);
1762
1763                 if($evt) {
1764                     $$report_args{evt} = $evt;
1765                     respond_with_status($report_args);
1766                     next;
1767                 }
1768
1769                 # --------------------------------------------------------------------------------
1770                 # Create the new copy
1771                 # --------------------------------------------------------------------------------
1772                 $copy = Fieldmapper::asset::copy->new;
1773                 $copy->loan_duration(2);
1774                 $copy->fine_level(2);
1775                 $copy->barcode($item->barcode);
1776                 $copy->location($item->location);
1777                 $copy->circ_lib($item->circ_lib || $item->owning_lib);
1778                 $copy->status( defined($item->status) ? $item->status : OILS_COPY_STATUS_IN_PROCESS );
1779                 $copy->circulate($item->circulate);
1780                 $copy->deposit($item->deposit);
1781                 $copy->deposit_amount($item->deposit_amount);
1782                 $copy->ref($item->ref);
1783                 $copy->holdable($item->holdable);
1784                 $copy->price($item->price);
1785                 $copy->circ_as_type($item->circ_as_type);
1786                 $copy->alert_message($item->alert_message);
1787                 $copy->opac_visible($item->opac_visible);
1788                 $copy->circ_modifier($item->circ_modifier);
1789
1790                 # --------------------------------------------------------------------------------
1791                 # Check for dupe barcode
1792                 # --------------------------------------------------------------------------------
1793                 if($evt = OpenILS::Application::Cat::AssetCommon->create_copy($e, $vol, $copy)) {
1794                     $$report_args{evt} = $evt;
1795                     $$report_args{import_error} = 'import.item.duplicate.barcode'
1796                         if $evt->{textcode} eq 'ITEM_BARCODE_EXISTS';
1797                     respond_with_status($report_args);
1798                     next;
1799                 }
1800
1801                 # --------------------------------------------------------------------------------
1802                 # create copy notes
1803                 # --------------------------------------------------------------------------------
1804                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1805                     $e, $copy, '', $item->pub_note, 1) if $item->pub_note;
1806
1807                 if($evt) {
1808                     $$report_args{evt} = $evt;
1809                     respond_with_status($report_args);
1810                     next;
1811                 }
1812
1813                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1814                     $e, $copy, '', $item->priv_note) if $item->priv_note;
1815
1816                 if($evt) {
1817                     $$report_args{evt} = $evt;
1818                     respond_with_status($report_args);
1819                     next;
1820                 }
1821             }
1822
1823             # set the import data on the import item
1824             $item->imported_as($copy->id); # $copy->id is set by create_copy() ^--
1825             $item->import_time('now');
1826
1827             unless($e->update_vandelay_import_item($item)) {
1828                 $$report_args{evt} = $e->die_event;
1829                 respond_with_status($report_args);
1830                 next;
1831             }
1832
1833             # --------------------------------------------------------------------------------
1834             # Item import succeeded
1835             # --------------------------------------------------------------------------------
1836             $e->commit;
1837             $$report_args{in_count}++;
1838             respond_with_status($report_args);
1839             $logger->info("vl: successfully imported item " . $item->barcode);
1840         }
1841     }
1842
1843     $roe->rollback;
1844     return undef;
1845 }
1846
1847
1848 sub respond_with_status {
1849     my $args = shift;
1850     my $e = $$args{e};
1851
1852     #  If the import failed, track the failure reason
1853
1854     my $error = $$args{import_error};
1855     my $evt = $$args{evt};
1856
1857     if($error or $evt) {
1858
1859         my $item = $$args{import_item};
1860         $logger->info("vl: unable to import item " . $item->barcode);
1861
1862         $error ||= 'general.unknown';
1863         $item->import_error($error);
1864
1865         if($evt) {
1866             my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1867             $item->error_detail($detail);
1868         }
1869
1870         # state of the editor is unknown at this point.  Force a rollback and start over.
1871         $e->rollback;
1872         $e = new_editor(xact => 1);
1873         $e->update_vandelay_import_item($item);
1874         $e->commit;
1875     }
1876
1877     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1878         $$args{conn}->respond({
1879             total => $$args{total},
1880             progress => $$args{progress},
1881             success_count => $$args{success_count},
1882             err_event => $evt
1883         });
1884         $$args{step} *= 2 unless $$args{step} == 256;
1885     }
1886
1887     $$args{progress}++;
1888 }
1889
1890 __PACKAGE__->register_method(  
1891     api_name    => "open-ils.vandelay.match_set.get_tree",
1892     method      => "match_set_get_tree",
1893     api_level   => 1,
1894     argc        => 2,
1895     signature   => {
1896         desc    => q/For a given vms object, return a tree of match set points
1897                     represented by a vmsp object with recursively fleshed
1898                     children./
1899     }
1900 );
1901
1902 sub match_set_get_tree {
1903     my ($self, $conn, $authtoken, $match_set_id) = @_;
1904
1905     $match_set_id = int($match_set_id) or return;
1906
1907     my $e = new_editor("authtoken" => $authtoken);
1908     $e->checkauth or return $e->die_event;
1909
1910     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
1911         return $e->die_event;
1912
1913     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
1914         return $e->die_event;
1915
1916     my $tree = $e->search_vandelay_match_set_point([
1917         {"match_set" => $match_set_id, "parent" => undef},
1918         {"flesh" => -1, "flesh_fields" => {"vmsp" => ["children"]}}
1919     ]) or return $e->die_event;
1920
1921     return pop @$tree;
1922 }
1923
1924
1925 __PACKAGE__->register_method(
1926     api_name    => "open-ils.vandelay.match_set.update",
1927     method      => "match_set_update_tree",
1928     api_level   => 1,
1929     argc        => 3,
1930     signature   => {
1931         desc => q/Replace any vmsp objects associated with a given (by ID) vms
1932                 with the given objects (recursively fleshed vmsp tree)./
1933     }
1934 );
1935
1936 sub _walk_new_vmsp {
1937     my ($e, $match_set_id, $node, $parent_id) = @_;
1938
1939     my $point = new Fieldmapper::vandelay::match_set_point;
1940     $point->parent($parent_id);
1941     $point->match_set($match_set_id);
1942     $point->$_($node->$_) for (qw/bool_op svf tag subfield negate quality/);
1943
1944     $e->create_vandelay_match_set_point($point) or return $e->die_event;
1945
1946     $parent_id = $e->data->id;
1947     if ($node->children && @{$node->children}) {
1948         for (@{$node->children}) {
1949             return $e->die_event if
1950                 _walk_new_vmsp($e, $match_set_id, $_, $parent_id);
1951         }
1952     }
1953
1954     return;
1955 }
1956
1957 sub match_set_update_tree {
1958     my ($self, $conn, $authtoken, $match_set_id, $tree) = @_;
1959
1960     my $e = new_editor("xact" => 1, "authtoken" => $authtoken);
1961     $e->checkauth or return $e->die_event;
1962
1963     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
1964         return $e->die_event;
1965
1966     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
1967         return $e->die_event;
1968
1969     my $existing = $e->search_vandelay_match_set_point([
1970         {"match_set" => $match_set_id},
1971         {"order_by" => {"vmsp" => "id DESC"}}
1972     ]) or return $e->die_event;
1973
1974     # delete points, working up from leaf points to the root
1975     while(@$existing) {
1976         for my $point (shift @$existing) {
1977             if( grep {$_->parent eq $point->id} @$existing) {
1978                 push(@$existing, $point);
1979             } else {
1980                 $e->delete_vandelay_match_set_point($point) or return $e->die_event;
1981             }
1982         }
1983     }
1984
1985     _walk_new_vmsp($e, $match_set_id, $tree);
1986
1987     $e->commit or return $e->die_event;
1988 }
1989
1990 __PACKAGE__->register_method(  
1991     api_name    => 'open-ils.vandelay.bib_queue.to_bucket',
1992     method      => 'bib_queue_to_bucket',
1993     api_level   => 1,
1994     argc        => 2,
1995     signature   => {
1996         desc    => q/Add to or create a new bib container (bucket) with the successfully 
1997                     imported contents of a vandelay queue.  Any user that has Vandelay 
1998                     queue create permissions can append or create buckets from his-her own queues./,
1999         params  => [
2000             {desc => 'Authtoken', type => 'string'},
2001             {desc => 'Queue ID', type => 'number'},
2002             {desc => 'Bucket Name', type => 'string'}
2003         ],
2004         return  => {desc => q/
2005             {bucket => $bucket, addcount => number-of-items-added-to-bucket, item_count => total-bucket-item-count} on success,
2006             {add_count => 0} if there is nothing to do, and Event on error/}
2007     }
2008 );
2009
2010 sub bib_queue_to_bucket {
2011     my ($self, $conn, $auth, $q_id, $bucket_name) = @_;
2012
2013     my $e = new_editor(xact => 1, authtoken => $auth);
2014     return $e->die_event unless $e->checkauth;
2015     
2016     my $queue = $e->retrieve_vandelay_bib_queue($q_id)
2017         or return $e->die_event;
2018
2019     return OpenILS::Event->new('BAD_PARAMS', 
2020         note => q/Bucket creator must be queue owner/)
2021         unless $queue->owner == $e->requestor->id;
2022
2023     # find the bib IDs that will go into the bucket
2024     my $bib_ids = $e->json_query({
2025         select => {vqbr => ['imported_as']},
2026         from => 'vqbr',
2027         where => {queue => $q_id, imported_as => {'!=' => undef}}
2028     });
2029
2030     if (!@$bib_ids) { # no records to add
2031         $e->rollback;
2032         return {add_count => 0};
2033     }
2034
2035     # allow user to add to an existing bucket by name
2036     my $bucket = $e->search_container_biblio_record_entry_bucket({
2037         owner => $e->requestor->id, 
2038         name => $bucket_name,
2039         btype => 'vandelay_queue'
2040     })->[0];
2041
2042     # if the bucket does not exist, create a new one
2043     if (!$bucket) { 
2044         $bucket = Fieldmapper::container::biblio_record_entry_bucket->new;
2045         $bucket->name($bucket_name);
2046         $bucket->owner($e->requestor->id);
2047         $bucket->btype('vandelay_queue');
2048
2049         $e->create_container_biblio_record_entry_bucket($bucket)
2050             or return $e->die_event;
2051     }
2052
2053     # create the new bucket items
2054     for my $bib_id ( map {$_->{imported_as}} @$bib_ids ) {
2055         my $item = Fieldmapper::container::biblio_record_entry_bucket_item->new;
2056         $item->target_biblio_record_entry($bib_id);
2057         $item->bucket($bucket->id);
2058         $e->create_container_biblio_record_entry_bucket_item($item)
2059             or return $e->die_event;
2060     }
2061
2062     # re-fetch the bucket to pick up the correct create_time
2063     $bucket = $e->retrieve_container_biblio_record_entry_bucket($bucket->id)
2064         or return $e->die_event;
2065
2066     # get the total count of items in this bucket
2067     my $count = $e->json_query({
2068         select => {cbrebi => [{
2069             aggregate =>  1,
2070             transform => 'count',
2071             alias => 'count',
2072             column => 'id'
2073         }]},
2074         from => 'cbrebi',
2075         where => {bucket => $bucket->id}
2076     })->[0];
2077
2078     $e->commit;
2079
2080     return {
2081         bucket => $bucket, 
2082         add_count => scalar(@$bib_ids), # items added to the bucket
2083         item_count => $count->{count} # total items in buckets
2084     };
2085 }
2086
2087 1;