1 package OpenILS::Application::Trigger;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils::JSON;
9 use OpenSRF::AppSession;
10 use OpenSRF::MultiSession;
11 use OpenSRF::Utils::SettingsClient;
12 use OpenSRF::Utils::Logger qw/$logger/;
13 use OpenSRF::Utils qw/:datetime/;
16 use DateTime::Format::ISO8601;
18 use OpenILS::Utils::Fieldmapper;
19 use OpenILS::Utils::CStoreEditor q/:funcs/;
20 use OpenILS::Application::Trigger::Event;
21 use OpenILS::Application::Trigger::EventGroup;
24 my $log = 'OpenSRF::Utils::Logger';
30 my $conf = OpenSRF::Utils::SettingsClient->new;
31 $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
32 $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
37 sub create_active_events_for_object {
43 my $granularity = shift;
44 my $user_data = shift;
46 my $ident = $target->Identity;
47 my $ident_value = $target->$ident();
49 my $editor = new_editor(xact=>1);
51 my $hooks = $editor->search_action_trigger_hook(
53 core_type => $target->json_hint
62 my %hook_hash = map { ($_->key, $_) } @$hooks;
64 my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
65 my $defs = $editor->search_action_trigger_event_definition(
66 { hook => [ keys %hook_hash ],
67 owner => [ map { $_->{id} } @$orgs ],
72 for my $def ( @$defs ) {
73 next if ($granularity && $def->granularity ne $granularity );
75 if ($def->usr_field && $def->opt_in_setting) {
76 my $ufield = $def->usr_field;
77 my $uid = $target->$ufield;
78 $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
80 my $opt_in_setting = $editor->search_actor_user_setting(
82 name => $def->opt_in_setting,
87 next unless (@$opt_in_setting);
90 my $date = DateTime->now;
92 if ($hook_hash{$def->hook}->passive eq 'f') {
94 if (my $dfield = $def->delay_field) {
95 if ($target->$dfield()) {
96 $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
102 $date->add( seconds => interval_to_seconds($def->delay) );
105 my $event = Fieldmapper::action_trigger::event->new();
106 $event->target( $ident_value );
107 $event->event_def( $def->id );
108 $event->run_time( $date->strftime( '%F %T%z' ) );
109 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
111 $editor->create_action_trigger_event( $event );
113 $client->respond( $event->id );
120 __PACKAGE__->register_method(
121 api_name => 'open-ils.trigger.event.autocreate',
122 method => 'create_active_events_for_object',
128 sub create_event_for_object_and_def {
131 my $definitions = shift;
133 my $location = shift;
134 my $user_data = shift;
136 my $ident = $target->Identity;
137 my $ident_value = $target->$ident();
139 my @active = ($self->api_name =~ /inactive/o) ? () : ( active => 't' );
141 my $editor = new_editor(xact=>1);
143 my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
144 my $defs = $editor->search_action_trigger_event_definition(
145 { id => $definitions,
146 owner => [ map { $_->{id} } @$orgs ],
151 my $hooks = $editor->search_action_trigger_hook(
152 { key => [ map { $_->hook } @$defs ],
153 core_type => $target->json_hint
157 my %hook_hash = map { ($_->key, $_) } @$hooks;
159 for my $def ( @$defs ) {
161 if ($def->usr_field && $def->opt_in_setting) {
162 my $ufield = $def->usr_field;
163 my $uid = $target->$ufield;
164 $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
166 my $opt_in_setting = $editor->search_actor_user_setting(
168 name => $def->opt_in_setting,
173 next unless (@$opt_in_setting);
176 my $date = DateTime->now;
178 if ($hook_hash{$def->hook}->passive eq 'f') {
180 if (my $dfield = $def->delay_field) {
181 if ($target->$dfield()) {
182 $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
188 $date->add( seconds => interval_to_seconds($def->delay) );
191 my $event = Fieldmapper::action_trigger::event->new();
192 $event->target( $ident_value );
193 $event->event_def( $def->id );
194 $event->run_time( $date->strftime( '%F %T%z' ) );
195 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
197 $editor->create_action_trigger_event( $event );
199 $client->respond( $event->id );
206 __PACKAGE__->register_method(
207 api_name => 'open-ils.trigger.event.autocreate.by_definition',
208 method => 'create_event_for_object_and_def',
213 __PACKAGE__->register_method(
214 api_name => 'open-ils.trigger.event.autocreate.by_definition.include_inactive',
215 method => 'create_event_for_object_and_def',
222 # Retrieves events by object, or object type + filter
223 # $object : a target object or object type (class hint)
225 # $filter : an optional hash of filters ... top level keys:
227 # filters on the atev objects, such as states or null-ness of timing
228 # fields. contains the effective default of:
229 # { state => 'pending' }
230 # an example, which overrides the default, and will find
231 # stale 'found' events:
232 # { state => 'found', update_time => { '<' => 'yesterday' } }
235 # filters on the atevdef object. contains the effective default of:
239 # filters on the hook object. no defaults, but there is a pinned,
240 # unchangeable filter based on the passed hint or object type (see
241 # $object above). an example for finding passive events:
245 # filters against the target field on the event. this can contain
246 # either an array of target ids (if you passed an object type, and
247 # not an object) or can contain a json_query that will return exactly
248 # a list of target-type ids. If you pass an object, the pkey value of
249 # that object will be used as a filter in addition to the filter passed
250 # in here. example filter for circs of user 1234 that are open:
251 # { select => { circ => ['id'] },
255 # checkin_time => undef,
257 # { stop_fines => undef },
258 # { stop_fines => { 'not in' => ['LOST','LONGOVERDUE','CLAIMSRETURNED'] } }
262 sub events_by_target {
266 my $filter = shift || {};
267 my $flesh_fields = shift || {};
268 my $flesh_depth = shift || 1;
270 my $obj_class = ref($object) || _fm_class_by_hint($object);
271 my $obj_hint = ref($object) ? _fm_hint_by_class(ref($object)) : $object;
273 my $object_ident_field = $obj_class->Identity;
276 select => { atev => ["id"] },
283 ath => { field => "key", fkey => "hook" }
289 "+ath" => { core_type => $obj_hint },
290 "+atevdef" => { active => 't' },
291 "+atev" => { state => 'pending' }
293 order_by => { "atev" => [ 'run_time', 'add_time' ] }
296 $query->{limit} = $filter->{limit} if defined $filter->{limit};
297 $query->{offset} = $filter->{offset} if defined $filter->{offset};
298 $query->{order_by} = $filter->{order_by} if defined $filter->{order_by};
301 # allow multiple 'target' filters
302 $query->{where}->{'+atev'}->{'-and'} = [];
304 # if we got a real object, filter on its pkey value
305 if (ref($object)) { # pass an object, require that target
306 push @{ $query->{where}->{'+atev'}->{'-and'} },
307 { target => $object->$object_ident_field }
310 # we have a fancy complex target filter or a list of target ids
311 if ($$filter{target}) {
312 push @{ $query->{where}->{'+atev'}->{'-and'} },
313 { target => {in => $$filter{target} } };
316 # pass no target filter or object, you get no events
317 if (!@{ $query->{where}->{'+atev'}->{'-and'} }) {
321 # any hook filters, other than the required core_type filter
322 if ($$filter{hook}) {
323 $query->{where}->{'+ath'}->{$_} = $$filter{hook}{$_}
324 for (grep { $_ ne 'core_type' } keys %{$$filter{hook}});
327 # any event_def filters. defaults to { active => 't' }
328 if ($$filter{event_def}) {
329 $query->{where}->{'+atevdef'}->{$_} = $$filter{event_def}{$_}
330 for (keys %{$$filter{event_def}});
333 # any event filters. defaults to { state => 'pending' }.
334 # don't overwrite '-and' used for multiple target filters above
335 if ($$filter{event}) {
336 $query->{where}->{'+atev'}->{$_} = $$filter{event}{$_}
337 for (grep { $_ ne '-and' } keys %{$$filter{event}});
340 my $e = new_editor(xact=>1);
342 my $events = $e->json_query($query);
344 $flesh_fields->{atev} = ['event_def'] unless $flesh_fields->{atev};
346 for my $id (@$events) {
347 my $event = $e->retrieve_action_trigger_event([
349 {flesh => $flesh_depth, flesh_fields => $flesh_fields}
352 (my $meth = $obj_class) =~ s/^Fieldmapper:://o;
354 $meth = 'retrieve_'.$meth;
356 $event->target($e->$meth($event->target));
357 $client->respond($event);
362 __PACKAGE__->register_method(
363 api_name => 'open-ils.trigger.events_by_target',
364 method => 'events_by_target',
370 sub _fm_hint_by_class {
372 return OpenILS::Application->publish_fieldmapper->{$class}->{hint};
375 sub _fm_class_by_hint {
379 OpenILS::Application->publish_fieldmapper->{$_}->{hint} eq $hint
380 } keys %{ OpenILS::Application->publish_fieldmapper };
385 sub create_batch_events {
389 my $location_field = shift; # where to look for event_def.owner filtering ... circ_lib, for instance, where hook.core_type = circ
390 my $filter = shift || {};
391 my $granularity = shift;
392 my $user_data = shift;
394 my $active = ($self->api_name =~ /active/o) ? 1 : 0;
395 if ($active && !keys(%$filter)) {
396 $log->info("Active batch event creation requires a target filter but none was supplied to create_batch_events");
400 return undef unless ($key && $location_field);
402 my $editor = new_editor(xact=>1);
403 my $hooks = $editor->search_action_trigger_hook(
404 { passive => $active ? 'f' : 't', key => $key }
407 my %hook_hash = map { ($_->key, $_) } @$hooks;
409 my $defs = $editor->search_action_trigger_event_definition(
410 { hook => [ keys %hook_hash ], active => 't' },
413 my $orig_filter_and = [];
414 if ($$filter{'-and'}) {
415 for my $f ( @{ $$filter{'-and'} } ) {
416 push @$orig_filter_and, $f;
420 for my $def ( @$defs ) {
421 next if ($granularity && $def->granularity ne $granularity );
423 my $date = DateTime->now->subtract( seconds => interval_to_seconds($def->delay) );
425 # we may need to do some work to backport this to 1.2
426 $filter->{ $location_field } = { 'in' =>
428 select => { aou => [{ column => 'id', transform => 'actor.org_unit_descendants', result_field => 'id' }] },
430 where => { id => $def->owner }
434 my $run_time = 'now';
439 ->add( seconds => interval_to_seconds($def->delay) )
440 ->strftime( '%F %T%z' );
442 if ($def->max_delay) {
443 my @times = sort {$a <=> $b} interval_to_seconds($def->delay), interval_to_seconds($def->max_delay);
444 $filter->{ $def->delay_field } = {
446 DateTime->now->subtract( seconds => $times[1] )->strftime( '%F %T%z' ),
447 DateTime->now->subtract( seconds => $times[0] )->strftime( '%F %T%z' )
451 $filter->{ $def->delay_field } = {
452 '<=' => DateTime->now->subtract( seconds => interval_to_seconds($def->delay) )->strftime( '%F %T%z' )
457 my $class = _fm_class_by_hint($hook_hash{$def->hook}->core_type);
459 # filter where this target has an event (and it's pending, for active hooks)
460 $$filter{'-and'} = [];
461 for my $f ( @$orig_filter_and ) {
462 push @{ $$filter{'-and'} }, $f;
465 my $join = { 'join' => {
468 fkey => $class->Identity,
470 filter => { event_def => $def->id }
473 if ($def->repeat_delay) {
474 $join->{'join'}{atev}{filter} = { start_time => {
475 '>' => DateTime->now->subtract( seconds => interval_to_seconds($def->repeat_delay) )->strftime( '%F %T%z' )
479 push @{ $filter->{'-and'} }, { '+atev' => { id => undef } };
481 if ($def->usr_field && $def->opt_in_setting) {
482 push @{ $filter->{'-and'} }, {
486 name => $def->opt_in_setting,
487 usr => { '=' => { '+' . $hook_hash{$def->hook}->core_type => $def->usr_field } },
494 $class =~ s/^Fieldmapper:://o;
496 my $method = 'search_'. $class;
498 # for cleaner logging
499 my $def_id = $def->id;
500 my $hook = $def->hook;
502 $logger->info("trigger: create_batch_events() collecting object IDs for def=$def_id / hook=$hook");
504 my $object_ids = $editor->$method( [$filter, $join], {idlist => 1, timeout => 10800} );
507 $logger->info("trigger: create_batch_events() fetched ".scalar(@$object_ids)." object IDs for def=$def_id / hook=$hook");
509 $logger->warn("trigger: create_batch_events() timeout occurred collecting object IDs for def=$def_id / hook=$hook");
512 for my $o_id (@$object_ids) {
514 my $event = Fieldmapper::action_trigger::event->new();
515 $event->target( $o_id );
516 $event->event_def( $def->id );
517 $event->run_time( $run_time );
518 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
520 $editor->create_action_trigger_event( $event );
522 $client->respond( $event->id );
525 $logger->info("trigger: create_batch_events() successfully created events for def=$def_id / hook=$hook");
528 $logger->info("trigger: create_batch_events() done creating events");
534 __PACKAGE__->register_method(
535 api_name => 'open-ils.trigger.passive.event.autocreate.batch',
536 method => 'create_batch_events',
542 __PACKAGE__->register_method(
543 api_name => 'open-ils.trigger.active.event.autocreate.batch',
544 method => 'create_batch_events',
550 sub fire_single_event {
553 my $event_id = shift;
555 my $e = OpenILS::Application::Trigger::Event->new($event_id);
557 if ($e->validate->valid) {
558 $logger->info("trigger: Event is valid, reacting...");
562 $e->editor->disconnect;
563 OpenILS::Application::Trigger::Event->ClearObjectCache();
567 reacted => $e->reacted,
568 cleanedup => $e->cleanedup,
572 __PACKAGE__->register_method(
573 api_name => 'open-ils.trigger.event.fire',
574 method => 'fire_single_event',
579 sub fire_event_group {
584 my $e = OpenILS::Application::Trigger::EventGroup->new(@$events);
586 if ($e->validate->valid) {
587 $logger->info("trigger: Event group is valid, reacting...");
591 $e->editor->disconnect;
592 OpenILS::Application::Trigger::Event->ClearObjectCache();
596 reacted => $e->reacted,
597 cleanedup => $e->cleanedup,
598 events => [map { $_->event } @{$e->events}]
601 __PACKAGE__->register_method(
602 api_name => 'open-ils.trigger.event_group.fire',
603 method => 'fire_event_group',
608 sub revalidate_event_group_test {
613 my $e = OpenILS::Application::Trigger::EventGroup->new_nochanges(@$events);
615 my $result = $e->revalidate_test;
617 $e->editor->disconnect;
618 OpenILS::Application::Trigger::Event->ClearObjectCache();
622 __PACKAGE__->register_method(
623 api_name => 'open-ils.trigger.event_group.revalidate.test',
624 method => 'revalidate_event_group_test',
628 desc => q/revalidate a group of events.
629 This does not actually update the events (so there will be no change
630 of atev.state or anything else in the database, unless an event's
631 validator makes changes out-of-band).
633 This returns an array of valid event IDs.
636 {name => "events", type => "array", desc => "list of event ids"}
645 my $granularity = shift;
646 my $granflag = shift;
648 my $query = [{ state => 'pending', run_time => {'<' => 'now'} }, { order_by => { atev => [ qw/run_time add_time/] }, 'join' => 'atevdef' }];
650 if (defined $granularity) {
652 $query->[0]->{'+atevdef'} = {granularity => $granularity};
654 $query->[0]->{'+atevdef'} = {'-or' => [ {granularity => $granularity}, {granularity => undef} ] };
657 $query->[0]->{'+atevdef'} = {granularity => undef};
660 return new_editor(xact=>1)->search_action_trigger_event(
661 $query, { idlist=> 1, timeout => 7200, substream => 1 }
664 __PACKAGE__->register_method(
665 api_name => 'open-ils.trigger.event.find_pending',
666 method => 'pending_events',
675 $e_ids = [$e_ids] if (!ref($e_ids));
678 for my $e_id (@$e_ids) {
681 $e = OpenILS::Application::Trigger::Event->new($e_id);
683 $logger->error("trigger: Event creation failed with ".shift());
686 next if !$e or $e->event->state eq 'invalid';
689 $e->build_environment;
691 $logger->error("trigger: Event environment building failed with ".shift());
694 $e->editor->disconnect;
695 $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
696 $client->respond($e);
699 OpenILS::Application::Trigger::Event->ClearObjectCache();
703 __PACKAGE__->register_method(
704 api_name => 'open-ils.trigger.event.gather',
705 method => 'gather_events',
712 my $granularity = shift;
713 my $granflag = shift;
715 my ($events) = $self->method_lookup('open-ils.trigger.event.find_pending')->run($granularity, $granflag);
717 my %groups = ( '*' => [] );
720 $logger->info("trigger: grouped_events found ".scalar(@$events)." pending events to process");
722 $logger->warn("trigger: grouped_events timed out loading pending events");
728 if ($parallel_collect == 1 or @$events == 1) { # use method lookup
729 @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
731 my $self_multi = OpenSRF::MultiSession->new(
732 app => 'open-ils.trigger',
733 cap => $parallel_collect,
734 success_handler => sub {
738 push @fleshed_events,
739 map { OpenILS::Application::Trigger::Event->new($_) }
741 @{ $req->{response} };
745 $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
746 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
748 $self_multi->session_wait(1);
749 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
752 for my $e (@fleshed_events) {
753 if (my $group = $e->event->event_def->group_field) {
755 # split the grouping link steps
756 my @steps = split /\./, $group;
757 my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object
762 $node = $node->$_() for ( @steps );
765 unless($node) { # should not get here, but to be safe..
766 $e->update_state('invalid');
770 # get the grouping value for the grouping object on this event
771 my $ident_value = $node->$group_field();
772 if(ref $ident_value) {
773 my $ident_field = $ident_value->Identity;
774 $ident_value = $ident_value->$ident_field()
777 # push this event onto the event+grouping_value stack
778 $groups{$e->event->event_def->id}{$ident_value} ||= [];
779 push @{ $groups{$e->event->event_def->id}{$ident_value} }, $e;
781 # it's a non-grouped event
782 push @{ $groups{'*'} }, $e;
789 __PACKAGE__->register_method(
790 api_name => 'open-ils.trigger.event.find_pending_by_group',
791 method => 'grouped_events',
798 my $granularity = shift;
799 my $granflag = shift;
801 my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
802 $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
805 if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
806 $self_multi = OpenSRF::MultiSession->new(
807 app => 'open-ils.trigger',
808 cap => $parallel_react,
809 session_hash_function => sub {
811 return $args->{target_id};
813 success_handler => sub {
816 $client->respond( $req->{response}->[0]->content );
821 for my $def ( keys %$groups ) {
823 $logger->info("trigger: run_all_events firing un-grouped events");
824 for my $event ( @{ $$groups{'*'} } ) {
827 $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
828 $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
832 ->method_lookup('open-ils.trigger.event.fire')
837 $logger->error("trigger: event firing failed with ".shift());
840 $logger->info("trigger: run_all_events completed queuing un-grouped events");
841 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
844 my $defgroup = $$groups{$def};
845 $logger->info("trigger: run_all_events firing events for grouped event def=$def");
846 for my $ident ( keys %$defgroup ) {
847 $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
850 $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
851 $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
855 ->method_lookup('open-ils.trigger.event_group.fire')
856 ->run($$defgroup{$ident})
859 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
861 $logger->error("trigger: event firing failed with ".shift());
864 $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
868 $self_multi->session_wait(1) if ($self_multi);
869 $logger->info("trigger: run_all_events completed firing events");
871 $client->respond_complete();
874 __PACKAGE__->register_method(
875 api_name => 'open-ils.trigger.event.run_all_pending',
876 method => 'run_all_events',