]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/sql/Pg/012.schema.vandelay.sql
LP1820339: Vandelay Imports on Pg 10
[Evergreen.git] / Open-ILS / src / sql / Pg / 012.schema.vandelay.sql
1 DROP SCHEMA IF EXISTS vandelay CASCADE;
2
3 BEGIN;
4
5 CREATE SCHEMA vandelay;
6
7 CREATE TABLE vandelay.match_set (
8     id      SERIAL  PRIMARY KEY,
9     name    TEXT        NOT NULL,
10     owner   INT     NOT NULL REFERENCES actor.org_unit (id) ON DELETE CASCADE,
11     mtype   TEXT        NOT NULL DEFAULT 'biblio', -- 'biblio','authority','mfhd'?, others?
12     CONSTRAINT name_once_per_owner_mtype UNIQUE (name, owner, mtype)
13 );
14
15 -- Table to define match points, either FF via SVF or tag+subfield
16 CREATE TABLE vandelay.match_set_point (
17     id          SERIAL  PRIMARY KEY,
18     match_set   INT     REFERENCES vandelay.match_set (id) ON DELETE CASCADE,
19     parent      INT     REFERENCES vandelay.match_set_point (id),
20     bool_op     TEXT    CHECK (bool_op IS NULL OR (bool_op IN ('AND','OR','NOT'))),
21     svf         TEXT    REFERENCES config.record_attr_definition (name),
22     tag         TEXT,
23     subfield    TEXT,
24     negate      BOOL    DEFAULT FALSE,
25     quality     INT     NOT NULL DEFAULT 1, -- higher is better
26     heading     BOOLEAN NOT NULL DEFAULT FALSE, -- match on authority heading
27     CONSTRAINT vmsp_need_a_subfield_with_a_tag CHECK ((tag IS NOT NULL AND subfield IS NOT NULL) OR tag IS NULL),
28     CONSTRAINT vmsp_need_a_tag_or_a_ff_or_a_bo CHECK (
29         (tag IS NOT NULL AND svf IS NULL AND heading IS FALSE AND bool_op IS NULL) OR 
30         (tag IS NULL AND svf IS NOT NULL AND heading IS FALSE AND bool_op IS NULL) OR 
31         (tag IS NULL AND svf IS NULL AND heading IS TRUE AND bool_op IS NULL) OR 
32         (tag IS NULL AND svf IS NULL AND heading IS FALSE AND bool_op IS NOT NULL)
33     )
34 );
35
36 CREATE TABLE vandelay.match_set_quality (
37     id          SERIAL  PRIMARY KEY,
38     match_set   INT     NOT NULL REFERENCES vandelay.match_set (id) ON DELETE CASCADE,
39     svf         TEXT    REFERENCES config.record_attr_definition,
40     tag         TEXT,
41     subfield    TEXT,
42     value       TEXT    NOT NULL,
43     quality     INT     NOT NULL DEFAULT 1, -- higher is better
44     CONSTRAINT vmsq_need_a_subfield_with_a_tag CHECK ((tag IS NOT NULL AND subfield IS NOT NULL) OR tag IS NULL),
45     CONSTRAINT vmsq_need_a_tag_or_a_ff CHECK ((tag IS NOT NULL AND svf IS NULL) OR (tag IS NULL AND svf IS NOT NULL))
46 );
47 CREATE UNIQUE INDEX vmsq_def_once_per_set ON vandelay.match_set_quality (match_set, COALESCE(tag,''), COALESCE(subfield,''), COALESCE(svf,''), value);
48
49
50 CREATE TABLE vandelay.queue (
51         id                              BIGSERIAL       PRIMARY KEY,
52         owner                   INT                     NOT NULL REFERENCES actor.usr (id) DEFERRABLE INITIALLY DEFERRED,
53         name                    TEXT            NOT NULL,
54         complete                BOOL            NOT NULL DEFAULT FALSE,
55     match_set       INT         REFERENCES vandelay.match_set (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED
56 );
57
58 CREATE TABLE vandelay.queued_record (
59     id                  BIGSERIAL                   PRIMARY KEY,
60     create_time TIMESTAMP WITH TIME ZONE    NOT NULL DEFAULT NOW(),
61     import_time TIMESTAMP WITH TIME ZONE,
62         purpose         TEXT                                            NOT NULL DEFAULT 'import' CHECK (purpose IN ('import','overlay')),
63     marc                TEXT                        NOT NULL,
64     quality     INT                         NOT NULL DEFAULT 0
65 );
66
67
68
69 /* Bib stuff at the top */
70 ----------------------------------------------------
71
72 CREATE TABLE vandelay.bib_attr_definition (
73         id                      SERIAL  PRIMARY KEY,
74         code            TEXT    UNIQUE NOT NULL,
75         description     TEXT,
76         xpath           TEXT    NOT NULL,
77         remove          TEXT    NOT NULL DEFAULT ''
78 );
79
80 -- Each TEXT field (other than 'name') should hold an XPath predicate for pulling the data needed
81 -- DROP TABLE vandelay.import_item_attr_definition CASCADE;
82 CREATE TABLE vandelay.import_item_attr_definition (
83     id              BIGSERIAL   PRIMARY KEY,
84     owner           INT         NOT NULL REFERENCES actor.org_unit (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
85     name            TEXT        NOT NULL,
86     tag             TEXT        NOT NULL,
87     keep            BOOL        NOT NULL DEFAULT FALSE,
88     owning_lib      TEXT,
89     circ_lib        TEXT,
90     call_number     TEXT,
91     copy_number     TEXT,
92     status          TEXT,
93     location        TEXT,
94     circulate       TEXT,
95     deposit         TEXT,
96     deposit_amount  TEXT,
97     ref             TEXT,
98     holdable        TEXT,
99     price           TEXT,
100     barcode         TEXT,
101     circ_modifier   TEXT,
102     circ_as_type    TEXT,
103     alert_message   TEXT,
104     opac_visible    TEXT,
105     pub_note_title  TEXT,
106     pub_note        TEXT,
107     priv_note_title TEXT,
108     priv_note       TEXT,
109     internal_id     TEXT,
110     stat_cat_data   TEXT,
111     parts_data      TEXT,
112         CONSTRAINT vand_import_item_attr_def_idx UNIQUE (owner,name)
113 );
114
115 CREATE TABLE vandelay.import_error (
116     code        TEXT    PRIMARY KEY,
117     description TEXT    NOT NULL -- i18n
118 );
119
120 CREATE TYPE vandelay.bib_queue_queue_type AS ENUM ('bib', 'acq');
121
122 CREATE TABLE vandelay.bib_queue (
123         queue_type          vandelay.bib_queue_queue_type       NOT NULL DEFAULT 'bib',
124         item_attr_def   BIGINT REFERENCES vandelay.import_item_attr_definition (id) ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
125     match_bucket    INTEGER, -- REFERENCES container.biblio_record_entry_bucket(id);
126         CONSTRAINT vand_bib_queue_name_once_per_owner_const UNIQUE (owner,name,queue_type)
127 ) INHERITS (vandelay.queue);
128 ALTER TABLE vandelay.bib_queue ADD PRIMARY KEY (id);
129
130 CREATE TABLE vandelay.queued_bib_record (
131         queue               INT         NOT NULL REFERENCES vandelay.bib_queue (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
132         bib_source          INT         REFERENCES config.bib_source (id) DEFERRABLE INITIALLY DEFERRED,
133         imported_as     BIGINT  REFERENCES biblio.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
134         import_error    TEXT    REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
135         error_detail    TEXT
136 ) INHERITS (vandelay.queued_record);
137 ALTER TABLE vandelay.queued_bib_record ADD PRIMARY KEY (id);
138 CREATE INDEX queued_bib_record_queue_idx ON vandelay.queued_bib_record (queue);
139
140 CREATE TABLE vandelay.queued_bib_record_attr (
141         id                      BIGSERIAL       PRIMARY KEY,
142         record          BIGINT          NOT NULL REFERENCES vandelay.queued_bib_record (id) DEFERRABLE INITIALLY DEFERRED,
143         field           INT                     NOT NULL REFERENCES vandelay.bib_attr_definition (id) DEFERRABLE INITIALLY DEFERRED,
144         attr_value      TEXT            NOT NULL
145 );
146 CREATE INDEX queued_bib_record_attr_record_idx ON vandelay.queued_bib_record_attr (record);
147
148 CREATE TABLE vandelay.bib_match (
149         id                              BIGSERIAL       PRIMARY KEY,
150         queued_record   BIGINT          REFERENCES vandelay.queued_bib_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
151         eg_record               BIGINT          REFERENCES biblio.record_entry (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
152     quality         INT         NOT NULL DEFAULT 1,
153     match_score     INT         NOT NULL DEFAULT 0
154 );
155 CREATE INDEX bib_match_queued_record_idx ON vandelay.bib_match (queued_record);
156
157 CREATE TABLE vandelay.import_item (
158     id              BIGSERIAL   PRIMARY KEY,
159     record          BIGINT      NOT NULL REFERENCES vandelay.queued_bib_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
160     definition      BIGINT      NOT NULL REFERENCES vandelay.import_item_attr_definition (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
161         import_error    TEXT        REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
162         error_detail    TEXT,
163     imported_as     BIGINT,
164     import_time     TIMESTAMP WITH TIME ZONE,
165     owning_lib      INT,
166     circ_lib        INT,
167     call_number     TEXT,
168     copy_number     INT,
169     status          INT,
170     location        INT,
171     circulate       BOOL,
172     deposit         BOOL,
173     deposit_amount  NUMERIC(8,2),
174     ref             BOOL,
175     holdable        BOOL,
176     price           NUMERIC(8,2),
177     barcode         TEXT,
178     circ_modifier   TEXT,
179     circ_as_type    TEXT,
180     alert_message   TEXT,
181     pub_note        TEXT,
182     priv_note       TEXT,
183     stat_cat_data   TEXT,
184     parts_data      TEXT,
185     opac_visible    BOOL,
186     internal_id     BIGINT -- queue_type == 'acq' ? acq.lineitem_detail.id : asset.copy.id
187 );
188 CREATE INDEX import_item_record_idx ON vandelay.import_item (record);
189
190 CREATE TABLE vandelay.import_bib_trash_group(
191     id           SERIAL  PRIMARY KEY,
192     owner        INTEGER NOT NULL REFERENCES actor.org_unit(id),
193     label        TEXT    NOT NULL, --i18n
194     always_apply BOOLEAN NOT NULL DEFAULT FALSE,
195         CONSTRAINT vand_import_bib_trash_grp_owner_label UNIQUE (owner, label)
196 );
197  
198 CREATE TABLE vandelay.import_bib_trash_fields (
199     id         BIGSERIAL PRIMARY KEY,
200     grp        INTEGER   NOT NULL REFERENCES vandelay.import_bib_trash_group,
201     field      TEXT      NOT NULL,
202     CONSTRAINT vand_import_bib_trash_fields_once_per UNIQUE (grp, field)
203 );
204
205 CREATE TABLE vandelay.merge_profile (
206     id              BIGSERIAL   PRIMARY KEY,
207     owner           INT         NOT NULL REFERENCES actor.org_unit (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
208     name            TEXT        NOT NULL,
209     add_spec        TEXT,
210     replace_spec    TEXT,
211     strip_spec      TEXT,
212     preserve_spec   TEXT,
213     update_bib_source BOOLEAN   NOT NULL DEFAULT FALSE,
214     lwm_ratio       NUMERIC,
215         CONSTRAINT vand_merge_prof_owner_name_idx UNIQUE (owner,name),
216         CONSTRAINT add_replace_strip_or_preserve CHECK ((preserve_spec IS NOT NULL OR replace_spec IS NOT NULL) OR (preserve_spec IS NULL AND replace_spec IS NULL))
217 );
218
219 CREATE OR REPLACE FUNCTION vandelay.marc21_record_type( marc TEXT ) RETURNS config.marc21_rec_type_map AS $func$
220 DECLARE
221         ldr         TEXT;
222         tval        TEXT;
223         tval_rec    RECORD;
224         bval        TEXT;
225         bval_rec    RECORD;
226     retval      config.marc21_rec_type_map%ROWTYPE;
227 BEGIN
228     ldr := oils_xpath_string( '//*[local-name()="leader"]', marc );
229
230     IF ldr IS NULL OR ldr = '' THEN
231         SELECT * INTO retval FROM config.marc21_rec_type_map WHERE code = 'BKS';
232         RETURN retval;
233     END IF;
234
235     SELECT * INTO tval_rec FROM config.marc21_ff_pos_map WHERE fixed_field = 'Type' LIMIT 1; -- They're all the same
236     SELECT * INTO bval_rec FROM config.marc21_ff_pos_map WHERE fixed_field = 'BLvl' LIMIT 1; -- They're all the same
237
238
239     tval := SUBSTRING( ldr, tval_rec.start_pos + 1, tval_rec.length );
240     bval := SUBSTRING( ldr, bval_rec.start_pos + 1, bval_rec.length );
241
242     -- RAISE NOTICE 'type %, blvl %, ldr %', tval, bval, ldr;
243
244     SELECT * INTO retval FROM config.marc21_rec_type_map WHERE type_val LIKE '%' || tval || '%' AND blvl_val LIKE '%' || bval || '%';
245
246
247     IF retval.code IS NULL THEN
248         SELECT * INTO retval FROM config.marc21_rec_type_map WHERE code = 'BKS';
249     END IF;
250
251     RETURN retval;
252 END;
253 $func$ LANGUAGE PLPGSQL;
254
255 CREATE TYPE biblio.record_ff_map AS (record BIGINT, ff_name TEXT, ff_value TEXT);
256 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_all_fixed_fields( marc TEXT, use_default BOOL DEFAULT FALSE ) RETURNS SETOF biblio.record_ff_map AS $func$
257 DECLARE
258     tag_data    TEXT;
259     rtype       TEXT;
260     ff_pos      RECORD;
261     output      biblio.record_ff_map%ROWTYPE;
262 BEGIN
263     rtype := (vandelay.marc21_record_type( marc )).code;
264
265     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE rec_type = rtype ORDER BY tag DESC LOOP
266         output.ff_name  := ff_pos.fixed_field;
267         output.ff_value := NULL;
268
269         IF ff_pos.tag = 'ldr' THEN
270             output.ff_value := oils_xpath_string('//*[local-name()="leader"]', marc);
271             IF output.ff_value IS NOT NULL THEN
272                 output.ff_value := SUBSTRING( output.ff_value, ff_pos.start_pos + 1, ff_pos.length );
273                 RETURN NEXT output;
274                 output.ff_value := NULL;
275             END IF;
276         ELSE
277             FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
278                 output.ff_value := SUBSTRING( tag_data, ff_pos.start_pos + 1, ff_pos.length );
279                 CONTINUE WHEN output.ff_value IS NULL AND NOT use_default;
280                 IF output.ff_value IS NULL THEN output.ff_value := REPEAT( ff_pos.default_val, ff_pos.length ); END IF;
281                 RETURN NEXT output;
282                 output.ff_value := NULL;
283             END LOOP;
284         END IF;
285
286     END LOOP;
287
288     RETURN;
289 END;
290 $func$ LANGUAGE PLPGSQL;
291
292 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_fixed_field_list( marc TEXT, ff TEXT, use_default BOOL DEFAULT FALSE ) RETURNS TEXT[] AS $func$
293 DECLARE
294     rtype       TEXT;
295     ff_pos      RECORD;
296     tag_data    RECORD;
297     val         TEXT;
298     collection  TEXT[] := '{}'::TEXT[];
299 BEGIN
300     rtype := (vandelay.marc21_record_type( marc )).code;
301     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE fixed_field = ff AND rec_type = rtype ORDER BY tag DESC LOOP
302         IF ff_pos.tag = 'ldr' THEN
303             val := oils_xpath_string('//*[local-name()="leader"]', marc);
304             IF val IS NOT NULL THEN
305                 val := SUBSTRING( val, ff_pos.start_pos + 1, ff_pos.length );
306                 collection := collection || val;
307             END IF;
308         ELSE
309             FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
310                 val := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
311                 collection := collection || val;
312             END LOOP;
313         END IF;
314         CONTINUE WHEN NOT use_default;
315         CONTINUE WHEN ARRAY_UPPER(collection, 1) > 0;
316         val := REPEAT( ff_pos.default_val, ff_pos.length );
317         collection := collection || val;
318     END LOOP;
319
320     RETURN collection;
321 END;
322 $func$ LANGUAGE PLPGSQL;
323
324 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_fixed_field( marc TEXT, ff TEXT, use_default BOOL DEFAULT FALSE ) RETURNS TEXT AS $func$
325 DECLARE
326     rtype       TEXT;
327     ff_pos      RECORD;
328     tag_data    RECORD;
329     val         TEXT;
330 BEGIN
331     rtype := (vandelay.marc21_record_type( marc )).code;
332     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE fixed_field = ff AND rec_type = rtype ORDER BY tag DESC LOOP
333         IF ff_pos.tag = 'ldr' THEN
334             val := oils_xpath_string('//*[local-name()="leader"]', marc);
335             IF val IS NOT NULL THEN
336                 val := SUBSTRING( val, ff_pos.start_pos + 1, ff_pos.length );
337                 RETURN val;
338             END IF;
339         ELSE
340             FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
341                 val := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
342                 RETURN val;
343             END LOOP;
344         END IF;
345         CONTINUE WHEN NOT use_default;
346         val := REPEAT( ff_pos.default_val, ff_pos.length );
347         RETURN val;
348     END LOOP;
349
350     RETURN NULL;
351 END;
352 $func$ LANGUAGE PLPGSQL;
353
354 CREATE TYPE biblio.marc21_physical_characteristics AS ( id INT, record BIGINT, ptype TEXT, subfield INT, value INT );
355 CREATE OR REPLACE FUNCTION vandelay.marc21_physical_characteristics( marc TEXT) RETURNS SETOF biblio.marc21_physical_characteristics AS $func$
356 DECLARE
357     rowid   INT := 0;
358     _007    TEXT;
359     ptype   config.marc21_physical_characteristic_type_map%ROWTYPE;
360     psf     config.marc21_physical_characteristic_subfield_map%ROWTYPE;
361     pval    config.marc21_physical_characteristic_value_map%ROWTYPE;
362     retval  biblio.marc21_physical_characteristics%ROWTYPE;
363 BEGIN
364
365     FOR _007 IN SELECT oils_xpath_string('//*', value) FROM UNNEST(oils_xpath('//*[@tag="007"]', marc)) x(value) LOOP
366         IF _007 IS NOT NULL AND _007 <> '' THEN
367             SELECT * INTO ptype FROM config.marc21_physical_characteristic_type_map WHERE ptype_key = SUBSTRING( _007, 1, 1 );
368
369             IF ptype.ptype_key IS NOT NULL THEN
370                 FOR psf IN SELECT * FROM config.marc21_physical_characteristic_subfield_map WHERE ptype_key = ptype.ptype_key LOOP
371                     SELECT * INTO pval FROM config.marc21_physical_characteristic_value_map WHERE ptype_subfield = psf.id AND value = SUBSTRING( _007, psf.start_pos + 1, psf.length );
372
373                     IF pval.id IS NOT NULL THEN
374                         rowid := rowid + 1;
375                         retval.id := rowid;
376                         retval.ptype := ptype.ptype_key;
377                         retval.subfield := psf.id;
378                         retval.value := pval.id;
379                         RETURN NEXT retval;
380                     END IF;
381
382                 END LOOP;
383             END IF;
384         END IF;
385     END LOOP;
386
387     RETURN;
388 END;
389 $func$ LANGUAGE PLPGSQL;
390
391 CREATE TYPE vandelay.flat_marc AS ( tag CHAR(3), ind1 TEXT, ind2 TEXT, subfield TEXT, value TEXT );
392 CREATE OR REPLACE FUNCTION vandelay.flay_marc ( TEXT ) RETURNS SETOF vandelay.flat_marc AS $func$
393
394 use MARC::Record;
395 use MARC::File::XML (BinaryEncoding => 'UTF-8');
396 use MARC::Charset;
397 use strict;
398
399 MARC::Charset->assume_unicode(1);
400
401 my $xml = shift;
402 my $r = MARC::Record->new_from_xml( $xml );
403
404 return_next( { tag => 'LDR', value => $r->leader } );
405
406 for my $f ( $r->fields ) {
407     if ($f->is_control_field) {
408         return_next({ tag => $f->tag, value => $f->data });
409     } else {
410         for my $s ($f->subfields) {
411             return_next({
412                 tag      => $f->tag,
413                 ind1     => $f->indicator(1),
414                 ind2     => $f->indicator(2),
415                 subfield => $s->[0],
416                 value    => $s->[1]
417             });
418
419             if ( $f->tag eq '245' and $s->[0] eq 'a' ) {
420                 my $trim = $f->indicator(2) || 0;
421                 return_next({
422                     tag      => 'tnf',
423                     ind1     => $f->indicator(1),
424                     ind2     => $f->indicator(2),
425                     subfield => 'a',
426                     value    => substr( $s->[1], $trim )
427                 });
428             }
429         }
430     }
431 }
432
433 return undef;
434
435 $func$ LANGUAGE PLPERLU;
436
437 CREATE OR REPLACE FUNCTION vandelay.flatten_marc ( marc TEXT ) RETURNS SETOF vandelay.flat_marc AS $func$
438 DECLARE
439     output  vandelay.flat_marc%ROWTYPE;
440     field   RECORD;
441 BEGIN
442     FOR field IN SELECT * FROM vandelay.flay_marc( marc ) LOOP
443         output.ind1 := field.ind1;
444         output.ind2 := field.ind2;
445         output.tag := field.tag;
446         output.subfield := field.subfield;
447         IF field.subfield IS NOT NULL AND field.tag NOT IN ('020','022','024') THEN -- exclude standard numbers and control fields
448             output.value := naco_normalize(field.value, field.subfield);
449         ELSE
450             output.value := field.value;
451         END IF;
452
453         CONTINUE WHEN output.value IS NULL;
454
455         RETURN NEXT output;
456     END LOOP;
457 END;
458 $func$ LANGUAGE PLPGSQL;
459
460 CREATE OR REPLACE FUNCTION vandelay.extract_rec_attrs ( xml TEXT, attr_defs TEXT[]) RETURNS hstore AS $_$
461 DECLARE
462     transformed_xml TEXT;
463     prev_xfrm       TEXT;
464     normalizer      RECORD;
465     xfrm            config.xml_transform%ROWTYPE;
466     attr_value      TEXT;
467     new_attrs       HSTORE := ''::HSTORE;
468     attr_def        config.record_attr_definition%ROWTYPE;
469 BEGIN
470
471     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE name IN (SELECT * FROM UNNEST(attr_defs)) ORDER BY format LOOP
472
473         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
474             SELECT  STRING_AGG(x.value, COALESCE(attr_def.joiner,' ')) INTO attr_value
475               FROM  vandelay.flatten_marc(xml) AS x
476               WHERE x.tag LIKE attr_def.tag
477                     AND CASE
478                         WHEN attr_def.sf_list IS NOT NULL
479                             THEN POSITION(x.subfield IN attr_def.sf_list) > 0
480                         ELSE TRUE
481                         END
482               GROUP BY x.tag
483               ORDER BY x.tag
484               LIMIT 1;
485
486         ELSIF attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
487             attr_value := vandelay.marc21_extract_fixed_field(xml, attr_def.fixed_field);
488
489         ELSIF attr_def.xpath IS NOT NULL THEN -- and xpath expression
490
491             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
492
493             -- See if we can skip the XSLT ... it's expensive
494             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
495                 -- Can't skip the transform
496                 IF xfrm.xslt <> '---' THEN
497                     transformed_xml := oils_xslt_process(xml,xfrm.xslt);
498                 ELSE
499                     transformed_xml := xml;
500                 END IF;
501
502                 prev_xfrm := xfrm.name;
503             END IF;
504
505             IF xfrm.name IS NULL THEN
506                 -- just grab the marcxml (empty) transform
507                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
508                 prev_xfrm := xfrm.name;
509             END IF;
510
511             attr_value := oils_xpath_string(attr_def.xpath, transformed_xml, COALESCE(attr_def.joiner,' '), ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]);
512
513         ELSIF attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
514             SELECT  m.value::TEXT INTO attr_value
515               FROM  vandelay.marc21_physical_characteristics(xml) v
516                     JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
517               WHERE v.subfield = attr_def.phys_char_sf
518               LIMIT 1; -- Just in case ...
519
520         END IF;
521
522         -- apply index normalizers to attr_value
523         FOR normalizer IN
524             SELECT  n.func AS func,
525                     n.param_count AS param_count,
526                     m.params AS params
527               FROM  config.index_normalizer n
528                     JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
529               WHERE attr = attr_def.name
530               ORDER BY m.pos LOOP
531                 EXECUTE 'SELECT ' || normalizer.func || '(' ||
532                     quote_nullable( attr_value ) ||
533                     CASE
534                         WHEN normalizer.param_count > 0
535                             THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
536                             ELSE ''
537                         END ||
538                     ')' INTO attr_value;
539
540         END LOOP;
541
542         -- Add the new value to the hstore
543         new_attrs := new_attrs || hstore( attr_def.name, attr_value );
544
545     END LOOP;
546
547     RETURN new_attrs;
548 END;
549 $_$ LANGUAGE PLPGSQL;
550
551 CREATE OR REPLACE FUNCTION vandelay.extract_rec_attrs ( xml TEXT ) RETURNS hstore AS $_$
552     SELECT vandelay.extract_rec_attrs( $1, (SELECT ARRAY_AGG(name) FROM config.record_attr_definition));
553 $_$ LANGUAGE SQL;
554
555 -- Everything between this comment and the beginning of the definition of
556 -- vandelay.match_bib_record() is strictly in service of that function.
557 CREATE TYPE vandelay.match_set_test_result AS (record BIGINT, quality INTEGER);
558
559 CREATE OR REPLACE FUNCTION vandelay.match_set_test_marcxml(
560     match_set_id INTEGER, record_xml TEXT, bucket_id INTEGER 
561 ) RETURNS SETOF vandelay.match_set_test_result AS $$
562 DECLARE
563     tags_rstore HSTORE;
564     svf_rstore  HSTORE;
565     coal        TEXT;
566     joins       TEXT;
567     query_      TEXT;
568     wq          TEXT;
569     qvalue      INTEGER;
570     rec         RECORD;
571 BEGIN
572     tags_rstore := vandelay.flatten_marc_hstore(record_xml);
573     svf_rstore := vandelay.extract_rec_attrs(record_xml);
574
575     CREATE TEMPORARY TABLE _vandelay_tmp_qrows (q INTEGER);
576     CREATE TEMPORARY TABLE _vandelay_tmp_jrows (j TEXT);
577
578     -- generate the where clause and return that directly (into wq), and as
579     -- a side-effect, populate the _vandelay_tmp_[qj]rows tables.
580     wq := vandelay.get_expr_from_match_set(match_set_id, tags_rstore);
581
582     query_ := 'SELECT DISTINCT(record), ';
583
584     -- qrows table is for the quality bits we add to the SELECT clause
585     SELECT STRING_AGG(
586         'COALESCE(n' || q::TEXT || '.quality, 0)', ' + '
587     ) INTO coal FROM _vandelay_tmp_qrows;
588
589     -- our query string so far is the SELECT clause and the inital FROM.
590     -- no JOINs yet nor the WHERE clause
591     query_ := query_ || coal || ' AS quality ' || E'\n';
592
593     -- jrows table is for the joins we must make (and the real text conditions)
594     SELECT STRING_AGG(j, E'\n') INTO joins
595         FROM _vandelay_tmp_jrows;
596
597     -- add those joins and the where clause to our query.
598     query_ := query_ || joins || E'\n';
599
600     -- join the record bucket
601     IF bucket_id IS NOT NULL THEN
602         query_ := query_ || 'JOIN container.biblio_record_entry_bucket_item ' ||
603             'brebi ON (brebi.target_biblio_record_entry = record ' ||
604             'AND brebi.bucket = ' || bucket_id || E')\n';
605     END IF;
606
607     query_ := query_ || 'JOIN biblio.record_entry bre ON (bre.id = record) ' || 'WHERE ' || wq || ' AND not bre.deleted';
608
609     -- this will return rows of record,quality
610     FOR rec IN EXECUTE query_ USING tags_rstore, svf_rstore LOOP
611         RETURN NEXT rec;
612     END LOOP;
613
614     DROP TABLE _vandelay_tmp_qrows;
615     DROP TABLE _vandelay_tmp_jrows;
616     RETURN;
617 END;
618 $$ LANGUAGE PLPGSQL;
619
620
621 CREATE OR REPLACE FUNCTION vandelay.flatten_marc_hstore(
622     record_xml TEXT
623 ) RETURNS HSTORE AS $func$
624 BEGIN
625     RETURN (SELECT
626         HSTORE(
627             ARRAY_AGG(tag || (COALESCE(subfield, ''))),
628             ARRAY_AGG(value)
629         )
630         FROM (
631             SELECT  tag, subfield, ARRAY_AGG(value)::TEXT AS value
632               FROM  (SELECT tag,
633                             subfield,
634                             CASE WHEN tag = '020' THEN -- caseless -- isbn
635                                 LOWER((REGEXP_MATCH(value,$$^(\S{10,17})$$))[1] || '%')
636                             WHEN tag = '022' THEN -- caseless -- issn
637                                 LOWER((REGEXP_MATCH(value,$$^(\S{4}[- ]?\S{4})$$))[1] || '%')
638                             WHEN tag = '024' THEN -- caseless -- upc (other)
639                                 LOWER(value || '%')
640                             ELSE
641                                 value
642                             END AS value
643                       FROM  vandelay.flatten_marc(record_xml)) x
644                 GROUP BY tag, subfield ORDER BY tag, subfield
645         ) subquery
646     );
647 END;
648 $func$ LANGUAGE PLPGSQL;
649
650 -- backwards compat version so we don't have 
651 -- to modify vandelay.match_set_test_marcxml()
652 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set(
653     match_set_id INTEGER,
654     tags_rstore HSTORE
655 ) RETURNS TEXT AS $$
656 BEGIN
657     RETURN vandelay.get_expr_from_match_set(
658         match_set_id, tags_rstore, NULL);
659 END;
660 $$  LANGUAGE PLPGSQL;
661
662 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set(
663     match_set_id INTEGER,
664     tags_rstore HSTORE,
665     auth_heading TEXT
666 ) RETURNS TEXT AS $$
667 DECLARE
668     root vandelay.match_set_point;
669 BEGIN
670     SELECT * INTO root FROM vandelay.match_set_point
671         WHERE parent IS NULL AND match_set = match_set_id;
672
673     RETURN vandelay.get_expr_from_match_set_point(
674         root, tags_rstore, auth_heading);
675 END;
676 $$  LANGUAGE PLPGSQL;
677
678 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set_point(
679     node vandelay.match_set_point,
680     tags_rstore HSTORE,
681     auth_heading TEXT
682 ) RETURNS TEXT AS $$
683 DECLARE
684     q           TEXT;
685     i           INTEGER;
686     this_op     TEXT;
687     children    INTEGER[];
688     child       vandelay.match_set_point;
689 BEGIN
690     SELECT ARRAY_AGG(id) INTO children FROM vandelay.match_set_point
691         WHERE parent = node.id;
692
693     IF ARRAY_LENGTH(children, 1) > 0 THEN
694         this_op := vandelay._get_expr_render_one(node);
695         q := '(';
696         i := 1;
697         WHILE children[i] IS NOT NULL LOOP
698             SELECT * INTO child FROM vandelay.match_set_point
699                 WHERE id = children[i];
700             IF i > 1 THEN
701                 q := q || ' ' || this_op || ' ';
702             END IF;
703             i := i + 1;
704             q := q || vandelay.get_expr_from_match_set_point(
705                 child, tags_rstore, auth_heading);
706         END LOOP;
707         q := q || ')';
708         RETURN q;
709     ELSIF node.bool_op IS NULL THEN
710         PERFORM vandelay._get_expr_push_qrow(node);
711         PERFORM vandelay._get_expr_push_jrow(node, tags_rstore, auth_heading);
712         RETURN vandelay._get_expr_render_one(node);
713     ELSE
714         RETURN '';
715     END IF;
716 END;
717 $$  LANGUAGE PLPGSQL;
718
719 CREATE OR REPLACE FUNCTION vandelay._get_expr_push_qrow(
720     node vandelay.match_set_point
721 ) RETURNS VOID AS $$
722 DECLARE
723 BEGIN
724     INSERT INTO _vandelay_tmp_qrows (q) VALUES (node.id);
725 END;
726 $$ LANGUAGE PLPGSQL;
727
728 CREATE OR REPLACE FUNCTION vandelay._get_expr_push_jrow(
729     node vandelay.match_set_point,
730     tags_rstore HSTORE,
731     auth_heading TEXT
732 ) RETURNS VOID AS $$
733 DECLARE
734     jrow        TEXT;
735     my_alias    TEXT;
736     op          TEXT;
737     tagkey      TEXT;
738     caseless    BOOL;
739     jrow_count  INT;
740     my_using    TEXT;
741     my_join     TEXT;
742     rec_table   TEXT;
743 BEGIN
744     -- remember $1 is tags_rstore, and $2 is svf_rstore
745     -- a non-NULL auth_heading means we're matching authority records
746
747     IF auth_heading IS NOT NULL THEN
748         rec_table := 'authority.full_rec';
749     ELSE
750         rec_table := 'metabib.full_rec';
751     END IF;
752
753     caseless := FALSE;
754     SELECT COUNT(*) INTO jrow_count FROM _vandelay_tmp_jrows;
755     IF jrow_count > 0 THEN
756         my_using := ' USING (record)';
757         my_join := 'FULL OUTER JOIN';
758     ELSE
759         my_using := '';
760         my_join := 'FROM';
761     END IF;
762
763     IF node.tag IS NOT NULL THEN
764         caseless := (node.tag IN ('020', '022', '024'));
765         tagkey := node.tag;
766         IF node.subfield IS NOT NULL THEN
767             tagkey := tagkey || node.subfield;
768         END IF;
769     END IF;
770
771     IF node.negate THEN
772         IF caseless THEN
773             op := 'NOT LIKE';
774         ELSE
775             op := '<>';
776         END IF;
777     ELSE
778         IF caseless THEN
779             op := 'LIKE';
780         ELSE
781             op := '=';
782         END IF;
783     END IF;
784
785     my_alias := 'n' || node.id::TEXT;
786
787     jrow := my_join || ' (SELECT *, ';
788     IF node.tag IS NOT NULL THEN
789         jrow := jrow  || node.quality ||
790             ' AS quality FROM ' || rec_table || ' mfr WHERE mfr.tag = ''' ||
791             node.tag || '''';
792         IF node.subfield IS NOT NULL THEN
793             jrow := jrow || ' AND mfr.subfield = ''' ||
794                 node.subfield || '''';
795         END IF;
796         jrow := jrow || ' AND (';
797         jrow := jrow || vandelay._node_tag_comparisons(caseless, op, tags_rstore, tagkey);
798         jrow := jrow || ')) ' || my_alias || my_using || E'\n';
799     ELSE    -- svf
800         IF auth_heading IS NOT NULL THEN -- authority record
801             IF node.heading AND auth_heading <> '' THEN
802                 jrow := jrow || 'id AS record, ' || node.quality ||
803                 ' AS quality FROM authority.record_entry are ' ||
804                 ' WHERE are.heading = ''' || auth_heading || '''';
805                 jrow := jrow || ') ' || my_alias || my_using || E'\n';
806             END IF;
807         ELSE -- bib record
808             jrow := jrow || 'id AS record, ' || node.quality ||
809                 ' AS quality FROM metabib.record_attr_flat mraf WHERE mraf.attr = ''' ||
810                 node.svf || ''' AND mraf.value ' || op || ' $2->''' || node.svf || ''') ' ||
811                 my_alias || my_using || E'\n';
812         END IF;
813     END IF;
814     INSERT INTO _vandelay_tmp_jrows (j) VALUES (jrow);
815 END;
816 $$ LANGUAGE PLPGSQL;
817
818 CREATE OR REPLACE FUNCTION vandelay._node_tag_comparisons(
819     caseless BOOLEAN,
820     op TEXT,
821     tags_rstore HSTORE,
822     tagkey TEXT
823 ) RETURNS TEXT AS $$
824 DECLARE
825     result  TEXT;
826     i       INT;
827     vals    TEXT[];
828 BEGIN
829     i := 1;
830     vals := tags_rstore->tagkey;
831     result := '';
832
833     WHILE TRUE LOOP
834         IF i > 1 THEN
835             IF vals[i] IS NULL THEN
836                 EXIT;
837             ELSE
838                 result := result || ' OR ';
839             END IF;
840         END IF;
841
842         IF caseless THEN
843             result := result || 'LOWER(mfr.value) ' || op;
844         ELSE
845             result := result || 'mfr.value ' || op;
846         END IF;
847
848         result := result || ' ' || COALESCE('''' || vals[i] || '''', 'NULL');
849
850         IF vals[i] IS NULL THEN
851             EXIT;
852         END IF;
853         i := i + 1;
854     END LOOP;
855
856     RETURN result;
857
858 END;
859 $$ LANGUAGE PLPGSQL;
860
861 CREATE OR REPLACE FUNCTION vandelay._get_expr_render_one(
862     node vandelay.match_set_point
863 ) RETURNS TEXT AS $$
864 DECLARE
865     s           TEXT;
866 BEGIN
867     IF node.bool_op IS NOT NULL THEN
868         RETURN node.bool_op;
869     ELSE
870         RETURN '(n' || node.id::TEXT || '.id IS NOT NULL)';
871     END IF;
872 END;
873 $$ LANGUAGE PLPGSQL;
874
875 CREATE OR REPLACE FUNCTION vandelay.match_bib_record() RETURNS TRIGGER AS $func$
876 DECLARE
877     incoming_existing_id    TEXT;
878     test_result             vandelay.match_set_test_result%ROWTYPE;
879     tmp_rec                 BIGINT;
880     match_set               INT;
881     match_bucket            INT;
882 BEGIN
883     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
884         RETURN NEW;
885     END IF;
886
887     DELETE FROM vandelay.bib_match WHERE queued_record = NEW.id;
888
889     SELECT q.match_set INTO match_set FROM vandelay.bib_queue q WHERE q.id = NEW.queue;
890
891     IF match_set IS NOT NULL THEN
892         NEW.quality := vandelay.measure_record_quality( NEW.marc, match_set );
893     END IF;
894
895     -- Perfect matches on 901$c exit early with a match with high quality.
896     incoming_existing_id :=
897         oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]', NEW.marc);
898
899     IF incoming_existing_id IS NOT NULL AND incoming_existing_id != '' THEN
900         SELECT id INTO tmp_rec FROM biblio.record_entry WHERE id = incoming_existing_id::bigint;
901         IF tmp_rec IS NOT NULL THEN
902             INSERT INTO vandelay.bib_match (queued_record, eg_record, match_score, quality) 
903                 SELECT
904                     NEW.id, 
905                     b.id,
906                     9999,
907                     -- note: no match_set means quality==0
908                     vandelay.measure_record_quality( b.marc, match_set )
909                 FROM biblio.record_entry b
910                 WHERE id = incoming_existing_id::bigint;
911         END IF;
912     END IF;
913
914     IF match_set IS NULL THEN
915         RETURN NEW;
916     END IF;
917
918     SELECT q.match_bucket INTO match_bucket FROM vandelay.bib_queue q WHERE q.id = NEW.queue;
919
920     FOR test_result IN SELECT * FROM
921         vandelay.match_set_test_marcxml(match_set, NEW.marc, match_bucket) LOOP
922
923         INSERT INTO vandelay.bib_match ( queued_record, eg_record, match_score, quality )
924             SELECT  
925                 NEW.id,
926                 test_result.record,
927                 test_result.quality,
928                 vandelay.measure_record_quality( b.marc, match_set )
929                 FROM  biblio.record_entry b
930                 WHERE id = test_result.record;
931
932     END LOOP;
933
934     RETURN NEW;
935 END;
936 $func$ LANGUAGE PLPGSQL;
937
938 CREATE OR REPLACE FUNCTION vandelay.measure_record_quality ( xml TEXT, match_set_id INT ) RETURNS INT AS $_$
939 DECLARE
940     out_q   INT := 0;
941     rvalue  TEXT;
942     test    vandelay.match_set_quality%ROWTYPE;
943 BEGIN
944
945     FOR test IN SELECT * FROM vandelay.match_set_quality WHERE match_set = match_set_id LOOP
946         IF test.tag IS NOT NULL THEN
947             FOR rvalue IN SELECT value FROM vandelay.flatten_marc( xml ) WHERE tag = test.tag AND subfield = test.subfield LOOP
948                 IF test.value = rvalue THEN
949                     out_q := out_q + test.quality;
950                 END IF;
951             END LOOP;
952         ELSE
953             IF test.value = vandelay.extract_rec_attrs(xml, ARRAY[test.svf]) -> test.svf THEN
954                 out_q := out_q + test.quality;
955             END IF;
956         END IF;
957     END LOOP;
958
959     RETURN out_q;
960 END;
961 $_$ LANGUAGE PLPGSQL;
962
963 CREATE TYPE vandelay.tcn_data AS (tcn TEXT, tcn_source TEXT, used BOOL);
964 CREATE OR REPLACE FUNCTION vandelay.find_bib_tcn_data ( xml TEXT ) RETURNS SETOF vandelay.tcn_data AS $_$
965 DECLARE
966     eg_tcn          TEXT;
967     eg_tcn_source   TEXT;
968     output          vandelay.tcn_data%ROWTYPE;
969 BEGIN
970
971     -- 001/003
972     eg_tcn := BTRIM((oils_xpath('//*[@tag="001"]/text()',xml))[1]);
973     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
974
975         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="003"]/text()',xml))[1]);
976         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
977             eg_tcn_source := 'System Local';
978         END IF;
979
980         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
981
982         IF NOT FOUND THEN
983             output.used := FALSE;
984         ELSE
985             output.used := TRUE;
986         END IF;
987
988         output.tcn := eg_tcn;
989         output.tcn_source := eg_tcn_source;
990         RETURN NEXT output;
991
992     END IF;
993
994     -- 901 ab
995     eg_tcn := BTRIM((oils_xpath('//*[@tag="901"]/*[@code="a"]/text()',xml))[1]);
996     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
997
998         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="901"]/*[@code="b"]/text()',xml))[1]);
999         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
1000             eg_tcn_source := 'System Local';
1001         END IF;
1002
1003         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1004
1005         IF NOT FOUND THEN
1006             output.used := FALSE;
1007         ELSE
1008             output.used := TRUE;
1009         END IF;
1010
1011         output.tcn := eg_tcn;
1012         output.tcn_source := eg_tcn_source;
1013         RETURN NEXT output;
1014
1015     END IF;
1016
1017     -- 039 ab
1018     eg_tcn := BTRIM((oils_xpath('//*[@tag="039"]/*[@code="a"]/text()',xml))[1]);
1019     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1020
1021         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="039"]/*[@code="b"]/text()',xml))[1]);
1022         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
1023             eg_tcn_source := 'System Local';
1024         END IF;
1025
1026         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1027
1028         IF NOT FOUND THEN
1029             output.used := FALSE;
1030         ELSE
1031             output.used := TRUE;
1032         END IF;
1033
1034         output.tcn := eg_tcn;
1035         output.tcn_source := eg_tcn_source;
1036         RETURN NEXT output;
1037
1038     END IF;
1039
1040     -- 020 a
1041     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="020"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
1042     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1043
1044         eg_tcn_source := 'ISBN';
1045
1046         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1047
1048         IF NOT FOUND THEN
1049             output.used := FALSE;
1050         ELSE
1051             output.used := TRUE;
1052         END IF;
1053
1054         output.tcn := eg_tcn;
1055         output.tcn_source := eg_tcn_source;
1056         RETURN NEXT output;
1057
1058     END IF;
1059
1060     -- 022 a
1061     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="022"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
1062     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1063
1064         eg_tcn_source := 'ISSN';
1065
1066         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1067
1068         IF NOT FOUND THEN
1069             output.used := FALSE;
1070         ELSE
1071             output.used := TRUE;
1072         END IF;
1073
1074         output.tcn := eg_tcn;
1075         output.tcn_source := eg_tcn_source;
1076         RETURN NEXT output;
1077
1078     END IF;
1079
1080     -- 010 a
1081     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="010"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
1082     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1083
1084         eg_tcn_source := 'LCCN';
1085
1086         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1087
1088         IF NOT FOUND THEN
1089             output.used := FALSE;
1090         ELSE
1091             output.used := TRUE;
1092         END IF;
1093
1094         output.tcn := eg_tcn;
1095         output.tcn_source := eg_tcn_source;
1096         RETURN NEXT output;
1097
1098     END IF;
1099
1100     -- 035 a
1101     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="035"]/*[@code="a"]/text()',xml))[1], $re$^.*?(\w+)$$re$, $re$\1$re$);
1102     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1103
1104         eg_tcn_source := 'System Legacy';
1105
1106         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1107
1108         IF NOT FOUND THEN
1109             output.used := FALSE;
1110         ELSE
1111             output.used := TRUE;
1112         END IF;
1113
1114         output.tcn := eg_tcn;
1115         output.tcn_source := eg_tcn_source;
1116         RETURN NEXT output;
1117
1118     END IF;
1119
1120     RETURN;
1121 END;
1122 $_$ LANGUAGE PLPGSQL;
1123
1124 CREATE OR REPLACE FUNCTION vandelay.add_field ( target_xml TEXT, source_xml TEXT, field TEXT, force_add INT ) RETURNS TEXT AS $_$
1125
1126     use MARC::Record;
1127     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1128     use MARC::Charset;
1129     use strict;
1130
1131     MARC::Charset->assume_unicode(1);
1132
1133     my $target_xml = shift;
1134     my $source_xml = shift;
1135     my $field_spec = shift;
1136     my $force_add = shift || 0;
1137
1138     my $target_r = MARC::Record->new_from_xml( $target_xml );
1139     my $source_r = MARC::Record->new_from_xml( $source_xml );
1140
1141     return $target_xml unless ($target_r && $source_r);
1142
1143     my @field_list = split(',', $field_spec);
1144
1145     my %fields;
1146     for my $f (@field_list) {
1147         $f =~ s/^\s*//; $f =~ s/\s*$//;
1148         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1149             my $field = $1;
1150             $field =~ s/\s+//;
1151             my $sf = $2;
1152             $sf =~ s/\s+//;
1153             my $match = $3;
1154             $match =~ s/^\s*//; $match =~ s/\s*$//;
1155             $fields{$field} = { sf => [ split('', $sf) ] };
1156             if ($match) {
1157                 my ($msf,$mre) = split('~', $match);
1158                 if (length($msf) > 0 and length($mre) > 0) {
1159                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1160                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1161                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1162                 }
1163             }
1164         }
1165     }
1166
1167     for my $f ( keys %fields) {
1168         if ( @{$fields{$f}{sf}} ) {
1169             for my $from_field ($source_r->field( $f )) {
1170                 my @tos = $target_r->field( $f );
1171                 if (!@tos) {
1172                     next if (exists($fields{$f}{match}) and !$force_add);
1173                     my @new_fields = map { $_->clone } $source_r->field( $f );
1174                     $target_r->insert_fields_ordered( @new_fields );
1175                 } else {
1176                     for my $to_field (@tos) {
1177                         if (exists($fields{$f}{match})) {
1178                             next unless (grep { $_ =~ $fields{$f}{match}{re} } $to_field->subfield($fields{$f}{match}{sf}));
1179                         }
1180                         for my $old_sf ($from_field->subfields) {
1181                             $to_field->add_subfields( @$old_sf ) if grep(/$$old_sf[0]/,@{$fields{$f}{sf}});
1182                         }
1183                     }
1184                 }
1185             }
1186         } else {
1187             my @new_fields = map { $_->clone } $source_r->field( $f );
1188             $target_r->insert_fields_ordered( @new_fields );
1189         }
1190     }
1191
1192     $target_xml = $target_r->as_xml_record;
1193     $target_xml =~ s/^<\?.+?\?>$//mo;
1194     $target_xml =~ s/\n//sgo;
1195     $target_xml =~ s/>\s+</></sgo;
1196
1197     return $target_xml;
1198
1199 $_$ LANGUAGE PLPERLU;
1200
1201 CREATE OR REPLACE FUNCTION vandelay.add_field ( target_xml TEXT, source_xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1202     SELECT vandelay.add_field( $1, $2, $3, 0 );
1203 $_$ LANGUAGE SQL;
1204
1205 CREATE OR REPLACE FUNCTION vandelay.strip_field ( xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1206
1207     use MARC::Record;
1208     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1209     use MARC::Charset;
1210     use strict;
1211
1212     MARC::Charset->assume_unicode(1);
1213
1214     my $xml = shift;
1215     my $r = MARC::Record->new_from_xml( $xml );
1216
1217     return $xml unless ($r);
1218
1219     my $field_spec = shift;
1220     my @field_list = split(',', $field_spec);
1221
1222     my %fields;
1223     for my $f (@field_list) {
1224         $f =~ s/^\s*//; $f =~ s/\s*$//;
1225         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1226             my $field = $1;
1227             $field =~ s/\s+//;
1228             my $sf = $2;
1229             $sf =~ s/\s+//;
1230             my $match = $3;
1231             $match =~ s/^\s*//; $match =~ s/\s*$//;
1232             $fields{$field} = { sf => [ split('', $sf) ] };
1233             if ($match) {
1234                 my ($msf,$mre) = split('~', $match);
1235                 if (length($msf) > 0 and length($mre) > 0) {
1236                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1237                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1238                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1239                 }
1240             }
1241         }
1242     }
1243
1244     for my $f ( keys %fields) {
1245         for my $to_field ($r->field( $f )) {
1246             if (exists($fields{$f}{match})) {
1247                 next unless (grep { $_ =~ $fields{$f}{match}{re} } $to_field->subfield($fields{$f}{match}{sf}));
1248             }
1249
1250             if ( @{$fields{$f}{sf}} ) {
1251                 $to_field->delete_subfield(code => $fields{$f}{sf});
1252             } else {
1253                 $r->delete_field( $to_field );
1254             }
1255         }
1256     }
1257
1258     $xml = $r->as_xml_record;
1259     $xml =~ s/^<\?.+?\?>$//mo;
1260     $xml =~ s/\n//sgo;
1261     $xml =~ s/>\s+</></sgo;
1262
1263     return $xml;
1264
1265 $_$ LANGUAGE PLPERLU;
1266
1267 CREATE OR REPLACE FUNCTION vandelay.replace_field 
1268     (target_xml TEXT, source_xml TEXT, field TEXT) RETURNS TEXT AS $_$
1269
1270     use strict;
1271     use MARC::Record;
1272     use MARC::Field;
1273     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1274     use MARC::Charset;
1275
1276     MARC::Charset->assume_unicode(1);
1277
1278     my $target_xml = shift;
1279     my $source_xml = shift;
1280     my $field_spec = shift;
1281
1282     my $target_r = MARC::Record->new_from_xml($target_xml);
1283     my $source_r = MARC::Record->new_from_xml($source_xml);
1284
1285     return $target_xml unless $target_r && $source_r;
1286
1287     # Extract the field_spec components into MARC tags, subfields, 
1288     # and regex matches.  Copied wholesale from vandelay.strip_field()
1289
1290     my @field_list = split(',', $field_spec);
1291     my %fields;
1292     for my $f (@field_list) {
1293         $f =~ s/^\s*//; $f =~ s/\s*$//;
1294         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1295             my $field = $1;
1296             $field =~ s/\s+//;
1297             my $sf = $2;
1298             $sf =~ s/\s+//;
1299             my $match = $3;
1300             $match =~ s/^\s*//; $match =~ s/\s*$//;
1301             $fields{$field} = { sf => [ split('', $sf) ] };
1302             if ($match) {
1303                 my ($msf,$mre) = split('~', $match);
1304                 if (length($msf) > 0 and length($mre) > 0) {
1305                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1306                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1307                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1308                 }
1309             }
1310         }
1311     }
1312
1313     # Returns a flat list of subfield (code, value, code, value, ...)
1314     # suitable for adding to a MARC::Field.
1315     sub generate_replacement_subfields {
1316         my ($source_field, $target_field, @controlled_subfields) = @_;
1317
1318         # Performing a wholesale field replacment.  
1319         # Use the entire source field as-is.
1320         return map {$_->[0], $_->[1]} $source_field->subfields
1321             unless @controlled_subfields;
1322
1323         my @new_subfields;
1324
1325         # Iterate over all target field subfields:
1326         # 1. Keep uncontrolled subfields as is.
1327         # 2. Replace values for controlled subfields when a
1328         #    replacement value exists on the source record.
1329         # 3. Delete values for controlled subfields when no 
1330         #    replacement value exists on the source record.
1331
1332         for my $target_sf ($target_field->subfields) {
1333             my $subfield = $target_sf->[0];
1334             my $target_val = $target_sf->[1];
1335
1336             if (grep {$_ eq $subfield} @controlled_subfields) {
1337                 if (my $source_val = $source_field->subfield($subfield)) {
1338                     # We have a replacement value
1339                     push(@new_subfields, $subfield, $source_val);
1340                 } else {
1341                     # no replacement value for controlled subfield, drop it.
1342                 }
1343             } else {
1344                 # Field is not controlled.  Copy it over as-is.
1345                 push(@new_subfields, $subfield, $target_val);
1346             }
1347         }
1348
1349         # Iterate over all subfields in the source field and back-fill
1350         # any values that exist only in the source field.  Insert these
1351         # subfields in the same relative position they exist in the
1352         # source field.
1353                 
1354         my @seen_subfields;
1355         for my $source_sf ($source_field->subfields) {
1356             my $subfield = $source_sf->[0];
1357             my $source_val = $source_sf->[1];
1358             push(@seen_subfields, $subfield);
1359
1360             # target field already contains this subfield, 
1361             # so it would have been addressed above.
1362             next if $target_field->subfield($subfield);
1363
1364             # Ignore uncontrolled subfields.
1365             next unless grep {$_ eq $subfield} @controlled_subfields;
1366
1367             # Adding a new subfield.  Find its relative position and add
1368             # it to the list under construction.  Work backwards from
1369             # the list of already seen subfields to find the best slot.
1370
1371             my $done = 0;
1372             for my $seen_sf (reverse(@seen_subfields)) {
1373                 my $idx = @new_subfields;
1374                 for my $new_sf (reverse(@new_subfields)) {
1375                     $idx--;
1376                     next if $idx % 2 == 1; # sf codes are in the even slots
1377
1378                     if ($new_subfields[$idx] eq $seen_sf) {
1379                         splice(@new_subfields, $idx + 2, 0, $subfield, $source_val);
1380                         $done = 1;
1381                         last;
1382                     }
1383                 }
1384                 last if $done;
1385             }
1386
1387             # if no slot was found, add to the end of the list.
1388             push(@new_subfields, $subfield, $source_val) unless $done;
1389         }
1390
1391         return @new_subfields;
1392     }
1393
1394     # MARC tag loop
1395     for my $f (keys %fields) {
1396         my $tag_idx = -1;
1397         for my $target_field ($target_r->field($f)) {
1398
1399             # field spec contains a regex for this field.  Confirm field on 
1400             # target record matches the specified regex before replacing.
1401             if (exists($fields{$f}{match})) {
1402                 next unless (grep { $_ =~ $fields{$f}{match}{re} } 
1403                     $target_field->subfield($fields{$f}{match}{sf}));
1404             }
1405
1406             my @new_subfields;
1407             my @controlled_subfields = @{$fields{$f}{sf}};
1408
1409             # If the target record has multiple matching bib fields,
1410             # replace them from matching fields on the source record
1411             # in a predictable order to avoid replacing with them with
1412             # same source field repeatedly.
1413             my @source_fields = $source_r->field($f);
1414             my $source_field = $source_fields[++$tag_idx];
1415
1416             if (!$source_field && @controlled_subfields) {
1417                 # When there are more target fields than source fields
1418                 # and we are replacing values for subfields and not
1419                 # performing wholesale field replacment, use the last
1420                 # available source field as the input for all remaining
1421                 # target fields.
1422                 $source_field = $source_fields[$#source_fields];
1423             }
1424
1425             if (!$source_field) {
1426                 # No source field exists.  Delete all affected target
1427                 # data.  This is a little bit counterintuitive, but is
1428                 # backwards compatible with the previous version of this
1429                 # function which first deleted all affected data, then
1430                 # replaced values where possible.
1431                 if (@controlled_subfields) {
1432                     $target_field->delete_subfield($_) for @controlled_subfields;
1433                 } else {
1434                     $target_r->delete_field($target_field);
1435                 }
1436                 next;
1437             }
1438
1439             my @new_subfields = generate_replacement_subfields(
1440                 $source_field, $target_field, @controlled_subfields);
1441
1442             # Build the replacement field from scratch.  
1443             my $replacement_field = MARC::Field->new(
1444                 $target_field->tag,
1445                 $target_field->indicator(1),
1446                 $target_field->indicator(2),
1447                 @new_subfields
1448             );
1449
1450             $target_field->replace_with($replacement_field);
1451         }
1452     }
1453
1454     $target_xml = $target_r->as_xml_record;
1455     $target_xml =~ s/^<\?.+?\?>$//mo;
1456     $target_xml =~ s/\n//sgo;
1457     $target_xml =~ s/>\s+</></sgo;
1458
1459     return $target_xml;
1460
1461 $_$ LANGUAGE PLPERLU;
1462
1463 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml ( target_xml TEXT, source_xml TEXT, add_rule TEXT, replace_preserve_rule TEXT, strip_rule TEXT ) RETURNS TEXT AS $_$
1464     SELECT vandelay.replace_field( vandelay.add_field( vandelay.strip_field( $1, $5) , $2, $3 ), $2, $4);
1465 $_$ LANGUAGE SQL;
1466
1467 CREATE TYPE vandelay.compile_profile AS (add_rule TEXT, replace_rule TEXT, preserve_rule TEXT, strip_rule TEXT);
1468 CREATE OR REPLACE FUNCTION vandelay.compile_profile ( incoming_xml TEXT ) RETURNS vandelay.compile_profile AS $_$
1469 DECLARE
1470     output              vandelay.compile_profile%ROWTYPE;
1471     profile             vandelay.merge_profile%ROWTYPE;
1472     profile_tmpl        TEXT;
1473     profile_tmpl_owner  TEXT;
1474     add_rule            TEXT := '';
1475     strip_rule          TEXT := '';
1476     replace_rule        TEXT := '';
1477     preserve_rule       TEXT := '';
1478
1479 BEGIN
1480
1481     profile_tmpl := (oils_xpath('//*[@tag="905"]/*[@code="t"]/text()',incoming_xml))[1];
1482     profile_tmpl_owner := (oils_xpath('//*[@tag="905"]/*[@code="o"]/text()',incoming_xml))[1];
1483
1484     IF profile_tmpl IS NOT NULL AND profile_tmpl <> '' AND profile_tmpl_owner IS NOT NULL AND profile_tmpl_owner <> '' THEN
1485         SELECT  p.* INTO profile
1486           FROM  vandelay.merge_profile p
1487                 JOIN actor.org_unit u ON (u.id = p.owner)
1488           WHERE p.name = profile_tmpl
1489                 AND u.shortname = profile_tmpl_owner;
1490
1491         IF profile.id IS NOT NULL THEN
1492             add_rule := COALESCE(profile.add_spec,'');
1493             strip_rule := COALESCE(profile.strip_spec,'');
1494             replace_rule := COALESCE(profile.replace_spec,'');
1495             preserve_rule := COALESCE(profile.preserve_spec,'');
1496         END IF;
1497     END IF;
1498
1499     add_rule := add_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="a"]/text()',incoming_xml),','),'');
1500     strip_rule := strip_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="d"]/text()',incoming_xml),','),'');
1501     replace_rule := replace_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="r"]/text()',incoming_xml),','),'');
1502     preserve_rule := preserve_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="p"]/text()',incoming_xml),','),'');
1503
1504     output.add_rule := BTRIM(add_rule,',');
1505     output.replace_rule := BTRIM(replace_rule,',');
1506     output.strip_rule := BTRIM(strip_rule,',');
1507     output.preserve_rule := BTRIM(preserve_rule,',');
1508
1509     RETURN output;
1510 END;
1511 $_$ LANGUAGE PLPGSQL;
1512
1513 CREATE OR REPLACE FUNCTION vandelay.template_overlay_bib_record ( v_marc TEXT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1514 DECLARE
1515     merge_profile   vandelay.merge_profile%ROWTYPE;
1516     dyn_profile     vandelay.compile_profile%ROWTYPE;
1517     editor_string   TEXT;
1518     editor_id       INT;
1519     source_marc     TEXT;
1520     target_marc     TEXT;
1521     eg_marc         TEXT;
1522     replace_rule    TEXT;
1523     match_count     INT;
1524 BEGIN
1525
1526     SELECT  b.marc INTO eg_marc
1527       FROM  biblio.record_entry b
1528       WHERE b.id = eg_id
1529       LIMIT 1;
1530
1531     IF eg_marc IS NULL OR v_marc IS NULL THEN
1532         -- RAISE NOTICE 'no marc for template or bib record';
1533         RETURN FALSE;
1534     END IF;
1535
1536     dyn_profile := vandelay.compile_profile( v_marc );
1537
1538     IF merge_profile_id IS NOT NULL THEN
1539         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
1540         IF FOUND THEN
1541             dyn_profile.add_rule := BTRIM( dyn_profile.add_rule || ',' || COALESCE(merge_profile.add_spec,''), ',');
1542             dyn_profile.strip_rule := BTRIM( dyn_profile.strip_rule || ',' || COALESCE(merge_profile.strip_spec,''), ',');
1543             dyn_profile.replace_rule := BTRIM( dyn_profile.replace_rule || ',' || COALESCE(merge_profile.replace_spec,''), ',');
1544             dyn_profile.preserve_rule := BTRIM( dyn_profile.preserve_rule || ',' || COALESCE(merge_profile.preserve_spec,''), ',');
1545         END IF;
1546     END IF;
1547
1548     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1549         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1550         RETURN FALSE;
1551     END IF;
1552
1553     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1554         --Since we have nothing to do, just return a NOOP "we did it"
1555         RETURN TRUE;
1556     ELSIF dyn_profile.replace_rule <> '' THEN
1557         source_marc = v_marc;
1558         target_marc = eg_marc;
1559         replace_rule = dyn_profile.replace_rule;
1560     ELSE
1561         source_marc = eg_marc;
1562         target_marc = v_marc;
1563         replace_rule = dyn_profile.preserve_rule;
1564     END IF;
1565
1566     UPDATE  biblio.record_entry
1567       SET   marc = vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule )
1568       WHERE id = eg_id;
1569
1570     IF NOT FOUND THEN
1571         -- RAISE NOTICE 'update of biblio.record_entry failed';
1572         RETURN FALSE;
1573     END IF;
1574
1575     RETURN TRUE;
1576
1577 END;
1578 $$ LANGUAGE PLPGSQL;
1579
1580 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml ( target_marc TEXT, template_marc TEXT ) RETURNS TEXT AS $$
1581 DECLARE
1582     dyn_profile     vandelay.compile_profile%ROWTYPE;
1583     replace_rule    TEXT;
1584     tmp_marc        TEXT;
1585     trgt_marc        TEXT;
1586     tmpl_marc        TEXT;
1587     match_count     INT;
1588 BEGIN
1589
1590     IF target_marc IS NULL OR template_marc IS NULL THEN
1591         -- RAISE NOTICE 'no marc for target or template record';
1592         RETURN NULL;
1593     END IF;
1594
1595     dyn_profile := vandelay.compile_profile( template_marc );
1596
1597     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1598         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1599         RETURN NULL;
1600     END IF;
1601
1602     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1603         --Since we have nothing to do, just return what we were given.
1604         RETURN target_marc;
1605     ELSIF dyn_profile.replace_rule <> '' THEN
1606         trgt_marc = target_marc;
1607         tmpl_marc = template_marc;
1608         replace_rule = dyn_profile.replace_rule;
1609     ELSE
1610         tmp_marc = target_marc;
1611         trgt_marc = template_marc;
1612         tmpl_marc = tmp_marc;
1613         replace_rule = dyn_profile.preserve_rule;
1614     END IF;
1615
1616     RETURN vandelay.merge_record_xml( trgt_marc, tmpl_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule );
1617
1618 END;
1619 $$ LANGUAGE PLPGSQL;
1620
1621 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml_using_profile ( incoming_marc TEXT, existing_marc TEXT, merge_profile_id BIGINT ) RETURNS TEXT AS $$
1622 DECLARE
1623     merge_profile   vandelay.merge_profile%ROWTYPE;
1624     dyn_profile     vandelay.compile_profile%ROWTYPE;
1625     target_marc     TEXT;
1626     source_marc     TEXT;
1627     replace_rule    TEXT;
1628     match_count     INT;
1629 BEGIN
1630
1631     IF existing_marc IS NULL OR incoming_marc IS NULL THEN
1632         -- RAISE NOTICE 'no marc for source or target records';
1633         RETURN NULL;
1634     END IF;
1635
1636     IF merge_profile_id IS NOT NULL THEN
1637         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
1638         IF FOUND THEN
1639             dyn_profile.add_rule := COALESCE(merge_profile.add_spec,'');
1640             dyn_profile.strip_rule := COALESCE(merge_profile.strip_spec,'');
1641             dyn_profile.replace_rule := COALESCE(merge_profile.replace_spec,'');
1642             dyn_profile.preserve_rule := COALESCE(merge_profile.preserve_spec,'');
1643         ELSE
1644             -- RAISE NOTICE 'merge profile not found';
1645             RETURN NULL;
1646         END IF;
1647     ELSE
1648         -- RAISE NOTICE 'no merge profile specified';
1649         RETURN NULL;
1650     END IF;
1651
1652     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1653         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1654         RETURN NULL;
1655     END IF;
1656
1657     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1658         -- Since we have nothing to do, just return a target record as is
1659         RETURN existing_marc;
1660     ELSIF dyn_profile.preserve_rule <> '' THEN
1661         source_marc = existing_marc;
1662         target_marc = incoming_marc;
1663         replace_rule = dyn_profile.preserve_rule;
1664     ELSE
1665         source_marc = incoming_marc;
1666         target_marc = existing_marc;
1667         replace_rule = dyn_profile.replace_rule;
1668     END IF;
1669
1670     RETURN vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule );
1671
1672 END;
1673 $$ LANGUAGE PLPGSQL;
1674
1675 CREATE OR REPLACE FUNCTION vandelay.template_overlay_bib_record ( v_marc TEXT, eg_id BIGINT) RETURNS BOOL AS $$
1676     SELECT vandelay.template_overlay_bib_record( $1, $2, NULL);
1677 $$ LANGUAGE SQL;
1678
1679 CREATE OR REPLACE FUNCTION vandelay.overlay_bib_record ( import_id BIGINT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1680 DECLARE
1681     editor_string   TEXT;
1682     editor_id       INT;
1683     v_marc          TEXT;
1684     v_bib_source    INT;
1685     update_fields   TEXT[];
1686     update_query    TEXT;
1687     update_bib      BOOL;
1688 BEGIN
1689
1690     SELECT  q.marc, q.bib_source INTO v_marc, v_bib_source
1691       FROM  vandelay.queued_bib_record q
1692             JOIN vandelay.bib_match m ON (m.queued_record = q.id AND q.id = import_id)
1693       LIMIT 1;
1694
1695     IF v_marc IS NULL THEN
1696         -- RAISE NOTICE 'no marc for vandelay or bib record';
1697         RETURN FALSE;
1698     END IF;
1699
1700     IF vandelay.template_overlay_bib_record( v_marc, eg_id, merge_profile_id) THEN
1701         UPDATE  vandelay.queued_bib_record
1702           SET   imported_as = eg_id,
1703                 import_time = NOW()
1704           WHERE id = import_id;
1705
1706           SELECT q.update_bib_source INTO update_bib FROM vandelay.merge_profile q where q.id = merge_profile_Id;
1707
1708           IF update_bib THEN
1709                 editor_string := (oils_xpath('//*[@tag="905"]/*[@code="u"]/text()',v_marc))[1];
1710
1711                 IF editor_string IS NOT NULL AND editor_string <> '' THEN
1712                     SELECT usr INTO editor_id FROM actor.card WHERE barcode = editor_string;
1713
1714                     IF editor_id IS NULL THEN
1715                         SELECT id INTO editor_id FROM actor.usr WHERE usrname = editor_string;
1716                     END IF;
1717
1718                     IF editor_id IS NOT NULL THEN
1719                         --only update the edit date if we have a valid editor
1720                         update_fields := ARRAY_APPEND(update_fields, 'editor = ' || editor_id || ', edit_date = NOW()');
1721                     END IF;
1722                 END IF;
1723
1724                 IF v_bib_source IS NOT NULL THEN
1725                     update_fields := ARRAY_APPEND(update_fields, 'source = ' || v_bib_source);
1726                 END IF;
1727
1728                 IF ARRAY_LENGTH(update_fields, 1) > 0 THEN
1729                     update_query := 'UPDATE biblio.record_entry SET ' || ARRAY_TO_STRING(update_fields, ',') || ' WHERE id = ' || eg_id || ';';
1730                     --RAISE NOTICE 'query: %', update_query;
1731                     EXECUTE update_query;
1732                 END IF;
1733         END IF;
1734
1735         RETURN TRUE;
1736     END IF;
1737
1738     -- RAISE NOTICE 'update of biblio.record_entry failed';
1739
1740     RETURN FALSE;
1741
1742 END;
1743 $$ LANGUAGE PLPGSQL;
1744
1745 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record_with_best ( import_id BIGINT, merge_profile_id INT, lwm_ratio_value_p NUMERIC ) RETURNS BOOL AS $$
1746 DECLARE
1747     eg_id           BIGINT;
1748     lwm_ratio_value NUMERIC;
1749 BEGIN
1750
1751     lwm_ratio_value := COALESCE(lwm_ratio_value_p, 0.0);
1752
1753     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1754
1755     IF FOUND THEN
1756         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1757         RETURN FALSE;
1758     END IF;
1759
1760     SELECT  m.eg_record INTO eg_id
1761       FROM  vandelay.bib_match m
1762             JOIN vandelay.queued_bib_record qr ON (m.queued_record = qr.id)
1763             JOIN vandelay.bib_queue q ON (qr.queue = q.id)
1764             JOIN biblio.record_entry r ON (r.id = m.eg_record)
1765       WHERE m.queued_record = import_id
1766             AND qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC >= lwm_ratio_value
1767       ORDER BY  m.match_score DESC, -- required match score
1768                 qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC DESC, -- quality tie breaker
1769                 m.id -- when in doubt, use the first match
1770       LIMIT 1;
1771
1772     IF eg_id IS NULL THEN
1773         -- RAISE NOTICE 'incoming record is not of high enough quality';
1774         RETURN FALSE;
1775     END IF;
1776
1777     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1778 END;
1779 $$ LANGUAGE PLPGSQL;
1780
1781 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record_with_best ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1782     SELECT vandelay.auto_overlay_bib_record_with_best( $1, $2, p.lwm_ratio ) FROM vandelay.merge_profile p WHERE id = $2;
1783 $$ LANGUAGE SQL;
1784
1785 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1786 DECLARE
1787     eg_id           BIGINT;
1788     match_count     INT;
1789 BEGIN
1790
1791     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1792
1793     IF FOUND THEN
1794         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1795         RETURN FALSE;
1796     END IF;
1797
1798     SELECT COUNT(*) INTO match_count FROM vandelay.bib_match WHERE queued_record = import_id;
1799
1800     IF match_count <> 1 THEN
1801         -- RAISE NOTICE 'not an exact match';
1802         RETURN FALSE;
1803     END IF;
1804
1805     -- Check that the one match is on the first 901c
1806     SELECT  m.eg_record INTO eg_id
1807       FROM  vandelay.queued_bib_record q
1808             JOIN vandelay.bib_match m ON (m.queued_record = q.id)
1809       WHERE q.id = import_id
1810             AND m.eg_record = oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]',marc)::BIGINT;
1811
1812     IF NOT FOUND THEN
1813         -- RAISE NOTICE 'not a 901c match';
1814         RETURN FALSE;
1815     END IF;
1816
1817     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1818 END;
1819 $$ LANGUAGE PLPGSQL;
1820
1821 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue ( queue_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1822 DECLARE
1823     queued_record   vandelay.queued_bib_record%ROWTYPE;
1824 BEGIN
1825
1826     FOR queued_record IN SELECT * FROM vandelay.queued_bib_record WHERE queue = queue_id AND import_time IS NULL LOOP
1827
1828         IF vandelay.auto_overlay_bib_record( queued_record.id, merge_profile_id ) THEN
1829             RETURN NEXT queued_record.id;
1830         END IF;
1831
1832     END LOOP;
1833
1834     RETURN;
1835     
1836 END;
1837 $$ LANGUAGE PLPGSQL;
1838
1839 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue_with_best ( queue_id BIGINT, merge_profile_id INT, lwm_ratio_value NUMERIC ) RETURNS SETOF BIGINT AS $$
1840 DECLARE
1841     queued_record   vandelay.queued_bib_record%ROWTYPE;
1842 BEGIN
1843
1844     FOR queued_record IN SELECT * FROM vandelay.queued_bib_record WHERE queue = queue_id AND import_time IS NULL LOOP
1845
1846         IF vandelay.auto_overlay_bib_record_with_best( queued_record.id, merge_profile_id, lwm_ratio_value ) THEN
1847             RETURN NEXT queued_record.id;
1848         END IF;
1849
1850     END LOOP;
1851
1852     RETURN;
1853     
1854 END;
1855 $$ LANGUAGE PLPGSQL;
1856
1857 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue_with_best ( import_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1858     SELECT vandelay.auto_overlay_bib_queue_with_best( $1, $2, p.lwm_ratio ) FROM vandelay.merge_profile p WHERE id = $2;
1859 $$ LANGUAGE SQL;
1860
1861 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue ( queue_id BIGINT ) RETURNS SETOF BIGINT AS $$
1862     SELECT * FROM vandelay.auto_overlay_bib_queue( $1, NULL );
1863 $$ LANGUAGE SQL;
1864
1865 CREATE OR REPLACE FUNCTION vandelay.ingest_bib_marc ( ) RETURNS TRIGGER AS $$
1866 DECLARE
1867     value   TEXT;
1868     atype   TEXT;
1869     adef    RECORD;
1870 BEGIN
1871     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1872         RETURN NEW;
1873     END IF;
1874
1875     FOR adef IN SELECT * FROM vandelay.bib_attr_definition LOOP
1876
1877         SELECT extract_marc_field('vandelay.queued_bib_record', id, adef.xpath, adef.remove) INTO value FROM vandelay.queued_bib_record WHERE id = NEW.id;
1878         IF (value IS NOT NULL AND value <> '') THEN
1879             INSERT INTO vandelay.queued_bib_record_attr (record, field, attr_value) VALUES (NEW.id, adef.id, value);
1880         END IF;
1881
1882     END LOOP;
1883
1884     RETURN NULL;
1885 END;
1886 $$ LANGUAGE PLPGSQL;
1887
1888 CREATE OR REPLACE FUNCTION vandelay.cleanup_bib_marc ( ) RETURNS TRIGGER AS $$
1889 BEGIN
1890     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1891         RETURN NEW;
1892     END IF;
1893
1894     DELETE FROM vandelay.queued_bib_record_attr WHERE record = OLD.id;
1895     DELETE FROM vandelay.import_item WHERE record = OLD.id;
1896
1897     IF TG_OP = 'UPDATE' THEN
1898         RETURN NEW;
1899     END IF;
1900     RETURN OLD;
1901 END;
1902 $$ LANGUAGE PLPGSQL;
1903
1904 CREATE TRIGGER cleanup_bib_trigger
1905     BEFORE UPDATE OR DELETE ON vandelay.queued_bib_record
1906     FOR EACH ROW EXECUTE PROCEDURE vandelay.cleanup_bib_marc();
1907
1908 CREATE TRIGGER ingest_bib_trigger
1909     AFTER INSERT OR UPDATE ON vandelay.queued_bib_record
1910     FOR EACH ROW EXECUTE PROCEDURE vandelay.ingest_bib_marc();
1911
1912 CREATE TRIGGER zz_match_bibs_trigger
1913     BEFORE INSERT OR UPDATE ON vandelay.queued_bib_record
1914     FOR EACH ROW EXECUTE PROCEDURE vandelay.match_bib_record();
1915
1916
1917 /* Authority stuff down here */
1918 ---------------------------------------
1919 CREATE TABLE vandelay.authority_attr_definition (
1920         id                      SERIAL  PRIMARY KEY,
1921         code            TEXT    UNIQUE NOT NULL,
1922         description     TEXT,
1923         xpath           TEXT    NOT NULL,
1924         remove          TEXT    NOT NULL DEFAULT ''
1925 );
1926
1927 CREATE TYPE vandelay.authority_queue_queue_type AS ENUM ('authority');
1928 CREATE TABLE vandelay.authority_queue (
1929         queue_type      vandelay.authority_queue_queue_type NOT NULL DEFAULT 'authority',
1930         CONSTRAINT vand_authority_queue_name_once_per_owner_const UNIQUE (owner,name,queue_type)
1931 ) INHERITS (vandelay.queue);
1932 ALTER TABLE vandelay.authority_queue ADD PRIMARY KEY (id);
1933
1934 CREATE TABLE vandelay.queued_authority_record (
1935         queue           INT     NOT NULL REFERENCES vandelay.authority_queue (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
1936         imported_as     INT     REFERENCES authority.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
1937         import_error    TEXT    REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
1938         error_detail    TEXT
1939 ) INHERITS (vandelay.queued_record);
1940 ALTER TABLE vandelay.queued_authority_record ADD PRIMARY KEY (id);
1941 CREATE INDEX queued_authority_record_queue_idx ON vandelay.queued_authority_record (queue);
1942
1943 CREATE TABLE vandelay.queued_authority_record_attr (
1944         id                      BIGSERIAL       PRIMARY KEY,
1945         record          BIGINT          NOT NULL REFERENCES vandelay.queued_authority_record (id) DEFERRABLE INITIALLY DEFERRED,
1946         field           INT                     NOT NULL REFERENCES vandelay.authority_attr_definition (id) DEFERRABLE INITIALLY DEFERRED,
1947         attr_value      TEXT            NOT NULL
1948 );
1949 CREATE INDEX queued_authority_record_attr_record_idx ON vandelay.queued_authority_record_attr (record);
1950
1951 CREATE TABLE vandelay.authority_match (
1952         id                              BIGSERIAL       PRIMARY KEY,
1953         queued_record   BIGINT          REFERENCES vandelay.queued_authority_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
1954         eg_record               BIGINT          REFERENCES authority.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
1955     quality         INT         NOT NULL DEFAULT 0,
1956     match_score     INT         NOT NULL DEFAULT 0
1957 );
1958
1959 CREATE OR REPLACE FUNCTION vandelay.ingest_authority_marc ( ) RETURNS TRIGGER AS $$
1960 DECLARE
1961     value   TEXT;
1962     atype   TEXT;
1963     adef    RECORD;
1964 BEGIN
1965     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1966         RETURN NEW;
1967     END IF;
1968
1969     FOR adef IN SELECT * FROM vandelay.authority_attr_definition LOOP
1970
1971         SELECT extract_marc_field('vandelay.queued_authority_record', id, adef.xpath, adef.remove) INTO value FROM vandelay.queued_authority_record WHERE id = NEW.id;
1972         IF (value IS NOT NULL AND value <> '') THEN
1973             INSERT INTO vandelay.queued_authority_record_attr (record, field, attr_value) VALUES (NEW.id, adef.id, value);
1974         END IF;
1975
1976     END LOOP;
1977
1978     RETURN NULL;
1979 END;
1980 $$ LANGUAGE PLPGSQL;
1981
1982 CREATE OR REPLACE FUNCTION vandelay.cleanup_authority_marc ( ) RETURNS TRIGGER AS $$
1983 BEGIN
1984     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1985         RETURN NEW;
1986     END IF;
1987
1988     DELETE FROM vandelay.queued_authority_record_attr WHERE record = OLD.id;
1989     IF TG_OP = 'UPDATE' THEN
1990         RETURN NEW;
1991     END IF;
1992     RETURN OLD;
1993 END;
1994 $$ LANGUAGE PLPGSQL;
1995
1996 CREATE TRIGGER cleanup_authority_trigger
1997     BEFORE UPDATE OR DELETE ON vandelay.queued_authority_record
1998     FOR EACH ROW EXECUTE PROCEDURE vandelay.cleanup_authority_marc();
1999
2000 CREATE TRIGGER ingest_authority_trigger
2001     AFTER INSERT OR UPDATE ON vandelay.queued_authority_record
2002     FOR EACH ROW EXECUTE PROCEDURE vandelay.ingest_authority_marc();
2003
2004 CREATE OR REPLACE FUNCTION vandelay.overlay_authority_record ( import_id BIGINT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
2005 DECLARE
2006     merge_profile   vandelay.merge_profile%ROWTYPE;
2007     dyn_profile     vandelay.compile_profile%ROWTYPE;
2008     editor_string   TEXT;
2009     new_editor      INT;
2010     new_edit_date   TIMESTAMPTZ;
2011     source_marc     TEXT;
2012     target_marc     TEXT;
2013     eg_marc_row     authority.record_entry%ROWTYPE;
2014     eg_marc         TEXT;
2015     v_marc          TEXT;
2016     replace_rule    TEXT;
2017     match_count     INT;
2018     update_query    TEXT;
2019 BEGIN
2020
2021     SELECT  * INTO eg_marc_row
2022       FROM  authority.record_entry b
2023             JOIN vandelay.authority_match m ON (m.eg_record = b.id AND m.queued_record = import_id)
2024       LIMIT 1;
2025
2026     SELECT  q.marc INTO v_marc
2027       FROM  vandelay.queued_record q
2028             JOIN vandelay.authority_match m ON (m.queued_record = q.id AND q.id = import_id)
2029       LIMIT 1;
2030
2031     eg_marc := eg_marc_row.marc;
2032
2033     IF eg_marc IS NULL OR v_marc IS NULL THEN
2034         -- RAISE NOTICE 'no marc for vandelay or authority record';
2035         RETURN FALSE;
2036     END IF;
2037
2038     -- Extract the editor string before any modification to the vandelay
2039     -- MARC occur.
2040     editor_string := 
2041         (oils_xpath('//*[@tag="905"]/*[@code="u"]/text()',v_marc))[1];
2042
2043     -- If an editor value can be found, update the authority record
2044     -- editor and edit_date values.
2045     IF editor_string IS NOT NULL AND editor_string <> '' THEN
2046
2047         -- Vandelay.pm sets the value to 'usrname' when needed.  
2048         SELECT id INTO new_editor
2049             FROM actor.usr WHERE usrname = editor_string;
2050
2051         IF new_editor IS NULL THEN
2052             SELECT usr INTO new_editor
2053                 FROM actor.card WHERE barcode = editor_string;
2054         END IF;
2055
2056         IF new_editor IS NOT NULL THEN
2057             new_edit_date := NOW();
2058         ELSE -- No valid editor, use current values
2059             new_editor = eg_marc_row.editor;
2060             new_edit_date = eg_marc_row.edit_date;
2061         END IF;
2062     ELSE
2063         new_editor = eg_marc_row.editor;
2064         new_edit_date = eg_marc_row.edit_date;
2065     END IF;
2066
2067     dyn_profile := vandelay.compile_profile( v_marc );
2068
2069     IF merge_profile_id IS NOT NULL THEN
2070         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
2071         IF FOUND THEN
2072             dyn_profile.add_rule := BTRIM( dyn_profile.add_rule || ',' || COALESCE(merge_profile.add_spec,''), ',');
2073             dyn_profile.strip_rule := BTRIM( dyn_profile.strip_rule || ',' || COALESCE(merge_profile.strip_spec,''), ',');
2074             dyn_profile.replace_rule := BTRIM( dyn_profile.replace_rule || ',' || COALESCE(merge_profile.replace_spec,''), ',');
2075             dyn_profile.preserve_rule := BTRIM( dyn_profile.preserve_rule || ',' || COALESCE(merge_profile.preserve_spec,''), ',');
2076         END IF;
2077     END IF;
2078
2079     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
2080         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
2081         RETURN FALSE;
2082     END IF;
2083
2084     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
2085         --Since we have nothing to do, just return a NOOP "we did it"
2086         RETURN TRUE;
2087     ELSIF dyn_profile.replace_rule <> '' THEN
2088         source_marc = v_marc;
2089         target_marc = eg_marc;
2090         replace_rule = dyn_profile.replace_rule;
2091     ELSE
2092         source_marc = eg_marc;
2093         target_marc = v_marc;
2094         replace_rule = dyn_profile.preserve_rule;
2095     END IF;
2096
2097     UPDATE  authority.record_entry
2098       SET   marc = vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule ),
2099             editor = new_editor,
2100             edit_date = new_edit_date
2101       WHERE id = eg_id;
2102
2103     IF NOT FOUND THEN 
2104         -- Import/merge failed.  Nothing left to do.
2105         RETURN FALSE;
2106     END IF;
2107
2108     -- Authority record successfully merged / imported.
2109
2110     -- Update the vandelay record to show the successful import.
2111     UPDATE  vandelay.queued_authority_record
2112       SET   imported_as = eg_id,
2113             import_time = NOW()
2114       WHERE id = import_id;
2115
2116     RETURN TRUE;
2117
2118 END;
2119 $$ LANGUAGE PLPGSQL;
2120
2121 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_record ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
2122 DECLARE
2123     eg_id           BIGINT;
2124     match_count     INT;
2125 BEGIN
2126     SELECT COUNT(*) INTO match_count FROM vandelay.authority_match WHERE queued_record = import_id;
2127
2128     IF match_count <> 1 THEN
2129         -- RAISE NOTICE 'not an exact match';
2130         RETURN FALSE;
2131     END IF;
2132
2133     SELECT  m.eg_record INTO eg_id
2134       FROM  vandelay.authority_match m
2135       WHERE m.queued_record = import_id
2136       LIMIT 1;
2137
2138     IF eg_id IS NULL THEN
2139         RETURN FALSE;
2140     END IF;
2141
2142     RETURN vandelay.overlay_authority_record( import_id, eg_id, merge_profile_id );
2143 END;
2144 $$ LANGUAGE PLPGSQL;
2145
2146 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_queue ( queue_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
2147 DECLARE
2148     queued_record   vandelay.queued_authority_record%ROWTYPE;
2149 BEGIN
2150
2151     FOR queued_record IN SELECT * FROM vandelay.queued_authority_record WHERE queue = queue_id AND import_time IS NULL LOOP
2152
2153         IF vandelay.auto_overlay_authority_record( queued_record.id, merge_profile_id ) THEN
2154             RETURN NEXT queued_record.id;
2155         END IF;
2156
2157     END LOOP;
2158
2159     RETURN;
2160     
2161 END;
2162 $$ LANGUAGE PLPGSQL;
2163
2164 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_queue ( queue_id BIGINT ) RETURNS SETOF BIGINT AS $$
2165     SELECT * FROM vandelay.auto_overlay_authority_queue( $1, NULL );
2166 $$ LANGUAGE SQL;
2167
2168 CREATE OR REPLACE FUNCTION vandelay.match_set_test_authxml(
2169     match_set_id INTEGER, record_xml TEXT
2170 ) RETURNS SETOF vandelay.match_set_test_result AS $$
2171 DECLARE
2172     tags_rstore HSTORE;
2173     heading     TEXT;
2174     coal        TEXT;
2175     joins       TEXT;
2176     query_      TEXT;
2177     wq          TEXT;
2178     qvalue      INTEGER;
2179     rec         RECORD;
2180 BEGIN
2181     tags_rstore := vandelay.flatten_marc_hstore(record_xml);
2182
2183     SELECT normalize_heading INTO heading 
2184         FROM authority.normalize_heading(record_xml);
2185
2186     CREATE TEMPORARY TABLE _vandelay_tmp_qrows (q INTEGER);
2187     CREATE TEMPORARY TABLE _vandelay_tmp_jrows (j TEXT);
2188
2189     -- generate the where clause and return that directly (into wq), and as
2190     -- a side-effect, populate the _vandelay_tmp_[qj]rows tables.
2191     wq := vandelay.get_expr_from_match_set(
2192         match_set_id, tags_rstore, heading);
2193
2194     query_ := 'SELECT DISTINCT(record), ';
2195
2196     -- qrows table is for the quality bits we add to the SELECT clause
2197     SELECT STRING_AGG(
2198         'COALESCE(n' || q::TEXT || '.quality, 0)', ' + '
2199     ) INTO coal FROM _vandelay_tmp_qrows;
2200
2201     -- our query string so far is the SELECT clause and the inital FROM.
2202     -- no JOINs yet nor the WHERE clause
2203     query_ := query_ || coal || ' AS quality ' || E'\n';
2204
2205     -- jrows table is for the joins we must make (and the real text conditions)
2206     SELECT STRING_AGG(j, E'\n') INTO joins
2207         FROM _vandelay_tmp_jrows;
2208
2209     -- add those joins and the where clause to our query.
2210     query_ := query_ || joins || E'\n';
2211
2212     query_ := query_ || 'JOIN authority.record_entry are ON (are.id = record) ' 
2213         || 'WHERE ' || wq || ' AND not are.deleted';
2214
2215     -- this will return rows of record,quality
2216     FOR rec IN EXECUTE query_ USING tags_rstore LOOP
2217         RETURN NEXT rec;
2218     END LOOP;
2219
2220     DROP TABLE _vandelay_tmp_qrows;
2221     DROP TABLE _vandelay_tmp_jrows;
2222     RETURN;
2223 END;
2224 $$ LANGUAGE PLPGSQL;
2225
2226 CREATE OR REPLACE FUNCTION vandelay.measure_auth_record_quality 
2227     ( xml TEXT, match_set_id INT ) RETURNS INT AS $_$
2228 DECLARE
2229     out_q   INT := 0;
2230     rvalue  TEXT;
2231     test    vandelay.match_set_quality%ROWTYPE;
2232 BEGIN
2233
2234     FOR test IN SELECT * FROM vandelay.match_set_quality 
2235             WHERE match_set = match_set_id LOOP
2236         IF test.tag IS NOT NULL THEN
2237             FOR rvalue IN SELECT value FROM vandelay.flatten_marc( xml ) 
2238                 WHERE tag = test.tag AND subfield = test.subfield LOOP
2239                 IF test.value = rvalue THEN
2240                     out_q := out_q + test.quality;
2241                 END IF;
2242             END LOOP;
2243         END IF;
2244     END LOOP;
2245
2246     RETURN out_q;
2247 END;
2248 $_$ LANGUAGE PLPGSQL;
2249
2250
2251
2252 CREATE OR REPLACE FUNCTION vandelay.match_authority_record() RETURNS TRIGGER AS $func$
2253 DECLARE
2254     incoming_existing_id    TEXT;
2255     test_result             vandelay.match_set_test_result%ROWTYPE;
2256     tmp_rec                 BIGINT;
2257     match_set               INT;
2258 BEGIN
2259     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
2260         RETURN NEW;
2261     END IF;
2262
2263     DELETE FROM vandelay.authority_match WHERE queued_record = NEW.id;
2264
2265     SELECT q.match_set INTO match_set FROM vandelay.authority_queue q WHERE q.id = NEW.queue;
2266
2267     IF match_set IS NOT NULL THEN
2268         NEW.quality := vandelay.measure_auth_record_quality( NEW.marc, match_set );
2269     END IF;
2270
2271     -- Perfect matches on 901$c exit early with a match with high quality.
2272     incoming_existing_id :=
2273         oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]', NEW.marc);
2274
2275     IF incoming_existing_id IS NOT NULL AND incoming_existing_id != '' THEN
2276         SELECT id INTO tmp_rec FROM authority.record_entry WHERE id = incoming_existing_id::bigint;
2277         IF tmp_rec IS NOT NULL THEN
2278             INSERT INTO vandelay.authority_match (queued_record, eg_record, match_score, quality) 
2279                 SELECT
2280                     NEW.id, 
2281                     b.id,
2282                     9999,
2283                     -- note: no match_set means quality==0
2284                     vandelay.measure_auth_record_quality( b.marc, match_set )
2285                 FROM authority.record_entry b
2286                 WHERE id = incoming_existing_id::bigint;
2287         END IF;
2288     END IF;
2289
2290     IF match_set IS NULL THEN
2291         RETURN NEW;
2292     END IF;
2293
2294     FOR test_result IN SELECT * FROM
2295         vandelay.match_set_test_authxml(match_set, NEW.marc) LOOP
2296
2297         INSERT INTO vandelay.authority_match ( queued_record, eg_record, match_score, quality )
2298             SELECT  
2299                 NEW.id,
2300                 test_result.record,
2301                 test_result.quality,
2302                 vandelay.measure_auth_record_quality( b.marc, match_set )
2303                 FROM  authority.record_entry b
2304                 WHERE id = test_result.record;
2305
2306     END LOOP;
2307
2308     RETURN NEW;
2309 END;
2310 $func$ LANGUAGE PLPGSQL;
2311
2312 CREATE TRIGGER zz_match_auths_trigger
2313     BEFORE INSERT OR UPDATE ON vandelay.queued_authority_record
2314     FOR EACH ROW EXECUTE PROCEDURE vandelay.match_authority_record();
2315
2316 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_record_with_best ( import_id BIGINT, merge_profile_id INT, lwm_ratio_value_p NUMERIC ) RETURNS BOOL AS $$
2317 DECLARE
2318     eg_id           BIGINT;
2319     lwm_ratio_value NUMERIC;
2320 BEGIN
2321
2322     lwm_ratio_value := COALESCE(lwm_ratio_value_p, 0.0);
2323
2324     PERFORM * FROM vandelay.queued_authority_record WHERE import_time IS NOT NULL AND id = import_id;
2325
2326     IF FOUND THEN
2327         -- RAISE NOTICE 'already imported, cannot auto-overlay'
2328         RETURN FALSE;
2329     END IF;
2330
2331     SELECT  m.eg_record INTO eg_id
2332       FROM  vandelay.authority_match m
2333             JOIN vandelay.queued_authority_record qr ON (m.queued_record = qr.id)
2334             JOIN vandelay.authority_queue q ON (qr.queue = q.id)
2335             JOIN authority.record_entry r ON (r.id = m.eg_record)
2336       WHERE m.queued_record = import_id
2337             AND qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC >= lwm_ratio_value
2338       ORDER BY  m.match_score DESC, -- required match score
2339                 qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC DESC, -- quality tie breaker
2340                 m.id -- when in doubt, use the first match
2341       LIMIT 1;
2342
2343     IF eg_id IS NULL THEN
2344         -- RAISE NOTICE 'incoming record is not of high enough quality';
2345         RETURN FALSE;
2346     END IF;
2347
2348     RETURN vandelay.overlay_authority_record( import_id, eg_id, merge_profile_id );
2349 END;
2350 $$ LANGUAGE PLPGSQL;
2351
2352
2353
2354
2355 -- Vandelay (for importing and exporting records) 012.schema.vandelay.sql 
2356 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (1, 'title', oils_i18n_gettext(1, 'vqbrad', 'Title of work', 'description'),'//*[@tag="245"]/*[contains("abcmnopr",@code)]');
2357 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (2, 'author', oils_i18n_gettext(1, 'vqbrad', 'Author of work', 'description'),'//*[@tag="100" or @tag="110" or @tag="113"]/*[contains("ad",@code)]');
2358 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (3, 'language', oils_i18n_gettext(3, 'vqbrad', 'Language of work', 'description'),'//*[@tag="240"]/*[@code="l"][1]');
2359 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (4, 'pagination', oils_i18n_gettext(4, 'vqbrad', 'Pagination', 'description'),'//*[@tag="300"]/*[@code="a"][1]');
2360 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident, remove ) VALUES (5, 'isbn',oils_i18n_gettext(5, 'vqbrad', 'ISBN', 'description'),'//*[@tag="020"]/*[@code="a"]', TRUE, $r$(?:-|\s.+$)$r$);
2361 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident, remove ) VALUES (6, 'issn',oils_i18n_gettext(6, 'vqbrad', 'ISSN', 'description'),'//*[@tag="022"]/*[@code="a"]', TRUE, $r$(?:-|\s.+$)$r$);
2362 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (7, 'price',oils_i18n_gettext(7, 'vqbrad', 'Price', 'description'),'//*[@tag="020" or @tag="022"]/*[@code="c"][1]');
2363 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (8, 'rec_identifier',oils_i18n_gettext(8, 'vqbrad', 'Accession Number', 'description'),'//*[@tag="001"]', TRUE);
2364 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (9, 'eg_tcn',oils_i18n_gettext(9, 'vqbrad', 'TCN Value', 'description'),'//*[@tag="901"]/*[@code="a"]', TRUE);
2365 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (10, 'eg_tcn_source',oils_i18n_gettext(10, 'vqbrad', 'TCN Source', 'description'),'//*[@tag="901"]/*[@code="b"]', TRUE);
2366 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (11, 'eg_identifier',oils_i18n_gettext(11, 'vqbrad', 'Internal ID', 'description'),'//*[@tag="901"]/*[@code="c"]', TRUE);
2367 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (12, 'publisher',oils_i18n_gettext(12, 'vqbrad', 'Publisher', 'description'),'//*[@tag="260"]/*[@code="b"][1]');
2368 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, remove ) VALUES (13, 'pubdate',oils_i18n_gettext(13, 'vqbrad', 'Publication Date', 'description'),'//*[@tag="260"]/*[@code="c"][1]',$r$\D$r$);
2369 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (14, 'edition',oils_i18n_gettext(14, 'vqbrad', 'Edition', 'description'),'//*[@tag="250"]/*[@code="a"][1]');
2370 --
2371 --INSERT INTO vandelay.import_item_attr_definition (
2372 --    owner, name, tag, owning_lib, circ_lib, location,
2373 --    call_number, circ_modifier, barcode, price, copy_number,
2374 --    circulate, ref, holdable, opac_visible, status
2375 --) VALUES (
2376 --    1,
2377 --    'Evergreen 852 export format',
2378 --    '852',
2379 --    '[@code = "b"][1]',
2380 --    '[@code = "b"][2]',
2381 --    'c',
2382 --    'j',
2383 --    'g',
2384 --    'p',
2385 --    'y',
2386 --    't',
2387 --    '[@code = "x" and text() = "circulating"]',
2388 --    '[@code = "x" and text() = "reference"]',
2389 --    '[@code = "x" and text() = "holdable"]',
2390 --    '[@code = "x" and text() = "visible"]',
2391 --    'z'
2392 --);
2393 --
2394 --INSERT INTO vandelay.import_item_attr_definition (
2395 --    owner,
2396 --    name,
2397 --    tag,
2398 --    owning_lib,
2399 --    location,
2400 --    call_number,
2401 --    circ_modifier,
2402 --    barcode,
2403 --    price,
2404 --    status
2405 --) VALUES (
2406 --    1,
2407 --    'Unicorn Import format -- 999',
2408 --    '999',
2409 --    'm',
2410 --    'l',
2411 --    'a',
2412 --    't',
2413 --    'i',
2414 --    'p',
2415 --    'k'
2416 --);
2417 --
2418 --INSERT INTO vandelay.authority_attr_definition ( code, description, xpath, ident ) VALUES ('rec_identifier','Identifier','//*[@tag="001"]', TRUE);
2419
2420
2421 CREATE TABLE vandelay.session_tracker (
2422     id          BIGSERIAL PRIMARY KEY,
2423
2424     -- string of characters (e.g. md5) used for linking trackers
2425     -- of different actions into a series.  There can be multiple
2426     -- session_keys of each action type, creating the opportunity
2427     -- to link multiple action trackers into a single session.
2428     session_key TEXT NOT NULL,
2429
2430     -- optional user-supplied name
2431     name        TEXT NOT NULL, 
2432
2433     usr         INTEGER NOT NULL REFERENCES actor.usr(id)
2434                 DEFERRABLE INITIALLY DEFERRED,
2435
2436     -- org unit can be derived from WS
2437     workstation INTEGER NOT NULL REFERENCES actor.workstation(id)
2438                 ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
2439
2440     -- bib/auth
2441     record_type TEXT NOT NULL DEFAULT 'bib',
2442
2443     -- Queue defines the source of the data, it does not necessarily
2444     -- mean that an action is being performed against an entire queue.
2445     -- E.g. some imports are misc. lists of record IDs, but they always 
2446     -- come from one queue.
2447     -- No foreign key -- could be auth or bib queue.
2448     queue       BIGINT NOT NULL,
2449
2450     create_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
2451     update_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
2452
2453     state       TEXT NOT NULL DEFAULT 'active',
2454
2455     action_type TEXT NOT NULL DEFAULT 'enqueue', -- import
2456
2457     -- total number of tasks to perform / loosely defined
2458     -- could be # of recs to import or # of recs + # of copies 
2459     -- depending on the import context
2460     total_actions INTEGER NOT NULL DEFAULT 0,
2461
2462     -- total number of tasked performed so far
2463     actions_performed INTEGER NOT NULL DEFAULT 0,
2464
2465     CONSTRAINT vand_tracker_valid_state 
2466         CHECK (state IN ('active','error','complete')),
2467
2468     CONSTRAINT vand_tracker_valid_action_type
2469         CHECK (action_type IN ('upload', 'enqueue', 'import')),
2470
2471     CONSTRAINT vand_tracker_valid_record_type
2472         CHECK (record_type IN ('bib', 'authority'))
2473 );
2474
2475 COMMIT;
2476