LP#1514085 Vandelay in-database session tracking
[working/Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Vandelay.pm
index ba64b14..0647b33 100644 (file)
@@ -17,6 +17,7 @@ use Time::HiRes qw(time);
 use OpenSRF::Utils::Logger qw/$logger/;
 use MIME::Base64;
 use XML::LibXML;
+use Digest::MD5 qw/md5_hex/;
 use OpenILS::Const qw/:const/;
 use OpenILS::Application::AppUtils;
 use OpenILS::Application::Cat::BibCommon;
@@ -238,6 +239,8 @@ sub process_spool {
     my $purpose = shift;
     my $filename = shift;
     my $bib_source = shift;
+    my $session_name = shift;
+    my $exit_early = shift;
 
     $client->max_chunk_count($self->{max_bundle_count}) if (!$client->can('max_bundle_count') && $self->{max_bundle_count});
 
@@ -272,6 +275,17 @@ sub process_spool {
 
     $logger->info("vandelay spooling $fingerprint purpose=$purpose file=$filename");
 
+    my ($tracker, $tevt) = create_session_tracker(
+        $e->requestor->id, $e->requestor->wsid, $fingerprint, 
+        $session_name, $type, $queue_id, 'enqueue');
+
+    if (!$tracker) {
+        $e->rollback;
+        return $tevt;
+    }
+
+    $client->respond_complete($tracker) if $exit_early;
+
     my $marctype = 'USMARC'; 
 
     open F, $filename;
@@ -309,11 +323,15 @@ sub process_spool {
                 next;
             }
 
-            if($self->api_name =~ /stream_results/ and $qrec) {
-                $client->respond($qrec->id)
-            } else {
-                $client->respond($count) if (++$count % $response_scale) == 0;
-                $response_scale *= 10 if ($count == ($response_scale * 10));
+            return $tevt if $tevt = increment_session_tracker($tracker);
+
+            if (!$exit_early) { # avoid unnecessary responses
+                if($self->api_name =~ /stream_results/ and $qrec) {
+                    $client->respond($qrec->id)
+                } else {
+                    $client->respond($count) if (++$count % $response_scale) == 0;
+                    $response_scale *= 10 if ($count == ($response_scale * 10));
+                }
             }
         } catch Error with {
             my $error = shift;
@@ -321,6 +339,9 @@ sub process_spool {
         }
     }
 
+    $tracker->state('complete');
+    $e->update_vandelay_session_tracker($tracker) or return $e->die_event;
+
     $e->commit;
     unlink($filename);
     $cache->delete_cache('vandelay_import_spool_' . $fingerprint) if $fingerprint;
@@ -883,6 +904,62 @@ sub queued_records_with_matches {
     return [ map {$_->{queued_record}} @$data ];
 }
 
+# Returns (tracker, err_event)
+# Runs within its own transaction.
+sub create_session_tracker {
+    my ($user_id, $ws_id, $key, $name, $type, $queue_id, $action, $total_acts) = @_;
+    my $e = new_editor(xact => 1);
+
+    if ($key) {
+        # if other trackers exist for this key, adopt the name
+        my $existing = 
+            $e->search_vandelay_session_tracker({session_key => $key})->[0];
+        $name = $existing->name if $name;
+
+    } else {
+        # anonymous tracker
+        $key = md5_hex(time."$$".rand());
+    }
+
+    my $tracker = Fieldmapper::vandelay::session_tracker->new;
+    $tracker->session_key($key);
+    $tracker->name($name || $key);
+    $tracker->usr($user_id);
+    $tracker->workstation($ws_id);
+    $tracker->record_type($type);
+    $tracker->queue($queue_id);
+    $tracker->action_type($action);
+    $tracker->total_actions($total_acts) if $total_acts;
+
+    # caller responsible for rolling back transaction
+    return (undef, $e->die_event) unless
+        $e->create_vandelay_session_tracker($tracker);
+
+    # Re-fetch to ensure we have all defaults applied for future updates.
+    return (undef, $e->die_event) unless 
+        $tracker = $e->retrieve_vandelay_session_tracker($tracker->id);
+
+    $e->commit;
+
+    return ($tracker);
+}
+
+# Increment the actions_performed count.
+# Must happen in its own transaction
+# Returns undef on success, Event on error
+sub increment_session_tracker {
+    my $tracker = shift;
+    my $amount = shift || 1;
+    my $e = new_editor(xact => 1);
+    $tracker->update_time('now');
+    $tracker->actions_performed($tracker->actions_performed + $amount);
+    return $e->die_event unless 
+        $e->update_vandelay_session_tracker($tracker);
+
+    $e->commit;
+    return undef;
+}
+
 
 # cache of import item org unit settings.  
 # used in apply_import_item_defaults() below, 
@@ -896,13 +973,15 @@ sub import_record_list_impl {
     my $type = $self->{record_type};
     my %queues;
     %item_defaults_cache = ();
+    my $exit_early = $args->{exit_early};
 
     my $report_args = {
         progress => 1,
         step => 1,
         conn => $conn,
         total => scalar(@$rec_ids),
-        report_all => $$args{report_all}
+        report_all => $$args{report_all},
+        exit_early => $exit_early
     };
 
     $conn->max_chunk_count(1) if (!$conn->can('max_bundle_size') && $conn->can('max_chunk_size') && $$args{report_all});
@@ -947,6 +1026,9 @@ sub import_record_list_impl {
         $rec_class = 'vqar';
     }
 
+
+    my $tracker;
+    my $tevt;
     my $new_rec_perm_cache;
     my @success_rec_ids;
     for my $rec_id (@$rec_ids) {
@@ -975,6 +1057,28 @@ sub import_record_list_impl {
             next;
         }
 
+        if (!$tracker) {
+            # Create the import tracker using the queue of the first
+            # retrieved record.  I'm fairly certain in practice all
+            # lists of records for import come from a single queue.
+            # We could get the queue from the previously created
+            # 'enqueue' tracker, but this is a safetly valve to handle
+            # imports where no enqueue tracker exists, e.g. records
+            # enqueued pre-upgrade.
+            ($tracker, $tevt) = create_session_tracker(
+                $e->requestor->id, $e->requestor->wsid, $args->{session_key}, 
+                undef, $type, $rec->queue, 'import', scalar(@$rec_ids));
+
+            if (!$tracker) {
+                $e->rollback;
+                return $tevt;
+            }
+
+            $report_args->{tracker} = $tracker;
+
+            $conn->respond_complete($tracker) if $exit_early;
+        }
+
         if($rec->import_time) {
             # if the record is already imported, that means it may have 
             # un-imported copies.  Add to success list for later processing.
@@ -1219,8 +1323,17 @@ sub import_record_list_impl {
         $e->rollback;
     }
 
-    # import the copies
-    import_record_asset_list_impl($conn, \@success_rec_ids, $requestor, $args) if @success_rec_ids;
+    import_record_asset_list_impl($conn, 
+        \@success_rec_ids, $requestor, $args, $tracker) if @success_rec_ids;
+
+    if ($tracker) { # there are edge cases where it may not exist
+        my $e = new_editor(xact => 1);
+        $e->requestor($requestor);
+        $tracker->update_time('now');
+        $tracker->state('complete');
+        $e->update_vandelay_session_tracker($tracker) or return $e->die_event;
+        $e->commit;
+    }
 
     $conn->respond({total => $$report_args{total}, progress => $$report_args{progress}});
     return undef;
@@ -1337,10 +1450,20 @@ sub finish_rec_import_attempt {
     my $evt = $$args{evt};
     my $rec = $$args{rec};
     my $e = $$args{e};
+    my $tracker = $$args{tracker};
+    my $exit_early = $$args{exit_early};
 
     my $error = $$args{import_error};
     $error = 'general.unknown' if $evt and not $error;
 
+    # Note the tracker is updated regardless of whether the individual
+    # record import succeeded.  It's only a failed tracker if the
+    # entire process fails.
+    if ($tracker) { # tracker may be undef in rec-not-found situations
+        my $tevt = increment_session_tracker($tracker);
+        return $tevt if $tevt;
+    }
+
     # error tracking
     if($rec) {
 
@@ -1362,6 +1485,7 @@ sub finish_rec_import_attempt {
             $e->$method($rec) and $e->commit or $e->rollback;
 
         } else {
+
             # commit the successful import
             $e->commit;
         }
@@ -1371,17 +1495,19 @@ sub finish_rec_import_attempt {
         $e->rollback;
     }
         
-    # respond to client
-    if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
-        $$args{conn}->respond({
-            total => $$args{total}, 
-            progress => $$args{progress}, 
-            imported => ($rec) ? $rec->id : undef,
-            import_error => $error,
-            no_import => $$args{no_import},
-            err_event => $evt
-        });
-        $$args{step} *= 2 unless $$args{step} == 256;
+    # respond to client unless we've already responded-complete
+    if (!$exit_early) {
+        if($$args{report_all} or ($$args{progress} % $$args{step}) == 0) {
+            $$args{conn}->respond({
+                total => $$args{total}, 
+                progress => $$args{progress}, 
+                imported => ($rec) ? $rec->id : undef,
+                import_error => $error,
+                no_import => $$args{no_import},
+                err_event => $evt
+            });
+            $$args{step} *= 2 unless $$args{step} == 256;
+        }
     }
 
     $$args{progress}++;
@@ -1590,7 +1716,7 @@ sub retrieve_queue_summary {
 # Given a list of queued record IDs, imports all items attached to those records
 # --------------------------------------------------------------------------------
 sub import_record_asset_list_impl {
-    my($conn, $rec_ids, $requestor, $args) = @_;
+    my($conn, $rec_ids, $requestor, $args, $tracker) = @_;
 
     my $roe = new_editor(xact=> 1, requestor => $requestor);
 
@@ -1615,6 +1741,14 @@ sub import_record_asset_list_impl {
         in_count => 0,
     };
 
+    if ($tracker && @$rec_ids) {
+        if (my $tevt = # assignment
+            increment_session_tracker($tracker, scalar(@$rec_ids))) {
+            $roe->rollback;
+            return $tevt;
+        }
+    }
+
     for my $rec_id (@$rec_ids) {
         my $rec = $roe->retrieve_vandelay_queued_bib_record($rec_id);
         my $item_ids = $roe->search_vandelay_import_item(
@@ -1622,6 +1756,13 @@ sub import_record_asset_list_impl {
             {idlist=>1}
         );
 
+        if ($tracker) { # increment per record
+            if (my $tevt = increment_session_tracker($tracker)) {
+                $roe->rollback;
+                return $tevt;
+            }
+        }
+
         # if any items have no call_number label and a value should be
         # applied automatically (via org settings), we want to use the same
         # call number label for every copy per org per record.