LP#1358916: refuse to retrieve over-large MARC records via Z39.50
[working/Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Search / Z3950.pm
index 978b106..a5a4f6c 100644 (file)
@@ -11,11 +11,14 @@ use XML::LibXML;
 
 use OpenILS::Event;
 use OpenSRF::EX qw(:try);
+use OpenSRF::MultiSession;
 use OpenILS::Utils::ModsParser;
 use OpenSRF::Utils::SettingsClient;
+use OpenSRF::Utils::JSON;
 use OpenILS::Application::AppUtils;
 use OpenSRF::Utils::Logger qw/$logger/;
 use OpenILS::Utils::CStoreEditor q/:funcs/;
+use OpenILS::Utils::Normalize qw/clean_marc/;                                  
 
 MARC::Charset->assume_unicode(1);
 MARC::Charset->ignore_errors(1);
@@ -27,6 +30,44 @@ my $sclient;
 my %services;
 my $default_service;
 
+__PACKAGE__->register_method(
+    method    => 'apply_credentials',
+    api_name  => 'open-ils.search.z3950.apply_credentials',
+    signature => {
+        desc   => "Apply credentials for a Z39.50 server",
+        params => [
+            {desc => 'Authtoken', type => 'string'},
+            {desc => 'Z39.50 Source (server) name', type => 'string'},
+            {desc => 'Context org unit', type => 'number'},
+            {desc => 'Username', type => 'string'},
+            {desc => 'Password', type => 'string'}
+        ],
+        return => {
+            desc => 'Event; SUCCESS on success, other event type on error'
+        }
+    }
+);
+
+sub apply_credentials {
+    my ($self, $client, $auth, $source, $ctx_ou, $username, $password) = @_;
+
+    my $e = new_editor(authtoken => $auth, xact => 1);
+
+    return $e->die_event unless 
+        $e->checkauth and 
+        $e->allowed('ADMIN_Z3950_SOURCE', $ctx_ou);
+
+    $e->json_query({from => [
+        'config.z3950_source_credentials_apply',
+        $source, $ctx_ou, $username, $password
+    ]}) or return $e->die_event;
+
+    $e->commit;
+
+    return OpenILS::Event->new('SUCCESS');
+}
+
 
 __PACKAGE__->register_method(
     method    => 'do_class_search',
@@ -236,10 +277,11 @@ sub do_class_search {
                 push @services, $tmp_args{service}; 
                 push @results, $res->{result}; 
                 push @connections, $res->{connection}; 
+
+                $logger->debug("z3950: Result object: $results[$i], Connection object: $connections[$i]");
             } 
         }
 
-        $logger->debug("z3950: Result object: $results[$i], Connection object: $connections[$i]");
     }
 
     $logger->debug("z3950: Connections created");
