2 # ---------------------------------------------------------------
3 # Copyright © 2022 Equinox Open Library Initiative, INC.
4 # Mike Rylander <mrylander@gmail.com>
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 # ---------------------------------------------------------------
21 use Time::HiRes qw/usleep time/;
23 use POSIX ":sys_wait_h";
25 use OpenSRF::Utils ':daemon';
27 my $raise_db_error = 0;
28 # Globals for the command line options:
30 my $opt_lockfile = '/tmp/queued-ingest-coordinator-LOCK';
31 my $opt_logfile = '/tmp/queued-ingest-coordinator-LOG';
32 my $daemon = 0; # do we go into the background?
33 my $stop = 0; # stop a running coordinator, if the lock file is there
34 my $chatty = 0; # do we yell into the void?
35 my $opt_max_child; # max number of parallel worker processes, as requested on the command line
36 my $max_child = 20; # max number of parallel worker processes, effective value
37 my $max_child_force; # allow --max-child to exceed the sanity cap of 20
40 my $start_id; # start processing at this record ID.
41 my $end_id; # stop processing when this record ID is reached.
42 my $opt_pipe; # Read record ids from STDIN.
43 my $stats_only; # Just report on QI processing stats/progress
44 my $totals_only; # Only give top-line aggregate stats, no per day breakdown
45 my $stats_queues; # Provide a list of queue-specific stats
46 my $stats_since='001-01-01';# Default "since" time for stats
47 my $queue; # queue id, either pre-existing or created based on params
48 my $queue_type = 'biblio'; # type of records for batch enqueuing
49 my $queue_why; # description for this reingest queue
50 my $queue_action = 'update';# what action is happening to the records: update, insert, delete, propagate
51 my $queue_state_data = ''; # State data required for queue entry processing
52 my $queue_owner; # Owner of the queue
53 my $queue_run_at; # Timestamp at which the queue should begin processing
54 my $queue_threads; # parallelism for this queue (capped at max_child)
55 my $skip_browse = 0; # Skip the browse reingest.
56 my $skip_attrs = 0; # Skip the record attributes reingest.
57 my $skip_search = 0; # Skip the search reingest.
58 my $skip_facets = 0; # Skip the facets reingest.
59 my $skip_display = 0; # Skip the display reingest.
60 my $skip_full_rec = 0; # Skip the full_rec reingest.
61 my $skip_authority = 0; # Skip the authority reingest.
62 my $skip_luri = 0; # Skip the luri reingest.
63 my $skip_mrmap = 0; # Skip the metarecord remapping.
64 my $record_attrs = []; # Record attribute codes to restrict the attribute reingest to (--attr)
65 my $metabib_fields = []; # Metabib field ids to restrict the reingest to (--field)
66 my $input_records = []; # Records supplied via CLI switch.
67 my $pingest = ''; # Special "pingest" flag, supplying an EG user name as queue owner.
69 my $help; # show help text
71 # Database connection options with defaults:
72 my $db_user = $ENV{PGUSER} || 'evergreen';
73 my $db_host = $ENV{PGHOST} || 'localhost';
74 my $db_db = $ENV{PGDATABASE} || 'evergreen';
75 my $db_pw = $ENV{PGPASSWORD} || 'evergreen';
76 my $db_port = $ENV{PGPORT} || 5432;
79 'lock-file=s' => \$opt_lockfile,
80 'log-file=s' => \$opt_logfile,
# Database connection overrides (defaults come from PG* env vars above):
81 'dbuser=s' => \$db_user,
82 'dbhost=s' => \$db_host,
83 'dbname=s' => \$db_db,
85 'dbport=i' => \$db_port,
# Worker parallelism:
86 'max-child=i' => \$opt_max_child,
87 'max-child-force' => \$max_child_force,
# Statistics reporting:
88 'stats' => \$stats_only,
89 'totals-only' => \$totals_only,
90 'queue-stats' => \$stats_queues,
91 'since=s' => \$stats_since,
# Queue creation/selection (note: --queue-name populates the "why" column):
93 'queue-action=s' => \$queue_action,
94 'queue-name=s' => \$queue_why,
95 'queue-type=s' => \$queue_type,
96 'queue-owner=s' => \$queue_owner,
97 'queue-run-at=s' => \$queue_run_at,
98 'queue-threads=i' => \$queue_threads,
99 'queue-state-data=s'=> \$queue_state_data,
# Reingest phases to skip:
100 'skip-browse' => \$skip_browse,
101 'skip-attrs' => \$skip_attrs,
102 'skip-search' => \$skip_search,
103 'skip-facets' => \$skip_facets,
104 'skip-display' => \$skip_display,
105 'skip-full_rec' => \$skip_full_rec,
106 'skip-authority' => \$skip_authority,
107 'skip-luri' => \$skip_luri,
108 'skip-mr-map' => \$skip_mrmap,
# Record selection for enqueuing:
109 'attr=s@' => \$record_attrs,
110 'field=s@' => \$metabib_fields,
111 'record=s@' => \$input_records,
112 'start-id=i' => \$start_id,
113 'end-id=i' => \$end_id,
114 'pipe' => \$opt_pipe,
115 'pingest=s' => \$pingest,
# Coordinator control:
116 'coordinator' => \$daemon,
118 'chatty' => \$chatty,
125 # Enqueue records 1-500000 for reingest later, just one worker for the queue
126 $0 --queue-threads 1 \
127 --queue-type biblio \
128 --queue-run-at tomorrow \
129 --queue-owner admin \
130 --queue-name "slowly updating records due to new RDA attributes"
131 --start-id 1 --end-id 500000
133 # Start the background worker
134 $0 --coordinator --max-child $max_child
136 # Stop the background worker
137 $0 --coordinator --stop
139 # Process whatever you can Right Now
140 $0 --max-child $max_child
142 # Process a single queue Right Now
143 $0 --queue 1234 --max-child $max_child
145 # Stats on Queued Ingest processing so far today
146 $0 --stats --since today --totals-only
154 Id of an existing queue to report, process, or enqueue into
160 Request statistical information about Queued Ingest processing.
161 This option is required for other report options to work.
164 Only present aggregate total statistics for the reported time
165 rather than a daily breakdown.
168 Limit statistics to processing that was scheduled to happen
169 at or after this timestamp. Normal PostgreSQL shorthand for
170 timestamps are allowed, such as "today" or "yesterday".
174 Provide a per-queue breakdown of processing statistics for
178 ID of a queue about which to provide a breakdown of processing
185 Start the background watcher.
186 This option conflicts with --pipe, --start-id, and --end-id.
189 Max number of database workers to use for entries without a
190 named queue, or when enqueuing to a named queue, the number
191 of database workers to use for queue processing.
200 Type of records to be enqueued; biblio or authority
204 Action triggering the record queuing; insert, update, delete,
209 User name of the owner of a new queue
212 ISO timestamp at which a queue should begin processing
216 Processing concurrency for a new queue, capped at $max_child
220 Any state data required for queue entry processing. For
221 instance, the authority record ID when enqueuing a bib
222 record for authority propagation with the "propagate"
225 ** If none of the name, owner, or run-at options are specified,
226 records will be enqueued for processing without a named ingest
230 Start processing at this record ID.
233 Stop processing when this record ID is reached
236 Read record IDs to reingest from standard input.
237 This option conflicts with --start-id and/or --end-id.
245 my $dsn = "dbi:Pg:dbname=$db_db;host=$db_host;port=$db_port;application_name='queued_ingest';sslmode=allow";
# Main (parent-process) database handle, used for enqueuing, stats, and
# coordinator bookkeeping.  Worker pools get their own handles later.
247 my $main_dbh = DBI->connect(
248 $dsn, $db_user, $db_pw,
250 pg_expand_array => 0,
253 RaiseError => $raise_db_error
255 ) || die "Could not connect to the database\n";
# Database-configured default parallelism; only applied when --max-child
# was not given on the command line.
257 my $configured_max_child = $main_dbh->selectcol_arrayref(
258 "SELECT value FROM config.global_flag WHERE name = 'ingest.queued.max_threads'"
260 $max_child = $configured_max_child if (!$opt_max_child);
# Sanity cap: refuse more than 20 workers unless --max-child-force is set.
263 if (defined($opt_max_child) && $opt_max_child > 20 && !$max_child_force) {
264 warn('Max Child > 20 ... no, sorry');
268 if ($opt_max_child) {
269 $max_child = $opt_max_child;
272 if ($max_child <= 0) {
# Reject invalid / mutually exclusive option combinations up front.
# (The terminating exit/die statements are in lines elided from this view.)
276 if ($opt_pipe && ($start_id || $end_id)) {
277 warn('Mutually exclusive options: either pipe or start/end range');
281 if ($daemon && ($start_id || $end_id || $opt_pipe)) {
282 warn('Mutually exclusive options: cannot start or stop the Coordinator in Enqueuing mode');
286 if (!$daemon && $stop) {
287 warn('Option --stop can only be used with the --coordinator option');
291 if ($daemon && $queue) {
292 warn('Mutually exclusive options: cannot start or stop the Coordinator in one-shot processing mode');
296 if ($queue_type && !(grep {$_ eq $queue_type} qw/biblio authority/)) {
297 warn('Valid queue types are biblio and authority');
301 if (!(grep {$_ eq $queue_action} qw/insert update delete propagate/)) {
302 warn('Valid queue actions are: insert, update, delete, propagate');
306 if ($queue && ($queue_owner || $queue_why || $queue_threads || $queue_run_at)) {
307 warn('Mutually exclusive options: specify a queue id OR queue creation values');
312 if ($daemon) { # background mode, we need a lockfile;
# --stop path: read the running coordinator's pid from the lock file,
# terminate it, and clean up the lock file.
315 die "Lockfile $opt_lockfile does not exist, is the coordinator running?\n" unless (-e $opt_lockfile);
317 open(F, "<$opt_lockfile") or die "Unable to open lockfile $opt_lockfile for reading, wrong user?\n";
# kill(0, pid) only probes whether the process is still alive.
322 if (kill(0,$old_pid)) {
323 my $dead_count = kill(9,$old_pid);
325 warn "Coordinator process terminated, removing lock file $opt_lockfile\n";
326 unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
328 die "Could not kill coordinator process $old_pid\n";
331 warn "Coordinator process not running, removing stale lock file\n";
332 unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
335 warn "Coordinator lock file empty, removing lock file $opt_lockfile\n";
336 unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
# Normal startup: refuse to run two coordinators on the same lock file.
343 die "I'm already running with lock-file $opt_lockfile\n" if (-e $opt_lockfile);
# Drop the db connection before daemonize() forks, so the handle is not
# shared across the fork; refresh_main_dbh() reconnects on demand.
345 $main_dbh->disconnect;
347 daemonize("Queued Ingest Coordinator") if ($daemon);
350 open(F, ">$opt_lockfile") or die "Unable to open lockfile $opt_lockfile for writing\n";
354 open(STDERR, ">>$opt_logfile") if ($opt_logfile);
357 my $start_time = time;
379 my %queues_in_progress;
381 my $db_connections_in_use = 0;
# Mode dispatch: enqueuing, single-queue processing, one-shot foreground
# processing, stats reporting, or (further below) the coordinator daemon.
383 if ($start_id || $end_id || $opt_pipe || @$input_records) { # enqueuing mode
385 if ($pingest) { # special mode that sets up two queues that can run in parallel
387 my $no_browse = $skip_browse;
388 my $orig_stat_data = $queue_state_data;
390 # set up the first queue
392 $queue_threads //= 4;
393 $queue_type = 'biblio';
394 $queue_action = 'update';
395 $queue_why = 'pingest - fields and attributes queue';
396 $queue_owner = $pingest;
398 # for pingest mode, always skip authority and luri, and skip browse in the first queue
403 my $record_list = enqueue_input();
404 report_stats('Enqueuing '.$queue_why);
406 if (!$no_browse and @$record_list) { # user didn't ask to skip browse reingest
407 # set up the second queue
409 $queue_threads //= 4;
410 $queue_why = 'pingest - browse queue';
411 $queue_state_data = $orig_stat_data;
# Re-enqueue the same record list for the second (browse) queue.
423 enqueue_input($record_list);
424 report_stats('Enqueuing '.$queue_why);
427 } else { # just a regular, user-defined QI request
429 report_stats('Enqueuing');
433 } elsif ($queue && !$stats_only) { # single queue processing mode
435 my $q = gather_one_queue($queue);
436 process_one_queue($q->{id}, $max_child);
437 complete_outstanding_queue($q->{id});
438 report_stats('Queue Processing');
440 } elsif (!$daemon && !$stats_only) { # special case: foreground single process, end after
442 my @dbhs = create_processor_dbhs($max_child);
# Drain named queues first, then orphan (queue-less) entries.
445 my $new_queues = gather_outstanding_queues(); # array ref of queues
446 for my $q (@$new_queues) {
447 process_one_queue($q->{id}, $max_child, \@dbhs);
448 complete_outstanding_queue($q->{id});
449 report_stats('Queue and Entry Processing', $clear++);
452 my $new_entries = gather_outstanding_nonqueue_entries('NULL'); # array ref of queue entries
453 my @eids = map { $$_{id} } @$new_entries;
# Process orphan entries in chunks of 10 * $max_child ids.
454 while (my @current_subset = splice(@eids, 0, 10 * $max_child)) {
455 process_input_list(\@current_subset, \@dbhs);
456 report_stats('Queue and Entry Processing', $clear++);
459 } elsif($stats_only) {
462 my $q_query = <<" SQL";
463 SELECT run_at::DATE AS scheduled_date,
464 SUM((start_time IS NULL)::INT) AS pending,
465 SUM((start_time IS NOT NULL AND end_time IS NULL)::INT) AS ongoing,
466 SUM((end_time IS NOT NULL)::INT) AS complete,
468 FROM action.ingest_queue
474 if (!$queue) { # all queues in the time range
475 my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $stats_since);
476 if (@$qstat_rows > 1) {
477 $qstat_rows = [ $$qstat_rows[-1] ] if ($totals_only);
479 push @output, "Named Queue processing stats";
480 push @output, "============================","";
481 for my $row ( @$qstat_rows ) {
482 push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date});
483 push @output, " Totals"," ------" unless ($$row{scheduled_date});
485 push @output, " Pending: $$row{pending}";
486 push @output, " Ongoing: $$row{ongoing}";
487 push @output, " Complete: $$row{complete}";
498 if ($stats_queues || $queue) {
502 SUM((e.ingest_time IS NULL AND e.override_by IS NULL)::INT) AS pending,
503 SUM((e.override_by IS NOT NULL)::INT) AS overridden,
504 SUM((e.ingest_time IS NOT NULL)::INT) AS complete,
505 SUM((e.fail_time IS NOT NULL)::INT) AS failed,
506 COUNT(e.id) AS events
507 FROM action.ingest_queue q
508 JOIN actor.usr u ON (q.who=u.id)
509 LEFT JOIN action.ingest_queue_entry e ON (e.queue=q.id)
511 GROUP BY 1,2,3,4,5,6,7,8,9
512 ORDER BY q.run_at, q.id
515 my $param = $stats_since;
518 $q_query =~ s/XX/id =/;
520 $q_query =~ s/XX/run_at >=/;
523 my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $param);
526 push @output, "Named Queue details";
527 push @output, "===================","";
528 for my $row ( @$qstat_rows ) {
529 push @output, "* Queue id: $$row{id} | Threads: $$row{threads} | Owner: $$row{usrname}";
530 push @output, "* Reason: $$row{why}";
531 push @output, "* Create time: $$row{created}";
532 push @output, "* Scheduled start time: $$row{run_at}";
533 push @output, " - Started: $$row{start_time}";
534 push @output, " - Ended : $$row{end_time}","";
535 push @output, " Pending: $$row{pending}";
536 push @output, " Overridden: $$row{overridden}";
537 push @output, " Complete: $$row{complete}";
538 push @output, " Failed: $$row{failed}";
539 push @output, " Total: $$row{events}";
540 push @output, "Percent complete: " . sprintf('%.2f',(($$row{complete} + 1.0) / ($$row{events} - $$row{failed} + 1.0)) * 100.0);
552 my $e_query = <<" SQL";
553 SELECT run_at::DATE AS scheduled_date,
556 SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NOT NULL)::INT) AS pending_with_queue,
557 SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NULL)::INT) AS pending_without_queue,
558 SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL)::INT) AS pending,
559 SUM((ingest_time IS NOT NULL AND override_by IS NULL)::INT) AS processed,
560 SUM((ingest_time IS NOT NULL AND override_by IS NOT NULL)::INT) AS overridden,
561 SUM((fail_time IS NOT NULL)::INT) AS failed,
562 SUM((ingest_time IS NOT NULL)::INT) AS complete,
564 FROM action.ingest_queue_entry
566 GROUP BY 2,3, ROLLUP (1)
570 my $estat_rows = $main_dbh->selectall_arrayref($e_query, { Slice => {} }, $stats_since);
571 if (@$estat_rows > 1) {
572 $estat_rows = [ grep { !defined($$_{scheduled_date}) } @$estat_rows ] if ($totals_only);
574 push @output, "Record processing stats";
575 push @output, "============================","";
576 for my $row ( @$estat_rows ) {
577 push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date});
578 push @output, " Record Type: $$row{record_type}; Action: $$row{action}";
579 push @output, " Totals"," ------" unless ($$row{scheduled_date});
581 push @output, " Pending: $$row{pending}";
582 push @output, " ... $$row{pending_with_queue} with a named queue" if $$row{pending};
583 push @output, " ... $$row{pending_without_queue} without a named queue" if $$row{pending};
584 push @output, " Processed: $$row{processed}";
585 push @output, " Overridden: $$row{overridden}";
586 push @output, " Complete: $$row{complete}";
587 push @output, " Failed: $$row{failed}";
588 push @output, " Total: $$row{total}";
599 print join("\n", @output);
601 } elsif($daemon) { # background processing
# Coordinator daemon: spawn workers, hand queues/entries to them over
# their command pipes, and clean up as they finish.  One long-lived
# "orphan processor" handles entries that belong to no named queue.
603 $SIG{CHLD} = \&REAPER;
605 my %queues_in_waiting;
608 my $pid = spawn_kid();
609 my $orphan_processor = $processors{$pid};
610 $orphan_processor->{state} = 0; # will be decremented (made true) once used
611 print {$orphan_processor->{pipe}} "n:Orphan processor\n"; # set worker process name
613 refresh_main_dbh()->do('LISTEN queued_ingest');
618 warn_if_chatty("starting processing loop");
# Phase 1: clean up processors whose children have exited (state 3),
# returning their db connections and completing finished queues.
620 my @complete_processors = grep { $_->{state} == 3 } values %processors;
621 warn_if_chatty("".scalar(@complete_processors)." complete for cleanup");
622 for my $p (@complete_processors) {
624 if (my $dead_pid = $$p{DEAD}) {
625 warn_if_chatty("processor $dead_pid already cleaned up, final flush");
626 delete $processors{$dead_pid}{$_} for keys %{$processors{$dead_pid}};
627 delete $processors{$dead_pid};
631 $$p{DEAD} = $$p{pid};
632 warn_if_chatty("phase 1 cleanup of processor $$p{DEAD}");
633 $db_connections_in_use -= $$p{threads};
636 for my $q (keys %{$$p{queues}}) {
637 warn_if_chatty("processor $$p{pid} finished processing queue $q");
638 delete $queues_in_progress{$q}{processors}{$$p{pid}};
639 if (!scalar(keys(%{$queues_in_progress{$q}{processors}}))) { # that was the last processor for the queue
640 warn_if_chatty("queue $q complete");
641 complete_outstanding_queue($q);
642 delete $queues_in_progress{$q};
643 delete $$p{queues}{$q};
649 warn_if_chatty("".scalar(keys(%queues_in_progress))." queues in progress");
650 warn_if_chatty("checking for new queues");
# Phase 2: claim newly-due queues and make sure there is a ready worker
# for each of them.
652 my $new_queues = gather_outstanding_queues(); # array ref of queues
654 warn_if_chatty("".scalar(@$new_queues)." new queues");
655 $queues_in_waiting{$_->{id}} = $_ for @$new_queues;
657 my @ready_kids = grep { $_->{state} == 1 } values %processors;
658 if (my $needed = scalar(@$new_queues) - scalar(@ready_kids)) {
659 warn_if_chatty("spawning $needed new processors");
660 spawn_kid() while ($needed--);
663 @ready_kids = grep { $_->{state} == 1 } values %processors;
# Phase 3: hand waiting queues (oldest run_at first) to ready workers,
# respecting the global connection budget ($max_child).
665 my @sorted_queues = sort {$$a{run_at} cmp $$b{run_at}} values %queues_in_waiting;
666 for my $q (@sorted_queues) {
668 my $local_max = $max_child - $db_connections_in_use;
669 if ($local_max > 0) { # we have connections available
670 my $ready = shift @ready_kids;
673 # cap at unused max if more threads were requested
674 $$q{threads} = $local_max if ($$q{threads} > $local_max);
675 $ready->{threads} = $$q{threads};
676 $ready->{state} = 2; # running now
677 $ready->{queues}{$$q{id}} = 1;
679 $queues_in_progress{$$q{id}} = delete $queues_in_waiting{$$q{id}};
680 $queues_in_progress{$$q{id}}{processors}{$ready->{pid}} = 1;
682 $db_connections_in_use += $$q{threads};
# Command protocol: name, parallelism, queue id; then close the pipe.
683 print {$ready->{pipe}} "n:Queue $$q{id}\n";
684 print {$ready->{pipe}} "p:$$q{threads}\n";
685 print {$ready->{pipe}} "q:$$q{id}\n";
686 shutdown($ready->{pipe},2);
688 warn_if_chatty("no db connections available, we'll wait");
# Phase 4: feed queue-less ("orphan") entries to the orphan processor,
# replacing it with a fresh child if the old one has exited.
693 warn_if_chatty("checking orphan processor");
695 if ($orphan_processor->{state} == 3) { # replace it
697 warn_if_chatty("replacing orphan processor, it is finished");
700 $orphan_processor = $processors{$pid};
701 $orphan_processor->{state} = 0; # will be decremented (made true) once used
702 print {$orphan_processor->{pipe}} "n:Orphan processor\n"; # set worker process name
705 warn_if_chatty("gathering orphan entries");
707 my $new_entries = gather_outstanding_nonqueue_entries(10 * $max_child); # array ref of queue entries
708 if ($new_entries and @$new_entries) {
709 warn_if_chatty("".scalar(@$new_entries)." new entries");
710 if ($orphan_processor->{state}) { # already processing some entries
711 warn_if_chatty("orphan processor is busy, wait and loop");
713 my $local_max = $max_child - $db_connections_in_use;
714 if ($local_max > 0) {
715 $orphan_processor->{state}--;
716 $db_connections_in_use += $local_max;
717 $orphan_processor->{threads} = $local_max;
718 $orphan_processor->{entries} = [ map { $$_{id} } @$new_entries ];
719 print {$orphan_processor->{pipe}} "p:$local_max\n"; # set parallelism
720 print {$orphan_processor->{pipe}} "e:$_\n" for (@{$orphan_processor->{entries}});
721 print {$orphan_processor->{pipe}} "!!\n";
722 shutdown($orphan_processor->{pipe},2);
724 warn_if_chatty("no db connections available for the orphan processor, wait and loop");
# Phase 5: sleep on LISTEN notifications, waking at least every ~10s.
730 warn_if_chatty("waiting for LISTEN notifications");
731 until (defined(refresh_main_dbh()->pg_notifies)) {
734 if ($inner_loops >= 10) {
735 last; # loop after 10s at most
745 sub refresh_main_dbh {
# Reconnect the main database handle if the connection has gone away
# (e.g. after a fork or a server restart); in coordinator mode the
# LISTEN registration must be re-issued on the new connection.
746 unless ($main_dbh->ping) {
747 $main_dbh = DBI->connect(
748 $dsn, $db_user, $db_pw,
750 pg_expand_array => 0,
753 RaiseError => $raise_db_error
755 ) || die "Could not connect to the database\n";
756 $main_dbh->do('LISTEN queued_ingest') if ($daemon);
# SIGCHLD handler body: reap all exited worker children without blocking
# and mark their processor slots finished (state 3) for loop cleanup.
762 local $!; # don't let waitpid() overwrite current error
763 warn_if_chatty("REAPER called");
764 while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
765 warn_if_chatty("reaping kid $pid");
766 $processors{$pid}{state} = 3;
768 $SIG{CHLD} = \&REAPER; # re-install handler (SysV signal semantics)
771 sub create_processor_dbhs {
# Build a pool of $count database connections used for parallel,
# per-entry asynchronous ingest queries.
775 while (scalar(@dbhs) < $count) {
776 push @dbhs, DBI->connect(
777 $dsn, $db_user, $db_pw,
779 pg_expand_array => 0,
782 RaiseError => $raise_db_error
790 sub complete_outstanding_queue {
# Mark a queue as finished by stamping its end_time.
793 return refresh_main_dbh()->do(
794 'UPDATE action.ingest_queue SET end_time = NOW()'.
795 ' WHERE id=? RETURNING *',
800 sub gather_one_queue {
# Claim a single queue by id, stamping start_time, and return its row.
802 my $q = refresh_main_dbh()->selectrow_hashref(
803 'UPDATE action.ingest_queue SET start_time = NOW()'.
804 ' WHERE id = ? RETURNING *',
811 sub gather_outstanding_queues {
# Atomically claim every queue that is due (run_at <= NOW()) and not yet
# started, stamping start_time so other processes will not pick them up.
812 my $qs = refresh_main_dbh()->selectall_hashref(
813 'UPDATE action.ingest_queue SET start_time = NOW()'.
814 ' WHERE start_time IS NULL AND run_at <= NOW()'.
819 for my $q (values %$qs) {
823 return [values %$qs];
826 sub gather_outstanding_nonqueue_entries {
# Fetch due, unprocessed entries that belong to no named queue
# ("orphan" entries), oldest first.
828 return refresh_main_dbh()->selectall_arrayref(
829 "SELECT * FROM action.ingest_queue_entry".
830 " WHERE queue IS NULL".
831 " AND run_at <= NOW()".
832 " AND override_by IS NULL".
833 " AND ingest_time IS NULL".
834 " AND fail_time IS NULL".
835 " ORDER BY run_at, id".
# Parent/child control channel: a UNIX-domain socket pair over which the
# coordinator writes one-line commands (see process_commands_from_parent).
845 socketpair($child, $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
846 || die "socketpair: $!";
848 $parent->autoflush(1);
849 $child->autoflush(1);
851 my $kid_pid = fork() //
852 die "Could not fork worker process";
# Parent side: record the new processor's bookkeeping entry.
856 $processors{$kid_pid} = {
# Child side: rename the process, send diagnostics to the log file, and
# let the kernel reap this worker's own children.
862 set_psname("Queued Ingest worker - waiting");
863 open(STDERR, ">>$opt_logfile") if ($opt_logfile);
864 $SIG{CHLD} = 'IGNORE';
866 process_commands_from_parent($parent);
867 warn_if_chatty("finished processing commands from parent, exiting");
868 report_stats("Entry Processing - worker $$",0,1);
# Emit a timestamped, pid-tagged diagnostic, but only under --chatty.
876 return unless $chatty;
880 warn "$time [$$] $msg\n";
883 sub process_commands_from_parent {
# Worker-side command loop.  One command per line from the coordinator:
#   q:<id>  process all entries of queue <id> (worker exits afterwards)
#   n:<str> set the worker's process name
#   p:<n>   set parallelism (number of db handles to use)
#   e:<id>  add entry <id> to the pending batch
#   ##      process the pending batch, then keep listening
#   !!      process the pending batch, then exit
894 if (/^q:(\d+)$/) { # we were passed a queue id
897 warn_if_chatty("processing queue $queue, should exit after");
898 process_one_queue($queue,$dbh_count);
899 warn_if_chatty("processing queue $queue complete");
900 } elsif (/^n:(.+)$/) { # we were passed a process name
901 set_psname("Queued Ingest worker - ". $1);
902 } elsif (/^p:(\d+)$/) { # we were passed a dbh count (parallelism)
903 $dbh_count = $1 || 1;
904 warn_if_chatty("parallelism set to $dbh_count");
905 } elsif (/^e:(\d+)$/) { # we were passed an entry id
908 } elsif (/^##$/) { # This is the "process those, but then wait" command
910 } elsif (/^!!$/) { # This is the "end of input, process and exit" command
911 warn_if_chatty("end of the command stream, should exit after");
917 # we have a list of entries to process
918 if (my $icount = scalar(@input)) {
919 my @dbhs = create_processor_dbhs($dbh_count);
920 warn_if_chatty("processing $icount entries...");
921 process_input_list(\@input, \@dbhs);
922 warn_if_chatty("processing $icount entries complete");
925 last if $stop_after || !$cont;
931 sub process_one_queue {
# Process every outstanding entry of one queue, using $dbh_count parallel
# connections or a caller-provided handle pool ($dbh_list).
933 my $dbh_count = shift || 1;
934 my $dbh_list = shift;
938 my @dbhs = $dbh_list ? @$dbh_list : create_processor_dbhs($dbh_count);
939 my @input = @{$dbhs[0]->selectcol_arrayref(
940 'SELECT id FROM action.ingest_queue_entry'.
942 ' AND override_by IS NULL'.
943 ' AND ingest_time IS NULL'.
944 ' AND fail_time IS NULL',
948 return unless @input;
949 return process_input_list(\@input, \@dbhs);
952 sub process_entry_begin {
# Start ingest of one entry on $dbh as an asynchronous query (pg_async);
# completion is harvested later by process_entry_complete().  Failures to
# even start the query are counted as failures immediately.
953 my $entry_id = shift;
958 my $entry = $dbh->selectrow_hashref(
959 "SELECT * FROM action.ingest_queue_entry WHERE id = ?", undef, $entry_id
966 my $sth = $dbh->prepare(
967 "SELECT action.process_ingest_queue_entry(?)", {pg_async => 1}
970 if (!$sth->execute($entry_id)) {
971 $stats{total}{fail}++;
972 $stats{$$entry{record_type}}{$$entry{action}}{fail}++;
# Per-second counters feed the throughput figures in report_stats().
974 my $current_second = CORE::time;
975 $stats{seconds}{$current_second}{fail}++;
976 $stats{seconds}{$current_second}{total}++;
990 sub process_entry_complete {
# Poll one in-flight entry ($eid in the microstate hash $ms); when its
# async query is ready, record success/fail stats and free the handle.
994 if ($$ms{$eid}{sth}->pg_ready) {
995 $$ms{$eid}{sth}->pg_result;
996 my ($success) = $$ms{$eid}{sth}->fetchrow_array;
997 $$ms{$eid}{sth}->finish;
# The ingest function's boolean result selects the stats bucket.
999 $success = $success ? 'success' : 'fail';
1000 $stats{total}{$success}++;
1001 $stats{$$ms{$eid}{entry}{record_type}}{$$ms{$eid}{entry}{action}}{$success}++;
1003 my $current_second = CORE::time;
1004 $stats{seconds}{$current_second}{$success}++;
1005 $stats{seconds}{$current_second}{total}++;
# Reclaim the db handle and drop the per-entry bookkeeping.
1007 my $dbh = delete $$ms{$eid}{dbh};
1008 delete $$ms{$eid}{$_} for keys %{$$ms{$eid}};
1016 sub process_input_list {
# Drive a list of entry ids to completion: start async ingests while idle
# db handles remain, then harvest finished ones, recycling handles.
1021 while (@$input || keys(%microstate)) {
1023 # do we have an idle worker, and work to do?
1024 if (@$input && scalar(@$dbhs)) {
1025 my $entry_id = shift @$input;
1026 my $dbh = shift @$dbhs;
1027 my $failed_dbh = process_entry_begin($entry_id, \%microstate, $dbh);
1029 push @$dbhs, $failed_dbh;
1034 # look at them in ascending order, as the oldest will have
1035 # been running the longest and is more likely to be finished
1036 my @entries = sort {$a <=> $b} keys %microstate;
1037 for my $eid (@entries) {
1038 my $success_dbh = process_entry_complete($eid, \%microstate);
1040 push @$dbhs, $success_dbh;
1044 usleep(10000) if (keys %microstate); # ~0.01 seconds
# report_stats body: print (or warn, when $warn is set) per-type/per-action
# success/fail/subtotal counts plus throughput over the active window.
1054 my $runtime = time - $start_time;
# The processing window spans the first to last second that saw activity,
# which may be much shorter than total runtime.
1055 my @seconds_list = sort keys %{$stats{seconds}};
1056 my $first_second = $seconds_list[0];
1057 my $last_second = $seconds_list[-1];
1058 my $processing_seconds = ($last_second - $first_second) + 1.0;
1060 system('clear') if $clear;
1062 print "$label stats:\n" unless $warn;
1063 warn "$label stats:\n" if $warn;
1064 for my $type (qw/biblio authority/) {
1065 for my $action ( sort keys %{$stats{$type}} ) {
1066 my $part = $stats{$type}{$action};
1067 next unless (defined($$part{success}) || defined($$part{fail}));
1069 $$part{success} //= 0;
1072 my $subtotal = $$part{success} + $$part{fail};
1073 print " * Type: $type\n - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" unless $warn;
1074 warn " * Type: $type\n - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" if $warn;
1078 $stats{total}{success} //= 0;
1079 $stats{total}{fail} //= 0;
1081 my $per_sec = ($stats{total}{success} + $stats{total}{fail}) / $processing_seconds;
1083 print " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n".
1084 " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" unless $warn;
1085 warn " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n".
1086 " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" if $warn;
# enqueue_input body: collect the record ids to enqueue (explicit list,
# STDIN pipe, --record switches, or an id-range query), optionally create
# a named queue, then enqueue each record via action.enqueue_ingest_entry().
1090 my $predestined_input = shift;
1093 if ($predestined_input and @$predestined_input) {
1094 @input = @$predestined_input;
1095 } elsif ($opt_pipe) {
1097 # Assume any string of digits is an id.
1098 if (my @subs = /([0-9]+)/g) {
1099 push(@input, @subs);
1102 } elsif (@$input_records) {
1103 @input = grep { /^\d+$/ } @$input_records;
# Fallback: query the record table directly for the requested id range.
1105 my $q = "SELECT id FROM $queue_type.record_entry WHERE NOT DELETED";
1106 if ($start_id && $end_id) {
1107 $q .= " AND id BETWEEN $start_id AND $end_id";
1108 } elsif ($start_id) {
1109 $q .= " AND id >= $start_id";
1111 $q .= " AND id <= $end_id";
1113 $q .= ' ORDER BY id ASC';
1115 @input = @{$main_dbh->selectcol_arrayref($q)};
# begin_work opens a transaction around queue creation and enqueuing
# (the matching commit is in lines elided from this view).
1118 $main_dbh->begin_work;
1119 if ($queue_why || $queue_threads || $queue_run_at) {
1120 my $rows = $main_dbh->do(
1121 'INSERT INTO action.ingest_queue (why,threads,run_at) VALUES(?,?,?)',
1122 undef, $queue_why, $queue_threads || 1, $queue_run_at || 'NOW'
1124 if ($rows && $queue_owner) {
1125 $queue = $main_dbh->last_insert_id(undef,'action','ingest_queue',undef);
1126 if ($queue && $queue_owner) {
# Resolve the owner's user name to an actor.usr id for the queue row.
1127 my $uid = $main_dbh->selectcol_arrayref(
1128 'SELECT id FROM actor.usr WHERE usrname = ?',
1132 $rows = $main_dbh->do(
1133 'UPDATE action.ingest_queue SET who = ? WHERE id = ?',
1143 $q_obj = $main_dbh->selectrow_hashref("SELECT * FROM action.ingest_queue WHERE id=$queue") || {};
1145 $queue = $q_obj->{id} || '0';
# For bib updates, fold the --skip-* flags and the attr/field limits into
# the state-data string consumed by the ingest functions.
1147 if ($queue_type eq 'biblio' and $queue_action eq 'update') {
1148 $queue_state_data .= ';skip_browse' if $skip_browse;
1149 $queue_state_data .= ';skip_attrs' if $skip_attrs;
1150 $queue_state_data .= ';skip_search' if $skip_search;
1151 $queue_state_data .= ';skip_facets' if $skip_facets;
1152 $queue_state_data .= ';skip_display' if $skip_display;
1153 $queue_state_data .= ';skip_full_rec' if $skip_full_rec;
1154 $queue_state_data .= ';skip_authority' if $skip_authority;
1155 $queue_state_data .= ';skip_luri' if $skip_luri;
1156 $queue_state_data .= ';skip_mrmap' if $skip_mrmap;
1158 $queue_state_data .= ';attr_list('.join(',',@$record_attrs).')' if @$record_attrs;
1159 $queue_state_data .= ';field_list('.join(',',@$metabib_fields).')' if @$metabib_fields;
1162 my $qid = $q_obj->{id};
1163 my $run_at = $q_obj->{run_at} || 'NOW';
1164 for my $rid (@input) {
1165 my $success = $main_dbh->selectcol_arrayref(
1166 'SELECT action.enqueue_ingest_entry(?, ?, ?, ?, ?, ?)',
1168 $rid, $queue_type, $run_at, $qid, $queue_action, $queue_state_data
1171 my $update_key = $success ? 'success' : 'fail';
1172 $stats{total}{$update_key}++;
1173 $stats{$queue_type}{$queue_action}{$update_key}++;
1175 my $current_second = CORE::time;
1176 $stats{seconds}{$current_second}{$update_key}++;
1177 $stats{seconds}{$current_second}{total}++;