2 # ---------------------------------------------------------------
3 # Copyright © 2013,2014 Merrimack Valley Library Consortium
4 # Jason Stephenson <jstephenson@mvlc.org>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
16 # TODO: Document with POD.
17 # This script parallelizes a reingest of bibliographic records.
23 # Globals for the command line options: --
25 # You will want to adjust the next two based on your database size,
26 # i.e. number of bib records as well as the number of cores on your
27 # database server. Using roughly half the number of cores doesn't seem to
28 # have much impact during off-peak times.
29 my $batch_size = 10000; # records processed per batch
30 my $max_child = 8; # max number of parallel worker processes
32 my $skip_browse; # Skip the browse reingest.
33 my $skip_attrs; # Skip the record attributes reingest.
34 my $skip_search; # Skip the search reingest.
35 my $skip_facets; # Skip the facets reingest.
36 my $skip_display; # Skip the display reingest.
37 my $start_id; # start processing at this bib ID.
38 my $end_id; # stop processing when this bib ID is reached.
39 my $max_duration; # max processing duration in seconds
40 my $help; # show help text
41 my $opt_pipe; # Read record ids from STDIN.
43 # Database connection options with defaults:
44 my $db_user = $ENV{PGUSER} || 'evergreen';
45 my $db_host = $ENV{PGHOST} || 'localhost';
46 my $db_db = $ENV{PGDATABASE} || 'evergreen';
47 my $db_password = $ENV{PGPASSWORD} || 'evergreen';
48 my $db_port = $ENV{PGPORT} || 5432;
51 'user=s' => \$db_user,
52 'host=s' => \$db_host,
54 'password=s' => \$db_password,
55 'port=i' => \$db_port,
56 'batch-size=i' => \$batch_size,
57 'max-child=i' => \$max_child,
58 'skip-browse' => \$skip_browse,
59 'skip-attrs' => \$skip_attrs,
60 'skip-search' => \$skip_search,
61 'skip-facets' => \$skip_facets,
62 'skip-display' => \$skip_display,
63 'start-id=i' => \$start_id,
64 'end-id=i' => \$end_id,
66 'max-duration=i' => \$max_duration,
73 $0 --batch-size $batch_size --max-child $max_child \
74 --start-id 1 --end-id 500000 --max-duration 14400
77 Number of records to process per batch
80 Max number of worker processes
87 Skip the selected reingest component
90 Start processing at this record ID.
93 Stop processing when this record ID is reached
96 Read record IDs to reingest from standard input.
97 This option conflicts with --start-id and/or --end-id.
100 Stop processing after this many total seconds have passed.
111 # Check for mutually exclusive options:
112 if ($opt_pipe && ($start_id || $end_id)) {
113 warn('Mutually exclusive options');
117 my $where = "WHERE deleted = 'f'";
118 if ($start_id && $end_id) {
119 $where .= " AND id BETWEEN $start_id AND $end_id";
120 } elsif ($start_id) {
121 $where .= " AND id >= $start_id";
123 $where .= " AND id <= $end_id";
126 # Build the query that selects the IDs of the bib records to reingest.
129 FROM biblio.record_entry
134 # Variables needed for looping, tracking how many lists of records we
135 # have, storing the actual list of records, and the list of the lists
137 my ($count, $lists, $records) = (0,0,[]);
139 # To do the browse-only ingest:
142 my $start_epoch = time;
144 sub duration_expired {
145 return 1 if $max_duration && (time - $start_epoch) >= $max_duration;
149 # All of the DBI->connect() calls in this file assume that you have
150 # configured the PGHOST, PGPORT, PGDATABASE, PGUSER, and PGPASSWORD
151 # variables in your execution environment. If you have not, you have
156 # 2) edit the DBI->connect() calls in this program so that it can
157 # connect to your database.
159 # Get the input records from either standard input or the database.
163 # Assume any string of digits is an id.
164 if (my @subs = /([0-9]+)/g) {
169 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
170 $db_user, $db_password);
171 @input = @{$dbh->selectcol_arrayref($q)};
175 foreach my $record (@input) {
176 push(@blist, $record); # separate list of browse-only ingest
177 push(@$records, $record);
178 if (++$count == $batch_size) {
179 $lol[$lists++] = $records;
184 $lol[$lists++] = $records if ($count); # Last batch is likely to be
187 # We're going to reuse $count to keep track of the total number of
191 # @running keeps track of the running child processes.
194 # We start the browse-only ingest before starting the other ingests.
195 browse_ingest(@blist) unless ($skip_browse);
197 # We loop until we have processed all of the batches stored in @lol
198 # or the maximum processing duration has been reached.
199 while ($count < $lists) {
200 my $duration_expired = duration_expired();
202 if (scalar(@lol) && scalar(@running) < $max_child && !$duration_expired) {
203 # Reuse $records to hold the next batch of records to process.
204 $records = shift(@lol);
205 if ($skip_search && $skip_facets && $skip_attrs && $skip_display) {
212 if (grep {$_ == $pid} @running) {
213 @running = grep {$_ != $pid} @running;
215 print "$count of $lists processed\n";
219 if ($duration_expired && scalar(@running) == 0) {
220 warn "Exiting on max_duration ($max_duration)\n";
225 # This subroutine forks a process to do the browse-only ingest on the
226 # @blist above. It cannot be parallelized, but can run in parallel
227 # with the other ingests.
231 if (!defined($pid)) {
232 die "failed to spawn child";
234 # Add the browse-ingest child to the list of running children.
235 push(@running, $pid);
236 # Increment the number of lists, because this list was not
237 # previously counted.
239 } elsif ($pid == 0) {
240 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
241 $db_user, $db_password);
242 my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id := ?, skip_facet := TRUE, skip_browse := FALSE, skip_search := TRUE, skip_display := TRUE)');
244 if ($sth->execute($_)) {
245 my $crap = $sth->fetchall_arrayref();
247 warn ("Browse ingest failed for record $_");
249 if (duration_expired()) {
250 warn "browse_ingest() stopping on record $_ ".
251 "after max duration reached\n";
260 # Fork a child to do the other reingests:
265 if (!defined($pid)) {
266 die "Failed to spawn a child";
268 push(@running, $pid);
269 } elsif ($pid == 0) {
270 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
271 $db_user, $db_password);
272 reingest_attributes($dbh, $list) unless ($skip_attrs);
273 reingest_field_entries($dbh, $list)
274 unless ($skip_facets && $skip_search && $skip_display);
280 # Reingest metabib field entries on a list of records.
281 sub reingest_field_entries {
284 my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id := ?, skip_facet := ?, skip_browse := TRUE, skip_search := ?, skip_display := ?)');
285 # The reingest function takes "skip" flags, so pass our --skip-* options straight through as 1 (skip) or 0 (process).
286 $sth->bind_param(2, ($skip_facets) ? 1 : 0);
287 $sth->bind_param(3, ($skip_search) ? 1 : 0);
288 $sth->bind_param(4, ($skip_display) ? 1: 0);
290 $sth->bind_param(1, $_);
291 if ($sth->execute()) {
292 my $crap = $sth->fetchall_arrayref();
294 warn ("metabib.reingest_metabib_field_entries failed for record $_");
299 # Reingest record attributes on a list of records.
300 sub reingest_attributes {
303 my $sth = $dbh->prepare(<<END_OF_INGEST
304 SELECT metabib.reingest_record_attributes(rid := id, prmarc := marc)
305 FROM biblio.record_entry
310 $sth->bind_param(1, $_);
311 if ($sth->execute()) {
312 my $crap = $sth->fetchall_arrayref();
314 warn ("metabib.reingest_record_attributes failed for record $_");