LP 1350345: marc_export blows up on bad records.
[working/Evergreen.git] / Open-ILS / src / support-scripts / marc_export.in
1 #!/usr/bin/perl
2 # ---------------------------------------------------------------
3 # Copyright © 2013 Merrimack Valley Library Consortium
4 # Jason Stephenson <jstephenson@mvlc.org>
5 #
6 # This program is part of Evergreen; you can redistribute it and/or
7 # modify it under the terms of the GNU General Public License as
8 # published by the Free Software Foundation; either version 2 of the
9 # License, or (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
16 use strict;
17 use warnings;
18 use OpenILS::Utils::Fieldmapper;
19 use OpenILS::Application::AppUtils;
20 use OpenSRF::Utils::JSON;
21 use MARC::Field;
22 use MARC::Record;
23 use MARC::File::XML (BinaryEncoding => 'UTF-8');
24 use Date::Manip::Date;
25 my $U = 'OpenILS::Application::AppUtils';
26
27 binmode(STDERR, ':utf8');
28
29 package Marque;
30
31 our $config = Marque::Config->new();
32 Fieldmapper->import(IDL => $config->option_value('xml-idl'));
33
34 # Look for passed in ids:
35 my @ids = ();
36 if ($config->need_ids()) {
37     print STDERR "Waiting for input\n";
38     while (my $i = <>) {
39         push @ids, $i if ($i =~ /^\s*[0-9]+\s*$/);
40     }
41 }
42
43 my $exporter;
44 if ($config->option_value('type') eq 'authority') {
45     $exporter = Marque::Authority->new(\@ids);
46 } else {
47     $exporter = Marque::Biblio->new(\@ids);
48 }
49
50 Marque::Output::output($exporter);
51
52 # ------------------------------------------------------------------
53 package Marque::Config;
54
55 use Getopt::Long;
56 use List::MoreUtils qw(none);
57 use OpenSRF::System;
58 use OpenSRF::Utils::SettingsClient;
59
60 use constant FORMATS => qw(USMARC UNIMARC XML BRE ARE);
61 use constant STORES => qw(reporter cstore storage);
62 use constant TYPES => qw(authority biblio);
63
64
65 sub new {
66     my $class = shift;
67
68     my $self = {};
69
70     # For command line options.
71     my %opts;
72
73     # set some default values
74     $opts{'format'} = 'USMARC';
75     $opts{'encoding'} = 'MARC8';
76     $opts{'type'} = 'biblio';
77     $opts{'money'} = '$';
78     $opts{'timeout'} = 0;
79     $opts{'config'} = '@sysconfdir@/opensrf_core.xml';
80     $opts{'store'} = 'reporter';
81
82     GetOptions(\%opts,
83                'help',
84                'items',
85                'mfhd',
86                'all',
87                'replace_001',
88                'location=s',
89                'money=s',
90                'config=s',
91                'format=s',
92                'type=s',
93                'xml-idl=s',
94                'encoding=s',
95                'timeout=i',
96                'library=s@',
97                'since=s',
98                'store=s',
99                'debug');
100
101     if ($opts{help}) {
102         print <<"HELP";
103 This script exports MARC authority, bibliographic, and serial holdings
104 records from an Evergreen database. 
105
106 Input to this script can consist of a list of record IDs, with one record ID
107 per line, corresponding to the record ID in the Evergreen database table of
108 your requested record type.
109
110 Alternately, passing the --all option will attempt to export all records of
111 the specified type from the Evergreen database. The --all option starts at
112 record ID 1 and increments the ID by 1 until the largest ID in the database
113 is retrieved. This may not be very efficient for databases with large gaps
114 in their ID sequences.
115
116 Usage: $0 [options]
117  --help or -h       This screen.
118  --config or -c     Configuration file [@sysconfdir@/opensrf_core.xml]
119  --format or -f     Output format (USMARC, UNIMARC, XML, BRE, ARE) [USMARC]
120  --encoding or -e   Output encoding (UTF-8, ISO-8859-?, MARC8) [MARC8]
121  --xml-idl or -x    Location of the IDL XML
122  --timeout          Remains for backward compatibility. No longer used.
123  --type or -t       Record type (BIBLIO, AUTHORITY) [BIBLIO]
124  --all or -a        Export all records; ignores input list
125  --replace_001      Replace the 001 field value with the record ID
126  --store            Use the given storage backend to connect to the database.
127                     Choices are (reporter, cstore, storage) [reporter]
128  --since            Export records modified since a certain date and time.
129
130  Additional options for type = 'BIBLIO':
131  --items or -i      Include items (holdings) in the output
132  --money            Currency symbol to use in item price field [\$]
133  --mfhd             Export serial MFHD records for associated bib records
134                     Not compatible with --format=BRE
135  --location or -l   MARC Location Code for holdings from
136                     http://www.loc.gov/marc/organizations/orgshome.html
137  --library          Export the bibliographic records that have attached
138                     holdings for the listed library or libraries as
139                     identified by shortname
140
141 Examples:
142
143 To export a set of USMARC records in a file named "output_file" based on the
144 IDs contained in a file named "list_of_ids":
145   cat list_of_ids | $0 > output_file
146
147 To export a set of MARC21XML authority records in a file named "output.xml"
148 for all authority records in the database:
149   $0 --format XML --type AUTHORITY --all > output.xml
150
151 To export a set of USMARC bibliographic records encoded in UTF-8 in a file
152 named "sys1_bibs.mrc" based on records which have attached callnumbers for the
153 libraries with the short names "BR1" and "BR2":
154
155   $0 --library BR1 --library BR2 --encoding UTF-8 > sys1_bibs.mrc
156
157 HELP
158         exit;
159     }
160
161     OpenSRF::System->bootstrap_client( config_file => $opts{config} );
162     my $sclient = OpenSRF::Utils::SettingsClient->new();
163     unless ($opts{'xml-idl'}) {
164         $opts{'xml-idl'} = $sclient->config_value('IDL');
165     }
166
167     # Validate some of the settings.
168     if ($opts{all} && $opts{library}) {
169         die('Incompatible arguments: you cannot combine a request for all ' .
170                 'records with a request for records by library');
171     }
172     if ($opts{all} && $opts{since}) {
173         die('Incompatible arguments: you cannot combine a request for all ' .
174                 'records with a request for records added or changed since a certain date');
175     }
176     $opts{type} = lc($opts{type});
177     if (none {$_ eq $opts{type}} (TYPES)) {
178         die "Please select a supported type.  ".
179             "Right now that means one of [".
180                 join('|',(FORMATS)). "]\n";
181     }
182     $opts{format} = uc($opts{format});
183     if (none {$_ eq $opts{format}} (FORMATS)) {
184         die "Please select a supported format.  ".
185             "Right now that means one of [".
186                 join('|',(FORMATS)). "]\n";
187     }
188
189     if ($opts{format} eq 'ARE' && $opts{type} ne 'authority') {
190         die "Format ARE is not compatible with type " . $opts{type};
191     }
192     if ($opts{format} eq 'BRE' && $opts{type} ne 'biblio') {
193         die "Format BRE is not compatible with type " . $opts{type};
194     }
195     if ($opts{format} eq 'BRE' && $opts{items}) {
196         die "Format BRE is not compatible with exporting holdings."
197     }
198
199     if ($opts{mfhd}) {
200         if ($opts{type} ne 'biblio') {
201             die "MFHD export only works with bibliographic records.";
202         } elsif ($opts{format} eq 'BRE') {
203             die "MFHD export incompatible with format BRE.";
204         }
205     }
206
207     $opts{store} = lc($opts{store});
208     if (none {$_ eq $opts{store}} (STORES)) {
209         die "Please select a supported store.  ".
210             "Right now that means one of [".
211                 join('|',(STORES)). "]\n";
212     } else {
213         my $app;
214         if ($opts{store} eq 'reporter') {
215             $app = 'open-ils.reporter-store';
216         } else {
217             $app = 'open-ils.' . $opts{store};
218         }
219         if ($app eq 'open-ils.storage') {
220             $self->{dbsettings} = $sclient->config_value(
221                 apps => $app => app_settings => databases => 'database');
222         } else {
223             $self->{dbsettings} = $sclient->config_value(
224                 apps => $app => app_settings => 'database');
225         }
226     }
227     $opts{encoding} = uc($opts{encoding});
228
229     $self->{'options'} = \%opts;
230     bless $self, $class;
231     return $self;
232 }
233
234 sub option_value {
235     my ($self, $option) = @_;
236     return $self->{options}->{$option};
237 }
238
239 sub database_settings {
240     my $self = shift;
241     return $self->{dbsettings};
242 }
243
244 sub need_ids {
245     my $self = shift;
246     my $rv = 1;
247
248     $rv = 0 if ($self->{options}->{all});
249     $rv = 0 if ($self->{options}->{since});
250     $rv = 0 if ($self->{options}->{library});
251
252     return $rv;
253 }
254
255 # ------------------------------------------------------------------
256 # This package exists to get a connection to the database.  Since
257 # we'll need one for both biblio records and authorities, we've made a
258 # single subpackage with a function so that we don't have to duplicate
259 # code.
260 package Marque::Connector;
261
262 use DBI;
263
264 # Pass a Marque::Config object's database_settings return value into
265 # this to get a DBI connection.
266 # ex:
267 # my $db = Marque::Connector::connect($config->database_settings);
268 sub connect {
269     my $args = shift;
270
271     # Build a connect string from the args.
272     my $connect_string = 'DBI:Pg:';
273     $connect_string .= 'dbname=' . $args->{db} . ';';
274     $connect_string .= 'host=' . $args->{host} . ';';
275     $connect_string .= 'port=' . $args->{port};
276
277     my $db_handle = DBI->connect($connect_string,
278                                  $args->{user}, $args->{pw});
279     return $db_handle;
280 }
281
282 # A function to get the date into a format better for PostgreSQL.
283 sub db_date {
284     my $input = shift;
285     my $date;
286     if (ref($input) eq 'Date::Manip::Date') {
287         $date = $input;
288     } else {
289         $date = Date::Manip::Date->new();
290         if ($date->parse($input)) {
291             die "Can't parse date $input";
292         }
293     }
294     return $date->printf("%Y-%m-%dT%H:%M:%S%z");
295  }
296
297 # ------------------------------------------------------------------
298 # You would typically have the next two packages inherit from a common
299 # superclass, but ineritance doesn't seem to work when all packages
300 # are in single file, so we have some duplicated code between these
301 # two.
302
303 # Get bibliographic records from the database.
304 package Marque::Biblio;
305
306 sub new {
307     my $class = shift;
308     my $idlist = shift;
309     my $self = {idlist => $idlist};
310     $self->{handle} = Marque::Connector::connect(
311         $Marque::config->database_settings);
312     $self->{since_date} = Date::Manip::Date->new;
313     $self->{since_date}->parse($Marque::config->option_value('since'));
314
315     # We need multiple fieldmapper classes depending on our
316     # options. We'll just get the information that we'll need for them
317     # all right here instead of only fetching the information when
318     # needed.
319     $self->{breClass} = Fieldmapper::class_for_hint('bre');
320     $self->{acnClass} = Fieldmapper::class_for_hint('acn');
321     $self->{acpClass} = Fieldmapper::class_for_hint('acp');
322     $self->{sreClass} = Fieldmapper::class_for_hint('sre');
323
324     # Make an arrayref of shortname ids if the library option was
325     # specified:
326     $self->{libs} = [];
327     if ($Marque::config->option_value('library')) {
328         # This is done not only for speed, but to prevent SQL injection.
329         my $sth = $self->{handle}->prepare('select id from actor.org_unit where shortname=any(?::text[])');
330         if ($sth->execute($Marque::config->option_value('library'))) {
331             my $r = $sth->fetchall_arrayref();
332             my @ids = map {$_->[0]} @{$r};
333             $self->{libs} = \@ids;
334             $sth->finish();
335         }
336     }
337
338     bless $self, $class;
339     return $self;
340 }
341
342 sub build_query {
343     my $self = shift;
344
345     # Get the field names and tables for our classes. We add the fully
346     # qualified table names to the fields so that the joins will work.
347     my $breTable = $self->{breClass}->Table();
348     my @breFields = map {$breTable . '.' . $_} $self->{breClass}->real_fields();
349     my $acnTable = $self->{acnClass}->Table();
350     my $acpTable = $self->{acpClass}->Table();
351
352     # Now we build the query in pieces:
353
354     # We always select the bre fields:
355     my $select = 'select distinct ' . join(',', @breFields);
356     # We always use the bre table.
357     my $from = "from $breTable";
358
359     # If have the libraries or items options, we need to join the
360     # asset.call_number table. If we have both, this variable checks
361     # that it has already been joined so we don't create an invalid
362     # query.
363     my $acn_joined = 0;
364     # Join to the acn table as needed for the library option.
365     if (@{$self->{libs}}) {
366         $acn_joined = 1;
367         $from .= <<ACN_JOIN;
368
369 join $acnTable on $acnTable.record = $breTable.id
370 and $acnTable.owning_lib in (
371 ACN_JOIN
372         $from .= join(',', @{$self->{libs}}) . ")";
373         $from .= "\nand $acnTable.deleted = 'f'" unless ($Marque::config->option_value('since'));
374     }
375
376     if ($Marque::config->option_value('items')) {
377         unless ($acn_joined) {
378             $from .= "\njoin $acnTable on $acnTable.record = $breTable.id";
379             $from .= "\nand $acnTable.deleted = 'f'" unless ($Marque::config->option_value('since'));
380         }
381         $from .= "\njoin $acpTable on $acpTable.call_number = $acnTable.id";
382         $from .= "\nand $acpTable.deleted = 'f'" unless ($Marque::config->option_value('since'));
383     }
384
385     # The where really depends on a few options:
386     my $where = "where $breTable.id > 0 and ";
387     # We fill in the where as necessary.
388     if ($self->{idlist} && @{$self->{idlist}}) {
389         $where .= "$breTable.id in (" . join(',', @{$self->{idlist}}) . ')';
390     } elsif ($Marque::config->option_value('since')) {
391         my $since_str = Marque::Connector::db_date($self->{since_date});
392         $where .= "($breTable.edit_date > '$since_str'";
393         $where .= " or $breTable.create_date > '$since_str')";
394     } else {
395         # We want all non-deleted records.
396         $where .= "$breTable.deleted = 'f'";
397     }
398
399     $self->{query} = $select . "\n" . $from . "\n" . $where;
400 }
401
402 sub execute_query {
403     my $self = shift;
404     $self->build_query() unless ($self->{query});
405     $self->{sth} = $self->{handle}->prepare($self->{query});
406     return $self->{sth}->execute;
407 }
408
409 sub next {
410     my $self = shift;
411     my $output;
412
413     # $r holds the record object, either sre or bre.  $marc holds the
414     # current record's MARC, either sre.marc or bre.marc
415     my ($r,$marc);
416     # If we have the mfhd option and we've previously retrieved some
417     # sres, then we output one of the retrieved sres for each call
418     # until we run out.  These sres "go with" the previous bib record.
419     if ($Marque::config->option_value('mfhd') && $self->{mfhds} && @{$self->{mfhds}}) {
420         $r = shift(@{$self->{mfhds}});
421         eval {
422             $marc = MARC::Record->new_from_xml($r->marc(),
423                                                $Marque::config->option_value('encoding'),
424                                                $Marque::config->option_value('format'));
425         };
426         if ($@) {
427             print STDERR "Error in serial record " . $r->id() . "\n";
428             print STDERR "$@\n";
429             import MARC::File::XML; # Reset SAX Parser.
430             return $self->next();
431         }
432     } else {
433         my $data = $self->{sth}->fetchrow_hashref;
434         if ($data) {
435             $r = $self->{breClass}->from_bare_hash($data);
436             if ($Marque::config->option_value('format') eq 'BRE') {
437                 $output = OpenSRF::Utils::JSON->perl2JSON($r);
438             } else {
439                 eval {
440                     $marc = MARC::Record->new_from_xml($r->marc(),
441                                                        $Marque::config->option_value('encoding'),
442                                                        $Marque::config->option_value('format'));
443                 };
444                 if ($@) {
445                     print STDERR "Error in bibliograpic record " . $r->id() . "\n";
446                     print STDERR "$@\n";
447                     import MARC::File::XML; # Reset SAX Parser.
448                     return $self->next();
449                 }
450                 if ($Marque::config->option_value('replace_001')) {
451                     my $tcn = $marc->field('001');
452                     if ($tcn) {
453                         $tcn->update($r->id());
454                     } else {
455                         $tcn = MARC::Field->new('001', $r->id());
456                         $marc->insert_fields_ordered($tcn);
457                     }
458                 }
459                 if ($Marque::config->option_value('items')) {
460                     my @acps = $self->acps_for_bre($r);
461                     foreach my $acp (@acps) {
462                         next unless ($acp);
463                         my $location = $Marque::config->option_value('location');
464                         my $price = ($acp->price() ? $Marque::config->option_value('money').$acp->price() : '');
465                         $marc->insert_grouped_field(
466                             MARC::Field->new(
467                                 '852', '4', ' ',
468                                 ($location ? ('a' => $location) : ()),
469                                 b => $acp->call_number()->owning_lib()->shortname(),
470                                 b => $acp->circ_lib()->shortname(),
471                                 c => $acp->location()->name(),
472                                 j => $acp->call_number()->label(),
473                                 ($acp->circ_modifier() ? (g => $acp->circ_modifier()) : ()),
474                                 p => $acp->barcode(),
475                                 ($price ? (y => $price) : ()),
476                                 ($acp->copy_number() ? (t => $acp->copy_number()) : ()),
477                                 ($U->is_true($acp->ref()) ? (x => 'reference') : ()),
478                                 (!$U->is_true($acp->holdable()) ? (x => 'unholdable') : ()),
479                                 (!$U->is_true($acp->circulate()) ? (x => 'noncirculating') : ()),
480                                 (!$U->is_true($acp->opac_visible()) ? (x => 'hidden') : ())
481                             ));
482                     }
483                 }
484                 if ($Marque::config->option_value('mfhd')) {
485                     $self->{mfhds} = [$self->sres_for_bre($r)];
486                 }
487             }
488         }
489     }
490     # Common stuff that doesn't depend on record type.
491     if ($marc) {
492         if ($Marque::config->option_value('since')) {
493             my $leader = $marc->leader();
494             if ($U->is_true($r->deleted())) {
495                 substr($leader, 5, 1) = 'd';
496                 $marc->leader($leader);
497             } else {
498                 my $create_date = Date::Manip::Date->new;
499                 $create_date->parse($r->create_date());
500                 my $edit_date = Date::Manip::Date->new;
501                 $edit_date->parse($r->edit_date());
502                 if ($self->{since_date}->cmp($create_date) < 0) {
503                     substr($leader, 5, 1) = 'n';
504                     $marc->leader($leader);
505                 } elsif ($self->{since_date}->cmp($edit_date) < 0) {
506                     substr($leader, 5, 1) = 'c';
507                     $marc->leader($leader);
508                 }
509             }
510         }
511         if ($Marque::config->option_value('format') eq 'XML') {
512             eval {
513                 $output = $marc->as_xml_record;
514                 $output =~ s/^<\?.+?\?>$//mo;
515             };
516             if ($@) {
517                 print STDERR "Error in bibliograpic record " . $r->id() . "\n";
518                 print STDERR "$@\n";
519                 return $self->next();
520             }
521         } else {
522             eval {
523                 $output = $marc->as_usmarc;
524             };
525             if ($@) {
526                 print STDERR "Error in bibliograpic record " . $r->id() . "\n";
527                 print STDERR "$@\n";
528                 return $self->next();
529             }
530         }
531     }
532     return $output;
533 }
534
535 # Returns a list of aou objects in an array.
536 sub orgs {
537     my $self = shift;
538     unless ($self->{orgs} && @{$self->{orgs}}) {
539         my $fmClass = Fieldmapper::class_for_hint('aou');
540         my @classFields = $fmClass->real_fields();
541         my $classTable = $fmClass->Table();
542         my $query = 'select ' . join(',', @classFields);
543         $query .= "\nfrom $classTable";
544         my $sth = $self->{handle}->prepare($query);
545         if ($sth->execute()) {
546             my $result = $sth->fetchall_arrayref({});
547             my @orgs = map {$fmClass->from_bare_hash($_)} @{$result};
548             $self->{orgs} = \@orgs;
549         } else {
550             $self->{orgs} = [];
551         }
552     }
553     return @{$self->{orgs}};
554 }
555
556 # Returns an array of acpl objects.
557 sub shelves {
558     my $self = shift;
559
560     unless ($self->{shelves} && @{$self->{shelves}}) {
561         my $fmClass = Fieldmapper::class_for_hint('acpl');
562         my @classFields = $fmClass->real_fields();
563         my $classTable = $fmClass->Table();
564         my $query = 'select ' . join(',', @classFields);
565         $query .= "\nfrom $classTable";
566         my $result = $self->{handle}->selectall_arrayref($query, {Slice=>{}});
567         my @shelves = map {$fmClass->from_bare_hash($_)} @{$result};
568         $self->{shelves} = \@shelves;
569     }
570
571     return @{$self->{shelves}};
572 }
573
574 # Returns an array of acn objects for a given bre object or id.
575 sub acns_for_bre {
576     my $self = shift;
577     my $bre = shift;
578     $bre = $bre->id() if (ref($bre));
579
580     unless ($self->{acnHandle}) {
581         my $query = "select " . join(',', $self->{acnClass}->real_fields());
582         $query .= "\nfrom " . $self->{acnClass}->Table();
583         $query .= "\nwhere record = ?";
584         if (@{$self->{libs}}) {
585             $query .= "\nand owning_lib in (";
586             $query .= join(',', @{$self->{libs}}) . ")";
587         }
588         $query .= "\nand deleted = 'f'" unless($Marque::config->option_value('since'));
589         $self->{acnHandle} = $self->{handle}->prepare($query);
590     }
591
592     if ($self->{acnHandle}->execute($bre)) {
593         my $result = $self->{acnHandle}->fetchall_arrayref({});
594         return map {$self->{acnClass}->from_bare_hash($_)} @{$result};
595     }
596
597     # If for some reason, we don't find anything.
598     return undef;
599 }
600
601 # Returns an array of acp objects for a given bre object or id.
602 sub acps_for_bre {
603     my $self = shift;
604     my $bre = shift;
605     $bre = $bre->id() if (ref($bre));
606
607     my @orgs = $self->orgs();
608     my @locations = $self->shelves();
609
610     my @acns = $self->acns_for_bre($bre);
611     if (@acns) {
612         my $query = 'select ' . join(',', $self->{acpClass}->real_fields());
613         $query .= "\nfrom " . $self->{acpClass}->Table();
614         $query .= "\nwhere call_number in (";
615         $query .= join(',', map {$_->id()} @acns);
616         $query .= ")";
617         $query .= "\nand deleted = 'f'" unless ($Marque::config->option_value('since'));
618         my $result = $self->{handle}->selectall_arrayref($query, {Slice=>{}});
619         if ($result && @{$result}) {
620             my @acps = map {$self->{acpClass}->from_bare_hash($_)} @{$result};
621             foreach (@acps) {
622                 my $cn = $_->call_number();
623                 my $clib = $_->circ_lib();
624                 my $loc = $_->location();
625                 my ($org) = grep {$_->id() == $clib} @orgs;
626                 my ($acn) = grep {$_->id() == $cn} @acns;
627                 my ($location) = grep {$_->id() == $loc} @locations;
628                 my $olib = $acn->owning_lib();
629                 my ($owner) = grep {$_->id() == $olib} @orgs;
630                 $acn->owning_lib($owner);
631                 $_->call_number($acn);
632                 $_->circ_lib($org);
633                 $_->location($location);
634             }
635             return @acps;
636         }
637     }
638
639     # If for some reason, we don't find anything.
640     return undef;
641 }
642
643 # Retreive an array for sre objects when the --mfhd option is used.
644 sub sres_for_bre {
645     my $self = shift;
646     my $bre = shift;
647     $bre = $bre->id() if (ref($bre));
648     my @sres;
649     # Build a query to retrieve SREs when the MFHD option is passed.
650     if ($Marque::config->option_value('mfhd')) {
651         # Create a persistent handle as needed.
652         unless ($self->{sreSth}) {
653             my $query = "select " . join(',', $self->{sreClass}->real_fields());
654             $query .= "\nfrom " . $self->{sreClass}->Table();
655             $query .= "\nwhere record = ?";
656             $query .= "\nand deleted = 'f'" unless ($Marque::config->option_value('since'));
657             $self->{sreSth} = $self->{handle}->prepare($query);
658         }
659         if ($self->{sreSth}->execute($bre)) {
660             while (my $data = $self->{sreSth}->fetchrow_hashref) {
661                 push @sres, $self->{sreClass}->from_bare_hash($data);
662             }
663             $self->{sreSth}->finish; # Sometimes DBI complains.
664         }
665     }
666     # May be empty.
667     return @sres;
668 }
669
670 # Get authority records from the database.
671 package Marque::Authority;
672
673 sub new {
674     my $class = shift;
675     my $idlist = shift;
676     my $self = {idlist => $idlist};
677     $self->{handle} = Marque::Connector::connect(
678         $Marque::config->database_settings);
679     $self->{fmClass} = Fieldmapper::class_for_hint('are');
680     $self->{since_date} = Date::Manip::Date->new;
681     $self->{since_date}->parse($Marque::config->option_value('since'));
682     bless $self, $class;
683     return $self;
684 }
685
686 sub build_query {
687     my $self = shift;
688
689     # Get the information for are object from the Fieldmapper:
690     my @fields  = $self->{fmClass}->real_fields();
691     my $table = $self->{fmClass}->Table();
692
693     # Build the actual query.
694     my $select = "select " . join(',', @fields);
695     my $from = "from $table";
696     my $where = 'where ';
697
698     # If we have an idlist, we pretty much ignore anything else.
699     if ($self->{idlist} && @{$self->{idlist}}) {
700         $where .= 'id in (' . join(',', @{$self->{idlist}}) . ')';
701     } elsif ($Marque::config->option_value('since')) {
702         my $since_str = Marque::Connector::db_date($self->{since_date});
703         $where .= "edit_date > '$since_str'";
704         $where .= " or create_date > '$since_str'";
705     } else {
706         # We want all non-deleted records.
707         $where .= "deleted = 'f'";
708     }
709
710     $self->{query} = $select . "\n" . $from . "\n" . $where;
711 }
712
713 sub execute_query {
714     my $self = shift;
715     $self->build_query() unless ($self->{query});
716     $self->{sth} = $self->{handle}->prepare($self->{query});
717     return $self->{sth}->execute;
718 }
719
720 sub next {
721     my $self = shift;
722     my $output;
723     my $data = $self->{sth}->fetchrow_hashref;
724
725     if ($data) {
726         my $format = $Marque::config->option_value('format');
727         my $are = $self->{fmClass}->from_bare_hash($data);
728         if ($format eq 'ARE') {
729             $output = OpenSRF::Utils::JSON->perl2JSON($are);
730         } else {
731             my $r;
732             eval {
733                 $r = MARC::Record->new_from_xml($are->marc(),
734                                                 $Marque::config->option_value('encoding'),
735                                                 $Marque::config->option_value('format'));
736             };
737             if ($@) {
738                 print STDERR "Error in authority record " . $are->id() . "\n";
739                 print STDERR "$@\n";
740                 import MARC::File::XML; # Reset SAX Parser.
741                 return $self->next();
742             }
743             if ($Marque::config->option_value('replace_001')) {
744                 my $tcn = $r->field('001');
745                 if ($tcn) {
746                     $tcn->update($are->id());
747                 } else {
748                     $tcn = MARC::Field->new('001', $are->id());
749                     $r->insert_fields_ordered($tcn);
750                 }
751             }
752             if ($Marque::config->option_value('since')) {
753                 my $leader = $r->leader();
754                 if ($U->is_true($are->deleted())) {
755                     substr($leader, 5, 1) = 'd';
756                     $r->leader($leader);
757                 } else {
758                     my $create_date = Date::Manip::Date->new;
759                     $create_date->parse($are->create_date());
760                     my $edit_date = Date::Manip::Date->new;
761                     $edit_date->parse($are->edit_date());
762                     if ($self->{since_date}->cmp($create_date) < 0) {
763                         substr($leader, 5, 1) = 'n';
764                         $r->leader($leader);
765                     } elsif ($self->{since_date}->cmp($edit_date) < 0) {
766                         substr($leader, 5, 1) = 'c';
767                         $r->leader($leader);
768                     }
769                 }
770             }
771             if ($Marque::config->option_value('format') eq 'XML') {
772                 eval {
773                     $output = $r->as_xml_record;
774                     $output =~ s/^<\?.+?\?>$//mo;
775                 };
776                 if ($@) {
777                     print STDERR "Error in authority record " . $r->id() . "\n";
778                     print STDERR "$@\n";
779                     return $self->next();
780                 }
781             } else {
782                 eval {
783                     $output = $r->as_usmarc;
784                 };
785                 if ($@) {
786                     print STDERR "Error in authority record " . $r->id() . "\n";
787                     print STDERR "$@\n";
788                     return $self->next();
789                 }
790             }
791         }
792     }
793
794     return $output;
795 }
796
797 # ------------------------------------------------------------------
798 # Since the ultimate output is largely independent of the type of the
799 # records, we use a single subpackage to group our output routines.
800 package Marque::Output;
801
802 sub output {
803     my $extractor = shift;
804     if ($extractor->execute_query) {
805         if ($Marque::config->option_value('encoding') eq 'UTF-8') {
806             binmode(STDOUT, ':utf8');
807         } else {
808             binmode(STDOUT, ':raw');
809         }
810
811         &preamble;
812         while (my $output = $extractor->next()) {
813             print $output;
814         }
815         &postamble;
816     } else {
817         print STDERR $extractor->{query} if ($Marque::config->option_value('debug'));
818         die "Database query failed!";
819     }
820 }
821
822 sub preamble {
823     if ($Marque::config->option_value('format') eq 'XML') {
824         my $encoding = $Marque::config->option_value('encoding');
825         print <<PREAMBLE;
826 <?xml version="1.0" encoding="$encoding"?>
827 <collection xmlns='http://www.loc.gov/MARC21/slim'>
828 PREAMBLE
829     }
830 }
831
832 sub postamble {
833     if ($Marque::config->option_value('format') eq 'XML') {
834         print "</collection>\n";
835     }
836 }
837
838 1;