2 # ---------------------------------------------------------------
3 # Copyright © 2013,2014 Merrimack Valley Library Consortium
4 # Jason Stephenson <jstephenson@mvlc.org>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
16 # TODO: Document with POD.
17 # This guy parallelizes a reingest.
23 # Globals for the command line options: --
25 # You will want to adjust the next two based on your database size,
26 # i.e. number of bib records as well as the number of cores on your
27 # database server. Using roughly number of cores/2 doesn't seem to
28 # have much impact in off peak times.
29 my $batch_size = 10000; # records processed per batch
30 my $max_child = 8; # max number of parallel worker processes
32 my $delay_dym; # Delay DYM symspell dictionary reification.
33 my $skip_browse; # Skip the browse reingest.
34 my $skip_attrs; # Skip the record attributes reingest.
35 my $skip_search; # Skip the search reingest.
36 my $skip_facets; # Skip the facets reingest.
37 my $skip_display; # Skip the display reingest.
38 my $rebuild_rmsr; # Rebuild reporter.materialized_simple_record.
39 my $start_id; # start processing at this bib ID.
40 my $end_id; # stop processing when this bib ID is reached.
41 my $max_duration; # max processing duration in seconds
42 my $help; # show help text
43 my $opt_pipe; # Read record ids from STDIN.
44 my $record_attrs; # Record attributes for metabib.reingest_record_attributes.
46 # Database connection options with defaults:
47 my $db_user = $ENV{PGUSER} || 'evergreen';
48 my $db_host = $ENV{PGHOST} || 'localhost';
49 my $db_db = $ENV{PGDATABASE} || 'evergreen';
50 my $db_password = $ENV{PGPASSWORD} || 'evergreen';
51 my $db_port = $ENV{PGPORT} || 5432;
54 'user=s' => \$db_user,
55 'host=s' => \$db_host,
57 'password=s' => \$db_password,
58 'port=i' => \$db_port,
59 'batch-size=i' => \$batch_size,
60 'max-child=i' => \$max_child,
61 'delay-symspell' => \$delay_dym,
62 'skip-browse' => \$skip_browse,
63 'skip-attrs' => \$skip_attrs,
64 'skip-search' => \$skip_search,
65 'skip-facets' => \$skip_facets,
66 'skip-display' => \$skip_display,
67 'rebuild-rmsr' => \$rebuild_rmsr,
68 'start-id=i' => \$start_id,
69 'end-id=i' => \$end_id,
71 'max-duration=i' => \$max_duration,
72 'attr=s@' => \$record_attrs,
79 $0 --batch-size $batch_size --max-child $max_child \
80 --start-id 1 --end-id 500000 --max-duration 14400
83 Number of records to process per batch
86 Max number of worker processes
89 Delay reification of symspell dictionary entries
90 This can provide a significant speedup for large ingests.
91 NOTE: This will cause concurrent, unrelated symspell
92 updates to be delayed as well. This is usually not a
93 concern in an existing database as the dictionary is
94 generally complete and only the details of use counts
95 will change due to reingests and record inserts/updates.
102 Skip the selected reingest component
105 Specify a record attribute for ingest
106 This option can be used more than once to specify multiple
107 attributes to ingest.
108 This option is ignored if --skip-attrs is also given.
110 Rebuild the reporter.materialized_simple_record table.
113 Start processing at this record ID.
116 Stop processing when this record ID is reached
119 Read record IDs to reingest from standard input.
120 This option conflicts with --start-id and/or --end-id.
123 Stop processing after this many total seconds have passed.
134 # Check for mutually exclusive options:
135 if ($opt_pipe && ($start_id || $end_id)) {
136 warn('Mutually exclusive options');
140 my $where = "WHERE deleted = 'f'";
141 if ($start_id && $end_id) {
142 $where .= " AND id BETWEEN $start_id AND $end_id";
143 } elsif ($start_id) {
144 $where .= " AND id >= $start_id";
146 $where .= " AND id <= $end_id";
149 # "Gimme the keys! I'll drive!"
152 FROM biblio.record_entry
157 # Stuffs needed for looping, tracking how many lists of records we
158 # have, storing the actual list of records, and the list of the lists
160 my ($count, $lists, $records) = (0,0,[]);
162 # To do the browse-only ingest:
165 my $start_epoch = time;
167 sub duration_expired {
168 return 1 if $max_duration && (time - $start_epoch) >= $max_duration;
172 # All of the DBI->connect() calls in this file assume that you have
173 # configured the PGHOST, PGPORT, PGDATABASE, PGUSER, and PGPASSWORD
174 # variables in your execution environment. If you have not, you have
179 # 2) edit the DBI->connect() calls in this program so that it can
180 # connect to your database.
182 # Get the input records from either standard input or the database.
186 # Assume any string of digits is an id.
187 if (my @subs = /([0-9]+)/g) {
192 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
193 $db_user, $db_password);
194 @input = @{$dbh->selectcol_arrayref($q)};
198 foreach my $record (@input) {
199 push(@blist, $record); # separate list of browse-only ingest
200 push(@$records, $record);
201 if (++$count == $batch_size) {
202 $lol[$lists++] = $records;
207 $lol[$lists++] = $records if ($count); # Last batch is likely to be
210 # We're going to reuse $count to keep track of the total number of
214 # Disable inline reification of symspell data during the main ingest process
216 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
217 $db_user, $db_password);
218 $dbh->do('SELECT search.disable_symspell_reification()');
222 # @running keeps track of the running child processes.
225 # We start the browse-only ingest before starting the other ingests.
226 browse_ingest(@blist) unless ($skip_browse);
228 # We loop until we have processed all of the batches stored in @lol
229 # or the maximum processing duration has been reached.
230 while ($count < $lists) {
231 my $duration_expired = duration_expired();
233 if (scalar(@lol) && scalar(@running) < $max_child && !$duration_expired) {
234 # Reuse $records to hold the next batch taken from @lol.
235 $records = shift(@lol);
236 if ($skip_search && $skip_facets && $skip_attrs && $skip_display) {
243 if (grep {$_ == $pid} @running) {
244 @running = grep {$_ != $pid} @running;
246 print "$count of $lists processed\n";
250 if ($duration_expired && scalar(@running) == 0) {
251 symspell_reification() if ($delay_dym);
252 warn "Exiting on max_duration ($max_duration)\n";
257 # Incorporate symspell updates if they were delayed
258 symspell_reification() if ($delay_dym);
260 # Rebuild reporter.materialized_simple_record after the ingests.
261 rmsr_rebuild() if ($rebuild_rmsr);
263 # This sub should be called at the end of the run if symspell updates
264 # were delayed using the --delay-symspell command line flag.
265 sub symspell_reification {
266 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
267 $db_user, $db_password);
268 $dbh->do('SELECT search.enable_symspell_reification()');
269 $dbh->do('SELECT search.symspell_dictionary_full_reify()');
271 # There might be a race condition above if non-pingest record updates
272 # were started before the first of the two statements above, but ended
273 # after the second one, so we'll wait a few seconds and then look again.
276 # This count will always be 0 when symspell reification is done inline
277 # rather than delayed, because it is handled by a trigger that runs
278 # inside the transaction that causes inline reification.
279 my ($recheck) = $dbh->selectrow_array('SELECT COUNT(*) FROM search.symspell_dictionary_updates');
280 $dbh->do('SELECT search.symspell_dictionary_full_reify()') if ($recheck);
284 # This subroutine forks a process to do the browse-only ingest on the
285 # @blist above. It cannot be parallelized, but can run in parallel
286 # to the other ingests.
290 if (!defined($pid)) {
291 die "failed to spawn child";
293 # Add our browser to the list of running children.
294 push(@running, $pid);
295 # Increment the number of lists, because this list was not
296 # previously counted.
298 } elsif ($pid == 0) {
299 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
300 $db_user, $db_password);
301 my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id := ?, skip_facet := TRUE, skip_browse := FALSE, skip_search := TRUE, skip_display := TRUE)');
303 if ($sth->execute($_)) {
304 my $crap = $sth->fetchall_arrayref();
306 warn ("Browse ingest failed for record $_");
308 if (duration_expired()) {
309 warn "browse_ingest() stopping on record $_ ".
310 "after max duration reached\n";
319 # Fork a child to do the other reingests:
324 if (!defined($pid)) {
325 die "Failed to spawn a child";
327 push(@running, $pid);
328 } elsif ($pid == 0) {
329 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
330 $db_user, $db_password);
331 reingest_attributes($dbh, $list) unless ($skip_attrs);
332 reingest_field_entries($dbh, $list)
333 unless ($skip_facets && $skip_search && $skip_display);
339 # Reingest metabib field entries on a list of records.
340 sub reingest_field_entries {
343 my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id := ?, skip_facet := ?, skip_browse := TRUE, skip_search := ?, skip_display := ?)');
344 # The reingest function takes "skip" flags, so each skip option is passed through as 1 or 0.
345 $sth->bind_param(2, ($skip_facets) ? 1 : 0);
346 $sth->bind_param(3, ($skip_search) ? 1 : 0);
347 $sth->bind_param(4, ($skip_display) ? 1: 0);
349 $sth->bind_param(1, $_);
350 if ($sth->execute()) {
351 my $crap = $sth->fetchall_arrayref();
353 warn ("metabib.reingest_metabib_field_entries failed for record $_");
358 # Reingest record attributes on a list of records.
359 sub reingest_attributes {
362 my $sth = $dbh->prepare(<<END_OF_INGEST
363 SELECT metabib.reingest_record_attributes(rid := id, prmarc := marc, pattr_list := ?, rdeleted := deleted)
364 FROM biblio.record_entry
368 $sth->bind_param(1, $record_attrs);
370 $sth->bind_param(2, $_);
371 if ($sth->execute()) {
372 my $crap = $sth->fetchall_arrayref();
374 warn ("metabib.reingest_record_attributes failed for record $_");
379 # Rebuild/refresh reporter.materialized_simple_record
381 print("Rebuilding reporter.materialized_simple_record\n");
382 my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
383 $db_user, $db_password);
384 $dbh->selectall_arrayref("SELECT reporter.refresh_materialized_simple_record();");