#!/usr/bin/perl
# ---------------------------------------------------------------
# Copyright © 2022 Equinox Open Library Initiative, INC.
# Mike Rylander
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# ---------------------------------------------------------------

use strict;
use warnings;

use DBI;
use Getopt::Long;
use Time::HiRes qw/usleep time/;
use IO::Handle;
use POSIX ":sys_wait_h";
use Socket;
use OpenSRF::Utils ':daemon';

# When true, DBI raises exceptions instead of returning undef on error.
my $raise_db_error = 0;

# Globals for the command line options:
my $opt_lockfile = '/tmp/queued-ingest-coordinator-LOCK';
my $opt_logfile  = '/tmp/queued-ingest-coordinator-LOG';

my $daemon = 0;             # do we go into the background?
my $stop   = 0;             # stop a running coordinator, if the lock file is there
my $chatty = 0;             # do we yell into the void?

my $opt_max_child;          # max number of parallel worker processes (CLI override)
my $max_child = 20;         # effective max number of parallel worker processes
my $max_child_force;        # allow --max-child to exceed the cap of 20

# for enqueuing mode
my $start_id;               # start processing at this record ID.
my $end_id;                 # stop processing when this record ID is reached.
my $opt_pipe;               # Read record ids from STDIN.

my $stats_only;             # Just report on QI processing stats/progress
my $totals_only;            # Only give top-line aggregate stats, no per day breakdown
my $stats_queues;           # Provide a list of queue-specfic stats
my $stats_since='001-01-01';# Default "since" time for stats

my $queue;                  # queue id, either pre-existing or created based on params
my $queue_type = 'biblio';  # type of records for batch enqueuing
my $queue_why;              # description for this reingest queue
my $queue_action = 'update';# what action is happening to the records: update, insert, delete, propagate
my $queue_state_data = '';  # State data required for queue entry processing
my $queue_owner;            # Owner of the queue
my $queue_run_at;           # Scheduled run time for the queue
my $queue_threads;          # parallelism for this queue (capped at max_child)

my $skip_browse    = 0;     # Skip the browse reingest.
my $skip_attrs     = 0;     # Skip the record attributes reingest.
my $skip_search    = 0;     # Skip the search reingest.
my $skip_facets    = 0;     # Skip the facets reingest.
my $skip_display   = 0;     # Skip the display reingest.
my $skip_full_rec  = 0;     # Skip the full_rec reingest.
my $skip_authority = 0;     # Skip the authority reingest.
my $skip_luri      = 0;     # Skip the luri reingest.
my $skip_mrmap     = 0;     # Skip the metarecord remapping.

my $record_attrs   = [];    # Restrict attribute reingest to these record attributes.
my $metabib_fields = [];    # Restrict field reingest to these metabib fields.
my $input_records  = [];    # Records supplied via CLI switch.

my $pingest = '';           # Special "pingest" flag, supplying an EG user name as queue owner.
my $help;                   # show help text

# Database connection options with defaults:
my $db_user = $ENV{PGUSER} || 'evergreen';
my $db_host = $ENV{PGHOST} || 'localhost';
my $db_db   = $ENV{PGDATABASE} || 'evergreen';
my $db_pw   = $ENV{PGPASSWORD} || 'evergreen';
my $db_port = $ENV{PGPORT} || 5432;

GetOptions(
    'lock-file=s'        => \$opt_lockfile,
    'log-file=s'         => \$opt_logfile,
    'dbuser=s'           => \$db_user,
    'dbhost=s'           => \$db_host,
    'dbname=s'           => \$db_db,
    'dbpw=s'             => \$db_pw,
    'dbport=i'           => \$db_port,
    'max-child=i'        => \$opt_max_child,
    'max-child-force'    => \$max_child_force,
    'stats'              => \$stats_only,
    'totals-only'        => \$totals_only,
    'queue-stats'        => \$stats_queues,
    'since=s'            => \$stats_since,
    'queue=i'            => \$queue,
    'queue-action=s'     => \$queue_action,
    'queue-name=s'       => \$queue_why,
    'queue-type=s'       => \$queue_type,
    'queue-owner=s'      => \$queue_owner,
    'queue-run-at=s'     => \$queue_run_at,
    'queue-threads=i'    => \$queue_threads,
    'queue-state-data=s' => \$queue_state_data,
    'skip-browse'        => \$skip_browse,
    'skip-attrs'         => \$skip_attrs,
    'skip-search'        => \$skip_search,
    'skip-facets'        => \$skip_facets,
    'skip-display'       => \$skip_display,
    'skip-full_rec'      => \$skip_full_rec,
    'skip-authority'     => \$skip_authority,
    'skip-luri'          => \$skip_luri,
    'skip-mr-map'        => \$skip_mrmap,
    'attr=s@'            => \$record_attrs,
    'field=s@'           => \$metabib_fields,
    'record=s@'          => \$input_records,
    'start-id=i'         => \$start_id,
    'end-id=i'           => \$end_id,
    'pipe'               => \$opt_pipe,
    'pingest=s'          => \$pingest,
    'coordinator'        => \$daemon,
    'stop'               => \$stop,
    'chatty'             => \$chatty,
    'help'               => \$help
) or help();

