LP#1514085 Vandelay in-database session tracking
[working/Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Vandelay.pm
1 package OpenILS::Application::Vandelay;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5 use Unicode::Normalize;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::AppSession;
8 use OpenSRF::Utils::SettingsClient;
9 use OpenSRF::Utils::Cache;
10 use OpenILS::Utils::Fieldmapper;
11 use OpenILS::Utils::CStoreEditor qw/:funcs/;
12 use OpenILS::Utils::Normalize qw/clean_marc/;
13 use MARC::Batch;
14 use MARC::Record;
15 use MARC::File::XML ( BinaryEncoding => 'UTF-8' );
16 use Time::HiRes qw(time);
17 use OpenSRF::Utils::Logger qw/$logger/;
18 use MIME::Base64;
19 use XML::LibXML;
20 use Digest::MD5 qw/md5_hex/;
21 use OpenILS::Const qw/:const/;
22 use OpenILS::Application::AppUtils;
23 use OpenILS::Application::Cat::BibCommon;
24 use OpenILS::Application::Cat::AuthCommon;
25 use OpenILS::Application::Cat::AssetCommon;
26 use OpenILS::Application::Cat::Merge;
27 my $U = 'OpenILS::Application::AppUtils';
28
29 # A list of LDR/06 values from http://loc.gov/marc
30 my %record_types = (
31         a => 'bib',
32         c => 'bib',
33         d => 'bib',
34         e => 'bib',
35         f => 'bib',
36         g => 'bib',
37         i => 'bib',
38         j => 'bib',
39         k => 'bib',
40         m => 'bib',
41         o => 'bib',
42         p => 'bib',
43         r => 'bib',
44         t => 'bib',
45         u => 'holdings',
46         v => 'holdings',
47         x => 'holdings',
48         y => 'holdings',
49         z => 'auth',
50       ' ' => 'bib',
51 );
52
53 sub initialize {}
54 sub child_init {}
55
56 # --------------------------------------------------------------------------------
57 # Biblio ingest
58
59 sub create_bib_queue {
60     my $self = shift;
61     my $client = shift;
62     my $auth = shift;
63     my $name = shift;
64     my $owner = shift;
65     my $type = shift;
66     my $match_set = shift;
67     my $import_def = shift;
68     my $match_bucket = shift;
69
70     my $e = new_editor(authtoken => $auth, xact => 1);
71
72     return $e->die_event unless $e->checkauth;
73     return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
74     $owner ||= $e->requestor->id;
75
76     if ($e->search_vandelay_bib_queue( {name => $name, owner => $owner, queue_type => $type})->[0]) {
77         $e->rollback;
78         return OpenILS::Event->new('BIB_QUEUE_EXISTS') 
79     }
80
81     my $queue = new Fieldmapper::vandelay::bib_queue();
82     $queue->name( $name );
83     $queue->owner( $owner );
84     $queue->queue_type( $type ) if ($type);
85     $queue->item_attr_def( $import_def ) if ($import_def);
86     $queue->match_set($match_set) if $match_set;
87     $queue->match_bucket($match_bucket) if $match_bucket;
88
89     my $new_q = $e->create_vandelay_bib_queue( $queue );
90     return $e->die_event unless ($new_q);
91     $e->commit;
92
93     return $new_q;
94 }
95 __PACKAGE__->register_method(  
96     api_name   => "open-ils.vandelay.bib_queue.create",
97     method     => "create_bib_queue",
98     api_level  => 1,
99     argc       => 4,
100 );                      
101
102
103 sub create_auth_queue {
104     my $self = shift;
105     my $client = shift;
106     my $auth = shift;
107     my $name = shift;
108     my $owner = shift;
109     my $type = shift;
110     my $match_set = shift;
111
112     my $e = new_editor(authtoken => $auth, xact => 1);
113
114     return $e->die_event unless $e->checkauth;
115     return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
116     $owner ||= $e->requestor->id;
117
118     if ($e->search_vandelay_authority_queue({name => $name, owner => $owner, queue_type => $type})->[0]) {
119         $e->rollback;
120         return OpenILS::Event->new('AUTH_QUEUE_EXISTS') 
121     }
122
123     my $queue = new Fieldmapper::vandelay::authority_queue();
124     $queue->name( $name );
125     $queue->owner( $owner );
126     $queue->queue_type( $type ) if ($type);
127     $queue->match_set($match_set) if $match_set;
128
129     my $new_q = $e->create_vandelay_authority_queue( $queue );
130     $e->die_event unless ($new_q);
131     $e->commit;
132
133     return $new_q;
134 }
135 __PACKAGE__->register_method(  
136     api_name   => "open-ils.vandelay.authority_queue.create",
137     method     => "create_auth_queue",
138     api_level  => 1,
139     argc       => 3,
140 );                      
141
142 sub add_record_to_bib_queue {
143     my $self = shift;
144     my $client = shift;
145     my $auth = shift;
146     my $queue = shift;
147     my $marc = shift;
148     my $purpose = shift;
149     my $bib_source = shift;
150
151     my $e = new_editor(authtoken => $auth, xact => 1);
152
153     $queue = $e->retrieve_vandelay_bib_queue($queue);
154
155     return $e->die_event unless $e->checkauth;
156     return $e->die_event unless
157         ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
158          $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
159
160     my $new_rec = _add_bib_rec($e, $marc, $queue->id, $purpose, $bib_source);
161
162     return $e->die_event unless ($new_rec);
163     $e->commit;
164     return $new_rec;
165 }
166 __PACKAGE__->register_method(  
167     api_name   => "open-ils.vandelay.queued_bib_record.create",
168     method     => "add_record_to_bib_queue",
169     api_level  => 1,
170     argc       => 3,
171 );                      
172
173 sub _add_bib_rec {
174     my $e = shift;
175     my $marc = shift;
176     my $queue = shift;
177     my $purpose = shift;
178     my $bib_source = shift;
179
180     my $rec = new Fieldmapper::vandelay::queued_bib_record();
181     $rec->marc( $marc );
182     $rec->queue( $queue );
183     $rec->purpose( $purpose ) if ($purpose);
184     $rec->bib_source($bib_source);
185
186     return $e->create_vandelay_queued_bib_record( $rec, {timeout => 600} );
187 }
188
189 sub add_record_to_authority_queue {
190     my $self = shift;
191     my $client = shift;
192     my $auth = shift;
193     my $queue = shift;
194     my $marc = shift;
195     my $purpose = shift;
196
197     my $e = new_editor(authtoken => $auth, xact => 1);
198
199     $queue = $e->retrieve_vandelay_authority_queue($queue);
200
201     return $e->die_event unless $e->checkauth;
202     return $e->die_event unless
203         ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
204          $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
205
206     my $new_rec = _add_auth_rec($e, $marc, $queue->id, $purpose);
207
208     return $e->die_event unless ($new_rec);
209     $e->commit;
210     return $new_rec;
211 }
212 __PACKAGE__->register_method(
213     api_name   => "open-ils.vandelay.queued_authority_record.create",
214     method     => "add_record_to_authority_queue",
215     api_level  => 1,
216     argc       => 3,
217 );
218
219 sub _add_auth_rec {
220     my $e = shift;
221     my $marc = shift;
222     my $queue = shift;
223     my $purpose = shift;
224
225     my $rec = new Fieldmapper::vandelay::queued_authority_record();
226     $rec->marc( $marc );
227     $rec->queue( $queue );
228     $rec->purpose( $purpose ) if ($purpose);
229
230     return $e->create_vandelay_queued_authority_record( $rec, {timeout => 600} );
231 }
232
233 sub process_spool {
234     my $self = shift;
235     my $client = shift;
236     my $auth = shift;
237     my $fingerprint = shift || '';
238     my $queue_id = shift;
239     my $purpose = shift;
240     my $filename = shift;
241     my $bib_source = shift;
242     my $session_name = shift;
243     my $exit_early = shift;
244
245     $client->max_chunk_count($self->{max_bundle_count}) if (!$client->can('max_bundle_count') && $self->{max_bundle_count});
246
247     my $e = new_editor(authtoken => $auth, xact => 1);
248     return $e->die_event unless $e->checkauth;
249
250     my $queue;
251     my $type = $self->{record_type};
252
253     if($type eq 'bib') {
254         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
255     } else {
256         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
257     }
258
259     my $evt = check_queue_perms($e, $type, $queue);
260     return $evt if ($evt);
261
262     my $cache = new OpenSRF::Utils::Cache();
263
264     if($fingerprint) {
265         my $data = $cache->get_cache('vandelay_import_spool_' . $fingerprint);
266         $purpose = $data->{purpose};
267         $filename = $data->{path};
268         $bib_source = $data->{bib_source};
269     }
270
271     unless(-r $filename) {
272         $logger->error("unable to read MARC file $filename");
273         return -1; # make this an event XXX
274     }
275
276     $logger->info("vandelay spooling $fingerprint purpose=$purpose file=$filename");
277
278     my ($tracker, $tevt) = create_session_tracker(
279         $e->requestor->id, $e->requestor->wsid, $fingerprint, 
280         $session_name, $type, $queue_id, 'enqueue');
281
282     if (!$tracker) {
283         $e->rollback;
284         return $tevt;
285     }
286
287     $client->respond_complete($tracker) if $exit_early;
288
289     my $marctype = 'USMARC'; 
290
291     open F, $filename;
292     $marctype = 'XML' if (getc(F) =~ /^\D/o);
293     close F;
294
295     my $batch = new MARC::Batch ($marctype, $filename);
296     $batch->strict_off;
297
298     my $response_scale = 10;
299     my $count = 0;
300     my $r = -1;
301     while (try { $r = $batch->next } otherwise { $r = -1 }) {
302         if ($r == -1) {
303             $logger->warn("Processing of record $count in set $filename failed.  Skipping this record");
304             $count++;
305         }
306
307         $logger->info("processing record $count");
308
309         try {
310             my $xml = clean_marc($r);
311
312             my $qrec;
313             # Check the leader to ensure we've got something resembling the expected
314             # Allow spaces to give records the benefit of the doubt
315             my $ldr_type = substr($r->leader(), 6, 1);
316             if ($type eq 'bib' && ($record_types{$ldr_type}) eq 'bib' || $ldr_type eq ' ') {
317                 $qrec = _add_bib_rec( $e, $xml, $queue_id, $purpose, $bib_source ) or return $e->die_event;
318             } elsif ($type eq 'auth' && ($record_types{$ldr_type}) eq 'auth' || $ldr_type eq ' ') {
319                 $qrec = _add_auth_rec( $e, $xml, $queue_id, $purpose ) or return $e->die_event;
320             } else {
321                 # I don't know how to handle this type; rock on
322                 $logger->error("In process_spool(), type was $type and leader type was $ldr_type ; not currently supported");
323                 next;
324             }
325
326             return $tevt if $tevt = increment_session_tracker($tracker);
327
328             if (!$exit_early) { # avoid unnecessary responses
329                 if($self->api_name =~ /stream_results/ and $qrec) {
330                     $client->respond($qrec->id)
331                 } else {
332                     $client->respond($count) if (++$count % $response_scale) == 0;
333                     $response_scale *= 10 if ($count == ($response_scale * 10));
334                 }
335             }
336         } catch Error with {
337             my $error = shift;
338             $logger->warn("Encountered a bad record at Vandelay ingest: ".$error);
339         }
340     }
341
342     $tracker->state('complete');
343     $e->update_vandelay_session_tracker($tracker) or return $e->die_event;
344
345     $e->commit;
346     unlink($filename);
347     $cache->delete_cache('vandelay_import_spool_' . $fingerprint) if $fingerprint;
348     return $count;
349 }
350
351 __PACKAGE__->register_method(  
352     api_name    => "open-ils.vandelay.bib.process_spool",
353     method      => "process_spool",
354     api_level   => 1,
355     argc        => 3,
356     max_bundle_count => 1,
357     record_type => 'bib'
358 );                      
359 __PACKAGE__->register_method(  
360     api_name    => "open-ils.vandelay.auth.process_spool",
361     method      => "process_spool",
362     api_level   => 1,
363     argc        => 3,
364     max_bundle_count => 1,
365     record_type => 'auth'
366 );                      
367
368 __PACKAGE__->register_method(  
369     api_name    => "open-ils.vandelay.bib.process_spool.stream_results",
370     method      => "process_spool",
371     api_level   => 1,
372     argc        => 3,
373     stream      => 1,
374     max_bundle_count => 1,
375     record_type => 'bib'
376 );                      
377 __PACKAGE__->register_method(  
378     api_name    => "open-ils.vandelay.auth.process_spool.stream_results",
379     method      => "process_spool",
380     api_level   => 1,
381     argc        => 3,
382     stream      => 1,
383     max_bundle_count => 1,
384     record_type => 'auth'
385 );
386
387 __PACKAGE__->register_method(  
388     api_name    => "open-ils.vandelay.bib_queue.records.retrieve",
389     method      => 'retrieve_queued_records',
390     api_level   => 1,
391     argc        => 2,
392     stream      => 1,
393     record_type => 'bib'
394 );
395 __PACKAGE__->register_method(
396     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.print",
397     method      => 'retrieve_queued_records',
398     api_level   => 1,
399     argc        => 2,
400     stream      => 1,
401     record_type => 'bib'
402 );
403 __PACKAGE__->register_method(
404     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.csv",
405     method      => 'retrieve_queued_records',
406     api_level   => 1,
407     argc        => 2,
408     stream      => 1,
409     record_type => 'bib'
410 );
411 __PACKAGE__->register_method(
412     api_name    => "open-ils.vandelay.bib_queue.records.retrieve.export.email",
413     method      => 'retrieve_queued_records',
414     api_level   => 1,
415     argc        => 2,
416     stream      => 1,
417     record_type => 'bib'
418 );
419
420 __PACKAGE__->register_method(  
421     api_name    => "open-ils.vandelay.auth_queue.records.retrieve",
422     method      => 'retrieve_queued_records',
423     api_level   => 1,
424     argc        => 2,
425     stream      => 1,
426     record_type => 'auth'
427 );
428 __PACKAGE__->register_method(
429     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.print",
430     method      => 'retrieve_queued_records',
431     api_level   => 1,
432     argc        => 2,
433     stream      => 1,
434     record_type => 'auth'
435 );
436 __PACKAGE__->register_method(
437     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.csv",
438     method      => 'retrieve_queued_records',
439     api_level   => 1,
440     argc        => 2,
441     stream      => 1,
442     record_type => 'auth'
443 );
444 __PACKAGE__->register_method(
445     api_name    => "open-ils.vandelay.auth_queue.records.retrieve.export.email",
446     method      => 'retrieve_queued_records',
447     api_level   => 1,
448     argc        => 2,
449     stream      => 1,
450     record_type => 'auth'
451 );
452
453 __PACKAGE__->register_method(  
454     api_name    => "open-ils.vandelay.bib_queue.records.matches.retrieve",
455     method      => 'retrieve_queued_records',
456     api_level   => 1,
457     argc        => 2,
458     stream      => 1,
459     record_type => 'bib',
460     signature   => {
461         desc => q/Only retrieve queued bib records that have matches against existing records/
462     }
463 );
464 __PACKAGE__->register_method(  
465     api_name    => "open-ils.vandelay.auth_queue.records.matches.retrieve",
466     method      => 'retrieve_queued_records',
467     api_level   => 1,
468     argc        => 2,
469     stream      => 1,
470     record_type => 'auth',
471     signature   => {
472         desc => q/Only retrieve queued authority records that have matches against existing records/
473     }
474 );
475
476 sub retrieve_queued_records {
477     my($self, $conn, $auth, $queue_id, $options) = @_;
478
479     $options ||= {};
480     my $limit = $$options{limit} || 20;
481     my $offset = $$options{offset} || 0;
482     my $type = $self->{record_type};
483
484     my $e = new_editor(authtoken => $auth, xact => 1);
485     return $e->die_event unless $e->checkauth;
486
487     my $queue;
488     if($type eq 'bib') {
489         $queue = $e->retrieve_vandelay_bib_queue($queue_id) or return $e->die_event;
490     } else {
491         $queue = $e->retrieve_vandelay_authority_queue($queue_id) or return $e->die_event;
492     }
493     my $evt = check_queue_perms($e, $type, $queue);
494     return $evt if ($evt);
495
496     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
497     my $mclass = $type eq 'bib' ? 'vbm' : 'vam';
498     my $query = {
499         select => {
500             $class => ['id'],
501             $mclass => [{
502                 column => 'eg_record', 
503                 transform => 'min',
504                 aggregate => 1
505             }]
506         },
507         from => $class,
508         where => {queue => $queue_id},
509         distinct => 1,
510         limit => $limit,
511         offset => $offset,
512     };
513     if($self->api_name =~ /export/) {
514         delete $query->{limit};
515         delete $query->{offset};
516     }
517
518     $query->{where}->{import_time} = undef if $$options{non_imported};
519
520     if($$options{with_import_error}) {
521
522         $query->{from} = {$class => {vii => {type => 'left'}}};
523         $query->{where}->{'-or'} = [
524             {'+vqbr' => {import_error => {'!=' => undef}}},
525             {'+vii' => {import_error => {'!=' => undef}}}
526         ];
527
528     } else {
529         
530         if($$options{with_rec_import_error}) {
531             $query->{where}->{import_error} = {'!=' => undef};
532
533         } elsif( $$options{with_item_import_error} and $type eq 'bib') {
534
535             $query->{from} = {$class => 'vii'};
536             $query->{where}->{'+vii'} = {import_error => {'!=' => undef}};
537         }
538     }
539
540     if($self->api_name =~ /matches/) {
541         # find only records that have matches
542         $query->{from} = {$class => {$mclass => {type => 'right'}}};
543     } else {
544         # join to mclass for sorting (see below)
545         $query->{from} = {$class => {$mclass => {type => 'left'}}};
546     }
547
548     # order by the matched bib records to group like queued records
549     $query->{order_by} = [
550         {class => $mclass, field => 'eg_record', transform => 'min'},
551         {class => $class, field => 'id'} 
552     ];
553
554     my $record_ids = $e->json_query($query);
555
556     my $retrieve = ($type eq 'bib') ? 
557         'retrieve_vandelay_queued_bib_record' : 'retrieve_vandelay_queued_authority_record';
558     my $search = ($type eq 'bib') ? 
559         'search_vandelay_queued_bib_record' : 'search_vandelay_queued_authority_record';
560
561     if ($self->api_name =~ /export/) {
562         my $rec_list = $e->$search({id => [map { $_->{id} } @$record_ids]}, {substream => 1});
563         if ($self->api_name =~ /print/) {
564
565             $e->rollback;
566             return $U->fire_object_event(
567                 undef,
568                 'vandelay.queued_'.$type.'_record.print',
569                 $rec_list,
570                 $e->requestor->ws_ou
571             );
572
573         } elsif ($self->api_name =~ /csv/) {
574
575             $e->rollback;
576             return $U->fire_object_event(
577                 undef,
578                 'vandelay.queued_'.$type.'_record.csv',
579                 $rec_list,
580                 $e->requestor->ws_ou
581             );
582
583         } elsif ($self->api_name =~ /email/) {
584
585             $conn->respond_complete(1);
586
587             for my $rec (@$rec_list) {
588                 $U->create_events_for_hook(
589                     'vandelay.queued_'.$type.'_record.email',
590                     $rec,
591                     $e->requestor->home_ou,
592                     undef,
593                     undef,
594                     1
595                 );
596             }
597
598         }
599     } else {
600         for my $rec_id (@$record_ids) {
601             my $flesh = ['attributes', 'matches'];
602             push(@$flesh, 'import_items') if $$options{flesh_import_items};
603             my $params = {flesh => 1, flesh_fields => {$class => $flesh}};
604             my $rec = $e->$retrieve([$rec_id->{id}, $params]);
605             $rec->clear_marc if $$options{clear_marc};
606             $conn->respond($rec);
607         }
608     }
609
610     $e->rollback;
611     return undef;
612 }
613
614 __PACKAGE__->register_method(  
615     api_name    => 'open-ils.vandelay.import_item.queue.retrieve',
616     method      => 'retrieve_queue_import_items',
617     api_level   => 1,
618     argc        => 2,
619     stream      => 1,
620     authoritative => 1,
621     signature => q/
622         Returns Import Item (vii) objects for the selected queue.
623         Filter options:
624             with_import_error : only return items that failed to import
625     /
626 );
627 __PACKAGE__->register_method(
628     api_name    => 'open-ils.vandelay.import_item.queue.export.print',
629     method      => 'retrieve_queue_import_items',
630     api_level   => 1,
631     argc        => 2,
632     stream      => 1,
633     authoritative => 1,
634     signature => q/
635         Returns template-generated printable output of Import Item (vii) objects for the selected queue.
636         Filter options:
637             with_import_error : only return items that failed to import
638     /
639 );
640 __PACKAGE__->register_method(
641     api_name    => 'open-ils.vandelay.import_item.queue.export.csv',
642     method      => 'retrieve_queue_import_items',
643     api_level   => 1,
644     argc        => 2,
645     stream      => 1,
646     authoritative => 1,
647     signature => q/
648         Returns template-generated CSV output of Import Item (vii) objects for the selected queue.
649         Filter options:
650             with_import_error : only return items that failed to import
651     /
652 );
653 __PACKAGE__->register_method(
654     api_name    => 'open-ils.vandelay.import_item.queue.export.email',
655     method      => 'retrieve_queue_import_items',
656     api_level   => 1,
657     argc        => 2,
658     stream      => 1,
659     authoritative => 1,
660     signature => q/
661         Emails template-generated output of Import Item (vii) objects for the selected queue.
662         Filter options:
663             with_import_error : only return items that failed to import
664     /
665 );
666
667 sub retrieve_queue_import_items {
668     my($self, $conn, $auth, $q_id, $options) = @_;
669
670     $options ||= {};
671     my $limit = $$options{limit} || 20;
672     my $offset = $$options{offset} || 0;
673
674     my $e = new_editor(authtoken => $auth);
675     return $e->event unless $e->checkauth;
676
677     my $queue = $e->retrieve_vandelay_bib_queue($q_id) or return $e->event;
678     my $evt = check_queue_perms($e, 'bib', $queue);
679     return $evt if $evt;
680
681     my $query = {
682         select => {vii => ['id']},
683         from => {
684             vii => {
685                 vqbr => {
686                     join => {
687                         'vbq' => {
688                             field => 'id',
689                             fkey => 'queue',
690                             filter => {id => $q_id}
691                         }
692                     }
693                 }
694             }
695         },
696         order_by => {'vii' => ['record','id']},
697         limit => $limit,
698         offset => $offset
699     };
700     if($self->api_name =~ /export/) {
701         delete $query->{limit};
702         delete $query->{offset};
703     }
704
705     $query->{where} = {'+vii' => {import_error => {'!=' => undef}}}
706         if $$options{with_import_error};
707
708     my $items = $e->json_query($query);
709     my $item_list = $e->search_vandelay_import_item({id => [map { $_->{id} } @$items]});
710     if ($self->api_name =~ /export/) {
711         if ($self->api_name =~ /print/) {
712
713             return $U->fire_object_event(
714                 undef,
715                 'vandelay.import_items.print',
716                 $item_list,
717                 $e->requestor->ws_ou
718             );
719
720         } elsif ($self->api_name =~ /csv/) {
721
722             return $U->fire_object_event(
723                 undef,
724                 'vandelay.import_items.csv',
725                 $item_list,
726                 $e->requestor->ws_ou
727             );
728
729         } elsif ($self->api_name =~ /email/) {
730
731             $conn->respond_complete(1);
732
733             for my $item (@$item_list) {
734                 $U->create_events_for_hook(
735                     'vandelay.import_items.email',
736                     $item,
737                     $e->requestor->home_ou,
738                     undef,
739                     undef,
740                     1
741                 );
742             }
743
744         }
745     } else {
746         for my $item (@$item_list) {
747             $conn->respond($item);
748         }
749     }
750
751     return undef;
752 }
753
754 sub check_queue_perms {
755     my($e, $type, $queue) = @_;
756     if ($type eq 'bib') {
757         return $e->die_event unless
758             ($e->allowed('CREATE_BIB_IMPORT_QUEUE', undef, $queue) ||
759              $e->allowed('CREATE_BIB_IMPORT_QUEUE'));
760     } else {
761         return $e->die_event unless
762             ($e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE', undef, $queue) ||
763              $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE'));
764     }
765
766     return undef;
767 }
768
769 __PACKAGE__->register_method(  
770     api_name    => "open-ils.vandelay.bib_record.list.import",
771     method      => 'import_record_list',
772     api_level   => 1,
773     argc        => 2,
774     stream      => 1,
775     record_type => 'bib'
776 );
777
778 __PACKAGE__->register_method(  
779     api_name    => "open-ils.vandelay.auth_record.list.import",
780     method      => 'import_record_list',
781     api_level   => 1,
782     argc        => 2,
783     stream      => 1,
784     record_type => 'auth'
785 );
786
787 sub import_record_list {
788     my($self, $conn, $auth, $rec_ids, $args) = @_;
789     my $e = new_editor(authtoken => $auth, xact => 1);
790     return $e->die_event unless $e->checkauth;
791     $args ||= {};
792     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $args);
793     try {$e->rollback} otherwise {}; 
794     return $err if $err;
795     return {complete => 1};
796 }
797
798
799 __PACKAGE__->register_method(  
800     api_name    => "open-ils.vandelay.bib_queue.import",
801     method      => 'import_queue',
802     api_level   => 1,
803     argc        => 2,
804     stream      => 1,
805     max_bundle_count => 1,
806     record_type => 'bib',
807     signature => {
808         desc => q/
809             Attempts to import all non-imported records for the selected queue.
810             Will also attempt import of all non-imported items.
811         /
812     }
813 );
814
815 __PACKAGE__->register_method(  
816     api_name    => "open-ils.vandelay.auth_queue.import",
817     method      => 'import_queue',
818     api_level   => 1,
819     argc        => 2,
820     stream      => 1,
821     max_bundle_count => 1,
822     record_type => 'auth'
823 );
824
825 sub import_queue {
826     my($self, $conn, $auth, $q_id, $options) = @_;
827     $conn->max_chunk_count($self->{max_bundle_count}) if (!$conn->can('max_bundle_count') && $self->{max_bundle_count});
828
829     my $e = new_editor(authtoken => $auth, xact => 1);
830     return $e->die_event unless $e->checkauth;
831     $options ||= {};
832     my $type = $self->{record_type};
833     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
834
835     # First, collect the not-yet-imported records
836     my $query = {queue => $q_id, import_time => undef};
837     my $search = ($type eq 'bib') ? 
838         'search_vandelay_queued_bib_record' : 
839         'search_vandelay_queued_authority_record';
840     my $rec_ids = $e->$search($query, {idlist => 1});
841
842     # Now add any imported records that have un-imported items
843
844     if($type eq 'bib') {
845         my $item_recs = $e->json_query({
846             select => {vqbr => ['id']},
847             from => {vqbr => 'vii'},
848             where => {
849                 '+vqbr' => {
850                     queue => $q_id,
851                     import_time => {'!=' => undef}
852                 },
853                 '+vii' => {import_time => undef}
854             },
855             distinct => 1
856         });
857         push(@$rec_ids, map {$_->{id}} @$item_recs);
858     }
859
860     my $err = import_record_list_impl($self, $conn, $rec_ids, $e->requestor, $options);
861     try {$e->rollback} otherwise {}; # only using this to make the read authoritative -- don't die from it
862     return $err if $err;
863     return {complete => 1};
864 }
865
866 # returns a list of queued record IDs for a given queue that 
867 # have at least one entry in the match table
868 # XXX DEPRECATED?
869 sub queued_records_with_matches {
870     my($e, $type, $q_id, $limit, $offset, $filter) = @_;
871
872     my $match_class = 'vbm';
873     my $rec_class = 'vqbr';
874     if($type eq 'auth') {
875         $match_class = 'vam';
876          $rec_class = 'vqar';
877     }
878
879     $filter ||= {};
880     $filter->{queue} = $q_id;
881
882     my $query = {
883         distinct => 1, 
884         select => {$match_class => ['queued_record']}, 
885         from => {
886             $match_class => {
887                 $rec_class => {
888                     field => 'id',
889                     fkey => 'queued_record',
890                     filter => $filter,
891                 }
892             }
893         }
894     };        
895
896     if($limit or defined $offset) {
897         $limit ||= 20;
898         $offset ||= 0;
899         $query->{limit} = $limit;
900         $query->{offset} = $offset;
901     }
902
903     my $data = $e->json_query($query);
904     return [ map {$_->{queued_record}} @$data ];
905 }
906
907 # Returns (tracker, err_event)
908 # Runs within its own transaction.
909 sub create_session_tracker {
910     my ($user_id, $ws_id, $key, $name, $type, $queue_id, $action, $total_acts) = @_;
911     my $e = new_editor(xact => 1);
912
913     if ($key) {
914         # if other trackers exist for this key, adopt the name
915         my $existing = 
916             $e->search_vandelay_session_tracker({session_key => $key})->[0];
917         $name = $existing->name if $name;
918
919     } else {
920         # anonymous tracker
921         $key = md5_hex(time."$$".rand());
922     }
923
924     my $tracker = Fieldmapper::vandelay::session_tracker->new;
925     $tracker->session_key($key);
926     $tracker->name($name || $key);
927     $tracker->usr($user_id);
928     $tracker->workstation($ws_id);
929     $tracker->record_type($type);
930     $tracker->queue($queue_id);
931     $tracker->action_type($action);
932     $tracker->total_actions($total_acts) if $total_acts;
933
934     # caller responsible for rolling back transaction
935     return (undef, $e->die_event) unless
936         $e->create_vandelay_session_tracker($tracker);
937
938     # Re-fetch to ensure we have all defaults applied for future updates.
939     return (undef, $e->die_event) unless 
940         $tracker = $e->retrieve_vandelay_session_tracker($tracker->id);
941
942     $e->commit;
943
944     return ($tracker);
945 }
946
947 # Increment the actions_performed count.
948 # Must happen in its own transaction
949 # Returns undef on success, Event on error
950 sub increment_session_tracker {
951     my $tracker = shift;
952     my $amount = shift || 1;
953     my $e = new_editor(xact => 1);
954     $tracker->update_time('now');
955     $tracker->actions_performed($tracker->actions_performed + $amount);
956     return $e->die_event unless 
957         $e->update_vandelay_session_tracker($tracker);
958
959     $e->commit;
960     return undef;
961 }
962
963
964 # cache of import item org unit settings.  
965 # used in apply_import_item_defaults() below, 
966 # but reset on each call to import_record_list_impl()
967 my %item_defaults_cache;
968
969 sub import_record_list_impl {
970     my($self, $conn, $rec_ids, $requestor, $args) = @_;
971
972     my $overlay_map = $args->{overlay_map} || {};
973     my $type = $self->{record_type};
974     my %queues;
975     %item_defaults_cache = ();
976     my $exit_early = $args->{exit_early};
977
978     my $report_args = {
979         progress => 1,
980         step => 1,
981         conn => $conn,
982         total => scalar(@$rec_ids),
983         report_all => $$args{report_all},
984         exit_early => $exit_early
985     };
986
987     $conn->max_chunk_count(1) if (!$conn->can('max_bundle_size') && $conn->can('max_chunk_size') && $$args{report_all});
988
989     my $auto_overlay_exact = $$args{auto_overlay_exact};
990     my $auto_overlay_1match = $$args{auto_overlay_1match};
991     my $auto_overlay_best = $$args{auto_overlay_best_match};
992     my $match_quality_ratio = $$args{match_quality_ratio};
993     my $merge_profile = $$args{merge_profile};
994     my $ft_merge_profile = $$args{fall_through_merge_profile};
995     my $bib_source = $$args{bib_source};
996     my $import_no_match = $$args{import_no_match};
997     my $strip_grps = $$args{strip_field_groups}; # bib-only
998
999     my $overlay_func = 'vandelay.overlay_bib_record';
1000     my $auto_overlay_func = 'vandelay.auto_overlay_bib_record';
1001     my $auto_overlay_best_func = 'vandelay.auto_overlay_bib_record_with_best';
1002     my $retrieve_func = 'retrieve_vandelay_queued_bib_record';
1003     my $update_func = 'update_vandelay_queued_bib_record';
1004     my $search_func = 'search_vandelay_queued_bib_record';
1005     my $retrieve_queue_func = 'retrieve_vandelay_bib_queue';
1006     my $update_queue_func = 'update_vandelay_bib_queue';
1007     my $delete_queue_func = 'delete_vandelay_bib_queue';
1008     my $rec_class = 'vqbr';
1009
1010     my $editor = new_editor();
1011
1012     my %bib_sources;
1013     my $sources = $editor->search_config_bib_source({id => {'!=' => undef}});
1014     $bib_sources{$_->id} = $_->source for @$sources;
1015
1016     if($type eq 'auth') {
1017         $overlay_func =~ s/bib/authority/o; 
1018         $auto_overlay_func =~ s/bib/authority/o; 
1019         $auto_overlay_best_func =~ s/bib/authority/o;
1020         $retrieve_func =~ s/bib/authority/o;
1021         $retrieve_queue_func =~ s/bib/authority/o;
1022         $update_queue_func =~ s/bib/authority/o;
1023         $update_func =~ s/bib/authority/o;
1024         $search_func =~ s/bib/authority/o;
1025         $delete_queue_func =~ s/bib/authority/o;
1026         $rec_class = 'vqar';
1027     }
1028
1029
1030     my $tracker;
1031     my $tevt;
1032     my $new_rec_perm_cache;
1033     my @success_rec_ids;
1034     for my $rec_id (@$rec_ids) {
1035
1036         my $error = 0;
1037         my $overlay_target = $overlay_map->{$rec_id};
1038
1039         my $e = new_editor(xact => 1);
1040         $e->requestor($requestor);
1041
1042         $$report_args{e} = $e;
1043         $$report_args{evt} = undef;
1044         $$report_args{import_error} = undef;
1045         $$report_args{no_import} = 0;
1046
1047         my $rec = $e->$retrieve_func([
1048             $rec_id,
1049             {   flesh => 1,
1050                 flesh_fields => { $rec_class => ['matches']},
1051             }
1052         ]);
1053
1054         unless($rec) {
1055             $$report_args{evt} = $e->event;
1056             finish_rec_import_attempt($report_args);
1057             next;
1058         }
1059
1060         if (!$tracker) {
1061             # Create the import tracker using the queue of the first
1062             # retrieved record.  I'm fairly certain in practice all
1063             # lists of records for import come from a single queue.
1064             # We could get the queue from the previously created
1065             # 'enqueue' tracker, but this is a safetly valve to handle
1066             # imports where no enqueue tracker exists, e.g. records
1067             # enqueued pre-upgrade.
1068             ($tracker, $tevt) = create_session_tracker(
1069                 $e->requestor->id, $e->requestor->wsid, $args->{session_key}, 
1070                 undef, $type, $rec->queue, 'import', scalar(@$rec_ids));
1071
1072             if (!$tracker) {
1073                 $e->rollback;
1074                 return $tevt;
1075             }
1076
1077             $report_args->{tracker} = $tracker;
1078
1079             $conn->respond_complete($tracker) if $exit_early;
1080         }
1081
1082         if($rec->import_time) {
1083             # if the record is already imported, that means it may have 
1084             # un-imported copies.  Add to success list for later processing.
1085             push(@success_rec_ids, $rec_id);
1086             $e->rollback;
1087             next;
1088         }
1089
1090         $$report_args{rec} = $rec;
1091         $queues{$rec->queue} = 1;
1092
1093         my $record;
1094         my $imported = 0;
1095
1096         if ($type eq 'bib') {
1097             # strip configured / selected MARC tags from inbound records
1098
1099             my $marcdoc = XML::LibXML->new->parse_string($rec->marc);
1100             $rec->marc($U->strip_marc_fields($e, $marcdoc, $strip_grps));
1101         }
1102
1103         # Set the imported record's 905$u, so
1104         # editor/edit_date are set correctly.
1105         my $marcdoc = XML::LibXML->new->parse_string($rec->marc);
1106         $rec->marc($U->set_marc_905u($marcdoc, $requestor->usrname));
1107
1108         unless ($e->$update_func($rec)) {
1109             $$report_args{evt} = $e->die_event;
1110             finish_rec_import_attempt($report_args);
1111             next;
1112         }
1113
1114         if(defined $overlay_target) {
1115             # Caller chose an explicit overlay target
1116
1117             my $res = $e->json_query(
1118                 {
1119                     from => [
1120                         $overlay_func,
1121                         $rec_id,
1122                         $overlay_target, 
1123                         $merge_profile
1124                     ]
1125                 }
1126             );
1127
1128             if($res and ($res = $res->[0])) {
1129
1130                 if($res->{$overlay_func} eq 't') {
1131                     $logger->info("vl: $type direct overlay succeeded for queued rec ".
1132                         "$rec_id and overlay target $overlay_target");
1133                     $imported = 1;
1134                     $rec->imported_as($overlay_target);
1135                 }
1136
1137             } else {
1138                 $error = 1;
1139                 $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1140             }
1141
1142         } else {
1143
1144             if($auto_overlay_1match) { # overlay if there is exactly 1 match
1145
1146                 my %match_recs = map { $_->eg_record => 1 } @{$rec->matches};
1147
1148                 if( scalar(keys %match_recs) == 1) { # all matches point to the same record
1149
1150                     ($imported, $error, $rec) = try_auto_overlay(
1151                         $e, $type,
1152                         $report_args, 
1153                         $auto_overlay_best_func,
1154                         $retrieve_func,
1155                         $rec_class,
1156                         $rec_id, 
1157                         $match_quality_ratio, 
1158                         $merge_profile, 
1159                         $ft_merge_profile
1160                     );
1161                 }
1162             }
1163
1164             if(!$imported and !$error and $auto_overlay_exact and scalar(@{$rec->matches}) == 1 ) {
1165                 
1166                 # caller says to overlay if there is an /exact/ match
1167                 # $auto_overlay_func only proceeds and returns true on exact matches
1168
1169                 my $res = $e->json_query(
1170                     {
1171                         from => [
1172                             $auto_overlay_func,
1173                             $rec_id,
1174                             $merge_profile
1175                         ]
1176                     }
1177                 );
1178
1179                 if($res and ($res = $res->[0])) {
1180
1181                     if($res->{$auto_overlay_func} eq 't') {
1182                         $logger->info("vl: $type auto-overlay succeeded for queued rec $rec_id");
1183                         $imported = 1;
1184
1185                         # re-fetch the record to pick up the imported_as value from the DB
1186                         $$report_args{rec} = $rec = $e->$retrieve_func([
1187                             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1188
1189                     } else {
1190                         $logger->info("vl: $type auto-overlay failed for queued rec $rec_id");
1191                     }
1192
1193                 } else {
1194                     $error = 1;
1195                     $logger->error("vl: Error attempting overlay with func=$auto_overlay_func, profile=$merge_profile, record=$rec_id");
1196                 }
1197             }
1198
1199             if(!$imported and !$error and $auto_overlay_best and scalar(@{$rec->matches}) > 0 ) {
1200                 # caller says to overlay the best match
1201
1202                 ($imported, $error, $rec) = try_auto_overlay(
1203                     $e, $type,
1204                     $report_args, 
1205                     $auto_overlay_best_func,
1206                     $retrieve_func,
1207                     $rec_class,
1208                     $rec_id, 
1209                     $match_quality_ratio, 
1210                     $merge_profile, 
1211                     $ft_merge_profile
1212                 );
1213             }
1214
1215             if(!$imported and !$error and $import_no_match and scalar(@{$rec->matches}) == 0) {
1216             
1217                 # No overlay / merge occurred.  Do a traditional record import by creating a new record
1218
1219                 if (!$new_rec_perm_cache) {
1220                     $new_rec_perm_cache = {};
1221
1222                     # all users creating new records are required to have the basic permission.
1223                     # if the client requests, we can enforce extra permissions for creating new records.
1224                     # for speed, check the permissions the first time then cache the result.
1225
1226                     my $perm = ($type eq 'bib') ? 'IMPORT_MARC' : 'IMPORT_AUTHORITY_MARC';
1227                     my $xperm = $$args{new_rec_perm};
1228                     my $rec_ou = $e->requestor->ws_ou;
1229
1230                     $new_rec_perm_cache->{evt} = $e->die_event
1231                         if !$e->allowed($perm, $rec_ou) || ($xperm and !$e->allowed($xperm, $rec_ou));
1232                 }
1233
1234                 if ($new_rec_perm_cache->{evt}) {
1235
1236                     # a cached event won't roll back the transaction (a la die_event), but
1237                     # the transaction will get rolled back in finish_rec_import_attempt() below
1238                     $$report_args{evt} = $new_rec_perm_cache->{evt};
1239                     $$report_args{import_error} = 'import.record.perm_failure';
1240
1241                 } else { # perm checks succeeded
1242
1243                     $logger->info("vl: creating new $type record for queued record $rec_id");
1244
1245                     if ($type eq 'bib') {
1246
1247                         $record = OpenILS::Application::Cat::BibCommon->biblio_record_xml_import(
1248                             $e, $rec->marc, $bib_sources{$rec->bib_source}, undef, 1);
1249
1250                     } else { # authority record
1251
1252                         $record = OpenILS::Application::Cat::AuthCommon->import_authority_record($e, $rec->marc); #$source);
1253                     }
1254
1255                     if($U->event_code($record)) {
1256                         $$report_args{import_error} = 'import.duplicate.tcn' 
1257                             if $record->{textcode} eq 'OPEN_TCN_NOT_FOUND';
1258                         $$report_args{evt} = $record;
1259
1260                     } else {
1261
1262                         $logger->info("vl: successfully imported new $type record");
1263                         $rec->imported_as($record->id);
1264                         $imported = 1;
1265                     }
1266                 }
1267             }
1268         }
1269
1270         if($imported) {
1271
1272             $rec->import_time('now');
1273             $rec->clear_import_error;
1274             $rec->clear_error_detail;
1275
1276             if($e->$update_func($rec)) {
1277
1278                 if($type eq 'bib') {
1279
1280                     # see if this record is linked from an acq record.
1281                     my $li = $e->search_acq_lineitem(
1282                         {queued_record => $rec->id, state => {'!=' => 'canceled'}})->[0];
1283
1284                     if ($li) { 
1285                         # if so, update the acq lineitem to point to the imported record
1286                         $li->eg_bib_id($rec->imported_as);
1287                         $$report_args{evt} = $e->die_event unless $e->update_acq_lineitem($li);
1288                     }
1289                 }
1290
1291                 push @success_rec_ids, $rec_id;
1292                 finish_rec_import_attempt($report_args);
1293
1294             } else {
1295                 $imported = 0;
1296             }
1297         }
1298
1299         if(!$imported) {
1300             $logger->info("vl: record $rec_id was not imported");
1301             $$report_args{evt} = $e->event unless $$report_args{evt};
1302             $$report_args{no_import} = 1;
1303             finish_rec_import_attempt($report_args);
1304         }
1305     }
1306
1307     # see if we need to mark any queues as complete
1308     for my $q_id (keys %queues) {
1309
1310         my $e = new_editor(xact => 1);
1311         my $remaining = $e->$search_func(
1312             [{queue => $q_id, import_time => undef}, {limit =>1}], {idlist => 1});
1313
1314         unless(@$remaining) {
1315             my $queue = $e->$retrieve_queue_func($q_id);
1316             unless($U->is_true($queue->complete)) {
1317                 $queue->complete('t');
1318                 $e->$update_queue_func($queue) or return $e->die_event;
1319                 $e->commit;
1320                 next;
1321             }
1322         } 
1323         $e->rollback;
1324     }
1325
1326     import_record_asset_list_impl($conn, 
1327         \@success_rec_ids, $requestor, $args, $tracker) if @success_rec_ids;
1328
1329     if ($tracker) { # there are edge cases where it may not exist
1330         my $e = new_editor(xact => 1);
1331         $e->requestor($requestor);
1332         $tracker->update_time('now');
1333         $tracker->state('complete');
1334         $e->update_vandelay_session_tracker($tracker) or return $e->die_event;
1335         $e->commit;
1336     }
1337
1338     $conn->respond({total => $$report_args{total}, progress => $$report_args{progress}});
1339     return undef;
1340 }
1341
1342
1343 sub try_auto_overlay {
1344     my $e = shift;
1345     my $type = shift;
1346     my $report_args = shift;
1347     my $overlay_func  = shift;
1348     my $retrieve_func = shift; 
1349     my $rec_class = shift;
1350     my $rec_id  = shift;
1351     my $match_quality_ratio = shift;
1352     my $merge_profile  = shift;
1353     my $ft_merge_profile = shift;
1354
1355     my $imported = 0;
1356     my $error = 0;
1357
1358     # Find the best match and overlay if the quality ratio allows it.
1359     my $res = $e->json_query(
1360         {
1361             from => [
1362                 $overlay_func,
1363                 $rec_id, 
1364                 $merge_profile,
1365                 $match_quality_ratio
1366             ]
1367         }
1368     );
1369
1370     if($res and ($res = $res->[0])) {
1371
1372         if($res->{$overlay_func} eq 't') {
1373
1374             # first attempt succeeded
1375             $imported = 1;
1376
1377         } else {
1378
1379             # quality-limited merge failed with insufficient quality.  If there is a 
1380             # fall-through merge profile, re-do the merge with the alternate profile
1381             # and no quality restriction.
1382
1383             if($ft_merge_profile and $match_quality_ratio > 0) {
1384
1385                 $logger->info("vl: $type auto-merge failed with profile $merge_profile; ".
1386                     "re-merging with fall-through profile $ft_merge_profile");
1387
1388                 my $res = $e->json_query(
1389                     {
1390                         from => [
1391                             $overlay_func,
1392                             $rec_id, 
1393                             $ft_merge_profile,
1394                             0 # minimum quality not required
1395                         ]
1396                     }
1397                 );
1398
1399                 if($res and ($res = $res->[0])) {
1400
1401                     if($res->{$overlay_func} eq 't') {
1402
1403                         # second attempt succeeded
1404                         $imported = 1;
1405
1406                     } else {
1407
1408                         # failed to merge on second attempt
1409                         $logger->info("vl: $type auto-merge with fall-through failed for queued rec $rec_id");
1410                     }
1411                 } else {
1412                     
1413                     # second attempt died 
1414                     $error = 1;
1415                     $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1416                 }
1417
1418             } else { 
1419
1420                 # failed to merge on first attempt, no fall-through was provided
1421                 $$report_args{import_error} = 'overlay.record.quality' if $match_quality_ratio > 0;
1422                 $logger->info("vl: $type auto-merge failed for queued rec $rec_id");
1423             }
1424         }
1425
1426     } else {
1427
1428         # first attempt died 
1429         $error = 1;
1430         $logger->error("vl: Error attempting overlay with func=$overlay_func, profile=$merge_profile, record=$rec_id");
1431     }
1432
1433     if($imported) {
1434
1435         # at least 1 of the attempts succeeded
1436         $logger->info("vl: $type auto-merge succeeded for queued rec $rec_id");
1437
1438         # re-fetch the record to pick up the imported_as value from the DB
1439         $$report_args{rec} = $e->$retrieve_func([
1440             $rec_id, {flesh => 1, flesh_fields => {$rec_class => ['matches']}}]);
1441     }
1442
1443     return ($imported, $error, $$report_args{rec});
1444 }
1445
1446
1447 # tracks any import errors, commits the current xact, responds to the client
1448 sub finish_rec_import_attempt {
1449     my $args = shift;
1450     my $evt = $$args{evt};
1451     my $rec = $$args{rec};
1452     my $e = $$args{e};
1453     my $tracker = $$args{tracker};
1454     my $exit_early = $$args{exit_early};
1455
1456     my $error = $$args{import_error};
1457     $error = 'general.unknown' if $evt and not $error;
1458
1459     # Note the tracker is updated regardless of whether the individual
1460     # record import succeeded.  It's only a failed tracker if the
1461     # entire process fails.
1462     if ($tracker) { # tracker may be undef in rec-not-found situations
1463         my $tevt = increment_session_tracker($tracker);
1464         return $tevt if $tevt;
1465     }
1466
1467     # error tracking
1468     if($rec) {
1469
1470         if($error or $evt) {
1471             # failed import
1472             # since an error occurred, there's no guarantee the transaction wasn't 
1473             # rolled back.  force a rollback and create a new editor.
1474             $e->rollback;
1475             $e = new_editor(xact => 1);
1476             $rec->import_error($error);
1477
1478             if($evt) {
1479                 my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
1480                 $rec->error_detail($detail);
1481             }
1482
1483             my $method = 'update_vandelay_queued_bib_record';
1484             $method =~ s/bib/authority/ if $$args{type} eq 'auth';
1485             $e->$method($rec) and $e->commit or $e->rollback;
1486
1487         } else {
1488
1489             # commit the successful import
1490             $e->commit;
1491         }
1492
1493     } else {
1494         # requested queued record was not found
1495         $e->rollback;
1496     }
1497         
1498     # respond to client unless we've already responded-complete
1499     if (!$exit_early) {
1500         if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
1501             $$args{conn}->respond({
1502                 total => $$args{total}, 
1503                 progress => $$args{progress}, 
1504                 imported => ($rec) ? $rec->id : undef,
1505                 import_error => $error,
1506                 no_import => $$args{no_import},
1507                 err_event => $evt
1508             });
1509             $$args{step} *= 2 unless $$args{step} == 256;
1510         }
1511     }
1512
1513     $$args{progress}++;
1514 }
1515
1516
1517
1518
1519
1520 __PACKAGE__->register_method(  
1521     api_name    => "open-ils.vandelay.bib_queue.owner.retrieve",
1522     method      => 'owner_queue_retrieve',
1523     api_level   => 1,
1524     argc        => 2,
1525     stream      => 1,
1526     record_type => 'bib'
1527 );
1528 __PACKAGE__->register_method(  
1529     api_name    => "open-ils.vandelay.authority_queue.owner.retrieve",
1530     method      => 'owner_queue_retrieve',
1531     api_level   => 1,
1532     argc        => 2,
1533     stream      => 1,
1534     record_type => 'auth'
1535 );
1536
1537 sub owner_queue_retrieve {
1538     my($self, $conn, $auth, $owner_id, $filters) = @_;
1539     my $e = new_editor(authtoken => $auth, xact => 1);
1540     return $e->die_event unless $e->checkauth;
1541     $owner_id = $e->requestor->id; # XXX add support for viewing other's queues?
1542     my $queues;
1543     $filters ||= {};
1544     my $search = {owner => $owner_id};
1545     $search->{$_} = $filters->{$_} for keys %$filters;
1546
1547     if($self->{record_type} eq 'bib') {
1548         $queues = $e->search_vandelay_bib_queue(
1549             [$search, {order_by => {vbq => 'evergreen.lowercase(name)'}}]);
1550     } else {
1551         $queues = $e->search_vandelay_authority_queue(
1552             [$search, {order_by => {vaq => 'evergreen.lowercase(name)'}}]);
1553     }
1554     $conn->respond($_) for @$queues;
1555     $e->rollback;
1556     return undef;
1557 }
1558
1559 __PACKAGE__->register_method(  
1560     api_name    => "open-ils.vandelay.bib_queue.delete",
1561     method      => "delete_queue",
1562     api_level   => 1,
1563     argc        => 2,
1564     record_type => 'bib'
1565 );            
1566 __PACKAGE__->register_method(  
1567     api_name    => "open-ils.vandelay.auth_queue.delete",
1568     method      => "delete_queue",
1569     api_level   => 1,
1570     argc        => 2,
1571     record_type => 'auth'
1572 );  
1573
1574 sub delete_queue {
1575     my($self, $conn, $auth, $q_id) = @_;
1576     my $e = new_editor(xact => 1, authtoken => $auth);
1577     return $e->die_event unless $e->checkauth;
1578     if($self->{record_type} eq 'bib') {
1579         return $e->die_event unless $e->allowed('CREATE_BIB_IMPORT_QUEUE');
1580         my $queue = $e->retrieve_vandelay_bib_queue($q_id)
1581             or return $e->die_event;
1582         $e->delete_vandelay_bib_queue($queue)
1583             or return $e->die_event;
1584     } else {
1585            return $e->die_event unless $e->allowed('CREATE_AUTHORITY_IMPORT_QUEUE');
1586         my $queue = $e->retrieve_vandelay_authority_queue($q_id)
1587             or return $e->die_event;
1588         $e->delete_vandelay_authority_queue($queue)
1589             or return $e->die_event;
1590     }
1591     $e->commit;
1592     return 1;
1593 }
1594
1595
1596 __PACKAGE__->register_method(  
1597     api_name    => "open-ils.vandelay.queued_bib_record.html",
1598     method      => 'queued_record_html',
1599     api_level   => 1,
1600     argc        => 2,
1601     stream      => 1,
1602     record_type => 'bib'
1603 );
1604 __PACKAGE__->register_method(  
1605     api_name    => "open-ils.vandelay.queued_authority_record.html",
1606     method      => 'queued_record_html',
1607     api_level   => 1,
1608     argc        => 2,
1609     stream      => 1,
1610     record_type => 'auth'
1611 );
1612
1613 sub queued_record_html {
1614     my($self, $conn, $auth, $rec_id) = @_;
1615     my $e = new_editor(xact=>1,authtoken => $auth);
1616     return $e->die_event unless $e->checkauth;
1617     my $rec;
1618     if($self->{record_type} eq 'bib') {
1619         $rec = $e->retrieve_vandelay_queued_bib_record($rec_id)
1620             or return $e->die_event;
1621     } else {
1622         $rec = $e->retrieve_vandelay_queued_authority_record($rec_id)
1623             or return $e->die_event;
1624     }
1625
1626     $e->rollback;
1627     return $U->simplereq(
1628         'open-ils.search',
1629         'open-ils.search.biblio.record.html', undef, 1, $rec->marc);
1630 }
1631
1632
1633 __PACKAGE__->register_method(  
1634     api_name    => "open-ils.vandelay.bib_queue.summary.retrieve", 
1635     method      => 'retrieve_queue_summary',
1636     api_level   => 1,
1637     argc        => 2,
1638     stream      => 1,
1639     record_type => 'bib'
1640 );
1641 __PACKAGE__->register_method(  
1642     api_name    => "open-ils.vandelay.auth_queue.summary.retrieve",
1643     method      => 'retrieve_queue_summary',
1644     api_level   => 1,
1645     argc        => 2,
1646     stream      => 1,
1647     record_type => 'auth'
1648 );
1649
1650 sub retrieve_queue_summary {
1651     my($self, $conn, $auth, $queue_id) = @_;
1652     my $e = new_editor(xact=>1, authtoken => $auth);
1653     return $e->die_event unless $e->checkauth;
1654
1655     my $queue;
1656     my $type = $self->{record_type};
1657     if($type eq 'bib') {
1658         $queue = $e->retrieve_vandelay_bib_queue($queue_id)
1659             or return $e->die_event;
1660     } else {
1661         $queue = $e->retrieve_vandelay_authority_queue($queue_id)
1662             or return $e->die_event;
1663     }
1664
1665     my $evt = check_queue_perms($e, $type, $queue);
1666     return $evt if $evt;
1667
1668     my $search = 'search_vandelay_queued_bib_record';
1669     $search =~ s/bib/authority/ if $type ne 'bib';
1670
1671     my $summary = {
1672         queue => $queue,
1673         total => scalar(@{$e->$search({queue => $queue_id}, {idlist=>1})}),
1674         imported => scalar(@{$e->$search({queue => $queue_id, import_time => {'!=' => undef}}, {idlist=>1})}),
1675     };
1676
1677     my $class = ($type eq 'bib') ? 'vqbr' : 'vqar';
1678     $summary->{rec_import_errors} = $e->json_query({
1679         select => {$class => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1680         from => $class,
1681         where => {queue => $queue_id, import_error => {'!=' => undef}}
1682     })->[0]->{count};
1683
1684     if($type eq 'bib') {
1685         
1686         # count of all items attached to records in the queue in question
1687         my $query = {
1688             select => {vii => [{alias => 'count', column => 'id', transform => 'count', aggregate => 1}]},
1689             from => 'vii',
1690             where => {
1691                 record => {
1692                     in => {
1693                         select => {vqbr => ['id']},
1694                         from => 'vqbr',
1695                         where => {queue => $queue_id}
1696                     }
1697                 }
1698             }
1699         };
1700         $summary->{total_items} = $e->json_query($query)->[0]->{count};
1701
1702         # count of items we attempted to import, but errored, attached to records in the queue in question
1703         $query->{where}->{import_error} = {'!=' => undef};
1704         $summary->{item_import_errors} = $e->json_query($query)->[0]->{count};
1705
1706         # count of items we successfully imported attached to records in the queue in question
1707         delete $query->{where}->{import_error};
1708         $query->{where}->{import_time} = {'!=' => undef};
1709         $summary->{total_items_imported} = $e->json_query($query)->[0]->{count};
1710     }
1711
1712     return $summary;
1713 }
1714
1715 # --------------------------------------------------------------------------------
1716 # Given a list of queued record IDs, imports all items attached to those records
1717 # --------------------------------------------------------------------------------
1718 sub import_record_asset_list_impl {
1719     my($conn, $rec_ids, $requestor, $args, $tracker) = @_;
1720
1721     my $roe = new_editor(xact=> 1, requestor => $requestor);
1722
1723     # for speed, filter out any records have not been 
1724     # imported or have no import items to load
1725     $rec_ids = $roe->json_query({
1726         select => {vqbr => ['id']},
1727         from => {vqbr => 'vii'},
1728         where => {'+vqbr' => {
1729             id => $rec_ids,
1730             import_time => {'!=' => undef}
1731         }},
1732         distinct => 1
1733     });
1734     $rec_ids = [map {$_->{id}} @$rec_ids];
1735
1736     my $report_args = {
1737         conn => $conn,
1738         total => scalar(@$rec_ids),
1739         step => 1, # how often to respond
1740         progress => 1,
1741         in_count => 0,
1742     };
1743
1744     if ($tracker && @$rec_ids) {
1745         if (my $tevt = # assignment
1746             increment_session_tracker($tracker, scalar(@$rec_ids))) {
1747             $roe->rollback;
1748             return $tevt;
1749         }
1750     }
1751
1752     for my $rec_id (@$rec_ids) {
1753         my $rec = $roe->retrieve_vandelay_queued_bib_record($rec_id);
1754         my $item_ids = $roe->search_vandelay_import_item(
1755             {record => $rec->id, import_error => undef}, 
1756             {idlist=>1}
1757         );
1758
1759         if ($tracker) { # increment per record
1760             if (my $tevt = increment_session_tracker($tracker)) {
1761                 $roe->rollback;
1762                 return $tevt;
1763             }
1764         }
1765
1766         # if any items have no call_number label and a value should be
1767         # applied automatically (via org settings), we want to use the same
1768         # call number label for every copy per org per record.
1769         my $auto_callnumber = {};
1770
1771         my $opp_acq_copy_overlay = $args->{opp_acq_copy_overlay};
1772         my @overlaid_copy_ids;
1773         for my $item_id (@$item_ids) {
1774             my $e = new_editor(requestor => $requestor, xact => 1);
1775             my $item = $e->retrieve_vandelay_import_item($item_id);
1776             my ($copy, $vol, $evt);
1777
1778             $$report_args{e} = $e;
1779             $$report_args{evt} = undef;
1780             $$report_args{import_item} = $item;
1781             $$report_args{import_error} = undef;
1782
1783             if (my $copy_id = $item->internal_id) { # assignment
1784                 # copy matches an existing copy.  Overlay instead of create.
1785
1786                 $logger->info("vl: performing copy overlay for internal_id=$copy_id");
1787
1788                 my $qt = $e->json_query({
1789                     select => {vbq => ['queue_type']},
1790                     from => {vqbr => 'vbq'},
1791                     where => {'+vqbr' => {id => $rec_id}}
1792                 })->[0]->{queue_type};
1793
1794                 if ($qt eq 'acq') {
1795                     # internal_id for ACQ queues refers to acq.lineitem_detail.id
1796                     # pull the real copy id from the acq LID
1797
1798                     my $lid = $e->retrieve_acq_lineitem_detail($copy_id);
1799                     if (!$lid) {
1800                         $$report_args{evt} = $e->die_event;
1801                         respond_with_status($report_args);
1802                         next;
1803                     }
1804                     $copy_id = $lid->eg_copy_id;
1805                     $logger->info("vl: performing ACQ copy overlay for copy $copy_id");
1806                 }
1807
1808                 $copy = $e->search_asset_copy([
1809                     {id => $copy_id, deleted => 'f'},
1810                     {flesh => 1, flesh_fields => {acp => ['call_number']}}
1811                 ])->[0];
1812
1813                 if (!$copy) {
1814                     $$report_args{evt} = $e->die_event;
1815                     respond_with_status($report_args);
1816                     next;
1817                 }
1818
1819                 # prevent update of unrelated copies
1820                 if ($copy->call_number->record != $rec->imported_as) {
1821                     $logger->info("vl: attempt to overlay unrelated copy=$copy_id; rec=".$rec->imported_as);
1822
1823                     $evt = OpenILS::Event->new('INVALID_IMPORT_COPY_ID', 
1824                         note => 'Cannot overlay copies for unlinked bib',
1825                         bre => $rec->imported_as, 
1826                         copy_id => $copy_id
1827                     );
1828                     $$report_args{evt} = $evt;
1829                     respond_with_status($report_args);
1830                     next;
1831                 }
1832             } elsif ($opp_acq_copy_overlay) { # we are going to "opportunistically" overlay received, in-process acq copies
1833                 # recv_time should never be null if the copy status is
1834                 # "In Process", so that is just a double-check
1835                 my $query = [
1836                     {
1837                         "recv_time" => {"!=" => undef},
1838                         "owning_lib" => $item->owning_lib,
1839                         "+acn" => {"record" => $rec->imported_as},
1840                         "+acp" => {"status" => OILS_COPY_STATUS_IN_PROCESS}
1841                     },
1842                     {
1843                         "join" => {
1844                             "acp" => {
1845                                 "join" => "acn"
1846                             }
1847                         },
1848                         "flesh" => 2,
1849                         "flesh_fields" => {
1850                             "acqlid" => ["eg_copy_id"],
1851                             "acp" => ["call_number"]
1852                         }
1853                     }
1854                 ];
1855                 # don't overlay the same copy twice
1856                 $query->[0]{"+acp"}{"id"} = {"not in" => \@overlaid_copy_ids} if @overlaid_copy_ids;
1857                 if (my $acqlid = $e->search_acq_lineitem_detail($query)->[0]) {
1858                     $copy = $acqlid->eg_copy_id;
1859                     push(@overlaid_copy_ids, $copy->id);
1860                 }
1861             }
1862
1863             if ($copy) { # we found a copy to overlay
1864
1865                 # overlaying copies requires an extra permission
1866                 if (!$e->allowed("IMPORT_OVERLAY_COPY", $copy->call_number->owning_lib)) {
1867                     $$report_args{evt} = $e->die_event;
1868                     respond_with_status($report_args);
1869                     next;
1870                 }
1871
1872                 # are we updating the call-number?
1873                 if ($item->call_number and $item->call_number ne $copy->call_number->label) {
1874
1875                     my $count = $e->json_query({
1876                         select => {acp => [{
1877                             alias => 'count', 
1878                             column => 'id', 
1879                             transform => 'count', 
1880                             aggregate => 1
1881                         }]},
1882                         from => 'acp',
1883                         where => {
1884                             deleted => 'f',
1885                             call_number => $copy->call_number->id
1886                         }
1887                     })->[0]->{count};
1888
1889                     if ($count == 1) {
1890
1891                         my $evol = $e->search_asset_call_number({
1892                             id => {'<>' => $copy->call_number->id},
1893                             label => $item->call_number,
1894                             owning_lib => $copy->call_number->owning_lib,
1895                             record => $copy->call_number->record,
1896                             prefix => $copy->call_number->prefix,
1897                             suffix => $copy->call_number->suffix,
1898                             deleted => 'f'
1899                         })->[0];
1900
1901                         if ($evol) {
1902                             # call number for overlayed copy changed to a
1903                             # label already in use by another call number.
1904                             # merge the old CN into the new CN
1905                             
1906                             $logger->info(
1907                                 "vl: moving copy to new call number ".
1908                                 $item->call_number);
1909
1910                             my ($mvol, $err) = 
1911                                 OpenILS::Application::Cat::Merge::merge_volumes(
1912                                     $e, [$copy->call_number], $evol);
1913
1914                             if (!$mvol) {
1915                                 $$report_args{evt} = $err;
1916                                 respond_with_status($report_args);
1917                                 next;
1918                             }
1919
1920                             # update our copy *cough* of the copy to pick up
1921                             # any changes made my merge_volumes
1922                             $copy = $e->retrieve_asset_copy([
1923                                 $copy->id,
1924                                 {flesh => 1, flesh_fields => {acp => ['call_number']}}
1925                             ]);
1926
1927                         } else {
1928                             $logger->info(
1929                                 "vl: updating copy call number label".
1930                                 $item->call_number);
1931
1932                             $copy->call_number->label($item->call_number);
1933                             if (!$e->update_asset_call_number($copy->call_number)) {
1934                                 $$report_args{evt} = $e->die_event;
1935                                 respond_with_status($report_args);
1936                                 next;
1937                             }
1938                         }
1939
1940                     } else {
1941
1942                         # otherwise, move the copy to a new/existing 
1943                         # call-number with the given label/owner
1944                         # note that overlay does not allow the owning_lib 
1945                         # to be changed.  Should it?
1946
1947                         $logger->info("vl: moving copy to new callnumber in copy overlay");
1948
1949                         ($vol, $evt) =
1950                             OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
1951                                 $e, $item->call_number, 
1952                                 $copy->call_number->record, 
1953                                 $copy->call_number->owning_lib
1954                             );
1955
1956                         if($evt) {
1957                             $$report_args{evt} = $evt;
1958                             respond_with_status($report_args);
1959                             next;
1960                         }
1961
1962                         $copy->call_number($vol);
1963                     }
1964                 } # cn-update
1965
1966                 # for every field that has a non-'' value, overlay the copy value
1967                 foreach (qw/ barcode location circ_lib status 
1968                     circulate deposit deposit_amount ref holdable 
1969                     price circ_as_type alert_message opac_visible circ_modifier/) {
1970
1971                     my $val = $item->$_();
1972                     $copy->$_($val) if defined $val and $val ne '';
1973                 }
1974
1975                 # de-flesh for update
1976                 $copy->call_number($copy->call_number->id);
1977                 $copy->ischanged(1);
1978
1979                 $evt = OpenILS::Application::Cat::AssetCommon->
1980                     update_fleshed_copies($e, {all => 1}, undef, [$copy]);
1981
1982                 if($evt) {
1983                     $$report_args{evt} = $evt;
1984                     respond_with_status($report_args);
1985                     next;
1986                 }
1987
1988             } else { 
1989
1990                 # Creating a new copy
1991                 $logger->info("vl: creating new copy in import");
1992
1993                 # appply defaults values from org settings as needed
1994                 # if $auto_callnumber is unset, it will be set within
1995                 apply_import_item_defaults($e, $item, $auto_callnumber);
1996
1997                 # --------------------------------------------------------------------------------
1998                 # Find or create the volume
1999                 # --------------------------------------------------------------------------------
2000                 my ($vol, $evt) =
2001                     OpenILS::Application::Cat::AssetCommon->find_or_create_volume(
2002                         $e, $item->call_number, $rec->imported_as, $item->owning_lib);
2003
2004                 if($evt) {
2005                     $$report_args{evt} = $evt;
2006                     respond_with_status($report_args);
2007                     next;
2008                 }
2009
2010                 # --------------------------------------------------------------------------------
2011                 # Create the new copy
2012                 # --------------------------------------------------------------------------------
2013                 $copy = Fieldmapper::asset::copy->new;
2014                 $copy->loan_duration(2);
2015                 $copy->fine_level(2);
2016                 $copy->barcode($item->barcode);
2017                 $copy->location($item->location);
2018                 $copy->circ_lib($item->circ_lib || $item->owning_lib);
2019                 $copy->status( defined($item->status) ? $item->status : OILS_COPY_STATUS_IN_PROCESS );
2020                 $copy->circulate($item->circulate);
2021                 $copy->deposit($item->deposit);
2022                 $copy->deposit_amount($item->deposit_amount);
2023                 $copy->ref($item->ref);
2024                 $copy->holdable($item->holdable);
2025                 $copy->price($item->price);
2026                 $copy->circ_as_type($item->circ_as_type);
2027                 $copy->alert_message($item->alert_message);
2028                 $copy->opac_visible($item->opac_visible);
2029                 $copy->circ_modifier($item->circ_modifier);
2030
2031                 # --------------------------------------------------------------------------------
2032                 # Check for dupe barcode
2033                 # --------------------------------------------------------------------------------
2034                 if($evt = OpenILS::Application::Cat::AssetCommon->create_copy($e, $vol, $copy)) {
2035                     $$report_args{evt} = $evt;
2036                     $$report_args{import_error} = 'import.item.duplicate.barcode'
2037                         if $evt->{textcode} eq 'ITEM_BARCODE_EXISTS';
2038                     respond_with_status($report_args);
2039                     next;
2040                 }
2041
2042                 # --------------------------------------------------------------------------------
2043                 # create copy notes
2044                 # --------------------------------------------------------------------------------
2045                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
2046                     $e, $copy, '', $item->pub_note, 1) if $item->pub_note;
2047
2048                 if($evt) {
2049                     $$report_args{evt} = $evt;
2050                     respond_with_status($report_args);
2051                     next;
2052                 }
2053
2054                 $evt = OpenILS::Application::Cat::AssetCommon->create_copy_note(
2055                     $e, $copy, '', $item->priv_note) if $item->priv_note;
2056
2057                 if($evt) {
2058                     $$report_args{evt} = $evt;
2059                     respond_with_status($report_args);
2060                     next;
2061                 }
2062             }
2063
2064             if ($item->stat_cat_data) {
2065                 $logger->info("vl: parsing stat cat data: " . $item->stat_cat_data);
2066                 my @stat_cat_pairs = split('\|\|', $item->stat_cat_data);
2067                 my $stat_cat_entries = [];
2068                 # lookup stat cats
2069                 foreach my $stat_cat_pair (@stat_cat_pairs) {
2070                     my ($stat_cat, $stat_cat_entry);
2071                     my @pair_pieces = split('\|', $stat_cat_pair);
2072                     if (@pair_pieces == 2) {
2073                         $stat_cat = $e->search_asset_stat_cat({name=>$pair_pieces[0]})->[0];
2074                         if ($stat_cat) {
2075                             $stat_cat_entry = $e->search_asset_stat_cat_entry({'value' => $pair_pieces[1], 'stat_cat' => $stat_cat->id})->[0];
2076                             push (@$stat_cat_entries, $stat_cat_entry) if $stat_cat_entry;
2077                         }
2078                     } else {
2079                         $$report_args{import_error} = "import.item.invalid.stat_cat_format";
2080                         last;
2081                     }
2082
2083                     if (!$stat_cat or !$stat_cat_entry) {
2084                         $$report_args{import_error} = "import.item.invalid.stat_cat_data";
2085                         last;
2086                     }
2087                 }
2088                 if ($$report_args{import_error}) {
2089                     $logger->error("vl: invalid stat cat data: " . $item->stat_cat_data);
2090                     respond_with_status($report_args);
2091                     next;
2092                 }
2093                 $copy->stat_cat_entries( $stat_cat_entries );
2094                 $copy->ischanged(1);
2095                 $evt = OpenILS::Application::Cat::AssetCommon->update_copy_stat_entries($e, $copy, 0, 1); #delete_stats=0, add_or_update_only=1
2096                 if($evt) {
2097                     $$report_args{evt} = $evt;
2098                     respond_with_status($report_args);
2099                     next;
2100                 }
2101             }
2102
2103             if ($item->parts_data) {
2104                 $logger->info("vl: parsing parts data: " . $item->parts_data);
2105                 my @parts = split('\|', $item->parts_data);
2106                 my $part_objs = [];
2107                 foreach my $part_label (@parts) {
2108                     my $part_obj = $e->search_biblio_monograph_part(
2109                         {
2110                             label=>$part_label,
2111                             record=>$rec->imported_as
2112                         }
2113                     )->[0];
2114
2115                     if (!$part_obj) {
2116                         $part_obj = Fieldmapper::biblio::monograph_part->new();
2117                         $part_obj->label( $part_label );
2118                         $part_obj->record( $rec->imported_as );
2119                         unless($e->create_biblio_monograph_part($part_obj)) {
2120                             $$report_args{evt} = $e->die_event;
2121                             last;
2122                         }
2123                     }
2124                     push @$part_objs, $part_obj;
2125                 }
2126
2127                 if ($$report_args{evt}) {
2128                     respond_with_status($report_args);
2129                     next;
2130                 } else {
2131                     $copy->parts( $part_objs );
2132                     $copy->ischanged(1);
2133                     $evt = OpenILS::Application::Cat::AssetCommon->update_copy_parts($e, $copy, 0); #delete_parts=0
2134                     if($evt) {
2135                         $$report_args{evt} = $evt;
2136                         respond_with_status($report_args);
2137                         next;
2138                     }
2139                 }
2140             }
2141
2142             # set the import data on the import item
2143             $item->imported_as($copy->id); # $copy->id is set by create_copy() ^--
2144             $item->import_time('now');
2145
2146             unless($e->update_vandelay_import_item($item)) {
2147                 $$report_args{evt} = $e->die_event;
2148                 respond_with_status($report_args);
2149                 next;
2150             }
2151
2152             # --------------------------------------------------------------------------------
2153             # Item import succeeded
2154             # --------------------------------------------------------------------------------
2155             $e->commit;
2156             $$report_args{in_count}++;
2157             respond_with_status($report_args);
2158             $logger->info("vl: successfully imported item " . $item->barcode);
2159         }
2160     }
2161
2162     $roe->rollback;
2163     return undef;
2164 }
2165
2166 sub apply_import_item_defaults {
2167     my ($e, $item, $auto_cn) = @_;
2168     my $org = $item->owning_lib || $item->circ_lib;
2169     my %c = %item_defaults_cache;  
2170
2171     # fetch and cache the org unit setting value (unless 
2172     # it's already cached) and return the value to the caller
2173     my $set = sub {
2174         my $name = shift;
2175         return $c{$org}{$name} if defined $c{$org}{$name};
2176         my $sname = "vandelay.item.$name";
2177         $c{$org}{$name} = $U->ou_ancestor_setting_value($org, $sname, $e);
2178         $c{$org}{$name} = '' unless defined $c{$org}{$name};
2179         return $c{$org}{$name};
2180     };
2181
2182     if (!$item->barcode) {
2183
2184         if ($set->('barcode.auto')) {
2185
2186             my $pfx = $set->('barcode.prefix') || 'VAN';
2187             my $barcode = $pfx . $item->record . $item->id;
2188
2189             $logger->info("vl: using auto barcode $barcode for ".$item->id);
2190             $item->barcode($barcode);
2191
2192         } else {
2193             $logger->error("vl: no barcode (or defualt) for item ".$item->id);
2194         }
2195     }
2196
2197     if (!$item->call_number) {
2198
2199         if ($set->('call_number.auto')) {
2200
2201             if (!$auto_cn->{$org}) {
2202                 my $pfx = $set->('call_number.prefix') || 'VAN';
2203
2204                 # use the ID of the first item to differentiate this 
2205                 # call number from others linked to the same record
2206                 $auto_cn->{$org} = $pfx . $item->record . $item->id;
2207             }
2208
2209             $logger->info("vl: using auto call number ".$auto_cn->{$org});
2210             $item->call_number($auto_cn->{$org});
2211
2212         } else {
2213             $logger->error("vl: no call number or default for item ".$item->id);
2214         }
2215     }
2216 }
2217
2218
2219 sub respond_with_status {
2220     my $args = shift;
2221     my $e = $$args{e};
2222
2223     #  If the import failed, track the failure reason
2224
2225     my $error = $$args{import_error};
2226     my $evt = $$args{evt};
2227
2228     if($error or $evt) {
2229
2230         my $item = $$args{import_item};
2231         $logger->info("vl: unable to import item " . $item->barcode);
2232
2233         $error ||= 'general.unknown';
2234         $item->import_error($error);
2235
2236         if($evt) {
2237             my $detail = sprintf("%s : %s", $evt->{textcode}, substr($evt->{desc}, 0, 140));
2238             $item->error_detail($detail);
2239         }
2240
2241         # state of the editor is unknown at this point.  Force a rollback and start over.
2242         $e->rollback;
2243         $e = new_editor(xact => 1);
2244         $e->update_vandelay_import_item($item);
2245         $e->commit;
2246     }
2247
2248     if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
2249         $$args{conn}->respond({
2250             total => $$args{total},
2251             progress => $$args{progress},
2252             success_count => $$args{success_count},
2253             err_event => $evt
2254         });
2255         $$args{step} *= 2 unless $$args{step} == 256;
2256     }
2257
2258     $$args{progress}++;
2259 }
2260
2261 __PACKAGE__->register_method(  
2262     api_name    => "open-ils.vandelay.match_set.get_tree",
2263     method      => "match_set_get_tree",
2264     api_level   => 1,
2265     argc        => 2,
2266     signature   => {
2267         desc    => q/For a given vms object, return a tree of match set points
2268                     represented by a vmsp object with recursively fleshed
2269                     children./
2270     }
2271 );
2272
2273 sub match_set_get_tree {
2274     my ($self, $conn, $authtoken, $match_set_id) = @_;
2275
2276     $match_set_id = int($match_set_id) or return;
2277
2278     my $e = new_editor("authtoken" => $authtoken);
2279     $e->checkauth or return $e->die_event;
2280
2281     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
2282         return $e->die_event;
2283
2284     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
2285         return $e->die_event;
2286
2287     my $tree = $e->search_vandelay_match_set_point([
2288         {"match_set" => $match_set_id, "parent" => undef},
2289         {"flesh" => -1, "flesh_fields" => {"vmsp" => ["children"]}}
2290     ]) or return $e->die_event;
2291
2292     return pop @$tree;
2293 }
2294
2295
2296 __PACKAGE__->register_method(
2297     api_name    => "open-ils.vandelay.match_set.update",
2298     method      => "match_set_update_tree",
2299     api_level   => 1,
2300     argc        => 3,
2301     signature   => {
2302         desc => q/Replace any vmsp objects associated with a given (by ID) vms
2303                 with the given objects (recursively fleshed vmsp tree)./
2304     }
2305 );
2306
2307 sub _walk_new_vmsp {
2308     my ($e, $match_set_id, $node, $parent_id) = @_;
2309
2310     my $point = new Fieldmapper::vandelay::match_set_point;
2311     $point->parent($parent_id);
2312     $point->match_set($match_set_id);
2313     $point->$_($node->$_) 
2314         for (qw/bool_op svf tag subfield negate quality heading/);
2315
2316     $e->create_vandelay_match_set_point($point) or return $e->die_event;
2317
2318     $parent_id = $e->data->id;
2319     if ($node->children && @{$node->children}) {
2320         for (@{$node->children}) {
2321             return $e->die_event if
2322                 _walk_new_vmsp($e, $match_set_id, $_, $parent_id);
2323         }
2324     }
2325
2326     return;
2327 }
2328
2329 sub match_set_update_tree {
2330     my ($self, $conn, $authtoken, $match_set_id, $tree) = @_;
2331
2332     my $e = new_editor("xact" => 1, "authtoken" => $authtoken);
2333     $e->checkauth or return $e->die_event;
2334
2335     my $set = $e->retrieve_vandelay_match_set($match_set_id) or
2336         return $e->die_event;
2337
2338     $e->allowed("ADMIN_IMPORT_MATCH_SET", $set->owner) or
2339         return $e->die_event;
2340
2341     my $existing = $e->search_vandelay_match_set_point([
2342         {"match_set" => $match_set_id},
2343         {"order_by" => {"vmsp" => "id DESC"}}
2344     ]) or return $e->die_event;
2345
2346     # delete points, working up from leaf points to the root
2347     while(@$existing) {
2348         for my $point (shift @$existing) {
2349             if( grep {$_->parent eq $point->id} @$existing) {
2350                 push(@$existing, $point);
2351             } else {
2352                 $e->delete_vandelay_match_set_point($point) or return $e->die_event;
2353             }
2354         }
2355     }
2356
2357     _walk_new_vmsp($e, $match_set_id, $tree);
2358
2359     $e->commit or return $e->die_event;
2360 }
2361
2362 __PACKAGE__->register_method(  
2363     api_name    => 'open-ils.vandelay.bib_queue.to_bucket',
2364     method      => 'bib_queue_to_bucket',
2365     api_level   => 1,
2366     argc        => 2,
2367     signature   => {
2368         desc    => q/Add to or create a new bib container (bucket) with the successfully 
2369                     imported contents of a vandelay queue.  Any user that has Vandelay 
2370                     queue create permissions can append or create buckets from his-her own queues./,
2371         params  => [
2372             {desc => 'Authtoken', type => 'string'},
2373             {desc => 'Queue ID', type => 'number'},
2374             {desc => 'Bucket Name', type => 'string'}
2375         ],
2376         return  => {desc => q/
2377             {bucket => $bucket, addcount => number-of-items-added-to-bucket, item_count => total-bucket-item-count} on success,
2378             {add_count => 0} if there is nothing to do, and Event on error/}
2379     }
2380 );
2381
2382 sub bib_queue_to_bucket {
2383     my ($self, $conn, $auth, $q_id, $bucket_name) = @_;
2384
2385     my $e = new_editor(xact => 1, authtoken => $auth);
2386     return $e->die_event unless $e->checkauth;
2387     
2388     my $queue = $e->retrieve_vandelay_bib_queue($q_id)
2389         or return $e->die_event;
2390
2391     return OpenILS::Event->new('BAD_PARAMS', 
2392         note => q/Bucket creator must be queue owner/)
2393         unless $queue->owner == $e->requestor->id;
2394
2395     # find the bib IDs that will go into the bucket
2396     my $bib_ids = $e->json_query({
2397         select => {vqbr => ['imported_as']},
2398         from => 'vqbr',
2399         where => {queue => $q_id, imported_as => {'!=' => undef}}
2400     });
2401
2402     if (!@$bib_ids) { # no records to add
2403         $e->rollback;
2404         return {add_count => 0};
2405     }
2406
2407     # allow user to add to an existing bucket by name
2408     my $bucket = $e->search_container_biblio_record_entry_bucket({
2409         owner => $e->requestor->id, 
2410         name => $bucket_name,
2411         btype => 'vandelay_queue'
2412     })->[0];
2413
2414     # if the bucket does not exist, create a new one
2415     if (!$bucket) { 
2416         $bucket = Fieldmapper::container::biblio_record_entry_bucket->new;
2417         $bucket->name($bucket_name);
2418         $bucket->owner($e->requestor->id);
2419         $bucket->btype('vandelay_queue');
2420
2421         $e->create_container_biblio_record_entry_bucket($bucket)
2422             or return $e->die_event;
2423     }
2424
2425     # create the new bucket items
2426     for my $bib_id ( map {$_->{imported_as}} @$bib_ids ) {
2427         my $item = Fieldmapper::container::biblio_record_entry_bucket_item->new;
2428         $item->target_biblio_record_entry($bib_id);
2429         $item->bucket($bucket->id);
2430         $e->create_container_biblio_record_entry_bucket_item($item)
2431             or return $e->die_event;
2432     }
2433
2434     # re-fetch the bucket to pick up the correct create_time
2435     $bucket = $e->retrieve_container_biblio_record_entry_bucket($bucket->id)
2436         or return $e->die_event;
2437
2438     # get the total count of items in this bucket
2439     my $count = $e->json_query({
2440         select => {cbrebi => [{
2441             aggregate =>  1,
2442             transform => 'count',
2443             alias => 'count',
2444             column => 'id'
2445         }]},
2446         from => 'cbrebi',
2447         where => {bucket => $bucket->id}
2448     })->[0];
2449
2450     $e->commit;
2451
2452     return {
2453         bucket => $bucket, 
2454         add_count => scalar(@$bib_ids), # items added to the bucket
2455         item_count => $count->{count} # total items in buckets
2456     };
2457 }
2458
2459 1;