]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/support-scripts/marc_stream_importer.pl
bdb740bac5fa1adca4f87fbdba076deeb421df9c
[working/Evergreen.git] / Open-ILS / src / support-scripts / marc_stream_importer.pl
1 #!/usr/bin/perl
2 # Copyright (C) 2008-2010 Equinox Software, Inc.
3 # Author: Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15
16 use strict; use warnings;
17 use Net::Server::PreFork;
18 use base qw/Net::Server::PreFork/;
19 use MARC::Record;
20 use MARC::Batch;
21 use MARC::File::XML ( BinaryEncoding => 'UTF-8' );
22 use MARC::File::USMARC;
23
24 use Data::Dumper;
25 use File::Basename qw/fileparse/;
26 use File::Temp;
27 use Getopt::Long qw(:DEFAULT GetOptionsFromArray);
28 use Pod::Usage;
29 use Socket;
30
31 use OpenSRF::Utils::Logger qw/$logger/;
32 use OpenSRF::AppSession;
33 use OpenSRF::EX qw/:try/;
34 use OpenILS::Utils::Cronscript;
35 use OpenSRF::Transport::PeerHandle;
36 require 'oils_header.pl';
37 use vars qw/$apputils/;
38
39 my $vl_ses;
40
41 my $debug = 0;
42
43 my %defaults = (
44     'buffsize=i'    => 4096,
45     'merge-profile=i' => 0,
46     'source=i'      => 1,
47 #    'osrf-config=s' => '/openils/conf/opensrf_core.xml',
48     'user=s'        => 'admin',
49     'password=s'    => '',
50     'tempdir=s'     => '',
51     'spoolfile=s'   => '',
52     'nolockfile'    => 1,
53     'queue=i'       => 1,
54     'noqueue'       => 0,
55     'nodaemon'      => 0,
56     'wait=i'        => 5,
57     'import-by-queue' => 0,
58     'auto-overlay-exact' => 0,
59     'auto-overlay-1match' => 0,
60     'auto-overlay-best-match' => 0
61 );
62
63 $OpenILS::Utils::Cronscript::debug=1 if $debug;
64 $Getopt::Long::debug=1 if $debug > 1;
65 my $o = OpenILS::Utils::Cronscript->new(\%defaults);
66
67 my @script_args = ();
68
69 if (grep {$_ eq '--'} @ARGV) {
70     print "Splitting options into groups\n" if $debug;
71     while (@ARGV) {
72         $_ = shift @ARGV;
73         $_ eq '--' and last;    # stop at the first --
74         push @script_args, $_;
75     }
76 } else {
77     @script_args = @ARGV;
78     @ARGV = ();
79 }
80
81 print "Calling MyGetOptions ",
82     (@script_args ? "with options: " . join(' ', @script_args) : 'without options from command line'),
83     "\n" if $debug;
84
85 my $real_opts = $o->MyGetOptions(\@script_args);
86 $o->bootstrap;
87 # GetOptionsFromArray(\@script_args, \%defaults, %defaults); # similar to
88
89 $real_opts->{tempdir} ||= tempdir_setting();    # This doesn't go in defaults because it reads config, must come after bootstrap
90
91 my $bufsize       = $real_opts->{buffsize};
92 my $bib_source    = $real_opts->{source};
93 my $osrf_config   = $real_opts->{'osrf-config'};
94 my $oils_username = $real_opts->{user};
95 my $oils_password = $real_opts->{password};
96 my $help          = $real_opts->{help};
97 my $merge_profile = $real_opts->{'merge-profile'};
98 my $queue_id      = $real_opts->{queue};
99 my $tempdir       = $real_opts->{tempdir};
100 my $import_by_queue  = $real_opts->{'import-by-queue'};
101    $debug        += $real_opts->{debug};
102
103 foreach (keys %$real_opts) {
104     print("real_opt->{$_} = ", $real_opts->{$_}, "\n") if $real_opts->{debug} or $debug;
105 }
106 my $wait_time     = $real_opts->{wait};
107 my $authtoken     = '';
108
109 # DEFAULTS for Net::Server
110 my $filename   = fileparse($0, '.pl');
111 my $conf_file  = (-r "$filename.conf") ? "$filename.conf" : undef;
112 # $conf_file is the Net::Server config for THIS script (not EG), if it exists and is readable
113
114
115 # FEEDBACK
116
117 pod2usage(1) if $help;
118 unless ($oils_password) {
119     print STDERR "\nERROR: password option required for session login\n\n";
120     # pod2usage(1);
121 }
122
123 print Dumper($o) if $debug;
124
125 if ($debug) {
126     foreach my $ref (qw/bufsize bib_source osrf_config oils_username oils_password help conf_file debug/) {
127         no strict 'refs';
128         printf "%16s => %s\n", $ref, (eval("\$$ref") || '');
129     }
130 }
131
132 print warning();
133 print Dumper($real_opts);
134
135 # SUBS
136
137 sub tempdir_setting {
138     my $ret = $apputils->simplereq( qw# opensrf.settings opensrf.settings.xpath.get
139         /opensrf/default/apps/open-ils.vandelay/app_settings/databases/importer # );
140     return $ret->[0] || '/tmp';
141 }
142
143 sub warning {
144     return <<WARNING;
145
146 WARNING:  This script provides no security layer.  Any client that has 
147 access to the server+port can inject MARC records into the system.  
148
149 WARNING
150 }
151
152 sub xml_import {
153     return $apputils->simplereq(
154         'open-ils.cat', 
155         'open-ils.cat.biblio.record.xml.import',
156         @_
157     );
158 }
159
160 sub old_process_batch_data {
161     my $data = shift or $logger->error("process_batch_data called without any data");
162     my $isfile = shift;
163     $data or return;
164
165     my $handle;
166     if ($isfile) {
167         $handle = $data;
168     } else {
169         open $handle, '<', \$data; 
170     }
171
172     my $batch = MARC::Batch->new('USMARC', $handle);
173     $batch->strict_off;
174
175     my $index = 0;
176     my $imported = 0;
177     my $failed = 0;
178
179     while (1) {
180         my $rec;
181         $index++;
182
183         eval { $rec = $batch->next; };
184
185         if ($@) {
186             $logger->error("Failed parsing MARC record $index");
187             $failed++;
188             next;
189         }
190         last unless $rec;   # The only way out
191
192         my $resp = xml_import($authtoken, $rec->as_xml_record, $bib_source);
193
194         # has the session timed out?
195         if (oils_event_equals($resp, 'NO_SESSION')) {
196             new_auth_token();
197             $resp = xml_import($authtoken, $rec->as_xml_record, $bib_source);   # try again w/ new token
198         }
199         oils_event_die($resp);
200         $imported++;
201     }
202
203     return ($imported, $failed);
204 }
205
206 sub process_spool { # filename
207
208     my $marcfile = shift;
209     my @rec_ids;
210
211     if($import_by_queue) {
212
213         # don't collect the record IDs, just spool the queue
214
215         $apputils->simplereq(
216             'open-ils.vandelay', 
217             'open-ils.vandelay.bib.process_spool', 
218             $authtoken, 
219             undef, 
220             $queue_id, 
221             'import', 
222             $marcfile,
223             $bib_source 
224         );
225
226     } else {
227
228         # collect the newly queued record IDs for processing
229
230         my $req = $vl_ses->request(
231             'open-ils.vandelay.bib.process_spool.stream_results',
232             $authtoken, 
233             undef, # cache key not needed
234             $queue_id, 
235             'import', 
236             $marcfile, 
237             $bib_source 
238         );
239     
240         while(my $resp = $req->recv) {
241
242             if($req->failed) {
243                 $logger->error("Error spooling MARC data: $resp");
244
245             } elsif($resp->content) {
246                 push(@rec_ids, $resp->content);
247             }
248         }
249     }
250
251     return \@rec_ids;
252 }
253
254 sub bib_queue_import {
255     my $rec_ids = shift;
256     my $extra = {import_no_match => 1};
257     $extra->{merge_profile} = $merge_profile if $merge_profile;
258     $extra->{auto_overlay_1match} = 1 if $real_opts->{'auto-overlay-1match'};
259     $extra->{auto_overlay_best_match} = 1 if $real_opts->{'auto-overlay-best-match'};
260
261     # default to exact match if no strategy is chosen
262     $extra->{auto_overlay_exact} = 1 
263         if $real_opts->{'auto-overlay-exact'} or
264         not ($extra->{auto_overlay_1match} or $extra->{auto_overlay_best_match});
265
266     my $req;
267     my @cleanup_recs;
268
269     if($import_by_queue) {
270         # import by queue
271
272         $req = $vl_ses->request(
273             'open-ils.vandelay.bib_queue.import', 
274             $authtoken, 
275             $queue_id, 
276             $extra 
277         );
278
279     } else {
280         # import explicit record IDs
281
282         $req = $vl_ses->request(
283             'open-ils.vandelay.bib_record.list.import', 
284             $authtoken, 
285             $rec_ids, 
286             $extra 
287         );
288     }
289
290     # collect the successfully imported vandelay records
291     my $failed = 0;
292     while(my $resp = $req->recv) {
293          if($req->failed) {
294             $logger->error("Error importing MARC data: $resp");
295
296         } elsif(my $data = $resp->content) {
297
298             if($data->{err_event}) {
299
300                 $logger->error(Dumper($data->{err_event}));
301                 $failed++;
302
303             } else {
304                 push(@cleanup_recs, $data->{imported}) if $data->{imported};
305             }
306         }
307     }
308
309     # clean up the successfully imported vandelay records to prevent queue bloat
310     my $pcrud = OpenSRF::AppSession->create('open-ils.pcrud');
311     $pcrud->connect;
312     $pcrud->request('open-ils.pcrud.transaction.begin', $authtoken)->recv;
313     my $err;
314
315     foreach (@cleanup_recs) {
316
317         try { 
318
319             $pcrud->request('open-ils.pcrud.delete.vqbr', $authtoken, $_)->recv;
320
321         } catch Error with {
322             $err = shift;
323             $logger->error("Error deleteing queued bib record $_: $err");
324         };
325     }
326
327     $pcrud->request('open-ils.pcrud.transaction.commit', $authtoken)->recv unless $err;
328     $pcrud->disconnect;
329
330     $logger->info("imported queued vandelay records: @cleanup_recs");
331     return (scalar(@cleanup_recs), $failed);
332 }
333
334 sub process_batch_data {
335     my $data = shift or $logger->error("process_batch_data called without any data");
336     my $isfile = shift;
337     $data or return;
338
339     $vl_ses = OpenSRF::AppSession->create('open-ils.vandelay');
340
341     my ($handle, $tempfile);
342     if (!$isfile) {
343         ($handle, $tempfile) = File::Temp->tempfile("$0_XXXX", DIR => $tempdir) or die "Cannot write tempfile in $tempdir";
344         print $handle $data;
345         close $handle;
346     } else {
347         $tempfile = $data;
348     }
349        
350     $logger->info("Calling process_spool on tempfile $tempfile (queue: $queue_id; source: $bib_source)");
351     my $rec_ids = process_spool($tempfile);
352
353     if (oils_event_equals($rec_ids, 'NO_SESSION')) {  # has the session timed out?
354         new_auth_token();
355         $rec_ids = process_spool($tempfile);                # try again w/ new token
356     }
357
358     my ($imported, $failed) = bib_queue_import($rec_ids);
359
360     if (oils_event_equals($imported, 'NO_SESSION')) {  # has the session timed out?
361         new_auth_token();
362         ($imported, $failed) = bib_queue_import();                # try again w/ new token
363     }
364
365     oils_event_die($imported);
366
367     return ($imported, $failed);
368 }
369
370 sub process_request {   # The core Net::Server method
371     my $self = shift;
372     my $client = $self->{server}->{client};
373
374     my $sockname = getpeername($client);
375     my ($port, $ip_addr) = unpack_sockaddr_in($sockname);
376     $logger->info("stream parser received contact from ".inet_ntoa($ip_addr));
377
378     my $ph = OpenSRF::Transport::PeerHandle->retrieve;
379     if(!$ph->flush_socket()) {
380         $logger->error("We received a request, bu we are no longer connected to opensrf.  ".
381             "Exiting and dropping request from $client");
382         exit;
383     }
384
385     my $data = '';
386     eval {
387         local $SIG{ALRM} = sub { die "alarm\n" };
388         alarm $wait_time; # prevent accidental tie ups of backend processes
389         local $/ = "\x1D"; # MARC record separator
390         $data = <STDIN>;
391         alarm 0;
392     };
393
394     if($@) {
395         $logger->error("reading from STDIN failed or timed out: $@");
396         return;
397     } 
398
399     $logger->info("stream parser read " . length($data) . " bytes");
400
401     my ($imported, $failed) = (0, 0);
402
403     new_auth_token(); # login
404
405     if ($real_opts->{noqueue}) {
406         ($imported, $failed) = old_process_batch_data($data);
407     } else {
408         ($imported, $failed) = process_batch_data($data);
409     }
410
411     my $profile = (!$merge_profile) ? '' :
412         $apputils->simplereq(
413             'open-ils.pcrud', 
414             'open-ils.pcrud.retrieve.vmp', 
415             $authtoken, 
416             $merge_profile)->name;
417
418     my $msg = '';
419     $msg .= "Successfully imported $imported records using merge profile '$profile'\n" if $imported;
420     $msg .= "Failed to import $failed records\n" if $failed;
421     $msg .= "\x00";
422     print $client $msg;
423
424     clear_auth_token(); # logout
425 }
426
427 sub standalone_process_request {   # The command line version
428     my $file = shift;
429     
430     $logger->info("stream parser received file processing request for $file");
431
432     my $ph = OpenSRF::Transport::PeerHandle->retrieve;
433     if(!$ph->flush_socket()) {
434         $logger->error("We received a request, bu we are no longer connected to opensrf.  ".
435             "Exiting and dropping request for $file");
436         exit;
437     }
438
439     my ($imported, $failed) = (0, 0);
440
441     new_auth_token(); # login
442
443     if ($real_opts->{noqueue}) {
444         ($imported, $failed) = old_process_batch_data($file, 1);
445     } else {
446         ($imported, $failed) = process_batch_data($file, 1);
447     }
448
449     my $profile = (!$merge_profile) ? '' :
450         $apputils->simplereq(
451             'open-ils.pcrud', 
452             'open-ils.pcrud.retrieve.vmp', 
453             $authtoken, 
454             $merge_profile)->name;
455
456     my $msg = '';
457     $msg .= "Successfully imported $imported records using merge profile '$profile'\n" if $imported;
458     $msg .= "Failed to import $failed records\n" if $failed;
459     $msg .= "\x00";
460     print $msg;
461
462     clear_auth_token(); # logout
463 }
464
465
466 # the authtoken will timeout after the configured inactivity period.
467 # When that happens, get a new one.
468 sub new_auth_token {
469     $authtoken = oils_login($oils_username, $oils_password, 'staff') 
470         or die "Unable to login to Evergreen as user $oils_username";
471     return $authtoken;
472 }
473
474 sub clear_auth_token {
475     $apputils->simplereq(
476         'open-ils.auth',
477         'open-ils.auth.session.delete',
478         $authtoken
479     );
480 }
481
482 ##### MAIN ######
483
484 osrf_connect($osrf_config);
485 if ($real_opts->{nodaemon}) {
486     if (!$real_opts->{spoolfile}) {
487         print " --nodaemon mode requested, but no --spoolfile supplied!\n";
488         exit;
489     }
490     standalone_process_request($real_opts->{spoolfile});
491 } else {
492     print "Calling Net::Server run ", (@ARGV ? "with command-line options: " . join(' ', @ARGV) : ''), "\n";
493     __PACKAGE__->run(conf_file => $conf_file);
494 }
495
496 __END__
497
498 =head1 NAME
499
500 marc_stream_importer.pl - Import MARC records via bare socket connection.
501
502 =head1 SYNOPSIS
503
504 ./marc_stream_importer.pl [common opts ...] [script opts ...] -- [Net::Server opts ...] &
505
506 This script uses the EG common options from B<Cronscript>.  See --help output for those.
507
508 Run C<perldoc marc_stream_importer.pl> for full documentation.
509
510 Note the extra C<--> to separate options for the script wrapper from options for the
511 underlying L<Net::Server> options.  
512
513 Note: this script has to be run in the same directory as B<oils_header.pl>.
514
515 Typical server-style execution will include a trailing C<&> to run in the background.
516
517 =head1 DESCRIPTION
518
519 This script is a L<Net::Server::PreFork> instance for shoving records into Evergreen from a remote system.
520
521 =head1 OPTIONS
522
523 The only required option is --password
524
525  --password         =<eg_password>
526  --user             =<eg_username>  default: admin
527  --source           =<bib_source>   default: 1         Integer
528  --merge-profile    =<i>            default: 0
529  --tempdir          =</temp/dir/>   default: from L<opensrf.conf> <open-ils.vandelay/app_settings/databases/importer>
530  --source           =<i>            default: 1
531  --import-by-queue  =<i>            default: 0
532  --spoolfile        =<import_file>  default: NONE      File to import in --nodaemon mode
533  --nodaemon                         default: OFF       When used with --spoolfile, turns off Net::Server mode and runs this utility in the foreground
534
535
536 =head2 Old style: --noqueue and associated options
537
538 To bypass vandelay queue processing and push directly into the database (as the old style)
539
540  --noqueue         default: OFF
541  --buffsize =<i>   default: 4096    Buffer size.  Only used by --noqueue
542  --wait     =<i>   default: 5       Seconds to read socket before processing.  Only used by --noqueue
543
544 =head2 Net::Server Options
545
546 By default, the script will use the Net::Server configuration file B<marc_stream_importer.conf>.  You can 
547 override this by passing a filepath with the --conf_file option.
548
549 Other Net::Server options include: --port=<port> --min_servers=<X> --max_servers=<Y> and --log_file=[path/to/file]
550
551 See L<Net::Server> for a complete list.
552
553 =head2 Configuration
554
555 =head3 OCLC Connexion
556
557 To use this script with OCLC Connexion, configure the client as follows:
558
559 Under Tools -> Options -> Export (tab)
560    Create -> Choose Connection -> OK -> Leave translation at "None" 
561        -> Create -> Create -> choose TCP/IP (internet) 
562        -> Enter hostname and Port, leave 'Use Telnet Protocol' checked 
563        -> Create/OK your way out of the dialogs
564    Record Characteristics (button) -> Choose 'UTF-8 Unicode' for the Character Set
565    
566
567 OCLC and Connexion are trademark/service marks of OCLC Online Computer Library Center, Inc.
568
569 =head1 CAVEATS
570
571 WARNING: This script provides no inherent security layer.  Any client that has 
572 access to the server+port can inject MARC records into the system.  
573 Use the available options (like allow/deny) in the Net::Server config file 
574 or via the command line to restrict access as necessary.
575
576 =head1 EXAMPLES
577
578 ./marc_stream_importer.pl  \
579     admin open-ils connexion --port 5555 --min_servers 2 \
580     --max_servers=20 --log_file=/openils/var/log/marc_net_importer.log &
581
582 ./marc_stream_importer.pl  \
583     admin open-ils connexion --port 5555 --min_servers 2 \
584     --max_servers=20 --log_file=/openils/var/log/marc_net_importer.log &
585
586 =head1 SEE ALSO
587
588 L<Net::Server::PreFork>, L<marc_stream_importer.conf>
589
590 =head1 AUTHORS
591
592     Bill Erickson <erickson@esilibrary.com>
593     Joe Atzberger <jatzberger@esilibrary.com>
594     Mike Rylander <miker@esilibrary.com> (nodaemon+spoolfile mode)
595
596 =cut