# Print usage text and exit.
# NOTE(review): the original heredoc body was destroyed by text mangling
# ("print <connect(" is not valid Perl); the usage text below is
# reconstructed from the GetOptions spec above — confirm wording upstream.
sub help {
    print <<"HELP";
Usage: $0 [options]

 Enqueuing mode:
   --start-id <id>        Enqueue records starting at this record id
   --end-id <id>          Enqueue records up to and including this record id
   --pipe                 Read record ids to enqueue from STDIN
   --record <id>          Enqueue this record id; repeatable
   --pingest <user>       Special two-queue "pingest" mode, queues owned by <user>
   --queue <id>           Use a pre-existing queue instead of creating one
   --queue-type <type>    Record type: biblio (default) or authority
   --queue-action <act>   insert, update (default), delete, or propagate
   --queue-name <desc>    Description for a newly created queue
   --queue-owner <user>   Owner of a newly created queue
   --queue-run-at <ts>    Scheduled start time for a newly created queue
   --queue-threads <n>    Parallelism for this queue (capped at max-child)
   --queue-state-data <s> Extra state data for queue entry processing
   --skip-browse, --skip-attrs, --skip-search, --skip-facets,
   --skip-display, --skip-full_rec, --skip-authority, --skip-luri,
   --skip-mr-map          Skip the named reingest phase
   --attr <a>             Restrict attribute reingest; repeatable
   --field <f>            Restrict metabib field reingest; repeatable

 Coordinator mode:
   --coordinator          Run as the background Queued Ingest coordinator
   --stop                 Stop a running coordinator (with --coordinator)
   --lock-file <path>     Coordinator lock file
   --log-file <path>      Coordinator log file
   --chatty               Verbose logging
   --max-child <n>        Max parallel worker DB connections (default 20)
   --max-child-force      Allow --max-child greater than 20

 Stats mode:
   --stats                Report on QI processing stats/progress
   --totals-only          Only top-line aggregate stats, no per-day breakdown
   --queue-stats          Provide a list of queue-specific stats
   --since <ts>           "Since" time for stats (default 001-01-01)

 Database connection (defaults from PG* environment variables):
   --dbuser --dbhost --dbname --dbpw --dbport

HELP
    exit;
}

help() if ($help);

# NOTE(review): $dsn and the "my $main_dbh = DBI->" prefix were also lost
# to the same mangling; reconstructed here to match refresh_main_dbh() below.
my $dsn = "dbi:Pg:host=$db_host;dbname=$db_db;port=$db_port";

my $main_dbh = DBI->connect(
    $dsn, $db_user, $db_pw,
    {   AutoCommit      => 1,
        pg_expand_array => 0,
        pg_enable_utf8  => 1,
        pg_bool_tf      => 0,
        RaiseError      => $raise_db_error
    }
) || die "Could not connect to the database\n";

# Site-configured worker cap, used unless --max-child was supplied.
my $configured_max_child = $main_dbh->selectcol_arrayref(
    "SELECT value FROM config.global_flag WHERE name = 'ingest.queued.max_threads'"
)->[0] || 20;

$max_child = $configured_max_child if (!$opt_max_child);

if (defined($opt_max_child) && $opt_max_child > 20 && !$max_child_force) {
    warn('Max Child > 20 ... no, sorry');
    help();
}

if ($opt_max_child) {
    $max_child = $opt_max_child;
}

if ($max_child <= 0) {
    $max_child = 20;
}

# ----- option sanity checks -----------------------------------------------

if ($opt_pipe && ($start_id || $end_id)) {
    warn('Mutually exclusive options: either pipe or start/end range');
    help();
}

if ($daemon && ($start_id || $end_id || $opt_pipe)) {
    warn('Mutually exclusive options: cannot start or stop the Coordinator in Enqueuing mode');
    help();
}

if (!$daemon && $stop) {
    warn('Option --stop can only be used with the --coordinator option');
    help();
}

if ($daemon && $queue) {
    warn('Mutually exclusive options: cannot start or stop the Coordinator in one-shot processing mode');
    help();
}

if ($queue_type && !(grep {$_ eq $queue_type} qw/biblio authority/)) {
    warn('Valid queue types are biblio and authority');
    help();
}

if (!(grep {$_ eq $queue_action} qw/insert update delete propagate/)) {
    warn('Valid queue actions are: insert, update, delete, propagate');
    help();
}

if ($queue && ($queue_owner || $queue_why || $queue_threads || $queue_run_at)) {
    warn('Mutually exclusive options: specify a queue id OR queue creation values');
    help();
}

if ($daemon) { # background mode, we need a lockfile;

    if ($stop) {
        die "Lockfile $opt_lockfile does not exist, is the coordinator running?\n"
            unless (-e $opt_lockfile);

        open(F, "<$opt_lockfile")
            or die "Unable to open lockfile $opt_lockfile for reading, wrong user?\n";
        my $old_pid = <F>; # NOTE(review): "<F>" readline restored; it was eaten by tag-stripping
        close F;

        if ($old_pid) {
            if (kill(0,$old_pid)) { # probe: is the old coordinator alive?
                my $dead_count = kill(9,$old_pid);
                if ($dead_count) {
                    warn "Coordinator process terminated, removing lock file $opt_lockfile\n";
                    unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
                } else {
                    die "Could not kill coordinator process $old_pid\n";
                }
            } else {
                warn "Coordinator process not running, removing stale lock file\n";
                unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
            }
        } else {
            warn "Coordinator lock file empty, removing lock file $opt_lockfile\n";
            unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
        }

        exit;
    }

    # check the lockfile
    die "I'm already running with lock-file $opt_lockfile\n" if (-e $opt_lockfile);

    $main_dbh->disconnect; # children and the daemonized process reconnect on demand

    daemonize("Queued Ingest Coordinator") if ($daemon);

    # set the lockfile
    open(F, ">$opt_lockfile")
        or die "Unable to open lockfile $opt_lockfile for writing\n";
    print F $$;
    close F;

    open(STDERR, ">>$opt_logfile") if ($opt_logfile);
}

