]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/sql/Pg/012.schema.vandelay.sql
LP1915464 follow-up: use spaces, not tabs; remove extra comma
[working/Evergreen.git] / Open-ILS / src / sql / Pg / 012.schema.vandelay.sql
1 DROP SCHEMA IF EXISTS vandelay CASCADE;
2
3 BEGIN;
4
5 CREATE SCHEMA vandelay;
6
7 CREATE TABLE vandelay.match_set (
8     id      SERIAL  PRIMARY KEY,
9     name    TEXT        NOT NULL,
10     owner   INT     NOT NULL REFERENCES actor.org_unit (id) ON DELETE CASCADE,
11     mtype   TEXT        NOT NULL DEFAULT 'biblio', -- 'biblio','authority','mfhd'?, others?
12     CONSTRAINT name_once_per_owner_mtype UNIQUE (name, owner, mtype)
13 );
14
15 -- Table to define match points, either FF via SVF or tag+subfield
16 CREATE TABLE vandelay.match_set_point (
17     id          SERIAL  PRIMARY KEY,
18     match_set   INT     REFERENCES vandelay.match_set (id) ON DELETE CASCADE,
19     parent      INT     REFERENCES vandelay.match_set_point (id),
20     bool_op     TEXT    CHECK (bool_op IS NULL OR (bool_op IN ('AND','OR','NOT'))),
21     svf         TEXT    REFERENCES config.record_attr_definition (name),
22     tag         TEXT,
23     subfield    TEXT,
24     negate      BOOL    DEFAULT FALSE,
25     quality     INT     NOT NULL DEFAULT 1, -- higher is better
26     heading     BOOLEAN NOT NULL DEFAULT FALSE, -- match on authority heading
27     CONSTRAINT vmsp_need_a_subfield_with_a_tag CHECK ((tag IS NOT NULL AND subfield IS NOT NULL) OR tag IS NULL),
28     CONSTRAINT vmsp_need_a_tag_or_a_ff_or_a_bo CHECK (
29         (tag IS NOT NULL AND svf IS NULL AND heading IS FALSE AND bool_op IS NULL) OR 
30         (tag IS NULL AND svf IS NOT NULL AND heading IS FALSE AND bool_op IS NULL) OR 
31         (tag IS NULL AND svf IS NULL AND heading IS TRUE AND bool_op IS NULL) OR 
32         (tag IS NULL AND svf IS NULL AND heading IS FALSE AND bool_op IS NOT NULL)
33     )
34 );
35
36 CREATE TABLE vandelay.match_set_quality (
37     id          SERIAL  PRIMARY KEY,
38     match_set   INT     NOT NULL REFERENCES vandelay.match_set (id) ON DELETE CASCADE,
39     svf         TEXT    REFERENCES config.record_attr_definition,
40     tag         TEXT,
41     subfield    TEXT,
42     value       TEXT    NOT NULL,
43     quality     INT     NOT NULL DEFAULT 1, -- higher is better
44     CONSTRAINT vmsq_need_a_subfield_with_a_tag CHECK ((tag IS NOT NULL AND subfield IS NOT NULL) OR tag IS NULL),
45     CONSTRAINT vmsq_need_a_tag_or_a_ff CHECK ((tag IS NOT NULL AND svf IS NULL) OR (tag IS NULL AND svf IS NOT NULL))
46 );
47 CREATE UNIQUE INDEX vmsq_def_once_per_set ON vandelay.match_set_quality (match_set, COALESCE(tag,''), COALESCE(subfield,''), COALESCE(svf,''), value);
48
49
50 CREATE TABLE vandelay.queue (
51         id                              BIGSERIAL       PRIMARY KEY,
52         owner                   INT                     NOT NULL REFERENCES actor.usr (id) DEFERRABLE INITIALLY DEFERRED,
53         name                    TEXT            NOT NULL,
54         complete                BOOL            NOT NULL DEFAULT FALSE,
55     match_set       INT         REFERENCES vandelay.match_set (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED
56 );
57
58 CREATE TABLE vandelay.queued_record (
59     id                  BIGSERIAL                   PRIMARY KEY,
60     create_time TIMESTAMP WITH TIME ZONE    NOT NULL DEFAULT NOW(),
61     import_time TIMESTAMP WITH TIME ZONE,
62         purpose         TEXT                                            NOT NULL DEFAULT 'import' CHECK (purpose IN ('import','overlay')),
63     marc                TEXT                        NOT NULL,
64     quality     INT                         NOT NULL DEFAULT 0
65 );
66
67
68
69 /* Bib stuff at the top */
70 ----------------------------------------------------
71
72 CREATE TABLE vandelay.bib_attr_definition (
73         id                      SERIAL  PRIMARY KEY,
74         code            TEXT    UNIQUE NOT NULL,
75         description     TEXT,
76         xpath           TEXT    NOT NULL,
77         remove          TEXT    NOT NULL DEFAULT ''
78 );
79
80 -- Each TEXT field (other than 'name') should hold an XPath predicate for pulling the data needed
81 -- DROP TABLE vandelay.import_item_attr_definition CASCADE;
82 CREATE TABLE vandelay.import_item_attr_definition (
83     id              BIGSERIAL   PRIMARY KEY,
84     owner           INT         NOT NULL REFERENCES actor.org_unit (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
85     name            TEXT        NOT NULL,
86     tag             TEXT        NOT NULL,
87     keep            BOOL        NOT NULL DEFAULT FALSE,
88     owning_lib      TEXT,
89     circ_lib        TEXT,
90     call_number     TEXT,
91     copy_number     TEXT,
92     status          TEXT,
93     location        TEXT,
94     circulate       TEXT,
95     deposit         TEXT,
96     deposit_amount  TEXT,
97     ref             TEXT,
98     holdable        TEXT,
99     price           TEXT,
100     barcode         TEXT,
101     circ_modifier   TEXT,
102     circ_as_type    TEXT,
103     alert_message   TEXT,
104     opac_visible    TEXT,
105     pub_note_title  TEXT,
106     pub_note        TEXT,
107     priv_note_title TEXT,
108     priv_note       TEXT,
109     internal_id     TEXT,
110     stat_cat_data   TEXT,
111     parts_data      TEXT,
112         CONSTRAINT vand_import_item_attr_def_idx UNIQUE (owner,name)
113 );
114
115 CREATE TABLE vandelay.import_error (
116     code        TEXT    PRIMARY KEY,
117     description TEXT    NOT NULL -- i18n
118 );
119
120 CREATE TYPE vandelay.bib_queue_queue_type AS ENUM ('bib', 'acq');
121
122 CREATE TABLE vandelay.bib_queue (
123         queue_type          vandelay.bib_queue_queue_type       NOT NULL DEFAULT 'bib',
124         item_attr_def   BIGINT REFERENCES vandelay.import_item_attr_definition (id) ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
125     match_bucket    INTEGER, -- REFERENCES container.biblio_record_entry_bucket(id);
126         CONSTRAINT vand_bib_queue_name_once_per_owner_const UNIQUE (owner,name,queue_type)
127 ) INHERITS (vandelay.queue);
128 ALTER TABLE vandelay.bib_queue ADD PRIMARY KEY (id);
129
130 CREATE TABLE vandelay.queued_bib_record (
131         queue               INT         NOT NULL REFERENCES vandelay.bib_queue (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
132         bib_source          INT         REFERENCES config.bib_source (id) DEFERRABLE INITIALLY DEFERRED,
133         imported_as     BIGINT  REFERENCES biblio.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
134         import_error    TEXT    REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
135         error_detail    TEXT
136 ) INHERITS (vandelay.queued_record);
137 ALTER TABLE vandelay.queued_bib_record ADD PRIMARY KEY (id);
138 CREATE INDEX queued_bib_record_queue_idx ON vandelay.queued_bib_record (queue);
139
140 CREATE TABLE vandelay.queued_bib_record_attr (
141         id                      BIGSERIAL       PRIMARY KEY,
142         record          BIGINT          NOT NULL REFERENCES vandelay.queued_bib_record (id) DEFERRABLE INITIALLY DEFERRED,
143         field           INT                     NOT NULL REFERENCES vandelay.bib_attr_definition (id) DEFERRABLE INITIALLY DEFERRED,
144         attr_value      TEXT            NOT NULL
145 );
146 CREATE INDEX queued_bib_record_attr_record_idx ON vandelay.queued_bib_record_attr (record);
147
148 CREATE TABLE vandelay.bib_match (
149         id                              BIGSERIAL       PRIMARY KEY,
150         queued_record   BIGINT          REFERENCES vandelay.queued_bib_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
151         eg_record               BIGINT          REFERENCES biblio.record_entry (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
152     quality         INT         NOT NULL DEFAULT 1,
153     match_score     INT         NOT NULL DEFAULT 0
154 );
155 CREATE INDEX bib_match_queued_record_idx ON vandelay.bib_match (queued_record);
156
157 CREATE TABLE vandelay.import_item (
158     id              BIGSERIAL   PRIMARY KEY,
159     record          BIGINT      NOT NULL REFERENCES vandelay.queued_bib_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
160     definition      BIGINT      NOT NULL REFERENCES vandelay.import_item_attr_definition (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
161         import_error    TEXT        REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
162         error_detail    TEXT,
163     imported_as     BIGINT,
164     import_time     TIMESTAMP WITH TIME ZONE,
165     owning_lib      INT,
166     circ_lib        INT,
167     call_number     TEXT,
168     copy_number     INT,
169     status          INT,
170     location        INT,
171     circulate       BOOL,
172     deposit         BOOL,
173     deposit_amount  NUMERIC(8,2),
174     ref             BOOL,
175     holdable        BOOL,
176     price           NUMERIC(8,2),
177     barcode         TEXT,
178     circ_modifier   TEXT,
179     circ_as_type    TEXT,
180     alert_message   TEXT,
181     pub_note        TEXT,
182     priv_note       TEXT,
183     stat_cat_data   TEXT,
184     parts_data      TEXT,
185     opac_visible    BOOL,
186     internal_id     BIGINT -- queue_type == 'acq' ? acq.lineitem_detail.id : asset.copy.id
187 );
188 CREATE INDEX import_item_record_idx ON vandelay.import_item (record);
189
190 CREATE TABLE vandelay.import_bib_trash_group(
191     id           SERIAL  PRIMARY KEY,
192     owner        INTEGER NOT NULL REFERENCES actor.org_unit(id),
193     label        TEXT    NOT NULL, --i18n
194     always_apply BOOLEAN NOT NULL DEFAULT FALSE,
195         CONSTRAINT vand_import_bib_trash_grp_owner_label UNIQUE (owner, label)
196 );
197  
198 CREATE TABLE vandelay.import_bib_trash_fields (
199     id         BIGSERIAL PRIMARY KEY,
200     grp        INTEGER   NOT NULL REFERENCES vandelay.import_bib_trash_group,
201     field      TEXT      NOT NULL,
202     CONSTRAINT vand_import_bib_trash_fields_once_per UNIQUE (grp, field)
203 );
204
205 CREATE TABLE vandelay.merge_profile (
206     id              BIGSERIAL   PRIMARY KEY,
207     owner           INT         NOT NULL REFERENCES actor.org_unit (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
208     name            TEXT        NOT NULL,
209     add_spec        TEXT,
210     replace_spec    TEXT,
211     strip_spec      TEXT,
212     preserve_spec   TEXT,
213     update_bib_source BOOLEAN   NOT NULL DEFAULT FALSE,
214     update_bib_editor BOOLEAN   NOT NULL DEFAULT FALSE,
215     lwm_ratio       NUMERIC,
216         CONSTRAINT vand_merge_prof_owner_name_idx UNIQUE (owner,name),
217         CONSTRAINT add_replace_strip_or_preserve CHECK ((preserve_spec IS NOT NULL OR replace_spec IS NOT NULL) OR (preserve_spec IS NULL AND replace_spec IS NULL))
218 );
219
220 CREATE OR REPLACE FUNCTION vandelay.marc21_record_type( marc TEXT ) RETURNS config.marc21_rec_type_map AS $func$
221 DECLARE
222         ldr         TEXT;
223         tval        TEXT;
224         tval_rec    RECORD;
225         bval        TEXT;
226         bval_rec    RECORD;
227     retval      config.marc21_rec_type_map%ROWTYPE;
228 BEGIN
229     ldr := oils_xpath_string( '//*[local-name()="leader"]', marc );
230
231     IF ldr IS NULL OR ldr = '' THEN
232         SELECT * INTO retval FROM config.marc21_rec_type_map WHERE code = 'BKS';
233         RETURN retval;
234     END IF;
235
236     SELECT * INTO tval_rec FROM config.marc21_ff_pos_map WHERE fixed_field = 'Type' LIMIT 1; -- They're all the same
237     SELECT * INTO bval_rec FROM config.marc21_ff_pos_map WHERE fixed_field = 'BLvl' LIMIT 1; -- They're all the same
238
239
240     tval := SUBSTRING( ldr, tval_rec.start_pos + 1, tval_rec.length );
241     bval := SUBSTRING( ldr, bval_rec.start_pos + 1, bval_rec.length );
242
243     -- RAISE NOTICE 'type %, blvl %, ldr %', tval, bval, ldr;
244
245     SELECT * INTO retval FROM config.marc21_rec_type_map WHERE type_val LIKE '%' || tval || '%' AND blvl_val LIKE '%' || bval || '%';
246
247
248     IF retval.code IS NULL THEN
249         SELECT * INTO retval FROM config.marc21_rec_type_map WHERE code = 'BKS';
250     END IF;
251
252     RETURN retval;
253 END;
254 $func$ LANGUAGE PLPGSQL;
255
256 CREATE TYPE biblio.record_ff_map AS (record BIGINT, ff_name TEXT, ff_value TEXT);
257 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_all_fixed_fields( marc TEXT, use_default BOOL DEFAULT FALSE ) RETURNS SETOF biblio.record_ff_map AS $func$
258 DECLARE
259     tag_data    TEXT;
260     rtype       TEXT;
261     ff_pos      RECORD;
262     output      biblio.record_ff_map%ROWTYPE;
263 BEGIN
264     rtype := (vandelay.marc21_record_type( marc )).code;
265
266     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE rec_type = rtype ORDER BY tag DESC LOOP
267         output.ff_name  := ff_pos.fixed_field;
268         output.ff_value := NULL;
269
270         IF ff_pos.tag = 'ldr' THEN
271             output.ff_value := oils_xpath_string('//*[local-name()="leader"]', marc);
272             IF output.ff_value IS NOT NULL THEN
273                 output.ff_value := SUBSTRING( output.ff_value, ff_pos.start_pos + 1, ff_pos.length );
274                 RETURN NEXT output;
275                 output.ff_value := NULL;
276             END IF;
277         ELSE
278             FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
279                 output.ff_value := SUBSTRING( tag_data, ff_pos.start_pos + 1, ff_pos.length );
280                 CONTINUE WHEN output.ff_value IS NULL AND NOT use_default;
281                 IF output.ff_value IS NULL THEN output.ff_value := REPEAT( ff_pos.default_val, ff_pos.length ); END IF;
282                 RETURN NEXT output;
283                 output.ff_value := NULL;
284             END LOOP;
285         END IF;
286
287     END LOOP;
288
289     RETURN;
290 END;
291 $func$ LANGUAGE PLPGSQL;
292
293 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_fixed_field_list( marc TEXT, ff TEXT, use_default BOOL DEFAULT FALSE ) RETURNS TEXT[] AS $func$
294 DECLARE
295     rtype       TEXT;
296     ff_pos      RECORD;
297     tag_data    RECORD;
298     val         TEXT;
299     collection  TEXT[] := '{}'::TEXT[];
300 BEGIN
301     rtype := (vandelay.marc21_record_type( marc )).code;
302     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE fixed_field = ff AND rec_type = rtype ORDER BY tag DESC LOOP
303         IF ff_pos.tag = 'ldr' THEN
304             val := oils_xpath_string('//*[local-name()="leader"]', marc);
305             IF val IS NOT NULL THEN
306                 val := SUBSTRING( val, ff_pos.start_pos + 1, ff_pos.length );
307                 collection := collection || val;
308             END IF;
309         ELSE
310             FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
311                 val := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
312                 collection := collection || val;
313             END LOOP;
314         END IF;
315         CONTINUE WHEN NOT use_default;
316         CONTINUE WHEN ARRAY_UPPER(collection, 1) > 0;
317         val := REPEAT( ff_pos.default_val, ff_pos.length );
318         collection := collection || val;
319     END LOOP;
320
321     RETURN collection;
322 END;
323 $func$ LANGUAGE PLPGSQL;
324
325 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_fixed_field( marc TEXT, ff TEXT, use_default BOOL DEFAULT FALSE ) RETURNS TEXT AS $func$
326 DECLARE
327     rtype       TEXT;
328     ff_pos      RECORD;
329     tag_data    RECORD;
330     val         TEXT;
331 BEGIN
332     rtype := (vandelay.marc21_record_type( marc )).code;
333     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE fixed_field = ff AND rec_type = rtype ORDER BY tag DESC LOOP
334         IF ff_pos.tag = 'ldr' THEN
335             val := oils_xpath_string('//*[local-name()="leader"]', marc);
336             IF val IS NOT NULL THEN
337                 val := SUBSTRING( val, ff_pos.start_pos + 1, ff_pos.length );
338                 RETURN val;
339             END IF;
340         ELSE
341             FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
342                 val := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
343                 RETURN val;
344             END LOOP;
345         END IF;
346         CONTINUE WHEN NOT use_default;
347         val := REPEAT( ff_pos.default_val, ff_pos.length );
348         RETURN val;
349     END LOOP;
350
351     RETURN NULL;
352 END;
353 $func$ LANGUAGE PLPGSQL;
354
355 CREATE TYPE biblio.marc21_physical_characteristics AS ( id INT, record BIGINT, ptype TEXT, subfield INT, value INT );
356 CREATE OR REPLACE FUNCTION vandelay.marc21_physical_characteristics( marc TEXT) RETURNS SETOF biblio.marc21_physical_characteristics AS $func$
357 DECLARE
358     rowid   INT := 0;
359     _007    TEXT;
360     ptype   config.marc21_physical_characteristic_type_map%ROWTYPE;
361     psf     config.marc21_physical_characteristic_subfield_map%ROWTYPE;
362     pval    config.marc21_physical_characteristic_value_map%ROWTYPE;
363     retval  biblio.marc21_physical_characteristics%ROWTYPE;
364 BEGIN
365
366     FOR _007 IN SELECT oils_xpath_string('//*', value) FROM UNNEST(oils_xpath('//*[@tag="007"]', marc)) x(value) LOOP
367         IF _007 IS NOT NULL AND _007 <> '' THEN
368             SELECT * INTO ptype FROM config.marc21_physical_characteristic_type_map WHERE ptype_key = SUBSTRING( _007, 1, 1 );
369
370             IF ptype.ptype_key IS NOT NULL THEN
371                 FOR psf IN SELECT * FROM config.marc21_physical_characteristic_subfield_map WHERE ptype_key = ptype.ptype_key LOOP
372                     SELECT * INTO pval FROM config.marc21_physical_characteristic_value_map WHERE ptype_subfield = psf.id AND value = SUBSTRING( _007, psf.start_pos + 1, psf.length );
373
374                     IF pval.id IS NOT NULL THEN
375                         rowid := rowid + 1;
376                         retval.id := rowid;
377                         retval.ptype := ptype.ptype_key;
378                         retval.subfield := psf.id;
379                         retval.value := pval.id;
380                         RETURN NEXT retval;
381                     END IF;
382
383                 END LOOP;
384             END IF;
385         END IF;
386     END LOOP;
387
388     RETURN;
389 END;
390 $func$ LANGUAGE PLPGSQL;
391
392 CREATE TYPE vandelay.flat_marc AS ( tag CHAR(3), ind1 TEXT, ind2 TEXT, subfield TEXT, value TEXT );
393 CREATE OR REPLACE FUNCTION vandelay.flay_marc ( TEXT ) RETURNS SETOF vandelay.flat_marc AS $func$
394
395 use MARC::Record;
396 use MARC::File::XML (BinaryEncoding => 'UTF-8');
397 use MARC::Charset;
398 use strict;
399
400 MARC::Charset->assume_unicode(1);
401
402 my $xml = shift;
403 my $r = MARC::Record->new_from_xml( $xml );
404
405 return_next( { tag => 'LDR', value => $r->leader } );
406
407 for my $f ( $r->fields ) {
408     if ($f->is_control_field) {
409         return_next({ tag => $f->tag, value => $f->data });
410     } else {
411         for my $s ($f->subfields) {
412             return_next({
413                 tag      => $f->tag,
414                 ind1     => $f->indicator(1),
415                 ind2     => $f->indicator(2),
416                 subfield => $s->[0],
417                 value    => $s->[1]
418             });
419
420             if ( $f->tag eq '245' and $s->[0] eq 'a' ) {
421                 my $trim = $f->indicator(2) || 0;
422                 return_next({
423                     tag      => 'tnf',
424                     ind1     => $f->indicator(1),
425                     ind2     => $f->indicator(2),
426                     subfield => 'a',
427                     value    => substr( $s->[1], $trim )
428                 });
429             }
430         }
431     }
432 }
433
434 return undef;
435
436 $func$ LANGUAGE PLPERLU;
437
438 CREATE OR REPLACE FUNCTION vandelay.flatten_marc ( marc TEXT ) RETURNS SETOF vandelay.flat_marc AS $func$
439 DECLARE
440     output  vandelay.flat_marc%ROWTYPE;
441     field   RECORD;
442 BEGIN
443     FOR field IN SELECT * FROM vandelay.flay_marc( marc ) LOOP
444         output.ind1 := field.ind1;
445         output.ind2 := field.ind2;
446         output.tag := field.tag;
447         output.subfield := field.subfield;
448         IF field.subfield IS NOT NULL AND field.tag NOT IN ('020','022','024') THEN -- exclude standard numbers and control fields
449             output.value := naco_normalize(field.value, field.subfield);
450         ELSE
451             output.value := field.value;
452         END IF;
453
454         CONTINUE WHEN output.value IS NULL;
455
456         RETURN NEXT output;
457     END LOOP;
458 END;
459 $func$ LANGUAGE PLPGSQL;
460
461 CREATE OR REPLACE FUNCTION vandelay.extract_rec_attrs ( xml TEXT, attr_defs TEXT[]) RETURNS hstore AS $_$
462 DECLARE
463     transformed_xml TEXT;
464     prev_xfrm       TEXT;
465     normalizer      RECORD;
466     xfrm            config.xml_transform%ROWTYPE;
467     attr_value      TEXT;
468     new_attrs       HSTORE := ''::HSTORE;
469     attr_def        config.record_attr_definition%ROWTYPE;
470 BEGIN
471
472     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE name IN (SELECT * FROM UNNEST(attr_defs)) ORDER BY format LOOP
473
474         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
475             SELECT  STRING_AGG(x.value, COALESCE(attr_def.joiner,' ')) INTO attr_value
476               FROM  vandelay.flatten_marc(xml) AS x
477               WHERE x.tag LIKE attr_def.tag
478                     AND CASE
479                         WHEN attr_def.sf_list IS NOT NULL
480                             THEN POSITION(x.subfield IN attr_def.sf_list) > 0
481                         ELSE TRUE
482                         END
483               GROUP BY x.tag
484               ORDER BY x.tag
485               LIMIT 1;
486
487         ELSIF attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
488             attr_value := vandelay.marc21_extract_fixed_field(xml, attr_def.fixed_field);
489
490         ELSIF attr_def.xpath IS NOT NULL THEN -- and xpath expression
491
492             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
493
494             -- See if we can skip the XSLT ... it's expensive
495             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
496                 -- Can't skip the transform
497                 IF xfrm.xslt <> '---' THEN
498                     transformed_xml := oils_xslt_process(xml,xfrm.xslt);
499                 ELSE
500                     transformed_xml := xml;
501                 END IF;
502
503                 prev_xfrm := xfrm.name;
504             END IF;
505
506             IF xfrm.name IS NULL THEN
507                 -- just grab the marcxml (empty) transform
508                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
509                 prev_xfrm := xfrm.name;
510             END IF;
511
512             attr_value := oils_xpath_string(attr_def.xpath, transformed_xml, COALESCE(attr_def.joiner,' '), ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]);
513
514         ELSIF attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
515             SELECT  m.value::TEXT INTO attr_value
516               FROM  vandelay.marc21_physical_characteristics(xml) v
517                     JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
518               WHERE v.subfield = attr_def.phys_char_sf
519               LIMIT 1; -- Just in case ...
520
521         END IF;
522
523         -- apply index normalizers to attr_value
524         FOR normalizer IN
525             SELECT  n.func AS func,
526                     n.param_count AS param_count,
527                     m.params AS params
528               FROM  config.index_normalizer n
529                     JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
530               WHERE attr = attr_def.name
531               ORDER BY m.pos LOOP
532                 EXECUTE 'SELECT ' || normalizer.func || '(' ||
533                     quote_nullable( attr_value ) ||
534                     CASE
535                         WHEN normalizer.param_count > 0
536                             THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
537                             ELSE ''
538                         END ||
539                     ')' INTO attr_value;
540
541         END LOOP;
542
543         -- Add the new value to the hstore
544         new_attrs := new_attrs || hstore( attr_def.name, attr_value );
545
546     END LOOP;
547
548     RETURN new_attrs;
549 END;
550 $_$ LANGUAGE PLPGSQL;
551
552 CREATE OR REPLACE FUNCTION vandelay.extract_rec_attrs ( xml TEXT ) RETURNS hstore AS $_$
553     SELECT vandelay.extract_rec_attrs( $1, (SELECT ARRAY_AGG(name) FROM config.record_attr_definition));
554 $_$ LANGUAGE SQL;
555
556 -- Everything between this comment and the beginning of the definition of
557 -- vandelay.match_bib_record() is strictly in service of that function.
558 CREATE TYPE vandelay.match_set_test_result AS (record BIGINT, quality INTEGER);
559
560 CREATE OR REPLACE FUNCTION vandelay.match_set_test_marcxml(
561     match_set_id INTEGER, record_xml TEXT, bucket_id INTEGER 
562 ) RETURNS SETOF vandelay.match_set_test_result AS $$
563 DECLARE
564     tags_rstore HSTORE;
565     svf_rstore  HSTORE;
566     coal        TEXT;
567     joins       TEXT;
568     query_      TEXT;
569     wq          TEXT;
570     qvalue      INTEGER;
571     rec         RECORD;
572 BEGIN
573     tags_rstore := vandelay.flatten_marc_hstore(record_xml);
574     svf_rstore := vandelay.extract_rec_attrs(record_xml);
575
576     CREATE TEMPORARY TABLE _vandelay_tmp_qrows (q INTEGER);
577     CREATE TEMPORARY TABLE _vandelay_tmp_jrows (j TEXT);
578
579     -- generate the where clause and return that directly (into wq), and as
580     -- a side-effect, populate the _vandelay_tmp_[qj]rows tables.
581     wq := vandelay.get_expr_from_match_set(match_set_id, tags_rstore);
582
583     query_ := 'SELECT DISTINCT(record), ';
584
585     -- qrows table is for the quality bits we add to the SELECT clause
586     SELECT STRING_AGG(
587         'COALESCE(n' || q::TEXT || '.quality, 0)', ' + '
588     ) INTO coal FROM _vandelay_tmp_qrows;
589
590     -- our query string so far is the SELECT clause and the inital FROM.
591     -- no JOINs yet nor the WHERE clause
592     query_ := query_ || coal || ' AS quality ' || E'\n';
593
594     -- jrows table is for the joins we must make (and the real text conditions)
595     SELECT STRING_AGG(j, E'\n') INTO joins
596         FROM _vandelay_tmp_jrows;
597
598     -- add those joins and the where clause to our query.
599     query_ := query_ || joins || E'\n';
600
601     -- join the record bucket
602     IF bucket_id IS NOT NULL THEN
603         query_ := query_ || 'JOIN container.biblio_record_entry_bucket_item ' ||
604             'brebi ON (brebi.target_biblio_record_entry = record ' ||
605             'AND brebi.bucket = ' || bucket_id || E')\n';
606     END IF;
607
608     query_ := query_ || 'JOIN biblio.record_entry bre ON (bre.id = record) ' || 'WHERE ' || wq || ' AND not bre.deleted';
609
610     -- this will return rows of record,quality
611     FOR rec IN EXECUTE query_ USING tags_rstore, svf_rstore LOOP
612         RETURN NEXT rec;
613     END LOOP;
614
615     DROP TABLE _vandelay_tmp_qrows;
616     DROP TABLE _vandelay_tmp_jrows;
617     RETURN;
618 END;
619 $$ LANGUAGE PLPGSQL;
620
621
622 CREATE OR REPLACE FUNCTION vandelay.flatten_marc_hstore(
623     record_xml TEXT
624 ) RETURNS HSTORE AS $func$
625 BEGIN
626     RETURN (SELECT
627         HSTORE(
628             ARRAY_AGG(tag || (COALESCE(subfield, ''))),
629             ARRAY_AGG(value)
630         )
631         FROM (
632             SELECT  tag, subfield, ARRAY_AGG(value)::TEXT AS value
633               FROM  (SELECT tag,
634                             subfield,
635                             CASE WHEN tag = '020' THEN -- caseless -- isbn
636                                 LOWER((SELECT REGEXP_MATCHES(value,$$^(\S{10,17})$$))[1] || '%')
637                             WHEN tag = '022' THEN -- caseless -- issn
638                                 LOWER((SELECT REGEXP_MATCHES(value,$$^(\S{4}[- ]?\S{4})$$))[1] || '%')
639                             WHEN tag = '024' THEN -- caseless -- upc (other)
640                                 LOWER(value || '%')
641                             ELSE
642                                 value
643                             END AS value
644                       FROM  vandelay.flatten_marc(record_xml)) x
645                 GROUP BY tag, subfield ORDER BY tag, subfield
646         ) subquery
647     );
648 END;
649 $func$ LANGUAGE PLPGSQL;
650
651 -- backwards compat version so we don't have 
652 -- to modify vandelay.match_set_test_marcxml()
653 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set(
654     match_set_id INTEGER,
655     tags_rstore HSTORE
656 ) RETURNS TEXT AS $$
657 BEGIN
658     RETURN vandelay.get_expr_from_match_set(
659         match_set_id, tags_rstore, NULL);
660 END;
661 $$  LANGUAGE PLPGSQL;
662
663 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set(
664     match_set_id INTEGER,
665     tags_rstore HSTORE,
666     auth_heading TEXT
667 ) RETURNS TEXT AS $$
668 DECLARE
669     root vandelay.match_set_point;
670 BEGIN
671     SELECT * INTO root FROM vandelay.match_set_point
672         WHERE parent IS NULL AND match_set = match_set_id;
673
674     RETURN vandelay.get_expr_from_match_set_point(
675         root, tags_rstore, auth_heading);
676 END;
677 $$  LANGUAGE PLPGSQL;
678
679 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set_point(
680     node vandelay.match_set_point,
681     tags_rstore HSTORE,
682     auth_heading TEXT
683 ) RETURNS TEXT AS $$
684 DECLARE
685     q           TEXT;
686     i           INTEGER;
687     this_op     TEXT;
688     children    INTEGER[];
689     child       vandelay.match_set_point;
690 BEGIN
691     SELECT ARRAY_AGG(id) INTO children FROM vandelay.match_set_point
692         WHERE parent = node.id;
693
694     IF ARRAY_LENGTH(children, 1) > 0 THEN
695         this_op := vandelay._get_expr_render_one(node);
696         q := '(';
697         i := 1;
698         WHILE children[i] IS NOT NULL LOOP
699             SELECT * INTO child FROM vandelay.match_set_point
700                 WHERE id = children[i];
701             IF i > 1 THEN
702                 q := q || ' ' || this_op || ' ';
703             END IF;
704             i := i + 1;
705             q := q || vandelay.get_expr_from_match_set_point(
706                 child, tags_rstore, auth_heading);
707         END LOOP;
708         q := q || ')';
709         RETURN q;
710     ELSIF node.bool_op IS NULL THEN
711         PERFORM vandelay._get_expr_push_qrow(node);
712         PERFORM vandelay._get_expr_push_jrow(node, tags_rstore, auth_heading);
713         RETURN vandelay._get_expr_render_one(node);
714     ELSE
715         RETURN '';
716     END IF;
717 END;
718 $$  LANGUAGE PLPGSQL;
719
720 CREATE OR REPLACE FUNCTION vandelay._get_expr_push_qrow(
721     node vandelay.match_set_point
722 ) RETURNS VOID AS $$
723 DECLARE
724 BEGIN
725     INSERT INTO _vandelay_tmp_qrows (q) VALUES (node.id);
726 END;
727 $$ LANGUAGE PLPGSQL;
728
729 CREATE OR REPLACE FUNCTION vandelay._get_expr_push_jrow(
730     node vandelay.match_set_point,
731     tags_rstore HSTORE,
732     auth_heading TEXT
733 ) RETURNS VOID AS $$
734 DECLARE
735     jrow        TEXT;
736     my_alias    TEXT;
737     op          TEXT;
738     tagkey      TEXT;
739     caseless    BOOL;
740     jrow_count  INT;
741     my_using    TEXT;
742     my_join     TEXT;
743     rec_table   TEXT;
744 BEGIN
745     -- remember $1 is tags_rstore, and $2 is svf_rstore
746     -- a non-NULL auth_heading means we're matching authority records
747
748     IF auth_heading IS NOT NULL THEN
749         rec_table := 'authority.full_rec';
750     ELSE
751         rec_table := 'metabib.full_rec';
752     END IF;
753
754     caseless := FALSE;
755     SELECT COUNT(*) INTO jrow_count FROM _vandelay_tmp_jrows;
756     IF jrow_count > 0 THEN
757         my_using := ' USING (record)';
758         my_join := 'FULL OUTER JOIN';
759     ELSE
760         my_using := '';
761         my_join := 'FROM';
762     END IF;
763
764     IF node.tag IS NOT NULL THEN
765         caseless := (node.tag IN ('020', '022', '024'));
766         tagkey := node.tag;
767         IF node.subfield IS NOT NULL THEN
768             tagkey := tagkey || node.subfield;
769         END IF;
770     END IF;
771
772     IF node.negate THEN
773         IF caseless THEN
774             op := 'NOT LIKE';
775         ELSE
776             op := '<>';
777         END IF;
778     ELSE
779         IF caseless THEN
780             op := 'LIKE';
781         ELSE
782             op := '=';
783         END IF;
784     END IF;
785
786     my_alias := 'n' || node.id::TEXT;
787
788     jrow := my_join || ' (SELECT *, ';
789     IF node.tag IS NOT NULL THEN
790         jrow := jrow  || node.quality ||
791             ' AS quality FROM ' || rec_table || ' mfr WHERE mfr.tag = ''' ||
792             node.tag || '''';
793         IF node.subfield IS NOT NULL THEN
794             jrow := jrow || ' AND mfr.subfield = ''' ||
795                 node.subfield || '''';
796         END IF;
797         jrow := jrow || ' AND (';
798         jrow := jrow || vandelay._node_tag_comparisons(caseless, op, tags_rstore, tagkey);
799         jrow := jrow || ')) ' || my_alias || my_using || E'\n';
800     ELSE    -- svf
801         IF auth_heading IS NOT NULL THEN -- authority record
802             IF node.heading AND auth_heading <> '' THEN
803                 jrow := jrow || 'id AS record, ' || node.quality ||
804                 ' AS quality FROM authority.record_entry are ' ||
805                 ' WHERE are.heading = ''' || auth_heading || '''';
806                 jrow := jrow || ') ' || my_alias || my_using || E'\n';
807             END IF;
808         ELSE -- bib record
809             jrow := jrow || 'id AS record, ' || node.quality ||
810                 ' AS quality FROM metabib.record_attr_flat mraf WHERE mraf.attr = ''' ||
811                 node.svf || ''' AND mraf.value ' || op || ' $2->''' || node.svf || ''') ' ||
812                 my_alias || my_using || E'\n';
813         END IF;
814     END IF;
815     INSERT INTO _vandelay_tmp_jrows (j) VALUES (jrow);
816 END;
817 $$ LANGUAGE PLPGSQL;
818
819 CREATE OR REPLACE FUNCTION vandelay._node_tag_comparisons(
820     caseless BOOLEAN,
821     op TEXT,
822     tags_rstore HSTORE,
823     tagkey TEXT
824 ) RETURNS TEXT AS $$
825 DECLARE
826     result  TEXT;
827     i       INT;
828     vals    TEXT[];
829 BEGIN
830     i := 1;
831     vals := tags_rstore->tagkey;
832     result := '';
833
834     WHILE TRUE LOOP
835         IF i > 1 THEN
836             IF vals[i] IS NULL THEN
837                 EXIT;
838             ELSE
839                 result := result || ' OR ';
840             END IF;
841         END IF;
842
843         IF caseless THEN
844             result := result || 'LOWER(mfr.value) ' || op;
845         ELSE
846             result := result || 'mfr.value ' || op;
847         END IF;
848
849         result := result || ' ' || COALESCE('''' || vals[i] || '''', 'NULL');
850
851         IF vals[i] IS NULL THEN
852             EXIT;
853         END IF;
854         i := i + 1;
855     END LOOP;
856
857     RETURN result;
858
859 END;
860 $$ LANGUAGE PLPGSQL;
861
862 CREATE OR REPLACE FUNCTION vandelay._get_expr_render_one(
863     node vandelay.match_set_point
864 ) RETURNS TEXT AS $$
865 DECLARE
866     s           TEXT;
867 BEGIN
868     IF node.bool_op IS NOT NULL THEN
869         RETURN node.bool_op;
870     ELSE
871         RETURN '(n' || node.id::TEXT || '.id IS NOT NULL)';
872     END IF;
873 END;
874 $$ LANGUAGE PLPGSQL;
875
876 CREATE OR REPLACE FUNCTION vandelay.match_bib_record() RETURNS TRIGGER AS $func$
877 DECLARE
878     incoming_existing_id    TEXT;
879     test_result             vandelay.match_set_test_result%ROWTYPE;
880     tmp_rec                 BIGINT;
881     match_set               INT;
882     match_bucket            INT;
883 BEGIN
884     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
885         RETURN NEW;
886     END IF;
887
888     DELETE FROM vandelay.bib_match WHERE queued_record = NEW.id;
889
890     SELECT q.match_set INTO match_set FROM vandelay.bib_queue q WHERE q.id = NEW.queue;
891
892     IF match_set IS NOT NULL THEN
893         NEW.quality := vandelay.measure_record_quality( NEW.marc, match_set );
894     END IF;
895
896     -- Perfect matches on 901$c exit early with a match with high quality.
897     incoming_existing_id :=
898         oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]', NEW.marc);
899
900     IF incoming_existing_id IS NOT NULL AND incoming_existing_id != '' THEN
901         SELECT id INTO tmp_rec FROM biblio.record_entry WHERE id = incoming_existing_id::bigint;
902         IF tmp_rec IS NOT NULL THEN
903             INSERT INTO vandelay.bib_match (queued_record, eg_record, match_score, quality) 
904                 SELECT
905                     NEW.id, 
906                     b.id,
907                     9999,
908                     -- note: no match_set means quality==0
909                     vandelay.measure_record_quality( b.marc, match_set )
910                 FROM biblio.record_entry b
911                 WHERE id = incoming_existing_id::bigint;
912         END IF;
913     END IF;
914
915     IF match_set IS NULL THEN
916         RETURN NEW;
917     END IF;
918
919     SELECT q.match_bucket INTO match_bucket FROM vandelay.bib_queue q WHERE q.id = NEW.queue;
920
921     FOR test_result IN SELECT * FROM
922         vandelay.match_set_test_marcxml(match_set, NEW.marc, match_bucket) LOOP
923
924         INSERT INTO vandelay.bib_match ( queued_record, eg_record, match_score, quality )
925             SELECT  
926                 NEW.id,
927                 test_result.record,
928                 test_result.quality,
929                 vandelay.measure_record_quality( b.marc, match_set )
930                 FROM  biblio.record_entry b
931                 WHERE id = test_result.record;
932
933     END LOOP;
934
935     RETURN NEW;
936 END;
937 $func$ LANGUAGE PLPGSQL;
938
939 CREATE OR REPLACE FUNCTION vandelay.measure_record_quality ( xml TEXT, match_set_id INT ) RETURNS INT AS $_$
940 DECLARE
941     out_q   INT := 0;
942     rvalue  TEXT;
943     test    vandelay.match_set_quality%ROWTYPE;
944 BEGIN
945
946     FOR test IN SELECT * FROM vandelay.match_set_quality WHERE match_set = match_set_id LOOP
947         IF test.tag IS NOT NULL THEN
948             FOR rvalue IN SELECT value FROM vandelay.flatten_marc( xml ) WHERE tag = test.tag AND subfield = test.subfield LOOP
949                 IF test.value = rvalue THEN
950                     out_q := out_q + test.quality;
951                 END IF;
952             END LOOP;
953         ELSE
954             IF test.value = vandelay.extract_rec_attrs(xml, ARRAY[test.svf]) -> test.svf THEN
955                 out_q := out_q + test.quality;
956             END IF;
957         END IF;
958     END LOOP;
959
960     RETURN out_q;
961 END;
962 $_$ LANGUAGE PLPGSQL;
963
964 CREATE TYPE vandelay.tcn_data AS (tcn TEXT, tcn_source TEXT, used BOOL);
965 CREATE OR REPLACE FUNCTION vandelay.find_bib_tcn_data ( xml TEXT ) RETURNS SETOF vandelay.tcn_data AS $_$
966 DECLARE
967     eg_tcn          TEXT;
968     eg_tcn_source   TEXT;
969     output          vandelay.tcn_data%ROWTYPE;
970 BEGIN
971
972     -- 001/003
973     eg_tcn := BTRIM((oils_xpath('//*[@tag="001"]/text()',xml))[1]);
974     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
975
976         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="003"]/text()',xml))[1]);
977         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
978             eg_tcn_source := 'System Local';
979         END IF;
980
981         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
982
983         IF NOT FOUND THEN
984             output.used := FALSE;
985         ELSE
986             output.used := TRUE;
987         END IF;
988
989         output.tcn := eg_tcn;
990         output.tcn_source := eg_tcn_source;
991         RETURN NEXT output;
992
993     END IF;
994
995     -- 901 ab
996     eg_tcn := BTRIM((oils_xpath('//*[@tag="901"]/*[@code="a"]/text()',xml))[1]);
997     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
998
999         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="901"]/*[@code="b"]/text()',xml))[1]);
1000         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
1001             eg_tcn_source := 'System Local';
1002         END IF;
1003
1004         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1005
1006         IF NOT FOUND THEN
1007             output.used := FALSE;
1008         ELSE
1009             output.used := TRUE;
1010         END IF;
1011
1012         output.tcn := eg_tcn;
1013         output.tcn_source := eg_tcn_source;
1014         RETURN NEXT output;
1015
1016     END IF;
1017
1018     -- 039 ab
1019     eg_tcn := BTRIM((oils_xpath('//*[@tag="039"]/*[@code="a"]/text()',xml))[1]);
1020     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1021
1022         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="039"]/*[@code="b"]/text()',xml))[1]);
1023         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
1024             eg_tcn_source := 'System Local';
1025         END IF;
1026
1027         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1028
1029         IF NOT FOUND THEN
1030             output.used := FALSE;
1031         ELSE
1032             output.used := TRUE;
1033         END IF;
1034
1035         output.tcn := eg_tcn;
1036         output.tcn_source := eg_tcn_source;
1037         RETURN NEXT output;
1038
1039     END IF;
1040
1041     -- 020 a
1042     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="020"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
1043     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1044
1045         eg_tcn_source := 'ISBN';
1046
1047         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1048
1049         IF NOT FOUND THEN
1050             output.used := FALSE;
1051         ELSE
1052             output.used := TRUE;
1053         END IF;
1054
1055         output.tcn := eg_tcn;
1056         output.tcn_source := eg_tcn_source;
1057         RETURN NEXT output;
1058
1059     END IF;
1060
1061     -- 022 a
1062     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="022"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
1063     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1064
1065         eg_tcn_source := 'ISSN';
1066
1067         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1068
1069         IF NOT FOUND THEN
1070             output.used := FALSE;
1071         ELSE
1072             output.used := TRUE;
1073         END IF;
1074
1075         output.tcn := eg_tcn;
1076         output.tcn_source := eg_tcn_source;
1077         RETURN NEXT output;
1078
1079     END IF;
1080
1081     -- 010 a
1082     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="010"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
1083     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1084
1085         eg_tcn_source := 'LCCN';
1086
1087         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1088
1089         IF NOT FOUND THEN
1090             output.used := FALSE;
1091         ELSE
1092             output.used := TRUE;
1093         END IF;
1094
1095         output.tcn := eg_tcn;
1096         output.tcn_source := eg_tcn_source;
1097         RETURN NEXT output;
1098
1099     END IF;
1100
1101     -- 035 a
1102     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="035"]/*[@code="a"]/text()',xml))[1], $re$^.*?(\w+)$$re$, $re$\1$re$);
1103     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1104
1105         eg_tcn_source := 'System Legacy';
1106
1107         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1108
1109         IF NOT FOUND THEN
1110             output.used := FALSE;
1111         ELSE
1112             output.used := TRUE;
1113         END IF;
1114
1115         output.tcn := eg_tcn;
1116         output.tcn_source := eg_tcn_source;
1117         RETURN NEXT output;
1118
1119     END IF;
1120
1121     RETURN;
1122 END;
1123 $_$ LANGUAGE PLPGSQL;
1124
1125 CREATE OR REPLACE FUNCTION vandelay.add_field ( target_xml TEXT, source_xml TEXT, field TEXT, force_add INT ) RETURNS TEXT AS $_$
1126
1127     use MARC::Record;
1128     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1129     use MARC::Charset;
1130     use strict;
1131
1132     MARC::Charset->assume_unicode(1);
1133
1134     my $target_xml = shift;
1135     my $source_xml = shift;
1136     my $field_spec = shift;
1137     my $force_add = shift || 0;
1138
1139     my $target_r = MARC::Record->new_from_xml( $target_xml );
1140     my $source_r = MARC::Record->new_from_xml( $source_xml );
1141
1142     return $target_xml unless ($target_r && $source_r);
1143
1144     my @field_list = split(',', $field_spec);
1145
1146     my %fields;
1147     for my $f (@field_list) {
1148         $f =~ s/^\s*//; $f =~ s/\s*$//;
1149         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1150             my $field = $1;
1151             $field =~ s/\s+//;
1152             my $sf = $2;
1153             $sf =~ s/\s+//;
1154             my $match = $3;
1155             $match =~ s/^\s*//; $match =~ s/\s*$//;
1156             $fields{$field} = { sf => [ split('', $sf) ] };
1157             if ($match) {
1158                 my ($msf,$mre) = split('~', $match);
1159                 if (length($msf) > 0 and length($mre) > 0) {
1160                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1161                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1162                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1163                 }
1164             }
1165         }
1166     }
1167
1168     for my $f ( keys %fields) {
1169         if ( @{$fields{$f}{sf}} ) {
1170             for my $from_field ($source_r->field( $f )) {
1171                 my @tos = $target_r->field( $f );
1172                 if (!@tos) {
1173                     next if (exists($fields{$f}{match}) and !$force_add);
1174                     my @new_fields = map { $_->clone } $source_r->field( $f );
1175                     $target_r->insert_fields_ordered( @new_fields );
1176                 } else {
1177                     for my $to_field (@tos) {
1178                         if (exists($fields{$f}{match})) {
1179                             next unless (grep { $_ =~ $fields{$f}{match}{re} } $to_field->subfield($fields{$f}{match}{sf}));
1180                         }
1181                         for my $old_sf ($from_field->subfields) {
1182                             $to_field->add_subfields( @$old_sf ) if grep(/$$old_sf[0]/,@{$fields{$f}{sf}});
1183                         }
1184                     }
1185                 }
1186             }
1187         } else {
1188             my @new_fields = map { $_->clone } $source_r->field( $f );
1189             $target_r->insert_fields_ordered( @new_fields );
1190         }
1191     }
1192
1193     $target_xml = $target_r->as_xml_record;
1194     $target_xml =~ s/^<\?.+?\?>$//mo;
1195     $target_xml =~ s/\n//sgo;
1196     $target_xml =~ s/>\s+</></sgo;
1197
1198     return $target_xml;
1199
1200 $_$ LANGUAGE PLPERLU;
1201
1202 CREATE OR REPLACE FUNCTION vandelay.add_field ( target_xml TEXT, source_xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1203     SELECT vandelay.add_field( $1, $2, $3, 0 );
1204 $_$ LANGUAGE SQL;
1205
1206
1207 CREATE OR REPLACE FUNCTION vandelay.strip_field(xml text, field text) RETURNS text AS $f$
1208
1209     use MARC::Record;
1210     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1211     use MARC::Charset;
1212     use strict;
1213
1214     MARC::Charset->assume_unicode(1);
1215
1216     my $xml = shift;
1217     my $r = MARC::Record->new_from_xml( $xml );
1218
1219     return $xml unless ($r);
1220
1221     my $field_spec = shift;
1222     my @field_list = split(',', $field_spec);
1223
1224     my %fields;
1225     for my $f (@field_list) {
1226         $f =~ s/^\s*//; $f =~ s/\s*$//;
1227         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1228             my $field = $1;
1229             $field =~ s/\s+//;
1230             my $sf = $2;
1231             $sf =~ s/\s+//;
1232             my $matches = $3;
1233             $matches =~ s/^\s*//; $matches =~ s/\s*$//;
1234             $fields{$field} = { sf => [ split('', $sf) ] };
1235             if ($matches) {
1236                 for my $match (split('&&', $matches)) {
1237                     $match =~ s/^\s*//; $match =~ s/\s*$//;
1238                     my ($msf,$mre) = split('~', $match);
1239                     if (length($msf) > 0 and length($mre) > 0) {
1240                         $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1241                         $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1242                         $fields{$field}{match}{$msf} = qr/$mre/;
1243                     }
1244                 }
1245             }
1246         }
1247     }
1248
1249     for my $f ( keys %fields) {
1250         for my $to_field ($r->field( $f )) {
1251             if (exists($fields{$f}{match})) {
1252                 my @match_list = grep { $to_field->subfield($_) =~ $fields{$f}{match}{$_} } keys %{$fields{$f}{match}};
1253                 next unless (scalar(@match_list) == scalar(keys %{$fields{$f}{match}}));
1254             }
1255
1256             if ( @{$fields{$f}{sf}} ) {
1257                 $to_field->delete_subfield(code => $fields{$f}{sf});
1258             } else {
1259                 $r->delete_field( $to_field );
1260             }
1261         }
1262     }
1263
1264     $xml = $r->as_xml_record;
1265     $xml =~ s/^<\?.+?\?>$//mo;
1266     $xml =~ s/\n//sgo;
1267     $xml =~ s/>\s+</></sgo;
1268
1269     return $xml;
1270
1271 $f$ LANGUAGE plperlu;
1272
1273 CREATE OR REPLACE FUNCTION vandelay.replace_field 
1274     (target_xml TEXT, source_xml TEXT, field TEXT) RETURNS TEXT AS $_$
1275
1276     use strict;
1277     use MARC::Record;
1278     use MARC::Field;
1279     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1280     use MARC::Charset;
1281
1282     MARC::Charset->assume_unicode(1);
1283
1284     my $target_xml = shift;
1285     my $source_xml = shift;
1286     my $field_spec = shift;
1287
1288     my $target_r = MARC::Record->new_from_xml($target_xml);
1289     my $source_r = MARC::Record->new_from_xml($source_xml);
1290
1291     return $target_xml unless $target_r && $source_r;
1292
1293     # Extract the field_spec components into MARC tags, subfields, 
1294     # and regex matches.  Copied wholesale from vandelay.strip_field()
1295
1296     my @field_list = split(',', $field_spec);
1297     my %fields;
1298     for my $f (@field_list) {
1299         $f =~ s/^\s*//; $f =~ s/\s*$//;
1300         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1301             my $field = $1;
1302             $field =~ s/\s+//;
1303             my $sf = $2;
1304             $sf =~ s/\s+//;
1305             my $match = $3;
1306             $match =~ s/^\s*//; $match =~ s/\s*$//;
1307             $fields{$field} = { sf => [ split('', $sf) ] };
1308             if ($match) {
1309                 my ($msf,$mre) = split('~', $match);
1310                 if (length($msf) > 0 and length($mre) > 0) {
1311                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1312                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1313                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1314                 }
1315             }
1316         }
1317     }
1318
1319     # Returns a flat list of subfield (code, value, code, value, ...)
1320     # suitable for adding to a MARC::Field.
1321     sub generate_replacement_subfields {
1322         my ($source_field, $target_field, @controlled_subfields) = @_;
1323
1324         # Performing a wholesale field replacment.  
1325         # Use the entire source field as-is.
1326         return map {$_->[0], $_->[1]} $source_field->subfields
1327             unless @controlled_subfields;
1328
1329         my @new_subfields;
1330
1331         # Iterate over all target field subfields:
1332         # 1. Keep uncontrolled subfields as is.
1333         # 2. Replace values for controlled subfields when a
1334         #    replacement value exists on the source record.
1335         # 3. Delete values for controlled subfields when no 
1336         #    replacement value exists on the source record.
1337
1338         for my $target_sf ($target_field->subfields) {
1339             my $subfield = $target_sf->[0];
1340             my $target_val = $target_sf->[1];
1341
1342             if (grep {$_ eq $subfield} @controlled_subfields) {
1343                 if (my $source_val = $source_field->subfield($subfield)) {
1344                     # We have a replacement value
1345                     push(@new_subfields, $subfield, $source_val);
1346                 } else {
1347                     # no replacement value for controlled subfield, drop it.
1348                 }
1349             } else {
1350                 # Field is not controlled.  Copy it over as-is.
1351                 push(@new_subfields, $subfield, $target_val);
1352             }
1353         }
1354
1355         # Iterate over all subfields in the source field and back-fill
1356         # any values that exist only in the source field.  Insert these
1357         # subfields in the same relative position they exist in the
1358         # source field.
1359                 
1360         my @seen_subfields;
1361         for my $source_sf ($source_field->subfields) {
1362             my $subfield = $source_sf->[0];
1363             my $source_val = $source_sf->[1];
1364             push(@seen_subfields, $subfield);
1365
1366             # target field already contains this subfield, 
1367             # so it would have been addressed above.
1368             next if $target_field->subfield($subfield);
1369
1370             # Ignore uncontrolled subfields.
1371             next unless grep {$_ eq $subfield} @controlled_subfields;
1372
1373             # Adding a new subfield.  Find its relative position and add
1374             # it to the list under construction.  Work backwards from
1375             # the list of already seen subfields to find the best slot.
1376
1377             my $done = 0;
1378             for my $seen_sf (reverse(@seen_subfields)) {
1379                 my $idx = @new_subfields;
1380                 for my $new_sf (reverse(@new_subfields)) {
1381                     $idx--;
1382                     next if $idx % 2 == 1; # sf codes are in the even slots
1383
1384                     if ($new_subfields[$idx] eq $seen_sf) {
1385                         splice(@new_subfields, $idx + 2, 0, $subfield, $source_val);
1386                         $done = 1;
1387                         last;
1388                     }
1389                 }
1390                 last if $done;
1391             }
1392
1393             # if no slot was found, add to the end of the list.
1394             push(@new_subfields, $subfield, $source_val) unless $done;
1395         }
1396
1397         return @new_subfields;
1398     }
1399
1400     # MARC tag loop
1401     for my $f (keys %fields) {
1402         my $tag_idx = -1;
1403         for my $target_field ($target_r->field($f)) {
1404
1405             # field spec contains a regex for this field.  Confirm field on 
1406             # target record matches the specified regex before replacing.
1407             if (exists($fields{$f}{match})) {
1408                 next unless (grep { $_ =~ $fields{$f}{match}{re} } 
1409                     $target_field->subfield($fields{$f}{match}{sf}));
1410             }
1411
1412             my @new_subfields;
1413             my @controlled_subfields = @{$fields{$f}{sf}};
1414
1415             # If the target record has multiple matching bib fields,
1416             # replace them from matching fields on the source record
1417             # in a predictable order to avoid replacing with them with
1418             # same source field repeatedly.
1419             my @source_fields = $source_r->field($f);
1420             my $source_field = $source_fields[++$tag_idx];
1421
1422             if (!$source_field && @controlled_subfields) {
1423                 # When there are more target fields than source fields
1424                 # and we are replacing values for subfields and not
1425                 # performing wholesale field replacment, use the last
1426                 # available source field as the input for all remaining
1427                 # target fields.
1428                 $source_field = $source_fields[$#source_fields];
1429             }
1430
1431             if (!$source_field) {
1432                 # No source field exists.  Delete all affected target
1433                 # data.  This is a little bit counterintuitive, but is
1434                 # backwards compatible with the previous version of this
1435                 # function which first deleted all affected data, then
1436                 # replaced values where possible.
1437                 if (@controlled_subfields) {
1438                     $target_field->delete_subfield($_) for @controlled_subfields;
1439                 } else {
1440                     $target_r->delete_field($target_field);
1441                 }
1442                 next;
1443             }
1444
1445             my @new_subfields = generate_replacement_subfields(
1446                 $source_field, $target_field, @controlled_subfields);
1447
1448             # Build the replacement field from scratch.  
1449             my $replacement_field = MARC::Field->new(
1450                 $target_field->tag,
1451                 $target_field->indicator(1),
1452                 $target_field->indicator(2),
1453                 @new_subfields
1454             );
1455
1456             $target_field->replace_with($replacement_field);
1457         }
1458     }
1459
1460     $target_xml = $target_r->as_xml_record;
1461     $target_xml =~ s/^<\?.+?\?>$//mo;
1462     $target_xml =~ s/\n//sgo;
1463     $target_xml =~ s/>\s+</></sgo;
1464
1465     return $target_xml;
1466
1467 $_$ LANGUAGE PLPERLU;
1468
1469 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml ( target_xml TEXT, source_xml TEXT, add_rule TEXT, replace_preserve_rule TEXT, strip_rule TEXT ) RETURNS TEXT AS $_$
1470     SELECT vandelay.replace_field( vandelay.add_field( vandelay.strip_field( $1, $5) , $2, $3 ), $2, $4);
1471 $_$ LANGUAGE SQL;
1472
1473 CREATE TYPE vandelay.compile_profile AS (add_rule TEXT, replace_rule TEXT, preserve_rule TEXT, strip_rule TEXT);
1474 CREATE OR REPLACE FUNCTION vandelay.compile_profile ( incoming_xml TEXT ) RETURNS vandelay.compile_profile AS $_$
1475 DECLARE
1476     output              vandelay.compile_profile%ROWTYPE;
1477     profile             vandelay.merge_profile%ROWTYPE;
1478     profile_tmpl        TEXT;
1479     profile_tmpl_owner  TEXT;
1480     add_rule            TEXT := '';
1481     strip_rule          TEXT := '';
1482     replace_rule        TEXT := '';
1483     preserve_rule       TEXT := '';
1484
1485 BEGIN
1486
1487     profile_tmpl := (oils_xpath('//*[@tag="905"]/*[@code="t"]/text()',incoming_xml))[1];
1488     profile_tmpl_owner := (oils_xpath('//*[@tag="905"]/*[@code="o"]/text()',incoming_xml))[1];
1489
1490     IF profile_tmpl IS NOT NULL AND profile_tmpl <> '' AND profile_tmpl_owner IS NOT NULL AND profile_tmpl_owner <> '' THEN
1491         SELECT  p.* INTO profile
1492           FROM  vandelay.merge_profile p
1493                 JOIN actor.org_unit u ON (u.id = p.owner)
1494           WHERE p.name = profile_tmpl
1495                 AND u.shortname = profile_tmpl_owner;
1496
1497         IF profile.id IS NOT NULL THEN
1498             add_rule := COALESCE(profile.add_spec,'');
1499             strip_rule := COALESCE(profile.strip_spec,'');
1500             replace_rule := COALESCE(profile.replace_spec,'');
1501             preserve_rule := COALESCE(profile.preserve_spec,'');
1502         END IF;
1503     END IF;
1504
1505     add_rule := add_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="a"]/text()',incoming_xml),','),'');
1506     strip_rule := strip_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="d"]/text()',incoming_xml),','),'');
1507     replace_rule := replace_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="r"]/text()',incoming_xml),','),'');
1508     preserve_rule := preserve_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="p"]/text()',incoming_xml),','),'');
1509
1510     output.add_rule := BTRIM(add_rule,',');
1511     output.replace_rule := BTRIM(replace_rule,',');
1512     output.strip_rule := BTRIM(strip_rule,',');
1513     output.preserve_rule := BTRIM(preserve_rule,',');
1514
1515     RETURN output;
1516 END;
1517 $_$ LANGUAGE PLPGSQL;
1518
1519 CREATE OR REPLACE FUNCTION vandelay.template_overlay_bib_record ( v_marc TEXT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1520 DECLARE
1521     merge_profile   vandelay.merge_profile%ROWTYPE;
1522     dyn_profile     vandelay.compile_profile%ROWTYPE;
1523     editor_string   TEXT;
1524     editor_id       INT;
1525     source_marc     TEXT;
1526     target_marc     TEXT;
1527     eg_marc         TEXT;
1528     replace_rule    TEXT;
1529     match_count     INT;
1530 BEGIN
1531
1532     SELECT  b.marc INTO eg_marc
1533       FROM  biblio.record_entry b
1534       WHERE b.id = eg_id
1535       LIMIT 1;
1536
1537     IF eg_marc IS NULL OR v_marc IS NULL THEN
1538         -- RAISE NOTICE 'no marc for template or bib record';
1539         RETURN FALSE;
1540     END IF;
1541
1542     dyn_profile := vandelay.compile_profile( v_marc );
1543
1544     IF merge_profile_id IS NOT NULL THEN
1545         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
1546         IF FOUND THEN
1547             dyn_profile.add_rule := BTRIM( dyn_profile.add_rule || ',' || COALESCE(merge_profile.add_spec,''), ',');
1548             dyn_profile.strip_rule := BTRIM( dyn_profile.strip_rule || ',' || COALESCE(merge_profile.strip_spec,''), ',');
1549             dyn_profile.replace_rule := BTRIM( dyn_profile.replace_rule || ',' || COALESCE(merge_profile.replace_spec,''), ',');
1550             dyn_profile.preserve_rule := BTRIM( dyn_profile.preserve_rule || ',' || COALESCE(merge_profile.preserve_spec,''), ',');
1551         END IF;
1552     END IF;
1553
1554     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1555         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1556         RETURN FALSE;
1557     END IF;
1558
1559     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1560         --Since we have nothing to do, just return a NOOP "we did it"
1561         RETURN TRUE;
1562     ELSIF dyn_profile.replace_rule <> '' THEN
1563         source_marc = v_marc;
1564         target_marc = eg_marc;
1565         replace_rule = dyn_profile.replace_rule;
1566     ELSE
1567         source_marc = eg_marc;
1568         target_marc = v_marc;
1569         replace_rule = dyn_profile.preserve_rule;
1570     END IF;
1571
1572     UPDATE  biblio.record_entry
1573       SET   marc = vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule )
1574       WHERE id = eg_id;
1575
1576     IF NOT FOUND THEN
1577         -- RAISE NOTICE 'update of biblio.record_entry failed';
1578         RETURN FALSE;
1579     END IF;
1580
1581     RETURN TRUE;
1582
1583 END;
1584 $$ LANGUAGE PLPGSQL;
1585
1586 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml ( target_marc TEXT, template_marc TEXT ) RETURNS TEXT AS $$
1587 DECLARE
1588     dyn_profile     vandelay.compile_profile%ROWTYPE;
1589     replace_rule    TEXT;
1590     tmp_marc        TEXT;
1591     trgt_marc        TEXT;
1592     tmpl_marc        TEXT;
1593     match_count     INT;
1594 BEGIN
1595
1596     IF target_marc IS NULL OR template_marc IS NULL THEN
1597         -- RAISE NOTICE 'no marc for target or template record';
1598         RETURN NULL;
1599     END IF;
1600
1601     dyn_profile := vandelay.compile_profile( template_marc );
1602
1603     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1604         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1605         RETURN NULL;
1606     END IF;
1607
1608     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1609         --Since we have nothing to do, just return what we were given.
1610         RETURN target_marc;
1611     ELSIF dyn_profile.replace_rule <> '' THEN
1612         trgt_marc = target_marc;
1613         tmpl_marc = template_marc;
1614         replace_rule = dyn_profile.replace_rule;
1615     ELSE
1616         tmp_marc = target_marc;
1617         trgt_marc = template_marc;
1618         tmpl_marc = tmp_marc;
1619         replace_rule = dyn_profile.preserve_rule;
1620     END IF;
1621
1622     RETURN vandelay.merge_record_xml( trgt_marc, tmpl_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule );
1623
1624 END;
1625 $$ LANGUAGE PLPGSQL;
1626
1627 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml_using_profile ( incoming_marc TEXT, existing_marc TEXT, merge_profile_id BIGINT ) RETURNS TEXT AS $$
1628 DECLARE
1629     merge_profile   vandelay.merge_profile%ROWTYPE;
1630     dyn_profile     vandelay.compile_profile%ROWTYPE;
1631     target_marc     TEXT;
1632     source_marc     TEXT;
1633     replace_rule    TEXT;
1634     match_count     INT;
1635 BEGIN
1636
1637     IF existing_marc IS NULL OR incoming_marc IS NULL THEN
1638         -- RAISE NOTICE 'no marc for source or target records';
1639         RETURN NULL;
1640     END IF;
1641
1642     IF merge_profile_id IS NOT NULL THEN
1643         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
1644         IF FOUND THEN
1645             dyn_profile.add_rule := COALESCE(merge_profile.add_spec,'');
1646             dyn_profile.strip_rule := COALESCE(merge_profile.strip_spec,'');
1647             dyn_profile.replace_rule := COALESCE(merge_profile.replace_spec,'');
1648             dyn_profile.preserve_rule := COALESCE(merge_profile.preserve_spec,'');
1649         ELSE
1650             -- RAISE NOTICE 'merge profile not found';
1651             RETURN NULL;
1652         END IF;
1653     ELSE
1654         -- RAISE NOTICE 'no merge profile specified';
1655         RETURN NULL;
1656     END IF;
1657
1658     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1659         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1660         RETURN NULL;
1661     END IF;
1662
1663     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1664         -- Since we have nothing to do, just return a target record as is
1665         RETURN existing_marc;
1666     ELSIF dyn_profile.preserve_rule <> '' THEN
1667         source_marc = existing_marc;
1668         target_marc = incoming_marc;
1669         replace_rule = dyn_profile.preserve_rule;
1670     ELSE
1671         source_marc = incoming_marc;
1672         target_marc = existing_marc;
1673         replace_rule = dyn_profile.replace_rule;
1674     END IF;
1675
1676     RETURN vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule );
1677
1678 END;
1679 $$ LANGUAGE PLPGSQL;
1680
1681 CREATE OR REPLACE FUNCTION vandelay.template_overlay_bib_record ( v_marc TEXT, eg_id BIGINT) RETURNS BOOL AS $$
1682     SELECT vandelay.template_overlay_bib_record( $1, $2, NULL);
1683 $$ LANGUAGE SQL;
1684
1685 CREATE OR REPLACE FUNCTION vandelay.overlay_bib_record 
1686     ( import_id BIGINT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1687 DECLARE
1688     editor_string   TEXT;
1689     editor_id       INT;
1690     v_marc          TEXT;
1691     v_bib_source    INT;
1692     update_fields   TEXT[];
1693     update_query    TEXT;
1694     update_bib_source BOOL;
1695     update_bib_editor BOOL;
1696 BEGIN
1697
1698     SELECT  q.marc, q.bib_source INTO v_marc, v_bib_source
1699       FROM  vandelay.queued_bib_record q
1700             JOIN vandelay.bib_match m ON (m.queued_record = q.id AND q.id = import_id)
1701       LIMIT 1;
1702
1703     IF v_marc IS NULL THEN
1704         -- RAISE NOTICE 'no marc for vandelay or bib record';
1705         RETURN FALSE;
1706     END IF;
1707
1708     IF NOT vandelay.template_overlay_bib_record( v_marc, eg_id, merge_profile_id) THEN
1709         -- no update happened, get outta here.
1710         RETURN FALSE;
1711     END IF;
1712
1713     UPDATE  vandelay.queued_bib_record
1714       SET   imported_as = eg_id,
1715             import_time = NOW()
1716       WHERE id = import_id;
1717
1718     SELECT q.update_bib_source INTO update_bib_source 
1719         FROM vandelay.merge_profile q where q.id = merge_profile_Id;
1720
1721     IF update_bib_source AND v_bib_source IS NOT NULL THEN
1722         update_fields := ARRAY_APPEND(update_fields, 'source = ' || v_bib_source);
1723     END IF;
1724
1725     SELECT q.update_bib_editor INTO update_bib_editor 
1726         FROM vandelay.merge_profile q where q.id = merge_profile_Id;
1727
1728     IF update_bib_editor THEN
1729
1730         editor_string := (oils_xpath('//*[@tag="905"]/*[@code="u"]/text()',v_marc))[1];
1731
1732         IF editor_string IS NOT NULL AND editor_string <> '' THEN
1733             SELECT usr INTO editor_id FROM actor.card WHERE barcode = editor_string;
1734
1735             IF editor_id IS NULL THEN
1736                 SELECT id INTO editor_id FROM actor.usr WHERE usrname = editor_string;
1737             END IF;
1738
1739             IF editor_id IS NOT NULL THEN
1740                 --only update the edit date if we have a valid editor
1741                 update_fields := ARRAY_APPEND(
1742                     update_fields, 'editor = ' || editor_id || ', edit_date = NOW()');
1743             END IF;
1744         END IF;
1745     END IF;
1746
1747     IF ARRAY_LENGTH(update_fields, 1) > 0 THEN
1748         update_query := 'UPDATE biblio.record_entry SET ' || 
1749             ARRAY_TO_STRING(update_fields, ',') || ' WHERE id = ' || eg_id || ';';
1750         EXECUTE update_query;
1751     END IF;
1752
1753     RETURN TRUE;
1754 END;
1755 $$ LANGUAGE PLPGSQL;
1756
1757
1758 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record_with_best ( import_id BIGINT, merge_profile_id INT, lwm_ratio_value_p NUMERIC ) RETURNS BOOL AS $$
1759 DECLARE
1760     eg_id           BIGINT;
1761     lwm_ratio_value NUMERIC;
1762 BEGIN
1763
1764     lwm_ratio_value := COALESCE(lwm_ratio_value_p, 0.0);
1765
1766     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1767
1768     IF FOUND THEN
1769         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1770         RETURN FALSE;
1771     END IF;
1772
1773     SELECT  m.eg_record INTO eg_id
1774       FROM  vandelay.bib_match m
1775             JOIN vandelay.queued_bib_record qr ON (m.queued_record = qr.id)
1776             JOIN vandelay.bib_queue q ON (qr.queue = q.id)
1777             JOIN biblio.record_entry r ON (r.id = m.eg_record)
1778       WHERE m.queued_record = import_id
1779             AND qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC >= lwm_ratio_value
1780       ORDER BY  m.match_score DESC, -- required match score
1781                 qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC DESC, -- quality tie breaker
1782                 m.id -- when in doubt, use the first match
1783       LIMIT 1;
1784
1785     IF eg_id IS NULL THEN
1786         -- RAISE NOTICE 'incoming record is not of high enough quality';
1787         RETURN FALSE;
1788     END IF;
1789
1790     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1791 END;
1792 $$ LANGUAGE PLPGSQL;
1793
1794 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record_with_best ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1795     SELECT vandelay.auto_overlay_bib_record_with_best( $1, $2, p.lwm_ratio ) FROM vandelay.merge_profile p WHERE id = $2;
1796 $$ LANGUAGE SQL;
1797
1798 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1799 DECLARE
1800     eg_id           BIGINT;
1801     match_count     INT;
1802 BEGIN
1803
1804     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1805
1806     IF FOUND THEN
1807         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1808         RETURN FALSE;
1809     END IF;
1810
1811     SELECT COUNT(*) INTO match_count FROM vandelay.bib_match WHERE queued_record = import_id;
1812
1813     IF match_count <> 1 THEN
1814         -- RAISE NOTICE 'not an exact match';
1815         RETURN FALSE;
1816     END IF;
1817
1818     -- Check that the one match is on the first 901c
1819     SELECT  m.eg_record INTO eg_id
1820       FROM  vandelay.queued_bib_record q
1821             JOIN vandelay.bib_match m ON (m.queued_record = q.id)
1822       WHERE q.id = import_id
1823             AND m.eg_record = oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]',marc)::BIGINT;
1824
1825     IF NOT FOUND THEN
1826         -- RAISE NOTICE 'not a 901c match';
1827         RETURN FALSE;
1828     END IF;
1829
1830     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1831 END;
1832 $$ LANGUAGE PLPGSQL;
1833
1834 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue ( queue_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1835 DECLARE
1836     queued_record   vandelay.queued_bib_record%ROWTYPE;
1837 BEGIN
1838
1839     FOR queued_record IN SELECT * FROM vandelay.queued_bib_record WHERE queue = queue_id AND import_time IS NULL LOOP
1840
1841         IF vandelay.auto_overlay_bib_record( queued_record.id, merge_profile_id ) THEN
1842             RETURN NEXT queued_record.id;
1843         END IF;
1844
1845     END LOOP;
1846
1847     RETURN;
1848     
1849 END;
1850 $$ LANGUAGE PLPGSQL;
1851
1852 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue_with_best ( queue_id BIGINT, merge_profile_id INT, lwm_ratio_value NUMERIC ) RETURNS SETOF BIGINT AS $$
1853 DECLARE
1854     queued_record   vandelay.queued_bib_record%ROWTYPE;
1855 BEGIN
1856
1857     FOR queued_record IN SELECT * FROM vandelay.queued_bib_record WHERE queue = queue_id AND import_time IS NULL LOOP
1858
1859         IF vandelay.auto_overlay_bib_record_with_best( queued_record.id, merge_profile_id, lwm_ratio_value ) THEN
1860             RETURN NEXT queued_record.id;
1861         END IF;
1862
1863     END LOOP;
1864
1865     RETURN;
1866     
1867 END;
1868 $$ LANGUAGE PLPGSQL;
1869
1870 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue_with_best ( import_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1871     SELECT vandelay.auto_overlay_bib_queue_with_best( $1, $2, p.lwm_ratio ) FROM vandelay.merge_profile p WHERE id = $2;
1872 $$ LANGUAGE SQL;
1873
1874 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue ( queue_id BIGINT ) RETURNS SETOF BIGINT AS $$
1875     SELECT * FROM vandelay.auto_overlay_bib_queue( $1, NULL );
1876 $$ LANGUAGE SQL;
1877
1878 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_org_unit_copies ( import_id BIGINT, merge_profile_id INT, lwm_ratio_value_p NUMERIC ) RETURNS BOOL AS $$
1879 DECLARE
1880     eg_id           BIGINT;
1881     match_count     INT;
1882     rec             vandelay.bib_match%ROWTYPE;
1883     v_owning_lib    INT;
1884     scope_org       INT;
1885     scope_orgs      INT[];
1886     copy_count      INT := 0;
1887     max_copy_count  INT := 0;
1888 BEGIN
1889
1890     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1891
1892     IF FOUND THEN
1893         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1894         RETURN FALSE;
1895     END IF;
1896
1897     -- Gather all the owning libs for our import items.
1898     -- These are our initial scope_orgs.
1899     SELECT ARRAY_AGG(DISTINCT owning_lib) INTO scope_orgs
1900         FROM vandelay.import_item
1901         WHERE record = import_id;
1902
1903     WHILE CARDINALITY(scope_orgs) > 0 LOOP
1904         FOR scope_org IN SELECT * FROM UNNEST(scope_orgs) LOOP
1905             -- For each match, get a count of all copies at descendants of our scope org.
1906             FOR rec IN SELECT * FROM vandelay.bib_match AS vbm
1907                 WHERE queued_record = import_id
1908                 ORDER BY vbm.eg_record DESC
1909             LOOP
1910                 SELECT COUNT(acp.id) INTO copy_count
1911                     FROM asset.copy AS acp
1912                     INNER JOIN asset.call_number AS acn
1913                         ON acp.call_number = acn.id
1914                     WHERE acn.owning_lib IN (SELECT id FROM
1915                         actor.org_unit_descendants(scope_org))
1916                     AND acn.record = rec.eg_record
1917                     AND acp.deleted = FALSE;
1918                 IF copy_count > max_copy_count THEN
1919                     max_copy_count := copy_count;
1920                     eg_id := rec.eg_record;
1921                 END IF;
1922             END LOOP;
1923         END LOOP;
1924
1925         -- If no matching bibs had holdings, gather our next set of orgs to check, and iterate.
1926         IF max_copy_count = 0 THEN 
1927             SELECT ARRAY_AGG(DISTINCT parent_ou) INTO scope_orgs
1928                 FROM actor.org_unit
1929                 WHERE id IN (SELECT * FROM UNNEST(scope_orgs))
1930                 AND parent_ou IS NOT NULL;
1931         END IF;
1932     END LOOP;
1933
1934     IF eg_id IS NULL THEN
1935         -- Could not determine best match via copy count
1936         -- fall back to default best match
1937         IF (SELECT * FROM vandelay.auto_overlay_bib_record_with_best( import_id, merge_profile_id, lwm_ratio_value_p )) THEN
1938             RETURN TRUE;
1939         ELSE
1940             RETURN FALSE;
1941         END IF;
1942     END IF;
1943
1944     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1945 END;
1946 $$ LANGUAGE PLPGSQL;
1947
1948 CREATE OR REPLACE FUNCTION vandelay.ingest_bib_marc ( ) RETURNS TRIGGER AS $$
1949 DECLARE
1950     value   TEXT;
1951     atype   TEXT;
1952     adef    RECORD;
1953 BEGIN
1954     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1955         RETURN NEW;
1956     END IF;
1957
1958     FOR adef IN SELECT * FROM vandelay.bib_attr_definition LOOP
1959
1960         SELECT extract_marc_field('vandelay.queued_bib_record', id, adef.xpath, adef.remove) INTO value FROM vandelay.queued_bib_record WHERE id = NEW.id;
1961         IF (value IS NOT NULL AND value <> '') THEN
1962             INSERT INTO vandelay.queued_bib_record_attr (record, field, attr_value) VALUES (NEW.id, adef.id, value);
1963         END IF;
1964
1965     END LOOP;
1966
1967     RETURN NULL;
1968 END;
1969 $$ LANGUAGE PLPGSQL;
1970
1971 CREATE OR REPLACE FUNCTION vandelay.cleanup_bib_marc ( ) RETURNS TRIGGER AS $$
1972 BEGIN
1973     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1974         RETURN NEW;
1975     END IF;
1976
1977     DELETE FROM vandelay.queued_bib_record_attr WHERE record = OLD.id;
1978     DELETE FROM vandelay.import_item WHERE record = OLD.id;
1979
1980     IF TG_OP = 'UPDATE' THEN
1981         RETURN NEW;
1982     END IF;
1983     RETURN OLD;
1984 END;
1985 $$ LANGUAGE PLPGSQL;
1986
1987 CREATE TRIGGER cleanup_bib_trigger
1988     BEFORE UPDATE OR DELETE ON vandelay.queued_bib_record
1989     FOR EACH ROW EXECUTE PROCEDURE vandelay.cleanup_bib_marc();
1990
1991 CREATE TRIGGER ingest_bib_trigger
1992     AFTER INSERT OR UPDATE ON vandelay.queued_bib_record
1993     FOR EACH ROW EXECUTE PROCEDURE vandelay.ingest_bib_marc();
1994
1995 CREATE TRIGGER zz_match_bibs_trigger
1996     BEFORE INSERT OR UPDATE ON vandelay.queued_bib_record
1997     FOR EACH ROW EXECUTE PROCEDURE vandelay.match_bib_record();
1998
1999
2000 /* Authority stuff down here */
2001 ---------------------------------------
2002 CREATE TABLE vandelay.authority_attr_definition (
2003         id                      SERIAL  PRIMARY KEY,
2004         code            TEXT    UNIQUE NOT NULL,
2005         description     TEXT,
2006         xpath           TEXT    NOT NULL,
2007         remove          TEXT    NOT NULL DEFAULT ''
2008 );
2009
2010 CREATE TYPE vandelay.authority_queue_queue_type AS ENUM ('authority');
2011 CREATE TABLE vandelay.authority_queue (
2012         queue_type      vandelay.authority_queue_queue_type NOT NULL DEFAULT 'authority',
2013         CONSTRAINT vand_authority_queue_name_once_per_owner_const UNIQUE (owner,name,queue_type)
2014 ) INHERITS (vandelay.queue);
2015 ALTER TABLE vandelay.authority_queue ADD PRIMARY KEY (id);
2016
2017 CREATE TABLE vandelay.queued_authority_record (
2018         queue           INT     NOT NULL REFERENCES vandelay.authority_queue (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
2019         imported_as     INT     REFERENCES authority.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
2020         import_error    TEXT    REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
2021         error_detail    TEXT
2022 ) INHERITS (vandelay.queued_record);
2023 ALTER TABLE vandelay.queued_authority_record ADD PRIMARY KEY (id);
2024 CREATE INDEX queued_authority_record_queue_idx ON vandelay.queued_authority_record (queue);
2025
2026 CREATE TABLE vandelay.queued_authority_record_attr (
2027         id                      BIGSERIAL       PRIMARY KEY,
2028         record          BIGINT          NOT NULL REFERENCES vandelay.queued_authority_record (id) DEFERRABLE INITIALLY DEFERRED,
2029         field           INT                     NOT NULL REFERENCES vandelay.authority_attr_definition (id) DEFERRABLE INITIALLY DEFERRED,
2030         attr_value      TEXT            NOT NULL
2031 );
2032 CREATE INDEX queued_authority_record_attr_record_idx ON vandelay.queued_authority_record_attr (record);
2033
2034 CREATE TABLE vandelay.authority_match (
2035         id                              BIGSERIAL       PRIMARY KEY,
2036         queued_record   BIGINT          REFERENCES vandelay.queued_authority_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
2037         eg_record               BIGINT          REFERENCES authority.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
2038     quality         INT         NOT NULL DEFAULT 0,
2039     match_score     INT         NOT NULL DEFAULT 0
2040 );
2041
2042 CREATE OR REPLACE FUNCTION vandelay.ingest_authority_marc ( ) RETURNS TRIGGER AS $$
2043 DECLARE
2044     value   TEXT;
2045     atype   TEXT;
2046     adef    RECORD;
2047 BEGIN
2048     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
2049         RETURN NEW;
2050     END IF;
2051
2052     FOR adef IN SELECT * FROM vandelay.authority_attr_definition LOOP
2053
2054         SELECT extract_marc_field('vandelay.queued_authority_record', id, adef.xpath, adef.remove) INTO value FROM vandelay.queued_authority_record WHERE id = NEW.id;
2055         IF (value IS NOT NULL AND value <> '') THEN
2056             INSERT INTO vandelay.queued_authority_record_attr (record, field, attr_value) VALUES (NEW.id, adef.id, value);
2057         END IF;
2058
2059     END LOOP;
2060
2061     RETURN NULL;
2062 END;
2063 $$ LANGUAGE PLPGSQL;
2064
2065 CREATE OR REPLACE FUNCTION vandelay.cleanup_authority_marc ( ) RETURNS TRIGGER AS $$
2066 BEGIN
2067     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
2068         RETURN NEW;
2069     END IF;
2070
2071     DELETE FROM vandelay.queued_authority_record_attr WHERE record = OLD.id;
2072     IF TG_OP = 'UPDATE' THEN
2073         RETURN NEW;
2074     END IF;
2075     RETURN OLD;
2076 END;
2077 $$ LANGUAGE PLPGSQL;
2078
2079 CREATE TRIGGER cleanup_authority_trigger
2080     BEFORE UPDATE OR DELETE ON vandelay.queued_authority_record
2081     FOR EACH ROW EXECUTE PROCEDURE vandelay.cleanup_authority_marc();
2082
2083 CREATE TRIGGER ingest_authority_trigger
2084     AFTER INSERT OR UPDATE ON vandelay.queued_authority_record
2085     FOR EACH ROW EXECUTE PROCEDURE vandelay.ingest_authority_marc();
2086
2087 CREATE OR REPLACE FUNCTION vandelay.overlay_authority_record ( import_id BIGINT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
2088 DECLARE
2089     merge_profile   vandelay.merge_profile%ROWTYPE;
2090     dyn_profile     vandelay.compile_profile%ROWTYPE;
2091     editor_string   TEXT;
2092     new_editor      INT;
2093     new_edit_date   TIMESTAMPTZ;
2094     source_marc     TEXT;
2095     target_marc     TEXT;
2096     eg_marc_row     authority.record_entry%ROWTYPE;
2097     eg_marc         TEXT;
2098     v_marc          TEXT;
2099     replace_rule    TEXT;
2100     match_count     INT;
2101     update_query    TEXT;
2102 BEGIN
2103
2104     SELECT  * INTO eg_marc_row
2105       FROM  authority.record_entry b
2106             JOIN vandelay.authority_match m ON (m.eg_record = b.id AND m.queued_record = import_id)
2107       LIMIT 1;
2108
2109     SELECT  q.marc INTO v_marc
2110       FROM  vandelay.queued_record q
2111             JOIN vandelay.authority_match m ON (m.queued_record = q.id AND q.id = import_id)
2112       LIMIT 1;
2113
2114     eg_marc := eg_marc_row.marc;
2115
2116     IF eg_marc IS NULL OR v_marc IS NULL THEN
2117         -- RAISE NOTICE 'no marc for vandelay or authority record';
2118         RETURN FALSE;
2119     END IF;
2120
2121     -- Extract the editor string before any modification to the vandelay
2122     -- MARC occur.
2123     editor_string := 
2124         (oils_xpath('//*[@tag="905"]/*[@code="u"]/text()',v_marc))[1];
2125
2126     -- If an editor value can be found, update the authority record
2127     -- editor and edit_date values.
2128     IF editor_string IS NOT NULL AND editor_string <> '' THEN
2129
2130         -- Vandelay.pm sets the value to 'usrname' when needed.  
2131         SELECT id INTO new_editor
2132             FROM actor.usr WHERE usrname = editor_string;
2133
2134         IF new_editor IS NULL THEN
2135             SELECT usr INTO new_editor
2136                 FROM actor.card WHERE barcode = editor_string;
2137         END IF;
2138
2139         IF new_editor IS NOT NULL THEN
2140             new_edit_date := NOW();
2141         ELSE -- No valid editor, use current values
2142             new_editor = eg_marc_row.editor;
2143             new_edit_date = eg_marc_row.edit_date;
2144         END IF;
2145     ELSE
2146         new_editor = eg_marc_row.editor;
2147         new_edit_date = eg_marc_row.edit_date;
2148     END IF;
2149
2150     dyn_profile := vandelay.compile_profile( v_marc );
2151
2152     IF merge_profile_id IS NOT NULL THEN
2153         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
2154         IF FOUND THEN
2155             dyn_profile.add_rule := BTRIM( dyn_profile.add_rule || ',' || COALESCE(merge_profile.add_spec,''), ',');
2156             dyn_profile.strip_rule := BTRIM( dyn_profile.strip_rule || ',' || COALESCE(merge_profile.strip_spec,''), ',');
2157             dyn_profile.replace_rule := BTRIM( dyn_profile.replace_rule || ',' || COALESCE(merge_profile.replace_spec,''), ',');
2158             dyn_profile.preserve_rule := BTRIM( dyn_profile.preserve_rule || ',' || COALESCE(merge_profile.preserve_spec,''), ',');
2159         END IF;
2160     END IF;
2161
2162     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
2163         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
2164         RETURN FALSE;
2165     END IF;
2166
2167     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
2168         --Since we have nothing to do, just return a NOOP "we did it"
2169         RETURN TRUE;
2170     ELSIF dyn_profile.replace_rule <> '' THEN
2171         source_marc = v_marc;
2172         target_marc = eg_marc;
2173         replace_rule = dyn_profile.replace_rule;
2174     ELSE
2175         source_marc = eg_marc;
2176         target_marc = v_marc;
2177         replace_rule = dyn_profile.preserve_rule;
2178     END IF;
2179
2180     UPDATE  authority.record_entry
2181       SET   marc = vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule ),
2182             editor = new_editor,
2183             edit_date = new_edit_date
2184       WHERE id = eg_id;
2185
2186     IF NOT FOUND THEN 
2187         -- Import/merge failed.  Nothing left to do.
2188         RETURN FALSE;
2189     END IF;
2190
2191     -- Authority record successfully merged / imported.
2192
2193     -- Update the vandelay record to show the successful import.
2194     UPDATE  vandelay.queued_authority_record
2195       SET   imported_as = eg_id,
2196             import_time = NOW()
2197       WHERE id = import_id;
2198
2199     RETURN TRUE;
2200
2201 END;
2202 $$ LANGUAGE PLPGSQL;
2203
2204 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_record ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
2205 DECLARE
2206     eg_id           BIGINT;
2207     match_count     INT;
2208 BEGIN
2209     SELECT COUNT(*) INTO match_count FROM vandelay.authority_match WHERE queued_record = import_id;
2210
2211     IF match_count <> 1 THEN
2212         -- RAISE NOTICE 'not an exact match';
2213         RETURN FALSE;
2214     END IF;
2215
2216     SELECT  m.eg_record INTO eg_id
2217       FROM  vandelay.authority_match m
2218       WHERE m.queued_record = import_id
2219       LIMIT 1;
2220
2221     IF eg_id IS NULL THEN
2222         RETURN FALSE;
2223     END IF;
2224
2225     RETURN vandelay.overlay_authority_record( import_id, eg_id, merge_profile_id );
2226 END;
2227 $$ LANGUAGE PLPGSQL;
2228
2229 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_queue ( queue_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
2230 DECLARE
2231     queued_record   vandelay.queued_authority_record%ROWTYPE;
2232 BEGIN
2233
2234     FOR queued_record IN SELECT * FROM vandelay.queued_authority_record WHERE queue = queue_id AND import_time IS NULL LOOP
2235
2236         IF vandelay.auto_overlay_authority_record( queued_record.id, merge_profile_id ) THEN
2237             RETURN NEXT queued_record.id;
2238         END IF;
2239
2240     END LOOP;
2241
2242     RETURN;
2243     
2244 END;
2245 $$ LANGUAGE PLPGSQL;
2246
2247 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_queue ( queue_id BIGINT ) RETURNS SETOF BIGINT AS $$
2248     SELECT * FROM vandelay.auto_overlay_authority_queue( $1, NULL );
2249 $$ LANGUAGE SQL;
2250
2251 CREATE OR REPLACE FUNCTION vandelay.match_set_test_authxml(
2252     match_set_id INTEGER, record_xml TEXT
2253 ) RETURNS SETOF vandelay.match_set_test_result AS $$
2254 DECLARE
2255     tags_rstore HSTORE;
2256     heading     TEXT;
2257     coal        TEXT;
2258     joins       TEXT;
2259     query_      TEXT;
2260     wq          TEXT;
2261     qvalue      INTEGER;
2262     rec         RECORD;
2263 BEGIN
2264     tags_rstore := vandelay.flatten_marc_hstore(record_xml);
2265
2266     SELECT normalize_heading INTO heading 
2267         FROM authority.normalize_heading(record_xml);
2268
2269     CREATE TEMPORARY TABLE _vandelay_tmp_qrows (q INTEGER);
2270     CREATE TEMPORARY TABLE _vandelay_tmp_jrows (j TEXT);
2271
2272     -- generate the where clause and return that directly (into wq), and as
2273     -- a side-effect, populate the _vandelay_tmp_[qj]rows tables.
2274     wq := vandelay.get_expr_from_match_set(
2275         match_set_id, tags_rstore, heading);
2276
2277     query_ := 'SELECT DISTINCT(record), ';
2278
2279     -- qrows table is for the quality bits we add to the SELECT clause
2280     SELECT STRING_AGG(
2281         'COALESCE(n' || q::TEXT || '.quality, 0)', ' + '
2282     ) INTO coal FROM _vandelay_tmp_qrows;
2283
2284     -- our query string so far is the SELECT clause and the inital FROM.
2285     -- no JOINs yet nor the WHERE clause
2286     query_ := query_ || coal || ' AS quality ' || E'\n';
2287
2288     -- jrows table is for the joins we must make (and the real text conditions)
2289     SELECT STRING_AGG(j, E'\n') INTO joins
2290         FROM _vandelay_tmp_jrows;
2291
2292     -- add those joins and the where clause to our query.
2293     query_ := query_ || joins || E'\n';
2294
2295     query_ := query_ || 'JOIN authority.record_entry are ON (are.id = record) ' 
2296         || 'WHERE ' || wq || ' AND not are.deleted';
2297
2298     -- this will return rows of record,quality
2299     FOR rec IN EXECUTE query_ USING tags_rstore LOOP
2300         RETURN NEXT rec;
2301     END LOOP;
2302
2303     DROP TABLE _vandelay_tmp_qrows;
2304     DROP TABLE _vandelay_tmp_jrows;
2305     RETURN;
2306 END;
2307 $$ LANGUAGE PLPGSQL;
2308
2309 CREATE OR REPLACE FUNCTION vandelay.measure_auth_record_quality 
2310     ( xml TEXT, match_set_id INT ) RETURNS INT AS $_$
2311 DECLARE
2312     out_q   INT := 0;
2313     rvalue  TEXT;
2314     test    vandelay.match_set_quality%ROWTYPE;
2315 BEGIN
2316
2317     FOR test IN SELECT * FROM vandelay.match_set_quality 
2318             WHERE match_set = match_set_id LOOP
2319         IF test.tag IS NOT NULL THEN
2320             FOR rvalue IN SELECT value FROM vandelay.flatten_marc( xml ) 
2321                 WHERE tag = test.tag AND subfield = test.subfield LOOP
2322                 IF test.value = rvalue THEN
2323                     out_q := out_q + test.quality;
2324                 END IF;
2325             END LOOP;
2326         END IF;
2327     END LOOP;
2328
2329     RETURN out_q;
2330 END;
2331 $_$ LANGUAGE PLPGSQL;
2332
2333
2334
2335 CREATE OR REPLACE FUNCTION vandelay.match_authority_record() RETURNS TRIGGER AS $func$
2336 DECLARE
2337     incoming_existing_id    TEXT;
2338     test_result             vandelay.match_set_test_result%ROWTYPE;
2339     tmp_rec                 BIGINT;
2340     match_set               INT;
2341 BEGIN
2342     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
2343         RETURN NEW;
2344     END IF;
2345
2346     DELETE FROM vandelay.authority_match WHERE queued_record = NEW.id;
2347
2348     SELECT q.match_set INTO match_set FROM vandelay.authority_queue q WHERE q.id = NEW.queue;
2349
2350     IF match_set IS NOT NULL THEN
2351         NEW.quality := vandelay.measure_auth_record_quality( NEW.marc, match_set );
2352     END IF;
2353
2354     -- Perfect matches on 901$c exit early with a match with high quality.
2355     incoming_existing_id :=
2356         oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]', NEW.marc);
2357
2358     IF incoming_existing_id IS NOT NULL AND incoming_existing_id != '' THEN
2359         SELECT id INTO tmp_rec FROM authority.record_entry WHERE id = incoming_existing_id::bigint;
2360         IF tmp_rec IS NOT NULL THEN
2361             INSERT INTO vandelay.authority_match (queued_record, eg_record, match_score, quality) 
2362                 SELECT
2363                     NEW.id, 
2364                     b.id,
2365                     9999,
2366                     -- note: no match_set means quality==0
2367                     vandelay.measure_auth_record_quality( b.marc, match_set )
2368                 FROM authority.record_entry b
2369                 WHERE id = incoming_existing_id::bigint;
2370         END IF;
2371     END IF;
2372
2373     IF match_set IS NULL THEN
2374         RETURN NEW;
2375     END IF;
2376
2377     FOR test_result IN SELECT * FROM
2378         vandelay.match_set_test_authxml(match_set, NEW.marc) LOOP
2379
2380         INSERT INTO vandelay.authority_match ( queued_record, eg_record, match_score, quality )
2381             SELECT  
2382                 NEW.id,
2383                 test_result.record,
2384                 test_result.quality,
2385                 vandelay.measure_auth_record_quality( b.marc, match_set )
2386                 FROM  authority.record_entry b
2387                 WHERE id = test_result.record;
2388
2389     END LOOP;
2390
2391     RETURN NEW;
2392 END;
2393 $func$ LANGUAGE PLPGSQL;
2394
2395 CREATE TRIGGER zz_match_auths_trigger
2396     BEFORE INSERT OR UPDATE ON vandelay.queued_authority_record
2397     FOR EACH ROW EXECUTE PROCEDURE vandelay.match_authority_record();
2398
2399 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_record_with_best ( import_id BIGINT, merge_profile_id INT, lwm_ratio_value_p NUMERIC ) RETURNS BOOL AS $$
2400 DECLARE
2401     eg_id           BIGINT;
2402     lwm_ratio_value NUMERIC;
2403 BEGIN
2404
2405     lwm_ratio_value := COALESCE(lwm_ratio_value_p, 0.0);
2406
2407     PERFORM * FROM vandelay.queued_authority_record WHERE import_time IS NOT NULL AND id = import_id;
2408
2409     IF FOUND THEN
2410         -- RAISE NOTICE 'already imported, cannot auto-overlay'
2411         RETURN FALSE;
2412     END IF;
2413
2414     SELECT  m.eg_record INTO eg_id
2415       FROM  vandelay.authority_match m
2416             JOIN vandelay.queued_authority_record qr ON (m.queued_record = qr.id)
2417             JOIN vandelay.authority_queue q ON (qr.queue = q.id)
2418             JOIN authority.record_entry r ON (r.id = m.eg_record)
2419       WHERE m.queued_record = import_id
2420             AND qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC >= lwm_ratio_value
2421       ORDER BY  m.match_score DESC, -- required match score
2422                 qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC DESC, -- quality tie breaker
2423                 m.id -- when in doubt, use the first match
2424       LIMIT 1;
2425
2426     IF eg_id IS NULL THEN
2427         -- RAISE NOTICE 'incoming record is not of high enough quality';
2428         RETURN FALSE;
2429     END IF;
2430
2431     RETURN vandelay.overlay_authority_record( import_id, eg_id, merge_profile_id );
2432 END;
2433 $$ LANGUAGE PLPGSQL;
2434
2435
2436
2437
2438 -- Vandelay (for importing and exporting records) 012.schema.vandelay.sql 
2439 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (1, 'title', oils_i18n_gettext(1, 'vqbrad', 'Title of work', 'description'),'//*[@tag="245"]/*[contains("abcmnopr",@code)]');
2440 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (2, 'author', oils_i18n_gettext(1, 'vqbrad', 'Author of work', 'description'),'//*[@tag="100" or @tag="110" or @tag="113"]/*[contains("ad",@code)]');
2441 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (3, 'language', oils_i18n_gettext(3, 'vqbrad', 'Language of work', 'description'),'//*[@tag="240"]/*[@code="l"][1]');
2442 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (4, 'pagination', oils_i18n_gettext(4, 'vqbrad', 'Pagination', 'description'),'//*[@tag="300"]/*[@code="a"][1]');
2443 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident, remove ) VALUES (5, 'isbn',oils_i18n_gettext(5, 'vqbrad', 'ISBN', 'description'),'//*[@tag="020"]/*[@code="a"]', TRUE, $r$(?:-|\s.+$)$r$);
2444 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident, remove ) VALUES (6, 'issn',oils_i18n_gettext(6, 'vqbrad', 'ISSN', 'description'),'//*[@tag="022"]/*[@code="a"]', TRUE, $r$(?:-|\s.+$)$r$);
2445 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (7, 'price',oils_i18n_gettext(7, 'vqbrad', 'Price', 'description'),'//*[@tag="020" or @tag="022"]/*[@code="c"][1]');
2446 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (8, 'rec_identifier',oils_i18n_gettext(8, 'vqbrad', 'Accession Number', 'description'),'//*[@tag="001"]', TRUE);
2447 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (9, 'eg_tcn',oils_i18n_gettext(9, 'vqbrad', 'TCN Value', 'description'),'//*[@tag="901"]/*[@code="a"]', TRUE);
2448 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (10, 'eg_tcn_source',oils_i18n_gettext(10, 'vqbrad', 'TCN Source', 'description'),'//*[@tag="901"]/*[@code="b"]', TRUE);
2449 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (11, 'eg_identifier',oils_i18n_gettext(11, 'vqbrad', 'Internal ID', 'description'),'//*[@tag="901"]/*[@code="c"]', TRUE);
2450 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (12, 'publisher',oils_i18n_gettext(12, 'vqbrad', 'Publisher', 'description'),'//*[@tag="260"]/*[@code="b"][1]');
2451 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, remove ) VALUES (13, 'pubdate',oils_i18n_gettext(13, 'vqbrad', 'Publication Date', 'description'),'//*[@tag="260"]/*[@code="c"][1]',$r$\D$r$);
2452 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (14, 'edition',oils_i18n_gettext(14, 'vqbrad', 'Edition', 'description'),'//*[@tag="250"]/*[@code="a"][1]');
2453 --
2454 --INSERT INTO vandelay.import_item_attr_definition (
2455 --    owner, name, tag, owning_lib, circ_lib, location,
2456 --    call_number, circ_modifier, barcode, price, copy_number,
2457 --    circulate, ref, holdable, opac_visible, status
2458 --) VALUES (
2459 --    1,
2460 --    'Evergreen 852 export format',
2461 --    '852',
2462 --    '[@code = "b"][1]',
2463 --    '[@code = "b"][2]',
2464 --    'c',
2465 --    'j',
2466 --    'g',
2467 --    'p',
2468 --    'y',
2469 --    't',
2470 --    '[@code = "x" and text() = "circulating"]',
2471 --    '[@code = "x" and text() = "reference"]',
2472 --    '[@code = "x" and text() = "holdable"]',
2473 --    '[@code = "x" and text() = "visible"]',
2474 --    'z'
2475 --);
2476 --
2477 --INSERT INTO vandelay.import_item_attr_definition (
2478 --    owner,
2479 --    name,
2480 --    tag,
2481 --    owning_lib,
2482 --    location,
2483 --    call_number,
2484 --    circ_modifier,
2485 --    barcode,
2486 --    price,
2487 --    status
2488 --) VALUES (
2489 --    1,
2490 --    'Unicorn Import format -- 999',
2491 --    '999',
2492 --    'm',
2493 --    'l',
2494 --    'a',
2495 --    't',
2496 --    'i',
2497 --    'p',
2498 --    'k'
2499 --);
2500 --
2501 --INSERT INTO vandelay.authority_attr_definition ( code, description, xpath, ident ) VALUES ('rec_identifier','Identifier','//*[@tag="001"]', TRUE);
2502
2503
2504 CREATE TABLE vandelay.session_tracker (
2505     id          BIGSERIAL PRIMARY KEY,
2506
2507     -- string of characters (e.g. md5) used for linking trackers
2508     -- of different actions into a series.  There can be multiple
2509     -- session_keys of each action type, creating the opportunity
2510     -- to link multiple action trackers into a single session.
2511     session_key TEXT NOT NULL,
2512
2513     -- optional user-supplied name
2514     name        TEXT NOT NULL, 
2515
2516     usr         INTEGER NOT NULL REFERENCES actor.usr(id)
2517                 DEFERRABLE INITIALLY DEFERRED,
2518
2519     -- org unit can be derived from WS
2520     workstation INTEGER NOT NULL REFERENCES actor.workstation(id)
2521                 ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
2522
2523     -- bib/auth
2524     record_type TEXT NOT NULL DEFAULT 'bib',
2525
2526     -- Queue defines the source of the data, it does not necessarily
2527     -- mean that an action is being performed against an entire queue.
2528     -- E.g. some imports are misc. lists of record IDs, but they always 
2529     -- come from one queue.
2530     -- No foreign key -- could be auth or bib queue.
2531     queue       BIGINT NOT NULL,
2532
2533     create_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
2534     update_time TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
2535
2536     state       TEXT NOT NULL DEFAULT 'active',
2537
2538     action_type TEXT NOT NULL DEFAULT 'enqueue', -- import
2539
2540     -- total number of tasks to perform / loosely defined
2541     -- could be # of recs to import or # of recs + # of copies 
2542     -- depending on the import context
2543     total_actions INTEGER NOT NULL DEFAULT 0,
2544
2545     -- total number of tasked performed so far
2546     actions_performed INTEGER NOT NULL DEFAULT 0,
2547
2548     CONSTRAINT vand_tracker_valid_state 
2549         CHECK (state IN ('active','error','complete')),
2550
2551     CONSTRAINT vand_tracker_valid_action_type
2552         CHECK (action_type IN ('upload', 'enqueue', 'import')),
2553
2554     CONSTRAINT vand_tracker_valid_record_type
2555         CHECK (record_type IN ('bib', 'authority'))
2556 );
2557
2558 COMMIT;
2559