]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/support-scripts/ingest_ctl
LP 2061136 follow-up: ng lint --fix
[Evergreen.git] / Open-ILS / src / support-scripts / ingest_ctl
1 #!/usr/bin/perl
2 # ---------------------------------------------------------------
3 # Copyright © 2022 Equinox Open Library Initiative, INC.
4 # Mike Rylander <mrylander@gmail.com>
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
16
17 use strict;
18 use warnings;
19 use DBI;
20 use Getopt::Long;
21 use Time::HiRes qw/usleep time/;
22 use IO::Handle;
23 use POSIX ":sys_wait_h";
24 use Socket;
25 use OpenSRF::Utils ':daemon';
26
27 my $raise_db_error = 0;
28 # Globals for the command line options:
29  
30 my $opt_lockfile      = '/tmp/queued-ingest-coordinator-LOCK';
31 my $opt_logfile       = '/tmp/queued-ingest-coordinator-LOG';
32 my $daemon     = 0;     # do we go into the background?
33 my $stop       = 0;     # stop a running coordinator, if the lock file is there
34 my $chatty     = 0;     # do we yell into the void?
35 my $opt_max_child;      # max number of parallel worker processes
36 my $max_child  = 20;    # max number of parallel worker processes
37 my $max_child_force;    # max number of parallel worker processes
38
39 # for enqueuing mode
40 my $start_id;               # start processing at this record ID.
41 my $end_id;                 # stop processing when this record ID is reached.
42 my $opt_pipe;               # Read record ids from STDIN.
43 my $stats_only;             # Just report on QI processing stats/progress
44 my $totals_only;            # Only give top-line aggregate stats, no per day breakdown
45 my $stats_queues;           # Provide a list of queue-specfic stats
46 my $stats_since='001-01-01';# Default "since" time for stats
47 my $queue;                  # queue id, either pre-existing or created based on params
48 my $queue_type = 'biblio';  # type of records for batch enqueuing
49 my $queue_why;              # description for this reingest queue
50 my $queue_action = 'update';# what action is happening to the records: update, insert, delete, propagate
51 my $queue_state_data = '';  # State data required for queue entry processing
52 my $queue_owner;            # Owner of the queue 
53 my $queue_run_at;           # Owner of the queue 
54 my $queue_threads;          # parallelism for this queue (capped at max_child)
55 my $skip_browse = 0;        # Skip the browse reingest.
56 my $skip_attrs = 0;         # Skip the record attributes reingest.
57 my $skip_search = 0;        # Skip the search reingest.
58 my $skip_facets = 0;        # Skip the facets reingest.
59 my $skip_display = 0;       # Skip the display reingest.
60 my $skip_full_rec = 0;      # Skip the full_rec reingest.
61 my $skip_authority = 0;     # Skip the authority reingest.
62 my $skip_luri = 0;          # Skip the luri reingest.
63 my $skip_mrmap = 0;         # Skip the metarecord remapping.
64 my $record_attrs = [];      # Skip the metarecord remapping.
65 my $metabib_fields = [];    # Skip the metarecord remapping.
66 my $input_records = [];     # Records supplied via CLI switch.
67 my $pingest = '';           # Special "pingest" flag, supplying an EG user name as queue owner.
68
69 my $help;         # show help text
70
71 # Database connection options with defaults:
72 my $db_user = $ENV{PGUSER} || 'evergreen';
73 my $db_host = $ENV{PGHOST} || 'localhost';
74 my $db_db = $ENV{PGDATABASE} || 'evergreen';
75 my $db_pw = $ENV{PGPASSWORD} || 'evergreen';
76 my $db_port = $ENV{PGPORT} || 5432;
77
78 GetOptions(
79     'lock-file=s'       => \$opt_lockfile,
80     'log-file=s'        => \$opt_logfile,
81     'dbuser=s'          => \$db_user,
82     'dbhost=s'          => \$db_host,
83     'dbname=s'          => \$db_db,
84     'dbpw=s'            => \$db_pw,
85     'dbport=i'          => \$db_port,
86     'max-child=i'       => \$opt_max_child,
87     'max-child-force'   => \$max_child_force,
88     'stats'             => \$stats_only,
89     'totals-only'       => \$totals_only,
90     'queue-stats'       => \$stats_queues,
91     'since=s'           => \$stats_since,
92     'queue=i'           => \$queue,
93     'queue-action=s'    => \$queue_action,
94     'queue-name=s'      => \$queue_why,
95     'queue-type=s'      => \$queue_type,
96     'queue-owner=s'     => \$queue_owner,
97     'queue-run-at=s'    => \$queue_run_at,
98     'queue-threads=i'   => \$queue_threads,
99     'queue-state-data=s'=> \$queue_state_data,
100     'skip-browse'       => \$skip_browse,
101     'skip-attrs'        => \$skip_attrs,
102     'skip-search'       => \$skip_search,
103     'skip-facets'       => \$skip_facets,
104     'skip-display'      => \$skip_display,
105     'skip-full_rec'     => \$skip_full_rec,
106     'skip-authority'    => \$skip_authority,
107     'skip-luri'         => \$skip_luri,
108     'skip-mr-map'       => \$skip_mrmap,
109     'attr=s@'           => \$record_attrs,
110     'field=s@'          => \$metabib_fields,
111     'record=s@'         => \$input_records,
112     'start-id=i'        => \$start_id,
113     'end-id=i'          => \$end_id,
114     'pipe'              => \$opt_pipe,
115     'pingest=s'         => \$pingest,
116     'coordinator'       => \$daemon,
117     'stop'              => \$stop,
118     'chatty'            => \$chatty,
119     'help'              => \$help
120 ) or help();
121
122 sub help {
123     print <<HELP;
124
125     # Enqueue records 1-500000 for reingest later, just one worker for the queue
126     $0 --queue-threads 1 \
127         --queue-type biblio \
128         --queue-run-at tomorrow \
129         --queue-owner admin \
130         --queue-name "slowly updating records due to new RDA attributes"
131         --start-id 1 --end-id 500000
132
133     # Start the background worker
134     $0 --coordinator --max-child $max_child
135
136     # Stop the background worker
137     $0 --coordinator --stop
138
139     # Process whatever you can Right Now
140     $0 --max-child $max_child
141
142     # Process a single queue Right Now
143     $0 --queue 1234 --max-child $max_child
144
145     # Stats on Queued Ingest processing so far today
146     $0 --stats --since today --totals-only
147
148  General options
149
150     --help
151         Show this help text.
152
153     --queue
154         Id of an existing queue to report, process, or enqueue into
155
156
157  Reporting options
158
159     --stats
160         Request statistical information about Queued Ingest processing.
161         This option is required for other report options to work.
162
163     --totals-only
164         Only present aggregate total statistics for the reported time
165         rather than a daily breakdown.
166
167     --since
168         Limit statistics to processing that was scheduled to happen
169         at or after this timestamp.  Normal PostgreSQL shorthand for
170         timestamps are allowed, such as "today" or "yesterday".
171         Default: no limit
172
173     --queue-stats
174         Provide a per-queue breakdown of processing statistics for
175         the requested time.
176
177     --queue
178         ID of a queue about which to a breakdown of processing
179         statistics.
180
181
182  Processing options
183
184     --coordinator
185         Start the background watcher.
186         This option conficts with --pipe, --start-id, and --end-id.
187
188     --max-child
189         Max number of database workers to use for entries without a
190         named queue, or when enqueuing to a named queue, the number
191         of database workers to use for queue processing.
192
193
194 Enqueuing options
195
196     --queue-name
197         Name for a new queue
198
199     --queue-type
200         Type of records to be enqueued; biblio or authority
201         Default: biblio
202
203     --queue-action
204         Action triggering the record queuing; insert, update, delete,
205           propagate
206         Default: update
207
208     --queue-owner
209         User name of the owner of a new queue
210
211     --queue-run-at
212         ISO timestamp at which a queue should begin processing
213         Default: NOW
214
215     --queue-threads
216         Processing concurrency for a new queue, capped at $max_child
217         Default: 1
218
219     --queue-state-data
220         Any state data required for queue entry processing.  For
221         instance, the authority record ID when enqueuing a bib
222         record for authority propagation with the "propagate"
223         action.
224
225   ** If none of the name, owner, or run-at options are specified,
226   records will be enqueued for processing without a named ingest
227   queue.
228
229     --start-id
230         Start processing at this record ID.
231
232     --end-id
233         Stop processing when this record ID is reached
234
235     --pipe
236         Read record IDs to reingest from standard input.
237         This option conflicts with --start-id and/or --end-id.
238
239 HELP
240     exit;
241 }
242
243 help() if $help;
244
245 my $dsn = "dbi:Pg:dbname=$db_db;host=$db_host;port=$db_port;application_name='queued_ingest';sslmode=allow";
246
247 my $main_dbh = DBI->connect(
248     $dsn, $db_user, $db_pw,
249     { AutoCommit => 1,
250       pg_expand_array => 0,
251       pg_enable_utf8 => 1,
252       pg_bool_tf => 0,
253       RaiseError => $raise_db_error
254     }
255 ) || die "Could not connect to the database\n";
256
257 my $configured_max_child = $main_dbh->selectcol_arrayref(
258     "SELECT value FROM config.global_flag WHERE name = 'ingest.queued.max_threads'"
259 )->[0] || 20;
260 $max_child = $configured_max_child if (!$opt_max_child);
261
262
263 if (defined($opt_max_child) && $opt_max_child > 20 && !$max_child_force) {
264     warn('Max Child > 20 ... no, sorry');
265     help();
266 }
267
268 if ($opt_max_child) {
269     $max_child = $opt_max_child;
270 }
271
272 if ($max_child <= 0) {
273     $max_child = 20;
274 }
275
276 if ($opt_pipe && ($start_id || $end_id)) {
277     warn('Mutually exclusive options: either pipe or start/end range');
278     help();
279 }
280
281 if ($daemon && ($start_id || $end_id || $opt_pipe)) {
282     warn('Mutually exclusive options: cannot start or stop the Coordinator in Enqueuing mode');
283     help();
284 }
285
286 if (!$daemon && $stop) {
287     warn('Option --stop can only be used with the --coordinator option');
288     help();
289 }
290
291 if ($daemon && $queue) {
292     warn('Mutually exclusive options: cannot start or stop the Coordinator in one-shot processing mode');
293     help();
294 }
295
296 if ($queue_type && !(grep {$_ eq $queue_type} qw/biblio authority/)) {
297     warn('Valid queue types are biblio and authority');
298     help();
299 }
300
301 if (!(grep {$_ eq $queue_action} qw/insert update delete propagate/)) {
302     warn('Valid queue actions are: insert, update, delete, propagate');
303     help();
304 }
305
306 if ($queue && ($queue_owner || $queue_why || $queue_threads || $queue_run_at)) {
307     warn('Mutually exclusive options: specify a queue id OR queue creation values');
308     help();
309 }
310
311
312 if ($daemon) { # background mode, we need a lockfile;
313
314     if ($stop) {
315         die "Lockfile $opt_lockfile does not exist, is the coordinator running?\n" unless (-e $opt_lockfile);
316
317         open(F, "<$opt_lockfile") or die "Unable to open lockfile $opt_lockfile for reading, wrong user?\n";
318         my $old_pid = <F>;
319         close F;
320
321         if ($old_pid) {
322             if (kill(0,$old_pid)) {
323                 my $dead_count = kill(9,$old_pid);
324                 if ($dead_count) {
325                     warn "Coordinator process terminated, removing lock file $opt_lockfile\n";
326                     unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
327                 } else {
328                     die "Could not kill coordinator process $old_pid\n";
329                 }
330             } else {
331                 warn "Coordinator process not running, removing stale lock file\n";
332                 unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
333             }
334         } else {
335             warn "Coordinator lock file empty, removing lock file $opt_lockfile\n";
336             unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
337         }
338
339         exit;
340     }
341
342     # check the lockfile
343     die "I'm already running with lock-file $opt_lockfile\n" if (-e $opt_lockfile);
344
345     $main_dbh->disconnect;
346
347     daemonize("Queued Ingest Coordinator") if ($daemon);
348
349     # set the lockfile
350     open(F, ">$opt_lockfile") or die "Unable to open lockfile $opt_lockfile for writing\n";
351     print F $$;
352     close F;
353
354     open(STDERR, ">>$opt_logfile") if ($opt_logfile);
355 }
356
357 my $start_time = time;
358 my %stats;
359
360 sub reset_stats {
361   %stats = (
362     total => {
363     }, biblio    => {
364         insert    => {},
365         update    => {},
366         delete    => {},
367         propagate => {}
368     }, authority => {
369         insert    => {},
370         update    => {},
371         delete    => {}
372     }, seconds => {}
373   );
374 }
375
376 reset_stats();
377
378 my %processors;
379 my %queues_in_progress;
380
381 my $db_connections_in_use = 0;
382
383 if ($start_id || $end_id || $opt_pipe || @$input_records) { # enqueuing mode
384
385     if ($pingest) { # special mode that sets up two queues that can run in parallel
386
387         my $no_browse = $skip_browse;
388         my $orig_stat_data = $queue_state_data;
389
390         # set up the first queue
391         $queue = undef;
392         $queue_threads //= 4;
393         $queue_type = 'biblio';
394         $queue_action = 'update';
395         $queue_why = 'pingest - fields and attributes queue';
396         $queue_owner = $pingest;
397
398         # for pingest mode, always skip authority and luri, and skip browse in the first queue
399         $skip_browse = 1;
400         $skip_authority = 1;
401         $skip_luri = 1;
402         
403         my $record_list = enqueue_input();
404         report_stats('Enqueuing '.$queue_why);
405
406         if (!$no_browse and @$record_list) { # user didn't ask to skip browse reingest
407             # set up the second queue
408             $queue = undef;
409             $queue_threads //= 4;
410             $queue_why = 'pingest - browse queue';
411             $queue_state_data = $orig_stat_data;
412
413             $skip_browse = 0;
414             $skip_attrs = 1;
415             $skip_search = 1;
416             $skip_facets = 1;
417             $skip_display = 1;
418             $skip_full_rec = 1;
419             $skip_mrmap = 1;
420
421             reset_stats();
422
423             enqueue_input($record_list);
424             report_stats('Enqueuing '.$queue_why);
425         }
426
427     } else { # just a regular, user-defined QI request
428         enqueue_input();
429         report_stats('Enqueuing');
430     }
431
432
433 } elsif ($queue && !$stats_only) { # single queue processing mode
434
435     my $q = gather_one_queue($queue);
436     process_one_queue($q->{id}, $max_child);
437     complete_outstanding_queue($q->{id});
438     report_stats('Queue Processing');
439
440 } elsif (!$daemon && !$stats_only) { # special case: foreground single process, end after
441
442     my @dbhs = create_processor_dbhs($max_child);
443     my $clear = 0;
444
445     my $new_queues = gather_outstanding_queues(); # array ref of queues
446     for my $q (@$new_queues) {
447         process_one_queue($q->{id}, $max_child, \@dbhs);
448         complete_outstanding_queue($q->{id});
449         report_stats('Queue and Entry Processing', $clear++);
450     }
451
452     my $new_entries = gather_outstanding_nonqueue_entries('NULL'); # array ref of queue entries
453     my @eids = map { $$_{id} } @$new_entries;
454     while (my @current_subset = splice(@eids, 0, 10 * $max_child)) {
455         process_input_list(\@current_subset, \@dbhs);
456         report_stats('Queue and Entry Processing', $clear++);
457     }
458
459 } elsif($stats_only) {
460     my @output;
461
462     my $q_query = <<"    SQL";
463 SELECT  run_at::DATE AS scheduled_date,
464         SUM((start_time IS NULL)::INT) AS pending,
465         SUM((start_time IS NOT NULL AND end_time IS NULL)::INT) AS ongoing,
466         SUM((end_time IS NOT NULL)::INT) AS complete,
467         COUNT(*) AS total
468   FROM  action.ingest_queue
469   WHERE run_at >= ?
470   GROUP BY ROLLUP (1)
471   ORDER BY 1
472     SQL
473
474     if (!$queue) { # all queues in the time range
475         my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $stats_since);
476         if (@$qstat_rows > 1) {
477             $qstat_rows = [ $$qstat_rows[-1] ] if ($totals_only);
478
479             push @output, "Named Queue processing stats";
480             push @output, "============================","";
481             for my $row ( @$qstat_rows ) {
482                 push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date});
483                 push @output, "  Totals","  ------" unless ($$row{scheduled_date});
484
485                 push @output, "  Pending: $$row{pending}";
486                 push @output, "  Ongoing: $$row{ongoing}";
487                 push @output, " Complete: $$row{complete}";
488
489                 push @output,"";
490                 push @output,'-'x50;
491                 push @output,"";
492             }
493
494             push @output,"";
495         }
496     }
497
498     if ($stats_queues || $queue) {
499         $q_query = <<"        SQL";
500 SELECT  q.*,
501         u.usrname,
502         SUM((e.ingest_time IS NULL AND e.override_by IS NULL)::INT) AS pending,
503         SUM((e.override_by IS NOT NULL)::INT) AS overridden,
504         SUM((e.ingest_time IS NOT NULL)::INT) AS complete,
505         SUM((e.fail_time IS NOT NULL)::INT) AS failed,
506         COUNT(e.id) AS events
507   FROM  action.ingest_queue q
508         JOIN actor.usr u ON (q.who=u.id)
509         LEFT JOIN action.ingest_queue_entry e ON (e.queue=q.id)
510   WHERE q.XX ?
511   GROUP BY 1,2,3,4,5,6,7,8,9
512   ORDER BY q.run_at, q.id
513         SQL
514
515         my $param = $stats_since;
516         if ($queue) {
517             $param = $queue;
518             $q_query =~ s/XX/id =/;
519         } else {
520             $q_query =~ s/XX/run_at >=/;
521         }
522
523         my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $param);
524         if (@$qstat_rows) {
525
526             push @output, "Named Queue details";
527             push @output, "===================","";
528             for my $row ( @$qstat_rows ) {
529                 push @output, "* Queue id: $$row{id} | Threads: $$row{threads} | Owner: $$row{usrname}";
530                 push @output, "* Reason: $$row{why}";
531                 push @output, "* Create time: $$row{created}";
532                 push @output, "* Scheduled start time: $$row{run_at}";
533                 push @output, " - Started: $$row{start_time}";
534                 push @output, " - Ended  : $$row{end_time}","";
535                 push @output, "    Pending: $$row{pending}";
536                 push @output, " Overridden: $$row{overridden}";
537                 push @output, "   Complete: $$row{complete}";
538                 push @output, "     Failed: $$row{failed}";
539                 push @output, "      Total: $$row{events}";
540                 push @output, "Percent complete: " . sprintf('%.2f',(($$row{complete} + 1.0) / ($$row{events} - $$row{failed} + 1.0)) * 100.0);
541
542                 push @output,"";
543                 push @output,'-'x50;
544                 push @output,"";
545             }
546
547             push @output,"";
548         }
549     }
550
551     if (!$queue) {
552         my $e_query = <<"        SQL";
553 SELECT  run_at::DATE AS scheduled_date,
554         record_type,
555         action,
556         SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NOT NULL)::INT) AS pending_with_queue,
557         SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NULL)::INT) AS pending_without_queue,
558         SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL)::INT) AS pending,
559         SUM((ingest_time IS NOT NULL AND override_by IS NULL)::INT) AS processed,
560         SUM((ingest_time IS NOT NULL AND override_by IS NOT NULL)::INT) AS overridden,
561         SUM((fail_time IS NOT NULL)::INT) AS failed,
562         SUM((ingest_time IS NOT NULL)::INT) AS complete,
563         COUNT(*) AS total
564   FROM  action.ingest_queue_entry
565   WHERE run_at >= ?
566   GROUP BY 2,3, ROLLUP (1)
567   ORDER BY 1,2,3
568         SQL
569
570         my $estat_rows = $main_dbh->selectall_arrayref($e_query, { Slice => {} }, $stats_since);
571         if (@$estat_rows > 1) {
572             $estat_rows = [ grep { !defined($$_{scheduled_date}) } @$estat_rows ] if ($totals_only);
573
574             push @output, "Record processing stats";
575             push @output, "============================","";
576             for my $row ( @$estat_rows ) {
577                 push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date});
578                 push @output, "  Record Type: $$row{record_type}; Action: $$row{action}";
579                 push @output, "  Totals","  ------" unless ($$row{scheduled_date});
580
581                 push @output, "    Pending: $$row{pending}";
582                 push @output, "         ... $$row{pending_with_queue} with a named queue" if $$row{pending};
583                 push @output, "         ... $$row{pending_without_queue} without a named queue" if $$row{pending};
584                 push @output, "  Processed: $$row{processed}";
585                 push @output, " Overridden: $$row{overridden}";
586                 push @output, "   Complete: $$row{complete}";
587                 push @output, "     Failed: $$row{failed}";
588                 push @output, "      Total: $$row{total}";
589
590                 push @output,"";
591                 push @output,'-'x50;
592                 push @output,"";
593             }
594         }
595
596         push @output,"";
597     }
598
599     print join("\n", @output);
600
601 } elsif($daemon) { # background processing
602
603     $SIG{CHLD} = \&REAPER;
604
605     my %queues_in_waiting;
606     my @orphan_entries;
607
608     my $pid = spawn_kid();
609     my $orphan_processor = $processors{$pid};
610     $orphan_processor->{state} = 0; # will be decremented (made true) once used
611     print {$orphan_processor->{pipe}} "n:Orphan processor\n"; # set parallelism
612
613     refresh_main_dbh()->do('LISTEN queued_ingest');
614
615     my $loops = 1;
616     while ($loops) {
617
618         warn_if_chatty("starting processing loop");
619
620         my @complete_processors = grep { $_->{state} == 3 } values %processors;
621         warn_if_chatty("".scalar(@complete_processors)." complete for cleanup");
622         for my $p (@complete_processors) {
623
624             if (my $dead_pid = $$p{DEAD}) {
625                 warn_if_chatty("processor $dead_pid already cleaned up, final flush");
626                 delete $processors{$dead_pid}{$_} for keys %{$processors{$dead_pid}};
627                 delete $processors{$dead_pid};
628                 next;
629             }
630
631             $$p{DEAD} = $$p{pid};
632             warn_if_chatty("phase 1 cleanup of processor $$p{DEAD}");
633             $db_connections_in_use -= $$p{threads};
634
635             if ($$p{queues}) {
636                 for my $q (keys %{$$p{queues}}) {
637                     warn_if_chatty("processor $$p{pid} finished processing queue $q");
638                     delete $queues_in_progress{$q}{processors}{$$p{pid}};
639                     if (!scalar(keys(%{$queues_in_progress{$q}{processors}}))) { # that was the last processor for the queue
640                         warn_if_chatty("queue $q complete");
641                         complete_outstanding_queue($q);
642                         delete $queues_in_progress{$q};
643                         delete $$p{queues}{$q};
644                     }
645                 }
646             }
647         }
648
649         warn_if_chatty("".scalar(keys(%queues_in_progress))." queues in progress");
650         warn_if_chatty("checking for new queues");
651
652         my $new_queues = gather_outstanding_queues(); # array ref of queues
653         if (@$new_queues) {
654             warn_if_chatty("".scalar(@$new_queues)." new queues");
655             $queues_in_waiting{$_->{id}} = $_ for @$new_queues;
656
657             my @ready_kids = grep { $_->{state} == 1 } values %processors;
658             if (my $needed = scalar(@$new_queues) - scalar(@ready_kids)) {
659                 warn_if_chatty("spawning $needed new processors");
660                 spawn_kid() while ($needed--);
661             }
662
663             @ready_kids = grep { $_->{state} == 1 } values %processors;
664
665             my @sorted_queues = sort {$$a{run_at} cmp $$b{run_at}} values %queues_in_waiting;
666             for my $q (@sorted_queues) {
667
668                 my $local_max = $max_child - $db_connections_in_use;
669                 if ($local_max > 0) { # we have connections available
670                     my $ready = shift @ready_kids;
671                     next unless $ready;
672
673                     # cap at unused max if more threads were requested
674                     $$q{threads} = $local_max if ($$q{threads} > $local_max);
675                     $ready->{threads} = $$q{threads};
676                     $ready->{state} = 2; # running now
677                     $ready->{queues}{$$q{id}} = 1;
678
679                     $queues_in_progress{$$q{id}} = delete $queues_in_waiting{$$q{id}};
680                     $queues_in_progress{$$q{id}}{processors}{$ready->{pid}} = 1;
681
682                     $db_connections_in_use += $$q{threads};
683                     print {$ready->{pipe}} "n:Queue $$q{id}\n";
684                     print {$ready->{pipe}} "p:$$q{threads}\n";
685                     print {$ready->{pipe}} "q:$$q{id}\n";
686                     shutdown($ready->{pipe},2);
687                 } else {
688                     warn_if_chatty("no db connections available, we'll wait");
689                 }
690             }
691         }
692
693         warn_if_chatty("checking orphan processor");
694
695         if ($orphan_processor->{state} == 3) { # replace it
696
697             warn_if_chatty("replacing orphan processor, it is finished");
698
699             $pid = spawn_kid();
700             $orphan_processor = $processors{$pid};
701             $orphan_processor->{state} = 0; # will be decremented (made true) once used
702             print {$orphan_processor->{pipe}} "n:Orphan processor\n"; # set parallelism
703         }
704
705         warn_if_chatty("gathering orphan entries");
706
707         my $new_entries = gather_outstanding_nonqueue_entries(10 * $max_child); # array ref of queue entries
708         if ($new_entries and @$new_entries) {
709             warn_if_chatty("".scalar(@$new_entries)." new entries");
710             if ($orphan_processor->{state}) { # already processing some entries
711                 warn_if_chatty("orphan processor is busy, wait and loop");
712             } else {
713                 my $local_max = $max_child - $db_connections_in_use;
714                 if ($local_max > 0) {
715                     $orphan_processor->{state}--;
716                     $db_connections_in_use += $local_max;
717                     $orphan_processor->{threads} = $local_max;
718                     $orphan_processor->{entries} = [ map { $$_{id} } @$new_entries ];
719                     print {$orphan_processor->{pipe}} "p:$local_max\n"; # set parallelism
720                     print {$orphan_processor->{pipe}} "e:$_\n" for (@{$orphan_processor->{entries}});
721                     print {$orphan_processor->{pipe}} "!!\n";
722                     shutdown($orphan_processor->{pipe},2);
723                 } else {
724                     warn_if_chatty("no db connections available for the orphan processor, wait and loop");
725                 }
726             }
727         }
728
729         my $inner_loops = 0;
730         warn_if_chatty("waiting for LISTEN notifications");
731         until (defined(refresh_main_dbh()->pg_notifies)) {
732             sleep(1);
733             $inner_loops++;
734             if ($inner_loops >= 10) {
735                 last; # loop after 10s at most
736             }
737         }
738
739         $loops++;
740     }
741 }
742
743 exit;
744
745 sub refresh_main_dbh {
746     unless ($main_dbh->ping) {
747         $main_dbh = DBI->connect(
748             $dsn, $db_user, $db_pw,
749             { AutoCommit => 1,
750               pg_expand_array => 0,
751               pg_enable_utf8 => 1,
752               pg_bool_tf => 0,
753               RaiseError => $raise_db_error
754             }
755         ) || die "Could not connect to the database\n";
756         $main_dbh->do('LISTEN queued_ingest') if ($daemon);
757     }
758     return $main_dbh;
759 }
760
761 sub REAPER {
762     local $!;   # don't let waitpid() overwrite current error
763     warn_if_chatty("REAPER called");
764     while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
765         warn_if_chatty("reaping kid $pid");
766         $processors{$pid}{state} = 3;
767     }
768     $SIG{CHLD} = \&REAPER;
769 }
770
771 sub create_processor_dbhs {
772     my $count = shift;
773
774     my @dbhs;
775     while (scalar(@dbhs) < $count) {
776         push @dbhs, DBI->connect(
777             $dsn, $db_user, $db_pw,
778             { AutoCommit => 1,
779               pg_expand_array => 0,
780               pg_enable_utf8 => 1,
781               pg_bool_tf => 0,
782               RaiseError => $raise_db_error
783             }
784         );
785     }
786
787     return @dbhs;
788 }
789
790 sub complete_outstanding_queue {
791     my $qid = shift;
792
793     return refresh_main_dbh()->do(
794         'UPDATE action.ingest_queue SET end_time = NOW()'.
795         ' WHERE id=? RETURNING *',
796         {}, $qid
797     );
798 }
799
800 sub gather_one_queue {
801     my $qid = shift;
802     my $q = refresh_main_dbh()->selectrow_hashref(
803         'UPDATE action.ingest_queue SET start_time = NOW()'.
804         ' WHERE id = ? RETURNING *',
805         {},$qid
806     );
807
808     return $q;
809 }
810
811 sub gather_outstanding_queues {
812     my $qs = refresh_main_dbh()->selectall_hashref(
813         'UPDATE action.ingest_queue SET start_time = NOW()'.
814         ' WHERE start_time IS NULL AND run_at <= NOW()'.
815         ' RETURNING *',
816         'id'
817     );
818
819     for my $q (values %$qs) {
820         $q->{threads} ||= 1;
821     }
822
823     return [values %$qs];
824 }
825
826 sub gather_outstanding_nonqueue_entries {
827     my $limit = shift;
828     return refresh_main_dbh()->selectall_arrayref(
829         "SELECT * FROM action.ingest_queue_entry".
830         " WHERE queue IS NULL".
831         "       AND run_at <= NOW()".
832         "       AND override_by IS NULL".
833         "       AND ingest_time IS NULL".
834         "       AND fail_time IS NULL".
835         " ORDER BY run_at, id".
836         " LIMIT $limit",
837         { Slice => {} }
838     );
839 }
840
841 sub spawn_kid {
842     my $parent;
843     my $child;
844
845     socketpair($child, $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
846         ||  die "socketpair: $!";
847
848     $parent->autoflush(1);
849     $child->autoflush(1);
850
851     my $kid_pid = fork() //
852         die "Could not fork worker process";
853
854     if ($kid_pid) {
855         close($parent);
856         $processors{$kid_pid} = {
857             pipe  => $child,
858             pid   => $kid_pid,
859             state => 1
860         };
861     } else {
862         set_psname("Queued Ingest worker - waiting");
863         open(STDERR, ">>$opt_logfile") if ($opt_logfile);
864         $SIG{CHLD} = 'IGNORE';
865         close($child);
866         process_commands_from_parent($parent);
867         warn_if_chatty("finished processing commands from parent, exiting");
868         report_stats("Entry Processing - worker $$",0,1);
869         exit;
870     }
871
872     return $kid_pid;
873 }
874
875 sub warn_if_chatty {
876     return unless $chatty;
877
878     my $msg = shift;
879     my $time = time;
880     warn "$time [$$] $msg\n";
881 }
882
883 sub process_commands_from_parent {
884     my $ppipe = shift;
885
886     my $stop_after = 0;
887     while (1) {
888         my @input;
889         my $dbh_count = 1;
890         my $cont = 0;
891         while (<$ppipe>) {
892             $cont = 1;
893             chomp;
894             if (/^q:(\d+)$/) { # we were passed a queue id
895                 my $queue = $1;
896                 $stop_after = 1;
897                 warn_if_chatty("processing queue $queue, should exit after");
898                 process_one_queue($queue,$dbh_count);
899                 warn_if_chatty("processing queue $queue complete");
900             } elsif (/^n:(.+)$/) { # we were passed a process name
901                 set_psname("Queued Ingest worker - ". $1);
902             } elsif (/^p:(\d+)$/) { # we were passed a dbh count (parallelism)
903                 $dbh_count = $1 || 1;
904                 warn_if_chatty("parallelism set to $dbh_count");
905             } elsif (/^e:(\d+)$/) { # we were passed an entry id
906                 my $entry = $1;
907                 push @input, $entry;
908             } elsif (/^##$/) { # This is the "process those, but then wait" command
909                 last;
910             } elsif (/^!!$/) { # This is the "end of input, process and exit" command
911                 warn_if_chatty("end of the command stream, should exit after");
912                 $stop_after = 1;
913                 last;
914             }
915         }
916
917         # we have a list of entries to process
918         if (my $icount = scalar(@input)) {
919             my @dbhs = create_processor_dbhs($dbh_count);
920             warn_if_chatty("processing $icount entries...");
921             process_input_list(\@input, \@dbhs);
922             warn_if_chatty("processing $icount entries complete");
923         }
924
925         last if $stop_after || !$cont;
926     }
927
928     close($ppipe);
929 }
930
931 sub process_one_queue {
932     my $qid = shift;
933     my $dbh_count = shift || 1;
934     my $dbh_list = shift;
935
936     return unless $qid;
937
938     my @dbhs = $dbh_list ? @$dbh_list : create_processor_dbhs($dbh_count);
939     my @input = @{$dbhs[0]->selectcol_arrayref(
940         'SELECT id FROM action.ingest_queue_entry'.
941         ' WHERE queue = ?'.
942         '       AND override_by IS NULL'.
943         '       AND ingest_time IS NULL'.
944         '       AND fail_time IS NULL',
945         {}, $qid
946     )};
947
948     return unless @input;
949     return process_input_list(\@input, \@dbhs);
950 }
951
952 sub process_entry_begin {
953     my $entry_id = shift;
954     my $ms = shift;
955     my $dbh = shift;
956
957             
958     my $entry = $dbh->selectrow_hashref(
959         "SELECT * FROM action.ingest_queue_entry WHERE id = ?", undef, $entry_id
960     );
961
962     if (!$$entry{id}) {
963         return $dbh;
964     }
965
966     my $sth = $dbh->prepare(
967         "SELECT action.process_ingest_queue_entry(?)", {pg_async => 1}
968     );
969
970     if (!$sth->execute($entry_id)) {
971         $stats{total}{fail}++;
972         $stats{$$entry{record_type}}{$$entry{action}}{fail}++;
973
974         my $current_second = CORE::time;
975         $stats{seconds}{$current_second}{fail}++;
976         $stats{seconds}{$current_second}{total}++;
977
978         return $dbh;
979     }
980
981     $$ms{$entry_id} = {
982         entry   => $entry,
983         dbh     => $dbh,
984         sth     => $sth
985     };
986
987     return undef;
988 }
989
990 sub process_entry_complete {
991     my $eid = shift;
992     my $ms = shift;
993
994     if ($$ms{$eid}{sth}->pg_ready) {
995         $$ms{$eid}{sth}->pg_result;
996         my ($success) = $$ms{$eid}{sth}->fetchrow_array;
997         $$ms{$eid}{sth}->finish;
998
999         $success = $success ? 'success' : 'fail';
1000         $stats{total}{$success}++;
1001         $stats{$$ms{$eid}{entry}{record_type}}{$$ms{$eid}{entry}{action}}{$success}++;
1002
1003         my $current_second = CORE::time;
1004         $stats{seconds}{$current_second}{$success}++;
1005         $stats{seconds}{$current_second}{total}++;
1006
1007         my $dbh = delete $$ms{$eid}{dbh};
1008         delete $$ms{$eid}{$_} for keys %{$$ms{$eid}};
1009         delete $$ms{$eid};
1010         return $dbh;
1011     }
1012
1013     return undef;
1014 }
1015
1016 sub process_input_list {
1017     my $input = shift;
1018     my $dbhs = shift;
1019
1020     my %microstate;
1021     while (@$input || keys(%microstate)) {
1022
1023         # do we have an idle worker, and work to do?
1024         if (@$input && scalar(@$dbhs)) {
1025             my $entry_id = shift @$input;
1026             my $dbh = shift @$dbhs;
1027             my $failed_dbh = process_entry_begin($entry_id, \%microstate, $dbh);
1028             if ($failed_dbh) {
1029                 push @$dbhs, $failed_dbh;
1030                 next;
1031             }
1032         }
1033
1034         # look at them in ascending order, as the oldest will have
1035         # been running the longest and is more likely to be finished
1036         my @entries = sort {$a <=> $b} keys %microstate;
1037         for my $eid (@entries) {
1038             my $success_dbh = process_entry_complete($eid, \%microstate);
1039             if ($success_dbh) {
1040                 push @$dbhs, $success_dbh;
1041             }
1042         }
1043
1044         usleep(10000) if (keys %microstate); # ~0.01 seconds
1045     }
1046
1047     return $dbhs;
1048 }
1049
1050 sub report_stats {
1051     my $label = shift;
1052     my $clear = shift;
1053     my $warn = shift;
1054     my $runtime = time - $start_time;
1055     my @seconds_list = sort keys %{$stats{seconds}};
1056     my $first_second = $seconds_list[0];
1057     my $last_second = $seconds_list[-1];
1058     my $processing_seconds = ($last_second - $first_second) + 1.0;
1059
1060     system('clear') if $clear;
1061
1062     print "$label stats:\n" unless $warn;
1063     warn "$label stats:\n" if $warn;
1064     for my $type (qw/biblio authority/) {
1065         for my $action ( sort keys %{$stats{$type}} ) {
1066             my $part = $stats{$type}{$action};
1067             next unless (defined($$part{success}) || defined($$part{fail}));
1068
1069             $$part{success} //= 0;
1070             $$part{fail} //= 0;
1071
1072             my $subtotal = $$part{success} + $$part{fail};
1073             print " * Type: $type\n  - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" unless $warn;
1074             warn " * Type: $type\n  - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" if $warn;
1075         }
1076     }
1077
1078     $stats{total}{success} //= 0;
1079     $stats{total}{fail} //= 0;
1080
1081     my $per_sec = ($stats{total}{success} + $stats{total}{fail}) / $processing_seconds;
1082
1083     print " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n".
1084           " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" unless $warn;
1085     warn " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n".
1086           " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" if $warn;
1087 }
1088
1089 sub enqueue_input {
1090     my $predestined_input = shift;
1091     my @input;
1092
1093     if ($predestined_input and @$predestined_input) {
1094         @input = @$predestined_input;
1095     } elsif ($opt_pipe) {
1096         while (<STDIN>) {
1097             # Assume any string of digits is an id.
1098             if (my @subs = /([0-9]+)/g) {
1099                 push(@input, @subs);
1100             }
1101         }
1102     } elsif (@$input_records) {
1103         @input = grep { /^\d+$/ } @$input_records;
1104     } else {
1105         my $q = "SELECT id FROM $queue_type.record_entry WHERE NOT DELETED";
1106         if ($start_id && $end_id) {
1107             $q .= " AND id BETWEEN $start_id AND $end_id";
1108         } elsif ($start_id) {
1109             $q .= " AND id >= $start_id";
1110         } elsif ($end_id) {
1111             $q .= " AND id <= $end_id";
1112         }
1113         $q .= ' ORDER BY id ASC';
1114
1115         @input = @{$main_dbh->selectcol_arrayref($q)};
1116     }
1117
1118     $main_dbh->begin_work;
1119     if ($queue_why || $queue_threads || $queue_run_at) {
1120         my $rows = $main_dbh->do(
1121             'INSERT INTO action.ingest_queue (why,threads,run_at) VALUES(?,?,?)',
1122             undef, $queue_why, $queue_threads || 1, $queue_run_at || 'NOW'
1123         );
1124         if ($rows && $queue_owner) {
1125             $queue = $main_dbh->last_insert_id(undef,'action','ingest_queue',undef);
1126             if ($queue && $queue_owner) {
1127                 my $uid = $main_dbh->selectcol_arrayref(
1128                     'SELECT id FROM actor.usr WHERE usrname = ?',
1129                     undef, $queue_owner
1130                 )->[0];
1131                 if ($uid) {
1132                     $rows = $main_dbh->do(
1133                         'UPDATE action.ingest_queue SET who = ? WHERE id = ?',
1134                         undef, $uid, $queue
1135                     );
1136                 }
1137             }
1138         }
1139     }
1140
1141     my $q_obj = {};
1142     if ($queue) {
1143         $q_obj = $main_dbh->selectrow_hashref("SELECT * FROM action.ingest_queue WHERE id=$queue") || {};
1144     }
1145     $queue = $q_obj->{id} || '0'; 
1146
1147     if ($queue_type eq 'biblio' and $queue_action eq 'update') {
1148         $queue_state_data .= ';skip_browse' if $skip_browse;
1149         $queue_state_data .= ';skip_attrs' if $skip_attrs;
1150         $queue_state_data .= ';skip_search' if $skip_search;
1151         $queue_state_data .= ';skip_facets' if $skip_facets;
1152         $queue_state_data .= ';skip_display' if $skip_display;
1153         $queue_state_data .= ';skip_full_rec' if $skip_full_rec;
1154         $queue_state_data .= ';skip_authority' if $skip_authority;
1155         $queue_state_data .= ';skip_luri' if $skip_luri;
1156         $queue_state_data .= ';skip_mrmap' if $skip_mrmap;
1157
1158         $queue_state_data .= ';attr_list('.join(',',@$record_attrs).')' if @$record_attrs;
1159         $queue_state_data .= ';field_list('.join(',',@$metabib_fields).')' if @$metabib_fields;
1160     }
1161
1162     my $qid = $q_obj->{id};
1163     my $run_at = $q_obj->{run_at} || 'NOW';
1164     for my $rid (@input) {
1165         my $success = $main_dbh->selectcol_arrayref(
1166             'SELECT action.enqueue_ingest_entry(?, ?, ?, ?, ?, ?)',
1167             {},
1168             $rid, $queue_type, $run_at, $qid, $queue_action, $queue_state_data
1169         )->[0];
1170
1171         my $update_key = $success ? 'success' : 'fail';
1172         $stats{total}{$update_key}++;
1173         $stats{$queue_type}{$queue_action}{$update_key}++;
1174
1175         my $current_second = CORE::time;
1176         $stats{seconds}{$current_second}{$update_key}++;
1177         $stats{seconds}{$current_second}{total}++;
1178     }
1179     $main_dbh->commit;
1180
1181     return \@input;
1182 }
1183