my $start_time = time;
my %stats;

# Reset the in-memory success/fail counters used by report_stats().
sub reset_stats {
    %stats = (
        total => { },
        biblio => {
            insert    => {},
            update    => {},
            delete    => {},
            propagate => {}
        },
        authority => {
            insert => {},
            update => {},
            delete => {}
        },
        seconds => {}
    );
}
reset_stats();

my %processors;          # kid pid -> { pipe, pid, state, threads, queues, entries }
my %queues_in_progress;  # queue id -> queue row (+ processors working on it)
my $db_connections_in_use = 0;

if ($start_id || $end_id || $opt_pipe || @$input_records) { # enqueuing mode
    if ($pingest) {
        # Special mode that sets up two queues that can run in parallel:
        # one for fields/attributes, one for browse.
        my $no_browse = $skip_browse;
        my $orig_stat_data = $queue_state_data;

        # set up the first queue
        $queue = undef;
        $queue_threads //= 4;
        $queue_type   = 'biblio';
        $queue_action = 'update';
        $queue_why    = 'pingest - fields and attributes queue';
        $queue_owner  = $pingest;

        # for pingest mode, always skip authority and luri, and skip browse in the first queue
        $skip_browse    = 1;
        $skip_authority = 1;
        $skip_luri      = 1;

        my $record_list = enqueue_input();
        report_stats('Enqueuing '.$queue_why);

        if (!$no_browse and @$record_list) { # user didn't ask to skip browse reingest
            # set up the second queue
            $queue = undef;
            $queue_threads //= 4;
            $queue_why = 'pingest - browse queue';
            $queue_state_data = $orig_stat_data;

            $skip_browse   = 0;
            $skip_attrs    = 1;
            $skip_search   = 1;
            $skip_facets   = 1;
            $skip_display  = 1;
            $skip_full_rec = 1;
            $skip_mrmap    = 1;

            reset_stats();
            enqueue_input($record_list);
            report_stats('Enqueuing '.$queue_why);
        }
    } else { # just a regular, user-defined QI request
        enqueue_input();
        report_stats('Enqueuing');
    }
} elsif ($queue && !$stats_only) { # single queue processing mode
    my $q = gather_one_queue($queue);
    process_one_queue($q->{id}, $max_child);
    complete_outstanding_queue($q->{id});
    report_stats('Queue Processing');
} elsif (!$daemon && !$stats_only) { # special case: foreground single process, end after
    my @dbhs = create_processor_dbhs($max_child);
    my $clear = 0;

    my $new_queues = gather_outstanding_queues(); # array ref of queues
    for my $q (@$new_queues) {
        process_one_queue($q->{id}, $max_child, \@dbhs);
        complete_outstanding_queue($q->{id});
        report_stats('Queue and Entry Processing', $clear++);
    }

    my $new_entries = gather_outstanding_nonqueue_entries('NULL'); # array ref of queue entries
    my @eids = map { $$_{id} } @$new_entries;
    while (my @current_subset = splice(@eids, 0, 10 * $max_child)) {
        process_input_list(\@current_subset, \@dbhs);
        report_stats('Queue and Entry Processing', $clear++);
    }
} elsif($stats_only) {
    my @output;

    # Per-day (plus ROLLUP total) counts of queue state in the window.
    my $q_query = <<"  SQL";
    SELECT  run_at::DATE AS scheduled_date,
            SUM((start_time IS NULL)::INT) AS pending,
            SUM((start_time IS NOT NULL AND end_time IS NULL)::INT) AS ongoing,
            SUM((end_time IS NOT NULL)::INT) AS complete,
            COUNT(*) AS total
      FROM  action.ingest_queue
      WHERE run_at >= ?
      GROUP BY ROLLUP (1)
      ORDER BY 1
  SQL

    if (!$queue) { # all queues in the time range
        my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $stats_since);
        if (@$qstat_rows > 1) {
            # The ROLLUP row (NULL scheduled_date) sorts last.
            $qstat_rows = [ $$qstat_rows[-1] ] if ($totals_only);
            push @output, "Named Queue processing stats";
            push @output, "============================","";
            for my $row ( @$qstat_rows ) {
                push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date});
                push @output, " Totals"," ------" unless ($$row{scheduled_date});
                push @output, " Pending: $$row{pending}";
                push @output, " Ongoing: $$row{ongoing}";
                push @output, " Complete: $$row{complete}";
                push @output,"";
                push @output,'-'x50;
                push @output,"";
            }
            push @output,"";
        }
    }

    if ($stats_queues || $queue) {
        # Per-queue detail; "q.XX ?" is patched below into either an
        # id match (single queue) or a run_at window match.
        $q_query = <<"  SQL";
    SELECT  q.*,
            u.usrname,
            SUM((e.ingest_time IS NULL AND e.override_by IS NULL)::INT) AS pending,
            SUM((e.override_by IS NOT NULL)::INT) AS overridden,
            SUM((e.ingest_time IS NOT NULL)::INT) AS complete,
            SUM((e.fail_time IS NOT NULL)::INT) AS failed,
            COUNT(e.id) AS events
      FROM  action.ingest_queue q
            JOIN actor.usr u ON (q.who=u.id)
            LEFT JOIN action.ingest_queue_entry e ON (e.queue=q.id)
      WHERE q.XX ?
      GROUP BY 1,2,3,4,5,6,7,8,9
      ORDER BY q.run_at, q.id
  SQL

        my $param = $stats_since;
        if ($queue) {
            $param = $queue;
            $q_query =~ s/XX/id =/;
        } else {
            $q_query =~ s/XX/run_at >=/;
        }

        my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $param);
        if (@$qstat_rows) {
            push @output, "Named Queue details";
            push @output, "===================","";
            for my $row ( @$qstat_rows ) {
                push @output, "* Queue id: $$row{id} | Threads: $$row{threads} | Owner: $$row{usrname}";
                push @output, "* Reason: $$row{why}";
                push @output, "* Create time: $$row{created}";
                push @output, "* Scheduled start time: $$row{run_at}";
                push @output, " - Started: $$row{start_time}";
                push @output, " - Ended : $$row{end_time}","";
                push @output, " Pending: $$row{pending}";
                push @output, " Overridden: $$row{overridden}";
                push @output, " Complete: $$row{complete}";
                push @output, " Failed: $$row{failed}";
                push @output, " Total: $$row{events}";
                # +1.0 on both sides avoids division by zero on empty queues.
                push @output, "Percent complete: " . sprintf('%.2f',(($$row{complete} + 1.0) / ($$row{events} - $$row{failed} + 1.0)) * 100.0);
                push @output,"";
                push @output,'-'x50;
                push @output,"";
            }
            push @output,"";
        }
    }

    if (!$queue) {
        # Per-day, per-type, per-action entry counts in the window.
        my $e_query = <<"  SQL";
    SELECT  run_at::DATE AS scheduled_date,
            record_type,
            action,
            SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NOT NULL)::INT) AS pending_with_queue,
            SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NULL)::INT) AS pending_without_queue,
            SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL)::INT) AS pending,
            SUM((ingest_time IS NOT NULL AND override_by IS NULL)::INT) AS processed,
            SUM((ingest_time IS NOT NULL AND override_by IS NOT NULL)::INT) AS overridden,
            SUM((fail_time IS NOT NULL)::INT) AS failed,
            SUM((ingest_time IS NOT NULL)::INT) AS complete,
            COUNT(*) AS total
      FROM  action.ingest_queue_entry
      WHERE run_at >= ?
      GROUP BY 2,3, ROLLUP (1)
      ORDER BY 1,2,3
  SQL

        my $estat_rows = $main_dbh->selectall_arrayref($e_query, { Slice => {} }, $stats_since);
        if (@$estat_rows > 1) {
            $estat_rows = [ grep { !defined($$_{scheduled_date}) } @$estat_rows ] if ($totals_only);
            push @output, "Record processing stats";
            push @output, "============================","";
            for my $row ( @$estat_rows ) {
                push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date});
                push @output, " Record Type: $$row{record_type}; Action: $$row{action}";
                push @output, " Totals"," ------" unless ($$row{scheduled_date});
                push @output, " Pending: $$row{pending}";
                push @output, " ... $$row{pending_with_queue} with a named queue" if $$row{pending};
                push @output, " ... $$row{pending_without_queue} without a named queue" if $$row{pending};
                push @output, " Processed: $$row{processed}";
                push @output, " Overridden: $$row{overridden}";
                push @output, " Complete: $$row{complete}";
                push @output, " Failed: $$row{failed}";
                push @output, " Total: $$row{total}";
                push @output,"";
                push @output,'-'x50;
                push @output,"";
            }
        }
        push @output,"";
    }

    print join("\n", @output);

} elsif($daemon) { # background processing
    $SIG{CHLD} = \&REAPER;

    my %queues_in_waiting; # queue id -> queue row, gathered but not yet dispatched

    # The "orphan processor" handles entries that have no named queue.
    my $pid = spawn_kid();
    my $orphan_processor = $processors{$pid};
    $orphan_processor->{state} = 0; # will be decremented (made true) once used
    print {$orphan_processor->{pipe}} "n:Orphan processor\n";

    refresh_main_dbh()->do('LISTEN queued_ingest');

    my $loops = 1;
    while ($loops) {
        warn_if_chatty("starting processing loop");

        # Phase 1: clean up after kids the REAPER marked finished (state 3).
        my @complete_processors = grep { $_->{state} == 3 } values %processors;
        warn_if_chatty("".scalar(@complete_processors)." complete for cleanup");
        for my $p (@complete_processors) {
            if (my $dead_pid = $$p{DEAD}) { # second pass: final flush
                warn_if_chatty("processor $dead_pid already cleaned up, final flush");
                delete $processors{$dead_pid}{$_} for keys %{$processors{$dead_pid}};
                delete $processors{$dead_pid};
                next;
            }

            $$p{DEAD} = $$p{pid};
            warn_if_chatty("phase 1 cleanup of processor $$p{DEAD}");

            $db_connections_in_use -= $$p{threads};

            if ($$p{queues}) {
                for my $q (keys %{$$p{queues}}) {
                    warn_if_chatty("processor $$p{pid} finished processing queue $q");
                    delete $queues_in_progress{$q}{processors}{$$p{pid}};
                    if (!scalar(keys(%{$queues_in_progress{$q}{processors}}))) {
                        # that was the last processor for the queue
                        warn_if_chatty("queue $q complete");
                        complete_outstanding_queue($q);
                        delete $queues_in_progress{$q};
                        delete $$p{queues}{$q};
                    }
                }
            }
        }

        warn_if_chatty("".scalar(keys(%queues_in_progress))." queues in progress");

        # Phase 2: dispatch newly-runnable named queues to idle kids.
        warn_if_chatty("checking for new queues");
        my $new_queues = gather_outstanding_queues(); # array ref of queues
        if (@$new_queues) {
            warn_if_chatty("".scalar(@$new_queues)." new queues");

            $queues_in_waiting{$_->{id}} = $_ for @$new_queues;
            my @ready_kids = grep { $_->{state} == 1 } values %processors;
            if (my $needed = scalar(@$new_queues) - scalar(@ready_kids)) {
                warn_if_chatty("spawning $needed new processors");
                spawn_kid() while ($needed--);
            }
            @ready_kids = grep { $_->{state} == 1 } values %processors;

            my @sorted_queues = sort {$$a{run_at} cmp $$b{run_at}} values %queues_in_waiting;
            for my $q (@sorted_queues) {
                my $local_max = $max_child - $db_connections_in_use;
                if ($local_max > 0) { # we have connections available
                    my $ready = shift @ready_kids;
                    next unless $ready;

                    # cap at unused max if more threads were requested
                    $$q{threads} = $local_max if ($$q{threads} > $local_max);

                    $ready->{threads} = $$q{threads};
                    $ready->{state} = 2; # running now
                    $ready->{queues}{$$q{id}} = 1;

                    $queues_in_progress{$$q{id}} = delete $queues_in_waiting{$$q{id}};
                    $queues_in_progress{$$q{id}}{processors}{$ready->{pid}} = 1;

                    $db_connections_in_use += $$q{threads};

                    print {$ready->{pipe}} "n:Queue $$q{id}\n";
                    print {$ready->{pipe}} "p:$$q{threads}\n";
                    print {$ready->{pipe}} "q:$$q{id}\n";
                    shutdown($ready->{pipe},2); # signal end-of-commands
                } else {
                    warn_if_chatty("no db connections available, we'll wait");
                }
            }
        }

        # Phase 3: keep exactly one live orphan processor around.
        warn_if_chatty("checking orphan processor");
        if ($orphan_processor->{state} == 3) { # replace it
            warn_if_chatty("replacing orphan processor, it is finished");
            $pid = spawn_kid();
            $orphan_processor = $processors{$pid};
            $orphan_processor->{state} = 0; # will be decremented (made true) once used
            print {$orphan_processor->{pipe}} "n:Orphan processor\n";
        }

        # Phase 4: hand queueless ("orphan") entries to the orphan processor.
        warn_if_chatty("gathering orphan entries");
        my $new_entries = gather_outstanding_nonqueue_entries(10 * $max_child); # array ref of queue entries
        if ($new_entries and @$new_entries) {
            warn_if_chatty("".scalar(@$new_entries)." new entries");
            if ($orphan_processor->{state}) { # already processing some entries
                warn_if_chatty("orphan processor is busy, wait and loop");
            } else {
                my $local_max = $max_child - $db_connections_in_use;
                if ($local_max > 0) {
                    $orphan_processor->{state}--;
                    $db_connections_in_use += $local_max;
                    $orphan_processor->{threads} = $local_max;
                    $orphan_processor->{entries} = [ map { $$_{id} } @$new_entries ];
                    print {$orphan_processor->{pipe}} "p:$local_max\n"; # set parallelism
                    print {$orphan_processor->{pipe}} "e:$_\n" for (@{$orphan_processor->{entries}});
                    print {$orphan_processor->{pipe}} "!!\n"; # end of input, exit when done
                    shutdown($orphan_processor->{pipe},2);
                } else {
                    warn_if_chatty("no db connections available for the orphan processor, wait and loop");
                }
            }
        }

        # Sleep until a NOTIFY arrives, but no more than ~10 seconds.
        my $inner_loops = 0;
        warn_if_chatty("waiting for LISTEN notifications");
        until (defined(refresh_main_dbh()->pg_notifies)) {
            sleep(1);
            $inner_loops++;
            if ($inner_loops >= 10) {
                last; # loop after 10s at most
            }
        }

        $loops++;
    }
}

