eb5434e69882fa533fb76266808227ba2fb85a7d
[working/Evergreen.git] / Open-ILS / src / support-scripts / pingest.pl
1 #!/usr/bin/perl
2 # ---------------------------------------------------------------
3 # Copyright © 2013,2014 Merrimack Valley Library Consortium
4 # Jason Stephenson <jstephenson@mvlc.org>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
16 # TODO: Document with POD.
17 # This guy parallelizes a reingest.
18 use strict;
19 use warnings;
20 use DBI;
21 use Getopt::Long;
22
23 # Globals for the command line options: --
24
25 # You will want to adjust the next two based on your database size,
26 # i.e. number of bib records as well as the number of cores on your
27 # database server.  Using roughly number of cores/2 doesn't seem to
28 # have much impact in off peak times.
29 my $batch_size = 10000; # records processed per batch
30 my $max_child  = 8;     # max number of parallel worker processes
31
32 my $skip_browse;  # Skip the browse reingest.
33 my $skip_attrs;   # Skip the record attributes reingest.
34 my $skip_search;  # Skip the search reingest.
35 my $skip_facets;  # Skip the facets reingest.
36 my $start_id;     # start processing at this bib ID.
37 my $end_id;       # stop processing when this bib ID is reached.
38 my $max_duration; # max processing duration in seconds
39 my $help;         # show help text
40 my $opt_pipe;     # Read record ids from STDIN.
41
42 # Database connection options with defaults:
43 my $db_user = $ENV{PGUSER} || 'evergreen';
44 my $db_host = $ENV{PGHOST} || 'localhost';
45 my $db_db = $ENV{PGDATABASE} || 'evergreen';
46 my $db_password = $ENV{PGPASSWORD} || 'evergreen';
47 my $db_port = $ENV{PGPORT} || 5432;
48
49 GetOptions(
50     'user=s'         => \$db_user,
51     'host=s'         => \$db_host,
52     'db=s'           => \$db_db,
53     'password=s'     => \$db_password,
54     'port=i'         => \$db_port,
55     'batch-size=i'   => \$batch_size,
56     'max-child=i'    => \$max_child,
57     'skip-browse'    => \$skip_browse,
58     'skip-attrs'     => \$skip_attrs,
59     'skip-search'    => \$skip_search,
60     'skip-facets'    => \$skip_facets,
61     'start-id=i'     => \$start_id,
62     'end-id=i'       => \$end_id,
63     'pipe'           => \$opt_pipe,
64     'max-duration=i' => \$max_duration,
65     'help'           => \$help
66 );
67
68 sub help {
69     print <<HELP;
70
71     $0 --batch-size $batch_size --max-child $max_child \
72         --start-id 1 --end-id 500000 --duration 14400
73
74     --batch-size
75         Number of records to process per batch
76
77     --max-child
78         Max number of worker processes
79
80     --skip-browse
81     --skip-attrs
82     --skip-search
83     --skip-facets
84         Skip the selected reingest component
85
86     --start-id
87         Start processing at this record ID.
88
89     --end-id
90         Stop processing when this record ID is reached
91
92     --pipe
93         Read record IDs to reingest from standard input.
94         This option conflicts with --start-id and/or --end-id.
95
96     --max-duration
97         Stop processing after this many total seconds have passed.
98
99     --help
100         Show this help text.
101
102 HELP
103     exit;
104 }
105
106 help() if $help;
107
108 # Check for mutually exclusive options:
109 if ($opt_pipe && ($start_id || $end_id)) {
110     warn('Mutually exclusive options');
111     help();
112 }
113
114 my $where = "WHERE deleted = 'f'";
115 if ($start_id && $end_id) {
116     $where .= " AND id BETWEEN $start_id AND $end_id";
117 } elsif ($start_id) {
118     $where .= " AND id >= $start_id";
119 } elsif ($end_id) {
120     $where .= " AND id <= $end_id";
121 }
122
123 # "Gimme the keys!  I'll drive!"
124 my $q = <<END_OF_Q;
125 SELECT id
126 FROM biblio.record_entry
127 $where
128 ORDER BY id ASC
129 END_OF_Q
130
131 # Stuffs needed for looping, tracking how many lists of records we
132 # have, storing the actual list of records, and the list of the lists
133 # of records.
134 my ($count, $lists, $records) = (0,0,[]);
135 my @lol = ();
136 # To do the browse-only ingest:
137 my @blist = ();
138
139 my $start_epoch = time;
140
141 sub duration_expired {
142     return 1 if $max_duration && (time - $start_epoch) >= $max_duration;
143     return 0;
144 }
145
146 # All of the DBI->connect() calls in this file assume that you have
147 # configured the PGHOST, PGPORT, PGDATABASE, PGUSER, and PGPASSWORD
148 # variables in your execution environment.  If you have not, you have
149 # two options:
150 #
151 # 1) configure them
152 #
153 # 2) edit the DBI->connect() calls in this program so that it can
154 # connect to your database.
155
156 # Get the input records from either standard input or the database.
157 my @input;
158 if ($opt_pipe) {
159     while (<STDIN>) {
160         # Assume any string of digits is an id.
161         if (my @subs = /([0-9]+)/g) {
162             push(@input, @subs);
163         }
164     }
165 } else {
166     my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
167                            $db_user, $db_password);
168     @input = @{$dbh->selectcol_arrayref($q)};
169     $dbh->disconnect();
170 }
171
172 foreach my $record (@input) {
173     push(@blist, $record); # separate list of browse-only ingest
174     push(@$records, $record);
175     if (++$count == $batch_size) {
176         $lol[$lists++] = $records;
177         $count = 0;
178         $records = [];
179     }
180 }
181 $lol[$lists++] = $records if ($count); # Last batch is likely to be
182                                        # small.
183
184 # We're going to reuse $count to keep track of the total number of
185 # batches processed.
186 $count = 0;
187
188 # @running keeps track of the running child processes.
189 my @running = ();
190
191 # We start the browse-only ingest before starting the other ingests.
192 browse_ingest(@blist) unless ($skip_browse);
193
194 # We loop until we have processed all of the batches stored in @lol
195 # or the maximum processing duration has been reached.
196 while ($count < $lists) {
197     my $duration_expired = duration_expired();
198
199     if (scalar(@lol) && scalar(@running) < $max_child && !$duration_expired) {
200         # Reuse $records for the lulz.
201         $records = shift(@lol);
202         if ($skip_search && $skip_facets && $skip_attrs) {
203             $count++;
204         } else {
205             reingest($records);
206         }
207     } else {
208         my $pid = wait();
209         if (grep {$_ == $pid} @running) {
210             @running = grep {$_ != $pid} @running;
211             $count++;
212             print "$count of $lists processed\n";
213         }
214     }
215
216     if ($duration_expired && scalar(@running) == 0) {
217         warn "Exiting on max_duration ($max_duration)\n";
218         exit(0);
219     }
220 }
221
222 # This subroutine forks a process to do the browse-only ingest on the
223 # @blist above.  It cannot be parallelized, but can run in parrallel
224 # to the other ingests.
225 sub browse_ingest {
226     my @list = @_;
227     my $pid = fork();
228     if (!defined($pid)) {
229         die "failed to spawn child";
230     } elsif ($pid > 0) {
231         # Add our browser to the list of running children.
232         push(@running, $pid);
233         # Increment the number of lists, because this list was not
234         # previously counted.
235         $lists++;
236     } elsif ($pid == 0) {
237         my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
238                                $db_user, $db_password);
239         my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id => ?, skip_facet => TRUE, skip_browse => FALSE, skip_search => TRUE)');
240         foreach (@list) {
241             if ($sth->execute($_)) {
242                 my $crap = $sth->fetchall_arrayref();
243             } else {
244                 warn ("Browse ingest failed for record $_");
245             }
246             if (duration_expired()) {
247                 warn "browse_ingest() stopping on record $_ ".
248                     "after max duration reached\n";
249                 last;
250             }
251         }
252         $dbh->disconnect();
253         exit(0);
254     }
255 }
256
257 # Fork a child to do the other reingests:
258
259 sub reingest {
260     my $list = shift;
261     my $pid = fork();
262     if (!defined($pid)) {
263         die "Failed to spawn a child";
264     } elsif ($pid > 0) {
265         push(@running, $pid);
266     } elsif ($pid == 0) {
267         my $dbh = DBI->connect("DBI:Pg:database=$db_db;host=$db_host;port=$db_port;application_name=pingest",
268                                $db_user, $db_password);
269         reingest_attributes($dbh, $list) unless ($skip_attrs);
270         reingest_field_entries($dbh, $list)
271             unless ($skip_facets && $skip_search);
272         $dbh->disconnect();
273         exit(0);
274     }
275 }
276
277 # Reingest metabib field entries on a list of records.
278 sub reingest_field_entries {
279     my $dbh = shift;
280     my $list = shift;
281     my $sth = $dbh->prepare('SELECT metabib.reingest_metabib_field_entries(bib_id => ?, skip_facet => ?, TRUE, skip_search => ?)');
282     # Because reingest uses "skip" options we invert the logic of do variables.
283     $sth->bind_param(2, ($skip_facets) ? 1 : 0);
284     $sth->bind_param(3, ($skip_search) ? 1 : 0);
285     foreach (@$list) {
286         $sth->bind_param(1, $_);
287         if ($sth->execute()) {
288             my $crap = $sth->fetchall_arrayref();
289         } else {
290             warn ("metabib.reingest_metabib_field_entries failed for record $_");
291         }
292     }
293 }
294
295 # Reingest record attributes on a list of records.
296 sub reingest_attributes {
297     my $dbh = shift;
298     my $list = shift;
299     my $sth = $dbh->prepare(<<END_OF_INGEST
300 SELECT metabib.reingest_record_attributes(rid => id, prmarc => marc)
301 FROM biblio.record_entry
302 WHERE id = ?
303 END_OF_INGEST
304     );
305     foreach (@$list) {
306         $sth->bind_param(1, $_);
307         if ($sth->execute()) {
308             my $crap = $sth->fetchall_arrayref();
309         } else {
310             warn ("metabib.reingest_record_attributes failed for record $_");
311         }
312     }
313 }