]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/sql/Pg/012.schema.vandelay.sql
LP#1304559 Fix slow Vandelay-based imports
[working/Evergreen.git] / Open-ILS / src / sql / Pg / 012.schema.vandelay.sql
1 DROP SCHEMA IF EXISTS vandelay CASCADE;
2
3 BEGIN;
4
5 CREATE SCHEMA vandelay;
6
7 CREATE TABLE vandelay.match_set (
8     id      SERIAL  PRIMARY KEY,
9     name    TEXT        NOT NULL,
10     owner   INT     NOT NULL REFERENCES actor.org_unit (id) ON DELETE CASCADE,
11     mtype   TEXT        NOT NULL DEFAULT 'biblio', -- 'biblio','authority','mfhd'?, others?
12     CONSTRAINT name_once_per_owner_mtype UNIQUE (name, owner, mtype)
13 );
14
15 -- Table to define match points, either FF via SVF or tag+subfield
16 CREATE TABLE vandelay.match_set_point (
17     id          SERIAL  PRIMARY KEY,
18     match_set   INT     REFERENCES vandelay.match_set (id) ON DELETE CASCADE,
19     parent      INT     REFERENCES vandelay.match_set_point (id),
20     bool_op     TEXT    CHECK (bool_op IS NULL OR (bool_op IN ('AND','OR','NOT'))),
21     svf         TEXT    REFERENCES config.record_attr_definition (name),
22     tag         TEXT,
23     subfield    TEXT,
24     negate      BOOL    DEFAULT FALSE,
25     quality     INT     NOT NULL DEFAULT 1, -- higher is better
26     CONSTRAINT vmsp_need_a_subfield_with_a_tag CHECK ((tag IS NOT NULL AND subfield IS NOT NULL) OR tag IS NULL),
27     CONSTRAINT vmsp_need_a_tag_or_a_ff_or_a_bo CHECK (
28         (tag IS NOT NULL AND svf IS NULL AND bool_op IS NULL) OR
29         (tag IS NULL AND svf IS NOT NULL AND bool_op IS NULL) OR
30         (tag IS NULL AND svf IS NULL AND bool_op IS NOT NULL)
31     )
32 );
33
34 CREATE TABLE vandelay.match_set_quality (
35     id          SERIAL  PRIMARY KEY,
36     match_set   INT     NOT NULL REFERENCES vandelay.match_set (id) ON DELETE CASCADE,
37     svf         TEXT    REFERENCES config.record_attr_definition,
38     tag         TEXT,
39     subfield    TEXT,
40     value       TEXT    NOT NULL,
41     quality     INT     NOT NULL DEFAULT 1, -- higher is better
42     CONSTRAINT vmsq_need_a_subfield_with_a_tag CHECK ((tag IS NOT NULL AND subfield IS NOT NULL) OR tag IS NULL),
43     CONSTRAINT vmsq_need_a_tag_or_a_ff CHECK ((tag IS NOT NULL AND svf IS NULL) OR (tag IS NULL AND svf IS NOT NULL))
44 );
45 CREATE UNIQUE INDEX vmsq_def_once_per_set ON vandelay.match_set_quality (match_set, COALESCE(tag,''), COALESCE(subfield,''), COALESCE(svf,''), value);
46
47
48 CREATE TABLE vandelay.queue (
49         id                              BIGSERIAL       PRIMARY KEY,
50         owner                   INT                     NOT NULL REFERENCES actor.usr (id) DEFERRABLE INITIALLY DEFERRED,
51         name                    TEXT            NOT NULL,
52         complete                BOOL            NOT NULL DEFAULT FALSE,
53     match_set       INT         REFERENCES vandelay.match_set (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED
54 );
55
56 CREATE TABLE vandelay.queued_record (
57     id                  BIGSERIAL                   PRIMARY KEY,
58     create_time TIMESTAMP WITH TIME ZONE    NOT NULL DEFAULT NOW(),
59     import_time TIMESTAMP WITH TIME ZONE,
60         purpose         TEXT                                            NOT NULL DEFAULT 'import' CHECK (purpose IN ('import','overlay')),
61     marc                TEXT                        NOT NULL,
62     quality     INT                         NOT NULL DEFAULT 0
63 );
64
65
66
67 /* Bib stuff at the top */
68 ----------------------------------------------------
69
70 CREATE TABLE vandelay.bib_attr_definition (
71         id                      SERIAL  PRIMARY KEY,
72         code            TEXT    UNIQUE NOT NULL,
73         description     TEXT,
74         xpath           TEXT    NOT NULL,
75         remove          TEXT    NOT NULL DEFAULT ''
76 );
77
78 -- Each TEXT field (other than 'name') should hold an XPath predicate for pulling the data needed
79 -- DROP TABLE vandelay.import_item_attr_definition CASCADE;
80 CREATE TABLE vandelay.import_item_attr_definition (
81     id              BIGSERIAL   PRIMARY KEY,
82     owner           INT         NOT NULL REFERENCES actor.org_unit (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
83     name            TEXT        NOT NULL,
84     tag             TEXT        NOT NULL,
85     keep            BOOL        NOT NULL DEFAULT FALSE,
86     owning_lib      TEXT,
87     circ_lib        TEXT,
88     call_number     TEXT,
89     copy_number     TEXT,
90     status          TEXT,
91     location        TEXT,
92     circulate       TEXT,
93     deposit         TEXT,
94     deposit_amount  TEXT,
95     ref             TEXT,
96     holdable        TEXT,
97     price           TEXT,
98     barcode         TEXT,
99     circ_modifier   TEXT,
100     circ_as_type    TEXT,
101     alert_message   TEXT,
102     opac_visible    TEXT,
103     pub_note_title  TEXT,
104     pub_note        TEXT,
105     priv_note_title TEXT,
106     priv_note       TEXT,
107     internal_id     TEXT,
108         CONSTRAINT vand_import_item_attr_def_idx UNIQUE (owner,name)
109 );
110
111 CREATE TABLE vandelay.import_error (
112     code        TEXT    PRIMARY KEY,
113     description TEXT    NOT NULL -- i18n
114 );
115
116 CREATE TYPE vandelay.bib_queue_queue_type AS ENUM ('bib', 'acq');
117
118 CREATE TABLE vandelay.bib_queue (
119         queue_type          vandelay.bib_queue_queue_type       NOT NULL DEFAULT 'bib',
120         item_attr_def   BIGINT REFERENCES vandelay.import_item_attr_definition (id) ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
121     match_bucket    INTEGER, -- REFERENCES container.biblio_record_entry_bucket(id);
122         CONSTRAINT vand_bib_queue_name_once_per_owner_const UNIQUE (owner,name,queue_type)
123 ) INHERITS (vandelay.queue);
124 ALTER TABLE vandelay.bib_queue ADD PRIMARY KEY (id);
125
126 CREATE TABLE vandelay.queued_bib_record (
127         queue               INT         NOT NULL REFERENCES vandelay.bib_queue (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
128         bib_source          INT         REFERENCES config.bib_source (id) DEFERRABLE INITIALLY DEFERRED,
129         imported_as     BIGINT  REFERENCES biblio.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
130         import_error    TEXT    REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
131         error_detail    TEXT
132 ) INHERITS (vandelay.queued_record);
133 ALTER TABLE vandelay.queued_bib_record ADD PRIMARY KEY (id);
134 CREATE INDEX queued_bib_record_queue_idx ON vandelay.queued_bib_record (queue);
135
136 CREATE TABLE vandelay.queued_bib_record_attr (
137         id                      BIGSERIAL       PRIMARY KEY,
138         record          BIGINT          NOT NULL REFERENCES vandelay.queued_bib_record (id) DEFERRABLE INITIALLY DEFERRED,
139         field           INT                     NOT NULL REFERENCES vandelay.bib_attr_definition (id) DEFERRABLE INITIALLY DEFERRED,
140         attr_value      TEXT            NOT NULL
141 );
142 CREATE INDEX queued_bib_record_attr_record_idx ON vandelay.queued_bib_record_attr (record);
143
144 CREATE TABLE vandelay.bib_match (
145         id                              BIGSERIAL       PRIMARY KEY,
146         queued_record   BIGINT          REFERENCES vandelay.queued_bib_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
147         eg_record               BIGINT          REFERENCES biblio.record_entry (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
148     quality         INT         NOT NULL DEFAULT 1,
149     match_score     INT         NOT NULL DEFAULT 0
150 );
151
152 CREATE TABLE vandelay.import_item (
153     id              BIGSERIAL   PRIMARY KEY,
154     record          BIGINT      NOT NULL REFERENCES vandelay.queued_bib_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
155     definition      BIGINT      NOT NULL REFERENCES vandelay.import_item_attr_definition (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
156         import_error    TEXT        REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
157         error_detail    TEXT,
158     imported_as     BIGINT,
159     import_time     TIMESTAMP WITH TIME ZONE,
160     owning_lib      INT,
161     circ_lib        INT,
162     call_number     TEXT,
163     copy_number     INT,
164     status          INT,
165     location        INT,
166     circulate       BOOL,
167     deposit         BOOL,
168     deposit_amount  NUMERIC(8,2),
169     ref             BOOL,
170     holdable        BOOL,
171     price           NUMERIC(8,2),
172     barcode         TEXT,
173     circ_modifier   TEXT,
174     circ_as_type    TEXT,
175     alert_message   TEXT,
176     pub_note        TEXT,
177     priv_note       TEXT,
178     opac_visible    BOOL,
179     internal_id     BIGINT -- queue_type == 'acq' ? acq.lineitem_detail.id : asset.copy.id
180 );
181
182 CREATE TABLE vandelay.import_bib_trash_group(
183     id           SERIAL  PRIMARY KEY,
184     owner        INTEGER NOT NULL REFERENCES actor.org_unit(id),
185     label        TEXT    NOT NULL, --i18n
186     always_apply BOOLEAN NOT NULL DEFAULT FALSE,
187         CONSTRAINT vand_import_bib_trash_grp_owner_label UNIQUE (owner, label)
188 );
189  
190 CREATE TABLE vandelay.import_bib_trash_fields (
191     id         BIGSERIAL PRIMARY KEY,
192     grp        INTEGER   NOT NULL REFERENCES vandelay.import_bib_trash_group,
193     field      TEXT      NOT NULL,
194     CONSTRAINT vand_import_bib_trash_fields_once_per UNIQUE (grp, field)
195 );
196
197 CREATE TABLE vandelay.merge_profile (
198     id              BIGSERIAL   PRIMARY KEY,
199     owner           INT         NOT NULL REFERENCES actor.org_unit (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
200     name            TEXT        NOT NULL,
201     add_spec        TEXT,
202     replace_spec    TEXT,
203     strip_spec      TEXT,
204     preserve_spec   TEXT,
205     lwm_ratio       NUMERIC,
206         CONSTRAINT vand_merge_prof_owner_name_idx UNIQUE (owner,name),
207         CONSTRAINT add_replace_strip_or_preserve CHECK ((preserve_spec IS NOT NULL OR replace_spec IS NOT NULL) OR (preserve_spec IS NULL AND replace_spec IS NULL))
208 );
209
210 CREATE OR REPLACE FUNCTION vandelay.marc21_record_type( marc TEXT ) RETURNS config.marc21_rec_type_map AS $func$
211 DECLARE
212     ldr         TEXT;
213     tval        TEXT;
214     tval_rec    RECORD;
215     bval        TEXT;
216     bval_rec    RECORD;
217     retval      config.marc21_rec_type_map%ROWTYPE;
218 BEGIN
219     ldr := oils_xpath_string( '//*[local-name()="leader"]', marc );
220
221     IF ldr IS NULL OR ldr = '' THEN
222         SELECT * INTO retval FROM config.marc21_rec_type_map WHERE code = 'BKS';
223         RETURN retval;
224     END IF;
225
226     SELECT * INTO tval_rec FROM config.marc21_ff_pos_map WHERE fixed_field = 'Type' LIMIT 1; -- They're all the same
227     SELECT * INTO bval_rec FROM config.marc21_ff_pos_map WHERE fixed_field = 'BLvl' LIMIT 1; -- They're all the same
228
229
230     tval := SUBSTRING( ldr, tval_rec.start_pos + 1, tval_rec.length );
231     bval := SUBSTRING( ldr, bval_rec.start_pos + 1, bval_rec.length );
232
233     -- RAISE NOTICE 'type %, blvl %, ldr %', tval, bval, ldr;
234
235     SELECT * INTO retval FROM config.marc21_rec_type_map WHERE type_val LIKE '%' || tval || '%' AND blvl_val LIKE '%' || bval || '%';
236
237
238     IF retval.code IS NULL THEN
239         SELECT * INTO retval FROM config.marc21_rec_type_map WHERE code = 'BKS';
240     END IF;
241
242     RETURN retval;
243 END;
244 $func$ LANGUAGE PLPGSQL;
245
246 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_fixed_field( marc TEXT, ff TEXT ) RETURNS TEXT AS $func$
247 DECLARE
248     rtype       TEXT;
249     ff_pos      RECORD;
250     tag_data    RECORD;
251     val         TEXT;
252 BEGIN
253     rtype := (vandelay.marc21_record_type( marc )).code;
254     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE fixed_field = ff AND rec_type = rtype ORDER BY tag DESC LOOP
255         FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
256             val := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
257             RETURN val;
258         END LOOP;
259         val := REPEAT( ff_pos.default_val, ff_pos.length );
260         RETURN val;
261     END LOOP;
262
263     RETURN NULL;
264 END;
265 $func$ LANGUAGE PLPGSQL;
266
267 CREATE TYPE biblio.record_ff_map AS (record BIGINT, ff_name TEXT, ff_value TEXT);
268 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_all_fixed_fields( marc TEXT ) RETURNS SETOF biblio.record_ff_map AS $func$
269 DECLARE
270     tag_data    TEXT;
271     rtype       TEXT;
272     ff_pos      RECORD;
273     output      biblio.record_ff_map%ROWTYPE;
274 BEGIN
275     rtype := (vandelay.marc21_record_type( marc )).code;
276
277     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE rec_type = rtype ORDER BY tag DESC LOOP
278         output.ff_name  := ff_pos.fixed_field;
279         output.ff_value := NULL;
280
281         FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(tag) || '"]/text()', marc ) ) x(value) LOOP
282             output.ff_value := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
283             IF output.ff_value IS NULL THEN output.ff_value := REPEAT( ff_pos.default_val, ff_pos.length ); END IF;
284             RETURN NEXT output;
285             output.ff_value := NULL;
286         END LOOP;
287
288     END LOOP;
289
290     RETURN;
291 END;
292 $func$ LANGUAGE PLPGSQL;
293
294 CREATE TYPE biblio.marc21_physical_characteristics AS ( id INT, record BIGINT, ptype TEXT, subfield INT, value INT );
295 CREATE OR REPLACE FUNCTION vandelay.marc21_physical_characteristics( marc TEXT) RETURNS SETOF biblio.marc21_physical_characteristics AS $func$
296 DECLARE
297     rowid   INT := 0;
298     _007    TEXT;
299     ptype   config.marc21_physical_characteristic_type_map%ROWTYPE;
300     psf     config.marc21_physical_characteristic_subfield_map%ROWTYPE;
301     pval    config.marc21_physical_characteristic_value_map%ROWTYPE;
302     retval  biblio.marc21_physical_characteristics%ROWTYPE;
303 BEGIN
304
305     FOR _007 IN SELECT oils_xpath_string('//*', value) FROM UNNEST(oils_xpath('//*[@tag="007"]', marc)) x(value) LOOP
306         IF _007 IS NOT NULL AND _007 <> '' THEN
307             SELECT * INTO ptype FROM config.marc21_physical_characteristic_type_map WHERE ptype_key = SUBSTRING( _007, 1, 1 );
308
309             IF ptype.ptype_key IS NOT NULL THEN
310                 FOR psf IN SELECT * FROM config.marc21_physical_characteristic_subfield_map WHERE ptype_key = ptype.ptype_key LOOP
311                     SELECT * INTO pval FROM config.marc21_physical_characteristic_value_map WHERE ptype_subfield = psf.id AND value = SUBSTRING( _007, psf.start_pos + 1, psf.length );
312
313                     IF pval.id IS NOT NULL THEN
314                         rowid := rowid + 1;
315                         retval.id := rowid;
316                         retval.ptype := ptype.ptype_key;
317                         retval.subfield := psf.id;
318                         retval.value := pval.id;
319                         RETURN NEXT retval;
320                     END IF;
321
322                 END LOOP;
323             END IF;
324         END IF;
325     END LOOP;
326
327     RETURN;
328 END;
329 $func$ LANGUAGE PLPGSQL;
330
331 CREATE TYPE vandelay.flat_marc AS ( tag CHAR(3), ind1 TEXT, ind2 TEXT, subfield TEXT, value TEXT );
332 CREATE OR REPLACE FUNCTION vandelay.flay_marc ( TEXT ) RETURNS SETOF vandelay.flat_marc AS $func$
333
334 use MARC::Record;
335 use MARC::File::XML (BinaryEncoding => 'UTF-8');
336 use MARC::Charset;
337 use strict;
338
339 MARC::Charset->assume_unicode(1);
340
341 my $xml = shift;
342 my $r = MARC::Record->new_from_xml( $xml );
343
344 return_next( { tag => 'LDR', value => $r->leader } );
345
346 for my $f ( $r->fields ) {
347     if ($f->is_control_field) {
348         return_next({ tag => $f->tag, value => $f->data });
349     } else {
350         for my $s ($f->subfields) {
351             return_next({
352                 tag      => $f->tag,
353                 ind1     => $f->indicator(1),
354                 ind2     => $f->indicator(2),
355                 subfield => $s->[0],
356                 value    => $s->[1]
357             });
358
359             if ( $f->tag eq '245' and $s->[0] eq 'a' ) {
360                 my $trim = $f->indicator(2) || 0;
361                 return_next({
362                     tag      => 'tnf',
363                     ind1     => $f->indicator(1),
364                     ind2     => $f->indicator(2),
365                     subfield => 'a',
366                     value    => substr( $s->[1], $trim )
367                 });
368             }
369         }
370     }
371 }
372
373 return undef;
374
375 $func$ LANGUAGE PLPERLU;
376
377 CREATE OR REPLACE FUNCTION vandelay.flatten_marc ( marc TEXT ) RETURNS SETOF vandelay.flat_marc AS $func$
378 DECLARE
379     output  vandelay.flat_marc%ROWTYPE;
380     field   RECORD;
381 BEGIN
382     FOR field IN SELECT * FROM vandelay.flay_marc( marc ) LOOP
383         output.ind1 := field.ind1;
384         output.ind2 := field.ind2;
385         output.tag := field.tag;
386         output.subfield := field.subfield;
387         IF field.subfield IS NOT NULL AND field.tag NOT IN ('020','022','024') THEN -- exclude standard numbers and control fields
388             output.value := naco_normalize(field.value, field.subfield);
389         ELSE
390             output.value := field.value;
391         END IF;
392
393         CONTINUE WHEN output.value IS NULL;
394
395         RETURN NEXT output;
396     END LOOP;
397 END;
398 $func$ LANGUAGE PLPGSQL;
399
400 CREATE OR REPLACE FUNCTION vandelay.extract_rec_attrs ( xml TEXT, attr_defs TEXT[]) RETURNS hstore AS $_$
401 DECLARE
402     transformed_xml TEXT;
403     prev_xfrm       TEXT;
404     normalizer      RECORD;
405     xfrm            config.xml_transform%ROWTYPE;
406     attr_value      TEXT;
407     new_attrs       HSTORE := ''::HSTORE;
408     attr_def        config.record_attr_definition%ROWTYPE;
409 BEGIN
410
411     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE name IN (SELECT * FROM UNNEST(attr_defs)) ORDER BY format LOOP
412
413         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
414             SELECT  STRING_AGG(x.value, COALESCE(attr_def.joiner,' ')) INTO attr_value
415               FROM  vandelay.flatten_marc(xml) AS x
416               WHERE x.tag LIKE attr_def.tag
417                     AND CASE
418                         WHEN attr_def.sf_list IS NOT NULL
419                             THEN POSITION(x.subfield IN attr_def.sf_list) > 0
420                         ELSE TRUE
421                         END
422               GROUP BY x.tag
423               ORDER BY x.tag
424               LIMIT 1;
425
426         ELSIF attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
427             attr_value := vandelay.marc21_extract_fixed_field(xml, attr_def.fixed_field);
428
429         ELSIF attr_def.xpath IS NOT NULL THEN -- and xpath expression
430
431             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
432
433             -- See if we can skip the XSLT ... it's expensive
434             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
435                 -- Can't skip the transform
436                 IF xfrm.xslt <> '---' THEN
437                     transformed_xml := oils_xslt_process(xml,xfrm.xslt);
438                 ELSE
439                     transformed_xml := xml;
440                 END IF;
441
442                 prev_xfrm := xfrm.name;
443             END IF;
444
445             IF xfrm.name IS NULL THEN
446                 -- just grab the marcxml (empty) transform
447                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
448                 prev_xfrm := xfrm.name;
449             END IF;
450
451             attr_value := oils_xpath_string(attr_def.xpath, transformed_xml, COALESCE(attr_def.joiner,' '), ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]);
452
453         ELSIF attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
454             SELECT  m.value::TEXT INTO attr_value
455               FROM  vandelay.marc21_physical_characteristics(xml) v
456                     JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
457               WHERE v.subfield = attr_def.phys_char_sf
458               LIMIT 1; -- Just in case ...
459
460         END IF;
461
462         -- apply index normalizers to attr_value
463         FOR normalizer IN
464             SELECT  n.func AS func,
465                     n.param_count AS param_count,
466                     m.params AS params
467               FROM  config.index_normalizer n
468                     JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
469               WHERE attr = attr_def.name
470               ORDER BY m.pos LOOP
471                 EXECUTE 'SELECT ' || normalizer.func || '(' ||
472                     quote_nullable( attr_value ) ||
473                     CASE
474                         WHEN normalizer.param_count > 0
475                             THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
476                             ELSE ''
477                         END ||
478                     ')' INTO attr_value;
479
480         END LOOP;
481
482         -- Add the new value to the hstore
483         new_attrs := new_attrs || hstore( attr_def.name, attr_value );
484
485     END LOOP;
486
487     RETURN new_attrs;
488 END;
489 $_$ LANGUAGE PLPGSQL;
490
491 CREATE OR REPLACE FUNCTION vandelay.extract_rec_attrs ( xml TEXT ) RETURNS hstore AS $_$
492     SELECT vandelay.extract_rec_attrs( $1, (SELECT ARRAY_AGG(name) FROM config.record_attr_definition));
493 $_$ LANGUAGE SQL;
494
495 -- Everything between this comment and the beginning of the definition of
496 -- vandelay.match_bib_record() is strictly in service of that function.
497 CREATE TYPE vandelay.match_set_test_result AS (record BIGINT, quality INTEGER);
498
499 CREATE OR REPLACE FUNCTION vandelay.match_set_test_marcxml(
500     match_set_id INTEGER, record_xml TEXT, bucket_id INTEGER 
501 ) RETURNS SETOF vandelay.match_set_test_result AS $$
502 DECLARE
503     tags_rstore HSTORE;
504     svf_rstore  HSTORE;
505     coal        TEXT;
506     joins       TEXT;
507     query_      TEXT;
508     wq          TEXT;
509     qvalue      INTEGER;
510     rec         RECORD;
511 BEGIN
512     tags_rstore := vandelay.flatten_marc_hstore(record_xml);
513     svf_rstore := vandelay.extract_rec_attrs(record_xml);
514
515     CREATE TEMPORARY TABLE _vandelay_tmp_qrows (q INTEGER);
516     CREATE TEMPORARY TABLE _vandelay_tmp_jrows (j TEXT);
517
518     -- generate the where clause and return that directly (into wq), and as
519     -- a side-effect, populate the _vandelay_tmp_[qj]rows tables.
520     wq := vandelay.get_expr_from_match_set(match_set_id, tags_rstore);
521
522     query_ := 'SELECT DISTINCT(record), ';
523
524     -- qrows table is for the quality bits we add to the SELECT clause
525     SELECT STRING_AGG(
526         'COALESCE(n' || q::TEXT || '.quality, 0)', ' + '
527     ) INTO coal FROM _vandelay_tmp_qrows;
528
529     -- our query string so far is the SELECT clause and the inital FROM.
530     -- no JOINs yet nor the WHERE clause
531     query_ := query_ || coal || ' AS quality ' || E'\n';
532
533     -- jrows table is for the joins we must make (and the real text conditions)
534     SELECT STRING_AGG(j, E'\n') INTO joins
535         FROM _vandelay_tmp_jrows;
536
537     -- add those joins and the where clause to our query.
538     query_ := query_ || joins || E'\n';
539
540     -- join the record bucket
541     IF bucket_id IS NOT NULL THEN
542         query_ := query_ || 'JOIN container.biblio_record_entry_bucket_item ' ||
543             'brebi ON (brebi.target_biblio_record_entry = record ' ||
544             'AND brebi.bucket = ' || bucket_id || E')\n';
545     END IF;
546
547     query_ := query_ || 'JOIN biblio.record_entry bre ON (bre.id = record) ' || 'WHERE ' || wq || ' AND not bre.deleted';
548
549     -- this will return rows of record,quality
550     FOR rec IN EXECUTE query_ USING tags_rstore, svf_rstore LOOP
551         RETURN NEXT rec;
552     END LOOP;
553
554     DROP TABLE _vandelay_tmp_qrows;
555     DROP TABLE _vandelay_tmp_jrows;
556     RETURN;
557 END;
558 $$ LANGUAGE PLPGSQL;
559
560
561 CREATE OR REPLACE FUNCTION vandelay.flatten_marc_hstore(
562     record_xml TEXT
563 ) RETURNS HSTORE AS $func$
564 BEGIN
565     RETURN (SELECT
566         HSTORE(
567             ARRAY_AGG(tag || (COALESCE(subfield, ''))),
568             ARRAY_AGG(value)
569         )
570         FROM (
571             SELECT  tag, subfield, ARRAY_AGG(value)::TEXT AS value
572               FROM  (SELECT tag,
573                             subfield,
574                             CASE WHEN tag = '020' THEN -- caseless -- isbn
575                                 LOWER((REGEXP_MATCHES(value,$$^(\S{10,17})$$))[1] || '%')
576                             WHEN tag = '022' THEN -- caseless -- issn
577                                 LOWER((REGEXP_MATCHES(value,$$^(\S{4}[- ]?\S{4})$$))[1] || '%')
578                             WHEN tag = '024' THEN -- caseless -- upc (other)
579                                 LOWER(value || '%')
580                             ELSE
581                                 value
582                             END AS value
583                       FROM  vandelay.flatten_marc(record_xml)) x
584                 GROUP BY tag, subfield ORDER BY tag, subfield
585         ) subquery
586     );
587 END;
588 $func$ LANGUAGE PLPGSQL;
589
590 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set(
591     match_set_id INTEGER,
592     tags_rstore HSTORE
593 ) RETURNS TEXT AS $$
594 DECLARE
595     root    vandelay.match_set_point;
596 BEGIN
597     SELECT * INTO root FROM vandelay.match_set_point
598         WHERE parent IS NULL AND match_set = match_set_id;
599
600     RETURN vandelay.get_expr_from_match_set_point(root, tags_rstore);
601 END;
602 $$  LANGUAGE PLPGSQL;
603
604 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set_point(
605     node vandelay.match_set_point,
606     tags_rstore HSTORE
607 ) RETURNS TEXT AS $$
608 DECLARE
609     q           TEXT;
610     i           INTEGER;
611     this_op     TEXT;
612     children    INTEGER[];
613     child       vandelay.match_set_point;
614 BEGIN
615     SELECT ARRAY_AGG(id) INTO children FROM vandelay.match_set_point
616         WHERE parent = node.id;
617
618     IF ARRAY_LENGTH(children, 1) > 0 THEN
619         this_op := vandelay._get_expr_render_one(node);
620         q := '(';
621         i := 1;
622         WHILE children[i] IS NOT NULL LOOP
623             SELECT * INTO child FROM vandelay.match_set_point
624                 WHERE id = children[i];
625             IF i > 1 THEN
626                 q := q || ' ' || this_op || ' ';
627             END IF;
628             i := i + 1;
629             q := q || vandelay.get_expr_from_match_set_point(child, tags_rstore);
630         END LOOP;
631         q := q || ')';
632         RETURN q;
633     ELSIF node.bool_op IS NULL THEN
634         PERFORM vandelay._get_expr_push_qrow(node);
635         PERFORM vandelay._get_expr_push_jrow(node, tags_rstore);
636         RETURN vandelay._get_expr_render_one(node);
637     ELSE
638         RETURN '';
639     END IF;
640 END;
641 $$  LANGUAGE PLPGSQL;
642
643 CREATE OR REPLACE FUNCTION vandelay._get_expr_push_qrow(
644     node vandelay.match_set_point
645 ) RETURNS VOID AS $$
646 DECLARE
647 BEGIN
648     INSERT INTO _vandelay_tmp_qrows (q) VALUES (node.id);
649 END;
650 $$ LANGUAGE PLPGSQL;
651
652 CREATE OR REPLACE FUNCTION vandelay._get_expr_push_jrow(
653     node vandelay.match_set_point,
654     tags_rstore HSTORE
655 ) RETURNS VOID AS $$
656 DECLARE
657     jrow        TEXT;
658     my_alias    TEXT;
659     op          TEXT;
660     tagkey      TEXT;
661     caseless    BOOL;
662     jrow_count  INT;
663     my_using    TEXT;
664     my_join     TEXT;
665 BEGIN
666     -- remember $1 is tags_rstore, and $2 is svf_rstore
667
668     caseless := FALSE;
669     SELECT COUNT(*) INTO jrow_count FROM _vandelay_tmp_jrows;
670     IF jrow_count > 0 THEN
671         my_using := ' USING (record)';
672         my_join := 'FULL OUTER JOIN';
673     ELSE
674         my_using := '';
675         my_join := 'FROM';
676     END IF;
677
678     IF node.tag IS NOT NULL THEN
679         caseless := (node.tag IN ('020', '022', '024'));
680         tagkey := node.tag;
681         IF node.subfield IS NOT NULL THEN
682             tagkey := tagkey || node.subfield;
683         END IF;
684     END IF;
685
686     IF node.negate THEN
687         IF caseless THEN
688             op := 'NOT LIKE';
689         ELSE
690             op := '<>';
691         END IF;
692     ELSE
693         IF caseless THEN
694             op := 'LIKE';
695         ELSE
696             op := '=';
697         END IF;
698     END IF;
699
700     my_alias := 'n' || node.id::TEXT;
701
702     jrow := my_join || ' (SELECT *, ';
703     IF node.tag IS NOT NULL THEN
704         jrow := jrow  || node.quality ||
705             ' AS quality FROM metabib.full_rec mfr WHERE mfr.tag = ''' ||
706             node.tag || '''';
707         IF node.subfield IS NOT NULL THEN
708             jrow := jrow || ' AND mfr.subfield = ''' ||
709                 node.subfield || '''';
710         END IF;
711         jrow := jrow || ' AND (';
712         jrow := jrow || vandelay._node_tag_comparisons(caseless, op, tags_rstore, tagkey);
713         jrow := jrow || ')) ' || my_alias || my_using || E'\n';
714     ELSE    -- svf
715         jrow := jrow || 'id AS record, ' || node.quality ||
716             ' AS quality FROM metabib.record_attr_flat mraf WHERE mraf.attr = ''' ||
717             node.svf || ''' AND mraf.value ' || op || ' $2->''' || node.svf || ''') ' ||
718             my_alias || my_using || E'\n';
719     END IF;
720     INSERT INTO _vandelay_tmp_jrows (j) VALUES (jrow);
721 END;
722 $$ LANGUAGE PLPGSQL;
723
724 CREATE OR REPLACE FUNCTION vandelay._node_tag_comparisons(
725     caseless BOOLEAN,
726     op TEXT,
727     tags_rstore HSTORE,
728     tagkey TEXT
729 ) RETURNS TEXT AS $$
730 DECLARE
731     result  TEXT;
732     i       INT;
733     vals    TEXT[];
734 BEGIN
735     i := 1;
736     vals := tags_rstore->tagkey;
737     result := '';
738
739     WHILE TRUE LOOP
740         IF i > 1 THEN
741             IF vals[i] IS NULL THEN
742                 EXIT;
743             ELSE
744                 result := result || ' OR ';
745             END IF;
746         END IF;
747
748         IF caseless THEN
749             result := result || 'LOWER(mfr.value) ' || op;
750         ELSE
751             result := result || 'mfr.value ' || op;
752         END IF;
753
754         result := result || ' ' || COALESCE('''' || vals[i] || '''', 'NULL');
755
756         IF vals[i] IS NULL THEN
757             EXIT;
758         END IF;
759         i := i + 1;
760     END LOOP;
761
762     RETURN result;
763
764 END;
765 $$ LANGUAGE PLPGSQL;
766
767 CREATE OR REPLACE FUNCTION vandelay._get_expr_render_one(
768     node vandelay.match_set_point
769 ) RETURNS TEXT AS $$
770 DECLARE
771     s           TEXT;
772 BEGIN
773     IF node.bool_op IS NOT NULL THEN
774         RETURN node.bool_op;
775     ELSE
776         RETURN '(n' || node.id::TEXT || '.id IS NOT NULL)';
777     END IF;
778 END;
779 $$ LANGUAGE PLPGSQL;
780
781 CREATE OR REPLACE FUNCTION vandelay.match_bib_record() RETURNS TRIGGER AS $func$
782 DECLARE
783     incoming_existing_id    TEXT;
784     test_result             vandelay.match_set_test_result%ROWTYPE;
785     tmp_rec                 BIGINT;
786     match_set               INT;
787     match_bucket            INT;
788 BEGIN
789     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
790         RETURN NEW;
791     END IF;
792
793     DELETE FROM vandelay.bib_match WHERE queued_record = NEW.id;
794
795     SELECT q.match_set INTO match_set FROM vandelay.bib_queue q WHERE q.id = NEW.queue;
796
797     IF match_set IS NOT NULL THEN
798         NEW.quality := vandelay.measure_record_quality( NEW.marc, match_set );
799     END IF;
800
801     -- Perfect matches on 901$c exit early with a match with high quality.
802     incoming_existing_id :=
803         oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]', NEW.marc);
804
805     IF incoming_existing_id IS NOT NULL AND incoming_existing_id != '' THEN
806         SELECT id INTO tmp_rec FROM biblio.record_entry WHERE id = incoming_existing_id::bigint;
807         IF tmp_rec IS NOT NULL THEN
808             INSERT INTO vandelay.bib_match (queued_record, eg_record, match_score, quality) 
809                 SELECT
810                     NEW.id, 
811                     b.id,
812                     9999,
813                     -- note: no match_set means quality==0
814                     vandelay.measure_record_quality( b.marc, match_set )
815                 FROM biblio.record_entry b
816                 WHERE id = incoming_existing_id::bigint;
817         END IF;
818     END IF;
819
820     IF match_set IS NULL THEN
821         RETURN NEW;
822     END IF;
823
824     SELECT q.match_bucket INTO match_bucket FROM vandelay.bib_queue q WHERE q.id = NEW.queue;
825
826     FOR test_result IN SELECT * FROM
827         vandelay.match_set_test_marcxml(match_set, NEW.marc, match_bucket) LOOP
828
829         INSERT INTO vandelay.bib_match ( queued_record, eg_record, match_score, quality )
830             SELECT  
831                 NEW.id,
832                 test_result.record,
833                 test_result.quality,
834                 vandelay.measure_record_quality( b.marc, match_set )
835                 FROM  biblio.record_entry b
836                 WHERE id = test_result.record;
837
838     END LOOP;
839
840     RETURN NEW;
841 END;
842 $func$ LANGUAGE PLPGSQL;
843
844 CREATE OR REPLACE FUNCTION vandelay.measure_record_quality ( xml TEXT, match_set_id INT ) RETURNS INT AS $_$
845 DECLARE
846     out_q   INT := 0;
847     rvalue  TEXT;
848     test    vandelay.match_set_quality%ROWTYPE;
849 BEGIN
850
851     FOR test IN SELECT * FROM vandelay.match_set_quality WHERE match_set = match_set_id LOOP
852         IF test.tag IS NOT NULL THEN
853             FOR rvalue IN SELECT value FROM vandelay.flatten_marc( xml ) WHERE tag = test.tag AND subfield = test.subfield LOOP
854                 IF test.value = rvalue THEN
855                     out_q := out_q + test.quality;
856                 END IF;
857             END LOOP;
858         ELSE
859             IF test.value = vandelay.extract_rec_attrs(xml, ARRAY[test.svf]) -> test.svf THEN
860                 out_q := out_q + test.quality;
861             END IF;
862         END IF;
863     END LOOP;
864
865     RETURN out_q;
866 END;
867 $_$ LANGUAGE PLPGSQL;
868
869 CREATE TYPE vandelay.tcn_data AS (tcn TEXT, tcn_source TEXT, used BOOL);
870 CREATE OR REPLACE FUNCTION vandelay.find_bib_tcn_data ( xml TEXT ) RETURNS SETOF vandelay.tcn_data AS $_$
871 DECLARE
872     eg_tcn          TEXT;
873     eg_tcn_source   TEXT;
874     output          vandelay.tcn_data%ROWTYPE;
875 BEGIN
876
877     -- 001/003
878     eg_tcn := BTRIM((oils_xpath('//*[@tag="001"]/text()',xml))[1]);
879     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
880
881         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="003"]/text()',xml))[1]);
882         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
883             eg_tcn_source := 'System Local';
884         END IF;
885
886         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
887
888         IF NOT FOUND THEN
889             output.used := FALSE;
890         ELSE
891             output.used := TRUE;
892         END IF;
893
894         output.tcn := eg_tcn;
895         output.tcn_source := eg_tcn_source;
896         RETURN NEXT output;
897
898     END IF;
899
900     -- 901 ab
901     eg_tcn := BTRIM((oils_xpath('//*[@tag="901"]/*[@code="a"]/text()',xml))[1]);
902     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
903
904         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="901"]/*[@code="b"]/text()',xml))[1]);
905         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
906             eg_tcn_source := 'System Local';
907         END IF;
908
909         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
910
911         IF NOT FOUND THEN
912             output.used := FALSE;
913         ELSE
914             output.used := TRUE;
915         END IF;
916
917         output.tcn := eg_tcn;
918         output.tcn_source := eg_tcn_source;
919         RETURN NEXT output;
920
921     END IF;
922
923     -- 039 ab
924     eg_tcn := BTRIM((oils_xpath('//*[@tag="039"]/*[@code="a"]/text()',xml))[1]);
925     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
926
927         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="039"]/*[@code="b"]/text()',xml))[1]);
928         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
929             eg_tcn_source := 'System Local';
930         END IF;
931
932         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
933
934         IF NOT FOUND THEN
935             output.used := FALSE;
936         ELSE
937             output.used := TRUE;
938         END IF;
939
940         output.tcn := eg_tcn;
941         output.tcn_source := eg_tcn_source;
942         RETURN NEXT output;
943
944     END IF;
945
946     -- 020 a
947     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="020"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
948     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
949
950         eg_tcn_source := 'ISBN';
951
952         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
953
954         IF NOT FOUND THEN
955             output.used := FALSE;
956         ELSE
957             output.used := TRUE;
958         END IF;
959
960         output.tcn := eg_tcn;
961         output.tcn_source := eg_tcn_source;
962         RETURN NEXT output;
963
964     END IF;
965
966     -- 022 a
967     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="022"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
968     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
969
970         eg_tcn_source := 'ISSN';
971
972         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
973
974         IF NOT FOUND THEN
975             output.used := FALSE;
976         ELSE
977             output.used := TRUE;
978         END IF;
979
980         output.tcn := eg_tcn;
981         output.tcn_source := eg_tcn_source;
982         RETURN NEXT output;
983
984     END IF;
985
986     -- 010 a
987     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="010"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
988     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
989
990         eg_tcn_source := 'LCCN';
991
992         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
993
994         IF NOT FOUND THEN
995             output.used := FALSE;
996         ELSE
997             output.used := TRUE;
998         END IF;
999
1000         output.tcn := eg_tcn;
1001         output.tcn_source := eg_tcn_source;
1002         RETURN NEXT output;
1003
1004     END IF;
1005
1006     -- 035 a
1007     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="035"]/*[@code="a"]/text()',xml))[1], $re$^.*?(\w+)$$re$, $re$\1$re$);
1008     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1009
1010         eg_tcn_source := 'System Legacy';
1011
1012         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1013
1014         IF NOT FOUND THEN
1015             output.used := FALSE;
1016         ELSE
1017             output.used := TRUE;
1018         END IF;
1019
1020         output.tcn := eg_tcn;
1021         output.tcn_source := eg_tcn_source;
1022         RETURN NEXT output;
1023
1024     END IF;
1025
1026     RETURN;
1027 END;
1028 $_$ LANGUAGE PLPGSQL;
1029
1030 CREATE OR REPLACE FUNCTION vandelay.add_field ( target_xml TEXT, source_xml TEXT, field TEXT, force_add INT ) RETURNS TEXT AS $_$
1031
1032     use MARC::Record;
1033     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1034     use MARC::Charset;
1035     use strict;
1036
1037     MARC::Charset->assume_unicode(1);
1038
1039     my $target_xml = shift;
1040     my $source_xml = shift;
1041     my $field_spec = shift;
1042     my $force_add = shift || 0;
1043
1044     my $target_r = MARC::Record->new_from_xml( $target_xml );
1045     my $source_r = MARC::Record->new_from_xml( $source_xml );
1046
1047     return $target_xml unless ($target_r && $source_r);
1048
1049     my @field_list = split(',', $field_spec);
1050
1051     my %fields;
1052     for my $f (@field_list) {
1053         $f =~ s/^\s*//; $f =~ s/\s*$//;
1054         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1055             my $field = $1;
1056             $field =~ s/\s+//;
1057             my $sf = $2;
1058             $sf =~ s/\s+//;
1059             my $match = $3;
1060             $match =~ s/^\s*//; $match =~ s/\s*$//;
1061             $fields{$field} = { sf => [ split('', $sf) ] };
1062             if ($match) {
1063                 my ($msf,$mre) = split('~', $match);
1064                 if (length($msf) > 0 and length($mre) > 0) {
1065                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1066                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1067                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1068                 }
1069             }
1070         }
1071     }
1072
1073     for my $f ( keys %fields) {
1074         if ( @{$fields{$f}{sf}} ) {
1075             for my $from_field ($source_r->field( $f )) {
1076                 my @tos = $target_r->field( $f );
1077                 if (!@tos) {
1078                     next if (exists($fields{$f}{match}) and !$force_add);
1079                     my @new_fields = map { $_->clone } $source_r->field( $f );
1080                     $target_r->insert_fields_ordered( @new_fields );
1081                 } else {
1082                     for my $to_field (@tos) {
1083                         if (exists($fields{$f}{match})) {
1084                             next unless (grep { $_ =~ $fields{$f}{match}{re} } $to_field->subfield($fields{$f}{match}{sf}));
1085                         }
1086                         for my $old_sf ($from_field->subfields) {
1087                             $to_field->add_subfields( @$old_sf ) if grep(/$$old_sf[0]/,@{$fields{$f}{sf}});
1088                         }
1089                     }
1090                 }
1091             }
1092         } else {
1093             my @new_fields = map { $_->clone } $source_r->field( $f );
1094             $target_r->insert_fields_ordered( @new_fields );
1095         }
1096     }
1097
1098     $target_xml = $target_r->as_xml_record;
1099     $target_xml =~ s/^<\?.+?\?>$//mo;
1100     $target_xml =~ s/\n//sgo;
1101     $target_xml =~ s/>\s+</></sgo;
1102
1103     return $target_xml;
1104
1105 $_$ LANGUAGE PLPERLU;
1106
1107 CREATE OR REPLACE FUNCTION vandelay.add_field ( target_xml TEXT, source_xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1108     SELECT vandelay.add_field( $1, $2, $3, 0 );
1109 $_$ LANGUAGE SQL;
1110
1111 CREATE OR REPLACE FUNCTION vandelay.strip_field ( xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1112
1113     use MARC::Record;
1114     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1115     use MARC::Charset;
1116     use strict;
1117
1118     MARC::Charset->assume_unicode(1);
1119
1120     my $xml = shift;
1121     my $r = MARC::Record->new_from_xml( $xml );
1122
1123     return $xml unless ($r);
1124
1125     my $field_spec = shift;
1126     my @field_list = split(',', $field_spec);
1127
1128     my %fields;
1129     for my $f (@field_list) {
1130         $f =~ s/^\s*//; $f =~ s/\s*$//;
1131         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1132             my $field = $1;
1133             $field =~ s/\s+//;
1134             my $sf = $2;
1135             $sf =~ s/\s+//;
1136             my $match = $3;
1137             $match =~ s/^\s*//; $match =~ s/\s*$//;
1138             $fields{$field} = { sf => [ split('', $sf) ] };
1139             if ($match) {
1140                 my ($msf,$mre) = split('~', $match);
1141                 if (length($msf) > 0 and length($mre) > 0) {
1142                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1143                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1144                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1145                 }
1146             }
1147         }
1148     }
1149
1150     for my $f ( keys %fields) {
1151         for my $to_field ($r->field( $f )) {
1152             if (exists($fields{$f}{match})) {
1153                 next unless (grep { $_ =~ $fields{$f}{match}{re} } $to_field->subfield($fields{$f}{match}{sf}));
1154             }
1155
1156             if ( @{$fields{$f}{sf}} ) {
1157                 $to_field->delete_subfield(code => $fields{$f}{sf});
1158             } else {
1159                 $r->delete_field( $to_field );
1160             }
1161         }
1162     }
1163
1164     $xml = $r->as_xml_record;
1165     $xml =~ s/^<\?.+?\?>$//mo;
1166     $xml =~ s/\n//sgo;
1167     $xml =~ s/>\s+</></sgo;
1168
1169     return $xml;
1170
1171 $_$ LANGUAGE PLPERLU;
1172
1173 CREATE OR REPLACE FUNCTION vandelay.replace_field ( target_xml TEXT, source_xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1174 DECLARE
1175     xml_output TEXT;
1176     parsed_target TEXT;
1177     curr_field TEXT;
1178 BEGIN
1179
1180     parsed_target := vandelay.strip_field( target_xml, ''); -- this dance normalizes the format of the xml for the IF below
1181     xml_output := parsed_target; -- if there are no replace rules, just return the input
1182
1183     FOR curr_field IN SELECT UNNEST( STRING_TO_ARRAY(field, ',') ) LOOP -- naive split, but it's the same we use in the perl
1184
1185         xml_output := vandelay.strip_field( parsed_target, curr_field);
1186
1187         IF xml_output <> parsed_target  AND curr_field ~ E'~' THEN
1188             -- we removed something, and there was a regexp restriction in the curr_field definition, so proceed
1189             xml_output := vandelay.add_field( xml_output, source_xml, curr_field, 1 );
1190         ELSIF curr_field !~ E'~' THEN
1191             -- No regexp restriction, add the curr_field
1192             xml_output := vandelay.add_field( xml_output, source_xml, curr_field, 0 );
1193         END IF;
1194
1195         parsed_target := xml_output; -- in prep for any following loop iterations
1196
1197     END LOOP;
1198
1199     RETURN xml_output;
1200 END;
1201 $_$ LANGUAGE PLPGSQL;
1202
1203 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml ( target_xml TEXT, source_xml TEXT, add_rule TEXT, replace_preserve_rule TEXT, strip_rule TEXT ) RETURNS TEXT AS $_$
1204     SELECT vandelay.replace_field( vandelay.add_field( vandelay.strip_field( $1, $5) , $2, $3 ), $2, $4);
1205 $_$ LANGUAGE SQL;
1206
1207 CREATE TYPE vandelay.compile_profile AS (add_rule TEXT, replace_rule TEXT, preserve_rule TEXT, strip_rule TEXT);
1208 CREATE OR REPLACE FUNCTION vandelay.compile_profile ( incoming_xml TEXT ) RETURNS vandelay.compile_profile AS $_$
1209 DECLARE
1210     output              vandelay.compile_profile%ROWTYPE;
1211     profile             vandelay.merge_profile%ROWTYPE;
1212     profile_tmpl        TEXT;
1213     profile_tmpl_owner  TEXT;
1214     add_rule            TEXT := '';
1215     strip_rule          TEXT := '';
1216     replace_rule        TEXT := '';
1217     preserve_rule       TEXT := '';
1218
1219 BEGIN
1220
1221     profile_tmpl := (oils_xpath('//*[@tag="905"]/*[@code="t"]/text()',incoming_xml))[1];
1222     profile_tmpl_owner := (oils_xpath('//*[@tag="905"]/*[@code="o"]/text()',incoming_xml))[1];
1223
1224     IF profile_tmpl IS NOT NULL AND profile_tmpl <> '' AND profile_tmpl_owner IS NOT NULL AND profile_tmpl_owner <> '' THEN
1225         SELECT  p.* INTO profile
1226           FROM  vandelay.merge_profile p
1227                 JOIN actor.org_unit u ON (u.id = p.owner)
1228           WHERE p.name = profile_tmpl
1229                 AND u.shortname = profile_tmpl_owner;
1230
1231         IF profile.id IS NOT NULL THEN
1232             add_rule := COALESCE(profile.add_spec,'');
1233             strip_rule := COALESCE(profile.strip_spec,'');
1234             replace_rule := COALESCE(profile.replace_spec,'');
1235             preserve_rule := COALESCE(profile.preserve_spec,'');
1236         END IF;
1237     END IF;
1238
1239     add_rule := add_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="a"]/text()',incoming_xml),','),'');
1240     strip_rule := strip_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="d"]/text()',incoming_xml),','),'');
1241     replace_rule := replace_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="r"]/text()',incoming_xml),','),'');
1242     preserve_rule := preserve_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="p"]/text()',incoming_xml),','),'');
1243
1244     output.add_rule := BTRIM(add_rule,',');
1245     output.replace_rule := BTRIM(replace_rule,',');
1246     output.strip_rule := BTRIM(strip_rule,',');
1247     output.preserve_rule := BTRIM(preserve_rule,',');
1248
1249     RETURN output;
1250 END;
1251 $_$ LANGUAGE PLPGSQL;
1252
1253 CREATE OR REPLACE FUNCTION vandelay.template_overlay_bib_record ( v_marc TEXT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1254 DECLARE
1255     merge_profile   vandelay.merge_profile%ROWTYPE;
1256     dyn_profile     vandelay.compile_profile%ROWTYPE;
1257     editor_string   TEXT;
1258     editor_id       INT;
1259     source_marc     TEXT;
1260     target_marc     TEXT;
1261     eg_marc         TEXT;
1262     replace_rule    TEXT;
1263     match_count     INT;
1264 BEGIN
1265
1266     SELECT  b.marc INTO eg_marc
1267       FROM  biblio.record_entry b
1268       WHERE b.id = eg_id
1269       LIMIT 1;
1270
1271     IF eg_marc IS NULL OR v_marc IS NULL THEN
1272         -- RAISE NOTICE 'no marc for template or bib record';
1273         RETURN FALSE;
1274     END IF;
1275
1276     dyn_profile := vandelay.compile_profile( v_marc );
1277
1278     IF merge_profile_id IS NOT NULL THEN
1279         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
1280         IF FOUND THEN
1281             dyn_profile.add_rule := BTRIM( dyn_profile.add_rule || ',' || COALESCE(merge_profile.add_spec,''), ',');
1282             dyn_profile.strip_rule := BTRIM( dyn_profile.strip_rule || ',' || COALESCE(merge_profile.strip_spec,''), ',');
1283             dyn_profile.replace_rule := BTRIM( dyn_profile.replace_rule || ',' || COALESCE(merge_profile.replace_spec,''), ',');
1284             dyn_profile.preserve_rule := BTRIM( dyn_profile.preserve_rule || ',' || COALESCE(merge_profile.preserve_spec,''), ',');
1285         END IF;
1286     END IF;
1287
1288     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1289         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1290         RETURN FALSE;
1291     END IF;
1292
1293     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1294         --Since we have nothing to do, just return a NOOP "we did it"
1295         RETURN TRUE;
1296     ELSIF dyn_profile.replace_rule <> '' THEN
1297         source_marc = v_marc;
1298         target_marc = eg_marc;
1299         replace_rule = dyn_profile.replace_rule;
1300     ELSE
1301         source_marc = eg_marc;
1302         target_marc = v_marc;
1303         replace_rule = dyn_profile.preserve_rule;
1304     END IF;
1305
1306     UPDATE  biblio.record_entry
1307       SET   marc = vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule )
1308       WHERE id = eg_id;
1309
1310     IF NOT FOUND THEN
1311         -- RAISE NOTICE 'update of biblio.record_entry failed';
1312         RETURN FALSE;
1313     END IF;
1314
1315     RETURN TRUE;
1316
1317 END;
1318 $$ LANGUAGE PLPGSQL;
1319
1320 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml ( target_marc TEXT, template_marc TEXT ) RETURNS TEXT AS $$
1321 DECLARE
1322     dyn_profile     vandelay.compile_profile%ROWTYPE;
1323     replace_rule    TEXT;
1324     tmp_marc        TEXT;
1325     trgt_marc        TEXT;
1326     tmpl_marc        TEXT;
1327     match_count     INT;
1328 BEGIN
1329
1330     IF target_marc IS NULL OR template_marc IS NULL THEN
1331         -- RAISE NOTICE 'no marc for target or template record';
1332         RETURN NULL;
1333     END IF;
1334
1335     dyn_profile := vandelay.compile_profile( template_marc );
1336
1337     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1338         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1339         RETURN NULL;
1340     END IF;
1341
1342     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1343         --Since we have nothing to do, just return what we were given.
1344         RETURN target_marc;
1345     ELSIF dyn_profile.replace_rule <> '' THEN
1346         trgt_marc = target_marc;
1347         tmpl_marc = template_marc;
1348         replace_rule = dyn_profile.replace_rule;
1349     ELSE
1350         tmp_marc = target_marc;
1351         trgt_marc = template_marc;
1352         tmpl_marc = tmp_marc;
1353         replace_rule = dyn_profile.preserve_rule;
1354     END IF;
1355
1356     RETURN vandelay.merge_record_xml( trgt_marc, tmpl_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule );
1357
1358 END;
1359 $$ LANGUAGE PLPGSQL;
1360
1361 CREATE OR REPLACE FUNCTION vandelay.template_overlay_bib_record ( v_marc TEXT, eg_id BIGINT) RETURNS BOOL AS $$
1362     SELECT vandelay.template_overlay_bib_record( $1, $2, NULL);
1363 $$ LANGUAGE SQL;
1364
1365 CREATE OR REPLACE FUNCTION vandelay.overlay_bib_record ( import_id BIGINT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1366 DECLARE
1367     merge_profile   vandelay.merge_profile%ROWTYPE;
1368     dyn_profile     vandelay.compile_profile%ROWTYPE;
1369     editor_string   TEXT;
1370     editor_id       INT;
1371     source_marc     TEXT;
1372     target_marc     TEXT;
1373     eg_marc         TEXT;
1374     v_marc          TEXT;
1375     replace_rule    TEXT;
1376 BEGIN
1377
1378     SELECT  q.marc INTO v_marc
1379       FROM  vandelay.queued_record q
1380             JOIN vandelay.bib_match m ON (m.queued_record = q.id AND q.id = import_id)
1381       LIMIT 1;
1382
1383     IF v_marc IS NULL THEN
1384         -- RAISE NOTICE 'no marc for vandelay or bib record';
1385         RETURN FALSE;
1386     END IF;
1387
1388     IF vandelay.template_overlay_bib_record( v_marc, eg_id, merge_profile_id) THEN
1389         UPDATE  vandelay.queued_bib_record
1390           SET   imported_as = eg_id,
1391                 import_time = NOW()
1392           WHERE id = import_id;
1393
1394         editor_string := (oils_xpath('//*[@tag="905"]/*[@code="u"]/text()',v_marc))[1];
1395
1396         IF editor_string IS NOT NULL AND editor_string <> '' THEN
1397             SELECT usr INTO editor_id FROM actor.card WHERE barcode = editor_string;
1398
1399             IF editor_id IS NULL THEN
1400                 SELECT id INTO editor_id FROM actor.usr WHERE usrname = editor_string;
1401             END IF;
1402
1403             IF editor_id IS NOT NULL THEN
1404                 UPDATE biblio.record_entry SET editor = editor_id WHERE id = eg_id;
1405             END IF;
1406         END IF;
1407
1408         RETURN TRUE;
1409     END IF;
1410
1411     -- RAISE NOTICE 'update of biblio.record_entry failed';
1412
1413     RETURN FALSE;
1414
1415 END;
1416 $$ LANGUAGE PLPGSQL;
1417
1418 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record_with_best ( import_id BIGINT, merge_profile_id INT, lwm_ratio_value_p NUMERIC ) RETURNS BOOL AS $$
1419 DECLARE
1420     eg_id           BIGINT;
1421     lwm_ratio_value NUMERIC;
1422 BEGIN
1423
1424     lwm_ratio_value := COALESCE(lwm_ratio_value_p, 0.0);
1425
1426     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1427
1428     IF FOUND THEN
1429         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1430         RETURN FALSE;
1431     END IF;
1432
1433     SELECT  m.eg_record INTO eg_id
1434       FROM  vandelay.bib_match m
1435             JOIN vandelay.queued_bib_record qr ON (m.queued_record = qr.id)
1436             JOIN vandelay.bib_queue q ON (qr.queue = q.id)
1437             JOIN biblio.record_entry r ON (r.id = m.eg_record)
1438       WHERE m.queued_record = import_id
1439             AND qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC >= lwm_ratio_value
1440       ORDER BY  m.match_score DESC, -- required match score
1441                 qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC DESC, -- quality tie breaker
1442                 m.id -- when in doubt, use the first match
1443       LIMIT 1;
1444
1445     IF eg_id IS NULL THEN
1446         -- RAISE NOTICE 'incoming record is not of high enough quality';
1447         RETURN FALSE;
1448     END IF;
1449
1450     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1451 END;
1452 $$ LANGUAGE PLPGSQL;
1453
1454 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record_with_best ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1455     SELECT vandelay.auto_overlay_bib_record_with_best( $1, $2, p.lwm_ratio ) FROM vandelay.merge_profile p WHERE id = $2;
1456 $$ LANGUAGE SQL;
1457
1458 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1459 DECLARE
1460     eg_id           BIGINT;
1461     match_count     INT;
1462 BEGIN
1463
1464     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1465
1466     IF FOUND THEN
1467         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1468         RETURN FALSE;
1469     END IF;
1470
1471     SELECT COUNT(*) INTO match_count FROM vandelay.bib_match WHERE queued_record = import_id;
1472
1473     IF match_count <> 1 THEN
1474         -- RAISE NOTICE 'not an exact match';
1475         RETURN FALSE;
1476     END IF;
1477
1478     -- Check that the one match is on the first 901c
1479     SELECT  m.eg_record INTO eg_id
1480       FROM  vandelay.queued_bib_record q
1481             JOIN vandelay.bib_match m ON (m.queued_record = q.id)
1482       WHERE q.id = import_id
1483             AND m.eg_record = oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]',marc)::BIGINT;
1484
1485     IF NOT FOUND THEN
1486         -- RAISE NOTICE 'not a 901c match';
1487         RETURN FALSE;
1488     END IF;
1489
1490     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1491 END;
1492 $$ LANGUAGE PLPGSQL;
1493
1494 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue ( queue_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1495 DECLARE
1496     queued_record   vandelay.queued_bib_record%ROWTYPE;
1497 BEGIN
1498
1499     FOR queued_record IN SELECT * FROM vandelay.queued_bib_record WHERE queue = queue_id AND import_time IS NULL LOOP
1500
1501         IF vandelay.auto_overlay_bib_record( queued_record.id, merge_profile_id ) THEN
1502             RETURN NEXT queued_record.id;
1503         END IF;
1504
1505     END LOOP;
1506
1507     RETURN;
1508     
1509 END;
1510 $$ LANGUAGE PLPGSQL;
1511
1512 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue_with_best ( queue_id BIGINT, merge_profile_id INT, lwm_ratio_value NUMERIC ) RETURNS SETOF BIGINT AS $$
1513 DECLARE
1514     queued_record   vandelay.queued_bib_record%ROWTYPE;
1515 BEGIN
1516
1517     FOR queued_record IN SELECT * FROM vandelay.queued_bib_record WHERE queue = queue_id AND import_time IS NULL LOOP
1518
1519         IF vandelay.auto_overlay_bib_record_with_best( queued_record.id, merge_profile_id, lwm_ratio_value ) THEN
1520             RETURN NEXT queued_record.id;
1521         END IF;
1522
1523     END LOOP;
1524
1525     RETURN;
1526     
1527 END;
1528 $$ LANGUAGE PLPGSQL;
1529
1530 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue_with_best ( import_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1531     SELECT vandelay.auto_overlay_bib_queue_with_best( $1, $2, p.lwm_ratio ) FROM vandelay.merge_profile p WHERE id = $2;
1532 $$ LANGUAGE SQL;
1533
1534 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue ( queue_id BIGINT ) RETURNS SETOF BIGINT AS $$
1535     SELECT * FROM vandelay.auto_overlay_bib_queue( $1, NULL );
1536 $$ LANGUAGE SQL;
1537
1538 CREATE OR REPLACE FUNCTION vandelay.ingest_bib_marc ( ) RETURNS TRIGGER AS $$
1539 DECLARE
1540     value   TEXT;
1541     atype   TEXT;
1542     adef    RECORD;
1543 BEGIN
1544     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1545         RETURN NEW;
1546     END IF;
1547
1548     FOR adef IN SELECT * FROM vandelay.bib_attr_definition LOOP
1549
1550         SELECT extract_marc_field('vandelay.queued_bib_record', id, adef.xpath, adef.remove) INTO value FROM vandelay.queued_bib_record WHERE id = NEW.id;
1551         IF (value IS NOT NULL AND value <> '') THEN
1552             INSERT INTO vandelay.queued_bib_record_attr (record, field, attr_value) VALUES (NEW.id, adef.id, value);
1553         END IF;
1554
1555     END LOOP;
1556
1557     RETURN NULL;
1558 END;
1559 $$ LANGUAGE PLPGSQL;
1560
1561 CREATE OR REPLACE FUNCTION vandelay.cleanup_bib_marc ( ) RETURNS TRIGGER AS $$
1562 BEGIN
1563     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1564         RETURN NEW;
1565     END IF;
1566
1567     DELETE FROM vandelay.queued_bib_record_attr WHERE record = OLD.id;
1568     DELETE FROM vandelay.import_item WHERE record = OLD.id;
1569
1570     IF TG_OP = 'UPDATE' THEN
1571         RETURN NEW;
1572     END IF;
1573     RETURN OLD;
1574 END;
1575 $$ LANGUAGE PLPGSQL;
1576
1577 CREATE TRIGGER cleanup_bib_trigger
1578     BEFORE UPDATE OR DELETE ON vandelay.queued_bib_record
1579     FOR EACH ROW EXECUTE PROCEDURE vandelay.cleanup_bib_marc();
1580
1581 CREATE TRIGGER ingest_bib_trigger
1582     AFTER INSERT OR UPDATE ON vandelay.queued_bib_record
1583     FOR EACH ROW EXECUTE PROCEDURE vandelay.ingest_bib_marc();
1584
1585 CREATE TRIGGER zz_match_bibs_trigger
1586     BEFORE INSERT OR UPDATE ON vandelay.queued_bib_record
1587     FOR EACH ROW EXECUTE PROCEDURE vandelay.match_bib_record();
1588
1589
1590 /* Authority stuff down here */
1591 ---------------------------------------
1592 CREATE TABLE vandelay.authority_attr_definition (
1593         id                      SERIAL  PRIMARY KEY,
1594         code            TEXT    UNIQUE NOT NULL,
1595         description     TEXT,
1596         xpath           TEXT    NOT NULL,
1597         remove          TEXT    NOT NULL DEFAULT ''
1598 );
1599
1600 CREATE TYPE vandelay.authority_queue_queue_type AS ENUM ('authority');
1601 CREATE TABLE vandelay.authority_queue (
1602         queue_type      vandelay.authority_queue_queue_type NOT NULL DEFAULT 'authority',
1603         CONSTRAINT vand_authority_queue_name_once_per_owner_const UNIQUE (owner,name,queue_type)
1604 ) INHERITS (vandelay.queue);
1605 ALTER TABLE vandelay.authority_queue ADD PRIMARY KEY (id);
1606
1607 CREATE TABLE vandelay.queued_authority_record (
1608         queue           INT     NOT NULL REFERENCES vandelay.authority_queue (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
1609         imported_as     INT     REFERENCES authority.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
1610         import_error    TEXT    REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
1611         error_detail    TEXT
1612 ) INHERITS (vandelay.queued_record);
1613 ALTER TABLE vandelay.queued_authority_record ADD PRIMARY KEY (id);
1614 CREATE INDEX queued_authority_record_queue_idx ON vandelay.queued_authority_record (queue);
1615
1616 CREATE TABLE vandelay.queued_authority_record_attr (
1617         id                      BIGSERIAL       PRIMARY KEY,
1618         record          BIGINT          NOT NULL REFERENCES vandelay.queued_authority_record (id) DEFERRABLE INITIALLY DEFERRED,
1619         field           INT                     NOT NULL REFERENCES vandelay.authority_attr_definition (id) DEFERRABLE INITIALLY DEFERRED,
1620         attr_value      TEXT            NOT NULL
1621 );
1622 CREATE INDEX queued_authority_record_attr_record_idx ON vandelay.queued_authority_record_attr (record);
1623
1624 CREATE TABLE vandelay.authority_match (
1625         id                              BIGSERIAL       PRIMARY KEY,
1626         queued_record   BIGINT          REFERENCES vandelay.queued_authority_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
1627         eg_record               BIGINT          REFERENCES authority.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
1628     quality         INT         NOT NULL DEFAULT 0
1629 );
1630
1631 CREATE OR REPLACE FUNCTION vandelay.ingest_authority_marc ( ) RETURNS TRIGGER AS $$
1632 DECLARE
1633     value   TEXT;
1634     atype   TEXT;
1635     adef    RECORD;
1636 BEGIN
1637     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1638         RETURN NEW;
1639     END IF;
1640
1641     FOR adef IN SELECT * FROM vandelay.authority_attr_definition LOOP
1642
1643         SELECT extract_marc_field('vandelay.queued_authority_record', id, adef.xpath, adef.remove) INTO value FROM vandelay.queued_authority_record WHERE id = NEW.id;
1644         IF (value IS NOT NULL AND value <> '') THEN
1645             INSERT INTO vandelay.queued_authority_record_attr (record, field, attr_value) VALUES (NEW.id, adef.id, value);
1646         END IF;
1647
1648     END LOOP;
1649
1650     RETURN NULL;
1651 END;
1652 $$ LANGUAGE PLPGSQL;
1653
1654 CREATE OR REPLACE FUNCTION vandelay.cleanup_authority_marc ( ) RETURNS TRIGGER AS $$
1655 BEGIN
1656     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1657         RETURN NEW;
1658     END IF;
1659
1660     DELETE FROM vandelay.queued_authority_record_attr WHERE record = OLD.id;
1661     IF TG_OP = 'UPDATE' THEN
1662         RETURN NEW;
1663     END IF;
1664     RETURN OLD;
1665 END;
1666 $$ LANGUAGE PLPGSQL;
1667
1668 CREATE TRIGGER cleanup_authority_trigger
1669     BEFORE UPDATE OR DELETE ON vandelay.queued_authority_record
1670     FOR EACH ROW EXECUTE PROCEDURE vandelay.cleanup_authority_marc();
1671
1672 CREATE TRIGGER ingest_authority_trigger
1673     AFTER INSERT OR UPDATE ON vandelay.queued_authority_record
1674     FOR EACH ROW EXECUTE PROCEDURE vandelay.ingest_authority_marc();
1675
1676 CREATE OR REPLACE FUNCTION vandelay.overlay_authority_record ( import_id BIGINT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1677 DECLARE
1678     merge_profile   vandelay.merge_profile%ROWTYPE;
1679     dyn_profile     vandelay.compile_profile%ROWTYPE;
1680     source_marc     TEXT;
1681     target_marc     TEXT;
1682     eg_marc         TEXT;
1683     v_marc          TEXT;
1684     replace_rule    TEXT;
1685     match_count     INT;
1686 BEGIN
1687
1688     SELECT  b.marc INTO eg_marc
1689       FROM  authority.record_entry b
1690             JOIN vandelay.authority_match m ON (m.eg_record = b.id AND m.queued_record = import_id)
1691       LIMIT 1;
1692
1693     SELECT  q.marc INTO v_marc
1694       FROM  vandelay.queued_record q
1695             JOIN vandelay.authority_match m ON (m.queued_record = q.id AND q.id = import_id)
1696       LIMIT 1;
1697
1698     IF eg_marc IS NULL OR v_marc IS NULL THEN
1699         -- RAISE NOTICE 'no marc for vandelay or authority record';
1700         RETURN FALSE;
1701     END IF;
1702
1703     dyn_profile := vandelay.compile_profile( v_marc );
1704
1705     IF merge_profile_id IS NOT NULL THEN
1706         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
1707         IF FOUND THEN
1708             dyn_profile.add_rule := BTRIM( dyn_profile.add_rule || ',' || COALESCE(merge_profile.add_spec,''), ',');
1709             dyn_profile.strip_rule := BTRIM( dyn_profile.strip_rule || ',' || COALESCE(merge_profile.strip_spec,''), ',');
1710             dyn_profile.replace_rule := BTRIM( dyn_profile.replace_rule || ',' || COALESCE(merge_profile.replace_spec,''), ',');
1711             dyn_profile.preserve_rule := BTRIM( dyn_profile.preserve_rule || ',' || COALESCE(merge_profile.preserve_spec,''), ',');
1712         END IF;
1713     END IF;
1714
1715     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1716         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1717         RETURN FALSE;
1718     END IF;
1719
1720     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1721         --Since we have nothing to do, just return a NOOP "we did it"
1722         RETURN TRUE;
1723     ELSIF dyn_profile.replace_rule <> '' THEN
1724         source_marc = v_marc;
1725         target_marc = eg_marc;
1726         replace_rule = dyn_profile.replace_rule;
1727     ELSE
1728         source_marc = eg_marc;
1729         target_marc = v_marc;
1730         replace_rule = dyn_profile.preserve_rule;
1731     END IF;
1732
1733     UPDATE  authority.record_entry
1734       SET   marc = vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule )
1735       WHERE id = eg_id;
1736
1737     IF FOUND THEN
1738         UPDATE  vandelay.queued_authority_record
1739           SET   imported_as = eg_id,
1740                 import_time = NOW()
1741           WHERE id = import_id;
1742         RETURN TRUE;
1743     END IF;
1744
1745     -- RAISE NOTICE 'update of authority.record_entry failed';
1746
1747     RETURN FALSE;
1748
1749 END;
1750 $$ LANGUAGE PLPGSQL;
1751
1752 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_record ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1753 DECLARE
1754     eg_id           BIGINT;
1755     match_count     INT;
1756 BEGIN
1757     SELECT COUNT(*) INTO match_count FROM vandelay.authority_match WHERE queued_record = import_id;
1758
1759     IF match_count <> 1 THEN
1760         -- RAISE NOTICE 'not an exact match';
1761         RETURN FALSE;
1762     END IF;
1763
1764     SELECT  m.eg_record INTO eg_id
1765       FROM  vandelay.authority_match m
1766       WHERE m.queued_record = import_id
1767       LIMIT 1;
1768
1769     IF eg_id IS NULL THEN
1770         RETURN FALSE;
1771     END IF;
1772
1773     RETURN vandelay.overlay_authority_record( import_id, eg_id, merge_profile_id );
1774 END;
1775 $$ LANGUAGE PLPGSQL;
1776
1777 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_queue ( queue_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1778 DECLARE
1779     queued_record   vandelay.queued_authority_record%ROWTYPE;
1780 BEGIN
1781
1782     FOR queued_record IN SELECT * FROM vandelay.queued_authority_record WHERE queue = queue_id AND import_time IS NULL LOOP
1783
1784         IF vandelay.auto_overlay_authority_record( queued_record.id, merge_profile_id ) THEN
1785             RETURN NEXT queued_record.id;
1786         END IF;
1787
1788     END LOOP;
1789
1790     RETURN;
1791     
1792 END;
1793 $$ LANGUAGE PLPGSQL;
1794
1795 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_queue ( queue_id BIGINT ) RETURNS SETOF BIGINT AS $$
1796     SELECT * FROM vandelay.auto_overlay_authority_queue( $1, NULL );
1797 $$ LANGUAGE SQL;
1798
1799
1800 -- Vandelay (for importing and exporting records) 012.schema.vandelay.sql 
1801 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (1, 'title', oils_i18n_gettext(1, 'vqbrad', 'Title of work', 'description'),'//*[@tag="245"]/*[contains("abcmnopr",@code)]');
1802 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (2, 'author', oils_i18n_gettext(1, 'vqbrad', 'Author of work', 'description'),'//*[@tag="100" or @tag="110" or @tag="113"]/*[contains("ad",@code)]');
1803 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (3, 'language', oils_i18n_gettext(3, 'vqbrad', 'Language of work', 'description'),'//*[@tag="240"]/*[@code="l"][1]');
1804 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (4, 'pagination', oils_i18n_gettext(4, 'vqbrad', 'Pagination', 'description'),'//*[@tag="300"]/*[@code="a"][1]');
1805 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident, remove ) VALUES (5, 'isbn',oils_i18n_gettext(5, 'vqbrad', 'ISBN', 'description'),'//*[@tag="020"]/*[@code="a"]', TRUE, $r$(?:-|\s.+$)$r$);
1806 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident, remove ) VALUES (6, 'issn',oils_i18n_gettext(6, 'vqbrad', 'ISSN', 'description'),'//*[@tag="022"]/*[@code="a"]', TRUE, $r$(?:-|\s.+$)$r$);
1807 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (7, 'price',oils_i18n_gettext(7, 'vqbrad', 'Price', 'description'),'//*[@tag="020" or @tag="022"]/*[@code="c"][1]');
1808 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (8, 'rec_identifier',oils_i18n_gettext(8, 'vqbrad', 'Accession Number', 'description'),'//*[@tag="001"]', TRUE);
1809 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (9, 'eg_tcn',oils_i18n_gettext(9, 'vqbrad', 'TCN Value', 'description'),'//*[@tag="901"]/*[@code="a"]', TRUE);
1810 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (10, 'eg_tcn_source',oils_i18n_gettext(10, 'vqbrad', 'TCN Source', 'description'),'//*[@tag="901"]/*[@code="b"]', TRUE);
1811 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (11, 'eg_identifier',oils_i18n_gettext(11, 'vqbrad', 'Internal ID', 'description'),'//*[@tag="901"]/*[@code="c"]', TRUE);
1812 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (12, 'publisher',oils_i18n_gettext(12, 'vqbrad', 'Publisher', 'description'),'//*[@tag="260"]/*[@code="b"][1]');
1813 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, remove ) VALUES (13, 'pubdate',oils_i18n_gettext(13, 'vqbrad', 'Publication Date', 'description'),'//*[@tag="260"]/*[@code="c"][1]',$r$\D$r$);
1814 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (14, 'edition',oils_i18n_gettext(14, 'vqbrad', 'Edition', 'description'),'//*[@tag="250"]/*[@code="a"][1]');
1815 --
1816 --INSERT INTO vandelay.import_item_attr_definition (
1817 --    owner, name, tag, owning_lib, circ_lib, location,
1818 --    call_number, circ_modifier, barcode, price, copy_number,
1819 --    circulate, ref, holdable, opac_visible, status
1820 --) VALUES (
1821 --    1,
1822 --    'Evergreen 852 export format',
1823 --    '852',
1824 --    '[@code = "b"][1]',
1825 --    '[@code = "b"][2]',
1826 --    'c',
1827 --    'j',
1828 --    'g',
1829 --    'p',
1830 --    'y',
1831 --    't',
1832 --    '[@code = "x" and text() = "circulating"]',
1833 --    '[@code = "x" and text() = "reference"]',
1834 --    '[@code = "x" and text() = "holdable"]',
1835 --    '[@code = "x" and text() = "visible"]',
1836 --    'z'
1837 --);
1838 --
1839 --INSERT INTO vandelay.import_item_attr_definition (
1840 --    owner,
1841 --    name,
1842 --    tag,
1843 --    owning_lib,
1844 --    location,
1845 --    call_number,
1846 --    circ_modifier,
1847 --    barcode,
1848 --    price,
1849 --    status
1850 --) VALUES (
1851 --    1,
1852 --    'Unicorn Import format -- 999',
1853 --    '999',
1854 --    'm',
1855 --    'l',
1856 --    'a',
1857 --    't',
1858 --    'i',
1859 --    'p',
1860 --    'k'
1861 --);
1862 --
1863 --INSERT INTO vandelay.authority_attr_definition ( code, description, xpath, ident ) VALUES ('rec_identifier','Identifier','//*[@tag="001"]', TRUE);
1864
1865 COMMIT;
1866