exit;

# Return a live main DB handle, reconnecting (and re-LISTENing in
# coordinator mode) if the connection was lost.
sub refresh_main_dbh {
    unless ($main_dbh->ping) {
        $main_dbh = DBI->connect(
            $dsn, $db_user, $db_pw,
            {   AutoCommit      => 1,
                pg_expand_array => 0,
                pg_enable_utf8  => 1,
                pg_bool_tf      => 0,
                RaiseError      => $raise_db_error
            }
        ) || die "Could not connect to the database\n";
        $main_dbh->do('LISTEN queued_ingest') if ($daemon);
    }
    return $main_dbh;
}

# SIGCHLD handler: mark reaped kids as finished (state 3) for later cleanup.
sub REAPER {
    local $!; # don't let waitpid() overwrite current error
    warn_if_chatty("REAPER called");
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        warn_if_chatty("reaping kid $pid");
        $processors{$pid}{state} = 3;
    }
    $SIG{CHLD} = \&REAPER; # re-arm for pre-POSIX signal semantics
}

# Open $count independent DB connections for parallel entry processing.
sub create_processor_dbhs {
    my $count = shift;

    my @dbhs;
    while (scalar(@dbhs) < $count) {
        push @dbhs, DBI->connect(
            $dsn, $db_user, $db_pw,
            {   AutoCommit      => 1,
                pg_expand_array => 0,
                pg_enable_utf8  => 1,
                pg_bool_tf      => 0,
                RaiseError      => $raise_db_error
            }
        );
    }

    return @dbhs;
}

