]> git.evergreen-ils.org Git - working/Evergreen.git/blob - Open-ILS/src/sql/Pg/012.schema.vandelay.sql
Vandelay record bucket-limited matching
[working/Evergreen.git] / Open-ILS / src / sql / Pg / 012.schema.vandelay.sql
1 DROP SCHEMA IF EXISTS vandelay CASCADE;
2
3 BEGIN;
4
5 CREATE SCHEMA vandelay;
6
7 CREATE TABLE vandelay.match_set (
8     id      SERIAL  PRIMARY KEY,
9     name    TEXT        NOT NULL,
10     owner   INT     NOT NULL REFERENCES actor.org_unit (id) ON DELETE CASCADE,
11     mtype   TEXT        NOT NULL DEFAULT 'biblio', -- 'biblio','authority','mfhd'?, others?
12     CONSTRAINT name_once_per_owner_mtype UNIQUE (name, owner, mtype)
13 );
14
15 -- Table to define match points, either FF via SVF or tag+subfield
16 CREATE TABLE vandelay.match_set_point (
17     id          SERIAL  PRIMARY KEY,
18     match_set   INT     REFERENCES vandelay.match_set (id) ON DELETE CASCADE,
19     parent      INT     REFERENCES vandelay.match_set_point (id),
20     bool_op     TEXT    CHECK (bool_op IS NULL OR (bool_op IN ('AND','OR','NOT'))),
21     svf         TEXT    REFERENCES config.record_attr_definition (name),
22     tag         TEXT,
23     subfield    TEXT,
24     negate      BOOL    DEFAULT FALSE,
25     quality     INT     NOT NULL DEFAULT 1, -- higher is better
26     CONSTRAINT vmsp_need_a_subfield_with_a_tag CHECK ((tag IS NOT NULL AND subfield IS NOT NULL) OR tag IS NULL),
27     CONSTRAINT vmsp_need_a_tag_or_a_ff_or_a_bo CHECK (
28         (tag IS NOT NULL AND svf IS NULL AND bool_op IS NULL) OR
29         (tag IS NULL AND svf IS NOT NULL AND bool_op IS NULL) OR
30         (tag IS NULL AND svf IS NULL AND bool_op IS NOT NULL)
31     )
32 );
33
34 CREATE TABLE vandelay.match_set_quality (
35     id          SERIAL  PRIMARY KEY,
36     match_set   INT     NOT NULL REFERENCES vandelay.match_set (id) ON DELETE CASCADE,
37     svf         TEXT    REFERENCES config.record_attr_definition,
38     tag         TEXT,
39     subfield    TEXT,
40     value       TEXT    NOT NULL,
41     quality     INT     NOT NULL DEFAULT 1, -- higher is better
42     CONSTRAINT vmsq_need_a_subfield_with_a_tag CHECK ((tag IS NOT NULL AND subfield IS NOT NULL) OR tag IS NULL),
43     CONSTRAINT vmsq_need_a_tag_or_a_ff CHECK ((tag IS NOT NULL AND svf IS NULL) OR (tag IS NULL AND svf IS NOT NULL))
44 );
45 CREATE UNIQUE INDEX vmsq_def_once_per_set ON vandelay.match_set_quality (match_set, COALESCE(tag,''), COALESCE(subfield,''), COALESCE(svf,''), value);
46
47
48 CREATE TABLE vandelay.queue (
49         id                              BIGSERIAL       PRIMARY KEY,
50         owner                   INT                     NOT NULL REFERENCES actor.usr (id) DEFERRABLE INITIALLY DEFERRED,
51         name                    TEXT            NOT NULL,
52         complete                BOOL            NOT NULL DEFAULT FALSE,
53     match_set       INT         REFERENCES vandelay.match_set (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED
54 );
55
56 CREATE TABLE vandelay.queued_record (
57     id                  BIGSERIAL                   PRIMARY KEY,
58     create_time TIMESTAMP WITH TIME ZONE    NOT NULL DEFAULT NOW(),
59     import_time TIMESTAMP WITH TIME ZONE,
60         purpose         TEXT                                            NOT NULL DEFAULT 'import' CHECK (purpose IN ('import','overlay')),
61     marc                TEXT                        NOT NULL,
62     quality     INT                         NOT NULL DEFAULT 0
63 );
64
65
66
67 /* Bib stuff at the top */
68 ----------------------------------------------------
69
70 CREATE TABLE vandelay.bib_attr_definition (
71         id                      SERIAL  PRIMARY KEY,
72         code            TEXT    UNIQUE NOT NULL,
73         description     TEXT,
74         xpath           TEXT    NOT NULL,
75         remove          TEXT    NOT NULL DEFAULT ''
76 );
77
78 -- Each TEXT field (other than 'name') should hold an XPath predicate for pulling the data needed
79 -- DROP TABLE vandelay.import_item_attr_definition CASCADE;
80 CREATE TABLE vandelay.import_item_attr_definition (
81     id              BIGSERIAL   PRIMARY KEY,
82     owner           INT         NOT NULL REFERENCES actor.org_unit (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
83     name            TEXT        NOT NULL,
84     tag             TEXT        NOT NULL,
85     keep            BOOL        NOT NULL DEFAULT FALSE,
86     owning_lib      TEXT,
87     circ_lib        TEXT,
88     call_number     TEXT,
89     copy_number     TEXT,
90     status          TEXT,
91     location        TEXT,
92     circulate       TEXT,
93     deposit         TEXT,
94     deposit_amount  TEXT,
95     ref             TEXT,
96     holdable        TEXT,
97     price           TEXT,
98     barcode         TEXT,
99     circ_modifier   TEXT,
100     circ_as_type    TEXT,
101     alert_message   TEXT,
102     opac_visible    TEXT,
103     pub_note_title  TEXT,
104     pub_note        TEXT,
105     priv_note_title TEXT,
106     priv_note       TEXT,
107     internal_id     TEXT,
108         CONSTRAINT vand_import_item_attr_def_idx UNIQUE (owner,name)
109 );
110
111 CREATE TABLE vandelay.import_error (
112     code        TEXT    PRIMARY KEY,
113     description TEXT    NOT NULL -- i18n
114 );
115
116 CREATE TYPE vandelay.bib_queue_queue_type AS ENUM ('bib', 'acq');
117
118 CREATE TABLE vandelay.bib_queue (
119         queue_type          vandelay.bib_queue_queue_type       NOT NULL DEFAULT 'bib',
120         item_attr_def   BIGINT REFERENCES vandelay.import_item_attr_definition (id) ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
121         CONSTRAINT vand_bib_queue_name_once_per_owner_const UNIQUE (owner,name,queue_type)
122 ) INHERITS (vandelay.queue);
123 ALTER TABLE vandelay.bib_queue ADD PRIMARY KEY (id);
124
125 CREATE TABLE vandelay.queued_bib_record (
126         queue               INT         NOT NULL REFERENCES vandelay.bib_queue (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
127         bib_source          INT         REFERENCES config.bib_source (id) DEFERRABLE INITIALLY DEFERRED,
128         imported_as     BIGINT  REFERENCES biblio.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
129         import_error    TEXT    REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
130         error_detail    TEXT
131 ) INHERITS (vandelay.queued_record);
132 ALTER TABLE vandelay.queued_bib_record ADD PRIMARY KEY (id);
133 CREATE INDEX queued_bib_record_queue_idx ON vandelay.queued_bib_record (queue);
134
135 CREATE TABLE vandelay.queued_bib_record_attr (
136         id                      BIGSERIAL       PRIMARY KEY,
137         record          BIGINT          NOT NULL REFERENCES vandelay.queued_bib_record (id) DEFERRABLE INITIALLY DEFERRED,
138         field           INT                     NOT NULL REFERENCES vandelay.bib_attr_definition (id) DEFERRABLE INITIALLY DEFERRED,
139         attr_value      TEXT            NOT NULL
140 );
141 CREATE INDEX queued_bib_record_attr_record_idx ON vandelay.queued_bib_record_attr (record);
142
143 CREATE TABLE vandelay.bib_match (
144         id                              BIGSERIAL       PRIMARY KEY,
145         queued_record   BIGINT          REFERENCES vandelay.queued_bib_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
146         eg_record               BIGINT          REFERENCES biblio.record_entry (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
147     quality         INT         NOT NULL DEFAULT 1,
148     match_score     INT         NOT NULL DEFAULT 0
149 );
150
151 CREATE TABLE vandelay.import_item (
152     id              BIGSERIAL   PRIMARY KEY,
153     record          BIGINT      NOT NULL REFERENCES vandelay.queued_bib_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
154     definition      BIGINT      NOT NULL REFERENCES vandelay.import_item_attr_definition (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
155         import_error    TEXT        REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
156         error_detail    TEXT,
157     imported_as     BIGINT,
158     import_time     TIMESTAMP WITH TIME ZONE,
159     owning_lib      INT,
160     circ_lib        INT,
161     call_number     TEXT,
162     copy_number     INT,
163     status          INT,
164     location        INT,
165     circulate       BOOL,
166     deposit         BOOL,
167     deposit_amount  NUMERIC(8,2),
168     ref             BOOL,
169     holdable        BOOL,
170     price           NUMERIC(8,2),
171     barcode         TEXT,
172     circ_modifier   TEXT,
173     circ_as_type    TEXT,
174     alert_message   TEXT,
175     pub_note        TEXT,
176     priv_note       TEXT,
177     opac_visible    BOOL,
178     internal_id     BIGINT -- queue_type == 'acq' ? acq.lineitem_detail.id : asset.copy.id
179 );
180
181 CREATE TABLE vandelay.import_bib_trash_group(
182     id           SERIAL  PRIMARY KEY,
183     owner        INTEGER NOT NULL REFERENCES actor.org_unit(id),
184     label        TEXT    NOT NULL, --i18n
185     always_apply BOOLEAN NOT NULL DEFAULT FALSE,
186         CONSTRAINT vand_import_bib_trash_grp_owner_label UNIQUE (owner, label)
187 );
188  
189 CREATE TABLE vandelay.import_bib_trash_fields (
190     id         BIGSERIAL PRIMARY KEY,
191     grp        INTEGER   NOT NULL REFERENCES vandelay.import_bib_trash_group,
192     field      TEXT      NOT NULL,
193     CONSTRAINT vand_import_bib_trash_fields_once_per UNIQUE (grp, field)
194 );
195
196 CREATE TABLE vandelay.merge_profile (
197     id              BIGSERIAL   PRIMARY KEY,
198     owner           INT         NOT NULL REFERENCES actor.org_unit (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
199     name            TEXT        NOT NULL,
200     add_spec        TEXT,
201     replace_spec    TEXT,
202     strip_spec      TEXT,
203     preserve_spec   TEXT,
204     lwm_ratio       NUMERIC,
205         CONSTRAINT vand_merge_prof_owner_name_idx UNIQUE (owner,name),
206         CONSTRAINT add_replace_strip_or_preserve CHECK ((preserve_spec IS NOT NULL OR replace_spec IS NOT NULL) OR (preserve_spec IS NULL AND replace_spec IS NULL))
207 );
208
209 CREATE OR REPLACE FUNCTION vandelay.marc21_record_type( marc TEXT ) RETURNS config.marc21_rec_type_map AS $func$
210 DECLARE
211     ldr         TEXT;
212     tval        TEXT;
213     tval_rec    RECORD;
214     bval        TEXT;
215     bval_rec    RECORD;
216     retval      config.marc21_rec_type_map%ROWTYPE;
217 BEGIN
218     ldr := oils_xpath_string( '//*[local-name()="leader"]', marc );
219
220     IF ldr IS NULL OR ldr = '' THEN
221         SELECT * INTO retval FROM config.marc21_rec_type_map WHERE code = 'BKS';
222         RETURN retval;
223     END IF;
224
225     SELECT * INTO tval_rec FROM config.marc21_ff_pos_map WHERE fixed_field = 'Type' LIMIT 1; -- They're all the same
226     SELECT * INTO bval_rec FROM config.marc21_ff_pos_map WHERE fixed_field = 'BLvl' LIMIT 1; -- They're all the same
227
228
229     tval := SUBSTRING( ldr, tval_rec.start_pos + 1, tval_rec.length );
230     bval := SUBSTRING( ldr, bval_rec.start_pos + 1, bval_rec.length );
231
232     -- RAISE NOTICE 'type %, blvl %, ldr %', tval, bval, ldr;
233
234     SELECT * INTO retval FROM config.marc21_rec_type_map WHERE type_val LIKE '%' || tval || '%' AND blvl_val LIKE '%' || bval || '%';
235
236
237     IF retval.code IS NULL THEN
238         SELECT * INTO retval FROM config.marc21_rec_type_map WHERE code = 'BKS';
239     END IF;
240
241     RETURN retval;
242 END;
243 $func$ LANGUAGE PLPGSQL;
244
245 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_fixed_field( marc TEXT, ff TEXT ) RETURNS TEXT AS $func$
246 DECLARE
247     rtype       TEXT;
248     ff_pos      RECORD;
249     tag_data    RECORD;
250     val         TEXT;
251 BEGIN
252     rtype := (vandelay.marc21_record_type( marc )).code;
253     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE fixed_field = ff AND rec_type = rtype ORDER BY tag DESC LOOP
254         FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
255             val := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
256             RETURN val;
257         END LOOP;
258         val := REPEAT( ff_pos.default_val, ff_pos.length );
259         RETURN val;
260     END LOOP;
261
262     RETURN NULL;
263 END;
264 $func$ LANGUAGE PLPGSQL;
265
266 CREATE TYPE biblio.record_ff_map AS (record BIGINT, ff_name TEXT, ff_value TEXT);
267 CREATE OR REPLACE FUNCTION vandelay.marc21_extract_all_fixed_fields( marc TEXT ) RETURNS SETOF biblio.record_ff_map AS $func$
268 DECLARE
269     tag_data    TEXT;
270     rtype       TEXT;
271     ff_pos      RECORD;
272     output      biblio.record_ff_map%ROWTYPE;
273 BEGIN
274     rtype := (vandelay.marc21_record_type( marc )).code;
275
276     FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE rec_type = rtype ORDER BY tag DESC LOOP
277         output.ff_name  := ff_pos.fixed_field;
278         output.ff_value := NULL;
279
280         FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(tag) || '"]/text()', marc ) ) x(value) LOOP
281             output.ff_value := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
282             IF output.ff_value IS NULL THEN output.ff_value := REPEAT( ff_pos.default_val, ff_pos.length ); END IF;
283             RETURN NEXT output;
284             output.ff_value := NULL;
285         END LOOP;
286
287     END LOOP;
288
289     RETURN;
290 END;
291 $func$ LANGUAGE PLPGSQL;
292
293 CREATE TYPE biblio.marc21_physical_characteristics AS ( id INT, record BIGINT, ptype TEXT, subfield INT, value INT );
294 CREATE OR REPLACE FUNCTION vandelay.marc21_physical_characteristics( marc TEXT) RETURNS SETOF biblio.marc21_physical_characteristics AS $func$
295 DECLARE
296     rowid   INT := 0;
297     _007    TEXT;
298     ptype   config.marc21_physical_characteristic_type_map%ROWTYPE;
299     psf     config.marc21_physical_characteristic_subfield_map%ROWTYPE;
300     pval    config.marc21_physical_characteristic_value_map%ROWTYPE;
301     retval  biblio.marc21_physical_characteristics%ROWTYPE;
302 BEGIN
303
304     _007 := oils_xpath_string( '//*[@tag="007"]', marc );
305
306     IF _007 IS NOT NULL AND _007 <> '' THEN
307         SELECT * INTO ptype FROM config.marc21_physical_characteristic_type_map WHERE ptype_key = SUBSTRING( _007, 1, 1 );
308
309         IF ptype.ptype_key IS NOT NULL THEN
310             FOR psf IN SELECT * FROM config.marc21_physical_characteristic_subfield_map WHERE ptype_key = ptype.ptype_key LOOP
311                 SELECT * INTO pval FROM config.marc21_physical_characteristic_value_map WHERE ptype_subfield = psf.id AND value = SUBSTRING( _007, psf.start_pos + 1, psf.length );
312
313                 IF pval.id IS NOT NULL THEN
314                     rowid := rowid + 1;
315                     retval.id := rowid;
316                     retval.ptype := ptype.ptype_key;
317                     retval.subfield := psf.id;
318                     retval.value := pval.id;
319                     RETURN NEXT retval;
320                 END IF;
321
322             END LOOP;
323         END IF;
324     END IF;
325
326     RETURN;
327 END;
328 $func$ LANGUAGE PLPGSQL;
329
330 CREATE TYPE vandelay.flat_marc AS ( tag CHAR(3), ind1 TEXT, ind2 TEXT, subfield TEXT, value TEXT );
331 CREATE OR REPLACE FUNCTION vandelay.flay_marc ( TEXT ) RETURNS SETOF vandelay.flat_marc AS $func$
332
333 use MARC::Record;
334 use MARC::File::XML (BinaryEncoding => 'UTF-8');
335 use MARC::Charset;
336 use strict;
337
338 MARC::Charset->assume_unicode(1);
339
340 my $xml = shift;
341 my $r = MARC::Record->new_from_xml( $xml );
342
343 return_next( { tag => 'LDR', value => $r->leader } );
344
345 for my $f ( $r->fields ) {
346     if ($f->is_control_field) {
347         return_next({ tag => $f->tag, value => $f->data });
348     } else {
349         for my $s ($f->subfields) {
350             return_next({
351                 tag      => $f->tag,
352                 ind1     => $f->indicator(1),
353                 ind2     => $f->indicator(2),
354                 subfield => $s->[0],
355                 value    => $s->[1]
356             });
357
358             if ( $f->tag eq '245' and $s->[0] eq 'a' ) {
359                 my $trim = $f->indicator(2) || 0;
360                 return_next({
361                     tag      => 'tnf',
362                     ind1     => $f->indicator(1),
363                     ind2     => $f->indicator(2),
364                     subfield => 'a',
365                     value    => substr( $s->[1], $trim )
366                 });
367             }
368         }
369     }
370 }
371
372 return undef;
373
374 $func$ LANGUAGE PLPERLU;
375
376 CREATE OR REPLACE FUNCTION vandelay.flatten_marc ( marc TEXT ) RETURNS SETOF vandelay.flat_marc AS $func$
377 DECLARE
378     output  vandelay.flat_marc%ROWTYPE;
379     field   RECORD;
380 BEGIN
381     FOR field IN SELECT * FROM vandelay.flay_marc( marc ) LOOP
382         output.ind1 := field.ind1;
383         output.ind2 := field.ind2;
384         output.tag := field.tag;
385         output.subfield := field.subfield;
386         IF field.subfield IS NOT NULL AND field.tag NOT IN ('020','022','024') THEN -- exclude standard numbers and control fields
387             output.value := naco_normalize(field.value, field.subfield);
388         ELSE
389             output.value := field.value;
390         END IF;
391
392         CONTINUE WHEN output.value IS NULL;
393
394         RETURN NEXT output;
395     END LOOP;
396 END;
397 $func$ LANGUAGE PLPGSQL;
398
399 CREATE OR REPLACE FUNCTION vandelay.extract_rec_attrs ( xml TEXT, attr_defs TEXT[]) RETURNS hstore AS $_$
400 DECLARE
401     transformed_xml TEXT;
402     prev_xfrm       TEXT;
403     normalizer      RECORD;
404     xfrm            config.xml_transform%ROWTYPE;
405     attr_value      TEXT;
406     new_attrs       HSTORE := ''::HSTORE;
407     attr_def        config.record_attr_definition%ROWTYPE;
408 BEGIN
409
410     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE name IN (SELECT * FROM UNNEST(attr_defs)) ORDER BY format LOOP
411
412         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
413             SELECT  ARRAY_TO_STRING(ARRAY_ACCUM(x.value), COALESCE(attr_def.joiner,' ')) INTO attr_value
414               FROM  vandelay.flatten_marc(xml) AS x
415               WHERE x.tag LIKE attr_def.tag
416                     AND CASE
417                         WHEN attr_def.sf_list IS NOT NULL
418                             THEN POSITION(x.subfield IN attr_def.sf_list) > 0
419                         ELSE TRUE
420                         END
421               GROUP BY x.tag
422               ORDER BY x.tag
423               LIMIT 1;
424
425         ELSIF attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
426             attr_value := vandelay.marc21_extract_fixed_field(xml, attr_def.fixed_field);
427
428         ELSIF attr_def.xpath IS NOT NULL THEN -- and xpath expression
429
430             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
431
432             -- See if we can skip the XSLT ... it's expensive
433             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
434                 -- Can't skip the transform
435                 IF xfrm.xslt <> '---' THEN
436                     transformed_xml := oils_xslt_process(xml,xfrm.xslt);
437                 ELSE
438                     transformed_xml := xml;
439                 END IF;
440
441                 prev_xfrm := xfrm.name;
442             END IF;
443
444             IF xfrm.name IS NULL THEN
445                 -- just grab the marcxml (empty) transform
446                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
447                 prev_xfrm := xfrm.name;
448             END IF;
449
450             attr_value := oils_xpath_string(attr_def.xpath, transformed_xml, COALESCE(attr_def.joiner,' '), ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]);
451
452         ELSIF attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
453             SELECT  m.value::TEXT INTO attr_value
454               FROM  vandelay.marc21_physical_characteristics(xml) v
455                     JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
456               WHERE v.subfield = attr_def.phys_char_sf
457               LIMIT 1; -- Just in case ...
458
459         END IF;
460
461         -- apply index normalizers to attr_value
462         FOR normalizer IN
463             SELECT  n.func AS func,
464                     n.param_count AS param_count,
465                     m.params AS params
466               FROM  config.index_normalizer n
467                     JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
468               WHERE attr = attr_def.name
469               ORDER BY m.pos LOOP
470                 EXECUTE 'SELECT ' || normalizer.func || '(' ||
471                     quote_nullable( attr_value ) ||
472                     CASE
473                         WHEN normalizer.param_count > 0
474                             THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
475                             ELSE ''
476                         END ||
477                     ')' INTO attr_value;
478
479         END LOOP;
480
481         -- Add the new value to the hstore
482         new_attrs := new_attrs || hstore( attr_def.name, attr_value );
483
484     END LOOP;
485
486     RETURN new_attrs;
487 END;
488 $_$ LANGUAGE PLPGSQL;
489
490 CREATE OR REPLACE FUNCTION vandelay.extract_rec_attrs ( xml TEXT ) RETURNS hstore AS $_$
491     SELECT vandelay.extract_rec_attrs( $1, (SELECT ARRAY_ACCUM(name) FROM config.record_attr_definition));
492 $_$ LANGUAGE SQL;
493
494 -- Everything between this comment and the beginning of the definition of
495 -- vandelay.match_bib_record() is strictly in service of that function.
496 CREATE TYPE vandelay.match_set_test_result AS (record BIGINT, quality INTEGER);
497
498 CREATE OR REPLACE FUNCTION vandelay.match_set_test_marcxml(
499     match_set_id INTEGER, record_xml TEXT, bucket_id INTEGER 
500 ) RETURNS SETOF vandelay.match_set_test_result AS $$
501 DECLARE
502     tags_rstore HSTORE;
503     svf_rstore  HSTORE;
504     coal        TEXT;
505     joins       TEXT;
506     query_      TEXT;
507     wq          TEXT;
508     qvalue      INTEGER;
509     rec         RECORD;
510 BEGIN
511     tags_rstore := vandelay.flatten_marc_hstore(record_xml);
512     svf_rstore := vandelay.extract_rec_attrs(record_xml);
513
514     CREATE TEMPORARY TABLE _vandelay_tmp_qrows (q INTEGER);
515     CREATE TEMPORARY TABLE _vandelay_tmp_jrows (j TEXT);
516
517     -- generate the where clause and return that directly (into wq), and as
518     -- a side-effect, populate the _vandelay_tmp_[qj]rows tables.
519     wq := vandelay.get_expr_from_match_set(match_set_id, tags_rstore);
520
521     query_ := 'SELECT DISTINCT(record), ';
522
523     -- qrows table is for the quality bits we add to the SELECT clause
524     SELECT ARRAY_TO_STRING(
525         ARRAY_ACCUM('COALESCE(n' || q::TEXT || '.quality, 0)'), ' + '
526     ) INTO coal FROM _vandelay_tmp_qrows;
527
528     -- our query string so far is the SELECT clause and the inital FROM.
529     -- no JOINs yet nor the WHERE clause
530     query_ := query_ || coal || ' AS quality ' || E'\n';
531
532     -- jrows table is for the joins we must make (and the real text conditions)
533     SELECT ARRAY_TO_STRING(ARRAY_ACCUM(j), E'\n') INTO joins
534         FROM _vandelay_tmp_jrows;
535
536     -- add those joins and the where clause to our query.
537     query_ := query_ || joins || E'\n';
538
539     -- join the record bucket
540     IF bucket_id IS NOT NULL THEN
541         query_ := query_ || 'JOIN container.biblio_record_entry_bucket_item ' ||
542             'brebi ON (brebi.target_biblio_record_entry = record ' ||
543             'AND brebi.bucket = ' || bucket_id || E')\n';
544     END IF;
545
546     query_ := query_ || 'JOIN biblio.record_entry bre ON (bre.id = record) ' || 'WHERE ' || wq || ' AND not bre.deleted';
547
548     -- this will return rows of record,quality
549     FOR rec IN EXECUTE query_ USING tags_rstore, svf_rstore LOOP
550         RETURN NEXT rec;
551     END LOOP;
552
553     DROP TABLE _vandelay_tmp_qrows;
554     DROP TABLE _vandelay_tmp_jrows;
555     RETURN;
556 END;
557 $$ LANGUAGE PLPGSQL;
558
559
560 CREATE OR REPLACE FUNCTION vandelay.flatten_marc_hstore(
561     record_xml TEXT
562 ) RETURNS HSTORE AS $func$
563 BEGIN
564     RETURN (SELECT
565         HSTORE(
566             ARRAY_ACCUM(tag || (COALESCE(subfield, ''))),
567             ARRAY_ACCUM(value)
568         )
569         FROM (
570             SELECT  tag, subfield, ARRAY_ACCUM(value)::TEXT AS value
571               FROM  (SELECT tag,
572                             subfield,
573                             CASE WHEN tag = '020' THEN -- caseless -- isbn
574                                 LOWER((REGEXP_MATCHES(value,$$^(\S{10,17})$$))[1] || '%')
575                             WHEN tag = '022' THEN -- caseless -- issn
576                                 LOWER((REGEXP_MATCHES(value,$$^(\S{4}[- ]?\S{4})$$))[1] || '%')
577                             WHEN tag = '024' THEN -- caseless -- upc (other)
578                                 LOWER(value || '%')
579                             ELSE
580                                 value
581                             END AS value
582                       FROM  vandelay.flatten_marc(record_xml)) x
583                 GROUP BY tag, subfield ORDER BY tag, subfield
584         ) subquery
585     );
586 END;
587 $func$ LANGUAGE PLPGSQL;
588
589 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set(
590     match_set_id INTEGER,
591     tags_rstore HSTORE
592 ) RETURNS TEXT AS $$
593 DECLARE
594     root    vandelay.match_set_point;
595 BEGIN
596     SELECT * INTO root FROM vandelay.match_set_point
597         WHERE parent IS NULL AND match_set = match_set_id;
598
599     RETURN vandelay.get_expr_from_match_set_point(root, tags_rstore);
600 END;
601 $$  LANGUAGE PLPGSQL;
602
603 CREATE OR REPLACE FUNCTION vandelay.get_expr_from_match_set_point(
604     node vandelay.match_set_point,
605     tags_rstore HSTORE
606 ) RETURNS TEXT AS $$
607 DECLARE
608     q           TEXT;
609     i           INTEGER;
610     this_op     TEXT;
611     children    INTEGER[];
612     child       vandelay.match_set_point;
613 BEGIN
614     SELECT ARRAY_ACCUM(id) INTO children FROM vandelay.match_set_point
615         WHERE parent = node.id;
616
617     IF ARRAY_LENGTH(children, 1) > 0 THEN
618         this_op := vandelay._get_expr_render_one(node);
619         q := '(';
620         i := 1;
621         WHILE children[i] IS NOT NULL LOOP
622             SELECT * INTO child FROM vandelay.match_set_point
623                 WHERE id = children[i];
624             IF i > 1 THEN
625                 q := q || ' ' || this_op || ' ';
626             END IF;
627             i := i + 1;
628             q := q || vandelay.get_expr_from_match_set_point(child, tags_rstore);
629         END LOOP;
630         q := q || ')';
631         RETURN q;
632     ELSIF node.bool_op IS NULL THEN
633         PERFORM vandelay._get_expr_push_qrow(node);
634         PERFORM vandelay._get_expr_push_jrow(node, tags_rstore);
635         RETURN vandelay._get_expr_render_one(node);
636     ELSE
637         RETURN '';
638     END IF;
639 END;
640 $$  LANGUAGE PLPGSQL;
641
642 CREATE OR REPLACE FUNCTION vandelay._get_expr_push_qrow(
643     node vandelay.match_set_point
644 ) RETURNS VOID AS $$
645 DECLARE
646 BEGIN
647     INSERT INTO _vandelay_tmp_qrows (q) VALUES (node.id);
648 END;
649 $$ LANGUAGE PLPGSQL;
650
651 CREATE OR REPLACE FUNCTION vandelay._get_expr_push_jrow(
652     node vandelay.match_set_point,
653     tags_rstore HSTORE
654 ) RETURNS VOID AS $$
655 DECLARE
656     jrow        TEXT;
657     my_alias    TEXT;
658     op          TEXT;
659     tagkey      TEXT;
660     caseless    BOOL;
661     jrow_count  INT;
662     my_using    TEXT;
663     my_join     TEXT;
664 BEGIN
665     -- remember $1 is tags_rstore, and $2 is svf_rstore
666
667     caseless := FALSE;
668     SELECT COUNT(*) INTO jrow_count FROM _vandelay_tmp_jrows;
669     IF jrow_count > 0 THEN
670         my_using := ' USING (record)';
671         my_join := 'FULL OUTER JOIN';
672     ELSE
673         my_using := '';
674         my_join := 'FROM';
675     END IF;
676
677     IF node.tag IS NOT NULL THEN
678         caseless := (node.tag IN ('020', '022', '024'));
679         tagkey := node.tag;
680         IF node.subfield IS NOT NULL THEN
681             tagkey := tagkey || node.subfield;
682         END IF;
683     END IF;
684
685     IF node.negate THEN
686         IF caseless THEN
687             op := 'NOT LIKE';
688         ELSE
689             op := '<>';
690         END IF;
691     ELSE
692         IF caseless THEN
693             op := 'LIKE';
694         ELSE
695             op := '=';
696         END IF;
697     END IF;
698
699     my_alias := 'n' || node.id::TEXT;
700
701     jrow := my_join || ' (SELECT *, ';
702     IF node.tag IS NOT NULL THEN
703         jrow := jrow  || node.quality ||
704             ' AS quality FROM metabib.full_rec mfr WHERE mfr.tag = ''' ||
705             node.tag || '''';
706         IF node.subfield IS NOT NULL THEN
707             jrow := jrow || ' AND mfr.subfield = ''' ||
708                 node.subfield || '''';
709         END IF;
710         jrow := jrow || ' AND (';
711         jrow := jrow || vandelay._node_tag_comparisons(caseless, op, tags_rstore, tagkey);
712         jrow := jrow || ')) ' || my_alias || my_using || E'\n';
713     ELSE    -- svf
714         jrow := jrow || 'id AS record, ' || node.quality ||
715             ' AS quality FROM metabib.record_attr mra WHERE mra.attrs->''' ||
716             node.svf || ''' ' || op || ' $2->''' || node.svf || ''') ' ||
717             my_alias || my_using || E'\n';
718     END IF;
719     INSERT INTO _vandelay_tmp_jrows (j) VALUES (jrow);
720 END;
721 $$ LANGUAGE PLPGSQL;
722
723 CREATE OR REPLACE FUNCTION vandelay._node_tag_comparisons(
724     caseless BOOLEAN,
725     op TEXT,
726     tags_rstore HSTORE,
727     tagkey TEXT
728 ) RETURNS TEXT AS $$
729 DECLARE
730     result  TEXT;
731     i       INT;
732     vals    TEXT[];
733 BEGIN
734     i := 1;
735     vals := tags_rstore->tagkey;
736     result := '';
737
738     WHILE TRUE LOOP
739         IF i > 1 THEN
740             IF vals[i] IS NULL THEN
741                 EXIT;
742             ELSE
743                 result := result || ' OR ';
744             END IF;
745         END IF;
746
747         IF caseless THEN
748             result := result || 'LOWER(mfr.value) ' || op;
749         ELSE
750             result := result || 'mfr.value ' || op;
751         END IF;
752
753         result := result || ' ' || COALESCE('''' || vals[i] || '''', 'NULL');
754
755         IF vals[i] IS NULL THEN
756             EXIT;
757         END IF;
758         i := i + 1;
759     END LOOP;
760
761     RETURN result;
762
763 END;
764 $$ LANGUAGE PLPGSQL;
765
766 CREATE OR REPLACE FUNCTION vandelay._get_expr_render_one(
767     node vandelay.match_set_point
768 ) RETURNS TEXT AS $$
769 DECLARE
770     s           TEXT;
771 BEGIN
772     IF node.bool_op IS NOT NULL THEN
773         RETURN node.bool_op;
774     ELSE
775         RETURN '(n' || node.id::TEXT || '.id IS NOT NULL)';
776     END IF;
777 END;
778 $$ LANGUAGE PLPGSQL;
779
780 CREATE OR REPLACE FUNCTION vandelay.match_bib_record() RETURNS TRIGGER AS $func$
781 DECLARE
782     incoming_existing_id    TEXT;
783     test_result             vandelay.match_set_test_result%ROWTYPE;
784     tmp_rec                 BIGINT;
785     match_set               INT;
786     match_bucket            INT;
787 BEGIN
788     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
789         RETURN NEW;
790     END IF;
791
792     DELETE FROM vandelay.bib_match WHERE queued_record = NEW.id;
793
794     SELECT q.match_set INTO match_set FROM vandelay.bib_queue q WHERE q.id = NEW.queue;
795
796     IF match_set IS NOT NULL THEN
797         NEW.quality := vandelay.measure_record_quality( NEW.marc, match_set );
798     END IF;
799
800     -- Perfect matches on 901$c exit early with a match with high quality.
801     incoming_existing_id :=
802         oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]', NEW.marc);
803
804     IF incoming_existing_id IS NOT NULL AND incoming_existing_id != '' THEN
805         SELECT id INTO tmp_rec FROM biblio.record_entry WHERE id = incoming_existing_id::bigint;
806         IF tmp_rec IS NOT NULL THEN
807             INSERT INTO vandelay.bib_match (queued_record, eg_record, match_score, quality) 
808                 SELECT
809                     NEW.id, 
810                     b.id,
811                     9999,
812                     -- note: no match_set means quality==0
813                     vandelay.measure_record_quality( b.marc, match_set )
814                 FROM biblio.record_entry b
815                 WHERE id = incoming_existing_id::bigint;
816         END IF;
817     END IF;
818
819     IF match_set IS NULL THEN
820         RETURN NEW;
821     END IF;
822
823     SELECT q.match_bucket INTO match_bucket FROM vandelay.bib_queue q WHERE q.id = NEW.queue;
824
825     FOR test_result IN SELECT * FROM
826         vandelay.match_set_test_marcxml(match_set, NEW.marc, match_bucket) LOOP
827
828         INSERT INTO vandelay.bib_match ( queued_record, eg_record, match_score, quality )
829             SELECT  
830                 NEW.id,
831                 test_result.record,
832                 test_result.quality,
833                 vandelay.measure_record_quality( b.marc, match_set )
834                 FROM  biblio.record_entry b
835                 WHERE id = test_result.record;
836
837     END LOOP;
838
839     RETURN NEW;
840 END;
841 $func$ LANGUAGE PLPGSQL;
842
843 CREATE OR REPLACE FUNCTION vandelay.measure_record_quality ( xml TEXT, match_set_id INT ) RETURNS INT AS $_$
844 DECLARE
845     out_q   INT := 0;
846     rvalue  TEXT;
847     test    vandelay.match_set_quality%ROWTYPE;
848 BEGIN
849
850     FOR test IN SELECT * FROM vandelay.match_set_quality WHERE match_set = match_set_id LOOP
851         IF test.tag IS NOT NULL THEN
852             FOR rvalue IN SELECT value FROM vandelay.flatten_marc( xml ) WHERE tag = test.tag AND subfield = test.subfield LOOP
853                 IF test.value = rvalue THEN
854                     out_q := out_q + test.quality;
855                 END IF;
856             END LOOP;
857         ELSE
858             IF test.value = vandelay.extract_rec_attrs(xml, ARRAY[test.svf]) -> test.svf THEN
859                 out_q := out_q + test.quality;
860             END IF;
861         END IF;
862     END LOOP;
863
864     RETURN out_q;
865 END;
866 $_$ LANGUAGE PLPGSQL;
867
868 CREATE TYPE vandelay.tcn_data AS (tcn TEXT, tcn_source TEXT, used BOOL);
869 CREATE OR REPLACE FUNCTION vandelay.find_bib_tcn_data ( xml TEXT ) RETURNS SETOF vandelay.tcn_data AS $_$
870 DECLARE
871     eg_tcn          TEXT;
872     eg_tcn_source   TEXT;
873     output          vandelay.tcn_data%ROWTYPE;
874 BEGIN
875
876     -- 001/003
877     eg_tcn := BTRIM((oils_xpath('//*[@tag="001"]/text()',xml))[1]);
878     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
879
880         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="003"]/text()',xml))[1]);
881         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
882             eg_tcn_source := 'System Local';
883         END IF;
884
885         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
886
887         IF NOT FOUND THEN
888             output.used := FALSE;
889         ELSE
890             output.used := TRUE;
891         END IF;
892
893         output.tcn := eg_tcn;
894         output.tcn_source := eg_tcn_source;
895         RETURN NEXT output;
896
897     END IF;
898
899     -- 901 ab
900     eg_tcn := BTRIM((oils_xpath('//*[@tag="901"]/*[@code="a"]/text()',xml))[1]);
901     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
902
903         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="901"]/*[@code="b"]/text()',xml))[1]);
904         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
905             eg_tcn_source := 'System Local';
906         END IF;
907
908         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
909
910         IF NOT FOUND THEN
911             output.used := FALSE;
912         ELSE
913             output.used := TRUE;
914         END IF;
915
916         output.tcn := eg_tcn;
917         output.tcn_source := eg_tcn_source;
918         RETURN NEXT output;
919
920     END IF;
921
922     -- 039 ab
923     eg_tcn := BTRIM((oils_xpath('//*[@tag="039"]/*[@code="a"]/text()',xml))[1]);
924     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
925
926         eg_tcn_source := BTRIM((oils_xpath('//*[@tag="039"]/*[@code="b"]/text()',xml))[1]);
927         IF eg_tcn_source IS NULL OR eg_tcn_source = '' THEN
928             eg_tcn_source := 'System Local';
929         END IF;
930
931         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
932
933         IF NOT FOUND THEN
934             output.used := FALSE;
935         ELSE
936             output.used := TRUE;
937         END IF;
938
939         output.tcn := eg_tcn;
940         output.tcn_source := eg_tcn_source;
941         RETURN NEXT output;
942
943     END IF;
944
945     -- 020 a
946     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="020"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
947     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
948
949         eg_tcn_source := 'ISBN';
950
951         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
952
953         IF NOT FOUND THEN
954             output.used := FALSE;
955         ELSE
956             output.used := TRUE;
957         END IF;
958
959         output.tcn := eg_tcn;
960         output.tcn_source := eg_tcn_source;
961         RETURN NEXT output;
962
963     END IF;
964
965     -- 022 a
966     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="022"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
967     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
968
969         eg_tcn_source := 'ISSN';
970
971         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
972
973         IF NOT FOUND THEN
974             output.used := FALSE;
975         ELSE
976             output.used := TRUE;
977         END IF;
978
979         output.tcn := eg_tcn;
980         output.tcn_source := eg_tcn_source;
981         RETURN NEXT output;
982
983     END IF;
984
985     -- 010 a
986     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="010"]/*[@code="a"]/text()',xml))[1], $re$^(\w+).*?$$re$, $re$\1$re$);
987     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
988
989         eg_tcn_source := 'LCCN';
990
991         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
992
993         IF NOT FOUND THEN
994             output.used := FALSE;
995         ELSE
996             output.used := TRUE;
997         END IF;
998
999         output.tcn := eg_tcn;
1000         output.tcn_source := eg_tcn_source;
1001         RETURN NEXT output;
1002
1003     END IF;
1004
1005     -- 035 a
1006     eg_tcn := REGEXP_REPLACE((oils_xpath('//*[@tag="035"]/*[@code="a"]/text()',xml))[1], $re$^.*?(\w+)$$re$, $re$\1$re$);
1007     IF eg_tcn IS NOT NULL AND eg_tcn <> '' THEN
1008
1009         eg_tcn_source := 'System Legacy';
1010
1011         PERFORM id FROM biblio.record_entry WHERE tcn_value = eg_tcn  AND NOT deleted;
1012
1013         IF NOT FOUND THEN
1014             output.used := FALSE;
1015         ELSE
1016             output.used := TRUE;
1017         END IF;
1018
1019         output.tcn := eg_tcn;
1020         output.tcn_source := eg_tcn_source;
1021         RETURN NEXT output;
1022
1023     END IF;
1024
1025     RETURN;
1026 END;
1027 $_$ LANGUAGE PLPGSQL;
1028
1029 CREATE OR REPLACE FUNCTION vandelay.add_field ( target_xml TEXT, source_xml TEXT, field TEXT, force_add INT ) RETURNS TEXT AS $_$
1030
1031     use MARC::Record;
1032     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1033     use MARC::Charset;
1034     use strict;
1035
1036     MARC::Charset->assume_unicode(1);
1037
1038     my $target_xml = shift;
1039     my $source_xml = shift;
1040     my $field_spec = shift;
1041     my $force_add = shift || 0;
1042
1043     my $target_r = MARC::Record->new_from_xml( $target_xml );
1044     my $source_r = MARC::Record->new_from_xml( $source_xml );
1045
1046     return $target_xml unless ($target_r && $source_r);
1047
1048     my @field_list = split(',', $field_spec);
1049
1050     my %fields;
1051     for my $f (@field_list) {
1052         $f =~ s/^\s*//; $f =~ s/\s*$//;
1053         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1054             my $field = $1;
1055             $field =~ s/\s+//;
1056             my $sf = $2;
1057             $sf =~ s/\s+//;
1058             my $match = $3;
1059             $match =~ s/^\s*//; $match =~ s/\s*$//;
1060             $fields{$field} = { sf => [ split('', $sf) ] };
1061             if ($match) {
1062                 my ($msf,$mre) = split('~', $match);
1063                 if (length($msf) > 0 and length($mre) > 0) {
1064                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1065                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1066                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1067                 }
1068             }
1069         }
1070     }
1071
1072     for my $f ( keys %fields) {
1073         if ( @{$fields{$f}{sf}} ) {
1074             for my $from_field ($source_r->field( $f )) {
1075                 my @tos = $target_r->field( $f );
1076                 if (!@tos) {
1077                     next if (exists($fields{$f}{match}) and !$force_add);
1078                     my @new_fields = map { $_->clone } $source_r->field( $f );
1079                     $target_r->insert_fields_ordered( @new_fields );
1080                 } else {
1081                     for my $to_field (@tos) {
1082                         if (exists($fields{$f}{match})) {
1083                             next unless (grep { $_ =~ $fields{$f}{match}{re} } $to_field->subfield($fields{$f}{match}{sf}));
1084                         }
1085                         my @new_sf = map { ($_ => $from_field->subfield($_)) } grep { defined($from_field->subfield($_)) } @{$fields{$f}{sf}};
1086                         $to_field->add_subfields( @new_sf );
1087                     }
1088                 }
1089             }
1090         } else {
1091             my @new_fields = map { $_->clone } $source_r->field( $f );
1092             $target_r->insert_fields_ordered( @new_fields );
1093         }
1094     }
1095
1096     $target_xml = $target_r->as_xml_record;
1097     $target_xml =~ s/^<\?.+?\?>$//mo;
1098     $target_xml =~ s/\n//sgo;
1099     $target_xml =~ s/>\s+</></sgo;
1100
1101     return $target_xml;
1102
1103 $_$ LANGUAGE PLPERLU;
1104
1105 CREATE OR REPLACE FUNCTION vandelay.add_field ( target_xml TEXT, source_xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1106     SELECT vandelay.add_field( $1, $2, $3, 0 );
1107 $_$ LANGUAGE SQL;
1108
1109 CREATE OR REPLACE FUNCTION vandelay.strip_field ( xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1110
1111     use MARC::Record;
1112     use MARC::File::XML (BinaryEncoding => 'UTF-8');
1113     use MARC::Charset;
1114     use strict;
1115
1116     MARC::Charset->assume_unicode(1);
1117
1118     my $xml = shift;
1119     my $r = MARC::Record->new_from_xml( $xml );
1120
1121     return $xml unless ($r);
1122
1123     my $field_spec = shift;
1124     my @field_list = split(',', $field_spec);
1125
1126     my %fields;
1127     for my $f (@field_list) {
1128         $f =~ s/^\s*//; $f =~ s/\s*$//;
1129         if ($f =~ /^(.{3})(\w*)(?:\[([^]]*)\])?$/) {
1130             my $field = $1;
1131             $field =~ s/\s+//;
1132             my $sf = $2;
1133             $sf =~ s/\s+//;
1134             my $match = $3;
1135             $match =~ s/^\s*//; $match =~ s/\s*$//;
1136             $fields{$field} = { sf => [ split('', $sf) ] };
1137             if ($match) {
1138                 my ($msf,$mre) = split('~', $match);
1139                 if (length($msf) > 0 and length($mre) > 0) {
1140                     $msf =~ s/^\s*//; $msf =~ s/\s*$//;
1141                     $mre =~ s/^\s*//; $mre =~ s/\s*$//;
1142                     $fields{$field}{match} = { sf => $msf, re => qr/$mre/ };
1143                 }
1144             }
1145         }
1146     }
1147
1148     for my $f ( keys %fields) {
1149         for my $to_field ($r->field( $f )) {
1150             if (exists($fields{$f}{match})) {
1151                 next unless (grep { $_ =~ $fields{$f}{match}{re} } $to_field->subfield($fields{$f}{match}{sf}));
1152             }
1153
1154             if ( @{$fields{$f}{sf}} ) {
1155                 $to_field->delete_subfield(code => $fields{$f}{sf});
1156             } else {
1157                 $r->delete_field( $to_field );
1158             }
1159         }
1160     }
1161
1162     $xml = $r->as_xml_record;
1163     $xml =~ s/^<\?.+?\?>$//mo;
1164     $xml =~ s/\n//sgo;
1165     $xml =~ s/>\s+</></sgo;
1166
1167     return $xml;
1168
1169 $_$ LANGUAGE PLPERLU;
1170
1171 CREATE OR REPLACE FUNCTION vandelay.replace_field ( target_xml TEXT, source_xml TEXT, field TEXT ) RETURNS TEXT AS $_$
1172 DECLARE
1173     xml_output TEXT;
1174     parsed_target TEXT;
1175     curr_field TEXT;
1176 BEGIN
1177
1178     parsed_target := vandelay.strip_field( target_xml, ''); -- this dance normalizes the format of the xml for the IF below
1179     xml_output := parsed_target; -- if there are no replace rules, just return the input
1180
1181     FOR curr_field IN SELECT UNNEST( STRING_TO_ARRAY(field, ',') ) LOOP -- naive split, but it's the same we use in the perl
1182
1183         xml_output := vandelay.strip_field( parsed_target, curr_field);
1184
1185         IF xml_output <> parsed_target  AND curr_field ~ E'~' THEN
1186             -- we removed something, and there was a regexp restriction in the curr_field definition, so proceed
1187             xml_output := vandelay.add_field( xml_output, source_xml, curr_field, 1 );
1188         ELSIF curr_field !~ E'~' THEN
1189             -- No regexp restriction, add the curr_field
1190             xml_output := vandelay.add_field( xml_output, source_xml, curr_field, 0 );
1191         END IF;
1192
1193         parsed_target := xml_output; -- in prep for any following loop iterations
1194
1195     END LOOP;
1196
1197     RETURN xml_output;
1198 END;
1199 $_$ LANGUAGE PLPGSQL;
1200
1201 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml ( target_xml TEXT, source_xml TEXT, add_rule TEXT, replace_preserve_rule TEXT, strip_rule TEXT ) RETURNS TEXT AS $_$
1202     SELECT vandelay.replace_field( vandelay.add_field( vandelay.strip_field( $1, $5) , $2, $3 ), $2, $4);
1203 $_$ LANGUAGE SQL;
1204
1205 CREATE TYPE vandelay.compile_profile AS (add_rule TEXT, replace_rule TEXT, preserve_rule TEXT, strip_rule TEXT);
1206 CREATE OR REPLACE FUNCTION vandelay.compile_profile ( incoming_xml TEXT ) RETURNS vandelay.compile_profile AS $_$
1207 DECLARE
1208     output              vandelay.compile_profile%ROWTYPE;
1209     profile             vandelay.merge_profile%ROWTYPE;
1210     profile_tmpl        TEXT;
1211     profile_tmpl_owner  TEXT;
1212     add_rule            TEXT := '';
1213     strip_rule          TEXT := '';
1214     replace_rule        TEXT := '';
1215     preserve_rule       TEXT := '';
1216
1217 BEGIN
1218
1219     profile_tmpl := (oils_xpath('//*[@tag="905"]/*[@code="t"]/text()',incoming_xml))[1];
1220     profile_tmpl_owner := (oils_xpath('//*[@tag="905"]/*[@code="o"]/text()',incoming_xml))[1];
1221
1222     IF profile_tmpl IS NOT NULL AND profile_tmpl <> '' AND profile_tmpl_owner IS NOT NULL AND profile_tmpl_owner <> '' THEN
1223         SELECT  p.* INTO profile
1224           FROM  vandelay.merge_profile p
1225                 JOIN actor.org_unit u ON (u.id = p.owner)
1226           WHERE p.name = profile_tmpl
1227                 AND u.shortname = profile_tmpl_owner;
1228
1229         IF profile.id IS NOT NULL THEN
1230             add_rule := COALESCE(profile.add_spec,'');
1231             strip_rule := COALESCE(profile.strip_spec,'');
1232             replace_rule := COALESCE(profile.replace_spec,'');
1233             preserve_rule := COALESCE(profile.preserve_spec,'');
1234         END IF;
1235     END IF;
1236
1237     add_rule := add_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="a"]/text()',incoming_xml),','),'');
1238     strip_rule := strip_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="d"]/text()',incoming_xml),','),'');
1239     replace_rule := replace_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="r"]/text()',incoming_xml),','),'');
1240     preserve_rule := preserve_rule || ',' || COALESCE(ARRAY_TO_STRING(oils_xpath('//*[@tag="905"]/*[@code="p"]/text()',incoming_xml),','),'');
1241
1242     output.add_rule := BTRIM(add_rule,',');
1243     output.replace_rule := BTRIM(replace_rule,',');
1244     output.strip_rule := BTRIM(strip_rule,',');
1245     output.preserve_rule := BTRIM(preserve_rule,',');
1246
1247     RETURN output;
1248 END;
1249 $_$ LANGUAGE PLPGSQL;
1250
1251 CREATE OR REPLACE FUNCTION vandelay.template_overlay_bib_record ( v_marc TEXT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1252 DECLARE
1253     merge_profile   vandelay.merge_profile%ROWTYPE;
1254     dyn_profile     vandelay.compile_profile%ROWTYPE;
1255     editor_string   TEXT;
1256     editor_id       INT;
1257     source_marc     TEXT;
1258     target_marc     TEXT;
1259     eg_marc         TEXT;
1260     replace_rule    TEXT;
1261     match_count     INT;
1262 BEGIN
1263
1264     SELECT  b.marc INTO eg_marc
1265       FROM  biblio.record_entry b
1266       WHERE b.id = eg_id
1267       LIMIT 1;
1268
1269     IF eg_marc IS NULL OR v_marc IS NULL THEN
1270         -- RAISE NOTICE 'no marc for template or bib record';
1271         RETURN FALSE;
1272     END IF;
1273
1274     dyn_profile := vandelay.compile_profile( v_marc );
1275
1276     IF merge_profile_id IS NOT NULL THEN
1277         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
1278         IF FOUND THEN
1279             dyn_profile.add_rule := BTRIM( dyn_profile.add_rule || ',' || COALESCE(merge_profile.add_spec,''), ',');
1280             dyn_profile.strip_rule := BTRIM( dyn_profile.strip_rule || ',' || COALESCE(merge_profile.strip_spec,''), ',');
1281             dyn_profile.replace_rule := BTRIM( dyn_profile.replace_rule || ',' || COALESCE(merge_profile.replace_spec,''), ',');
1282             dyn_profile.preserve_rule := BTRIM( dyn_profile.preserve_rule || ',' || COALESCE(merge_profile.preserve_spec,''), ',');
1283         END IF;
1284     END IF;
1285
1286     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1287         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1288         RETURN FALSE;
1289     END IF;
1290
1291     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1292         --Since we have nothing to do, just return a NOOP "we did it"
1293         RETURN TRUE;
1294     ELSIF dyn_profile.replace_rule <> '' THEN
1295         source_marc = v_marc;
1296         target_marc = eg_marc;
1297         replace_rule = dyn_profile.replace_rule;
1298     ELSE
1299         source_marc = eg_marc;
1300         target_marc = v_marc;
1301         replace_rule = dyn_profile.preserve_rule;
1302     END IF;
1303
1304     UPDATE  biblio.record_entry
1305       SET   marc = vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule )
1306       WHERE id = eg_id;
1307
1308     IF NOT FOUND THEN
1309         -- RAISE NOTICE 'update of biblio.record_entry failed';
1310         RETURN FALSE;
1311     END IF;
1312
1313     RETURN TRUE;
1314
1315 END;
1316 $$ LANGUAGE PLPGSQL;
1317
1318 CREATE OR REPLACE FUNCTION vandelay.merge_record_xml ( target_marc TEXT, template_marc TEXT ) RETURNS TEXT AS $$
1319 DECLARE
1320     dyn_profile     vandelay.compile_profile%ROWTYPE;
1321     replace_rule    TEXT;
1322     tmp_marc        TEXT;
1323     trgt_marc        TEXT;
1324     tmpl_marc        TEXT;
1325     match_count     INT;
1326 BEGIN
1327
1328     IF target_marc IS NULL OR template_marc IS NULL THEN
1329         -- RAISE NOTICE 'no marc for target or template record';
1330         RETURN NULL;
1331     END IF;
1332
1333     dyn_profile := vandelay.compile_profile( template_marc );
1334
1335     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1336         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1337         RETURN NULL;
1338     END IF;
1339
1340     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1341         --Since we have nothing to do, just return what we were given.
1342         RETURN target_marc;
1343     ELSIF dyn_profile.replace_rule <> '' THEN
1344         trgt_marc = target_marc;
1345         tmpl_marc = template_marc;
1346         replace_rule = dyn_profile.replace_rule;
1347     ELSE
1348         tmp_marc = target_marc;
1349         trgt_marc = template_marc;
1350         tmpl_marc = tmp_marc;
1351         replace_rule = dyn_profile.preserve_rule;
1352     END IF;
1353
1354     RETURN vandelay.merge_record_xml( trgt_marc, tmpl_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule );
1355
1356 END;
1357 $$ LANGUAGE PLPGSQL;
1358
1359 CREATE OR REPLACE FUNCTION vandelay.template_overlay_bib_record ( v_marc TEXT, eg_id BIGINT) RETURNS BOOL AS $$
1360     SELECT vandelay.template_overlay_bib_record( $1, $2, NULL);
1361 $$ LANGUAGE SQL;
1362
1363 CREATE OR REPLACE FUNCTION vandelay.overlay_bib_record ( import_id BIGINT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1364 DECLARE
1365     merge_profile   vandelay.merge_profile%ROWTYPE;
1366     dyn_profile     vandelay.compile_profile%ROWTYPE;
1367     editor_string   TEXT;
1368     editor_id       INT;
1369     source_marc     TEXT;
1370     target_marc     TEXT;
1371     eg_marc         TEXT;
1372     v_marc          TEXT;
1373     replace_rule    TEXT;
1374 BEGIN
1375
1376     SELECT  q.marc INTO v_marc
1377       FROM  vandelay.queued_record q
1378             JOIN vandelay.bib_match m ON (m.queued_record = q.id AND q.id = import_id)
1379       LIMIT 1;
1380
1381     IF v_marc IS NULL THEN
1382         -- RAISE NOTICE 'no marc for vandelay or bib record';
1383         RETURN FALSE;
1384     END IF;
1385
1386     IF vandelay.template_overlay_bib_record( v_marc, eg_id, merge_profile_id) THEN
1387         UPDATE  vandelay.queued_bib_record
1388           SET   imported_as = eg_id,
1389                 import_time = NOW()
1390           WHERE id = import_id;
1391
1392         editor_string := (oils_xpath('//*[@tag="905"]/*[@code="u"]/text()',v_marc))[1];
1393
1394         IF editor_string IS NOT NULL AND editor_string <> '' THEN
1395             SELECT usr INTO editor_id FROM actor.card WHERE barcode = editor_string;
1396
1397             IF editor_id IS NULL THEN
1398                 SELECT id INTO editor_id FROM actor.usr WHERE usrname = editor_string;
1399             END IF;
1400
1401             IF editor_id IS NOT NULL THEN
1402                 UPDATE biblio.record_entry SET editor = editor_id WHERE id = eg_id;
1403             END IF;
1404         END IF;
1405
1406         RETURN TRUE;
1407     END IF;
1408
1409     -- RAISE NOTICE 'update of biblio.record_entry failed';
1410
1411     RETURN FALSE;
1412
1413 END;
1414 $$ LANGUAGE PLPGSQL;
1415
1416 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record_with_best ( import_id BIGINT, merge_profile_id INT, lwm_ratio_value_p NUMERIC ) RETURNS BOOL AS $$
1417 DECLARE
1418     eg_id           BIGINT;
1419     lwm_ratio_value NUMERIC;
1420 BEGIN
1421
1422     lwm_ratio_value := COALESCE(lwm_ratio_value_p, 0.0);
1423
1424     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1425
1426     IF FOUND THEN
1427         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1428         RETURN FALSE;
1429     END IF;
1430
1431     SELECT  m.eg_record INTO eg_id
1432       FROM  vandelay.bib_match m
1433             JOIN vandelay.queued_bib_record qr ON (m.queued_record = qr.id)
1434             JOIN vandelay.bib_queue q ON (qr.queue = q.id)
1435             JOIN biblio.record_entry r ON (r.id = m.eg_record)
1436       WHERE m.queued_record = import_id
1437             AND qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC >= lwm_ratio_value
1438       ORDER BY  m.match_score DESC, -- required match score
1439                 qr.quality::NUMERIC / COALESCE(NULLIF(m.quality,0),1)::NUMERIC DESC, -- quality tie breaker
1440                 m.id -- when in doubt, use the first match
1441       LIMIT 1;
1442
1443     IF eg_id IS NULL THEN
1444         -- RAISE NOTICE 'incoming record is not of high enough quality';
1445         RETURN FALSE;
1446     END IF;
1447
1448     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1449 END;
1450 $$ LANGUAGE PLPGSQL;
1451
1452 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record_with_best ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1453     SELECT vandelay.auto_overlay_bib_record_with_best( $1, $2, p.lwm_ratio ) FROM vandelay.merge_profile p WHERE id = $2;
1454 $$ LANGUAGE SQL;
1455
1456 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_record ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1457 DECLARE
1458     eg_id           BIGINT;
1459     match_count     INT;
1460 BEGIN
1461
1462     PERFORM * FROM vandelay.queued_bib_record WHERE import_time IS NOT NULL AND id = import_id;
1463
1464     IF FOUND THEN
1465         -- RAISE NOTICE 'already imported, cannot auto-overlay'
1466         RETURN FALSE;
1467     END IF;
1468
1469     SELECT COUNT(*) INTO match_count FROM vandelay.bib_match WHERE queued_record = import_id;
1470
1471     IF match_count <> 1 THEN
1472         -- RAISE NOTICE 'not an exact match';
1473         RETURN FALSE;
1474     END IF;
1475
1476     -- Check that the one match is on the first 901c
1477     SELECT  m.eg_record INTO eg_id
1478       FROM  vandelay.queued_bib_record q
1479             JOIN vandelay.bib_match m ON (m.queued_record = q.id)
1480       WHERE q.id = import_id
1481             AND m.eg_record = oils_xpath_string('//*[@tag="901"]/*[@code="c"][1]',marc)::BIGINT;
1482
1483     IF NOT FOUND THEN
1484         -- RAISE NOTICE 'not a 901c match';
1485         RETURN FALSE;
1486     END IF;
1487
1488     RETURN vandelay.overlay_bib_record( import_id, eg_id, merge_profile_id );
1489 END;
1490 $$ LANGUAGE PLPGSQL;
1491
1492 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue ( queue_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1493 DECLARE
1494     queued_record   vandelay.queued_bib_record%ROWTYPE;
1495 BEGIN
1496
1497     FOR queued_record IN SELECT * FROM vandelay.queued_bib_record WHERE queue = queue_id AND import_time IS NULL LOOP
1498
1499         IF vandelay.auto_overlay_bib_record( queued_record.id, merge_profile_id ) THEN
1500             RETURN NEXT queued_record.id;
1501         END IF;
1502
1503     END LOOP;
1504
1505     RETURN;
1506     
1507 END;
1508 $$ LANGUAGE PLPGSQL;
1509
1510 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue_with_best ( queue_id BIGINT, merge_profile_id INT, lwm_ratio_value NUMERIC ) RETURNS SETOF BIGINT AS $$
1511 DECLARE
1512     queued_record   vandelay.queued_bib_record%ROWTYPE;
1513 BEGIN
1514
1515     FOR queued_record IN SELECT * FROM vandelay.queued_bib_record WHERE queue = queue_id AND import_time IS NULL LOOP
1516
1517         IF vandelay.auto_overlay_bib_record_with_best( queued_record.id, merge_profile_id, lwm_ratio_value ) THEN
1518             RETURN NEXT queued_record.id;
1519         END IF;
1520
1521     END LOOP;
1522
1523     RETURN;
1524     
1525 END;
1526 $$ LANGUAGE PLPGSQL;
1527
1528 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue_with_best ( import_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1529     SELECT vandelay.auto_overlay_bib_queue_with_best( $1, $2, p.lwm_ratio ) FROM vandelay.merge_profile p WHERE id = $2;
1530 $$ LANGUAGE SQL;
1531
1532 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_bib_queue ( queue_id BIGINT ) RETURNS SETOF BIGINT AS $$
1533     SELECT * FROM vandelay.auto_overlay_bib_queue( $1, NULL );
1534 $$ LANGUAGE SQL;
1535
1536 CREATE OR REPLACE FUNCTION vandelay.ingest_bib_marc ( ) RETURNS TRIGGER AS $$
1537 DECLARE
1538     value   TEXT;
1539     atype   TEXT;
1540     adef    RECORD;
1541 BEGIN
1542     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1543         RETURN NEW;
1544     END IF;
1545
1546     FOR adef IN SELECT * FROM vandelay.bib_attr_definition LOOP
1547
1548         SELECT extract_marc_field('vandelay.queued_bib_record', id, adef.xpath, adef.remove) INTO value FROM vandelay.queued_bib_record WHERE id = NEW.id;
1549         IF (value IS NOT NULL AND value <> '') THEN
1550             INSERT INTO vandelay.queued_bib_record_attr (record, field, attr_value) VALUES (NEW.id, adef.id, value);
1551         END IF;
1552
1553     END LOOP;
1554
1555     RETURN NULL;
1556 END;
1557 $$ LANGUAGE PLPGSQL;
1558
1559 CREATE OR REPLACE FUNCTION vandelay.cleanup_bib_marc ( ) RETURNS TRIGGER AS $$
1560 BEGIN
1561     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1562         RETURN NEW;
1563     END IF;
1564
1565     DELETE FROM vandelay.queued_bib_record_attr WHERE record = OLD.id;
1566     DELETE FROM vandelay.import_item WHERE record = OLD.id;
1567
1568     IF TG_OP = 'UPDATE' THEN
1569         RETURN NEW;
1570     END IF;
1571     RETURN OLD;
1572 END;
1573 $$ LANGUAGE PLPGSQL;
1574
1575 CREATE TRIGGER cleanup_bib_trigger
1576     BEFORE UPDATE OR DELETE ON vandelay.queued_bib_record
1577     FOR EACH ROW EXECUTE PROCEDURE vandelay.cleanup_bib_marc();
1578
1579 CREATE TRIGGER ingest_bib_trigger
1580     AFTER INSERT OR UPDATE ON vandelay.queued_bib_record
1581     FOR EACH ROW EXECUTE PROCEDURE vandelay.ingest_bib_marc();
1582
1583 CREATE TRIGGER zz_match_bibs_trigger
1584     BEFORE INSERT OR UPDATE ON vandelay.queued_bib_record
1585     FOR EACH ROW EXECUTE PROCEDURE vandelay.match_bib_record();
1586
1587
1588 /* Authority stuff down here */
1589 ---------------------------------------
1590 CREATE TABLE vandelay.authority_attr_definition (
1591         id                      SERIAL  PRIMARY KEY,
1592         code            TEXT    UNIQUE NOT NULL,
1593         description     TEXT,
1594         xpath           TEXT    NOT NULL,
1595         remove          TEXT    NOT NULL DEFAULT ''
1596 );
1597
1598 CREATE TYPE vandelay.authority_queue_queue_type AS ENUM ('authority');
1599 CREATE TABLE vandelay.authority_queue (
1600         queue_type      vandelay.authority_queue_queue_type NOT NULL DEFAULT 'authority',
1601         CONSTRAINT vand_authority_queue_name_once_per_owner_const UNIQUE (owner,name,queue_type)
1602 ) INHERITS (vandelay.queue);
1603 ALTER TABLE vandelay.authority_queue ADD PRIMARY KEY (id);
1604
1605 CREATE TABLE vandelay.queued_authority_record (
1606         queue           INT     NOT NULL REFERENCES vandelay.authority_queue (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
1607         imported_as     INT     REFERENCES authority.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
1608         import_error    TEXT    REFERENCES vandelay.import_error (code) ON DELETE SET NULL ON UPDATE CASCADE DEFERRABLE INITIALLY DEFERRED,
1609         error_detail    TEXT
1610 ) INHERITS (vandelay.queued_record);
1611 ALTER TABLE vandelay.queued_authority_record ADD PRIMARY KEY (id);
1612 CREATE INDEX queued_authority_record_queue_idx ON vandelay.queued_authority_record (queue);
1613
1614 CREATE TABLE vandelay.queued_authority_record_attr (
1615         id                      BIGSERIAL       PRIMARY KEY,
1616         record          BIGINT          NOT NULL REFERENCES vandelay.queued_authority_record (id) DEFERRABLE INITIALLY DEFERRED,
1617         field           INT                     NOT NULL REFERENCES vandelay.authority_attr_definition (id) DEFERRABLE INITIALLY DEFERRED,
1618         attr_value      TEXT            NOT NULL
1619 );
1620 CREATE INDEX queued_authority_record_attr_record_idx ON vandelay.queued_authority_record_attr (record);
1621
1622 CREATE TABLE vandelay.authority_match (
1623         id                              BIGSERIAL       PRIMARY KEY,
1624         queued_record   BIGINT          REFERENCES vandelay.queued_authority_record (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
1625         eg_record               BIGINT          REFERENCES authority.record_entry (id) DEFERRABLE INITIALLY DEFERRED,
1626     quality         INT         NOT NULL DEFAULT 0
1627 );
1628
1629 CREATE OR REPLACE FUNCTION vandelay.ingest_authority_marc ( ) RETURNS TRIGGER AS $$
1630 DECLARE
1631     value   TEXT;
1632     atype   TEXT;
1633     adef    RECORD;
1634 BEGIN
1635     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1636         RETURN NEW;
1637     END IF;
1638
1639     FOR adef IN SELECT * FROM vandelay.authority_attr_definition LOOP
1640
1641         SELECT extract_marc_field('vandelay.queued_authority_record', id, adef.xpath, adef.remove) INTO value FROM vandelay.queued_authority_record WHERE id = NEW.id;
1642         IF (value IS NOT NULL AND value <> '') THEN
1643             INSERT INTO vandelay.queued_authority_record_attr (record, field, attr_value) VALUES (NEW.id, adef.id, value);
1644         END IF;
1645
1646     END LOOP;
1647
1648     RETURN NULL;
1649 END;
1650 $$ LANGUAGE PLPGSQL;
1651
1652 CREATE OR REPLACE FUNCTION vandelay.cleanup_authority_marc ( ) RETURNS TRIGGER AS $$
1653 BEGIN
1654     IF TG_OP IN ('INSERT','UPDATE') AND NEW.imported_as IS NOT NULL THEN
1655         RETURN NEW;
1656     END IF;
1657
1658     DELETE FROM vandelay.queued_authority_record_attr WHERE record = OLD.id;
1659     IF TG_OP = 'UPDATE' THEN
1660         RETURN NEW;
1661     END IF;
1662     RETURN OLD;
1663 END;
1664 $$ LANGUAGE PLPGSQL;
1665
1666 CREATE TRIGGER cleanup_authority_trigger
1667     BEFORE UPDATE OR DELETE ON vandelay.queued_authority_record
1668     FOR EACH ROW EXECUTE PROCEDURE vandelay.cleanup_authority_marc();
1669
1670 CREATE TRIGGER ingest_authority_trigger
1671     AFTER INSERT OR UPDATE ON vandelay.queued_authority_record
1672     FOR EACH ROW EXECUTE PROCEDURE vandelay.ingest_authority_marc();
1673
1674 CREATE OR REPLACE FUNCTION vandelay.overlay_authority_record ( import_id BIGINT, eg_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1675 DECLARE
1676     merge_profile   vandelay.merge_profile%ROWTYPE;
1677     dyn_profile     vandelay.compile_profile%ROWTYPE;
1678     source_marc     TEXT;
1679     target_marc     TEXT;
1680     eg_marc         TEXT;
1681     v_marc          TEXT;
1682     replace_rule    TEXT;
1683     match_count     INT;
1684 BEGIN
1685
1686     SELECT  b.marc INTO eg_marc
1687       FROM  authority.record_entry b
1688             JOIN vandelay.authority_match m ON (m.eg_record = b.id AND m.queued_record = import_id)
1689       LIMIT 1;
1690
1691     SELECT  q.marc INTO v_marc
1692       FROM  vandelay.queued_record q
1693             JOIN vandelay.authority_match m ON (m.queued_record = q.id AND q.id = import_id)
1694       LIMIT 1;
1695
1696     IF eg_marc IS NULL OR v_marc IS NULL THEN
1697         -- RAISE NOTICE 'no marc for vandelay or authority record';
1698         RETURN FALSE;
1699     END IF;
1700
1701     dyn_profile := vandelay.compile_profile( v_marc );
1702
1703     IF merge_profile_id IS NOT NULL THEN
1704         SELECT * INTO merge_profile FROM vandelay.merge_profile WHERE id = merge_profile_id;
1705         IF FOUND THEN
1706             dyn_profile.add_rule := BTRIM( dyn_profile.add_rule || ',' || COALESCE(merge_profile.add_spec,''), ',');
1707             dyn_profile.strip_rule := BTRIM( dyn_profile.strip_rule || ',' || COALESCE(merge_profile.strip_spec,''), ',');
1708             dyn_profile.replace_rule := BTRIM( dyn_profile.replace_rule || ',' || COALESCE(merge_profile.replace_spec,''), ',');
1709             dyn_profile.preserve_rule := BTRIM( dyn_profile.preserve_rule || ',' || COALESCE(merge_profile.preserve_spec,''), ',');
1710         END IF;
1711     END IF;
1712
1713     IF dyn_profile.replace_rule <> '' AND dyn_profile.preserve_rule <> '' THEN
1714         -- RAISE NOTICE 'both replace [%] and preserve [%] specified', dyn_profile.replace_rule, dyn_profile.preserve_rule;
1715         RETURN FALSE;
1716     END IF;
1717
1718     IF dyn_profile.replace_rule = '' AND dyn_profile.preserve_rule = '' AND dyn_profile.add_rule = '' AND dyn_profile.strip_rule = '' THEN
1719         --Since we have nothing to do, just return a NOOP "we did it"
1720         RETURN TRUE;
1721     ELSIF dyn_profile.replace_rule <> '' THEN
1722         source_marc = v_marc;
1723         target_marc = eg_marc;
1724         replace_rule = dyn_profile.replace_rule;
1725     ELSE
1726         source_marc = eg_marc;
1727         target_marc = v_marc;
1728         replace_rule = dyn_profile.preserve_rule;
1729     END IF;
1730
1731     UPDATE  authority.record_entry
1732       SET   marc = vandelay.merge_record_xml( target_marc, source_marc, dyn_profile.add_rule, replace_rule, dyn_profile.strip_rule )
1733       WHERE id = eg_id;
1734
1735     IF FOUND THEN
1736         UPDATE  vandelay.queued_authority_record
1737           SET   imported_as = eg_id,
1738                 import_time = NOW()
1739           WHERE id = import_id;
1740         RETURN TRUE;
1741     END IF;
1742
1743     -- RAISE NOTICE 'update of authority.record_entry failed';
1744
1745     RETURN FALSE;
1746
1747 END;
1748 $$ LANGUAGE PLPGSQL;
1749
1750 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_record ( import_id BIGINT, merge_profile_id INT ) RETURNS BOOL AS $$
1751 DECLARE
1752     eg_id           BIGINT;
1753     match_count     INT;
1754 BEGIN
1755     SELECT COUNT(*) INTO match_count FROM vandelay.authority_match WHERE queued_record = import_id;
1756
1757     IF match_count <> 1 THEN
1758         -- RAISE NOTICE 'not an exact match';
1759         RETURN FALSE;
1760     END IF;
1761
1762     SELECT  m.eg_record INTO eg_id
1763       FROM  vandelay.authority_match m
1764       WHERE m.queued_record = import_id
1765       LIMIT 1;
1766
1767     IF eg_id IS NULL THEN
1768         RETURN FALSE;
1769     END IF;
1770
1771     RETURN vandelay.overlay_authority_record( import_id, eg_id, merge_profile_id );
1772 END;
1773 $$ LANGUAGE PLPGSQL;
1774
1775 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_queue ( queue_id BIGINT, merge_profile_id INT ) RETURNS SETOF BIGINT AS $$
1776 DECLARE
1777     queued_record   vandelay.queued_authority_record%ROWTYPE;
1778 BEGIN
1779
1780     FOR queued_record IN SELECT * FROM vandelay.queued_authority_record WHERE queue = queue_id AND import_time IS NULL LOOP
1781
1782         IF vandelay.auto_overlay_authority_record( queued_record.id, merge_profile_id ) THEN
1783             RETURN NEXT queued_record.id;
1784         END IF;
1785
1786     END LOOP;
1787
1788     RETURN;
1789     
1790 END;
1791 $$ LANGUAGE PLPGSQL;
1792
1793 CREATE OR REPLACE FUNCTION vandelay.auto_overlay_authority_queue ( queue_id BIGINT ) RETURNS SETOF BIGINT AS $$
1794     SELECT * FROM vandelay.auto_overlay_authority_queue( $1, NULL );
1795 $$ LANGUAGE SQL;
1796
1797
1798 -- Vandelay (for importing and exporting records) 012.schema.vandelay.sql 
1799 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (1, 'title', oils_i18n_gettext(1, 'vqbrad', 'Title of work', 'description'),'//*[@tag="245"]/*[contains("abcmnopr",@code)]');
1800 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (2, 'author', oils_i18n_gettext(1, 'vqbrad', 'Author of work', 'description'),'//*[@tag="100" or @tag="110" or @tag="113"]/*[contains("ad",@code)]');
1801 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (3, 'language', oils_i18n_gettext(3, 'vqbrad', 'Language of work', 'description'),'//*[@tag="240"]/*[@code="l"][1]');
1802 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (4, 'pagination', oils_i18n_gettext(4, 'vqbrad', 'Pagination', 'description'),'//*[@tag="300"]/*[@code="a"][1]');
1803 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident, remove ) VALUES (5, 'isbn',oils_i18n_gettext(5, 'vqbrad', 'ISBN', 'description'),'//*[@tag="020"]/*[@code="a"]', TRUE, $r$(?:-|\s.+$)$r$);
1804 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident, remove ) VALUES (6, 'issn',oils_i18n_gettext(6, 'vqbrad', 'ISSN', 'description'),'//*[@tag="022"]/*[@code="a"]', TRUE, $r$(?:-|\s.+$)$r$);
1805 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (7, 'price',oils_i18n_gettext(7, 'vqbrad', 'Price', 'description'),'//*[@tag="020" or @tag="022"]/*[@code="c"][1]');
1806 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (8, 'rec_identifier',oils_i18n_gettext(8, 'vqbrad', 'Accession Number', 'description'),'//*[@tag="001"]', TRUE);
1807 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (9, 'eg_tcn',oils_i18n_gettext(9, 'vqbrad', 'TCN Value', 'description'),'//*[@tag="901"]/*[@code="a"]', TRUE);
1808 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (10, 'eg_tcn_source',oils_i18n_gettext(10, 'vqbrad', 'TCN Source', 'description'),'//*[@tag="901"]/*[@code="b"]', TRUE);
1809 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, ident ) VALUES (11, 'eg_identifier',oils_i18n_gettext(11, 'vqbrad', 'Internal ID', 'description'),'//*[@tag="901"]/*[@code="c"]', TRUE);
1810 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (12, 'publisher',oils_i18n_gettext(12, 'vqbrad', 'Publisher', 'description'),'//*[@tag="260"]/*[@code="b"][1]');
1811 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath, remove ) VALUES (13, 'pubdate',oils_i18n_gettext(13, 'vqbrad', 'Publication Date', 'description'),'//*[@tag="260"]/*[@code="c"][1]',$r$\D$r$);
1812 --INSERT INTO vandelay.bib_attr_definition ( id, code, description, xpath ) VALUES (14, 'edition',oils_i18n_gettext(14, 'vqbrad', 'Edition', 'description'),'//*[@tag="250"]/*[@code="a"][1]');
1813 --
1814 --INSERT INTO vandelay.import_item_attr_definition (
1815 --    owner, name, tag, owning_lib, circ_lib, location,
1816 --    call_number, circ_modifier, barcode, price, copy_number,
1817 --    circulate, ref, holdable, opac_visible, status
1818 --) VALUES (
1819 --    1,
1820 --    'Evergreen 852 export format',
1821 --    '852',
1822 --    '[@code = "b"][1]',
1823 --    '[@code = "b"][2]',
1824 --    'c',
1825 --    'j',
1826 --    'g',
1827 --    'p',
1828 --    'y',
1829 --    't',
1830 --    '[@code = "x" and text() = "circulating"]',
1831 --    '[@code = "x" and text() = "reference"]',
1832 --    '[@code = "x" and text() = "holdable"]',
1833 --    '[@code = "x" and text() = "visible"]',
1834 --    'z'
1835 --);
1836 --
1837 --INSERT INTO vandelay.import_item_attr_definition (
1838 --    owner,
1839 --    name,
1840 --    tag,
1841 --    owning_lib,
1842 --    location,
1843 --    call_number,
1844 --    circ_modifier,
1845 --    barcode,
1846 --    price,
1847 --    status
1848 --) VALUES (
1849 --    1,
1850 --    'Unicorn Import format -- 999',
1851 --    '999',
1852 --    'm',
1853 --    'l',
1854 --    'a',
1855 --    't',
1856 --    'i',
1857 --    'p',
1858 --    'k'
1859 --);
1860 --
1861 --INSERT INTO vandelay.authority_attr_definition ( code, description, xpath, ident ) VALUES ('rec_identifier','Identifier','//*[@tag="001"]', TRUE);
1862
1863 COMMIT;
1864