Prevent MARC batch import from blindly trusting the user by checking LDR/06
[working/Evergreen.git] / Open-ILS / src / perlmods / OpenILS / Application / Vandelay.pm
1 package OpenILS::Application::Vandelay;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5 use Unicode::Normalize;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::AppSession;
8 use OpenSRF::Utils::SettingsClient;
9 use OpenSRF::Utils::Cache;
10 use OpenILS::Utils::Fieldmapper;
11 use OpenILS::Utils::CStoreEditor qw/:funcs/;
12 use MARC::Batch;
13 use MARC::Record;
14 use MARC::File::XML;
15 use OpenILS::Utils::Fieldmapper;
16 use Time::HiRes qw(time);
17 use OpenSRF::Utils::Logger qw/$logger/;
18 use MIME::Base64;
19 use OpenILS::Const qw/:const/;
20 use OpenILS::Application::AppUtils;
21 use OpenILS::Application::Cat::BibCommon;
22 use OpenILS::Application::Cat::AuthCommon;
23 use OpenILS::Application::Cat::AssetCommon;
24 my $U = 'OpenILS::Application::AppUtils';
25
26 # A list of LDR/06 values from http://loc.gov/marc
27 my %record_types = (
28         a => 'bib',
29         c => 'bib',
30         d => 'bib',
31         e => 'bib',
32         f => 'bib',
33         g => 'bib',
34         i => 'bib',
35         j => 'bib',
36         k => 'bib',
37         m => 'bib',
38         o => 'bib',
39         p => 'bib',
40         r => 'bib',
41         t => 'bib',
42         u => 'holdings',
43         v => 'holdings',
44         x => 'holdings',
45         y => 'holdings',
46         z => 'auth',
47 );
48
49 sub initialize {}
50 sub child_init {}
51
52 # --------------------------------------------------------------------------------
53 # Biblio ingest
54
55 sub create_bib_queue {
56         my $self = shift;
57         my $client = shift;
58         my $auth = shift;
59         my $name = shift;
60         my $owner = shift;
61         my $type = shift;
62         my $import_def = shift;
63
64         my $e = new_editor(authtoken => $auth, xact => 1);
65
66         return $e->die_event unless $e->checkauth;
67         return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
68     $owner ||= $e->requestor->id;
69
70     return OpenILS::Event->new('BIB_QUEUE_EXISTS') 
71         if $e->search_vandelay_bib_queue(
72             {name => $name, owner => $owner, queue_type => $type})->[0];
73
74         my $queue = new Fieldmapper::vandelay::bib_queue();
75         $queue->name( $name );
76         $queue->owner( $owner );
77         $queue->queue_type( $type ) if ($type);
78         $queue->item_attr_def( $import_def ) if ($import_def);
79
80         my $new_q = $e->create_vandelay_bib_queue( $queue );
81         return $e->die_event unless ($new_q);
82         $e->commit;
83
84     return $new_q;
85 }
86 __PACKAGE__->register_method(  
87         api_name        => "open-ils.vandelay.bib_queue.create",
88         method          => "create_bib_queue",
89         api_level       => 1,
90         argc            => 4,
91 );                      
92
93
94 sub create_auth_queue {
95         my $self = shift;
96         my $client = shift;
97         my $auth = shift;
98         my $name = shift;
99         my $owner = shift;
100         my $type = shift;
101
102         my $e = new_editor(authtoken => $auth, xact => 1);
103
104         return $e->die_event unless $e->checkauth;
105         return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
106     $owner ||= $e->requestor->id;
107
108     return OpenILS::Event->new('AUTH_QUEUE_EXISTS') 
109         if $e->search_vandelay_bib_queue(
110             {name => $name, owner => $owner, queue_type => $type})->[0];
111
112         my $queue = new Fieldmapper::vandelay::authority_queue();
113         $queue->name( $name );
114         $queue->owner( $owner );
115         $queue->queue_type( $type ) if ($type);
116
117         my $new_q = $e->create_vandelay_authority_queue( $queue );
118         $e->die_event unless ($new_q);
119         $e->commit;
120
121     return $new_q;
122 }
123 __PACKAGE__->register_method(  
124         api_name        => "open-ils.vandelay.authority_queue.create",
125         method          => "create_auth_queue",
126         api_level       => 1,
127         argc            => 3,
128 );                      
129
130 sub add_record_to_bib_queue {
131         my $self = shift;
132         my $client = shift;
133         my $auth = shift;
134         my $queue = shift;
135         my $marc = shift;
136         my $purpose = shift;
137     my $bib_source = shift;
138
139         my $e = new_editor(authtoken => $auth, xact => 1);
140
141         $queue = $e->retrieve_vandelay_bib_queue($queue);
142
143         return $e->die_event unless $e->checkauth;
144         return $e->die_event unless
145                 ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
146                  $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
147
148         my $new_rec = _add_bib_rec($e, $marc, $queue->id, $purpose, $bib_source);
149
150         return $e->die_event unless ($new_rec);
151         $e->commit;
152     return $new_rec;
153 }
154 __PACKAGE__->register_method(  
155         api_name        => "open-ils.vandelay.queued_bib_record.create",
156         method          => "add_record_to_bib_queue",
157         api_level       => 1,
158         argc            => 3,
159 );                      
160
161 sub _add_bib_rec {
162         my $e = shift;
163         my $marc = shift;
164         my $queue = shift;
165         my $purpose = shift;
166     my $bib_source = shift;
167
168         my $rec = new Fieldmapper::vandelay::queued_bib_record();
169         $rec->marc( $marc );
170         $rec->queue( $queue );
171         $rec->purpose( $purpose ) if ($purpose);
172     $rec->bib_source($bib_source);
173
174         return $e->create_vandelay_queued_bib_record( $rec );
175 }
176
177 sub add_record_to_authority_queue {
178         my $self = shift;
179         my $client = shift;
180         my $auth = shift;
181         my $queue = shift;
182         my $marc = shift;
183         my $purpose = shift;
184
185         my $e = new_editor(authtoken => $auth, xact => 1);
186
187         $queue = $e->retrieve_vandelay_authority_queue($queue);
188
189         return $e->die_event unless $e->checkauth;
190         return $e->die_event unless
191                 ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
192                  $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
193
194         my $new_rec = _add_auth_rec($e, $marc, $queue->id, $purpose);
195
196         return $e->die_event unless ($new_rec);
197         $e->commit;
198     return $new_rec;
199 }
200 __PACKAGE__->register_method(
201         api_name        => "open-ils.vandelay.queued_authority_record.create",
202         method          => "add_record_to_authority_queue",
203         api_level       => 1,
204         argc            => 3,
205 );
206
207 sub _add_auth_rec {
208         my $e = shift;
209         my $marc = shift;
210         my $queue = shift;
211     my $purpose = shift;
212
213         my $rec = new Fieldmapper::vandelay::queued_authority_record();
214         $rec->marc( $marc );
215         $rec->queue( $queue );
216         $rec->purpose( $purpose ) if ($purpose);
217
218         return $e->create_vandelay_queued_authority_record( $rec );
219 }
220
221 sub process_spool {
222         my $self = shift;
223         my $client = shift;
224         my $auth = shift;
225         my $fingerprint = shift || '';
226         my $queue_id = shift;
227     my $purpose = shift;
228     my $filename = shift;
229     my $bib_source = shift;
230
231         my $e = new_editor(authtoken => $auth, xact => 1);
232     return $e->die_event unless $e->checkauth;
233
234     my $queue;
235     my $type = $self->{record_type};
236
237     if($type eq 'bib') {
238         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
239     } else {
240         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
241     }
242
243     my $evt = check_queue_perms($e, $type, $queue);
244     return $evt if $evt;
245
246     my $cache = new OpenSRF::Utils::Cache();
247
248     if($fingerprint) {
249         my $data = $cache->get_cache('vandelay_import_spool_' . $fingerprint);
250             $purpose = $data->{purpose};
251         $filename = $data->{path};
252         $bib_source = $data->{bib_source};
253     }
254
255     unless(-r $filename) {
256         $logger->error("unable to read MARC file $filename");
257         return -1; # make this an event XXX
258     }
259
260     $logger->info("vandelay spooling $fingerprint purpose=$purpose file=$filename");
261
262     my $marctype = 'USMARC'; 
263
264     open F, $filename;
265     $marctype = 'XML' if (getc(F) =~ /^\D/o);
266     close F;
267
268         my $batch = new MARC::Batch ($marctype, $filename);
269         $batch->strict_off;
270
271         my $response_scale = 10;
272         my $count = 0;
273         my $r = -1;
274         while (try { $r = $batch->next } otherwise { $r = -1 }) {
275                 if ($r == -1) {
276                         $logger->warn("Proccessing of record $count in set $filename failed.  Skipping this record");
277                         $count++;
278                 }
279
280                 $logger->info("processing record $count");
281
282                 try {
283                         (my $xml = $r->as_xml_record()) =~ s/\n//sog;
284                         $xml =~ s/^<\?xml.+\?\s*>//go;
285                         $xml =~ s/>\s+</></go;
286                         $xml =~ s/\p{Cc}//go;
287                         $xml = $U->entityize($xml);
288                         $xml =~ s/[\x00-\x1f]//go;
289
290             my $qrec;
291             # Check the leader to ensure we've got something resembling the expected
292             # Allow spaces to give records the benefit of the doubt
293             my $ldr_type = substr($r->leader(), 6, 1);
294             if ($type eq 'bib' && ($record_types{$ldr_type}) eq 'bib') {
295                 $qrec = _add_bib_rec( $e, $xml, $queue_id, $purpose, $bib_source ) or return $e->die_event;
296             } elsif ($type eq 'auth' && ($record_types{$ldr_type}) eq 'auth') {
297                 $qrec = _add_auth_rec( $e, $xml, $queue_id, $purpose ) or return $e->die_event;
298             } else {
299                 # I don't know how to handle this type; rock on
300                 $logger->error("In process_spool(), type was $type and leader type was $ldr_type ; not currently supported");
301                 next;
302             }
303
304             if($self->api_name =~ /stream_results/ and $qrec) {
305                             $client->respond($qrec->id)
306             } else {
307                             $client->respond($count) if (++$count % $response_scale) == 0;
308                             $response_scale *= 10 if ($count == ($response_scale * 10));
309             }
310                 } catch Error with {
311                         my $error = shift;
312                         $logger->warn("Encountered a bad record at Vandelay ingest: ".$error);
313                 }
314         }
315
316         $e->commit;
317     unlink($filename);
318     $cache->delete_cache('vandelay_import_spool_' . $fingerprint) if $fingerprint;
319         return $count;
320 }
321
322 __PACKAGE__->register_method(  
323         api_name        => "open-ils.vandelay.bib.process_spool",
324         method          => "process_spool",
325         api_level       => 1,
326         argc            => 3,
327         record_type     => 'bib'
328 );                      
329 __PACKAGE__->register_method(  
330         api_name        => "open-ils.vandelay.auth.process_spool",
331         method          => "process_spool",
332         api_level       => 1,
333         argc            => 3,
334         record_type     => 'auth'
335 );                      
336
337 __PACKAGE__->register_method(  
338         api_name        => "open-ils.vandelay.bib.process_spool.stream_results",
339         method          => "process_spool",
340         api_level       => 1,
341         argc            => 3,
342     stream      => 1,
343         record_type     => 'bib'
344 );                      
345 __PACKAGE__->register_method(  
346         api_name        => "open-ils.vandelay.auth.process_spool.stream_results",
347         method          => "process_spool",
348         api_level       => 1,
349         argc            => 3,
350     stream      => 1,
351         record_type     => 'auth'
352 );
353
354 __PACKAGE__->register_method(  
355         api_name        => "open-ils.vandelay.bib_queue.records.retrieve",
356         method          => 'retrieve_queued_records',
357         api_level       => 1,
358         argc            => 2,
359     stream      => 1,
360         record_type     => 'bib'
361 );
362 __PACKAGE__->register_method(  
363         api_name        => "open-ils.vandelay.auth_queue.records.retrieve",
364         method          => 'retrieve_queued_records',
365         api_level       => 1,
366         argc            => 2,
367     stream      => 1,
368         record_type     => 'auth'
369 );
370
371 __PACKAGE__->register_method(  
372         api_name        => "open-ils.vandelay.bib_queue.records.matches.retrieve",
373         method          => 'retrieve_queued_records',
374         api_level       => 1,
375         argc            => 2,
376     stream      => 1,
377         record_type     => 'bib',
378     signature   => {
379         desc => q/Only retrieve queued bib records that have matches against existing records/
380     }
381 );
382 __PACKAGE__->register_method(  
383         api_name        => "open-ils.vandelay.auth_queue.records.matches.retrieve",
384         method          => 'retrieve_queued_records',
385         api_level       => 1,
386         argc            => 2,
387     stream      => 1,
388         record_type     => 'auth',
389     signature   => {
390         desc => q/Only retrieve queued authority records that have matches against existing records/
391     }
392
393 );
394
395 sub retrieve_queued_records {
396     my($self, $conn, $auth, $queue_id, $options) = @_;
397     my $e = new_editor(authtoken => $auth);
398     return $e->event unless $e->checkauth;
399     $options ||= {};
400     my $limit = $$options{limit} || 20;
401     my $offset = $$options{offset} || 0;
402
403     my $type = $self->{record_type};
404     my $queue;
405     if($type eq 'bib') {
406         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
407     } else {
408         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
409     }
410     my $evt = check_queue_perms($e, $type, $queue);
411     return $evt if $evt;
412
413     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
414     my $search = ($type eq 'bib') ? 
415         'search_vandelay_queued_bib_record' : 'search_vandelay_queued_authority_record';
416     my $retrieve = ($type eq 'bib') ? 
417         'retrieve_vandelay_queued_bib_record' : 'retrieve_vandelay_queued_authority_record';
418
419     my $filter = ($$options{non_imported}) ? {import_time => undef} : {};
420
421     my $record_ids;
422     if($self->api_name =~ /matches/) {
423         # fetch only matched records
424         $record_ids = queued_records_with_matches($e, $type, $queue_id, $limit, $offset, $filter);
425     } else {
426         # fetch all queue records
427         $record_ids = $e->$search([
428                 {queue => $queue_id, %$filter}, 
429                 {order_by => {$class => 'id'}, limit => $limit, offset => $offset}
430             ],
431             {idlist => 1}
432         );
433     }
434
435
436     for my $rec_id (@$record_ids) {
437         my $params = {   
438             flesh => 1,
439             flesh_fields => {$class => ['attributes', 'matches']},
440         };
441         my $rec = $e->$retrieve([$rec_id, $params]);
442         $rec->clear_marc if $$options{clear_marc};
443         $conn->respond($rec);
444     }
445     return undef;
446 }
447
448 sub check_queue_perms {
449     my($e, $type, $queue) = @_;
450         if ($type eq 'bib') {
451                 return $e->die_event unless
452                         ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
453                          $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
454         } else {
455                 return $e->die_event unless
456                         ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
457                          $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
458         }
459
460     return undef;
461 }
462
463 __PACKAGE__->register_method(  
464         api_name        => "open-ils.vandelay.bib_record.list.import",
465         method          => 'import_record_list',
466         api_level       => 1,
467         argc            => 2,
468     stream      => 1,
469         record_type     => 'bib'
470 );
471
472 __PACKAGE__->register_method(  
473         api_name        => "open-ils.vandelay.auth_record.list.import",
474         method          => 'import_record_list',
475         api_level       => 1,
476         argc            => 2,
477     stream      => 1,
478         record_type     => 'auth'
479 );
480
481 sub import_record_list {
482     my($self, $conn, $auth, $rec_ids, $args) = @_;
483     my $e = new_editor(authtoken => $auth);
484     return $e->event unless $e->checkauth;
485     $args ||= {};
486     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $args);
487     return $err if $err;
488     return {complete => 1};
489 }
490
491
492 __PACKAGE__->register_method(  
493         api_name        => "open-ils.vandelay.bib_queue.import",
494         method          => 'import_queue',
495         api_level       => 1,
496         argc            => 2,
497     stream      => 1,
498         record_type     => 'bib'
499 );
500
501 __PACKAGE__->register_method(  
502         api_name        => "open-ils.vandelay.auth_queue.import",
503         method          => 'import_queue',
504         api_level       => 1,
505         argc            => 2,
506     stream      => 1,
507         record_type     => 'auth'
508 );
509 __PACKAGE__->register_method(  
510         api_name        => "open-ils.vandelay.bib_queue.nomatch.import",
511         method          => 'import_queue',
512         api_level       => 1,
513         argc            => 2,
514     stream      => 1,
515     signature   => {
516         desc => q/Only import records that have no collisions/
517     },
518         record_type     => 'bib'
519 );
520
521 __PACKAGE__->register_method(  
522         api_name        => "open-ils.vandelay.auth_queue.nomatch.import",
523         method          => 'import_queue',
524         api_level       => 1,
525         argc            => 2,
526     stream      => 1,
527     signature   => {
528         desc => q/Only import records that have no collisions/
529     },
530         record_type     => 'auth'
531 );
532 sub import_queue {
533     my($self, $conn, $auth, $q_id, $options) = @_;
534     my $e = new_editor(authtoken => $auth);
535     return $e->event unless $e->checkauth;
536     $options ||= {};
537     my $type = $self->{record_type};
538     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
539
540     my $query = {queue => $q_id, import_time => undef};
541
542     if($self->api_name =~ /nomatch/) {
543         my $matched_recs = queued_records_with_matches($e, $type, $q_id, undef, undef, {import_time => undef});
544         $query->{id} = {'not in' => $matched_recs} if @$matched_recs;
545     }
546
547     my $search = ($type eq 'bib') ? 
548         'search_vandelay_queued_bib_record' : 'search_vandelay_queued_authority_record';
549     my $rec_ids = $e->$search($query, {idlist => 1});
550     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $options);
551     return $err if $err;
552     return {complete => 1};
553 }
554
555 # returns a list of queued record IDs for a given queue that 
556 # have at least one entry in the match table
557 sub queued_records_with_matches {
558     my($e, $type, $q_id, $limit, $offset, $filter) = @_;
559
560     my $match_class = 'vbm';
561     my $rec_class = 'vqbr';
562     if($type eq 'auth') {
563         $match_class = 'vam';
564          $rec_class = 'vqar';
565     }
566
567     $filter ||= {};
568     $filter->{queue} = $q_id;
569
570     my $query = {
571         distinct => 1, 
572         select => {$match_class => ['queued_record']}, 
573         from => {
574             $match_class => {
575                 $rec_class => {
576                     field => 'id',
577                     fkey => 'queued_record',
578                     filter => $filter,
579                 }
580             }
581         }
582     };        
583
584     if($limit or defined $offset) {
585         $limit ||= 20;
586         $offset ||= 0;
587         $query->{limit} = $limit;
588         $query->{offset} = $offset;
589     }
590
591     my $data = $e->json_query($query);
592     return [ map {$_->{queued_record}} @$data ];
593 }
594
595 sub import_record_list_impl {
596     my($self, $conn, $rec_ids, $requestor, $args) = @_;
597
598     my $overlay_map = $args->{overlay_map} || {};
599     my $type = $self->{record_type};
600     my $total = @$rec_ids;
601     my $count = 0;
602     my %queues;
603     my @ingest_queue;
604     my $auto_overlay_exact = $$args{auto_overlay_exact};
605     my $auto_overlay_1match = $$args{auto_overlay_1match};
606     my $merge_profile = $$args{merge_profile};
607     my $bib_source = $$args{bib_source};
608
609     my $overlay_func = 'vandelay.overlay_bib_record';
610     my $auto_overlay_func = 'vandelay.auto_overlay_bib_record';
611     my $retrieve_func = 'retrieve_vandelay_queued_bib_record';
612     my $update_func = 'update_vandelay_queued_bib_record';
613     my $search_func = 'search_vandelay_queued_bib_record';
614     my $retrieve_queue_func = 'retrieve_vandelay_bib_queue';
615     my $update_queue_func = 'update_vandelay_bib_queue';
616     my $rec_class = 'vqbr';
617
618     if($type eq 'auth') {
619         $overlay_func =~ s/bib/auth/o;
620         $auto_overlay_func = s/bib/auth/o;
621         $retrieve_func =~ s/bib/authority/o;
622         $retrieve_queue_func =~ s/bib/authority/o;
623         $update_queue_func =~ s/bib/authority/o;
624         $update_func =~ s/bib/authority/o;
625         $search_func =~ s/bib/authority/o;
626         $rec_class = 'vqar';
627     }
628
629     my $ingest_ses = OpenSRF::AppSession->connect('open-ils.ingest');
630
631     for my $rec_id (@$rec_ids) {
632
633         my $overlay_target = $overlay_map->{$rec_id};
634
635         my $e = new_editor(xact => 1);
636         $e->requestor($requestor);
637
638         my $rec = $e->$retrieve_func([
639             $rec_id,
640             {   flesh => 1,
641                 flesh_fields => { $rec_class => ['matches']},
642             }
643         ]);
644
645         unless($rec) {
646             $conn->respond({total => $total, progress => ++$count, imported => $rec_id, err_event => $e->die_event});
647             $e->rollback;
648             next;
649         }
650
651         if($rec->import_time) {
652             $e->rollback;
653             next;
654         }
655
656         $queues{$rec->queue} = 1;
657
658         my $record;
659         my $imported = 0;
660
661         if(defined $overlay_target) {
662             # Caller chose an explicit overlay target
663
664             my $res = $e->json_query(
665                 {
666                     from => [
667                         $overlay_func,
668                         $rec->id, 
669                         $overlay_target, 
670                         $merge_profile
671                     ]
672                 }
673             )->[0];
674
675             if($res->{$overlay_func} eq 't') {
676                 $logger->info("vl: $type direct overlay succeeded for queued rec " . 
677                     $rec->id . " and overlay target $overlay_target");
678                 $imported = 1;
679             }
680
681         } else {
682
683             if($auto_overlay_1match) { 
684                 # caller says to overlay if there is exactly 1 match
685
686                 my %match_recs = map { $_->eg_record => 1 } @{$rec->matches};
687
688                 if( scalar(keys %match_recs) == 1) { # all matches point to the same record
689
690                     my $res = $e->json_query(
691                         {
692                             from => [
693                                 $overlay_func,
694                                 $rec->id, 
695                                 $rec->matches->[0]->eg_record,
696                                 $merge_profile
697                             ]
698                         }
699                     )->[0];
700
701                     if($res->{$overlay_func} eq 't') {
702                         $logger->info("vl: $type overlay-1match succeeded for queued rec " . $rec->id);
703                         $imported = 1;
704                     }
705                 }
706             }
707
708             if(!$imported and $auto_overlay_exact and scalar(@{$rec->matches}) == 1 ) {
709                 
710                 # caller says to overlay if there is an /exact/ match
711
712                 my $res = $e->json_query(
713                     {
714                         from => [
715                             $auto_overlay_func,
716                             $rec->id, 
717                             $merge_profile
718                         ]
719                     }
720                 )->[0];
721
722                 if($res->{$auto_overlay_func} eq 't') {
723                     $logger->info("vl: $type auto-overlay succeeded for queued rec " . $rec->id);
724                     $imported = 1;
725                 }
726             }
727
728             if(!$imported) {
729             
730                 # No overlay / merge occured.  Do a traditional record import by creating a new record
731             
732                 if($type eq 'bib') {
733                     $record = OpenILS::Application::Cat::BibCommon->biblio_record_xml_import($e, $rec->marc); #$rec->bib_source
734
735                 } else {
736
737                     $record = OpenILS::Application::Cat::AuthCommon->import_authority_record($e, $rec->marc); #$source);
738                     push @ingest_queue, { 
739                         req => $ingest_ses->request('open-ils.ingest.full.authority.record', $record->id), 
740                         rec_id => $record->id 
741                     };
742                 }
743
744                 if($U->event_code($record)) {
745
746                     $e->event($record); 
747
748                 } else {
749
750                     $logger->info("vl: successfully importid new $type record");
751                     $rec->imported_as($record->id);
752                     $rec->import_time('now');
753
754                     $imported = 1 if $e->$update_func($rec);
755                 }
756             }
757         }
758
759         if($imported) {
760
761             $e->commit;
762             $conn->respond({total => $total, progress => ++$count, imported => $rec_id});
763
764         } else {
765
766             $e->rollback;
767             $conn->respond({total => $total, progress => ++$count, imported => $rec_id, err_event => $e->die_event});
768         }
769
770         $conn->respond({total => $total, progress => $count, imported => $rec_id}) if (++$count % 10) == 0;
771     }
772
773     # see if we need to mark any queues as complete
774     my $e = new_editor(xact => 1);
775     for my $q_id (keys %queues) {
776
777         my $remaining = $e->$search_func(
778             [{queue => $q_id, import_time => undef}, {limit =>1}], {idlist => 1});
779
780         unless(@$remaining) {
781             my $queue = $e->$retrieve_queue_func($q_id);
782
783             unless($U->is_true($queue->complete)) {
784                 $queue->complete('t');
785                 $e->$update_queue_func($queue) or return $e->die_event;
786                 $e->commit;
787                 last
788             }
789             
790         } 
791     }
792     $e->rollback;
793
794     $count = 0;
795     for my $ingest (@ingest_queue) {
796         try { $ingest->{req}->gather(1); } otherwise {};
797         $conn->respond({total => $total, progress => $count, imported => $ingest->{rec_id}}) if (++$count % 10) == 0;
798     } 
799
800     $ingest_ses->disconnect();
801     return undef;
802 }
803
804
805 __PACKAGE__->register_method(  
806         api_name        => "open-ils.vandelay.bib_queue.owner.retrieve",
807         method          => 'owner_queue_retrieve',
808         api_level       => 1,
809         argc            => 2,
810     stream      => 1,
811         record_type     => 'bib'
812 );
813 __PACKAGE__->register_method(  
814         api_name        => "open-ils.vandelay.authority_queue.owner.retrieve",
815         method          => 'owner_queue_retrieve',
816         api_level       => 1,
817         argc            => 2,
818     stream      => 1,
819         record_type     => 'auth'
820 );
821
822 sub owner_queue_retrieve {
823     my($self, $conn, $auth, $owner_id, $filters) = @_;
824     my $e = new_editor(authtoken => $auth);
825     return $e->die_event unless $e->checkauth;
826     $owner_id = $e->requestor->id; # XXX add support for viewing other's queues?
827     my $queues;
828     $filters ||= {};
829     my $search = {owner => $owner_id};
830     $search->{$_} = $filters->{$_} for keys %$filters;
831
832     if($self->{record_type} eq 'bib') {
833         $queues = $e->search_vandelay_bib_queue(
834             [$search, {order_by => {vbq => 'lower(name)'}}]);
835     } else {
836         $queues = $e->search_vandelay_authority_queue(
837             [$search, {order_by => {vaq => 'lower(name)'}}]);
838     }
839     $conn->respond($_) for @$queues;
840     return undef;
841 }
842
843 __PACKAGE__->register_method(  
844         api_name        => "open-ils.vandelay.bib_queue.delete",
845         method          => "delete_queue",
846         api_level       => 1,
847         argc            => 2,
848         record_type     => 'bib'
849 );            
850 __PACKAGE__->register_method(  
851         api_name        => "open-ils.vandelay.auth_queue.delete",
852         method          => "delete_queue",
853         api_level       => 1,
854         argc            => 2,
855         record_type     => 'auth'
856 );  
857
858 sub delete_queue {
859     my($self, $conn, $auth, $q_id) = @_;
860     my $e = new_editor(xact => 1, authtoken => $auth);
861     return $e->die_event unless $e->checkauth;
862     if($self->{record_type} eq 'bib') {
863             return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
864         my $queue = $e->retrieve_vandelay_bib_queue($q_id)
865             or return $e->die_event;
866         $e->delete_vandelay_bib_queue($queue)
867             or return $e->die_event;
868     } else {
869             return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
870         my $queue = $e->retrieve_vandelay_authority_queue($q_id)
871             or return $e->die_event;
872         $e->delete_vandelay_authority_queue($queue)
873             or return $e->die_event;
874     }
875     $e->commit;
876     return 1;
877 }
878
879
880 __PACKAGE__->register_method(  
881         api_name        => "open-ils.vandelay.queued_bib_record.html",
882         method          => 'queued_record_html',
883         api_level       => 1,
884         argc            => 2,
885     stream      => 1,
886         record_type     => 'bib'
887 );
888 __PACKAGE__->register_method(  
889         api_name        => "open-ils.vandelay.queued_authority_record.html",
890         method          => 'queued_record_html',
891         api_level       => 1,
892         argc            => 2,
893     stream      => 1,
894         record_type     => 'auth'
895 );
896
897 sub queued_record_html {
898     my($self, $conn, $auth, $rec_id) = @_;
899     my $e = new_editor(authtoken => $auth);
900     return $e->event unless $e->checkauth;
901     my $rec;
902     if($self->{record_type} eq 'bib') {
903         $rec = $e->retrieve_vandelay_queued_bib_record($rec_id)
904             or return $e->event;
905     } else {
906         $rec = $e->retrieve_vandelay_queued_authority_record($rec_id)
907             or return $e->event;
908     }
909
910     return $U->simplereq(
911         'open-ils.search',
912         'open-ils.search.biblio.record.html', undef, 1, $rec->marc);
913 }
914
915
916 __PACKAGE__->register_method(  
917         api_name        => "open-ils.vandelay.bib_queue.summary.retrieve", 
918         method          => 'retrieve_queue_summary',
919         api_level       => 1,
920         argc            => 2,
921     stream      => 1,
922         record_type     => 'bib'
923 );
924 __PACKAGE__->register_method(  
925         api_name        => "open-ils.vandelay.auth_queue.summary.retrieve",
926         method          => 'retrieve_queue_summary',
927         api_level       => 1,
928         argc            => 2,
929     stream      => 1,
930         record_type     => 'auth'
931 );
932
933 sub retrieve_queue_summary {
934     my($self, $conn, $auth, $queue_id) = @_;
935     my $e = new_editor(authtoken => $auth);
936     return $e->event unless $e->checkauth;
937
938     my $queue;
939     my $type = $self->{record_type};
940     if($type eq 'bib') {
941         $queue = $e->retrieve_vandelay_bib_queue($queue_id)
942             or return $e->event;
943     } else {
944         $queue = $e->retrieve_vandelay_authority_queue($queue_id)
945             or return $e->event;
946     }
947
948     my $evt = check_queue_perms($e, $type, $queue);
949     return $evt if $evt;
950
951     my $search = 'search_vandelay_queued_bib_record';
952     $search =~ s/bib/authority/ if $type ne 'bib';
953
954     return {
955         queue => $queue,
956         total => scalar(@{$e->$search({queue => $queue_id}, {idlist=>1})}),
957         imported => scalar(@{$e->$search({queue => $queue_id, import_time => {'!=' => undef}}, {idlist=>1})}),
958     };
959 }
960
961
962 __PACKAGE__->register_method(  
963         api_name        => "open-ils.vandelay.bib_record.list.asset.import",
964         method          => 'import_record_list_assets',
965         api_level       => 1,
966         argc            => 2,
967     stream      => 1,
968         record_type     => 'bib'
969 );
970 __PACKAGE__->register_method(  
971         api_name        => "open-ils.vandelay.bib_record.queue.asset.import",
972         method          => 'import_record_queue_assets',
973         api_level       => 1,
974         argc            => 2,
975     stream      => 1,
976         record_type     => 'bib'
977 );
978
979 sub import_record_list_assets {
980     my($self, $conn, $auth, $import_def, $rec_ids) = @_;
981     my $e = new_editor(authtoken => $auth);
982     return $e->event unless $e->checkauth;
983     my $err = import_record_asset_list_impl($conn, $import_def, $rec_ids, $e->requestor);
984     return $err if $err;
985     return {complete => 1};
986 }
987
988 sub import_record_queue_assets {
989     my($self, $conn, $auth, $import_def, $q_id) = @_;
990     my $e = new_editor(authtoken => $auth);
991     return $e->event unless $e->checkauth;
992     my $rec_ids = $e->search_vandelay_queued_bib_record(
993         {queue => $q_id, import_time => {'!=' => undef}}, {idlist => 1});
994     my $err = import_record_asset_list_impl($conn, $import_def, $rec_ids, $e->requestor);
995     return $err if $err;
996     return {complete => 1};
997 }
998
999 # --------------------------------------------------------------------------------
1000 # Given a list of queued record IDs, imports all items attached to those records
1001 # --------------------------------------------------------------------------------
1002 sub import_record_asset_list_impl {
1003     my($conn, $import_def, $rec_ids, $requestor) = @_;
1004
1005     my $total = @$rec_ids;
1006     my $try_count = 0;
1007     my $in_count = 0;
1008     my $roe = new_editor(requestor => $requestor);
1009
1010     for my $rec_id (@$rec_ids) {
1011         my $rec = $roe->retrieve_vandelay_queued_bib_record($rec_id);
1012         next unless $rec and $rec->import_time;
1013         my $item_ids = $roe->search_vandelay_import_item({definition => $import_def, record => $rec->id}, {idlist=>1});
1014
1015         for my $item_id (@$item_ids) {
1016             my $e = new_editor(requestor => $requestor, xact => 1);
1017             my $item = $e->retrieve_vandelay_import_item($item_id);
1018             $try_count++;
1019
1020             # --------------------------------------------------------------------------------
1021             # Find or create the volume
1022             # --------------------------------------------------------------------------------
1023             my ($vol, $evt) =
1024                 OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1025                     $e, $item->call_number, $rec->imported_as, $item->owning_lib);
1026
1027             if($evt) {
1028                 respond_with_status($conn, $total, $try_count, $in_count, $evt);
1029                 $e->rollback;
1030                 next;
1031             }
1032
1033             # --------------------------------------------------------------------------------
1034             # Create the new copy
1035             # --------------------------------------------------------------------------------
1036             my $copy = Fieldmapper::asset::copy->new;
1037             $copy->loan_duration(2);
1038             $copy->fine_level(2);
1039             $copy->barcode($item->barcode);
1040             $copy->location($item->location);
1041             $copy->circ_lib($item->circ_lib || $item->owning_lib);
1042             $copy->status($item->status || OILS_COPY_STATUS_IN_PROCESS);
1043             $copy->circulate($item->circulate);
1044             $copy->deposit($item->deposit);
1045             $copy->deposit_amount($item->deposit_amount);
1046             $copy->ref($item->ref);
1047             $copy->holdable($item->holdable);
1048             $copy->price($item->price);
1049             $copy->circ_as_type($item->circ_as_type);
1050             $copy->alert_message($item->alert_message);
1051             $copy->opac_visible($item->opac_visible);
1052             $copy->circ_modifier($item->circ_modifier);
1053
1054             # --------------------------------------------------------------------------------
1055             # see if a valid circ_modifier was provided
1056             # --------------------------------------------------------------------------------
1057             #if($copy->circ_modifier and not $e->retrieve_config_circ_modifier($item->circ_modifier)) {
1058             if($copy->circ_modifier and not $e->search_config_circ_modifier({code=>$item->circ_modifier})->[0]) {
1059                 respond_with_status($conn, $total, $try_count, $in_count, $e->die_event);
1060                 $e->rollback;
1061                 next;
1062             }
1063
1064             if($evt = OpenILS::Application::Cat::AssetCommon->create_copy($e, $vol, $copy)) {
1065                 $e->rollback;
1066                 respond_with_status($conn, $total, $try_count, $in_count, $evt);
1067                 next;
1068             }
1069
1070             # --------------------------------------------------------------------------------
1071             # create copy notes
1072             # --------------------------------------------------------------------------------
1073             $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1074                 $e, $copy, '', $item->pub_note, 1) if $item->pub_note;
1075
1076             if($evt) {
1077                 respond_with_status($conn, $total, $try_count, $in_count, $evt);
1078                 $e->rollback;
1079                 next;
1080             }
1081
1082             $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
1083                 $e, $copy, '', $item->priv_note, 1) if $item->priv_note;
1084
1085             if($evt) {
1086                 respond_with_status($conn, $total, $try_count, $in_count, $evt);
1087                 $e->rollback;
1088                 next;
1089             }
1090
1091             # --------------------------------------------------------------------------------
1092             # Item import succeeded
1093             # --------------------------------------------------------------------------------
1094             $e->commit;
1095             respond_with_status($conn, $total, $try_count, ++$in_count, undef, imported_as => $copy->id);
1096         }
1097     }
1098     return undef;
1099 }
1100
1101
1102 sub respond_with_status {
1103     my($conn, $total, $try_count, $success_count, $err, %args) = @_;
1104     $conn->respond({
1105         total => $total, 
1106         progress => $try_count, 
1107         err_event => $err, 
1108         success_count => $success_count, %args }) if $err or ($try_count % 5 == 0);
1109 }
1110
1111
1112 1;