# Stamp a queue's end_time, marking it complete.
sub complete_outstanding_queue {
    my $qid = shift;
    return refresh_main_dbh()->do(
        'UPDATE action.ingest_queue SET end_time = NOW()'.
        ' WHERE id=? RETURNING *',
        {}, $qid
    );
}

# Claim a single queue by id, stamping start_time; returns the queue row.
sub gather_one_queue {
    my $qid = shift;
    my $q = refresh_main_dbh()->selectrow_hashref(
        'UPDATE action.ingest_queue SET start_time = NOW()'.
        ' WHERE id = ? RETURNING *',
        {}, $qid
    );
    return $q;
}

# Claim all unstarted, due queues at once; returns an arrayref of queue rows
# with threads defaulted to at least 1.
sub gather_outstanding_queues {
    my $qs = refresh_main_dbh()->selectall_hashref(
        'UPDATE action.ingest_queue SET start_time = NOW()'.
        ' WHERE start_time IS NULL AND run_at <= NOW()'.
        ' RETURNING *',
        'id'
    );
    for my $q (values %$qs) {
        $q->{threads} ||= 1;
    }
    return [values %$qs];
}

# Fetch due, unprocessed entries that belong to no named queue.
# $limit is interpolated directly: callers pass an integer or 'NULL'.
sub gather_outstanding_nonqueue_entries {
    my $limit = shift;
    return refresh_main_dbh()->selectall_arrayref(
        "SELECT * FROM action.ingest_queue_entry".
        " WHERE queue IS NULL".
        " AND run_at <= NOW()".
        " AND override_by IS NULL".
        " AND ingest_time IS NULL".
        " AND fail_time IS NULL".
        " ORDER BY run_at, id".
        " LIMIT $limit",
        { Slice => {} }
    );
}

