2 # ---------------------------------------------------------------
3 # Copyright © 2013,2014 Merrimack Valley Library Consortium
4 # Jason Stephenson <jstephenson@mvlc.org>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
16 # TODO: Document with POD.
17 # This script parallelizes a reingest: records are split into batches and each batch is handled by a forked worker process.
23 # Globals for the command line options: --
25 # You will want to adjust the next two based on your database size,
26 # i.e. number of bib records as well as the number of cores on your
27 # database server. Using roughly number of cores/2 doesn't seem to
28 # have much impact in off peak times.
29 my $batch_size = 10000; # records processed per batch
30 my $max_child = 8; # max number of parallel worker processes
32 my $skip_browse; # Skip the browse reingest.
33 my $skip_attrs; # Skip the record attributes reingest.
34 my $skip_search; # Skip the search reingest.
35 my $skip_facets; # Skip the facets reingest.
36 my $skip_display; # Skip the display reingest.
37 my $start_id; # start processing at this bib ID.
38 my $end_id; # stop processing when this bib ID is reached.
39 my $max_duration; # max processing duration in seconds
40 my $help; # show help text
41 my $opt_pipe; # Read record ids from STDIN.
42 my $record_attrs; # Record attributes for metabib.reingest_record_attributes.
44 # Database connection options with defaults:
45 my $db_user = $ENV{PGUSER} || 'evergreen';
46 my $db_host = $ENV{PGHOST} || 'localhost';
47 my $db_db = $ENV{PGDATABASE} || 'evergreen';
48 my $db_password = $ENV{PGPASSWORD} || 'evergreen';
49 my $db_port = $ENV{PGPORT} || 5432;
52 'user=s' => \$db_user,
53 'host=s' => \$db_host,
55 'password=s' => \$db_password,
56 'port=i' => \$db_port,
57 'batch-size=i' => \$batch_size,
58 'max-child=i' => \$max_child,
59 'skip-browse' => \$skip_browse,
60 'skip-attrs' => \$skip_attrs,
61 'skip-search' => \$skip_search,
62 'skip-facets' => \$skip_facets,
63 'skip-display' => \$skip_display,
64 'start-id=i' => \$start_id,
65 'end-id=i' => \$end_id,
67 'max-duration=i' => \$max_duration,
68 'attr=s@' => \$record_attrs,
75 $0 --batch-size $batch_size --max-child $max_child \
76 --start-id 1 --end-id 500000 --max-duration 14400
79 Number of records to process per batch
82 Max number of worker processes
89 Skip the selected reingest component
92 Specify a record attribute for ingest
93 This option can be used more than once to specify multiple
95 This option is ignored if --skip-attrs is also given.
98 Start processing at this record ID.
101 Stop processing when this record ID is reached
104 Read record IDs to reingest from standard input.
105 This option conflicts with --start-id and/or --end-id.
108 Stop processing after this many total seconds have passed.
119 # Check for mutually exclusive options:
120 if ($opt_pipe && ($start_id || $end_id)) {
121 warn('Mutually exclusive options');
125 my $where = "WHERE deleted = 'f'";
126 if ($start_id && $end_id) {
127 $where .= " AND id BETWEEN $start_id AND $end_id";
128 } elsif ($start_id) {
129 $where .= " AND id >= $start_id";
131 $where .= " AND id <= $end_id";
134 # "Gimme the keys! I'll drive!"
137 FROM biblio.record_entry
142 # Stuffs needed for looping, tracking how many lists of records we
143 # have, storing the actual list of records, and the list of the lists
145 my ($count, $lists, $records) = (0,0,[]);
147 # To do the browse-only ingest:
150 my $start_epoch = time;
152 sub duration_expired {
153 return 1 if $max_duration && (time - $start_epoch) >= $max_duration;
157 # All of the DBI->connect() calls in this file assume that you have
158 # configured the PGHOST, PGPORT, PGDATABASE, PGUSER, and PGPASSWORD
159 # variables in your execution environment. If you have not, you have
164 # 2) edit the DBI->connect() calls in this program so that it can
165 # connect to your database.
167 # Get the input records from either standard input or the database.
171 # Assume any string of digits is an id.
172 if (my @subs = /([0-9]+)/g) {
177 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
178 $db_user, $db_password);
179 @input = @{$dbh->selectcol_arrayref($q)};
183 foreach my $record (@input) {
184 push(@blist, $record); # separate list of browse-only ingest
185 push(@$records, $record);
186 if (++$count == $batch_size) {
187 $lol[$lists++] = $records;
192 $lol[$lists++] = $records if ($count); # Last batch is likely to be
195 # We're going to reuse $count to keep track of the total number of
199 # @running keeps track of the running child processes.
202 # We start the browse-only ingest before starting the other ingests.
203 browse_ingest(@blist) unless ($skip_browse);
205 # We loop until we have processed all of the batches stored in @lol
206 # or the maximum processing duration has been reached.
207 while ($count < $lists) {
208 my $duration_expired = duration_expired();
210 if (scalar(@lol) && scalar(@running) < $max_child && !$duration_expired) {
211 # Reuse $records to hold the next batch taken off @lol.
212 $records = shift(@lol);
213 if ($skip_search && $skip_facets && $skip_attrs && $skip_display) {
220 if (grep {$_ == $pid} @running) {
221 @running = grep {$_ != $pid} @running;
223 print "$count of $lists processed\n";
227 if ($duration_expired && scalar(@running) == 0) {
228 warn "Exiting on max_duration ($max_duration)\n";
233 # This subroutine forks a process to do the browse-only ingest on the
234 # @blist above. It cannot be parallelized, but can run in parallel
235 # to the other ingests.
239 if (!defined($pid)) {
240 die "failed to spawn child";
242 # Add our browser to the list of running children.
243 push(@running, $pid);
244 # Increment the number of lists, because this list was not
245 # previously counted.
247 } elsif ($pid == 0) {
248 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
249 $db_user, $db_password);
250 my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id := ?, skip_facet := TRUE, skip_browse := FALSE, skip_search := TRUE, skip_display := TRUE)');
252 if ($sth->execute($_)) {
253 my $crap = $sth->fetchall_arrayref();
255 warn ("Browse ingest failed for record $_");
257 if (duration_expired()) {
258 warn "browse_ingest() stopping on record $_ ".
259 "after max duration reached\n";
268 # Fork a child to do the other reingests:
273 if (!defined($pid)) {
274 die "Failed to spawn a child";
276 push(@running, $pid);
277 } elsif ($pid == 0) {
278 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
279 $db_user, $db_password);
280 reingest_attributes($dbh, $list) unless ($skip_attrs);
281 reingest_field_entries($dbh, $list)
282 unless ($skip_facets && $skip_search && $skip_display);
288 # Reingest metabib field entries on a list of records.
289 sub reingest_field_entries {
292 my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id := ?, skip_facet := ?, skip_browse := TRUE, skip_search := ?, skip_display := ?)');
293 # The reingest function takes "skip" flags, so our skip options are passed straight through as 1 or 0.
294 $sth->bind_param(2, ($skip_facets) ? 1 : 0);
295 $sth->bind_param(3, ($skip_search) ? 1 : 0);
296 $sth->bind_param(4, ($skip_display) ? 1: 0);
298 $sth->bind_param(1, $_);
299 if ($sth->execute()) {
300 my $crap = $sth->fetchall_arrayref();
302 warn ("metabib.reingest_metabib_field_entries failed for record $_");
307 # Reingest record attributes on a list of records.
308 sub reingest_attributes {
311 my $sth = $dbh->prepare(<<END_OF_INGEST
312 SELECT metabib.reingest_record_attributes(rid := id, prmarc := marc, pattr_list := ?)
313 FROM biblio.record_entry
317 $sth->bind_param(1, $record_attrs);
319 $sth->bind_param(2, $_);
320 if ($sth->execute()) {
321 my $crap = $sth->fetchall_arrayref();
323 warn ("metabib.reingest_record_attributes failed for record $_");