]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/perlmods/lib/OpenILS/Application/Trigger.pm
Fix in-transit hold retarget
[Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Trigger.pm
1 package OpenILS::Application::Trigger;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils::JSON;
8
9 use OpenSRF::AppSession;
10 use OpenSRF::MultiSession;
11 use OpenSRF::Utils::SettingsClient;
12 use OpenSRF::Utils::Logger qw/$logger/;
13 use OpenSRF::Utils qw/:datetime/;
14
15 use DateTime;
16 use DateTime::Format::ISO8601;
17
18 use OpenILS::Utils::Fieldmapper;
19 use OpenILS::Utils::CStoreEditor q/:funcs/;
20 use OpenILS::Application::Trigger::Event;
21 use OpenILS::Application::Trigger::EventGroup;
22
23
24 my $log = 'OpenSRF::Utils::Logger';
25 my $parallel_collect;
26 my $parallel_react;
27
28 sub initialize {
29
30     my $conf = OpenSRF::Utils::SettingsClient->new;
31     $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
32     $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
33
34 }
35 sub child_init {}
36
37 sub create_active_events_for_object {
38     my $self = shift;
39     my $client = shift;
40     my $key = shift;
41     my $target = shift;
42     my $location = shift;
43     my $granularity = shift;
44     my $user_data = shift;
45
46     my $ident = $target->Identity;
47     my $ident_value = $target->$ident();
48
49     my $editor = new_editor(xact=>1);
50
51     my $hooks = $editor->search_action_trigger_hook(
52         { key       => $key,
53           core_type => $target->json_hint
54         }
55     );
56
57     unless(@$hooks) {
58         $editor->rollback;
59         return undef;
60     }
61
62     my %hook_hash = map { ($_->key, $_) } @$hooks;
63
64     my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
65     my $defs = $editor->search_action_trigger_event_definition(
66         { hook   => [ keys %hook_hash ],
67           owner  => [ map { $_->{id} } @$orgs  ],
68           active => 't'
69         }
70     );
71
72     for my $def ( @$defs ) {
73         next if ($granularity && $def->granularity ne $granularity );
74
75         if ($def->usr_field && $def->opt_in_setting) {
76             my $ufield = $def->usr_field;
77             my $uid = $target->$ufield;
78             $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
79
80             my $opt_in_setting = $editor->search_actor_user_setting(
81                 { usr   => $uid,
82                   name  => $def->opt_in_setting,
83                   value => 'true'
84                 }
85             );
86
87             next unless (@$opt_in_setting);
88         }
89
90         my $date = DateTime->now;
91
92         if ($hook_hash{$def->hook}->passive eq 'f') {
93
94             if (my $dfield = $def->delay_field) {
95                 if ($target->$dfield()) {
96                     $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
97                 } else {
98                     next;
99                 }
100             }
101
102             $date->add( seconds => interval_to_seconds($def->delay) );
103         }
104
105         my $event = Fieldmapper::action_trigger::event->new();
106         $event->target( $ident_value );
107         $event->event_def( $def->id );
108         $event->run_time( $date->strftime( '%F %T%z' ) );
109         $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
110
111         $editor->create_action_trigger_event( $event );
112
113         $client->respond( $event->id );
114     }
115
116     $editor->commit;
117
118     return undef;
119 }
120 __PACKAGE__->register_method(
121     api_name => 'open-ils.trigger.event.autocreate',
122     method   => 'create_active_events_for_object',
123     api_level=> 1,
124     stream   => 1,
125     argc     => 3
126 );
127
128 sub create_event_for_object_and_def {
129     my $self = shift;
130     my $client = shift;
131     my $definitions = shift;
132     my $target = shift;
133     my $location = shift;
134     my $user_data = shift;
135
136     my $ident = $target->Identity;
137     my $ident_value = $target->$ident();
138
139     my @active = ($self->api_name =~ /inactive/o) ? () : ( active => 't' );
140
141     my $editor = new_editor(xact=>1);
142
143     my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
144     my $defs = $editor->search_action_trigger_event_definition(
145         { id => $definitions,
146           owner  => [ map { $_->{id} } @$orgs  ],
147           @active
148         }
149     );
150
151     my $hooks = $editor->search_action_trigger_hook(
152         { key       => [ map { $_->hook } @$defs ],
153           core_type => $target->json_hint
154         }
155     );
156
157     my %hook_hash = map { ($_->key, $_) } @$hooks;
158
159     for my $def ( @$defs ) {
160
161         if ($def->usr_field && $def->opt_in_setting) {
162             my $ufield = $def->usr_field;
163             my $uid = $target->$ufield;
164             $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
165
166             my $opt_in_setting = $editor->search_actor_user_setting(
167                 { usr   => $uid,
168                   name  => $def->opt_in_setting,
169                   value => 'true'
170                 }
171             );
172
173             next unless (@$opt_in_setting);
174         }
175
176         my $date = DateTime->now;
177
178         if ($hook_hash{$def->hook}->passive eq 'f') {
179
180             if (my $dfield = $def->delay_field) {
181                 if ($target->$dfield()) {
182                     $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
183                 } else {
184                     next;
185                 }
186             }
187
188             $date->add( seconds => interval_to_seconds($def->delay) );
189         }
190
191         my $event = Fieldmapper::action_trigger::event->new();
192         $event->target( $ident_value );
193         $event->event_def( $def->id );
194         $event->run_time( $date->strftime( '%F %T%z' ) );
195         $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
196
197         $editor->create_action_trigger_event( $event );
198
199         $client->respond( $event->id );
200     }
201
202     $editor->commit;
203
204     return undef;
205 }
206 __PACKAGE__->register_method(
207     api_name => 'open-ils.trigger.event.autocreate.by_definition',
208     method   => 'create_event_for_object_and_def',
209     api_level=> 1,
210     stream   => 1,
211     argc     => 3
212 );
213 __PACKAGE__->register_method(
214     api_name => 'open-ils.trigger.event.autocreate.by_definition.include_inactive',
215     method   => 'create_event_for_object_and_def',
216     api_level=> 1,
217     stream   => 1,
218     argc     => 3
219 );
220
221
222 # Retrieves events by object, or object type + filter
223 #  $object : a target object or object type (class hint)
224 #
225 #  $filter : an optional hash of filters ... top level keys:
226 #     event
227 #        filters on the atev objects, such as states or null-ness of timing
228 #        fields. contains the effective default of:
229 #          { state => 'pending' }
230 #        an example, which overrides the default, and will find
231 #        stale 'found' events:
232 #          { state => 'found', update_time => { '<' => 'yesterday' } }
233 #
234 #      event_def
235 #        filters on the atevdef object. contains the effective default of:
236 #          { active => 't' }
237 #
238 #      hook
239 #        filters on the hook object. no defaults, but there is a pinned,
240 #        unchangeable filter based on the passed hint or object type (see
241 #        $object above). an example for finding passive events:
242 #          { passive => 't' }
243 #
244 #     target
245 #        filters against the target field on the event. this can contain
246 #        either an array of target ids (if you passed an object type, and
247 #        not an object) or can contain a json_query that will return exactly
248 #        a list of target-type ids.  If you pass an object, the pkey value of
249 #        that object will be used as a filter in addition to the filter passed
250 #        in here.  example filter for circs of user 1234 that are open:
251 #          { select => { circ => ['id'] },
252 #            from => 'circ',
253 #            where => {
254 #              usr => 1234,
255 #              checkin_time => undef, 
256 #              '-or' => [
257 #                { stop_fines => undef },
258 #                { stop_fines => { 'not in' => ['LOST','LONGOVERDUE','CLAIMSRETURNED'] } }
259 #              ]
260 #            }
261
262 sub events_by_target {
263     my $self = shift;
264     my $client = shift;
265     my $object = shift;
266     my $filter = shift || {};
267     my $flesh_fields = shift || {};
268     my $flesh_depth = shift || 1;
269
270     my $obj_class = ref($object) || _fm_class_by_hint($object);
271     my $obj_hint = ref($object) ? _fm_hint_by_class(ref($object)) : $object;
272
273     my $object_ident_field = $obj_class->Identity;
274
275     my $query = {
276         select => { atev => ["id"] },
277         from   => {
278             atev => {
279                 atevdef => {
280                     field => "id",
281                     fkey => "event_def",
282                     join => {
283                         ath => { field => "key", fkey => "hook" }
284                     }
285                 }
286             }
287         },
288         where  => {
289             "+ath"  => { core_type => $obj_hint },
290             "+atevdef" => { active => 't' },
291             "+atev" => { state => 'pending' }
292         },
293         order_by => { "atev" => [ 'run_time', 'add_time' ] }
294     };
295
296     $query->{limit} = $filter->{limit} if defined $filter->{limit};
297     $query->{offset} = $filter->{offset} if defined $filter->{offset};
298     $query->{order_by} = $filter->{order_by} if defined $filter->{order_by};
299
300
301     # allow multiple 'target' filters
302     $query->{where}->{'+atev'}->{'-and'} = [];
303
304     # if we got a real object, filter on its pkey value
305     if (ref($object)) { # pass an object, require that target
306         push @{ $query->{where}->{'+atev'}->{'-and'} },
307             { target => $object->$object_ident_field }
308     }
309
310     # we have a fancy complex target filter or a list of target ids
311     if ($$filter{target}) {
312         push @{ $query->{where}->{'+atev'}->{'-and'} },
313             { target => {in => $$filter{target} } };
314     }
315
316     # pass no target filter or object, you get no events
317     if (!@{ $query->{where}->{'+atev'}->{'-and'} }) {
318         return undef; 
319     }
320
321     # any hook filters, other than the required core_type filter
322     if ($$filter{hook}) {
323         $query->{where}->{'+ath'}->{$_} = $$filter{hook}{$_}
324             for (grep { $_ ne 'core_type' } keys %{$$filter{hook}});
325     }
326
327     # any event_def filters.  defaults to { active => 't' }
328     if ($$filter{event_def}) {
329         $query->{where}->{'+atevdef'}->{$_} = $$filter{event_def}{$_}
330             for (keys %{$$filter{event_def}});
331     }
332
333     # any event filters.  defaults to { state => 'pending' }.
334     # don't overwrite '-and' used for multiple target filters above
335     if ($$filter{event}) {
336         $query->{where}->{'+atev'}->{$_} = $$filter{event}{$_}
337             for (grep { $_ ne '-and' } keys %{$$filter{event}});
338     }
339
340     my $e = new_editor(xact=>1);
341
342     my $events = $e->json_query($query);
343
344     $flesh_fields->{atev} = ['event_def'] unless $flesh_fields->{atev};
345
346     for my $id (@$events) {
347         my $event = $e->retrieve_action_trigger_event([
348             $id->{id},
349             {flesh => $flesh_depth, flesh_fields => $flesh_fields}
350         ]);
351
352         (my $meth = $obj_class) =~ s/^Fieldmapper:://o;
353         $meth =~ s/::/_/go;
354         $meth = 'retrieve_'.$meth;
355
356         $event->target($e->$meth($event->target));
357         $client->respond($event);
358     }
359
360     return undef;
361 }
362 __PACKAGE__->register_method(
363     api_name => 'open-ils.trigger.events_by_target',
364     method   => 'events_by_target',
365     api_level=> 1,
366     stream   => 1,
367     argc     => 2
368 );
369  
370 sub _fm_hint_by_class {
371     my $class = shift;
372     return Fieldmapper->publish_fieldmapper->{$class}->{hint};
373 }
374
375 sub _fm_class_by_hint {
376     my $hint = shift;
377
378     my ($class) = grep {
379         Fieldmapper->publish_fieldmapper->{$_}->{hint} eq $hint
380     } keys %{ Fieldmapper->publish_fieldmapper };
381
382     return $class;
383 }
384
385 sub create_batch_events {
386     my $self = shift;
387     my $client = shift;
388     my $key = shift;
389     my $location_field = shift; # where to look for event_def.owner filtering ... circ_lib, for instance, where hook.core_type = circ
390     my $filter = shift || {};
391     my $granularity = shift;
392     my $user_data = shift;
393
394     my $active = ($self->api_name =~ /active/o) ? 1 : 0;
395     if ($active && !keys(%$filter)) {
396         $log->info("Active batch event creation requires a target filter but none was supplied to create_batch_events");
397         return undef;
398     }
399
400     return undef unless ($key && $location_field);
401
402     my $editor = new_editor(xact=>1);
403     my $hooks = $editor->search_action_trigger_hook(
404         { passive => $active ? 'f' : 't', key => $key }
405     );
406
407     my %hook_hash = map { ($_->key, $_) } @$hooks;
408
409     my $defs = $editor->search_action_trigger_event_definition(
410         { hook   => [ keys %hook_hash ], active => 't' },
411     );
412
413     my $orig_filter_and = [];
414     if ($$filter{'-and'}) {
415         for my $f ( @{ $$filter{'-and'} } ) {
416             push @$orig_filter_and, $f;
417         }
418     }
419
420     for my $def ( @$defs ) {
421         next if ($granularity && $def->granularity ne $granularity );
422
423         my $date = DateTime->now->subtract( seconds => interval_to_seconds($def->delay) );
424
425         # we may need to do some work to backport this to 1.2
426         $filter->{ $location_field } = { 'in' =>
427             {
428                 select  => { aou => [{ column => 'id', transform => 'actor.org_unit_descendants', result_field => 'id' }] },
429                 from    => 'aou',
430                 where   => { id => $def->owner }
431             }
432         };
433
434         my $run_time = 'now';
435         if ($active) {
436             $run_time = 
437                 DateTime
438                     ->now
439                     ->add( seconds => interval_to_seconds($def->delay) )
440                     ->strftime( '%F %T%z' );
441         } else {
442             if ($def->max_delay) {
443                 my @times = sort {$a <=> $b} interval_to_seconds($def->delay), interval_to_seconds($def->max_delay);
444                 $filter->{ $def->delay_field } = {
445                     'between' => [
446                         DateTime->now->subtract( seconds => $times[1] )->strftime( '%F %T%z' ),
447                         DateTime->now->subtract( seconds => $times[0] )->strftime( '%F %T%z' )
448                     ]
449                 };
450             } else {
451                 $filter->{ $def->delay_field } = {
452                     '<=' => DateTime->now->subtract( seconds => interval_to_seconds($def->delay) )->strftime( '%F %T%z' )
453                 };
454             }
455         }
456
457         my $class = _fm_class_by_hint($hook_hash{$def->hook}->core_type);
458
459         # filter where this target has an event (and it's pending, for active hooks)
460         $$filter{'-and'} = [];
461         for my $f ( @$orig_filter_and ) {
462             push @{ $$filter{'-and'} }, $f;
463         }
464
465         my $join = { 'join' => {
466             atev => {
467                 field => 'target',
468                 fkey => $class->Identity,
469                 type => 'left',
470                 filter => { event_def => $def->id }
471             }
472         }};
473
474         push @{ $filter->{'-and'} }, { '+atev' => { id => undef } };
475
476         if ($def->usr_field && $def->opt_in_setting) {
477             push @{ $filter->{'-and'} }, {
478                 '-exists' => {
479                     from  => 'aus',
480                     where => {
481                         name => $def->opt_in_setting,
482                         usr  => { '=' => { '+' . $hook_hash{$def->hook}->core_type => $def->usr_field } },
483                         value=> 'true'
484                     }
485                 }
486             };
487         }
488
489         $class =~ s/^Fieldmapper:://o;
490         $class =~ s/::/_/go;
491         my $method = 'search_'. $class;
492
493         # for cleaner logging
494         my $def_id = $def->id;
495         my $hook = $def->hook;
496
497         $logger->info("trigger: create_batch_events() collecting object IDs for def=$def_id / hook=$hook");
498
499         my $object_ids = $editor->$method( [$filter, $join], {idlist => 1, timeout => 10800} );
500
501         if($object_ids) {
502             $logger->info("trigger: create_batch_events() fetched ".scalar(@$object_ids)." object IDs for def=$def_id / hook=$hook");
503         } else {
504             $logger->warn("trigger: create_batch_events() timeout occurred collecting object IDs for def=$def_id / hook=$hook");
505         }
506
507         for my $o_id (@$object_ids) {
508
509             my $event = Fieldmapper::action_trigger::event->new();
510             $event->target( $o_id );
511             $event->event_def( $def->id );
512             $event->run_time( $run_time );
513             $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
514
515             $editor->create_action_trigger_event( $event );
516
517             $client->respond( $event->id );
518         }
519         
520         $logger->info("trigger: create_batch_events() successfully created events for def=$def_id / hook=$hook");
521     }
522
523     $logger->info("trigger: create_batch_events() done creating events");
524
525     $editor->commit;
526
527     return undef;
528 }
529 __PACKAGE__->register_method(
530     api_name => 'open-ils.trigger.passive.event.autocreate.batch',
531     method   => 'create_batch_events',
532     api_level=> 1,
533     stream   => 1,
534     argc     => 2
535 );
536
537 __PACKAGE__->register_method(
538     api_name => 'open-ils.trigger.active.event.autocreate.batch',
539     method   => 'create_batch_events',
540     api_level=> 1,
541     stream   => 1,
542     argc     => 2
543 );
544
545 sub fire_single_event {
546     my $self = shift;
547     my $client = shift;
548     my $event_id = shift;
549
550     my $e = OpenILS::Application::Trigger::Event->new($event_id);
551
552     if ($e->validate->valid) {
553         $logger->info("trigger: Event is valid, reacting...");
554         $e->react->cleanup;
555     }
556
557     $e->editor->disconnect;
558     OpenILS::Application::Trigger::Event->ClearObjectCache();
559
560     return {
561         valid     => $e->valid,
562         reacted   => $e->reacted,
563         cleanedup => $e->cleanedup,
564         event     => $e->event
565     };
566 }
567 __PACKAGE__->register_method(
568     api_name => 'open-ils.trigger.event.fire',
569     method   => 'fire_single_event',
570     api_level=> 1,
571     argc     => 1
572 );
573
574 sub fire_event_group {
575     my $self = shift;
576     my $client = shift;
577     my $events = shift;
578
579     my $e = OpenILS::Application::Trigger::EventGroup->new(@$events);
580
581     if ($e->validate->valid) {
582         $logger->info("trigger: Event group is valid, reacting...");
583         $e->react->cleanup;
584     }
585
586     $e->editor->disconnect;
587     OpenILS::Application::Trigger::Event->ClearObjectCache();
588
589     return {
590         valid     => $e->valid,
591         reacted   => $e->reacted,
592         cleanedup => $e->cleanedup,
593         events    => [map { $_->event } @{$e->events}]
594     };
595 }
596 __PACKAGE__->register_method(
597     api_name => 'open-ils.trigger.event_group.fire',
598     method   => 'fire_event_group',
599     api_level=> 1,
600     argc     => 1
601 );
602
603 sub revalidate_event_group_test {
604     my $self = shift;
605     my $client = shift;
606     my $events = shift;
607
608     my $e = OpenILS::Application::Trigger::EventGroup->new_nochanges(@$events);
609
610     my $result = $e->revalidate_test;
611
612     $e->editor->disconnect;
613     OpenILS::Application::Trigger::Event->ClearObjectCache();
614
615     return $result;
616 }
617 __PACKAGE__->register_method(
618     api_name => 'open-ils.trigger.event_group.revalidate.test',
619     method   => 'revalidate_event_group_test',
620     api_level=> 1,
621     argc     => 1,
622     signature => {
623         desc => q/revalidate a group of events.
624         This does not actually update the events (so there will be no change
625         of atev.state or anything else in the database, unless an event's
626         validator makes changes out-of-band).
627         
628         This returns an array of valid event IDs.
629         /,
630         params => [
631             {name => "events", type => "array", desc => "list of event ids"}
632         ]
633     }
634 );
635
636
637 sub pending_events {
638     my $self = shift;
639     my $client = shift;
640     my $granularity = shift;
641     my $granflag = shift;
642
643     my $query = [{ state => 'pending', run_time => {'<' => 'now'} }, { order_by => { atev => [ qw/run_time add_time/] }, 'join' => 'atevdef' }];
644
645     if (defined $granularity) {
646         if ($granflag) {
647             $query->[0]->{'+atevdef'} = {granularity => $granularity};
648         } else {
649             $query->[0]->{'+atevdef'} = {'-or' => [ {granularity => $granularity}, {granularity => undef} ] };
650         }
651     } else {
652         $query->[0]->{'+atevdef'} = {granularity => undef};
653     }
654
655     return new_editor(xact=>1)->search_action_trigger_event(
656         $query, { idlist=> 1, timeout => 7200, substream => 1 }
657     );
658 }
659 __PACKAGE__->register_method(
660     api_name => 'open-ils.trigger.event.find_pending',
661     method   => 'pending_events',
662     api_level=> 1
663 );
664
665 sub gather_events {
666     my $self = shift;
667     my $client = shift;
668     my $e_ids = shift;
669
670     $e_ids = [$e_ids] if (!ref($e_ids));
671
672     my @events;
673     for my $e_id (@$e_ids) {
674         my $e;
675         try {
676            $e = OpenILS::Application::Trigger::Event->new($e_id);
677         } catch Error with {
678             $logger->error("trigger: Event creation failed with ".shift());
679         };
680
681         next if !$e or $e->event->state eq 'invalid'; 
682
683         try {
684             $e->build_environment;
685         } catch Error with {
686             $logger->error("trigger: Event environment building failed with ".shift());
687         };
688
689         $e->editor->disconnect;
690         $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
691         $client->respond($e);
692     }
693
694     OpenILS::Application::Trigger::Event->ClearObjectCache();
695
696     return undef;
697 }
698 __PACKAGE__->register_method(
699     api_name => 'open-ils.trigger.event.gather',
700     method   => 'gather_events',
701     api_level=> 1
702 );
703
704 sub grouped_events {
705     my $self = shift;
706     my $client = shift;
707     my $granularity = shift;
708     my $granflag = shift;
709
710     my ($events) = $self->method_lookup('open-ils.trigger.event.find_pending')->run($granularity, $granflag);
711
712     my %groups = ( '*' => [] );
713
714     if($events) {
715         $logger->info("trigger: grouped_events found ".scalar(@$events)." pending events to process");
716     } else {
717         $logger->warn("trigger: grouped_events timed out loading pending events");
718         return \%groups;
719     }
720
721     my @fleshed_events;
722
723     if ($parallel_collect == 1 or @$events == 1) { # use method lookup
724         @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
725     } else {
726         my $self_multi = OpenSRF::MultiSession->new(
727             app                 => 'open-ils.trigger',
728             cap                 => $parallel_collect,
729             success_handler     => sub {
730                 my $self = shift;
731                 my $req = shift;
732
733                 push @fleshed_events,
734                     map { OpenILS::Application::Trigger::Event->new($_) }
735                     map { $_->content }
736                     @{ $req->{response} };
737             },
738         );
739
740         $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
741         $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
742
743         $self_multi->session_wait(1);
744         $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
745     }
746
747     for my  $e (@fleshed_events) {
748         if (my $group = $e->event->event_def->group_field) {
749
750             # split the grouping link steps
751             my @steps = split /\./, $group;
752             my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object
753
754             my $node;
755             eval {
756                 $node = $e->target;
757                 $node = $node->$_() for ( @steps );
758             };
759
760             unless($node) { # should not get here, but to be safe..
761                 $e->update_state('invalid');
762                 next;
763             }
764
765             # get the grouping value for the grouping object on this event
766             my $ident_value = $node->$group_field();
767             if(ref $ident_value) {
768                 my $ident_field = $ident_value->Identity; 
769                 $ident_value = $ident_value->$ident_field()
770             }
771
772             # push this event onto the event+grouping_value stack
773             $groups{$e->event->event_def->id}{$ident_value} ||= [];
774             push @{ $groups{$e->event->event_def->id}{$ident_value} }, $e;
775         } else {
776             # it's a non-grouped event
777             push @{ $groups{'*'} }, $e;
778         }
779     }
780
781
782     return \%groups;
783 }
784 __PACKAGE__->register_method(
785     api_name => 'open-ils.trigger.event.find_pending_by_group',
786     method   => 'grouped_events',
787     api_level=> 1
788 );
789
790 sub run_all_events {
791     my $self = shift;
792     my $client = shift;
793     my $granularity = shift;
794     my $granflag = shift;
795
796     my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
797     $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
798
799     my $self_multi;
800     if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
801         $self_multi = OpenSRF::MultiSession->new(
802             app                   => 'open-ils.trigger',
803             cap                   => $parallel_react,
804             session_hash_function => sub {
805                 my $args = shift;
806                 return $args->{target_id};
807             },
808             success_handler       => sub {
809                 my $me = shift;
810                 my $req = shift;
811                 $client->respond( $req->{response}->[0]->content );
812             }
813         );
814     }
815
816     for my $def ( keys %$groups ) {
817         if ($def eq '*') {
818             $logger->info("trigger: run_all_events firing un-grouped events");
819             for my $event ( @{ $$groups{'*'} } ) {
820                 try {
821                     if ($self_multi) {
822                         $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
823                         $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
824                     } else {
825                         $client->respond(
826                             $self
827                                 ->method_lookup('open-ils.trigger.event.fire')
828                                 ->run($event)
829                         );
830                     }
831                 } catch Error with { 
832                     $logger->error("trigger: event firing failed with ".shift());
833                 };
834             }
835             $logger->info("trigger: run_all_events completed queuing un-grouped events");
836             $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
837
838         } else {
839             my $defgroup = $$groups{$def};
840             $logger->info("trigger: run_all_events firing events for grouped event def=$def");
841             for my $ident ( keys %$defgroup ) {
842                 $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
843                 try {
844                     if ($self_multi) {
845                         $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
846                         $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
847                     } else {
848                         $client->respond(
849                             $self
850                                 ->method_lookup('open-ils.trigger.event_group.fire')
851                                 ->run($$defgroup{$ident})
852                         );
853                     }
854                     $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
855                 } catch Error with {
856                     $logger->error("trigger: event firing failed with ".shift());
857                 };
858             }
859             $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
860         }
861     }
862
863     $self_multi->session_wait(1) if ($self_multi);
864     $logger->info("trigger: run_all_events completed firing events");
865
866     $client->respond_complete();
867     return undef;
868 }
869 __PACKAGE__->register_method(
870     api_name => 'open-ils.trigger.event.run_all_pending',
871     method   => 'run_all_events',
872     api_level=> 1
873 );
874
875
876 1;