# Fork a worker connected to us by a socketpair command pipe.
# Parent: registers the kid in %processors (state 1 = idle) and returns its pid.
# Child: never returns; consumes commands, reports stats, and exits.
sub spawn_kid {
    my $parent;
    my $child;

    socketpair($child, $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        || die "socketpair: $!";

    $parent->autoflush(1);
    $child->autoflush(1);

    my $kid_pid = fork() // die "Could not fork worker process";
    if ($kid_pid) {
        close($parent);
        $processors{$kid_pid} = {
            pipe  => $child,
            pid   => $kid_pid,
            state => 1
        };
    } else {
        set_psname("Queued Ingest worker - waiting");
        open(STDERR, ">>$opt_logfile") if ($opt_logfile);
        $SIG{CHLD} = 'IGNORE'; # workers don't track their own kids
        close($child);
        process_commands_from_parent($parent);
        warn_if_chatty("finished processing commands from parent, exiting");
        report_stats("Entry Processing - worker $$",0,1);
        exit;
    }

    return $kid_pid;
}

# Timestamped stderr logging, enabled by --chatty.
sub warn_if_chatty {
    return unless $chatty;
    my $msg = shift;
    my $time = time;
    warn "$time [$$] $msg\n";
}

# Worker-side command loop. Line protocol from the coordinator:
#   n:<name>  set process name        p:<n>  set parallelism
#   q:<id>    process a whole queue   e:<id> buffer one entry id
#   ##        process buffered entries, then wait for more
#   !!        process buffered entries, then exit
sub process_commands_from_parent {
    my $ppipe = shift;

    my $stop_after = 0;
    while (1) {
        my @input;
        my $dbh_count = 1;
        my $cont = 0;
        while (<$ppipe>) {
            $cont = 1;
            chomp;
            if (/^q:(\d+)$/) { # we were passed a queue id
                my $queue = $1;
                $stop_after = 1;
                warn_if_chatty("processing queue $queue, should exit after");
                process_one_queue($queue,$dbh_count);
                warn_if_chatty("processing queue $queue complete");
            } elsif (/^n:(.+)$/) { # we were passed a process name
                set_psname("Queued Ingest worker - ". $1);
            } elsif (/^p:(\d+)$/) { # we were passed a dbh count (parallelism)
                $dbh_count = $1 || 1;
                warn_if_chatty("parallelism set to $dbh_count");
            } elsif (/^e:(\d+)$/) { # we were passed an entry id
                my $entry = $1;
                push @input, $entry;
            } elsif (/^##$/) { # This is the "process those, but then wait" command
                last;
            } elsif (/^!!$/) { # This is the "end of input, process and exit" command
                warn_if_chatty("end of the command stream, should exit after");
                $stop_after = 1;
                last;
            }
        }

        # we have a list of entries to process
        if (my $icount = scalar(@input)) {
            my @dbhs = create_processor_dbhs($dbh_count);
            warn_if_chatty("processing $icount entries...");
            process_input_list(\@input, \@dbhs);
            warn_if_chatty("processing $icount entries complete");
        }

        last if $stop_after || !$cont;
    }

    close($ppipe);
}

# Process every outstanding entry of one queue, using $dbh_count new
# connections or a caller-supplied connection list.
sub process_one_queue {
    my $qid = shift;
    my $dbh_count = shift || 1;
    my $dbh_list = shift;

    return unless $qid;

    my @dbhs = $dbh_list ? @$dbh_list : create_processor_dbhs($dbh_count);

    my @input = @{$dbhs[0]->selectcol_arrayref(
        'SELECT id FROM action.ingest_queue_entry'.
        ' WHERE queue = ?'.
        ' AND override_by IS NULL'.
        ' AND ingest_time IS NULL'.
        ' AND fail_time IS NULL',
        {}, $qid
    )};

    return unless @input;
    return process_input_list(\@input, \@dbhs);
}

# Kick off async ingest of one entry on $dbh.
# Returns the dbh if it is immediately free again (missing entry or execute
# failure), or undef when the dbh is now busy and parked in %$ms.
sub process_entry_begin {
    my $entry_id = shift;
    my $ms = shift;   # microstate: entry id -> { entry, dbh, sth }
    my $dbh = shift;

    my $entry = $dbh->selectrow_hashref(
        "SELECT * FROM action.ingest_queue_entry WHERE id = ?",
        undef, $entry_id
    );

    if (!$$entry{id}) { # entry vanished; hand the dbh back
        return $dbh;
    }

    my $sth = $dbh->prepare(
        "SELECT action.process_ingest_queue_entry(?)",
        {pg_async => 1} # non-blocking; completion polled via pg_ready
    );

    if (!$sth->execute($entry_id)) {
        $stats{total}{fail}++;
        $stats{$$entry{record_type}}{$$entry{action}}{fail}++;

        my $current_second = CORE::time;
        $stats{seconds}{$current_second}{fail}++;
        $stats{seconds}{$current_second}{total}++;
        return $dbh;
    }

    $$ms{$entry_id} = {
        entry => $entry,
        dbh   => $dbh,
        sth   => $sth
    };

    return undef;
}

# Poll one in-flight entry. If its async call finished, record stats,
# tear down the microstate, and return the now-free dbh; otherwise undef.
sub process_entry_complete {
    my $eid = shift;
    my $ms = shift;

    if ($$ms{$eid}{sth}->pg_ready) {
        $$ms{$eid}{sth}->pg_result;
        my ($success) = $$ms{$eid}{sth}->fetchrow_array;
        $$ms{$eid}{sth}->finish;

        $success = $success ? 'success' : 'fail';
        $stats{total}{$success}++;
        $stats{$$ms{$eid}{entry}{record_type}}{$$ms{$eid}{entry}{action}}{$success}++;

        my $current_second = CORE::time;
        $stats{seconds}{$current_second}{$success}++;
        $stats{seconds}{$current_second}{total}++;

        my $dbh = delete $$ms{$eid}{dbh};
        delete $$ms{$eid}{$_} for keys %{$$ms{$eid}};
        delete $$ms{$eid};
        return $dbh;
    }

    return undef;
}

# Pump entry ids through the available connections, overlapping async
# ingest calls until both the input list and the in-flight set are empty.
sub process_input_list {
    my $input = shift;
    my $dbhs = shift;

    my %microstate;
    while (@$input || keys(%microstate)) {
        # do we have an idle worker, and work to do?
        if (@$input && scalar(@$dbhs)) {
            my $entry_id = shift @$input;
            my $dbh = shift @$dbhs;
            my $failed_dbh = process_entry_begin($entry_id, \%microstate, $dbh);
            if ($failed_dbh) { # begin didn't start async work; dbh is free
                push @$dbhs, $failed_dbh;
                next;
            }
        }

        # look at them in ascending order, as the oldest will have
        # been running the longest and is more likely to be finished
        my @entries = sort {$a <=> $b} keys %microstate;
        for my $eid (@entries) {
            my $success_dbh = process_entry_complete($eid, \%microstate);
            if ($success_dbh) {
                push @$dbhs, $success_dbh;
            }
        }

        usleep(10000) if (keys %microstate); # ~0.01 seconds
    }

    return $dbhs;
}

# Print (or warn, if $warn) a summary of the counters in %stats.
# $clear clears the terminal first, for a poor-man's progress display.
sub report_stats {
    my $label = shift;
    my $clear = shift;
    my $warn = shift;

    my $runtime = time - $start_time;
    my @seconds_list = sort keys %{$stats{seconds}};
    my $first_second = $seconds_list[0];
    my $last_second = $seconds_list[-1];
    my $processing_seconds = ($last_second - $first_second) + 1.0;

    system('clear') if $clear;
    print "$label stats:\n" unless $warn;
    warn "$label stats:\n" if $warn;
    for my $type (qw/biblio authority/) {
        for my $action ( sort keys %{$stats{$type}} ) {
            my $part = $stats{$type}{$action};
            next unless (defined($$part{success}) || defined($$part{fail}));
            $$part{success} //= 0;
            $$part{fail} //= 0;
            my $subtotal = $$part{success} + $$part{fail};
            print " * Type: $type\n - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" unless $warn;
            warn " * Type: $type\n - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" if $warn;
        }
    }
    $stats{total}{success} //= 0;
    $stats{total}{fail} //= 0;
    my $per_sec = ($stats{total}{success} + $stats{total}{fail}) / $processing_seconds;
    print " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n".
          " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" unless $warn;
    warn " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n".
         " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" if $warn;
}

# Build the record id list (from a passed list, STDIN, --record switches,
# or an id-range query), optionally create/annotate the queue, then enqueue
# every record via action.enqueue_ingest_entry in one transaction.
# Returns the arrayref of record ids enqueued.
sub enqueue_input {
    my $predestined_input = shift;

    my @input;
    if ($predestined_input and @$predestined_input) {
        @input = @$predestined_input;
    } elsif ($opt_pipe) {
        while (<STDIN>) { # NOTE(review): "<STDIN>" restored; it was eaten by tag-stripping
            # Assume any string of digits is an id.
            if (my @subs = /([0-9]+)/g) {
                push(@input, @subs);
            }
        }
    } elsif (@$input_records) {
        @input = grep { /^\d+$/ } @$input_records;
    } else {
        my $q = "SELECT id FROM $queue_type.record_entry WHERE NOT DELETED";
        if ($start_id && $end_id) {
            $q .= " AND id BETWEEN $start_id AND $end_id";
        } elsif ($start_id) {
            $q .= " AND id >= $start_id";
        } elsif ($end_id) {
            $q .= " AND id <= $end_id";
        }
        $q .= ' ORDER BY id ASC';
        @input = @{$main_dbh->selectcol_arrayref($q)};
    }

    $main_dbh->begin_work;

    # Create a new queue if any creation parameters were given.
    if ($queue_why || $queue_threads || $queue_run_at) {
        my $rows = $main_dbh->do(
            'INSERT INTO action.ingest_queue (why,threads,run_at) VALUES(?,?,?)',
            undef,
            $queue_why, $queue_threads || 1, $queue_run_at || 'NOW'
        );
        if ($rows && $queue_owner) {
            $queue = $main_dbh->last_insert_id(undef,'action','ingest_queue',undef);
            if ($queue && $queue_owner) {
                my $uid = $main_dbh->selectcol_arrayref(
                    'SELECT id FROM actor.usr WHERE usrname = ?',
                    undef, $queue_owner
                )->[0];
                if ($uid) {
                    $rows = $main_dbh->do(
                        'UPDATE action.ingest_queue SET who = ? WHERE id = ?',
                        undef, $uid, $queue
                    );
                }
            }
        }
    }

    my $q_obj = {};
    if ($queue) {
        $q_obj = $main_dbh->selectrow_hashref("SELECT * FROM action.ingest_queue WHERE id=$queue") || {};
    }
    $queue = $q_obj->{id} || '0';

    # Fold the skip/restrict flags into the entry state data string.
    if ($queue_type eq 'biblio' and $queue_action eq 'update') {
        $queue_state_data .= ';skip_browse'    if $skip_browse;
        $queue_state_data .= ';skip_attrs'     if $skip_attrs;
        $queue_state_data .= ';skip_search'    if $skip_search;
        $queue_state_data .= ';skip_facets'    if $skip_facets;
        $queue_state_data .= ';skip_display'   if $skip_display;
        $queue_state_data .= ';skip_full_rec'  if $skip_full_rec;
        $queue_state_data .= ';skip_authority' if $skip_authority;
        $queue_state_data .= ';skip_luri'      if $skip_luri;
        $queue_state_data .= ';skip_mrmap'     if $skip_mrmap;
        $queue_state_data .= ';attr_list('.join(',',@$record_attrs).')' if @$record_attrs;
        $queue_state_data .= ';field_list('.join(',',@$metabib_fields).')' if @$metabib_fields;
    }

    my $qid = $q_obj->{id};
    my $run_at = $q_obj->{run_at} || 'NOW';
    for my $rid (@input) {
        my $success = $main_dbh->selectcol_arrayref(
            'SELECT action.enqueue_ingest_entry(?, ?, ?, ?, ?, ?)',
            {}, $rid, $queue_type, $run_at, $qid, $queue_action, $queue_state_data
        )->[0];

        my $update_key = $success ? 'success' : 'fail';
        $stats{total}{$update_key}++;
        $stats{$queue_type}{$queue_action}{$update_key}++;

        my $current_second = CORE::time;
        $stats{seconds}{$current_second}{$update_key}++;
        $stats{seconds}{$current_second}{total}++;
    }

    $main_dbh->commit;

    return \@input;
}