]> git.evergreen-ils.org Git - Evergreen.git/blob - Open-ILS/src/perlmods/lib/OpenILS/Application/Trigger.pm
LP#1322341: Count part holds on records
[Evergreen.git] / Open-ILS / src / perlmods / lib / OpenILS / Application / Trigger.pm
1 package OpenILS::Application::Trigger;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
5
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils::JSON;
8
9 use OpenSRF::AppSession;
10 use OpenSRF::MultiSession;
11 use OpenSRF::Utils::SettingsClient;
12 use OpenSRF::Utils::Logger qw/$logger/;
13 use OpenSRF::Utils qw/:datetime/;
14
15 use DateTime;
16 use DateTime::Format::ISO8601;
17
18 use OpenILS::Utils::Fieldmapper;
19 use OpenILS::Utils::CStoreEditor q/:funcs/;
20 use OpenILS::Application::Trigger::Event;
21 use OpenILS::Application::Trigger::EventGroup;
22
23
24 my $log = 'OpenSRF::Utils::Logger';
25 my $parallel_collect;
26 my $parallel_react;
27
28 sub initialize {
29
30     my $conf = OpenSRF::Utils::SettingsClient->new;
31     $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
32     $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
33
34 }
35 sub child_init {}
36
37 sub create_active_events_for_object {
38     my $self = shift;
39     my $client = shift;
40     my $key = shift;
41     my $target = shift;
42     my $location = shift;
43     my $granularity = shift;
44     my $user_data = shift;
45
46     my $ident = $target->Identity;
47     my $ident_value = $target->$ident();
48
49     my $editor = new_editor(xact=>1);
50
51     my $hooks = $editor->search_action_trigger_hook(
52         { key       => $key,
53           core_type => $target->json_hint
54         }
55     );
56
57     unless(@$hooks) {
58         $editor->rollback;
59         return undef;
60     }
61
62     my %hook_hash = map { ($_->key, $_) } @$hooks;
63
64     my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
65     my $defs = $editor->search_action_trigger_event_definition(
66         { hook   => [ keys %hook_hash ],
67           owner  => [ map { $_->{id} } @$orgs  ],
68           active => 't'
69         }
70     );
71
72     for my $def ( @$defs ) {
73         next if ($granularity && $def->granularity ne $granularity );
74
75         if ($def->usr_field && $def->opt_in_setting) {
76             my $ufield = $def->usr_field;
77             my $uid = $target->$ufield;
78             $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
79
80             my $opt_in_setting = $editor->search_actor_user_setting(
81                 { usr   => $uid,
82                   name  => $def->opt_in_setting,
83                   value => 'true'
84                 }
85             );
86
87             next unless (@$opt_in_setting);
88         }
89
90         my $date = DateTime->now;
91
92         if ($hook_hash{$def->hook}->passive eq 'f') {
93
94             if (my $dfield = $def->delay_field) {
95                 if ($target->$dfield()) {
96                     $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
97                 } else {
98                     next;
99                 }
100             }
101
102             $date->add( seconds => interval_to_seconds($def->delay) );
103         }
104
105         my $event = Fieldmapper::action_trigger::event->new();
106         $event->target( $ident_value );
107         $event->event_def( $def->id );
108         $event->run_time( $date->strftime( '%F %T%z' ) );
109         $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
110
111         $editor->create_action_trigger_event( $event );
112
113         $client->respond( $event->id );
114     }
115
116     $editor->commit;
117
118     return undef;
119 }
120 __PACKAGE__->register_method(
121     api_name => 'open-ils.trigger.event.autocreate',
122     method   => 'create_active_events_for_object',
123     api_level=> 1,
124     stream   => 1,
125     argc     => 3
126 );
127
128 sub create_event_for_object_and_def {
129     my $self = shift;
130     my $client = shift;
131     my $definitions = shift;
132     my $target = shift;
133     my $location = shift;
134     my $user_data = shift;
135
136     my $ident = $target->Identity;
137     my $ident_value = $target->$ident();
138
139     my @active = ($self->api_name =~ /inactive/o) ? () : ( active => 't' );
140
141     my $editor = new_editor(xact=>1);
142
143     my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
144     my $defs = $editor->search_action_trigger_event_definition(
145         { id => $definitions,
146           owner  => [ map { $_->{id} } @$orgs  ],
147           @active
148         }
149     );
150
151     my $hooks = $editor->search_action_trigger_hook(
152         { key       => [ map { $_->hook } @$defs ],
153           core_type => $target->json_hint
154         }
155     );
156
157     my %hook_hash = map { ($_->key, $_) } @$hooks;
158
159     for my $def ( @$defs ) {
160
161         if ($def->usr_field && $def->opt_in_setting) {
162             my $ufield = $def->usr_field;
163             my $uid = $target->$ufield;
164             $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
165
166             my $opt_in_setting = $editor->search_actor_user_setting(
167                 { usr   => $uid,
168                   name  => $def->opt_in_setting,
169                   value => 'true'
170                 }
171             );
172
173             next unless (@$opt_in_setting);
174         }
175
176         my $date = DateTime->now;
177
178         if ($hook_hash{$def->hook}->passive eq 'f') {
179
180             if (my $dfield = $def->delay_field) {
181                 if ($target->$dfield()) {
182                     $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
183                 } else {
184                     next;
185                 }
186             }
187
188             $date->add( seconds => interval_to_seconds($def->delay) );
189         }
190
191         my $event = Fieldmapper::action_trigger::event->new();
192         $event->target( $ident_value );
193         $event->event_def( $def->id );
194         $event->run_time( $date->strftime( '%F %T%z' ) );
195         $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
196
197         $editor->create_action_trigger_event( $event );
198
199         $client->respond( $event->id );
200     }
201
202     $editor->commit;
203
204     return undef;
205 }
206 __PACKAGE__->register_method(
207     api_name => 'open-ils.trigger.event.autocreate.by_definition',
208     method   => 'create_event_for_object_and_def',
209     api_level=> 1,
210     stream   => 1,
211     argc     => 3
212 );
213 __PACKAGE__->register_method(
214     api_name => 'open-ils.trigger.event.autocreate.by_definition.include_inactive',
215     method   => 'create_event_for_object_and_def',
216     api_level=> 1,
217     stream   => 1,
218     argc     => 3
219 );
220
221
222 # Retrieves events by object, or object type + filter
223 #  $object : a target object or object type (class hint)
224 #
225 #  $filter : an optional hash of filters ... top level keys:
226 #     event
227 #        filters on the atev objects, such as states or null-ness of timing
228 #        fields. contains the effective default of:
229 #          { state => 'pending' }
230 #        an example, which overrides the default, and will find
231 #        stale 'found' events:
232 #          { state => 'found', update_time => { '<' => 'yesterday' } }
233 #
234 #      event_def
235 #        filters on the atevdef object. contains the effective default of:
236 #          { active => 't' }
237 #
238 #      hook
239 #        filters on the hook object. no defaults, but there is a pinned,
240 #        unchangeable filter based on the passed hint or object type (see
241 #        $object above). an example for finding passive events:
242 #          { passive => 't' }
243 #
244 #     target
245 #        filters against the target field on the event. this can contain
246 #        either an array of target ids (if you passed an object type, and
247 #        not an object) or can contain a json_query that will return exactly
248 #        a list of target-type ids.  If you pass an object, the pkey value of
249 #        that object will be used as a filter in addition to the filter passed
250 #        in here.  example filter for circs of user 1234 that are open:
251 #          { select => { circ => ['id'] },
252 #            from => 'circ',
253 #            where => {
254 #              usr => 1234,
255 #              checkin_time => undef, 
256 #              '-or' => [
257 #                { stop_fines => undef },
258 #                { stop_fines => { 'not in' => ['LOST','LONGOVERDUE','CLAIMSRETURNED'] } }
259 #              ]
260 #            }
261
262 sub events_by_target {
263     my $self = shift;
264     my $client = shift;
265     my $object = shift;
266     my $filter = shift || {};
267     my $flesh_fields = shift || {};
268     my $flesh_depth = shift || 1;
269
270     my $obj_class = ref($object) || _fm_class_by_hint($object);
271     my $obj_hint = ref($object) ? _fm_hint_by_class(ref($object)) : $object;
272
273     my $object_ident_field = $obj_class->Identity;
274
275     my $query = {
276         select => { atev => ["id"] },
277         from   => {
278             atev => {
279                 atevdef => {
280                     field => "id",
281                     fkey => "event_def",
282                     join => {
283                         ath => { field => "key", fkey => "hook" }
284                     }
285                 }
286             }
287         },
288         where  => {
289             "+ath"  => { core_type => $obj_hint },
290             "+atevdef" => { active => 't' },
291             "+atev" => { state => 'pending' }
292         },
293         order_by => { "atev" => [ 'run_time', 'add_time' ] }
294     };
295
296     $query->{limit} = $filter->{limit} if defined $filter->{limit};
297     $query->{offset} = $filter->{offset} if defined $filter->{offset};
298     $query->{order_by} = $filter->{order_by} if defined $filter->{order_by};
299
300
301     # allow multiple 'target' filters
302     $query->{where}->{'+atev'}->{'-and'} = [];
303
304     # if we got a real object, filter on its pkey value
305     if (ref($object)) { # pass an object, require that target
306         push @{ $query->{where}->{'+atev'}->{'-and'} },
307             { target => $object->$object_ident_field }
308     }
309
310     # we have a fancy complex target filter or a list of target ids
311     if ($$filter{target}) {
312         push @{ $query->{where}->{'+atev'}->{'-and'} },
313             { target => {in => $$filter{target} } };
314     }
315
316     # pass no target filter or object, you get no events
317     if (!@{ $query->{where}->{'+atev'}->{'-and'} }) {
318         return undef; 
319     }
320
321     # any hook filters, other than the required core_type filter
322     if ($$filter{hook}) {
323         $query->{where}->{'+ath'}->{$_} = $$filter{hook}{$_}
324             for (grep { $_ ne 'core_type' } keys %{$$filter{hook}});
325     }
326
327     # any event_def filters.  defaults to { active => 't' }
328     if ($$filter{event_def}) {
329         $query->{where}->{'+atevdef'}->{$_} = $$filter{event_def}{$_}
330             for (keys %{$$filter{event_def}});
331     }
332
333     # any event filters.  defaults to { state => 'pending' }.
334     # don't overwrite '-and' used for multiple target filters above
335     if ($$filter{event}) {
336         $query->{where}->{'+atev'}->{$_} = $$filter{event}{$_}
337             for (grep { $_ ne '-and' } keys %{$$filter{event}});
338     }
339
340     my $e = new_editor(xact=>1);
341
342     my $events = $e->json_query($query);
343
344     $flesh_fields->{atev} = ['event_def'] unless $flesh_fields->{atev};
345
346     for my $id (@$events) {
347         my $event = $e->retrieve_action_trigger_event([
348             $id->{id},
349             {flesh => $flesh_depth, flesh_fields => $flesh_fields}
350         ]);
351
352         (my $meth = $obj_class) =~ s/^Fieldmapper:://o;
353         $meth =~ s/::/_/go;
354         $meth = 'retrieve_'.$meth;
355
356         $event->target($e->$meth($event->target));
357         $client->respond($event);
358     }
359
360     return undef;
361 }
362 __PACKAGE__->register_method(
363     api_name => 'open-ils.trigger.events_by_target',
364     method   => 'events_by_target',
365     api_level=> 1,
366     stream   => 1,
367     argc     => 2
368 );
369  
370 sub _fm_hint_by_class {
371     my $class = shift;
372     return OpenILS::Application->publish_fieldmapper->{$class}->{hint};
373 }
374
375 sub _fm_class_by_hint {
376     my $hint = shift;
377
378     my ($class) = grep {
379         OpenILS::Application->publish_fieldmapper->{$_}->{hint} eq $hint
380     } keys %{ OpenILS::Application->publish_fieldmapper };
381
382     return $class;
383 }
384
385 sub create_batch_events {
386     my $self = shift;
387     my $client = shift;
388     my $key = shift;
389     my $location_field = shift; # where to look for event_def.owner filtering ... circ_lib, for instance, where hook.core_type = circ
390     my $filter = shift || {};
391     my $granularity = shift;
392     my $user_data = shift;
393
394     my $active = ($self->api_name =~ /active/o) ? 1 : 0;
395     if ($active && !keys(%$filter)) {
396         $log->info("Active batch event creation requires a target filter but none was supplied to create_batch_events");
397         return undef;
398     }
399
400     return undef unless ($key && $location_field);
401
402     my $editor = new_editor(xact=>1);
403     my $hooks = $editor->search_action_trigger_hook(
404         { passive => $active ? 'f' : 't', key => $key }
405     );
406
407     my %hook_hash = map { ($_->key, $_) } @$hooks;
408
409     my $defs = $editor->search_action_trigger_event_definition(
410         { hook   => [ keys %hook_hash ], active => 't' },
411     );
412
413     my $orig_filter_and = [];
414     if ($$filter{'-and'}) {
415         for my $f ( @{ $$filter{'-and'} } ) {
416             push @$orig_filter_and, $f;
417         }
418     }
419
420     for my $def ( @$defs ) {
421         next if ($granularity && $def->granularity ne $granularity );
422
423         my $date = DateTime->now->subtract( seconds => interval_to_seconds($def->delay) );
424
425         # we may need to do some work to backport this to 1.2
426         $filter->{ $location_field } = { 'in' =>
427             {
428                 select  => { aou => [{ column => 'id', transform => 'actor.org_unit_descendants', result_field => 'id' }] },
429                 from    => 'aou',
430                 where   => { id => $def->owner }
431             }
432         };
433
434         my $run_time = 'now';
435         if ($active) {
436             $run_time = 
437                 DateTime
438                     ->now
439                     ->add( seconds => interval_to_seconds($def->delay) )
440                     ->strftime( '%F %T%z' );
441         } else {
442             if ($def->max_delay) {
443                 my @times = sort {$a <=> $b} interval_to_seconds($def->delay), interval_to_seconds($def->max_delay);
444                 $filter->{ $def->delay_field } = {
445                     'between' => [
446                         DateTime->now->subtract( seconds => $times[1] )->strftime( '%F %T%z' ),
447                         DateTime->now->subtract( seconds => $times[0] )->strftime( '%F %T%z' )
448                     ]
449                 };
450             } else {
451                 $filter->{ $def->delay_field } = {
452                     '<=' => DateTime->now->subtract( seconds => interval_to_seconds($def->delay) )->strftime( '%F %T%z' )
453                 };
454             }
455         }
456
457         my $class = _fm_class_by_hint($hook_hash{$def->hook}->core_type);
458
459         # filter where this target has an event (and it's pending, for active hooks)
460         $$filter{'-and'} = [];
461         for my $f ( @$orig_filter_and ) {
462             push @{ $$filter{'-and'} }, $f;
463         }
464
465         my $join = { 'join' => {
466             atev => {
467                 field => 'target',
468                 fkey => $class->Identity,
469                 type => 'left',
470                 filter => { event_def => $def->id }
471             }
472         }};
473         if ($def->repeat_delay) {
474             $join->{'join'}{atev}{filter} = { start_time => {
475                 '>' => DateTime->now->subtract( seconds => interval_to_seconds($def->repeat_delay) )->strftime( '%F %T%z' )
476             } };
477         }
478
479         push @{ $filter->{'-and'} }, { '+atev' => { id => undef } };
480
481         if ($def->usr_field && $def->opt_in_setting) {
482             push @{ $filter->{'-and'} }, {
483                 '-exists' => {
484                     from  => 'aus',
485                     where => {
486                         name => $def->opt_in_setting,
487                         usr  => { '=' => { '+' . $hook_hash{$def->hook}->core_type => $def->usr_field } },
488                         value=> 'true'
489                     }
490                 }
491             };
492         }
493
494         $class =~ s/^Fieldmapper:://o;
495         $class =~ s/::/_/go;
496         my $method = 'search_'. $class;
497
498         # for cleaner logging
499         my $def_id = $def->id;
500         my $hook = $def->hook;
501
502         $logger->info("trigger: create_batch_events() collecting object IDs for def=$def_id / hook=$hook");
503
504         my $object_ids = $editor->$method( [$filter, $join], {idlist => 1, timeout => 10800} );
505
506         if($object_ids) {
507             $logger->info("trigger: create_batch_events() fetched ".scalar(@$object_ids)." object IDs for def=$def_id / hook=$hook");
508         } else {
509             $logger->warn("trigger: create_batch_events() timeout occurred collecting object IDs for def=$def_id / hook=$hook");
510         }
511
512         for my $o_id (@$object_ids) {
513
514             my $event = Fieldmapper::action_trigger::event->new();
515             $event->target( $o_id );
516             $event->event_def( $def->id );
517             $event->run_time( $run_time );
518             $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
519
520             $editor->create_action_trigger_event( $event );
521
522             $client->respond( $event->id );
523         }
524         
525         $logger->info("trigger: create_batch_events() successfully created events for def=$def_id / hook=$hook");
526     }
527
528     $logger->info("trigger: create_batch_events() done creating events");
529
530     $editor->commit;
531
532     return undef;
533 }
534 __PACKAGE__->register_method(
535     api_name => 'open-ils.trigger.passive.event.autocreate.batch',
536     method   => 'create_batch_events',
537     api_level=> 1,
538     stream   => 1,
539     argc     => 2
540 );
541
542 __PACKAGE__->register_method(
543     api_name => 'open-ils.trigger.active.event.autocreate.batch',
544     method   => 'create_batch_events',
545     api_level=> 1,
546     stream   => 1,
547     argc     => 2
548 );
549
550 sub fire_single_event {
551     my $self = shift;
552     my $client = shift;
553     my $event_id = shift;
554
555     my $e = OpenILS::Application::Trigger::Event->new($event_id);
556
557     if ($e->validate->valid) {
558         $logger->info("trigger: Event is valid, reacting...");
559         $e->react->cleanup;
560     }
561
562     $e->editor->disconnect;
563     OpenILS::Application::Trigger::Event->ClearObjectCache();
564
565     return {
566         valid     => $e->valid,
567         reacted   => $e->reacted,
568         cleanedup => $e->cleanedup,
569         event     => $e->event
570     };
571 }
572 __PACKAGE__->register_method(
573     api_name => 'open-ils.trigger.event.fire',
574     method   => 'fire_single_event',
575     api_level=> 1,
576     argc     => 1
577 );
578
579 sub fire_event_group {
580     my $self = shift;
581     my $client = shift;
582     my $events = shift;
583
584     my $e = OpenILS::Application::Trigger::EventGroup->new(@$events);
585
586     if ($e->validate->valid) {
587         $logger->info("trigger: Event group is valid, reacting...");
588         $e->react->cleanup;
589     }
590
591     $e->editor->disconnect;
592     OpenILS::Application::Trigger::Event->ClearObjectCache();
593
594     return {
595         valid     => $e->valid,
596         reacted   => $e->reacted,
597         cleanedup => $e->cleanedup,
598         events    => [map { $_->event } @{$e->events}]
599     };
600 }
601 __PACKAGE__->register_method(
602     api_name => 'open-ils.trigger.event_group.fire',
603     method   => 'fire_event_group',
604     api_level=> 1,
605     argc     => 1
606 );
607
608 sub revalidate_event_group_test {
609     my $self = shift;
610     my $client = shift;
611     my $events = shift;
612
613     my $e = OpenILS::Application::Trigger::EventGroup->new_nochanges(@$events);
614
615     my $result = $e->revalidate_test;
616
617     $e->editor->disconnect;
618     OpenILS::Application::Trigger::Event->ClearObjectCache();
619
620     return $result;
621 }
622 __PACKAGE__->register_method(
623     api_name => 'open-ils.trigger.event_group.revalidate.test',
624     method   => 'revalidate_event_group_test',
625     api_level=> 1,
626     argc     => 1,
627     signature => {
628         desc => q/revalidate a group of events.
629         This does not actually update the events (so there will be no change
630         of atev.state or anything else in the database, unless an event's
631         validator makes changes out-of-band).
632         
633         This returns an array of valid event IDs.
634         /,
635         params => [
636             {name => "events", type => "array", desc => "list of event ids"}
637         ]
638     }
639 );
640
641
642 sub pending_events {
643     my $self = shift;
644     my $client = shift;
645     my $granularity = shift;
646     my $granflag = shift;
647
648     my $query = [{ state => 'pending', run_time => {'<' => 'now'} }, { order_by => { atev => [ qw/run_time add_time/] }, 'join' => 'atevdef' }];
649
650     if (defined $granularity) {
651         if ($granflag) {
652             $query->[0]->{'+atevdef'} = {granularity => $granularity};
653         } else {
654             $query->[0]->{'+atevdef'} = {'-or' => [ {granularity => $granularity}, {granularity => undef} ] };
655         }
656     } else {
657         $query->[0]->{'+atevdef'} = {granularity => undef};
658     }
659
660     return new_editor(xact=>1)->search_action_trigger_event(
661         $query, { idlist=> 1, timeout => 7200, substream => 1 }
662     );
663 }
664 __PACKAGE__->register_method(
665     api_name => 'open-ils.trigger.event.find_pending',
666     method   => 'pending_events',
667     api_level=> 1
668 );
669
670 sub gather_events {
671     my $self = shift;
672     my $client = shift;
673     my $e_ids = shift;
674
675     $e_ids = [$e_ids] if (!ref($e_ids));
676
677     my @events;
678     for my $e_id (@$e_ids) {
679         my $e;
680         try {
681            $e = OpenILS::Application::Trigger::Event->new($e_id);
682         } catch Error with {
683             $logger->error("trigger: Event creation failed with ".shift());
684         };
685
686         next if !$e or $e->event->state eq 'invalid'; 
687
688         try {
689             $e->build_environment;
690         } catch Error with {
691             $logger->error("trigger: Event environment building failed with ".shift());
692         };
693
694         $e->editor->disconnect;
695         $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
696         $client->respond($e);
697     }
698
699     OpenILS::Application::Trigger::Event->ClearObjectCache();
700
701     return undef;
702 }
703 __PACKAGE__->register_method(
704     api_name => 'open-ils.trigger.event.gather',
705     method   => 'gather_events',
706     api_level=> 1
707 );
708
709 sub grouped_events {
710     my $self = shift;
711     my $client = shift;
712     my $granularity = shift;
713     my $granflag = shift;
714
715     my ($events) = $self->method_lookup('open-ils.trigger.event.find_pending')->run($granularity, $granflag);
716
717     my %groups = ( '*' => [] );
718
719     if($events) {
720         $logger->info("trigger: grouped_events found ".scalar(@$events)." pending events to process");
721     } else {
722         $logger->warn("trigger: grouped_events timed out loading pending events");
723         return \%groups;
724     }
725
726     my @fleshed_events;
727
728     if ($parallel_collect == 1 or @$events == 1) { # use method lookup
729         @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
730     } else {
731         my $self_multi = OpenSRF::MultiSession->new(
732             app                 => 'open-ils.trigger',
733             cap                 => $parallel_collect,
734             success_handler     => sub {
735                 my $self = shift;
736                 my $req = shift;
737
738                 push @fleshed_events,
739                     map { OpenILS::Application::Trigger::Event->new($_) }
740                     map { $_->content }
741                     @{ $req->{response} };
742             },
743         );
744
745         $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
746         $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
747
748         $self_multi->session_wait(1);
749         $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
750     }
751
752     for my  $e (@fleshed_events) {
753         if (my $group = $e->event->event_def->group_field) {
754
755             # split the grouping link steps
756             my @steps = split /\./, $group;
757             my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object
758
759             my $node;
760             eval {
761                 $node = $e->target;
762                 $node = $node->$_() for ( @steps );
763             };
764
765             unless($node) { # should not get here, but to be safe..
766                 $e->update_state('invalid');
767                 next;
768             }
769
770             # get the grouping value for the grouping object on this event
771             my $ident_value = $node->$group_field();
772             if(ref $ident_value) {
773                 my $ident_field = $ident_value->Identity; 
774                 $ident_value = $ident_value->$ident_field()
775             }
776
777             # push this event onto the event+grouping_value stack
778             $groups{$e->event->event_def->id}{$ident_value} ||= [];
779             push @{ $groups{$e->event->event_def->id}{$ident_value} }, $e;
780         } else {
781             # it's a non-grouped event
782             push @{ $groups{'*'} }, $e;
783         }
784     }
785
786
787     return \%groups;
788 }
789 __PACKAGE__->register_method(
790     api_name => 'open-ils.trigger.event.find_pending_by_group',
791     method   => 'grouped_events',
792     api_level=> 1
793 );
794
795 sub run_all_events {
796     my $self = shift;
797     my $client = shift;
798     my $granularity = shift;
799     my $granflag = shift;
800
801     my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
802     $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
803
804     my $self_multi;
805     if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
806         $self_multi = OpenSRF::MultiSession->new(
807             app                   => 'open-ils.trigger',
808             cap                   => $parallel_react,
809             session_hash_function => sub {
810                 my $args = shift;
811                 return $args->{target_id};
812             },
813             success_handler       => sub {
814                 my $me = shift;
815                 my $req = shift;
816                 $client->respond( $req->{response}->[0]->content );
817             }
818         );
819     }
820
821     for my $def ( keys %$groups ) {
822         if ($def eq '*') {
823             $logger->info("trigger: run_all_events firing un-grouped events");
824             for my $event ( @{ $$groups{'*'} } ) {
825                 try {
826                     if ($self_multi) {
827                         $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
828                         $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
829                     } else {
830                         $client->respond(
831                             $self
832                                 ->method_lookup('open-ils.trigger.event.fire')
833                                 ->run($event)
834                         );
835                     }
836                 } catch Error with { 
837                     $logger->error("trigger: event firing failed with ".shift());
838                 };
839             }
840             $logger->info("trigger: run_all_events completed queuing un-grouped events");
841             $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
842
843         } else {
844             my $defgroup = $$groups{$def};
845             $logger->info("trigger: run_all_events firing events for grouped event def=$def");
846             for my $ident ( keys %$defgroup ) {
847                 $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
848                 try {
849                     if ($self_multi) {
850                         $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
851                         $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
852                     } else {
853                         $client->respond(
854                             $self
855                                 ->method_lookup('open-ils.trigger.event_group.fire')
856                                 ->run($$defgroup{$ident})
857                         );
858                     }
859                     $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
860                 } catch Error with {
861                     $logger->error("trigger: event firing failed with ".shift());
862                 };
863             }
864             $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
865         }
866     }
867
868     $self_multi->session_wait(1) if ($self_multi);
869     $logger->info("trigger: run_all_events completed firing events");
870
871     $client->respond_complete();
872     return undef;
873 }
874 __PACKAGE__->register_method(
875     api_name => 'open-ils.trigger.event.run_all_pending',
876     method   => 'run_all_events',
877     api_level=> 1
878 );
879
880
881 1;