@@ -314,14 +356,22 @@ sub do_search {
     my $limit = $$args{limit} || 10;
     my $offset = $$args{offset} || 0;
 
-    my $username = $$args{username} || "";
-    my $password = $$args{password} || "";
+    my $editor = new_editor(authtoken => $auth);
+    return $editor->event unless 
+        $editor->checkauth and
+        $editor->allowed('REMOTE_Z3950_QUERY', $editor->requestor->ws_ou);
 
-    my $tformat = $services{$args->{service}}->{transmission_format} || $output;
+    my $creds = $editor->json_query({from => [
+        'config.z3950_source_credentials_lookup',
+        $$args{service}, $editor->requestor->ws_ou
+    ]})->[0] || {};
 
-    my $editor = new_editor(authtoken => $auth);
-    return $editor->event unless $editor->checkauth;
-    return $editor->event unless $editor->allowed('REMOTE_Z3950_QUERY');
+    # use the caller-provided username/password if offered.
+    # otherwise, use the stored credentials.
+    my $username = $$args{username} || $creds->{username} || "";
+    my $password = $$args{password} || $creds->{password} || "";
+
+    my $tformat = $services{$args->{service}}->{transmission_format} || $output;
 
     $logger->info("z3950: connecting to server $host:$port:$db as $username");
 
@@ -391,7 +441,7 @@ sub process_results {
     my $res = {};
     my $count = $$res{count} = $results->size;
 
-    $logger->info("z3950: search returned $count hits");
+    $logger->info("z3950: '$service' search returned $count hits");
 
     my $tend = $limit + $offset;
 
@@ -412,7 +462,13 @@ sub process_results {
             my $rec = $results->record($_);
 
             if ($tformat eq 'usmarc') {
-                $marc = MARC::Record->new_from_usmarc($rec->raw());
+                my $raw = $rec->raw();
+                if (length($raw) <= 99999) {
+                    $marc = MARC::Record->new_from_usmarc($raw);
+                } else {
+                    $marcs = '';
+                    die "ISO2709 record is too large to process";
+                }
             } elsif ($tformat eq 'xml') {
                 $marc = MARC::Record->new_from_xml($rec->raw());
             } else {
@@ -479,5 +535,380 @@ sub compile_query {
     return $str;
 }
 
+
+__PACKAGE__->register_method(
+    method    => 'bucket_search_queue',
+    api_name  => 'open-ils.search.z3950.bucket_search_queue',
+    stream    => 1,
+    # disable opensrf chunking so the caller can receive timely responses
+    max_chunk_size => 0,
+    signature => {
+        desc => q/
+            Performs a Z39.50 search for every record in a bucket, using the
+            provided Z39.50 fields.  Add all search results to the specified
+            Vandelay queue.  If no source records or search results are found,
+            no queue is created.
+        /,
+        params => [
+            {desc => q/Authentication token/, type => 'string'},
+            {desc => q/Bucket ID/, type => 'number'},
+            {desc => q/Z39 Sources.  List of czs.name/, type => 'array'},
+            {desc => q/Z39 Index Maps.  List of czifm.id/, type => 'array'},
+            {   desc => q/Vandelay arguments
+                    queue_name -- required
+                    match_set
+                    ...
+                    /, 
+                type => 'object'
+            }
+        ],
+        return => {
+            desc => q/Object containing status information about the on-going search
+            and queue operation. 
+            {
+                bre_count    : $num, -- number of bibs to search against
+                search_count : $num,
+                search_complete  : $num,
+                queue_count  : $num
+                queue        : $queue_obj
+            }
+            This object will be streamed back with each milestone (search
+            result or complete).
+            Event object returned on failure
+            /
+        }
+    }
+);
+
+sub bucket_search_queue {
+    my $self = shift;
+    my $conn = shift;
+    my $auth = shift;
+    my $bucket_id = shift;
+    my $z_sources = shift;
+    my $z_indexes = shift;
+    my $vandelay = shift;
+
+    my $e = new_editor(authtoken => $auth);
+    return $e->event unless 
+        $e->checkauth and
+        $e->allowed('REMOTE_Z3950_QUERY') and
+        $e->allowed('CREATE_BIB_IMPORT_QUEUE');
+    
+    # find the source bib records
+
+    my $bre_ids = $e->json_query({
+        select => {cbrebi => ['target_biblio_record_entry']},
+        from => 'cbrebi',
+        where => {bucket => $bucket_id},
+        distinct => 1
+    });
+
+    # empty bucket
+    return {bre_count => 0} unless @$bre_ids;
+
+    $bre_ids = [ map {$_->{target_biblio_record_entry}} @$bre_ids ];
+
+    $z_indexes = $e->search_config_z3950_index_field_map({id => $z_indexes});
+
+    return OpenILS::Event->new('BAD_PARAMS', 
+        note => q/No z_indexes/) unless @$z_indexes;
+
+    # build the Z39 queries for the source bib records
+
+    my $z_searches = compile_bucket_zsearch(
+        $e, $bre_ids, $z_sources, $z_indexes);
+
+    return $e->event unless $z_searches;
+    return {bre_count => 0} unless @$z_searches;
+
+    my $queue = create_z39_bucket_queue($e, $bucket_id, $vandelay);
+    return $e->event unless $queue;
+
+    send_and_queue_bucket_searches($conn, $e, $queue, $z_searches);
+
+    return undef;
+}
+
+ # create the queue for storing search results
+sub create_z39_bucket_queue {
+    my ($e, $bucket_id, $vandelay) = @_;
+
+    my $existing = $e->search_vandelay_bib_queue({
+        name => $vandelay->{queue_name},
+        owner => $e->requestor->id
+    })->[0];
+
+    return $existing if $existing;
+
+    my $queue = Fieldmapper::vandelay::bib_queue->new;
+    $queue->match_bucket($bucket_id);
+    $queue->owner($e->requestor->id);
+    $queue->name($vandelay->{queue_name});
+    $queue->match_set($vandelay->{match_set});
+
+    $e->xact_begin;
+    unless ($e->create_vandelay_bib_queue($queue)) {
+        $e->rollback;
+        return undef;
+    }
+    $e->commit;
+
+    return $queue;
+}
+
+# sets the 901c value to the Z39 service and 
+# adds the record to the growing vandelay queue
+# returns the number of successfully queued records
+sub stamp_and_queue_results {
+    my ($e, $queue, $service, $bre_id, $result) = @_;
+    my $qcount = 0;
+
+    for my $rec (@{$result->{records}}) {
+        # insert z39 service as the 901z
+        my $marc = MARC::Record->new_from_xml(
+            $rec->{marcxml}, 'UTF-8', 'USMARC');
+
+        $marc->insert_fields_ordered(
+            MARC::Field->new('901', '', '', z => $service));
+
+        # put the record into the queue
+        my $qrec = Fieldmapper::vandelay::queued_bib_record->new;
+        $qrec->marc(clean_marc($marc));
+        $qrec->queue($queue->id);
+
+        $e->xact_begin;
+        if ($e->create_vandelay_queued_bib_record($qrec)) {
+            $e->commit;
+            $qcount++;
+        } else {
+            my $evt = $e->die_event;
+            $logger->error("z39: unable to queue record: $evt");
+        }
+    }
+
+    return $qcount;
+}
+
+sub send_and_queue_bucket_searches {
+    my ($conn, $e, $queue, $z_searches) = @_;
+
+    my $max_parallel = $U->ou_ancestor_setting(
+        $e->requestor->ws_ou,
+        'cat.z3950.batch.max_parallel') || 5;
+
+    my $search_limit = $U->ou_ancestor_setting(
+        $e->requestor->ws_ou,
+        'cat.z3950.batch.max_results') || 5;
+
+    my $response = {
+        bre_count => 0,
+        search_count => 0,
+        search_complete => 0,
+        queue_count => 0
+    };
+
+    # searches are about to be in flight
+    # let the caller know we're still alive
+    $conn->respond($response);
+
+    my $handle_search_result = sub {
+        my ($self, $req) = @_;
+        my $bre_id = $req->{req}->{_bre_id};
+
+        my @p = $req->{req}->payload->params;
+        $logger->debug("z39: multi-search response for request [$bre_id]". 
+            OpenSRF::Utils::JSON->perl2JSON(\@p));
+
+        for my $resp (@{$req->{response}}) {
+            $response->{search_complete}++;
+            my $result = $resp->content or next;
+            my $service = $result->{service};
+            $response->{queue_count} += 
+                stamp_and_queue_results($e, $queue, $service, $bre_id, $result);
+        }
+
+        $conn->respond($response);
+    };
+
+    my $multi_ses = OpenSRF::MultiSession->new(
+        app             => 'open-ils.search',
+        cap             => $max_parallel,
+        timeout         => 120,
+        success_handler => $handle_search_result
+    );
+
+    # note: mult-session blocks new requests when it hits max 
+    # parallel, so we need to cacluate summary values up front.
+    my %bre_uniq;
+    $bre_uniq{$_->{bre_id}} = 1 for @$z_searches;
+    $response->{bre_count} = scalar(keys %bre_uniq);
+    $response->{search_count} += scalar(@$z_searches);
+
+    # let the caller know searches are on their way out
+    $conn->respond($response);
+
+    for my $search (@$z_searches) {
+
+        my $bre_id = delete $search->{bre_id};
+        $search->{limit} = $search_limit;
+
+        # toss it onto the multi-pile
+        my $req = $multi_ses->request(
+            'open-ils.search.z3950.search_class', $e->authtoken, $search);
+
+        $req->{_bre_id} = $bre_id;
+    }
+
+    $multi_ses->session_wait(1);
+    $response->{queue} = $queue;
+    $conn->respond($response);
+}
+
+
+# creates a series of Z39.50 searchs based on the 
+# in-bucket records and the selected sources and indexes
+sub compile_bucket_zsearch {
+    my ($e, $bre_ids, $z_sources, $z_indexes) = @_;
+
+    # pre-load the metabib_field's we'll need for this batch
+
+    my %mb_fields;
+    my @mb_fields = grep { $_->metabib_field } @$z_indexes;
+    if (@mb_fields) {
+        @mb_fields = map { $_->metabib_field } @mb_fields;
+        my $field_objs = $e->search_config_metabib_field({id => \@mb_fields});
+        %mb_fields = map {$_->id => $_} @$field_objs;
+    }
+
+    # pre-load the z3950_attrs we'll need for this batch
+
+    my %z3950_attrs;
+    my @z3950_attrs = grep { $_->z3950_attr } @$z_indexes;
+    if (@z3950_attrs) {
+        @z3950_attrs = map { $_->z3950_attr } @z3950_attrs;
+        my $attr_objs = $e->search_config_z3950_attr({id => \@z3950_attrs});
+        %z3950_attrs = map {$_->id => $_} @$attr_objs;
+    }
+
+    # indexes with specific z3950_attr's take precedence
+    my @z_index_attrs = grep { $_->z3950_attr } @$z_indexes;
+    my @z_index_types = grep { !$_->z3950_attr } @$z_indexes;
+
+    # for each bib record, extract the indexed value for the selected indexes.  
+    my %z_searches;
+
+    for my $bre_id (@$bre_ids) {
+
+        $z_searches{$bre_id} = {};
+
+        for my $z_index (@z_index_attrs, @z_index_types) {
+
+            my $bre_val;
+            if ($z_index->record_attr) {
+
+                my $attrs = $U->get_bre_attrs($bre_id, $e);
+                $bre_val = $attrs->{$bre_id}{$z_index->record_attr}{code};
+
+            } else { # metabib_field
+                my $fid = $z_index->metabib_field;
+
+                # the value for each field will be in the 
+                # index class-specific table
+                my $entry_query = sprintf(
+                    'search_metabib_%s_field_entry', 
+                    $mb_fields{$fid}->field_class);
+
+                my $entry = $e->$entry_query(
+                    {field => $fid, source => $bre_id})->[0];
+
+                $bre_val = $entry->value if $entry;
+            }
+
+            # no value means no search
+            next unless $bre_val;
+
+            # determine which z3950 source to send this search field to 
+
+            my $z_source = [];
+            my $z_index_name;
+            if ($z_index->z3950_attr) {
+
+                # a specific z3950_attr means this search index
+                # only applies to the z_source linked to the attr
+
+                $z_index_name = $z3950_attrs{$z_index->z3950_attr}->name;
+                my $src = $z3950_attrs{$z_index->z3950_attr}->source;
+
+                if (grep { $_ eq $src } @$z_sources) {
+                    $z_searches{$bre_id}{$src} ||= {
+                        service => [$src],
+                        search => {}
+                    };
+                    $z_searches{$bre_id}{$src}{search}{$z_index_name} = $bre_val;
+
+                } else {
+                    $logger->warn("z39: z3950_attr '$z_index_name' for '$src'".
+                        " selected, but $src is not in the search list.  Skipping...");
+                }
+
+            } else {
+
+                # when a generic attr type is used, it applies to all 
+                # z-sources, except those for which a more specific
+                # z3950_attr has already been applied
+
+                $z_index_name = $z_index->z3950_attr_type;
+
+                my @excluded;
+                for my $attr (values %z3950_attrs) {
+                    push(@excluded, $attr->source)
+                        if $attr->name eq $z_index_name;
+                }
+
+                for my $src (@$z_sources) {
+                    next if grep {$_ eq $src} @excluded;
+                    $z_searches{$bre_id}{$src} ||= {
+                        service => [$src],
+                        search => {}
+                    };
+                    $z_searches{$bre_id}{$src}{search}{$z_index_name} = $bre_val;
+                }
+            }
+        }
+    }
+
+    # NOTE: ISBNs are sent through the translate_isbn1013 normalize
+    # before entring metabib.identifier_field_entry.  As such, there
+    # will always be at minimum 2 ISBNs per record w/ ISBN and the
+    # data will be pre-sanitized.  The first ISBN in the list is the
+    # ISBN from the record.  Use that for these searches.
+    for my $bre_id (keys %z_searches) {
+        for my $src (keys %{$z_searches{$bre_id}}) {
+            my $blob = $z_searches{$bre_id}{$src};
+
+            # Sanitized ISBNs are space-separated.
+            # kill everything past the first space
+            $blob->{search}{isbn} =~ s/\s.*//g if $blob->{search}{isbn};
+        }
+    }
+
+    # let's turn this into something slightly more digestable
+    my @searches;
+    for my $bre_id (keys %z_searches) {
+        for my $blobset (values %{$z_searches{$bre_id}}) {
+            $blobset = [$blobset] unless ref $blobset eq 'ARRAY';
+            for my $blob (@$blobset) {
+                $blob->{bre_id} = $bre_id;
+                push(@searches, $blob);
+            }
+        }
+    }
+
+    return \@searches;
+}
+
+
+
 1;
 # vim:et:ts=4:sw=4: