f9e4e0964b10f46630b963f091ecff8d3f125ff7
[working/Evergreen.git] / Open-ILS / src / sql / Pg / upgrade / XXXX.schema.ingest_multiple_sources.sql
1 BEGIN;
2
3 SELECT evergreen.upgrade_deps_block_check('XXXX', :eg_version);
4
5 CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
6 DECLARE
7     transformed_xml TEXT;
8     rmarc           TEXT := prmarc;
9     tmp_val         TEXT;
10     prev_xfrm       TEXT;
11     normalizer      RECORD;
12     xfrm            config.xml_transform%ROWTYPE;
13     attr_vector     INT[] := '{}'::INT[];
14     attr_vector_tmp INT[];
15     attr_list       TEXT[] := pattr_list;
16     attr_value      TEXT[];
17     norm_attr_value TEXT[];
18     tmp_xml         TEXT;
19     tmp_array       TEXT[];
20     attr_def        config.record_attr_definition%ROWTYPE;
21     ccvm_row        config.coded_value_map%ROWTYPE;
22     jump_past       BOOL;
23 BEGIN
24
25     IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
26         SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
27         WHERE (
28             tag IS NOT NULL OR
29             fixed_field IS NOT NULL OR
30             xpath IS NOT NULL OR
31             phys_char_sf IS NOT NULL OR
32             composite
33         ) AND (
34             filter OR sorter
35         );
36     END IF;
37
38     IF rmarc IS NULL THEN
39         SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
40     END IF;
41
42     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
43
44         jump_past := FALSE; -- This gets set when we are non-multi and have found something
45         attr_value := '{}'::TEXT[];
46         norm_attr_value := '{}'::TEXT[];
47         attr_vector_tmp := '{}'::INT[];
48
49         SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1; 
50
51         IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
52             SELECT  ARRAY_AGG(value) INTO attr_value
53               FROM  (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
54               WHERE record = rid
55                     AND tag LIKE attr_def.tag
56                     AND CASE
57                         WHEN attr_def.sf_list IS NOT NULL 
58                             THEN POSITION(subfield IN attr_def.sf_list) > 0
59                         ELSE TRUE
60                     END
61               GROUP BY tag
62               ORDER BY tag;
63
64             IF NOT attr_def.multi THEN
65                 attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))];
66                 jump_past := TRUE;
67             END IF;
68         END IF;
69
70         IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
71             attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
72
73             IF NOT attr_def.multi THEN
74                 attr_value := ARRAY[attr_value[1]];
75                 jump_past := TRUE;
76             END IF;
77         END IF;
78
79         IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression
80
81             SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
82         
83             -- See if we can skip the XSLT ... it's expensive
84             IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
85                 -- Can't skip the transform
86                 IF xfrm.xslt <> '---' THEN
87                     transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
88                 ELSE
89                     transformed_xml := rmarc;
90                 END IF;
91     
92                 prev_xfrm := xfrm.name;
93             END IF;
94
95             IF xfrm.name IS NULL THEN
96                 -- just grab the marcxml (empty) transform
97                 SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
98                 prev_xfrm := xfrm.name;
99             END IF;
100
101             FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
102                 tmp_val := oils_xpath_string(
103                                 '//*',
104                                 tmp_xml,
105                                 COALESCE(attr_def.joiner,' '),
106                                 ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
107                             );
108                 IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
109                     attr_value := attr_value || tmp_val;
110                     EXIT WHEN NOT attr_def.multi;
111                 END IF;
112             END LOOP;
113         END IF;
114
115         IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
116             SELECT  ARRAY_AGG(m.value) INTO tmp_array
117               FROM  vandelay.marc21_physical_characteristics(rmarc) v
118                     LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
119               WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
120                     AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
121
122             attr_value := attr_value || tmp_array;
123
124             IF NOT attr_def.multi THEN
125                 attr_value := ARRAY[attr_value[1]];
126             END IF;
127
128         END IF;
129
130                 -- apply index normalizers to attr_value
131         FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
132             FOR normalizer IN
133                 SELECT  n.func AS func,
134                         n.param_count AS param_count,
135                         m.params AS params
136                   FROM  config.index_normalizer n
137                         JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
138                   WHERE attr = attr_def.name
139                   ORDER BY m.pos LOOP
140                     EXECUTE 'SELECT ' || normalizer.func || '(' ||
141                     COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
142                         CASE
143                             WHEN normalizer.param_count > 0
144                                 THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
145                                 ELSE ''
146                             END ||
147                     ')' INTO tmp_val;
148
149             END LOOP;
150             IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
151                 -- note that a string that contains only blanks
152                 -- is a valid value for some attributes
153                 norm_attr_value := norm_attr_value || tmp_val;
154             END IF;
155         END LOOP;
156         
157         IF attr_def.filter THEN
158             -- Create unknown uncontrolled values and find the IDs of the values
159             IF ccvm_row.id IS NULL THEN
160                 FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
161                     IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
162                         BEGIN -- use subtransaction to isolate unique constraint violations
163                             INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
164                         EXCEPTION WHEN unique_violation THEN END;
165                     END IF;
166                 END LOOP;
167
168                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
169             ELSE
170                 SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
171             END IF;
172
173             -- Add the new value to the vector
174             attr_vector := attr_vector || attr_vector_tmp;
175         END IF;
176
177         IF attr_def.sorter THEN
178             DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
179             IF norm_attr_value[1] IS NOT NULL THEN
180                 INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
181             END IF;
182         END IF;
183
184     END LOOP;
185
186 /* We may need to rewrite the vlist to contain
187    the intersection of new values for requested
188    attrs and old values for ignored attrs. To
189    do this, we take the old attr vlist and
190    subtract any values that are valid for the
191    requested attrs, and then add back the new
192    set of attr values. */
193
194     IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN 
195         SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
196         SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
197         attr_vector := attr_vector || attr_vector_tmp;
198     END IF;
199
200     -- On to composite attributes, now that the record attrs have been pulled.  Processed in name order, so later composite
201     -- attributes can depend on earlier ones.
202     PERFORM metabib.compile_composite_attr_cache_init();
203     FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
204
205         FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
206
207             tmp_val := metabib.compile_composite_attr( ccvm_row.id );
208             CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
209
210             IF attr_def.filter THEN
211                 IF attr_vector @@ tmp_val::query_int THEN
212                     attr_vector = attr_vector + intset(ccvm_row.id);
213                     EXIT WHEN NOT attr_def.multi;
214                 END IF;
215             END IF;
216
217             IF attr_def.sorter THEN
218                 IF attr_vector @@ tmp_val THEN
219                     DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
220                     INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
221                 END IF;
222             END IF;
223
224         END LOOP;
225
226     END LOOP;
227
228     IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
229         IF rdeleted THEN -- initial insert OR revivication
230             DELETE FROM metabib.record_attr_vector_list WHERE source = rid;
231             INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector);
232         ELSE
233             UPDATE metabib.record_attr_vector_list SET vlist = attr_vector WHERE source = rid;
234         END IF;
235     END IF;
236
237 END;
238
239 $func$ LANGUAGE PLPGSQL;
240
241 COMMIT;
242