]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/sql/Pg/upgrade/0985.schema.speed_record_attr_ingest.sql
LP1893463: Follow-up to address de-duplication and adding release notes.
[Evergreen.git] / Open-ILS / src / sql / Pg / upgrade / 0985.schema.speed_record_attr_ingest.sql
1 BEGIN;
2
3 SELECT evergreen.upgrade_deps_block_check('0985', :eg_version);
4
5 CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
6 DECLARE
7     transformed_xml TEXT;
8     rmarc           TEXT := prmarc;
9     tmp_val         TEXT;
10     prev_xfrm       TEXT;
11     normalizer      RECORD;
12     xfrm            config.xml_transform%ROWTYPE;
13     attr_vector     INT[] := '{}'::INT[];
14     attr_vector_tmp INT[];
15     attr_list       TEXT[] := pattr_list;
16     attr_value      TEXT[];
17     norm_attr_value TEXT[];
18     tmp_xml         TEXT;
19     attr_def        config.record_attr_definition%ROWTYPE;
20     ccvm_row        config.coded_value_map%ROWTYPE;
21 BEGIN
22
23     IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
24         SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
25         WHERE (
26             tag IS NOT NULL OR
27             fixed_field IS NOT NULL OR
28             xpath IS NOT NULL OR
29             phys_char_sf IS NOT NULL OR
30             composite
31         ) AND (
32             filter OR sorter
33         );
34     END IF;
35
36     IF rmarc IS NULL THEN
37         SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
38     END IF;
39
40     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
41
42         attr_value := '{}'::TEXT[];
43         norm_attr_value := '{}'::TEXT[];
44         attr_vector_tmp := '{}'::INT[];
45
46         SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1; 
47
48         -- tag+sf attrs only support SVF
49         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
50             SELECT  ARRAY[ARRAY_TO_STRING(ARRAY_AGG(value), COALESCE(attr_def.joiner,' '))] INTO attr_value
51               FROM  (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
52               WHERE record = rid
53                     AND tag LIKE attr_def.tag
54                     AND CASE
55                         WHEN attr_def.sf_list IS NOT NULL 
56                             THEN POSITION(subfield IN attr_def.sf_list) > 0
57                         ELSE TRUE
58                     END
59               GROUP BY tag
60               ORDER BY tag
61               LIMIT 1;
62
63         ELSIF attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
64             attr_value := vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
65
66             IF NOT attr_def.multi THEN
67                 attr_value := ARRAY[attr_value[1]];
68             END IF;
69
70         ELSIF attr_def.xpath IS NOT NULL THEN -- and xpath expression
71
72             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
73         
74             -- See if we can skip the XSLT ... it's expensive
75             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
76                 -- Can't skip the transform
77                 IF xfrm.xslt <> '---' THEN
78                     transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
79                 ELSE
80                     transformed_xml := rmarc;
81                 END IF;
82     
83                 prev_xfrm := xfrm.name;
84             END IF;
85
86             IF xfrm.name IS NULL THEN
87                 -- just grab the marcxml (empty) transform
88                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
89                 prev_xfrm := xfrm.name;
90             END IF;
91
92             FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
93                 tmp_val := oils_xpath_string(
94                                 '//*',
95                                 tmp_xml,
96                                 COALESCE(attr_def.joiner,' '),
97                                 ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
98                             );
99                 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
100                     attr_value := attr_value || tmp_val;
101                     EXIT WHEN NOT attr_def.multi;
102                 END IF;
103             END LOOP;
104
105         ELSIF attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
106             SELECT  ARRAY_AGG(m.value) INTO attr_value
107               FROM  vandelay.marc21_physical_characteristics(rmarc) v
108                     LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
109               WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
110                     AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
111
112             IF NOT attr_def.multi THEN
113                 attr_value := ARRAY[attr_value[1]];
114             END IF;
115
116         END IF;
117
118                 -- apply index normalizers to attr_value
119         FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
120             FOR normalizer IN
121                 SELECT  n.func AS func,
122                         n.param_count AS param_count,
123                         m.params AS params
124                   FROM  config.index_normalizer n
125                         JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
126                   WHERE attr = attr_def.name
127                   ORDER BY m.pos LOOP
128                     EXECUTE 'SELECT ' || normalizer.func || '(' ||
129                     COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
130                         CASE
131                             WHEN normalizer.param_count > 0
132                                 THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
133                                 ELSE ''
134                             END ||
135                     ')' INTO tmp_val;
136
137             END LOOP;
138             IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
139                 -- note that a string that contains only blanks
140                 -- is a valid value for some attributes
141                 norm_attr_value := norm_attr_value || tmp_val;
142             END IF;
143         END LOOP;
144         
145         IF attr_def.filter THEN
146             -- Create unknown uncontrolled values and find the IDs of the values
147             IF ccvm_row.id IS NULL THEN
148                 FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
149                     IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
150                         BEGIN -- use subtransaction to isolate unique constraint violations
151                             INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
152                         EXCEPTION WHEN unique_violation THEN END;
153                     END IF;
154                 END LOOP;
155
156                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
157             ELSE
158                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
159             END IF;
160
161             -- Add the new value to the vector
162             attr_vector := attr_vector || attr_vector_tmp;
163         END IF;
164
165         IF attr_def.sorter THEN
166             DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
167             IF norm_attr_value[1] IS NOT NULL THEN
168                 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
169             END IF;
170         END IF;
171
172     END LOOP;
173
174 /* We may need to rewrite the vlist to contain
175    the intersection of new values for requested
176    attrs and old values for ignored attrs. To
177    do this, we take the old attr vlist and
178    subtract any values that are valid for the
179    requested attrs, and then add back the new
180    set of attr values. */
181
182     IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN 
183         SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
184         SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
185         attr_vector := attr_vector || attr_vector_tmp;
186     END IF;
187
188     -- On to composite attributes, now that the record attrs have been pulled.  Processed in name order, so later composite
189     -- attributes can depend on earlier ones.
190     PERFORM metabib.compile_composite_attr_cache_init();
191     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
192
193         FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
194
195             tmp_val := metabib.compile_composite_attr( ccvm_row.id );
196             CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
197
198             IF attr_def.filter THEN
199                 IF attr_vector @@ tmp_val::query_int THEN
200                     attr_vector = attr_vector + intset(ccvm_row.id);
201                     EXIT WHEN NOT attr_def.multi;
202                 END IF;
203             END IF;
204
205             IF attr_def.sorter THEN
206                 IF attr_vector @@ tmp_val THEN
207                     DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
208                     INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
209                 END IF;
210             END IF;
211
212         END LOOP;
213
214     END LOOP;
215
216     IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
217         IF rdeleted THEN -- initial insert OR revivication
218             DELETE FROM metabib.record_attr_vector_list WHERE source = rid;
219             INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector);
220         ELSE
221             UPDATE metabib.record_attr_vector_list SET vlist = attr_vector WHERE source = rid;
222         END IF;
223     END IF;
224
225 END;
226
227 $func$ LANGUAGE PLPGSQL;
228
229 CREATE INDEX config_coded_value_map_ctype_idx ON config.coded_value_map (ctype);
230
231 COMMIT;