]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/perlmods/lib/OpenILS/Application/Vandelay.pm
Vandelay record bucket-limited matching
[working/Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Vandelay.pm
1 package OpenILS::Application::Vandelay;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5 use Unicode::Normalize;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::AppSession;
8 use OpenSRF::Utils::SettingsClient;
9 use OpenSRF::Utils::Cache;
10 use OpenILS::Utils::Fieldmapper;
11 use OpenILS::Utils::CStoreEditor qw/:funcs/;
12 use OpenILS::Utils::Normalize qw/clean_marc/;
13 use MARC::Batch;
14 use MARC::Record;
15 use MARC::File::XML ( BinaryEncoding => 'UTF-8' );
16 use Time::HiRes qw(time);
17 use OpenSRF::Utils::Logger qw/$logger/;
18 use MIME::Base64;
19 use XML::LibXML;
20 use OpenILS::Const qw/:const/;
21 use OpenILS::Application::AppUtils;
22 use OpenILS::Application::Cat::BibCommon;
23 use OpenILS::Application::Cat::AuthCommon;
24 use OpenILS::Application::Cat::AssetCommon;
25 my $U = 'OpenILS::Application::AppUtils';
26
27 # A list of LDR/06 values from http://loc.gov/marc
28 my %record_types = (
29         a => 'bib',
30         c => 'bib',
31         d => 'bib',
32         e => 'bib',
33         f => 'bib',
34         g => 'bib',
35         i => 'bib',
36         j => 'bib',
37         k => 'bib',
38         m => 'bib',
39         o => 'bib',
40         p => 'bib',
41         r => 'bib',
42         t => 'bib',
43         u => 'holdings',
44         v => 'holdings',
45         x => 'holdings',
46         y => 'holdings',
47         z => 'auth',
48       ' ' => 'bib',
49 );
50
51 sub initialize {}
52 sub child_init {}
53
54 # --------------------------------------------------------------------------------
55 # Biblio ingest
56
57 sub create_bib_queue {
58     my $self = shift;
59     my $client = shift;
60     my $auth = shift;
61     my $name = shift;
62     my $owner = shift;
63     my $type = shift;
64     my $match_set = shift;
65     my $import_def = shift;
66     my $match_bucket = shift;
67
68     my $e = new_editor(authtoken => $auth, xact => 1);
69
70     return $e->die_event unless $e->checkauth;
71     return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
72     $owner ||= $e->requestor->id;
73
74     if ($e->search_vandelay_bib_queue( {name => $name, owner => $owner, queue_type => $type})->[0]) {
75         $e->rollback;
76         return OpenILS::Event->new('BIB_QUEUE_EXISTS') 
77     }
78
79     my $queue = new Fieldmapper::vandelay::bib_queue();
80     $queue->name( $name );
81     $queue->owner( $owner );
82     $queue->queue_type( $type ) if ($type);
83     $queue->item_attr_def( $import_def ) if ($import_def);
84     $queue->match_set($match_set) if $match_set;
85     $queue->match_bucket($match_bucket);
86
87     my $new_q = $e->create_vandelay_bib_queue( $queue );
88     return $e->die_event unless ($new_q);
89     $e->commit;
90
91     return $new_q;
92 }
93 __PACKAGE__->register_method(  
94     api_name   => "open-ils.vandelay.bib_queue.create",
95     method     => "create_bib_queue",
96     api_level  => 1,
97     argc       => 4,
98 );                      
99
100
101 sub create_auth_queue {
102     my $self = shift;
103     my $client = shift;
104     my $auth = shift;
105     my $name = shift;
106     my $owner = shift;
107     my $type = shift;
108     my $match_set = shift;
109
110     my $e = new_editor(authtoken => $auth, xact => 1);
111
112     return $e->die_event unless $e->checkauth;
113     return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
114     $owner ||= $e->requestor->id;
115
116     if ($e->search_vandelay_authority_queue({name => $name, owner => $owner, queue_type => $type})->[0]) {
117         $e->rollback;
118         return OpenILS::Event->new('AUTH_QUEUE_EXISTS') 
119     }
120
121     my $queue = new Fieldmapper::vandelay::authority_queue();
122     $queue->name( $name );
123     $queue->owner( $owner );
124     $queue->queue_type( $type ) if ($type);
125
126     my $new_q = $e->create_vandelay_authority_queue( $queue );
127     $e->die_event unless ($new_q);
128     $e->commit;
129
130     return $new_q;
131 }
132 __PACKAGE__->register_method(  
133     api_name   => "open-ils.vandelay.authority_queue.create",
134     method     => "create_auth_queue",
135     api_level  => 1,
136     argc       => 3,
137 );                      
138
139 sub add_record_to_bib_queue {
140     my $self = shift;
141     my $client = shift;
142     my $auth = shift;
143     my $queue = shift;
144     my $marc = shift;
145     my $purpose = shift;
146     my $bib_source = shift;
147
148     my $e = new_editor(authtoken => $auth, xact => 1);
149
150     $queue = $e->retrieve_vandelay_bib_queue($queue);
151
152     return $e->die_event unless $e->checkauth;
153     return $e->die_event unless
154         ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
155          $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
156
157     my $new_rec = _add_bib_rec($e, $marc, $queue->id, $purpose, $bib_source);
158
159     return $e->die_event unless ($new_rec);
160     $e->commit;
161     return $new_rec;
162 }
163 __PACKAGE__->register_method(  
164     api_name   => "open-ils.vandelay.queued_bib_record.create",
165     method     => "add_record_to_bib_queue",
166     api_level  => 1,
167     argc       => 3,
168 );                      
169
170 sub _add_bib_rec {
171     my $e = shift;
172     my $marc = shift;
173     my $queue = shift;
174     my $purpose = shift;
175     my $bib_source = shift;
176
177     my $rec = new Fieldmapper::vandelay::queued_bib_record();
178     $rec->marc( $marc );
179     $rec->queue( $queue );
180     $rec->purpose( $purpose ) if ($purpose);
181     $rec->bib_source($bib_source);
182
183     return $e->create_vandelay_queued_bib_record( $rec );
184 }
185
186 sub add_record_to_authority_queue {
187     my $self = shift;
188     my $client = shift;
189     my $auth = shift;
190     my $queue = shift;
191     my $marc = shift;
192     my $purpose = shift;
193
194     my $e = new_editor(authtoken => $auth, xact => 1);
195
196     $queue = $e->retrieve_vandelay_authority_queue($queue);
197
198     return $e->die_event unless $e->checkauth;
199     return $e->die_event unless
200         ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
201          $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
202
203     my $new_rec = _add_auth_rec($e, $marc, $queue->id, $purpose);
204
205     return $e->die_event unless ($new_rec);
206     $e->commit;
207     return $new_rec;
208 }
209 __PACKAGE__->register_method(
210     api_name   => "open-ils.vandelay.queued_authority_record.create",
211     method     => "add_record_to_authority_queue",
212     api_level  => 1,
213     argc       => 3,
214 );
215
216 sub _add_auth_rec {
217     my $e = shift;
218     my $marc = shift;
219     my $queue = shift;
220     my $purpose = shift;
221
222     my $rec = new Fieldmapper::vandelay::queued_authority_record();
223     $rec->marc( $marc );
224     $rec->queue( $queue );
225     $rec->purpose( $purpose ) if ($purpose);
226
227     return $e->create_vandelay_queued_authority_record( $rec );
228 }
229
230 sub process_spool {
231     my $self = shift;
232     my $client = shift;
233     my $auth = shift;
234     my $fingerprint = shift || '';
235     my $queue_id = shift;
236     my $purpose = shift;
237     my $filename = shift;
238     my $bib_source = shift;
239
240     my $e = new_editor(authtoken => $auth, xact => 1);
241     return $e->die_event unless $e->checkauth;
242
243     my $queue;
244     my $type = $self->{record_type};
245
246     if($type eq 'bib') {
247         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
248     } else {
249         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
250     }
251
252     my $evt = check_queue_perms($e, $type, $queue);
253     return $evt if ($evt);
254
255     my $cache = new OpenSRF::Utils::Cache();
256
257     if($fingerprint) {
258         my $data = $cache->get_cache('vandelay_import_spool_' . $fingerprint);
259         $purpose = $data->{purpose};
260         $filename = $data->{path};
261         $bib_source = $data->{bib_source};
262     }
263
264     unless(-r $filename) {
265         $logger->error("unable to read MARC file $filename");
266         return -1; # make this an event XXX
267     }
268
269     $logger->info("vandelay spooling $fingerprint purpose=$purpose file=$filename");
270
271     my $marctype = 'USMARC'; 
272
273     open F, $filename;
274     $marctype = 'XML' if (getc(F) =~ /^\D/o);
275     close F;
276
277     my $batch = new MARC::Batch ($marctype, $filename);
278     $batch->strict_off;
279
280     my $response_scale = 10;
281     my $count = 0;
282     my $r = -1;
283     while (try { $r = $batch->next } otherwise { $r = -1 }) {
284         if ($r == -1) {
285             $logger->warn("Processing of record $count in set $filename failed.  Skipping this record");
286             $count++;
287         }
288
289         $logger->info("processing record $count");
290
291         try {
292             my $xml = clean_marc($r);
293
294             my $qrec;
295             # Check the leader to ensure we've got something resembling the expected
296             # Allow spaces to give records the benefit of the doubt
297             my $ldr_type = substr($r->leader(), 6, 1);
298             if ($type eq 'bib' && ($record_types{$ldr_type}) eq 'bib' || $ldr_type eq ' ') {
299                 $qrec = _add_bib_rec( $e, $xml, $queue_id, $purpose, $bib_source ) or return $e->die_event;
300             } elsif ($type eq 'auth' && ($record_types{$ldr_type}) eq 'auth' || $ldr_type eq ' ') {
301                 $qrec = _add_auth_rec( $e, $xml, $queue_id, $purpose ) or return $e->die_event;
302             } else {
303                 # I don't know how to handle this type; rock on
304                 $logger->error("In process_spool(), type was $type and leader type was $ldr_type ; not currently supported");
305                 next;
306             }
307
308             if($self->api_name =~ /stream_results/ and $qrec) {
309                 $client->respond($qrec->id)
310             } else {
311                 $client->respond($count) if (++$count % $response_scale) == 0;
312                 $response_scale *= 10 if ($count == ($response_scale * 10));
313             }
314         } catch Error with {
315             my $error = shift;
316             $logger->warn("Encountered a bad record at Vandelay ingest: ".$error);
317         }
318     }
319
320     $e->commit;
321     unlink($filename);
322     $cache->delete_cache('vandelay_import_spool_' . $fingerprint) if $fingerprint;
323     return $count;
324 }
325
326 __PACKAGE__->register_method(  
327     api_name    => "open-ils.vandelay.bib.process_spool",
328     method      => "process_spool",
329     api_level   => 1,
330     argc        => 3,
331     max_chunk_size => 0,
332     record_type => 'bib'
333 );                      
334 __PACKAGE__->register_method(  
335     api_name    => "open-ils.vandelay.auth.process_spool",
336     method      => "process_spool",
337     api_level   => 1,
338     argc        => 3,
339     max_chunk_size => 0,
340     record_type => 'auth'
341 );                      
342
343 __PACKAGE__->register_method(  
344     api_name    => "open-ils.vandelay.bib.process_spool.stream_results",
345     method      => "process_spool",
346     api_level   => 1,
347     argc        => 3,
348     stream      => 1,
349     max_chunk_size => 0,
350     record_type => 'bib'
351 );                      
352 __PACKAGE__->register_method(  
353     api_name    => "open-ils.vandelay.auth.process_spool.stream_results",
354     method      => "process_spool",
355     api_level   => 1,
356     argc        => 3,
357     stream      => 1,
358     max_chunk_size => 0,
359     record_type => 'auth'
360 );
361
362 __PACKAGE__->register_method(  
363     api_name    => "open-ils.vandelay.bib_queue.records.retrieve",
364     method      => 'retrieve_queued_records',
365     api_level   => 1,
366     argc        => 2,
367     stream      => 1,
368     record_type => 'bib'
369 );
370 __PACKAGE__->register_method(
371     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.print",
372     method      => 'retrieve_queued_records',
373     api_level   => 1,
374     argc        => 2,
375     stream      => 1,
376     record_type => 'bib'
377 );
378 __PACKAGE__->register_method(
379     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.csv",
380     method      => 'retrieve_queued_records',
381     api_level   => 1,
382     argc        => 2,
383     stream      => 1,
384     record_type => 'bib'
385 );
386 __PACKAGE__->register_method(
387     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.email",
388     method      => 'retrieve_queued_records',
389     api_level   => 1,
390     argc        => 2,
391     stream      => 1,
392     record_type => 'bib'
393 );
394
395 __PACKAGE__->register_method(  
396     api_name    => "open-ils.vandelay.auth_queue.records.retrieve",
397     method      => 'retrieve_queued_records',
398     api_level   => 1,
399     argc        => 2,
400     stream      => 1,
401     record_type => 'auth'
402 );
403 __PACKAGE__->register_method(
404     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.print",
405     method      => 'retrieve_queued_records',
406     api_level   => 1,
407     argc        => 2,
408     stream      => 1,
409     record_type => 'auth'
410 );
411 __PACKAGE__->register_method(
412     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.csv",
413     method      => 'retrieve_queued_records',
414     api_level   => 1,
415     argc        => 2,
416     stream      => 1,
417     record_type => 'auth'
418 );
419 __PACKAGE__->register_method(
420     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.email",
421     method      => 'retrieve_queued_records',
422     api_level   => 1,
423     argc        => 2,
424     stream      => 1,
425     record_type => 'auth'
426 );
427
428 __PACKAGE__->register_method(  
429     api_name    => "open-ils.vandelay.bib_queue.records.matches.retrieve",
430     method      => 'retrieve_queued_records',
431     api_level   => 1,
432     argc        => 2,
433     stream      => 1,
434     record_type => 'bib',
435     signature   => {
436         desc => q/Only retrieve queued bib records that have matches against existing records/
437     }
438 );
439 __PACKAGE__->register_method(  
440     api_name    => "open-ils.vandelay.auth_queue.records.matches.retrieve",
441     method      => 'retrieve_queued_records',
442     api_level   => 1,
443     argc        => 2,
444     stream      => 1,
445     record_type => 'auth',
446     signature   => {
447         desc => q/Only retrieve queued authority records that have matches against existing records/
448     }
449 );
450
451 sub retrieve_queued_records {
452     my($self, $conn, $auth, $queue_id, $options) = @_;
453
454     $options ||= {};
455     my $limit = $$options{limit} || 20;
456     my $offset = $$options{offset} || 0;
457     my $type = $self->{record_type};
458
459     my $e = new_editor(authtoken => $auth, xact => 1);
460     return $e->die_event unless $e->checkauth;
461
462     my $queue;
463     if($type eq 'bib') {
464         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
465     } else {
466         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
467     }
468     my $evt = check_queue_perms($e, $type, $queue);
469     return $evt if ($evt);
470
471     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
472     my $query = {
473         select => {$class => ['id']},
474         from => $class,
475         where => {queue => $queue_id},
476         distinct => 1,
477         order_by => {$class => ['id']}, 
478         limit => $limit,
479         offset => $offset,
480     };
481     if($self->api_name =~ /export/) {
482         delete $query->{limit};
483         delete $query->{offset};
484     }
485
486     $query->{where}->{import_time} = undef if $$options{non_imported};
487
488     if($$options{with_import_error}) {
489
490         $query->{from} = {$class => {vii => {type => 'left'}}};
491         $query->{where}->{'-or'} = [
492             {'+vqbr' => {import_error => {'!=' => undef}}},
493             {'+vii' => {import_error => {'!=' => undef}}}
494         ];
495
496     } else {
497         
498         if($$options{with_rec_import_error}) {
499             $query->{where}->{import_error} = {'!=' => undef};
500
501         } elsif( $$options{with_item_import_error} and $type eq 'bib') {
502
503             $query->{from} = {$class => 'vii'};
504             $query->{where}->{'+vii'} = {import_error => {'!=' => undef}};
505         }
506     }
507
508     if($self->api_name =~ /matches/) {
509         # find only records that have matches
510         my $mclass = $type eq 'bib' ? 'vbm' : 'vam';
511         $query->{from} = {$class => {$mclass => {type => 'right'}}};
512     } 
513
514     my $record_ids = $e->json_query($query);
515
516     my $retrieve = ($type eq 'bib') ? 
517         'retrieve_vandelay_queued_bib_record' : 'retrieve_vandelay_queued_authority_record';
518     my $search = ($type eq 'bib') ? 
519         'search_vandelay_queued_bib_record' : 'search_vandelay_queued_authority_record';
520
521     if ($self->api_name =~ /export/) {
522         my $rec_list = $e->$search({id => [map { $_->{id} } @$record_ids]}, {substream => 1});
523         if ($self->api_name =~ /print/) {
524
525             $e->rollback;
526             return $U->fire_object_event(
527                 undef,
528                 'vandelay.queued_'.$type.'_record.print',
529                 $rec_list,
530                 $e->requestor->ws_ou
531             );
532
533         } elsif ($self->api_name =~ /csv/) {
534
535             $e->rollback;
536             return $U->fire_object_event(
537                 undef,
538                 'vandelay.queued_'.$type.'_record.csv',
539                 $rec_list,
540                 $e->requestor->ws_ou
541             );
542
543         } elsif ($self->api_name =~ /email/) {
544
545             $conn->respond_complete(1);
546
547             for my $rec (@$rec_list) {
548                 $U->create_events_for_hook(
549                     'vandelay.queued_'.$type.'_record.email',
550                     $rec,
551                     $e->requestor->home_ou,
552                     undef,
553                     undef,
554                     1
555                 );
556             }
557
558         }
559     } else {
560         for my $rec_id (@$record_ids) {
561             my $flesh = ['attributes', 'matches'];
562             push(@$flesh, 'import_items') if $$options{flesh_import_items};
563             my $params = {flesh => 1, flesh_fields => {$class => $flesh}};
564             my $rec = $e->$retrieve([$rec_id->{id}, $params]);
565             $rec->clear_marc if $$options{clear_marc};
566             $conn->respond($rec);
567         }
568     }
569
570     $e->rollback;
571     return undef;
572 }
573
574 __PACKAGE__->register_method(  
575     api_name    => 'open-ils.vandelay.import_item.queue.retrieve',
576     method      => 'retrieve_queue_import_items',
577     api_level   => 1,
578     argc        => 2,
579     stream      => 1,
580     authoritative => 1,
581     signature => q/
582         Returns Import Item (vii) objects for the selected queue.
583         Filter options:
584             with_import_error : only return items that failed to import
585     /
586 );
587 __PACKAGE__->register_method(
588     api_name    => 'open-ils.vandelay.import_item.queue.export.print',
589     method      => 'retrieve_queue_import_items',
590     api_level   => 1,
591     argc        => 2,
592     stream      => 1,
593     authoritative => 1,
594     signature => q/
595         Returns template-generated printable output of Import Item (vii) objects for the selected queue.
596         Filter options:
597             with_import_error : only return items that failed to import
598     /
599 );
600 __PACKAGE__->register_method(
601     api_name    => 'open-ils.vandelay.import_item.queue.export.csv',
602     method      => 'retrieve_queue_import_items',
603     api_level   => 1,
604     argc        => 2,
605     stream      => 1,
606     authoritative => 1,
607     signature => q/
608         Returns template-generated CSV output of Import Item (vii) objects for the selected queue.
609         Filter options:
610             with_import_error : only return items that failed to import
611     /
612 );
613 __PACKAGE__->register_method(
614     api_name    => 'open-ils.vandelay.import_item.queue.export.email',
615     method      => 'retrieve_queue_import_items',
616     api_level   => 1,
617     argc        => 2,
618     stream      => 1,
619     authoritative => 1,
620     signature => q/
621         Emails template-generated output of Import Item (vii) objects for the selected queue.
622         Filter options:
623             with_import_error : only return items that failed to import
624     /
625 );
626
627 sub retrieve_queue_import_items {
628     my($self, $conn, $auth, $q_id, $options) = @_;
629
630     $options ||= {};
631     my $limit = $$options{limit} || 20;
632     my $offset = $$options{offset} || 0;
633
634     my $e = new_editor(authtoken => $auth);
635     return $e->event unless $e->checkauth;
636
637     my $queue = $e->retrieve_vandelay_bib_queue($q_id) or return $e->event;
638     my $evt = check_queue_perms($e, 'bib', $queue);
639     return $evt if $evt;
640
641     my $query = {
642         select => {vii => ['id']},
643         from => {
644             vii => {
645                 vqbr => {
646                     join => {
647                         'vbq' => {
648                             field => 'id',
649                             fkey => 'queue',
650                             filter => {id => $q_id}
651                         }
652                     }
653                 }
654             }
655         },
656         order_by => {'vii' => ['record','id']},
657         limit => $limit,
658         offset => $offset
659     };
660     if($self->api_name =~ /export/) {
661         delete $query->{limit};
662         delete $query->{offset};
663     }
664
665     $query->{where} = {'+vii' => {import_error => {'!=' => undef}}}
666         if $$options{with_import_error};
667
668     my $items = $e->json_query($query);
669     my $item_list = $e->search_vandelay_import_item({id => [map { $_->{id} } @$items]});
670     if ($self->api_name =~ /export/) {
671         if ($self->api_name =~ /print/) {
672
673             return $U->fire_object_event(
674                 undef,
675                 'vandelay.import_items.print',
676                 $item_list,
677                 $e->requestor->ws_ou
678             );
679
680         } elsif ($self->api_name =~ /csv/) {
681
682             return $U->fire_object_event(
683                 undef,
684                 'vandelay.import_items.csv',
685                 $item_list,
686                 $e->requestor->ws_ou
687             );
688
689         } elsif ($self->api_name =~ /email/) {
690
691             $conn->respond_complete(1);
692
693             for my $item (@$item_list) {
694                 $U->create_events_for_hook(
695                     'vandelay.import_items.email',
696                     $item,
697                     $e->requestor->home_ou,
698                     undef,
699                     undef,
700                     1
701                 );
702             }
703
704         }
705     } else {
706         for my $item (@$item_list) {
707             $conn->respond($item);
708         }
709     }
710
711     return undef;
712 }
713
714 sub check_queue_perms {
715     my($e, $type, $queue) = @_;
716     if ($type eq 'bib') {
717         return $e->die_event unless
718             ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
719              $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
720     } else {
721         return $e->die_event unless
722             ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
723              $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
724     }
725
726     return undef;
727 }
728
729 __PACKAGE__->register_method(  
730     api_name    => "open-ils.vandelay.bib_record.list.import",
731     method      => 'import_record_list',
732     api_level   => 1,
733     argc        => 2,
734     stream      => 1,
735     record_type => 'bib'
736 );
737
738 __PACKAGE__->register_method(  
739     api_name    => "open-ils.vandelay.auth_record.list.import",
740     method      => 'import_record_list',
741     api_level   => 1,
742     argc        => 2,
743     stream      => 1,
744     record_type => 'auth'
745 );
746
747 sub import_record_list {
748     my($self, $conn, $auth, $rec_ids, $args) = @_;
749     my $e = new_editor(authtoken => $auth, xact => 1);
750     return $e->die_event unless $e->checkauth;
751     $args ||= {};
752     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $args);
753     try {$e->rollback} otherwise {}; 
754     return $err if $err;
755     return {complete => 1};
756 }
757
758
759 __PACKAGE__->register_method(  
760     api_name    => "open-ils.vandelay.bib_queue.import",
761     method      => 'import_queue',
762     api_level   => 1,
763     argc        => 2,
764     stream      => 1,
765     max_chunk_size => 0,
766     record_type => 'bib',
767     signature => {
768         desc => q/
769             Attempts to import all non-imported records for the selected queue.
770             Will also attempt import of all non-imported items.
771         /
772     }
773 );
774
775 __PACKAGE__->register_method(  
776     api_name    => "open-ils.vandelay.auth_queue.import",
777     method      => 'import_queue',
778     api_level   => 1,
779     argc        => 2,
780     stream      => 1,
781     max_chunk_size => 0,
782     record_type => 'auth'
783 );
784
785 sub import_queue {
786     my($self, $conn, $auth, $q_id, $options) = @_;
787     my $e = new_editor(authtoken => $auth, xact => 1);
788     return $e->die_event unless $e->checkauth;
789     $options ||= {};
790     my $type = $self->{record_type};
791     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
792
793     # First, collect the not-yet-imported records
794     my $query = {queue => $q_id, import_time => undef};
795     my $search = ($type eq 'bib') ? 
796         'search_vandelay_queued_bib_record' : 
797         'search_vandelay_queued_authority_record';
798     my $rec_ids = $e->$search($query, {idlist => 1});
799
800     # Now add any imported records that have un-imported items
801
802     if($type eq 'bib') {
803         my $item_recs = $e->json_query({
804             select => {vqbr => ['id']},
805             from => {vqbr => 'vii'},
806             where => {
807                 '+vqbr' => {
808                     queue => $q_id,
809                     import_time => {'!=' => undef}
810                 },
811                 '+vii' => {import_time => undef}
812             },
813             distinct => 1
814         });
815         push(@$rec_ids, map {$_->{id}} @$item_recs);
816     }
817
818     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $options);
819     try {$e->rollback} otherwise {}; # only using this to make the read authoritative -- don't die from it
820     return $err if $err;
821     return {complete => 1};
822 }
823
824 # returns a list of queued record IDs for a given queue that 
825 # have at least one entry in the match table
826 # XXX DEPRECATED?
827 sub queued_records_with_matches {
828     my($e, $type, $q_id, $limit, $offset, $filter) = @_;
829
830     my $match_class = 'vbm';
831     my $rec_class = 'vqbr';
832     if($type eq 'auth') {
833         $match_class = 'vam';
834          $rec_class = 'vqar';
835     }
836
837     $filter ||= {};
838     $filter->{queue} = $q_id;
839
840     my $query = {
841         distinct => 1, 
842         select => {$match_class => ['queued_record']}, 
843         from => {
844             $match_class => {
845                 $rec_class => {
846                     field => 'id',
847                     fkey => 'queued_record',
848                     filter => $filter,
849                 }
850             }
851         }
852     };        
853
854     if($limit or defined $offset) {
855         $limit ||= 20;
856         $offset ||= 0;
857         $query->{limit} = $limit;
858         $query->{offset} = $offset;
859     }
860
861     my $data = $e->json_query($query);
862     return [ map {$_->{queued_record}} @$data ];
863 }
864
865
866 sub import_record_list_impl {
867     my($self, $conn, $rec_ids, $requestor, $args) = @_;
868
869     my $overlay_map = $args->{overlay_map} || {};
870     my $type = $self->{record_type};
871     my %queues;
872
873     my $report_args = {
874         progress => 1,
875         step => 1,
876         conn => $conn,
877         total => scalar(@$rec_ids),
878         report_all => $$args{report_all}
879     };
880
881     $conn->max_chunk_count(1) if $$args{report_all};
882
883     my $auto_overlay_exact = $$args{auto_overlay_exact};
884     my $auto_overlay_1match = $$args{auto_overlay_1match};
885     my $auto_overlay_best = $$args{auto_overlay_best_match};
886     my $match_quality_ratio = $$args{match_quality_ratio};
887     my $merge_profile = $$args{merge_profile};
888     my $ft_merge_profile = $$args{fall_through_merge_profile};
889     my $bib_source = $$args{bib_source};
890     my $import_no_match = $$args{import_no_match};
891     my $strip_grps = $$args{strip_field_groups}; # bib-only
892
893     my $overlay_func = 'vandelay.overlay_bib_record';
894     my $auto_overlay_func = 'vandelay.auto_overlay_bib_record';
895     my $auto_overlay_best_func = 'vandelay.auto_overlay_bib_record_with_best'; # XXX bib-only
896     my $retrieve_func = 'retrieve_vandelay_queued_bib_record';
897     my $update_func = 'update_vandelay_queued_bib_record';
898     my $search_func = 'search_vandelay_queued_bib_record';
899     my $retrieve_queue_func = 'retrieve_vandelay_bib_queue';
900     my $update_queue_func = 'update_vandelay_bib_queue';
901     my $delete_queue_func = 'delete_vandelay_bib_queue';
902     my $rec_class = 'vqbr';
903
904     my $editor = new_editor();
905
906     my %bib_sources;
907     my $sources = $editor->search_config_bib_source({id => {'!=' => undef}});
908     $bib_sources{$_->id} = $_->source for @$sources;
909
910     if($type eq 'auth') {
911         $overlay_func =~ s/bib/auth/o;
912         $auto_overlay_func = s/bib/auth/o;
913         $retrieve_func =~ s/bib/authority/o;
914         $retrieve_queue_func =~ s/bib/authority/o;
915         $update_queue_func =~ s/bib/authority/o;
916         $update_func =~ s/bib/authority/o;
917         $search_func =~ s/bib/authority/o;
918         $delete_queue_func =~ s/bib/authority/o;
919         $rec_class = 'vqar';
920     }
921
922     my $new_rec_perm_cache;
923     my @success_rec_ids;
924     for my $rec_id (@$rec_ids) {
925
926         my $error = 0;
927         my $overlay_target = $overlay_map->{$rec_id};
928
929         my $e = new_editor(xact => 1);
930         $e->requestor($requestor);
931
932         $$report_args{e} = $e;
933         $$report_args{evt} = undef;
934         $$report_args{import_error} = undef;
935         $$report_args{no_import} = 0;
936
937         my $rec = $e->$retrieve_func([
938             $rec_id,
939             {   flesh => 1,
940                 flesh_fields => { $rec_class => ['matches']},
941             }
942         ]);
943
944         unless($rec) {
945             $$report_args{evt} = $e->event;
946             finish_rec_import_attempt($report_args);
947             next;
948         }
949
950         if($rec->import_time) {
951             # if the record is already imported, that means it may have 
952             # un-imported copies.  Add to success list for later processing.
953             push(@success_rec_ids, $rec_id);
954             $e->rollback;
955             next;
956         }
957
958         $$report_args{rec} = $rec;
959         $queues{$rec->queue} = 1;
960
961         my $record;
962         my $imported = 0;
963
964         if ($type eq 'bib') {
965             # strip configured / selected MARC tags from inbound records
966
967             my $marcdoc = XML::LibXML->new->parse_string($rec->marc);
968             $rec->marc($U->strip_marc_fields($e, $marcdoc, $strip_grps));
969
970             unless ($e->$update_func($rec)) {
971                 $$report_args{evt} = $e->die_event;
972                 finish_rec_import_attempt($report_args);
973                 next;
974             }
975         }
976
977         if(defined $overlay_target) {
978             # Caller chose an explicit overlay target
979
980             my $res = $e->json_query(
981                 {
982                     from => [
983                         $overlay_func,
984                         $rec_id,
985                         $overlay_target, 
986                         $merge_profile
987                     ]
988                 }
989             );
990
991             if($res and ($res = $res->[0])) {
992
993                 if($res->{$overlay_func} eq 't') {
994                     $logger->info("vl: $type direct overlay succeeded for queued rec ".
995                         "$rec_id and overlay target $overlay_target");
996                     $imported = 1;
997                 }
998
999             } else {
1000                 $error = 1;
1001                 $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1002             }
1003
1004         } else {
1005
1006             if($auto_overlay_1match) { # overlay if there is exactly 1 match
1007
1008                 my %match_recs = map { $_->eg_record => 1 } @{$rec->matches};
1009
1010                 if( scalar(keys %match_recs) == 1) { # all matches point to the same record
1011
1012                     ($imported, $error, $rec) = try_auto_overlay(
1013                         $e, $type,
1014                         $report_args, 
1015                         $auto_overlay_best_func,
1016                         $retrieve_func,
1017                         $rec_class,
1018                         $rec_id, 
1019                         $match_quality_ratio, 
1020                         $merge_profile, 
1021                         $ft_merge_profile
1022                     );
1023                 }
1024             }
1025
1026             if(!$imported and !$error and $auto_overlay_exact and scalar(@{$rec->matches}) == 1 ) {
1027                 
1028                 # caller says to overlay if there is an /exact/ match
1029                 # $auto_overlay_func only proceeds and returns true on exact matches
1030
1031                 my $res = $e->json_query(
1032                     {
1033                         from => [
1034                             $auto_overlay_func,
1035                             $rec_id,
1036                             $merge_profile
1037                         ]
1038                     }
1039                 );
1040
1041                 if($res and ($res = $res->[0])) {
1042
1043                     if($res->{$auto_overlay_func} eq 't') {
1044                         $logger->info("vl: $type auto-overlay succeeded for queued rec $rec_id");
1045                         $imported = 1;
1046
1047                         # re-fetch the record to pick up the imported_as value from the DB
1048                         $$report_args{rec} = $rec = $e->$retrieve_func([
1049                             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1050
1051                     } else {
1052                         $logger->info("vl: $type auto-overlay failed for queued rec $rec_id");
1053                     }
1054
1055                 } else {
1056                     $error = 1;
1057                     $logger->error("vl: Error attempting overlay with func=$auto_overlay_func, profile=$merge_profile, record=$rec_id");
1058                 }
1059             }
1060
1061             if(!$imported and !$error and $auto_overlay_best and scalar(@{$rec->matches}) > 0 ) {
1062                 # caller says to overlay the best match
1063
1064                 ($imported, $error, $rec) = try_auto_overlay(
1065                     $e, $type,
1066                     $report_args, 
1067                     $auto_overlay_best_func,
1068                     $retrieve_func,
1069                     $rec_class,
1070                     $rec_id, 
1071                     $match_quality_ratio, 
1072                     $merge_profile, 
1073                     $ft_merge_profile
1074                 );
1075             }
1076
1077             if(!$imported and !$error and $import_no_match and scalar(@{$rec->matches}) == 0) {
1078             
1079                 # No overlay / merge occurred.  Do a traditional record import by creating a new record
1080
1081                 if (!$new_rec_perm_cache) {
1082                     $new_rec_perm_cache = {};
1083
1084                     # all users creating new records are required to have the basic permission.
1085                     # if the client requests, we can enforce extra permissions for creating new records.
1086                     # for speed, check the permissions the first time then cache the result.
1087
1088                     my $perm = ($type eq 'bib') ? 'IMPORT_MARC' : 'IMPORT_AUTHORITY_MARC';
1089                     my $xperm = $$args{new_rec_perm};
1090                     my $rec_ou = $e->requestor->ws_ou;
1091
1092                     $new_rec_perm_cache->{evt} = $e->die_event
1093                         if !$e->allowed($perm, $rec_ou) || ($xperm and !$e->allowed($xperm, $rec_ou));
1094                 }
1095
1096                 if ($new_rec_perm_cache->{evt}) {
1097
1098                     # a cached event won't roll back the transaction (a la die_event), but
1099                     # the transaction will get rolled back in finish_rec_import_attempt() below
1100                     $$report_args{evt} = $new_rec_perm_cache->{evt};
1101                     $$report_args{import_error} = 'import.record.perm_failure';
1102
1103                 } else { # perm checks succeeded
1104
1105                     $logger->info("vl: creating new $type record for queued record $rec_id");
1106
1107                     if ($type eq 'bib') {
1108
1109                         $record = OpenILS::Application::Cat::BibCommon->biblio_record_xml_import(
1110                             $e, $rec->marc, $bib_sources{$rec->bib_source}, undef, 1);
1111
1112                     } else { # authority record
1113
1114                         $record = OpenILS::Application::Cat::AuthCommon->import_authority_record($e, $rec->marc); #$source);
1115                     }
1116
1117                     if($U->event_code($record)) {
1118                         $$report_args{import_error} = 'import.duplicate.tcn' 
1119                             if $record->{textcode} eq 'OPEN_TCN_NOT_FOUND';
1120                         $$report_args{evt} = $record;
1121
1122                     } else {
1123
1124                         $logger->info("vl: successfully imported new $type record");
1125                         $rec->imported_as($record->id);
1126                         $imported = 1;
1127                     }
1128                 }
1129             }
1130         }
1131
1132         if($imported) {
1133
1134             $rec->import_time('now');
1135             $rec->clear_import_error;
1136             $rec->clear_error_detail;
1137
1138             if($e->$update_func($rec)) {
1139
1140                 if($type eq 'bib') {
1141
1142                     # see if this record is linked from an acq record.
1143                     my $li = $e->search_acq_lineitem(
1144                         {queued_record => $rec->id, state => {'!=' => 'canceled'}})->[0];
1145
1146                     if ($li) { 
1147                         # if so, update the acq lineitem to point to the imported record
1148                         $li->eg_bib_id($rec->imported_as);
1149                         $$report_args{evt} = $e->die_event unless $e->update_acq_lineitem($li);
1150                     }
1151                 }
1152
1153                 push @success_rec_ids, $rec_id;
1154                 finish_rec_import_attempt($report_args);
1155
1156             } else {
1157                 $imported = 0;
1158             }
1159         }
1160
1161         if(!$imported) {
1162             $logger->info("vl: record $rec_id was not imported");
1163             $$report_args{evt} = $e->event unless $$report_args{evt};
1164             $$report_args{no_import} = 1;
1165             finish_rec_import_attempt($report_args);
1166         }
1167     }
1168
1169     # see if we need to mark any queues as complete
1170     for my $q_id (keys %queues) {
1171
1172         my $e = new_editor(xact => 1);
1173         my $remaining = $e->$search_func(
1174             [{queue => $q_id, import_time => undef}, {limit =>1}], {idlist => 1});
1175
1176         unless(@$remaining) {
1177             my $queue = $e->$retrieve_queue_func($q_id);
1178             unless($U->is_true($queue->complete)) {
1179                 $queue->complete('t');
1180                 $e->$update_queue_func($queue) or return $e->die_event;
1181                 $e->commit;
1182                 next;
1183             }
1184         } 
1185         $e->rollback;
1186     }
1187
1188     # import the copies
1189     import_record_asset_list_impl($conn, \@success_rec_ids, $requestor) if @success_rec_ids;
1190
1191     $conn->respond({total => $$report_args{total}, progress => $$report_args{progress}});
1192     return undef;
1193 }
1194
1195
1196 sub try_auto_overlay {
1197     my $e = shift;
1198     my $type = shift;
1199     my $report_args = shift;
1200     my $overlay_func  = shift;
1201     my $retrieve_func = shift; 
1202     my $rec_class = shift;
1203     my $rec_id  = shift;
1204     my $match_quality_ratio = shift;
1205     my $merge_profile  = shift;
1206     my $ft_merge_profile = shift;
1207
1208     my $imported = 0;
1209     my $error = 0;
1210
1211     # Find the best match and overlay if the quality ratio allows it.
1212     my $res = $e->json_query(
1213         {
1214             from => [
1215                 $overlay_func,
1216                 $rec_id, 
1217                 $merge_profile,
1218                 $match_quality_ratio
1219             ]
1220         }
1221     );
1222
1223     if($res and ($res = $res->[0])) {
1224
1225         if($res->{$overlay_func} eq 't') {
1226
1227             # first attempt succeeded
1228             $imported = 1;
1229
1230         } else {
1231
1232             # quality-limited merge failed with insufficient quality.  If there is a 
1233             # fall-through merge profile, re-do the merge with the alternate profile
1234             # and no quality restriction.
1235
1236             if($ft_merge_profile and $match_quality_ratio > 0) {
1237
1238                 $logger->info("vl: $type auto-merge failed with profile $merge_profile; ".
1239                     "re-merging with fall-through profile $ft_merge_profile");
1240
1241                 my $res = $e->json_query(
1242                     {
1243                         from => [
1244                             $overlay_func,
1245                             $rec_id, 
1246                             $ft_merge_profile,
1247                             0 # minimum quality not required
1248                         ]
1249                     }
1250                 );
1251
1252                 if($res and ($res = $res->[0])) {
1253
1254                     if($res->{$overlay_func} eq 't') {
1255
1256                         # second attempt succeeded
1257                         $imported = 1;
1258
1259                     } else {
1260
1261                         # failed to merge on second attempt
1262                         $logger->info("vl: $type auto-merge with fall-through failed for queued rec $rec_id");
1263                     }
1264                 } else {
1265                     
1266                     # second attempt died 
1267                     $error = 1;
1268                     $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1269                 }
1270
1271             } else { 
1272
1273                 # failed to merge on first attempt, no fall-through was provided
1274                 $$report_args{import_error} = 'overlay.record.quality' if $match_quality_ratio > 0;
1275                 $logger->info("vl: $type auto-merge failed for queued rec $rec_id");
1276             }
1277         }
1278
1279     } else {
1280
1281         # first attempt died 
1282         $error = 1;
1283         $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1284     }
1285
1286     if($imported) {
1287
1288         # at least 1 of the attempts succeeded
1289         $logger->info("vl: $type auto-merge succeeded for queued rec $rec_id");
1290
1291         # re-fetch the record to pick up the imported_as value from the DB
1292         $$report_args{rec} = $e->$retrieve_func([
1293             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1294     }
1295
1296     return ($imported, $error, $$report_args{rec});
1297 }
1298
1299
1300 # tracks any import errors, commits the current xact, responds to the client
1301 sub finish_rec_import_attempt {
1302     my $args = shift;
1303     my $evt = $$args{evt};
1304     my $rec = $$args{rec};
1305     my $e = $$args{e};
1306
1307     my $error = $$args{import_error};
1308     $error = 'general.unknown' if $evt and not $error;
1309
1310     # error tracking
1311     if($rec) {
1312
1313         if($error or $evt) {
1314             # failed import
1315             # since an error occurred, there's no guarantee the transaction wasn't 
1316             # rolled back.  force a rollback and create a new editor.
1317             $e->rollback;
1318             $e = new_editor(xact => 1);
1319             $rec->import_error($error);
1320
1321             if($evt) {
1322                 my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1323                 $rec->error_detail($detail);
1324             }
1325
1326             my $method = 'update_vandelay_queued_bib_record';
1327             $method =~ s/bib/authority/ if $$args{type} eq 'auth';
1328             $e->$method($rec) and $e->commit or $e->rollback;
1329
1330         } else {
1331             # commit the successful import
1332             $e->commit;
1333         }
1334
1335     } else {
1336         # requested queued record was not found
1337         $e->rollback;
1338     }
1339         
1340     # respond to client
1341     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1342         $$args{conn}->respond({
1343             total => $$args{total}, 
1344             progress => $$args{progress}, 
1345             imported => ($rec) ? $rec->id : undef,
1346             import_error => $error,
1347             no_import => $$args{no_import},
1348             err_event => $evt
1349         });
1350         $$args{step} *= 2 unless $$args{step} == 256;
1351     }
1352
1353     $$args{progress}++;
1354 }
1355
1356
1357
1358
1359
1360 __PACKAGE__->register_method(  
1361     api_name    => "open-ils.vandelay.bib_queue.owner.retrieve",
1362     method      => 'owner_queue_retrieve',
1363     api_level   => 1,
1364     argc        => 2,
1365     stream      => 1,
1366     record_type => 'bib'
1367 );
1368 __PACKAGE__->register_method(  
1369     api_name    => "open-ils.vandelay.authority_queue.owner.retrieve",
1370     method      => 'owner_queue_retrieve',
1371     api_level   => 1,
1372     argc        => 2,
1373     stream      => 1,
1374     record_type => 'auth'
1375 );
1376
1377 sub owner_queue_retrieve {
1378     my($self, $conn, $auth, $owner_id, $filters) = @_;
1379     my $e = new_editor(authtoken => $auth, xact => 1);
1380     return $e->die_event unless $e->checkauth;
1381     $owner_id = $e->requestor->id; # XXX add support for viewing other's queues?
1382     my $queues;
1383     $filters ||= {};
1384     my $search = {owner => $owner_id};
1385     $search->{$_} = $filters->{$_} for keys %$filters;
1386
1387     if($self->{record_type} eq 'bib') {
1388         $queues = $e->search_vandelay_bib_queue(
1389             [$search, {order_by => {vbq => 'evergreen.lowercase(name)'}}]);
1390     } else {
1391         $queues = $e->search_vandelay_authority_queue(
1392             [$search, {order_by => {vaq => 'evergreen.lowercase(name)'}}]);
1393     }
1394     $conn->respond($_) for @$queues;
1395     $e->rollback;
1396     return undef;
1397 }
1398
1399 __PACKAGE__->register_method(  
1400     api_name    => "open-ils.vandelay.bib_queue.delete",
1401     method      => "delete_queue",
1402     api_level   => 1,
1403     argc        => 2,
1404     record_type => 'bib'
1405 );            
1406 __PACKAGE__->register_method(  
1407     api_name    => "open-ils.vandelay.auth_queue.delete",
1408     method      => "delete_queue",
1409     api_level   => 1,
1410     argc        => 2,
1411     record_type => 'auth'
1412 );  
1413
1414 sub delete_queue {
1415     my($self, $conn, $auth, $q_id) = @_;
1416     my $e = new_editor(xact => 1, authtoken => $auth);
1417     return $e->die_event unless $e->checkauth;
1418     if($self->{record_type} eq 'bib') {
1419         return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
1420         my $queue = $e->retrieve_vandelay_bib_queue($q_id)
1421             or return $e->die_event;
1422         $e->delete_vandelay_bib_queue($queue)
1423             or return $e->die_event;
1424     } else {
1425            return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
1426         my $queue = $e->retrieve_vandelay_authority_queue($q_id)
1427             or return $e->die_event;
1428         $e->delete_vandelay_authority_queue($queue)
1429             or return $e->die_event;
1430     }
1431     $e->commit;
1432     return 1;
1433 }
1434
1435
1436 __PACKAGE__->register_method(  
1437     api_name    => "open-ils.vandelay.queued_bib_record.html",
1438     method      => 'queued_record_html',
1439     api_level   => 1,
1440     argc        => 2,
1441     stream      => 1,
1442     record_type => 'bib'
1443 );
1444 __PACKAGE__->register_method(  
1445     api_name    => "open-ils.vandelay.queued_authority_record.html",
1446     method      => 'queued_record_html',
1447     api_level   => 1,
1448     argc        => 2,
1449     stream      => 1,
1450     record_type => 'auth'
1451 );
1452
1453 sub queued_record_html {
1454     my($self, $conn, $auth, $rec_id) = @_;
1455     my $e = new_editor(xact=>1,authtoken => $auth);
1456     return $e->die_event unless $e->checkauth;
1457     my $rec;
1458     if($self->{record_type} eq 'bib') {
1459         $rec = $e->retrieve_vandelay_queued_bib_record($rec_id)
1460             or return $e->die_event;
1461     } else {
1462         $rec = $e->retrieve_vandelay_queued_authority_record($rec_id)
1463             or return $e->die_event;
1464     }
1465
1466     $e->rollback;
1467     return $U->simplereq(
1468         'open-ils.search',
1469         'open-ils.search.biblio.record.html', undef, 1, $rec->marc);
1470 }
1471
1472
1473 __PACKAGE__->register_method(  
1474     api_name    => "open-ils.vandelay.bib_queue.summary.retrieve", 
1475     method      => 'retrieve_queue_summary',
1476     api_level   => 1,
1477     argc        => 2,
1478     stream      => 1,
1479     record_type => 'bib'
1480 );
1481 __PACKAGE__->register_method(  
1482     api_name    => "open-ils.vandelay.auth_queue.summary.retrieve",
1483     method      => 'retrieve_queue_summary',
1484     api_level   => 1,
1485     argc        => 2,
1486     stream      => 1,
1487     record_type => 'auth'
1488 );
1489
1490 sub retrieve_queue_summary {
1491     my($self, $conn, $auth, $queue_id) = @_;
1492     my $e = new_editor(xact=>1, authtoken => $auth);
1493     return $e->die_event unless $e->checkauth;
1494
1495     my $queue;
1496     my $type = $self->{record_type};
1497     if($type eq 'bib') {
1498         $queue = $e->retrieve_vandelay_bib_queue($queue_id)
1499             or return $e->die_event;
1500     } else {
1501         $queue = $e->retrieve_vandelay_authority_queue($queue_id)
1502             or return $e->die_event;
1503     }
1504
1505     my $evt = check_queue_perms($e, $type, $queue);
1506     return $evt if $evt;
1507
1508     my $search = 'search_vandelay_queued_bib_record';
1509     $search =~ s/bib/authority/ if $type ne 'bib';
1510
1511     my $summary = {
1512         queue => $queue,
1513         total => scalar(@{$e->$search({queue => $queue_id}, {idlist=>1})}),
1514         imported => scalar(@{$e->$search({queue => $queue_id, import_time => {'!=' => undef}}, {idlist=>1})}),
1515     };
1516
1517     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
1518     $summary->{rec_import_errors} = $e->json_query({
1519         select => {$class => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1520         from => $class,
1521         where => {queue => $queue_id, import_error => {'!=' => undef}}
1522     })->[0]->{count};
1523
1524     if($type eq 'bib') {
1525         
1526         # count of all items attached to records in the queue in question
1527         my $query = {
1528             select => {vii => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1529             from => 'vii',
1530             where => {
1531                 record => {
1532                     in => {
1533                         select => {vqbr => ['id']},
1534                         from => 'vqbr',
1535                         where => {queue => $queue_id}
1536                     }
1537                 }
1538             }
1539         };
1540         $summary->{total_items} = $e->json_query($query)->[0]->{count};
1541
1542         # count of items we attempted to import, but errored, attached to records in the queue in question
1543         $query->{where}->{import_error} = {'!=' => undef};
1544         $summary->{item_import_errors} = $e->json_query($query)->[0]->{count};
1545
1546         # count of items we successfully imported attached to records in the queue in question
1547         delete $query->{where}->{import_error};
1548         $query->{where}->{import_time} = {'!=' => undef};
1549         $summary->{total_items_imported} = $e->json_query($query)->[0]->{count};
1550     }
1551
1552     return $summary;
1553 }
1554
1555 # --------------------------------------------------------------------------------
1556 # Given a list of queued record IDs, imports all items attached to those records
1557 # --------------------------------------------------------------------------------
1558 sub import_record_asset_list_impl {
1559     my($conn, $rec_ids, $requestor) = @_;
1560
1561     my $roe = new_editor(xact=> 1, requestor => $requestor);
1562
1563     # for speed, filter out any records have not been 
1564     # imported or have no import items to load
1565     $rec_ids = $roe->json_query({
1566         select => {vqbr => ['id']},
1567         from => {vqbr => 'vii'},
1568         where => {'+vqbr' => {
1569             id => $rec_ids,
1570             import_time => {'!=' => undef}
1571         }},
1572         distinct => 1
1573     });
1574     $rec_ids = [map {$_->{id}} @$rec_ids];
1575
1576     my $report_args = {
1577         conn => $conn,
1578         total => scalar(@$rec_ids),
1579         step => 1, # how often to respond
1580         progress => 1,
1581         in_count => 0,
1582     };
1583
1584     for my $rec_id (@$rec_ids) {
1585         my $rec = $roe->retrieve_vandelay_queued_bib_record($rec_id);
1586         my $item_ids = $roe->search_vandelay_import_item(
1587             {record => $rec->id, import_error => undef}, 
1588             {idlist=>1}
1589         );
1590
1591         for my $item_id (@$item_ids) {
1592             my $e = new_editor(requestor => $requestor, xact => 1);
1593             my $item = $e->retrieve_vandelay_import_item($item_id);
1594             my ($copy, $vol, $evt);
1595
1596             $$report_args{import_item} = $item;
1597             $$report_args{e} = $e;
1598             $$report_args{import_error} = undef;
1599             $$report_args{evt} = undef;
1600
1601             if (my $copy_id = $item->internal_id) { # assignment
1602                 # copy matches an existing copy.  Overlay instead of create.
1603
1604                 $logger->info("vl: performing copy overlay for internal_id=$copy_id");
1605
1606                 my $qt = $e->json_query({
1607                     select => {vbq => ['queue_type']},
1608                     from => {vqbr => 'vbq'},
1609                     where => {'+vqbr' => {id => $rec_id}}
1610                 })->[0]->{queue_type};
1611
1612                 if ($qt eq 'acq') {
1613                     # internal_id for ACQ queues refers to acq.lineitem_detail.id
1614                     # pull the real copy id from the acq LID
1615
1616                     my $lid = $e->retrieve_acq_lineitem_detail($copy_id);
1617                     if (!$lid) {
1618                         $$report_args{evt} = $e->die_event;
1619                         respond_with_status($report_args);
1620                         next;
1621                     }
1622                     $copy_id = $lid->eg_copy_id;
1623                     $logger->info("vl: performing ACQ copy overlay for copy $copy_id");
1624                 }
1625
1626                 $copy = $e->search_asset_copy([
1627                     {id => $copy_id, deleted => 'f'},
1628                     {flesh => 1, flesh_fields => {acp => ['call_number']}}
1629                 ])->[0];
1630
1631                 if (!$copy) {
1632                     $$report_args{evt} = $e->die_event;
1633                     respond_with_status($report_args);
1634                     next;
1635                 }
1636
1637                 # prevent update of unrelated copies
1638                 if ($copy->call_number->record != $rec->imported_as) {
1639                     $logger->info("vl: attempt to overlay unrelated copy=$copy_id; rec=".$rec->imported_as);
1640
1641                     $evt = OpenILS::Event->new('INVALID_IMPORT_COPY_ID', 
1642                         note => 'Cannot overlay copies for unlinked bib',
1643                         bre => $rec->imported_as, 
1644                         copy_id => $copy_id
1645                     );
1646                     $$report_args{evt} = $evt;
1647                     respond_with_status($report_args);
1648                     next;
1649                 }
1650
1651                 # overlaying copies requires an extra permission
1652                 if (!$e->allowed("IMPORT_OVERLAY_COPY", $copy->call_number->owning_lib)) {
1653                     $$report_args{evt} = $e->die_event;
1654                     respond_with_status($report_args);
1655                     next;
1656                 }
1657
1658                 # are we updating the call-number?
1659                 if ($item->call_number and $item->call_number ne $copy->call_number->label) {
1660
1661                     my $count = $e->json_query({
1662                         select => {acp => [{
1663                             alias => 'count', 
1664                             column => 'id', 
1665                             transform => 'count', 
1666                             aggregate => 1
1667                         }]},
1668                         from => 'acp',
1669                         where => {
1670                             deleted => 'f',
1671                             call_number => $copy->call_number->id
1672                         }
1673                     })->[0]->{count};
1674
1675                     if ($count == 1) {
1676                         # if this is the only copy attached to this 
1677                         # callnumber, just update the callnumber
1678
1679                         $logger->info("vl: updating callnumber label in copy overlay");
1680
1681                         $copy->call_number->label($item->call_number);
1682                         if (!$e->update_asset_call_number($copy->call_number)) {
1683                             $$report_args{evt} = $e->die_event;
1684                             respond_with_status($report_args);
1685                             next;
1686                         }
1687
1688                     } else {
1689
1690                         # otherwise, move the copy to a new/existing 
1691                         # call-number with the given label/owner
1692                         # note that overlay does not allow the owning_lib 
1693                         # to be changed.  Should it?
1694
1695                         $logger->info("vl: moving copy to new callnumber in copy overlay");
1696
1697                         ($vol, $evt) =
1698                             OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1699                                 $e, $item->call_number, 
1700                                 $copy->call_number->record, 
1701                                 $copy->call_number->owning_lib
1702                             );
1703
1704                         if($evt) {
1705                             $$report_args{evt} = $evt;
1706                             respond_with_status($report_args);
1707                             next;
1708                         }
1709
1710                         $copy->call_number($vol);
1711                     }
1712                 } # cn-update
1713
1714                 # for every field that has a non-'' value, overlay the copy value
1715                 foreach (qw/ barcode location circ_lib status 
1716                     circulate deposit deposit_amount ref holdable 
1717                     price circ_as_type alert_message opac_visible circ_modifier/) {
1718
1719                     my $val = $item->$_();
1720                     $copy->$_($val) if $val and $val ne '';
1721                 }
1722
1723                 # de-flesh for update
1724                 $copy->call_number($copy->call_number->id);
1725                 $copy->ischanged(1);
1726
1727                 $evt = OpenILS::Application::Cat::AssetCommon->
1728                     update_fleshed_copies($e, {all => 1}, undef, [$copy]);
1729
1730                 if($evt) {
1731                     $$report_args{evt} = $evt;
1732                     respond_with_status($report_args);
1733                     next;
1734                 }
1735
1736             } else { 
1737
1738                 # Creating a new copy
1739                 $logger->info("vl: creating new copy in import");
1740
1741                 # --------------------------------------------------------------------------------
1742                 # Find or create the volume
1743                 # --------------------------------------------------------------------------------
1744                 my ($vol, $evt) =
1745                     OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1746                         $e, $item->call_number, $rec->imported_as, $item->owning_lib);
1747
1748                 if($evt) {
1749                     $$report_args{evt} = $evt;
1750                     respond_with_status($report_args);
1751                     next;
1752                 }
1753
1754                 # --------------------------------------------------------------------------------
1755                 # Create the new copy
1756                 # --------------------------------------------------------------------------------
1757                 $copy = Fieldmapper::asset::copy->new;
1758                 $copy->loan_duration(2);
1759                 $copy->fine_level(2);
1760                 $copy->barcode($item->barcode);
1761                 $copy->location($item->location);
1762                 $copy->circ_lib($item->circ_lib || $item->owning_lib);
1763                 $copy->status( defined($item->status) ? $item->status : OILS_COPY_STATUS_IN_PROCESS );
1764                 $copy->circulate($item->circulate);
1765                 $copy->deposit($item->deposit);
1766                 $copy->deposit_amount($item->deposit_amount);
1767                 $copy->ref($item->ref);
1768                 $copy->holdable($item->holdable);
1769                 $copy->price($item->price);
1770                 $copy->circ_as_type($item->circ_as_type);
1771                 $copy->alert_message($item->alert_message);
1772                 $copy->opac_visible($item->opac_visible);
1773                 $copy->circ_modifier($item->circ_modifier);
1774
1775                 # --------------------------------------------------------------------------------
1776                 # Check for dupe barcode
1777                 # --------------------------------------------------------------------------------
1778                 if($evt = OpenILS::Application::Cat::AssetCommon->create_copy($e, $vol, $copy)) {
1779                     $$report_args{evt} = $evt;
1780                     $$report_args{import_error} = 'import.item.duplicate.barcode'
1781                         if $evt->{textcode} eq 'ITEM_BARCODE_EXISTS';
1782                     respond_with_status($report_args);
1783                     next;
1784                 }
1785
1786                 # --------------------------------------------------------------------------------
1787                 # create copy notes
1788                 # --------------------------------------------------------------------------------
1789                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1790                     $e, $copy, '', $item->pub_note, 1) if $item->pub_note;
1791
1792                 if($evt) {
1793                     $$report_args{evt} = $evt;
1794                     respond_with_status($report_args);
1795                     next;
1796                 }
1797
1798                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1799                     $e, $copy, '', $item->priv_note) if $item->priv_note;
1800
1801                 if($evt) {
1802                     $$report_args{evt} = $evt;
1803                     respond_with_status($report_args);
1804                     next;
1805                 }
1806             }
1807
1808             # set the import data on the import item
1809             $item->imported_as($copy->id); # $copy->id is set by create_copy() ^--
1810             $item->import_time('now');
1811
1812             unless($e->update_vandelay_import_item($item)) {
1813                 $$report_args{evt} = $e->die_event;
1814                 respond_with_status($report_args);
1815                 next;
1816             }
1817
1818             # --------------------------------------------------------------------------------
1819             # Item import succeeded
1820             # --------------------------------------------------------------------------------
1821             $e->commit;
1822             $$report_args{in_count}++;
1823             respond_with_status($report_args);
1824             $logger->info("vl: successfully imported item " . $item->barcode);
1825         }
1826     }
1827
1828     $roe->rollback;
1829     return undef;
1830 }
1831
1832
1833 sub respond_with_status {
1834     my $args = shift;
1835     my $e = $$args{e};
1836
1837     #  If the import failed, track the failure reason
1838
1839     my $error = $$args{import_error};
1840     my $evt = $$args{evt};
1841
1842     if($error or $evt) {
1843
1844         my $item = $$args{import_item};
1845         $logger->info("vl: unable to import item " . $item->barcode);
1846
1847         $error ||= 'general.unknown';
1848         $item->import_error($error);
1849
1850         if($evt) {
1851             my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1852             $item->error_detail($detail);
1853         }
1854
1855         # state of the editor is unknown at this point.  Force a rollback and start over.
1856         $e->rollback;
1857         $e = new_editor(xact => 1);
1858         $e->update_vandelay_import_item($item);
1859         $e->commit;
1860     }
1861
1862     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1863         $$args{conn}->respond({
1864             total => $$args{total},
1865             progress => $$args{progress},
1866             success_count => $$args{success_count},
1867             err_event => $evt
1868         });
1869         $$args{step} *= 2 unless $$args{step} == 256;
1870     }
1871
1872     $$args{progress}++;
1873 }
1874
1875 __PACKAGE__->register_method(  
1876     api_name    => "open-ils.vandelay.match_set.get_tree",
1877     method      => "match_set_get_tree",
1878     api_level   => 1,
1879     argc        => 2,
1880     signature   => {
1881         desc    => q/For a given vms object, return a tree of match set points
1882                     represented by a vmsp object with recursively fleshed
1883                     children./
1884     }
1885 );
1886
1887 sub match_set_get_tree {
1888     my ($self, $conn, $authtoken, $match_set_id) = @_;
1889
1890     $match_set_id = int($match_set_id) or return;
1891
1892     my $e = new_editor("authtoken" => $authtoken);
1893     $e->checkauth or return $e->die_event;
1894
1895     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
1896         return $e->die_event;
1897
1898     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
1899         return $e->die_event;
1900
1901     my $tree = $e->search_vandelay_match_set_point([
1902         {"match_set" => $match_set_id, "parent" => undef},
1903         {"flesh" => -1, "flesh_fields" => {"vmsp" => ["children"]}}
1904     ]) or return $e->die_event;
1905
1906     return pop @$tree;
1907 }
1908
1909
1910 __PACKAGE__->register_method(
1911     api_name    => "open-ils.vandelay.match_set.update",
1912     method      => "match_set_update_tree",
1913     api_level   => 1,
1914     argc        => 3,
1915     signature   => {
1916         desc => q/Replace any vmsp objects associated with a given (by ID) vms
1917                 with the given objects (recursively fleshed vmsp tree)./
1918     }
1919 );
1920
1921 sub _walk_new_vmsp {
1922     my ($e, $match_set_id, $node, $parent_id) = @_;
1923
1924     my $point = new Fieldmapper::vandelay::match_set_point;
1925     $point->parent($parent_id);
1926     $point->match_set($match_set_id);
1927     $point->$_($node->$_) for (qw/bool_op svf tag subfield negate quality/);
1928
1929     $e->create_vandelay_match_set_point($point) or return $e->die_event;
1930
1931     $parent_id = $e->data->id;
1932     if ($node->children && @{$node->children}) {
1933         for (@{$node->children}) {
1934             return $e->die_event if
1935                 _walk_new_vmsp($e, $match_set_id, $_, $parent_id);
1936         }
1937     }
1938
1939     return;
1940 }
1941
1942 sub match_set_update_tree {
1943     my ($self, $conn, $authtoken, $match_set_id, $tree) = @_;
1944
1945     my $e = new_editor("xact" => 1, "authtoken" => $authtoken);
1946     $e->checkauth or return $e->die_event;
1947
1948     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
1949         return $e->die_event;
1950
1951     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
1952         return $e->die_event;
1953
1954     my $existing = $e->search_vandelay_match_set_point([
1955         {"match_set" => $match_set_id},
1956         {"order_by" => {"vmsp" => "id DESC"}}
1957     ]) or return $e->die_event;
1958
1959     # delete points, working up from leaf points to the root
1960     while(@$existing) {
1961         for my $point (shift @$existing) {
1962             if( grep {$_->parent eq $point->id} @$existing) {
1963                 push(@$existing, $point);
1964             } else {
1965                 $e->delete_vandelay_match_set_point($point) or return $e->die_event;
1966             }
1967         }
1968     }
1969
1970     _walk_new_vmsp($e, $match_set_id, $tree);
1971
1972     $e->commit or return $e->die_event;
1973 }
1974
1975 __PACKAGE__->register_method(  
1976     api_name    => 'open-ils.vandelay.bib_queue.to_bucket',
1977     method      => 'bib_queue_to_bucket',
1978     api_level   => 1,
1979     argc        => 2,
1980     signature   => {
1981         desc    => q/Add to or create a new bib container (bucket) with the successfully 
1982                     imported contents of a vandelay queue.  Any user that has Vandelay 
1983                     queue create permissions can append or create buckets from his-her own queues./,
1984         params  => [
1985             {desc => 'Authtoken', type => 'string'},
1986             {desc => 'Queue ID', type => 'number'},
1987             {desc => 'Bucket Name', type => 'string'}
1988         ],
1989         return  => {desc => q/
1990             {bucket => $bucket, addcount => number-of-items-added-to-bucket, item_count => total-bucket-item-count} on success,
1991             {add_count => 0} if there is nothing to do, and Event on error/}
1992     }
1993 );
1994
1995 sub bib_queue_to_bucket {
1996     my ($self, $conn, $auth, $q_id, $bucket_name) = @_;
1997
1998     my $e = new_editor(xact => 1, authtoken => $auth);
1999     return $e->die_event unless $e->checkauth;
2000     
2001     my $queue = $e->retrieve_vandelay_bib_queue($q_id)
2002         or return $e->die_event;
2003
2004     return OpenILS::Event->new('BAD_PARAMS', 
2005         note => q/Bucket creator must be queue owner/)
2006         unless $queue->owner == $e->requestor->id;
2007
2008     # find the bib IDs that will go into the bucket
2009     my $bib_ids = $e->json_query({
2010         select => {vqbr => ['imported_as']},
2011         from => 'vqbr',
2012         where => {queue => $q_id, imported_as => {'!=' => undef}}
2013     });
2014
2015     if (!@$bib_ids) { # no records to add
2016         $e->rollback;
2017         return {add_count => 0};
2018     }
2019
2020     # allow user to add to an existing bucket by name
2021     my $bucket = $e->search_container_biblio_record_entry_bucket({
2022         owner => $e->requestor->id, 
2023         name => $bucket_name,
2024         btype => 'vandelay_queue'
2025     })->[0];
2026
2027     # if the bucket does not exist, create a new one
2028     if (!$bucket) { 
2029         $bucket = Fieldmapper::container::biblio_record_entry_bucket->new;
2030         $bucket->name($bucket_name);
2031         $bucket->owner($e->requestor->id);
2032         $bucket->btype('vandelay_queue');
2033
2034         $e->create_container_biblio_record_entry_bucket($bucket)
2035             or return $e->die_event;
2036     }
2037
2038     # create the new bucket items
2039     for my $bib_id ( map {$_->{imported_as}} @$bib_ids ) {
2040         my $item = Fieldmapper::container::biblio_record_entry_bucket_item->new;
2041         $item->target_biblio_record_entry($bib_id);
2042         $item->bucket($bucket->id);
2043         $e->create_container_biblio_record_entry_bucket_item($item)
2044             or return $e->die_event;
2045     }
2046
2047     # re-fetch the bucket to pick up the correct create_time
2048     $bucket = $e->retrieve_container_biblio_record_entry_bucket($bucket->id)
2049         or return $e->die_event;
2050
2051     # get the total count of items in this bucket
2052     my $count = $e->json_query({
2053         select => {cbrebi => [{
2054             aggregate =>  1,
2055             transform => 'count',
2056             alias => 'count',
2057             column => 'id'
2058         }]},
2059         from => 'cbrebi',
2060         where => {bucket => $bucket->id}
2061     })->[0];
2062
2063     $e->commit;
2064
2065     return {
2066         bucket => $bucket, 
2067         add_count => scalar(@$bib_ids), # items added to the bucket
2068         item_count => $count->{count} # total items in buckets
2069     };
2070 }
2071
2072 1;