]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/support-scripts/pingest.pl
LP 1768715: Add --pipe option to pingest.pl
[Evergreen.git] / Open-ILS / src / support-scripts / pingest.pl
1 #!/usr/bin/perl
2 # ---------------------------------------------------------------
3 # Copyright © 2013,2014 Merrimack Valley Library Consortium
4 # Jason Stephenson <jstephenson@mvlc.org>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
16 # TODO: Document with POD.
17 # This guy parallelizes a reingest.
18 use strict;
19 use warnings;
20 use DBI;
21 use Getopt::Long;
22
23 # Globals for the command line options: --
24
25 # You will want to adjust the next two based on your database size,
26 # i.e. number of bib records as well as the number of cores on your
27 # database server.  Using roughly number of cores/2 doesn't seem to
28 # have much impact in off peak times.
29 my $batch_size = 10000; # records processed per batch
30 my $max_child  = 8;     # max number of parallel worker processes
31
32 my $skip_browse;  # Skip the browse reingest.
33 my $skip_attrs;   # Skip the record attributes reingest.
34 my $skip_search;  # Skip the search reingest.
35 my $skip_facets;  # Skip the facets reingest.
36 my $start_id;     # start processing at this bib ID.
37 my $end_id;       # stop processing when this bib ID is reached.
38 my $max_duration; # max processing duration in seconds
39 my $help;         # show help text
40 my $opt_pipe;     # Read record ids from STDIN.
41
42 GetOptions(
43     'batch-size=i'   => \$batch_size,
44     'max-child=i'    => \$max_child,
45     'skip-browse'    => \$skip_browse,
46     'skip-attrs'     => \$skip_attrs,
47     'skip-search'    => \$skip_search,
48     'skip-facets'    => \$skip_facets,
49     'start-id=i'     => \$start_id,
50     'end-id=i'       => \$end_id,
51     'pipe'           => \$opt_pipe,
52     'max-duration=i' => \$max_duration,
53     'help'           => \$help
54 );
55
56 sub help {
57     print <<HELP;
58
59     $0 --batch-size $batch_size --max-child $max_child \
60         --start-id 1 --end-id 500000 --duration 14400
61
62     --batch-size
63         Number of records to process per batch
64
65     --max-child
66         Max number of worker processes
67
68     --skip-browse
69     --skip-attrs
70     --skip-search
71     --skip-facets
72         Skip the selected reingest component
73
74     --start-id
75         Start processing at this record ID.
76
77     --end-id
78         Stop processing when this record ID is reached
79
80     --pipe
81         Read record IDs to reingest from standard input.
82         This option conflicts with --start-id and/or --end-id.
83
84     --max-duration
85         Stop processing after this many total seconds have passed.
86
87     --help
88         Show this help text.
89
90 HELP
91     exit;
92 }
93
94 help() if $help;
95
96 # Check for mutually exclusive options:
97 if ($opt_pipe && ($start_id || $end_id)) {
98     warn('Mutually exclusive options');
99     help();
100 }
101
102 my $where = "WHERE deleted = 'f'";
103 if ($start_id && $end_id) {
104     $where .= " AND id BETWEEN $start_id AND $end_id";
105 } elsif ($start_id) {
106     $where .= " AND id >= $start_id";
107 } elsif ($end_id) {
108     $where .= " AND id <= $end_id";
109 }
110
111 # "Gimme the keys!  I'll drive!"
112 my $q = <<END_OF_Q;
113 SELECT id
114 FROM biblio.record_entry
115 $where
116 ORDER BY id ASC
117 END_OF_Q
118
119 # Stuffs needed for looping, tracking how many lists of records we
120 # have, storing the actual list of records, and the list of the lists
121 # of records.
122 my ($count, $lists, $records) = (0,0,[]);
123 my @lol = ();
124 # To do the browse-only ingest:
125 my @blist = ();
126
127 my $start_epoch = time;
128
129 sub duration_expired {
130     return 1 if $max_duration && (time - $start_epoch) >= $max_duration;
131     return 0;
132 }
133
134 # All of the DBI->connect() calls in this file assume that you have
135 # configured the PGHOST, PGPORT, PGDATABASE, PGUSER, and PGPASSWORD
136 # variables in your execution environment.  If you have not, you have
137 # two options:
138 #
139 # 1) configure them
140 #
141 # 2) edit the DBI->connect() calls in this program so that it can
142 # connect to your database.
143 my $dbh = DBI->connect('DBI:Pg:');
144
145 # Get the input records from either standard input or the database.
146 my @input;
147 if ($opt_pipe) {
148     while (<STDIN>) {
149         # Want only numbers, one per line.
150         if ($_ =~ /([0-9]+)/) {
151             push(@input, $1);
152         }
153     }
154 } else {
155     @input = ($dbh->selectall_arrayref($q));
156 }
157
158 foreach my $r (@input) {
159     my $record = (ref($r)) ? $r->[0] : $r;
160     push(@blist, $record); # separate list of browse-only ingest
161     push(@$records, $record);
162     if (++$count == $batch_size) {
163         $lol[$lists++] = $records;
164         $count = 0;
165         $records = [];
166     }
167 }
168 $lol[$lists++] = $records if ($count); # Last batch is likely to be
169                                        # small.
170 $dbh->disconnect();
171
172 # We're going to reuse $count to keep track of the total number of
173 # batches processed.
174 $count = 0;
175
176 # @running keeps track of the running child processes.
177 my @running = ();
178
179 # We start the browse-only ingest before starting the other ingests.
180 browse_ingest(@blist) unless ($skip_browse);
181
182 # We loop until we have processed all of the batches stored in @lol
183 # or the maximum processing duration has been reached.
184 while ($count < $lists) {
185     my $duration_expired = duration_expired();
186
187     if (scalar(@lol) && scalar(@running) < $max_child && !$duration_expired) {
188         # Reuse $records for the lulz.
189         $records = shift(@lol);
190         if ($skip_search && $skip_facets && $skip_attrs) {
191             $count++;
192         } else {
193             reingest($records);
194         }
195     } else {
196         my $pid = wait();
197         if (grep {$_ == $pid} @running) {
198             @running = grep {$_ != $pid} @running;
199             $count++;
200             print "$count of $lists processed\n";
201         }
202     }
203
204     if ($duration_expired && scalar(@running) == 0) {
205         warn "Exiting on max_duration ($max_duration)\n";
206         exit(0);
207     }
208 }
209
210 # This subroutine forks a process to do the browse-only ingest on the
211 # @blist above.  It cannot be parallelized, but can run in parrallel
212 # to the other ingests.
213 sub browse_ingest {
214     my @list = @_;
215     my $pid = fork();
216     if (!defined($pid)) {
217         die "failed to spawn child";
218     } elsif ($pid > 0) {
219         # Add our browser to the list of running children.
220         push(@running, $pid);
221         # Increment the number of lists, because this list was not
222         # previously counted.
223         $lists++;
224     } elsif ($pid == 0) {
225         my $dbh = DBI->connect('DBI:Pg:');
226         my $sth = $dbh->prepare("SELECT metabib.reingest_metabib_field_entries(?, TRUE, FALSE, TRUE)");
227         foreach (@list) {
228             if ($sth->execute($_)) {
229                 my $crap = $sth->fetchall_arrayref();
230             } else {
231                 warn ("Browse ingest failed for record $_");
232             }
233             if (duration_expired()) {
234                 warn "browse_ingest() stopping on record $_ ".
235                     "after max duration reached\n";
236                 last;
237             }
238         }
239         $dbh->disconnect();
240         exit(0);
241     }
242 }
243
244 # Fork a child to do the other reingests:
245
246 sub reingest {
247     my $list = shift;
248     my $pid = fork();
249     if (!defined($pid)) {
250         die "Failed to spawn a child";
251     } elsif ($pid > 0) {
252         push(@running, $pid);
253     } elsif ($pid == 0) {
254         my $dbh = DBI->connect('DBI:Pg:');
255         reingest_attributes($dbh, $list) unless ($skip_attrs);
256         reingest_field_entries($dbh, $list)
257             unless ($skip_facets && $skip_search);
258         $dbh->disconnect();
259         exit(0);
260     }
261 }
262
263 # Reingest metabib field entries on a list of records.
264 sub reingest_field_entries {
265     my $dbh = shift;
266     my $list = shift;
267     my $sth = $dbh->prepare("SELECT metabib.reingest_metabib_field_entries(?, ?, TRUE, ?)");
268     # Because reingest uses "skip" options we invert the logic of do variables.
269     $sth->bind_param(2, ($skip_facets) ? 1 : 0);
270     $sth->bind_param(3, ($skip_search) ? 1 : 0);
271     foreach (@$list) {
272         $sth->bind_param(1, $_);
273         if ($sth->execute()) {
274             my $crap = $sth->fetchall_arrayref();
275         } else {
276             warn ("metabib.reingest_metabib_field_entries failed for record $_");
277         }
278     }
279 }
280
281 # Reingest record attributes on a list of records.
282 sub reingest_attributes {
283     my $dbh = shift;
284     my $list = shift;
285     my $sth = $dbh->prepare(<<END_OF_INGEST
286 SELECT metabib.reingest_record_attributes(id, NULL::TEXT[], marc)
287 FROM biblio.record_entry
288 WHERE id = ?
289 END_OF_INGEST
290     );
291     foreach (@$list) {
292         $sth->bind_param(1, $_);
293         if ($sth->execute()) {
294             my $crap = $sth->fetchall_arrayref();
295         } else {
296             warn ("metabib.reingest_record_attributes failed for record $_");
297         }
298     }
299 }