]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/support-scripts/marc_stream_importer.pl
Merge branch 'master' of git+ssh://yeti.esilibrary.com/home/evergreen/evergreen-equin...
[Evergreen.git] / Open-ILS / src / support-scripts / marc_stream_importer.pl
1 #!/usr/bin/perl
2 # Copyright (C) 2008-2010 Equinox Software, Inc.
3 # Author: Bill Erickson <erickson@esilibrary.com>
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15
16 use strict; use warnings;
17 use Net::Server::PreFork;
18 use base qw/Net::Server::PreFork/;
19 use MARC::Record;
20 use MARC::Batch;
21 use MARC::File::XML ( BinaryEncoding => 'UTF-8' );
22 use MARC::File::USMARC;
23
24 use Data::Dumper;
25 use File::Basename qw/fileparse/;
26 use File::Temp;
27 use Getopt::Long qw(:DEFAULT GetOptionsFromArray);
28 use Pod::Usage;
29 use Socket;
30
31 use OpenSRF::Utils::Logger qw/$logger/;
32 use OpenSRF::AppSession;
33 use OpenSRF::EX qw/:try/;
34 use OpenILS::Utils::Cronscript;
35 use OpenSRF::Transport::PeerHandle;
36 require 'oils_header.pl';
37 use vars qw/$apputils/;
38
39 my $vl_ses;
40
41 my $debug = 0;
42
43 my %defaults = (
44     'buffsize=i'    => 4096,
45     'merge-profile=i' => 0,
46     'source=i'      => 1,
47 #    'osrf-config=s' => '/openils/conf/opensrf_core.xml',
48     'user=s'        => 'admin',
49     'password=s'    => '',
50     'tempdir=s'     => '',
51     'nolockfile'    => 1,
52     'queue=i'       => 1,
53     'noqueue'       => 0,
54     'wait=i'        => 5,
55     'import-by-queue' => 0
56 );
57
58 $OpenILS::Utils::Cronscript::debug=1 if $debug;
59 $Getopt::Long::debug=1 if $debug > 1;
60 my $o = OpenILS::Utils::Cronscript->new(\%defaults);
61
62 my @script_args = ();
63
64 if (grep {$_ eq '--'} @ARGV) {
65     print "Splitting options into groups\n" if $debug;
66     while (@ARGV) {
67         $_ = shift @ARGV;
68         $_ eq '--' and last;    # stop at the first --
69         push @script_args, $_;
70     }
71 } else {
72     @script_args = @ARGV;
73     @ARGV = ();
74 }
75
76 print "Calling MyGetOptions ",
77     (@script_args ? "with options: " . join(' ', @script_args) : 'without options from command line'),
78     "\n" if $debug;
79
80 my $real_opts = $o->MyGetOptions(\@script_args);
81 $o->bootstrap;
82 # GetOptionsFromArray(\@script_args, \%defaults, %defaults); # similar to
83
84 $real_opts->{tempdir} ||= tempdir_setting();    # This doesn't go in defaults because it reads config, must come after bootstrap
85
86 my $bufsize       = $real_opts->{buffsize};
87 my $bib_source    = $real_opts->{source};
88 my $osrf_config   = $real_opts->{'osrf-config'};
89 my $oils_username = $real_opts->{user};
90 my $oils_password = $real_opts->{password};
91 my $help          = $real_opts->{help};
92 my $merge_profile = $real_opts->{'merge-profile'};
93 my $queue_id      = $real_opts->{queue};
94 my $tempdir       = $real_opts->{tempdir};
95 my $import_by_queue  = $real_opts->{'import-by-queue'};
96    $debug        += $real_opts->{debug};
97
98 foreach (keys %$real_opts) {
99     print("real_opt->{$_} = ", $real_opts->{$_}, "\n") if $real_opts->{debug} or $debug;
100 }
101 my $wait_time     = $real_opts->{wait};
102 my $authtoken     = '';
103
104 # DEFAULTS for Net::Server
105 my $filename   = fileparse($0, '.pl');
106 my $conf_file  = (-r "$filename.conf") ? "$filename.conf" : undef;
107 # $conf_file is the Net::Server config for THIS script (not EG), if it exists and is readable
108
109
110 # FEEDBACK
111
112 pod2usage(1) if $help;
113 unless ($oils_password) {
114     print STDERR "\nERROR: password option required for session login\n\n";
115     # pod2usage(1);
116 }
117
118 print Dumper($o) if $debug;
119
120 if ($debug) {
121     foreach my $ref (qw/bufsize bib_source osrf_config oils_username oils_password help conf_file debug/) {
122         no strict 'refs';
123         printf "%16s => %s\n", $ref, (eval("\$$ref") || '');
124     }
125 }
126
127 print warning();
128 print Dumper($real_opts);
129
130 # SUBS
131
132 sub tempdir_setting {
133     my $ret = $apputils->simplereq( qw# opensrf.settings opensrf.settings.xpath.get
134         /opensrf/default/apps/open-ils.vandelay/app_settings/databases/importer # );
135     return $ret->[0] || '/tmp';
136 }
137
138 sub warning {
139     return <<WARNING;
140
141 WARNING:  This script provides no security layer.  Any client that has 
142 access to the server+port can inject MARC records into the system.  
143
144 WARNING
145 }
146
147 sub xml_import {
148     return $apputils->simplereq(
149         'open-ils.cat', 
150         'open-ils.cat.biblio.record.xml.import',
151         @_
152     );
153 }
154
155 sub old_process_batch_data {
156     my $data = shift or $logger->error("process_batch_data called without any data");
157     $data or return;
158
159     my $handle;
160     open $handle, '<', \$data; 
161     my $batch = MARC::Batch->new('USMARC', $handle);
162     $batch->strict_off;
163
164     my $index = 0;
165     my $imported = 0;
166     my $failed = 0;
167
168     while (1) {
169         my $rec;
170         $index++;
171
172         eval { $rec = $batch->next; };
173
174         if ($@) {
175             $logger->error("Failed parsing MARC record $index");
176             $failed++;
177             next;
178         }
179         last unless $rec;   # The only way out
180
181         my $resp = xml_import($authtoken, $rec->as_xml_record, $bib_source);
182
183         # has the session timed out?
184         if (oils_event_equals($resp, 'NO_SESSION')) {
185             new_auth_token();
186             $resp = xml_import($authtoken, $rec->as_xml_record, $bib_source);   # try again w/ new token
187         }
188         oils_event_die($resp);
189         $imported++;
190     }
191
192     return ($imported, $failed);
193 }
194
195 sub process_spool { # filename
196
197     my $marcfile = shift;
198     my @rec_ids;
199
200     if($import_by_queue) {
201
202         # don't collect the record IDs, just spool the queue
203
204         $apputils->simplereq(
205             'open-ils.vandelay', 
206             'open-ils.vandelay.bib.process_spool', 
207             $authtoken, 
208             undef, 
209             $queue_id, 
210             'import', 
211             $marcfile,
212             $bib_source 
213         );
214
215     } else {
216
217         # collect the newly queued record IDs for processing
218
219         my $req = $vl_ses->request(
220             'open-ils.vandelay.bib.process_spool.stream_results',
221             $authtoken, 
222             undef, # cache key not needed
223             $queue_id, 
224             'import', 
225             $marcfile, 
226             $bib_source 
227         );
228     
229         while(my $resp = $req->recv) {
230
231             if($req->failed) {
232                 $logger->error("Error spooling MARC data: $resp");
233
234             } elsif($resp->content) {
235                 push(@rec_ids, $resp->content);
236             }
237         }
238     }
239
240     return \@rec_ids;
241 }
242
243 sub bib_queue_import {
244     my $rec_ids = shift;
245     my $extra = {auto_overlay_exact => 1};
246     $extra->{merge_profile} = $merge_profile if $merge_profile;
247
248     my $req;
249     my @cleanup_recs;
250
251     if($import_by_queue) {
252         # import by queue
253
254         $req = $vl_ses->request(
255             'open-ils.vandelay.bib_queue.import', 
256             $authtoken, 
257             $queue_id, 
258             $extra 
259         );
260
261     } else {
262         # import explicit record IDs
263
264         $req = $vl_ses->request(
265             'open-ils.vandelay.bib_record.list.import', 
266             $authtoken, 
267             $rec_ids, 
268             $extra 
269         );
270     }
271
272     # collect the successfully imported vandelay records
273     my $failed = 0;
274     while(my $resp = $req->recv) {
275          if($req->failed) {
276             $logger->error("Error importing MARC data: $resp");
277
278         } elsif(my $data = $resp->content) {
279
280             if($data->{err_event}) {
281
282                 $logger->error(Dumper($data->{err_event}));
283                 $failed++;
284
285             } else {
286                 push(@cleanup_recs, $data->{imported}) if $data->{imported};
287             }
288         }
289     }
290
291     # clean up the successfully imported vandelay records to prevent queue bloat
292     my $pcrud = OpenSRF::AppSession->create('open-ils.pcrud');
293     $pcrud->connect;
294     $pcrud->request('open-ils.pcrud.transaction.begin', $authtoken)->recv;
295     my $err;
296
297     foreach (@cleanup_recs) {
298
299         try { 
300
301             $pcrud->request('open-ils.pcrud.delete.vqbr', $authtoken, $_)->recv;
302
303         } catch Error with {
304             $err = shift;
305             $logger->error("Error deleteing queued bib record $_: $err");
306         };
307     }
308
309     $pcrud->request('open-ils.pcrud.transaction.commit', $authtoken)->recv unless $err;
310     $pcrud->disconnect;
311
312     $logger->info("imported queued vandelay records: @cleanup_recs");
313     return (scalar(@cleanup_recs), $failed);
314 }
315
316 sub process_batch_data {
317     my $data = shift or $logger->error("process_batch_data called without any data");
318     $data or return;
319
320     $vl_ses = OpenSRF::AppSession->create('open-ils.vandelay');
321
322     my ($handle, $tempfile) = File::Temp->tempfile("$0_XXXX", DIR => $tempdir) or die "Cannot write tempfile in $tempdir";
323     print $handle $data;
324     close $handle;
325        
326     $logger->info("Calling process_spool on tempfile $tempfile (queue: $queue_id; source: $bib_source)");
327     my $rec_ids = process_spool($tempfile);
328
329     if (oils_event_equals($rec_ids, 'NO_SESSION')) {  # has the session timed out?
330         new_auth_token();
331         $rec_ids = process_spool($tempfile);                # try again w/ new token
332     }
333
334     my ($imported, $failed) = bib_queue_import($rec_ids);
335
336     if (oils_event_equals($imported, 'NO_SESSION')) {  # has the session timed out?
337         new_auth_token();
338         ($imported, $failed) = bib_queue_import();                # try again w/ new token
339     }
340
341     oils_event_die($imported);
342
343     return ($imported, $failed);
344 }
345
346 sub process_request {   # The core Net::Server method
347     my $self = shift;
348     my $client = $self->{server}->{client};
349
350     my $sockname = getpeername($client);
351     my ($port, $ip_addr) = unpack_sockaddr_in($sockname);
352     $logger->info("stream parser received contact from ".inet_ntoa($ip_addr));
353
354     my $ph = OpenSRF::Transport::PeerHandle->retrieve;
355     if(!$ph->flush_socket()) {
356         $logger->error("We received a request, bu we are no longer connected to opensrf.  ".
357             "Exiting and dropping request from $client");
358         exit;
359     }
360
361     my $data;
362     eval {
363         local $SIG{ALRM} = sub { die "alarm\n" };
364         alarm $wait_time; # prevent accidental tie ups of backend processes
365         local $/ = "\x1D"; # MARC record separator
366         $data = <STDIN>;
367         alarm 0;
368     };
369
370     if($@) {
371         $logger->error("reading from STDIN failed or timed out: $@");
372         return;
373     } 
374
375     $logger->info("stream parser read " . length($data) . " bytes");
376
377     my ($imported, $failed) = (0, 0);
378
379     new_auth_token(); # login
380
381     if ($real_opts->{noqueue}) {
382         ($imported, $failed) = old_process_batch_data($data);
383     } else {
384         ($imported, $failed) = process_batch_data($data);
385     }
386
387     my $profile = (!$merge_profile) ? '' :
388         $apputils->simplereq(
389             'open-ils.pcrud', 
390             'open-ils.pcrud.retrieve.vmp', 
391             $authtoken, 
392             $merge_profile)->name;
393
394     my $msg = '';
395     $msg .= "Successfully imported $imported records using merge profile '$profile'\n" if $imported;
396     $msg .= "Failed to import $failed records\n" if $failed;
397     $msg .= "\x00";
398     print $client $msg;
399
400     clear_auth_token(); # logout
401 }
402
403
404 # the authtoken will timeout after the configured inactivity period.
405 # When that happens, get a new one.
406 sub new_auth_token {
407     $authtoken = oils_login($oils_username, $oils_password, 'staff') 
408         or die "Unable to login to Evergreen as user $oils_username";
409     return $authtoken;
410 }
411
412 sub clear_auth_token {
413     $apputils->simplereq(
414         'open-ils.auth',
415         'open-ils.auth.session.delete',
416         $authtoken
417     );
418 }
419
420 ##### MAIN ######
421
422 osrf_connect($osrf_config);
423 print "Calling Net::Server run ", (@ARGV ? "with command-line options: " . join(' ', @ARGV) : ''), "\n";
424 __PACKAGE__->run(conf_file => $conf_file);
425
426 __END__
427
428 =head1 NAME
429
430 marc_stream_importer.pl - Import MARC records via bare socket connection.
431
432 =head1 SYNOPSIS
433
434 ./marc_stream_importer.pl [common opts ...] [script opts ...] -- [Net::Server opts ...] &
435
436 This script uses the EG common options from B<Cronscript>.  See --help output for those.
437
438 Run C<perldoc marc_stream_importer.pl> for full documentation.
439
440 Note the extra C<--> to separate options for the script wrapper from options for the
441 underlying L<Net::Server> options.  
442
443 Note: this script has to be run in the same directory as B<oils_header.pl>.
444
445 Typical execution will include a trailing C<&> to run in the background.
446
447 =head1 DESCRIPTION
448
449 This script is a L<Net::Server::PreFork> instance for shoving records into Evergreen from a remote system.
450
451 =head1 OPTIONS
452
453 The only required option is --password
454
455  --password         =<eg_password>
456  --user             =<eg_username>  default: admin
457  --source           =<bib_source>   default: 1         Integer
458  --merge-profile    =<i>            default: 0
459  --tempdir          =</temp/dir/>   default: from L<opensrf.conf> <open-ils.vandelay/app_settings/databases/importer>
460  --source           =<i>            default: 1
461  --import-by-queue  =<i>            default: 0
462
463
464 =head2 Old style: --noqueue and associated options
465
466 To bypass vandelay queue processing and push directly into the database (as the old style)
467
468  --noqueue         default: OFF
469  --buffsize =<i>   default: 4096    Buffer size.  Only used by --noqueue
470  --wait     =<i>   default: 5       Seconds to read socket before processing.  Only used by --noqueue
471
472 =head2 Net::Server Options
473
474 By default, the script will use the Net::Server configuration file B<marc_stream_importer.conf>.  You can 
475 override this by passing a filepath with the --conf_file option.
476
477 Other Net::Server options include: --port=<port> --min_servers=<X> --max_servers=<Y> and --log_file=[path/to/file]
478
479 See L<Net::Server> for a complete list.
480
481 =head2 Configuration
482
483 =head3 OCLC Connexion
484
485 To use this script with OCLC Connexion, configure the client as follows:
486
487 Under Tools -> Options -> Export (tab)
488    Create -> Choose Connection -> OK -> Leave translation at "None" 
489        -> Create -> Create -> choose TCP/IP (internet) 
490        -> Enter hostname and Port, leave 'Use Telnet Protocol' checked 
491        -> Create/OK your way out of the dialogs
492    Record Characteristics (button) -> Choose 'UTF-8 Unicode' for the Character Set
493    
494
495 OCLC and Connexion are trademark/service marks of OCLC Online Computer Library Center, Inc.
496
497 =head1 CAVEATS
498
499 WARNING: This script provides no inherent security layer.  Any client that has 
500 access to the server+port can inject MARC records into the system.  
501 Use the available options (like allow/deny) in the Net::Server config file 
502 or via the command line to restrict access as necessary.
503
504 =head1 EXAMPLES
505
506 ./marc_stream_importer.pl  \
507     admin open-ils connexion --port 5555 --min_servers 2 \
508     --max_servers=20 --log_file=/openils/var/log/marc_net_importer.log &
509
510 =head1 SEE ALSO
511
512 L<Net::Server::PreFork>, L<marc_stream_importer.conf>
513
514 =head1 AUTHORS
515
516     Bill Erickson <erickson@esilibrary.com>
517     Joe Atzberger <jatzberger@esilibrary.com>
518
519 =cut