]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/perlmods/lib/OpenILS/Application/Trigger.pm
lp1863252 toward geosort
[Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Trigger.pm
1 package OpenILS::Application::Trigger;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils::JSON;
8
9 use OpenSRF::AppSession;
10 use OpenSRF::MultiSession;
11 use OpenSRF::Utils::SettingsClient;
12 use OpenSRF::Utils::Logger qw/$logger/;
13 use OpenILS::Utils::DateTime qw/:datetime/;
14
15 use DateTime;
16 use DateTime::Format::ISO8601;
17
18 use OpenILS::Utils::Fieldmapper;
19 use OpenILS::Utils::CStoreEditor q/:funcs/;
20 use OpenILS::Application::Trigger::Event;
21 use OpenILS::Application::Trigger::EventGroup;
22
23
24 my $log = 'OpenSRF::Utils::Logger';
25 my $parallel_collect;
26 my $parallel_react;
27
28 sub initialize {
29
30     my $conf = OpenSRF::Utils::SettingsClient->new;
31     $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
32     $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
33
34 }
35 sub child_init {}
36
37 sub create_active_events_for_object {
38     my $self = shift;
39     my $client = shift;
40     my $key = shift;
41     my $target = shift;
42     my $location = shift;
43     my $granularity = shift;
44     my $user_data = shift;
45
46     my $ident = $target->Identity;
47     my $ident_value = $target->$ident();
48
49     my $editor = new_editor(xact=>1);
50
51     my $hooks = $editor->search_action_trigger_hook(
52         { key       => $key,
53           core_type => $target->json_hint
54         }
55     );
56
57     unless(@$hooks) {
58         $editor->rollback;
59         return undef;
60     }
61
62     my %hook_hash = map { ($_->key, $_) } @$hooks;
63
64     my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
65     my $defs = $editor->search_action_trigger_event_definition(
66         { hook   => [ keys %hook_hash ],
67           owner  => [ map { $_->{id} } @$orgs  ],
68           active => 't'
69         }
70     );
71
72     for my $def ( @$defs ) {
73         next if ($granularity && $def->granularity ne $granularity );
74
75         if (!$self->{ignore_opt_in} && $def->usr_field && $def->opt_in_setting) {
76             my $ufield = $def->usr_field;
77             my $uid = $target->$ufield;
78             $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
79
80             my $opt_in_setting = $editor->search_actor_user_setting(
81                 { usr   => $uid,
82                   name  => $def->opt_in_setting,
83                   value => 'true'
84                 }
85             );
86
87             next unless (@$opt_in_setting);
88         }
89
90         my $date = DateTime->now;
91
92         if ($hook_hash{$def->hook}->passive eq 'f') {
93
94             if (my $dfield = $def->delay_field) {
95                 if ($target->$dfield()) {
96                     $date = DateTime::Format::ISO8601->new->parse_datetime( clean_ISO8601($target->$dfield) );
97                 } else {
98                     next;
99                 }
100             }
101
102             $date->add( seconds => interval_to_seconds($def->delay) );
103         }
104
105         my $event = Fieldmapper::action_trigger::event->new();
106         $event->target( $ident_value );
107         $event->event_def( $def->id );
108         $event->run_time( $date->strftime( '%F %T%z' ) );
109         $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
110
111         $editor->create_action_trigger_event( $event );
112
113         $client->respond( $event->id );
114     }
115
116     $editor->commit;
117
118     return undef;
119 }
120 __PACKAGE__->register_method(
121     api_name => 'open-ils.trigger.event.autocreate',
122     method   => 'create_active_events_for_object',
123     api_level=> 1,
124     stream   => 1,
125     argc     => 3
126 );
127 __PACKAGE__->register_method(
128     api_name      => 'open-ils.trigger.event.autocreate.ignore_opt_in',
129     method        => 'create_active_events_for_object',
130     api_level     => 1,
131     stream        => 1,
132     argc          => 3,
133     ignore_opt_in => 1
134 );
135
136 sub create_event_for_object_and_def {
137     my $self = shift;
138     my $client = shift;
139     my $definitions = shift;
140     my $target = shift;
141     my $location = shift;
142     my $user_data = shift;
143
144     my $ident = $target->Identity;
145     my $ident_value = $target->$ident();
146
147     my @active = ($self->api_name =~ /inactive/o) ? () : ( active => 't' );
148
149     my $editor = new_editor(xact=>1);
150
151     my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
152     my $defs = $editor->search_action_trigger_event_definition(
153         { id => $definitions,
154           owner  => [ map { $_->{id} } @$orgs  ],
155           @active
156         }
157     );
158
159     my $hooks = $editor->search_action_trigger_hook(
160         { key       => [ map { $_->hook } @$defs ],
161           core_type => $target->json_hint
162         }
163     );
164
165     my %hook_hash = map { ($_->key, $_) } @$hooks;
166
167     for my $def ( @$defs ) {
168
169         if ($def->usr_field && $def->opt_in_setting) {
170             my $ufield = $def->usr_field;
171             my $uid = $target->$ufield;
172             $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
173
174             my $opt_in_setting = $editor->search_actor_user_setting(
175                 { usr   => $uid,
176                   name  => $def->opt_in_setting,
177                   value => 'true'
178                 }
179             );
180
181             next unless (@$opt_in_setting);
182         }
183
184         my $date = DateTime->now;
185
186         if ($hook_hash{$def->hook}->passive eq 'f') {
187
188             if (my $dfield = $def->delay_field) {
189                 if ($target->$dfield()) {
190                     $date = DateTime::Format::ISO8601->new->parse_datetime( clean_ISO8601($target->$dfield) );
191                 } else {
192                     next;
193                 }
194             }
195
196             $date->add( seconds => interval_to_seconds($def->delay) );
197         }
198
199         my $event = Fieldmapper::action_trigger::event->new();
200         $event->target( $ident_value );
201         $event->event_def( $def->id );
202         $event->run_time( $date->strftime( '%F %T%z' ) );
203         $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
204
205         $editor->create_action_trigger_event( $event );
206
207         $client->respond( $event->id );
208     }
209
210     $editor->commit;
211
212     return undef;
213 }
214 __PACKAGE__->register_method(
215     api_name => 'open-ils.trigger.event.autocreate.by_definition',
216     method   => 'create_event_for_object_and_def',
217     api_level=> 1,
218     stream   => 1,
219     argc     => 3
220 );
221 __PACKAGE__->register_method(
222     api_name => 'open-ils.trigger.event.autocreate.by_definition.include_inactive',
223     method   => 'create_event_for_object_and_def',
224     api_level=> 1,
225     stream   => 1,
226     argc     => 3
227 );
228
229
230 # Retrieves events by object, or object type + filter
231 #  $object : a target object or object type (class hint)
232 #
233 #  $filter : an optional hash of filters ... top level keys:
234 #     event
235 #        filters on the atev objects, such as states or null-ness of timing
236 #        fields. contains the effective default of:
237 #          { state => 'pending' }
238 #        an example, which overrides the default, and will find
239 #        stale 'found' events:
240 #          { state => 'found', update_time => { '<' => 'yesterday' } }
241 #
242 #      event_def
243 #        filters on the atevdef object. contains the effective default of:
244 #          { active => 't' }
245 #
246 #      hook
247 #        filters on the hook object. no defaults, but there is a pinned,
248 #        unchangeable filter based on the passed hint or object type (see
249 #        $object above). an example for finding passive events:
250 #          { passive => 't' }
251 #
252 #     target
253 #        filters against the target field on the event. this can contain
254 #        either an array of target ids (if you passed an object type, and
255 #        not an object) or can contain a json_query that will return exactly
256 #        a list of target-type ids.  If you pass an object, the pkey value of
257 #        that object will be used as a filter in addition to the filter passed
258 #        in here.  example filter for circs of user 1234 that are open:
259 #          { select => { circ => ['id'] },
260 #            from => 'circ',
261 #            where => {
262 #              usr => 1234,
263 #              checkin_time => undef, 
264 #              '-or' => [
265 #                { stop_fines => undef },
266 #                { stop_fines => { 'not in' => ['LOST','LONGOVERDUE','CLAIMSRETURNED'] } }
267 #              ]
268 #            }
269
270 sub events_by_target {
271     my $self = shift;
272     my $client = shift;
273     my $object = shift;
274     my $filter = shift || {};
275     my $flesh_fields = shift || {};
276     my $flesh_depth = shift || 1;
277
278     my $obj_class = ref($object) || _fm_class_by_hint($object);
279     my $obj_hint = ref($object) ? _fm_hint_by_class(ref($object)) : $object;
280
281     my $object_ident_field = $obj_class->Identity;
282
283     my $query = {
284         select => { atev => ["id"] },
285         from   => {
286             atev => {
287                 atevdef => {
288                     field => "id",
289                     fkey => "event_def",
290                     join => {
291                         ath => { field => "key", fkey => "hook" }
292                     }
293                 }
294             }
295         },
296         where  => {
297             "+ath"  => { core_type => $obj_hint },
298             "+atevdef" => { active => 't' },
299             "+atev" => { state => 'pending' }
300         },
301         order_by => { "atev" => [ 'run_time', 'add_time' ] }
302     };
303
304     $query->{limit} = $filter->{limit} if defined $filter->{limit};
305     $query->{offset} = $filter->{offset} if defined $filter->{offset};
306     $query->{order_by} = $filter->{order_by} if defined $filter->{order_by};
307
308
309     # allow multiple 'target' filters
310     $query->{where}->{'+atev'}->{'-and'} = [];
311
312     # if we got a real object, filter on its pkey value
313     if (ref($object)) { # pass an object, require that target
314         push @{ $query->{where}->{'+atev'}->{'-and'} },
315             { target => $object->$object_ident_field }
316     }
317
318     # we have a fancy complex target filter or a list of target ids
319     if ($$filter{target}) {
320         push @{ $query->{where}->{'+atev'}->{'-and'} },
321             { target => {in => $$filter{target} } };
322     }
323
324     # pass no target filter or object, you get no events
325     if (!@{ $query->{where}->{'+atev'}->{'-and'} }) {
326         return undef; 
327     }
328
329     # any hook filters, other than the required core_type filter
330     if ($$filter{hook}) {
331         $query->{where}->{'+ath'}->{$_} = $$filter{hook}{$_}
332             for (grep { $_ ne 'core_type' } keys %{$$filter{hook}});
333     }
334
335     # any event_def filters.  defaults to { active => 't' }
336     if ($$filter{event_def}) {
337         $query->{where}->{'+atevdef'}->{$_} = $$filter{event_def}{$_}
338             for (keys %{$$filter{event_def}});
339     }
340
341     # any event filters.  defaults to { state => 'pending' }.
342     # don't overwrite '-and' used for multiple target filters above
343     if ($$filter{event}) {
344         $query->{where}->{'+atev'}->{$_} = $$filter{event}{$_}
345             for (grep { $_ ne '-and' } keys %{$$filter{event}});
346     }
347
348     my $e = new_editor(xact=>1);
349
350     my $events = $e->json_query($query);
351
352     $flesh_fields->{atev} = ['event_def'] unless $flesh_fields->{atev};
353
354     for my $id (@$events) {
355         my $event = $e->retrieve_action_trigger_event([
356             $id->{id},
357             {flesh => $flesh_depth, flesh_fields => $flesh_fields}
358         ]);
359
360         (my $meth = $obj_class) =~ s/^Fieldmapper:://o;
361         $meth =~ s/::/_/go;
362         $meth = 'retrieve_'.$meth;
363
364         $event->target($e->$meth($event->target));
365         $client->respond($event);
366     }
367
368     return undef;
369 }
370 __PACKAGE__->register_method(
371     api_name => 'open-ils.trigger.events_by_target',
372     method   => 'events_by_target',
373     api_level=> 1,
374     stream   => 1,
375     argc     => 2
376 );
377  
378 sub _fm_hint_by_class {
379     my $class = shift;
380     return OpenILS::Application->publish_fieldmapper->{$class}->{hint};
381 }
382
383 sub _fm_class_by_hint {
384     my $hint = shift;
385
386     my ($class) = grep {
387         OpenILS::Application->publish_fieldmapper->{$_}->{hint} eq $hint
388     } keys %{ OpenILS::Application->publish_fieldmapper };
389
390     return $class;
391 }
392
393 sub create_batch_events {
394     my $self = shift;
395     my $client = shift;
396     my $key = shift;
397     my $location_field = shift; # where to look for event_def.owner filtering ... circ_lib, for instance, where hook.core_type = circ
398     my $filter = shift || {};
399     my $granularity = shift;
400     my $user_data = shift;
401
402     my $active = ($self->api_name =~ /active/o) ? 1 : 0;
403     if ($active && !keys(%$filter)) {
404         $log->info("Active batch event creation requires a target filter but none was supplied to create_batch_events");
405         return undef;
406     }
407
408     return undef unless ($key && $location_field);
409
410     my $editor = new_editor(xact=>1);
411     my $hooks = $editor->search_action_trigger_hook(
412         { passive => $active ? 'f' : 't', key => $key }
413     );
414
415     my %hook_hash = map { ($_->key, $_) } @$hooks;
416
417     my $defs = $editor->search_action_trigger_event_definition(
418         { hook   => [ keys %hook_hash ], active => 't' },
419     );
420
421     my $orig_filter_and = [];
422     if ($$filter{'-and'}) {
423         for my $f ( @{ $$filter{'-and'} } ) {
424             push @$orig_filter_and, $f;
425         }
426     }
427
428     for my $def ( @$defs ) {
429         next if ($granularity && $def->granularity ne $granularity );
430
431         my $date = DateTime->now->subtract( seconds => interval_to_seconds($def->delay) );
432
433         # we may need to do some work to backport this to 1.2
434         $filter->{ $location_field } = { 'in' =>
435             {
436                 select  => { aou => [{ column => 'id', transform => 'actor.org_unit_descendants', result_field => 'id' }] },
437                 from    => 'aou',
438                 where   => { id => $def->owner }
439             }
440         };
441
442         my $run_time = 'now';
443         if ($active) {
444             $run_time = 
445                 DateTime
446                     ->now
447                     ->add( seconds => interval_to_seconds($def->delay) )
448                     ->strftime( '%F %T%z' );
449         } else {
450             if ($def->max_delay) {
451                 my @times = sort {$a <=> $b} interval_to_seconds($def->delay), interval_to_seconds($def->max_delay);
452                 $filter->{ $def->delay_field } = {
453                     'between' => [
454                         DateTime->now->subtract( seconds => $times[1] )->strftime( '%F %T%z' ),
455                         DateTime->now->subtract( seconds => $times[0] )->strftime( '%F %T%z' )
456                     ]
457                 };
458             } else {
459                 $filter->{ $def->delay_field } = {
460                     '<=' => DateTime->now->subtract( seconds => interval_to_seconds($def->delay) )->strftime( '%F %T%z' )
461                 };
462             }
463         }
464
465         my $class = _fm_class_by_hint($hook_hash{$def->hook}->core_type);
466
467         # filter where this target has an event (and it's pending, for active hooks)
468         $$filter{'-and'} = [];
469         for my $f ( @$orig_filter_and ) {
470             push @{ $$filter{'-and'} }, $f;
471         }
472
473         my $join = { 'join' => {
474             atev => {
475                 field => 'target',
476                 fkey => $class->Identity,
477                 type => 'left',
478                 filter => { event_def => $def->id }
479             }
480         }};
481         if ($def->repeat_delay) {
482             $join->{'join'}{atev}{filter} = { start_time => {
483                 '>' => DateTime->now->subtract( seconds => interval_to_seconds($def->repeat_delay) )->strftime( '%F %T%z' )
484             } };
485         }
486
487         push @{ $filter->{'-and'} }, { '+atev' => { id => undef } };
488
489         if ($def->usr_field && $def->opt_in_setting) {
490             push @{ $filter->{'-and'} }, {
491                 '-exists' => {
492                     from  => 'aus',
493                     where => {
494                         name => $def->opt_in_setting,
495                         usr  => { '=' => { '+' . $hook_hash{$def->hook}->core_type => $def->usr_field } },
496                         value=> 'true'
497                     }
498                 }
499             };
500         }
501
502         $class =~ s/^Fieldmapper:://o;
503         $class =~ s/::/_/go;
504         my $method = 'search_'. $class;
505
506         # for cleaner logging
507         my $def_id = $def->id;
508         my $hook = $def->hook;
509
510         $logger->info("trigger: create_batch_events() collecting object IDs for def=$def_id / hook=$hook");
511
512         my $object_ids = $editor->$method( [$filter, $join], {idlist => 1, timeout => 10800} );
513
514         if($object_ids) {
515             $logger->info("trigger: create_batch_events() fetched ".scalar(@$object_ids)." object IDs for def=$def_id / hook=$hook");
516         } else {
517             $logger->warn("trigger: create_batch_events() timeout occurred collecting object IDs for def=$def_id / hook=$hook");
518         }
519
520         for my $o_id (@$object_ids) {
521
522             my $event = Fieldmapper::action_trigger::event->new();
523             $event->target( $o_id );
524             $event->event_def( $def->id );
525             $event->run_time( $run_time );
526             $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
527
528             $editor->create_action_trigger_event( $event );
529
530             $client->respond( $event->id );
531         }
532         
533         $logger->info("trigger: create_batch_events() successfully created events for def=$def_id / hook=$hook");
534     }
535
536     $logger->info("trigger: create_batch_events() done creating events");
537
538     $editor->commit;
539
540     return undef;
541 }
542 __PACKAGE__->register_method(
543     api_name => 'open-ils.trigger.passive.event.autocreate.batch',
544     method   => 'create_batch_events',
545     api_level=> 1,
546     stream   => 1,
547     argc     => 2
548 );
549
550 __PACKAGE__->register_method(
551     api_name => 'open-ils.trigger.active.event.autocreate.batch',
552     method   => 'create_batch_events',
553     api_level=> 1,
554     stream   => 1,
555     argc     => 2
556 );
557
558 sub fire_single_event {
559     my $self = shift;
560     my $client = shift;
561     my $event_id = shift;
562
563     my $e = OpenILS::Application::Trigger::Event->new($event_id);
564
565     if ($e->validate->valid) {
566         $logger->info("trigger: Event is valid, reacting...");
567         $e->react->cleanup;
568     }
569
570     $e->editor->disconnect;
571     OpenILS::Application::Trigger::Event->ClearObjectCache();
572
573     return {
574         valid     => $e->valid,
575         reacted   => $e->reacted,
576         cleanedup => $e->cleanedup,
577         event     => $e->event
578     };
579 }
580 __PACKAGE__->register_method(
581     api_name => 'open-ils.trigger.event.fire',
582     method   => 'fire_single_event',
583     api_level=> 1,
584     argc     => 1
585 );
586
587 sub fire_event_group {
588     my $self = shift;
589     my $client = shift;
590     my $events = shift;
591
592     my $e = OpenILS::Application::Trigger::EventGroup->new(@$events);
593
594     if ($e->validate->valid) {
595         $logger->info("trigger: Event group is valid, reacting...");
596         $e->react->cleanup;
597     }
598
599     $e->editor->disconnect;
600     OpenILS::Application::Trigger::Event->ClearObjectCache();
601
602     return {
603         valid     => $e->valid,
604         reacted   => $e->reacted,
605         cleanedup => $e->cleanedup,
606         events    => [map { $_->event } @{$e->events}]
607     };
608 }
609 __PACKAGE__->register_method(
610     api_name => 'open-ils.trigger.event_group.fire',
611     method   => 'fire_event_group',
612     api_level=> 1,
613     argc     => 1
614 );
615
616 sub revalidate_event_group_test {
617     my $self = shift;
618     my $client = shift;
619     my $events = shift;
620
621     my $e = OpenILS::Application::Trigger::EventGroup->new_nochanges(@$events);
622
623     my $result = $e->revalidate_test;
624
625     $e->editor->disconnect;
626     OpenILS::Application::Trigger::Event->ClearObjectCache();
627
628     return $result;
629 }
630 __PACKAGE__->register_method(
631     api_name => 'open-ils.trigger.event_group.revalidate.test',
632     method   => 'revalidate_event_group_test',
633     api_level=> 1,
634     argc     => 1,
635     signature => {
636         desc => q/revalidate a group of events.
637         This does not actually update the events (so there will be no change
638         of atev.state or anything else in the database, unless an event's
639         validator makes changes out-of-band).
640         
641         This returns an array of valid event IDs.
642         /,
643         params => [
644             {name => "events", type => "array", desc => "list of event ids"}
645         ]
646     }
647 );
648
649
650 sub pending_events {
651     my $self = shift;
652     my $client = shift;
653     my $granularity = shift;
654     my $granflag = shift;
655
656     my $query = [{ state => 'pending', run_time => {'<' => 'now'} }, { order_by => { atev => [ qw/run_time add_time/] }, 'join' => 'atevdef' }];
657
658     if (defined $granularity) {
659         if ($granflag) {
660             $query->[0]->{'+atevdef'} = {granularity => $granularity};
661         } else {
662             $query->[0]->{'+atevdef'} = {'-or' => [ {granularity => $granularity}, {granularity => undef} ] };
663         }
664     } else {
665         $query->[0]->{'+atevdef'} = {granularity => undef};
666     }
667
668     return new_editor(xact=>1)->search_action_trigger_event(
669         $query, { idlist=> 1, timeout => 7200, substream => 1 }
670     );
671 }
672 __PACKAGE__->register_method(
673     api_name => 'open-ils.trigger.event.find_pending',
674     method   => 'pending_events',
675     api_level=> 1
676 );
677
678 sub gather_events {
679     my $self = shift;
680     my $client = shift;
681     my $e_ids = shift;
682
683     $e_ids = [$e_ids] if (!ref($e_ids));
684
685     my @events;
686     for my $e_id (@$e_ids) {
687         my $e;
688         try {
689            $e = OpenILS::Application::Trigger::Event->new($e_id);
690         } catch Error with {
691             $logger->error("trigger: Event creation failed with ".shift());
692         };
693
694         next if !$e or $e->event->state eq 'invalid'; 
695
696         try {
697             $e->build_environment;
698         } catch Error with {
699             $logger->error("trigger: Event environment building failed with ".shift());
700         };
701
702         $e->editor->disconnect;
703         $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
704         $client->respond($e);
705     }
706
707     OpenILS::Application::Trigger::Event->ClearObjectCache();
708
709     return undef;
710 }
711 __PACKAGE__->register_method(
712     api_name => 'open-ils.trigger.event.gather',
713     method   => 'gather_events',
714     api_level=> 1
715 );
716
717 sub grouped_events {
718     my $self = shift;
719     my $client = shift;
720     my $granularity = shift;
721     my $granflag = shift;
722
723     my ($events) = $self->method_lookup('open-ils.trigger.event.find_pending')->run($granularity, $granflag);
724
725     my %groups = ( '*' => [] );
726
727     if($events) {
728         $logger->info("trigger: grouped_events found ".scalar(@$events)." pending events to process");
729     } else {
730         $logger->warn("trigger: grouped_events timed out loading pending events");
731         return \%groups;
732     }
733
734     my @fleshed_events;
735
736     if ($parallel_collect == 1 or @$events == 1) { # use method lookup
737         @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
738     } else {
739         my $self_multi = OpenSRF::MultiSession->new(
740             app                 => 'open-ils.trigger',
741             cap                 => $parallel_collect,
742             success_handler     => sub {
743                 my $self = shift;
744                 my $req = shift;
745
746                 push @fleshed_events,
747                     map { OpenILS::Application::Trigger::Event->new($_) }
748                     map { $_->content }
749                     @{ $req->{response} };
750             },
751         );
752
753         $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
754         $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
755
756         $self_multi->session_wait(1);
757         $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
758     }
759
760     my @invalid; # sync for events with a null grouping field
761     for my  $e (@fleshed_events) {
762         if (my $group = $e->event->event_def->group_field) {
763
764             # split the grouping link steps
765             my @steps = split /\./, $group;
766             my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object
767
768             my $node;
769             eval {
770                 $node = $e->target;
771                 $node = $node->$_() for ( @steps );
772             };
773
774             unless($node) { # should not get here, but to be safe..
775                 push @invalid, $e;
776                 next;
777             }
778
779             # get the grouping value for the grouping object on this event
780             my $ident_value = $node->$group_field();
781
782             # could by false-y, so check definedness
783             if (!defined($ident_value)) {
784                 push @invalid, $e;
785                 next;
786             }
787
788             if(ref $ident_value) {
789                 my $ident_field = $ident_value->Identity; 
790                 $ident_value = $ident_value->$ident_field()
791             }
792
793             # push this event onto the event+grouping_value stack
794             $groups{$e->event->event_def->id}{$ident_value} ||= [];
795             push @{ $groups{$e->event->event_def->id}{$ident_value} }, $e;
796         } else {
797             # it's a non-grouped event
798             push @{ $groups{'*'} }, $e;
799         }
800     }
801
802     OpenILS::Application::Trigger::Event->invalidate(@invalid) if @invalid;
803
804     return \%groups;
805 }
806 __PACKAGE__->register_method(
807     api_name => 'open-ils.trigger.event.find_pending_by_group',
808     method   => 'grouped_events',
809     api_level=> 1
810 );
811
812 sub run_all_events {
813     my $self = shift;
814     my $client = shift;
815     my $granularity = shift;
816     my $granflag = shift;
817
818     my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
819     $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
820
821     my $self_multi;
822     if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
823         $self_multi = OpenSRF::MultiSession->new(
824             app                   => 'open-ils.trigger',
825             cap                   => $parallel_react,
826             session_hash_function => sub {
827                 my $args = shift;
828                 return $args->{target_id};
829             },
830             success_handler       => sub {
831                 my $me = shift;
832                 my $req = shift;
833                 $client->respond( $req->{response}->[0]->content );
834             }
835         );
836     }
837
838     for my $def ( keys %$groups ) {
839         if ($def eq '*') {
840             $logger->info("trigger: run_all_events firing un-grouped events");
841             for my $event ( @{ $$groups{'*'} } ) {
842                 try {
843                     if ($self_multi) {
844                         $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
845                         $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
846                     } else {
847                         $client->respond(
848                             $self
849                                 ->method_lookup('open-ils.trigger.event.fire')
850                                 ->run($event)
851                         );
852                     }
853                 } catch Error with { 
854                     $logger->error("trigger: event firing failed with ".shift());
855                 };
856             }
857             $logger->info("trigger: run_all_events completed queuing un-grouped events");
858             $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
859
860         } else {
861             my $defgroup = $$groups{$def};
862             $logger->info("trigger: run_all_events firing events for grouped event def=$def");
863             for my $ident ( keys %$defgroup ) {
864                 $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
865                 try {
866                     if ($self_multi) {
867                         $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
868                         $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
869                     } else {
870                         $client->respond(
871                             $self
872                                 ->method_lookup('open-ils.trigger.event_group.fire')
873                                 ->run($$defgroup{$ident})
874                         );
875                     }
876                     $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
877                 } catch Error with {
878                     $logger->error("trigger: event firing failed with ".shift());
879                 };
880             }
881             $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
882         }
883     }
884
885     $self_multi->session_wait(1) if ($self_multi);
886     $logger->info("trigger: run_all_events completed firing events");
887
888     $client->respond_complete();
889     return undef;
890 }
891 __PACKAGE__->register_method(
892     api_name => 'open-ils.trigger.event.run_all_pending',
893     method   => 'run_all_events',
894     api_level=> 1
895 );
896
897
898 1;