2 # ---------------------------------------------------------------
3 # Copyright © 2013,2014 Merrimack Valley Library Consortium
4 # Jason Stephenson <jstephenson@mvlc.org>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
16 # TODO: Document with POD.
17 # This guy parallelizes a reingest.
23 # Globals for the command line options: --
25 # You will want to adjust the next two based on your database size,
26 # i.e. number of bib records as well as the number of cores on your
27 # database server. Using roughly number of cores/2 doesn't seem to
28 # have much impact in off peak times.
29 my $batch_size = 10000; # records processed per batch
30 my $max_child = 8; # max number of parallel worker processes
# The variables below carry no initial value, so each one is undefined
# (i.e. false / not requested) until something assigns to it.
32 my $skip_browse; # Skip the browse reingest.
33 my $skip_attrs; # Skip the record attributes reingest.
34 my $skip_search; # Skip the search reingest.
35 my $skip_facets; # Skip the facets reingest.
36 my $skip_display; # Skip the display reingest.
37 my $rebuild_rmsr; # Rebuild reporter.materialized_simple_record.
38 my $start_id; # start processing at this bib ID.
39 my $end_id; # stop processing when this bib ID is reached.
40 my $max_duration; # max processing duration in seconds
41 my $help; # show help text
42 my $opt_pipe; # Read record ids from STDIN.
# $record_attrs is the target of the 'attr=s@' option, so when set it holds
# a reference to an array of strings rather than a plain scalar.
43 my $record_attrs; # Record attributes for metabib.reingest_record_attributes.
45 # Database connection options with defaults:
# Each setting takes the matching PG* environment variable when that holds
# a true value; otherwise the literal on the right of || is used.
46 my $db_user = $ENV{PGUSER} || 'evergreen';
47 my $db_host = $ENV{PGHOST} || 'localhost';
48 my $db_db = $ENV{PGDATABASE} || 'evergreen';
49 my $db_password = $ENV{PGPASSWORD} || 'evergreen';
50 my $db_port = $ENV{PGPORT} || 5432;
53 'user=s' => \$db_user,
54 'host=s' => \$db_host,
56 'password=s' => \$db_password,
57 'port=i' => \$db_port,
58 'batch-size=i' => \$batch_size,
59 'max-child=i' => \$max_child,
60 'skip-browse' => \$skip_browse,
61 'skip-attrs' => \$skip_attrs,
62 'skip-search' => \$skip_search,
63 'skip-facets' => \$skip_facets,
64 'skip-display' => \$skip_display,
65 'rebuild-rmsr' => \$rebuild_rmsr,
66 'start-id=i' => \$start_id,
67 'end-id=i' => \$end_id,
69 'max-duration=i' => \$max_duration,
70 'attr=s@' => \$record_attrs,
77 $0 --batch-size $batch_size --max-child $max_child \
78 --start-id 1 --end-id 500000 --max-duration 14400
81 Number of records to process per batch
84 Max number of worker processes
91 Skip the selected reingest component
94 Specify a record attribute for ingest
95 This option can be used more than once to specify multiple
97 This option is ignored if --skip-attrs is also given.
99 Rebuild the reporter.materialized_simple_record table.
102 Start processing at this record ID.
105 Stop processing when this record ID is reached
108 Read record IDs to reingest from standard input.
109 This option conflicts with --start-id and/or --end-id.
112 Stop processing after this many total seconds have passed.
123 # Check for mutually exclusive options:
# Reading ids from standard input ($opt_pipe) cannot be combined with an
# id range, because either bound being set triggers the warning below.
124 if ($opt_pipe && ($start_id || $end_id)) {
125 warn('Mutually exclusive options');
# Base filter: only rows whose deleted column is 'f'.
129 my $where = "WHERE deleted = 'f'";
# Narrow the filter by whichever id bounds were supplied. The bounds are
# interpolated directly into the SQL text; the option table declares
# start-id and end-id with "=i", i.e. as integers. Because the tests are
# plain truth tests, a bound of 0 is treated the same as "not given".
130 if ($start_id && $end_id) {
131 $where .= " AND id BETWEEN $start_id AND $end_id";
132 } elsif ($start_id) {
133 $where .= " AND id >= $start_id";
135 $where .= " AND id <= $end_id";
138 # "Gimme the keys! I'll drive!"
141 FROM biblio.record_entry
146 # Stuffs needed for looping, tracking how many lists of records we
147 # have, storing the actual list of records, and the list of the lists
149 my ($count, $lists, $records) = (0,0,[]);
151 # To do the browse-only ingest:
# Time at which this run started; the reference point for the
# max-duration check below.
154 my $start_epoch = time;
# Returns 1 once at least $max_duration seconds have elapsed since
# $start_epoch. When $max_duration is unset or 0 this return never fires,
# so the time limit only applies when one was requested.
156 sub duration_expired {
157 return 1 if $max_duration && (time - $start_epoch) >= $max_duration;
161 # All of the DBI->connect() calls in this file assume that you have
162 # configured the PGHOST, PGPORT, PGDATABASE, PGUSER, and PGPASSWORD
163 # variables in your execution environment. If you have not, you have
168 # 2) edit the DBI->connect() calls in this program so that it can
169 # connect to your database.
171 # Get the input records from either standard input or the database.
175 # Assume any string of digits is an id.
# The /g match in list context captures every run of digits in $_, so one
# chunk of input may contribute several ids.
176 if (my @subs = /([0-9]+)/g) {
# Database path: connect with the settings held in the $db_* variables and
# take the column returned by the query in $q as the list of ids.
181 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
182 $db_user, $db_password);
183 @input = @{$dbh->selectcol_arrayref($q)};
# Distribute the ids: every id goes onto @blist, and also onto the current
# batch in $records.
187 foreach my $record (@input) {
188 push(@blist, $record); # separate list of browse-only ingest
189 push(@$records, $record);
# When the running count reaches $batch_size the batch is stored in @lol
# under the next list index.
190 if (++$count == $batch_size) {
191 $lol[$lists++] = $records;
# Anything still counted after the loop is stored as one more batch.
196 $lol[$lists++] = $records if ($count); # Last batch is likely to be
199 # We're going to reuse $count to keep track of the total number of
203 # @running keeps track of the running child processes.
206 # We start the browse-only ingest before starting the other ingests.
207 browse_ingest(@blist) unless ($skip_browse);
209 # We loop until we have processed all of the batches stored in @lol
210 # or the maximum processing duration has been reached.
211 while ($count < $lists) {
# Sample the time limit once per pass; the same value is used for the
# scheduling decision and for the exit test further down.
212 my $duration_expired = duration_expired();
# Take another batch only while batches remain in @lol, fewer than
# $max_child children are running, and the time limit has not been reached.
214 if (scalar(@lol) && scalar(@running) < $max_child && !$duration_expired) {
215 # Reuse $records for the lulz.
216 $records = shift(@lol);
# True when all four non-browse components are being skipped.
217 if ($skip_search && $skip_facets && $skip_attrs && $skip_display) {
# Only a pid that is actually listed in @running is removed from it.
224 if (grep {$_ == $pid} @running) {
225 @running = grep {$_ != $pid} @running;
227 print "$count of $lists processed\n";
# Time is up and no child is left running: report that we are stopping
# because of the max duration.
231 if ($duration_expired && scalar(@running) == 0) {
232 warn "Exiting on max_duration ($max_duration)\n";
237 # Rebuild reporter.materialized_simple_record after the ingests.
238 rmsr_rebuild() if ($rebuild_rmsr);
240 # This subroutine forks a process to do the browse-only ingest on the
241 # @blist above. It cannot be parallelized, but can run in parallel
242 # to the other ingests.
# An undefined $pid means no child could be created; that is fatal.
246 if (!defined($pid)) {
247 die "failed to spawn child";
249 # Add our browser to the list of running children.
250 push(@running, $pid);
251 # Increment the number of lists, because this list was not
252 # previously counted.
# A $pid of 0 selects the branch that does the actual browse ingest. It
# opens its own database connection using the $db_* settings.
254 } elsif ($pid == 0) {
255 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
256 $db_user, $db_password);
# Only the browse component runs in this call: skip_browse is FALSE while
# the facet, search and display components are all skipped.
257 my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id := ?, skip_facet := TRUE, skip_browse := FALSE, skip_search := TRUE, skip_display := TRUE)');
# The record id in $_ is bound to the single placeholder.
259 if ($sth->execute($_)) {
# Fetch all rows of the result; the fetched data is not used afterwards.
260 my $crap = $sth->fetchall_arrayref();
# A failed execute is reported with warn and names the affected record.
262 warn ("Browse ingest failed for record $_");
# Time-limit check; the warning names the record at which the browse
# ingest stops.
264 if (duration_expired()) {
265 warn "browse_ingest() stopping on record $_ ".
266 "after max duration reached\n";
275 # Fork a child to do the other reingests:
# An undefined $pid means no child could be created; that is fatal.
280 if (!defined($pid)) {
281 die "Failed to spawn a child";
# Track the new child in @running.
283 push(@running, $pid);
# A $pid of 0 selects the branch that does the work. It opens its own
# database connection using the $db_* settings.
284 } elsif ($pid == 0) {
285 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
286 $db_user, $db_password);
# Record attributes are reingested first, unless they are being skipped.
287 reingest_attributes($dbh, $list) unless ($skip_attrs);
# The field entry reingest is only bypassed when facets, search and display
# are all skipped; if any one of them is wanted the call is made.
288 reingest_field_entries($dbh, $list)
289 unless ($skip_facets && $skip_search && $skip_display);
295 # Reingest metabib field entries on a list of records.
# The browse component is always skipped in this statement
# (skip_browse := TRUE); the facet, search and display components follow
# the corresponding $skip_* option variables.
296 sub reingest_field_entries {
299 my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id := ?, skip_facet := ?, skip_browse := TRUE, skip_search := ?, skip_display := ?)');
300 # The SQL function takes "skip" flags, so each skip option is passed
# through unchanged, normalized to 1 or 0.
301 $sth->bind_param(2, ($skip_facets) ? 1 : 0);
302 $sth->bind_param(3, ($skip_search) ? 1 : 0);
303 $sth->bind_param(4, ($skip_display) ? 1: 0);
# Placeholder 1 (bib_id) receives the record id held in $_.
305 $sth->bind_param(1, $_);
306 if ($sth->execute()) {
# Fetch all rows of the result; the fetched data is not used afterwards.
307 my $crap = $sth->fetchall_arrayref();
# A failed execute is reported with warn and names the affected record.
309 warn ("metabib.reingest_metabib_field_entries failed for record $_");
314 # Reingest record attributes on a list of records.
# The statement selects metabib.reingest_record_attributes() over
# biblio.record_entry, passing each row's id and marc columns. Placeholder 1
# (pattr_list) is bound to $record_attrs, which is the target of the
# 'attr=s@' option and is therefore an array reference when set and
# undefined otherwise. Placeholder 2 is bound to the record id in $_.
# NOTE(review): the part of the statement that uses placeholder 2 is not
# visible in this excerpt; presumably a filter on id -- confirm.
315 sub reingest_attributes {
318 my $sth = $dbh->prepare(<<END_OF_INGEST
319 SELECT metabib.reingest_record_attributes(rid := id, prmarc := marc, pattr_list := ?)
320 FROM biblio.record_entry
324 $sth->bind_param(1, $record_attrs);
326 $sth->bind_param(2, $_);
327 if ($sth->execute()) {
# Fetch all rows of the result; the fetched data is not used afterwards.
328 my $crap = $sth->fetchall_arrayref();
# A failed execute is reported with warn and names the affected record.
330 warn ("metabib.reingest_record_attributes failed for record $_");
335 # Rebuild/refresh reporter.materialized_simple_record
# Announces the rebuild on standard output, opens a database connection
# with the $db_* settings, and calls
# reporter.refresh_materialized_simple_record(). The rows returned by the
# call are not kept.
337 print("Rebuilding reporter.materialized_simple_record\n");
338 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
339 $db_user, $db_password);
340 $dbh->selectall_arrayref("SELECT reporter.refresh_materialized_simple_record();");