1 package OpenILS::Application::Trigger;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils::JSON;
9 use OpenSRF::AppSession;
10 use OpenSRF::MultiSession;
11 use OpenSRF::Utils::SettingsClient;
12 use OpenSRF::Utils::Logger qw/$logger/;
13 use OpenSRF::Utils qw/:datetime/;
16 use DateTime::Format::ISO8601;
18 use OpenILS::Utils::Fieldmapper;
19 use OpenILS::Utils::CStoreEditor q/:funcs/;
20 use OpenILS::Application::Trigger::Event;
21 use OpenILS::Application::Trigger::EventGroup;
24 my $log = 'OpenSRF::Utils::Logger';
30 my $conf = OpenSRF::Utils::SettingsClient->new;
31 $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
32 $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
37 sub create_active_events_for_object {
43 my $granularity = shift;
44 my $user_data = shift;
46 my $ident = $target->Identity;
47 my $ident_value = $target->$ident();
49 my $editor = new_editor(xact=>1);
51 my $hooks = $editor->search_action_trigger_hook(
53 core_type => $target->json_hint
62 my %hook_hash = map { ($_->key, $_) } @$hooks;
64 my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
65 my $defs = $editor->search_action_trigger_event_definition(
66 { hook => [ keys %hook_hash ],
67 owner => [ map { $_->{id} } @$orgs ],
72 for my $def ( @$defs ) {
73 next if ($granularity && $def->granularity ne $granularity );
75 if ($def->usr_field && $def->opt_in_setting) {
76 my $ufield = $def->usr_field;
77 my $uid = $target->$ufield;
78 $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
80 my $opt_in_setting = $editor->search_actor_user_setting(
82 name => $def->opt_in_setting,
87 next unless (@$opt_in_setting);
90 my $date = DateTime->now;
92 if ($hook_hash{$def->hook}->passive eq 'f') {
94 if (my $dfield = $def->delay_field) {
95 if ($target->$dfield()) {
96 $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
102 $date->add( seconds => interval_to_seconds($def->delay) );
105 my $event = Fieldmapper::action_trigger::event->new();
106 $event->target( $ident_value );
107 $event->event_def( $def->id );
108 $event->run_time( $date->strftime( '%F %T%z' ) );
109 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
111 $editor->create_action_trigger_event( $event );
113 $client->respond( $event->id );
120 __PACKAGE__->register_method(
121 api_name => 'open-ils.trigger.event.autocreate',
122 method => 'create_active_events_for_object',
128 sub create_event_for_object_and_def {
131 my $definitions = shift;
133 my $location = shift;
134 my $user_data = shift;
136 my $ident = $target->Identity;
137 my $ident_value = $target->$ident();
139 my @active = ($self->api_name =~ /inactive/o) ? () : ( active => 't' );
141 my $editor = new_editor(xact=>1);
143 my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
144 my $defs = $editor->search_action_trigger_event_definition(
145 { id => $definitions,
146 owner => [ map { $_->{id} } @$orgs ],
151 my $hooks = $editor->search_action_trigger_hook(
152 { key => [ map { $_->hook } @$defs ],
153 core_type => $target->json_hint
157 my %hook_hash = map { ($_->key, $_) } @$hooks;
159 for my $def ( @$defs ) {
161 if ($def->usr_field && $def->opt_in_setting) {
162 my $ufield = $def->usr_field;
163 my $uid = $target->$ufield;
164 $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
166 my $opt_in_setting = $editor->search_actor_user_setting(
168 name => $def->opt_in_setting,
173 next unless (@$opt_in_setting);
176 my $date = DateTime->now;
178 if ($hook_hash{$def->hook}->passive eq 'f') {
180 if (my $dfield = $def->delay_field) {
181 if ($target->$dfield()) {
182 $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
188 $date->add( seconds => interval_to_seconds($def->delay) );
191 my $event = Fieldmapper::action_trigger::event->new();
192 $event->target( $ident_value );
193 $event->event_def( $def->id );
194 $event->run_time( $date->strftime( '%F %T%z' ) );
195 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
197 $editor->create_action_trigger_event( $event );
199 $client->respond( $event->id );
206 __PACKAGE__->register_method(
207 api_name => 'open-ils.trigger.event.autocreate.by_definition',
208 method => 'create_event_for_object_and_def',
213 __PACKAGE__->register_method(
214 api_name => 'open-ils.trigger.event.autocreate.by_definition.include_inactive',
215 method => 'create_event_for_object_and_def',
222 # Retrieves events by object, or object type + filter
223 # $object : a target object or object type (class hint)
225 # $filter : an optional hash of filters ... top level keys:
227 # filters on the atev objects, such as states or null-ness of timing
228 # fields. contains the effective default of:
229 # { state => 'pending' }
230 # an example, which overrides the default, and will find
231 # stale 'found' events:
232 # { state => 'found', update_time => { '<' => 'yesterday' } }
235 # filters on the atevdef object. contains the effective default of:
239 # filters on the hook object. no defaults, but there is a pinned,
240 # unchangeable filter based on the passed hint or object type (see
241 # $object above). an example for finding passive events:
245 # filters against the target field on the event. this can contain
246 # either an array of target ids (if you passed an object type, and
247 # not an object) or can contain a json_query that will return exactly
248 # a list of target-type ids. If you pass an object, the pkey value of
249 # that object will be used as a filter in addition to the filter passed
250 # in here. example filter for circs of user 1234 that are open:
251 # { select => { circ => ['id'] },
255 # checkin_time => undef,
257 # { stop_fines => undef },
258 # { stop_fines => { 'not in' => ['LOST','LONGOVERDUE','CLAIMSRETURNED'] } }
262 sub events_by_target {
266 my $filter = shift || {};
267 my $flesh_fields = shift || {};
268 my $flesh_depth = shift || 1;
270 my $obj_class = ref($object) || _fm_class_by_hint($object);
271 my $obj_hint = ref($object) ? _fm_hint_by_class(ref($object)) : $object;
273 my $object_ident_field = $obj_class->Identity;
276 select => { atev => ["id"] },
283 ath => { field => "key", fkey => "hook" }
289 "+ath" => { core_type => $obj_hint },
290 "+atevdef" => { active => 't' },
291 "+atev" => { state => 'pending' }
293 order_by => { "atev" => [ 'run_time', 'add_time' ] }
296 $query->{limit} = $filter->{limit} if defined $filter->{limit};
297 $query->{offset} = $filter->{offset} if defined $filter->{offset};
298 $query->{order_by} = $filter->{order_by} if defined $filter->{order_by};
301 # allow multiple 'target' filters
302 $query->{where}->{'+atev'}->{'-and'} = [];
304 # if we got a real object, filter on its pkey value
305 if (ref($object)) { # pass an object, require that target
306 push @{ $query->{where}->{'+atev'}->{'-and'} },
307 { target => $object->$object_ident_field }
310 # we have a fancy complex target filter or a list of target ids
311 if ($$filter{target}) {
312 push @{ $query->{where}->{'+atev'}->{'-and'} },
313 { target => {in => $$filter{target} } };
316 # pass no target filter or object, you get no events
317 if (!@{ $query->{where}->{'+atev'}->{'-and'} }) {
321 # any hook filters, other than the required core_type filter
322 if ($$filter{hook}) {
323 $query->{where}->{'+ath'}->{$_} = $$filter{hook}{$_}
324 for (grep { $_ ne 'core_type' } keys %{$$filter{hook}});
327 # any event_def filters. defaults to { active => 't' }
328 if ($$filter{event_def}) {
329 $query->{where}->{'+atevdef'}->{$_} = $$filter{event_def}{$_}
330 for (keys %{$$filter{event_def}});
333 # any event filters. defaults to { state => 'pending' }.
334 # don't overwrite '-and' used for multiple target filters above
335 if ($$filter{event}) {
336 $query->{where}->{'+atev'}->{$_} = $$filter{event}{$_}
337 for (grep { $_ ne '-and' } keys %{$$filter{event}});
340 my $e = new_editor(xact=>1);
342 my $events = $e->json_query($query);
344 $flesh_fields->{atev} = ['event_def'] unless $flesh_fields->{atev};
346 for my $id (@$events) {
347 my $event = $e->retrieve_action_trigger_event([
349 {flesh => $flesh_depth, flesh_fields => $flesh_fields}
352 (my $meth = $obj_class) =~ s/^Fieldmapper:://o;
354 $meth = 'retrieve_'.$meth;
356 $event->target($e->$meth($event->target));
357 $client->respond($event);
362 __PACKAGE__->register_method(
363 api_name => 'open-ils.trigger.events_by_target',
364 method => 'events_by_target',
370 sub _fm_hint_by_class {
372 return Fieldmapper->publish_fieldmapper->{$class}->{hint};
375 sub _fm_class_by_hint {
379 Fieldmapper->publish_fieldmapper->{$_}->{hint} eq $hint
380 } keys %{ Fieldmapper->publish_fieldmapper };
385 sub create_batch_events {
389 my $location_field = shift; # where to look for event_def.owner filtering ... circ_lib, for instance, where hook.core_type = circ
390 my $filter = shift || {};
391 my $granularity = shift;
392 my $user_data = shift;
394 my $active = ($self->api_name =~ /active/o) ? 1 : 0;
395 if ($active && !keys(%$filter)) {
396 $log->info("Active batch event creation requires a target filter but none was supplied to create_batch_events");
400 return undef unless ($key && $location_field);
402 my $editor = new_editor(xact=>1);
403 my $hooks = $editor->search_action_trigger_hook(
404 { passive => $active ? 'f' : 't', key => $key }
407 my %hook_hash = map { ($_->key, $_) } @$hooks;
409 my $defs = $editor->search_action_trigger_event_definition(
410 { hook => [ keys %hook_hash ], active => 't' },
413 my $orig_filter_and = [];
414 if ($$filter{'-and'}) {
415 for my $f ( @{ $$filter{'-and'} } ) {
416 push @$orig_filter_and, $f;
420 for my $def ( @$defs ) {
421 next if ($granularity && $def->granularity ne $granularity );
423 my $date = DateTime->now->subtract( seconds => interval_to_seconds($def->delay) );
425 # we may need to do some work to backport this to 1.2
426 $filter->{ $location_field } = { 'in' =>
428 select => { aou => [{ column => 'id', transform => 'actor.org_unit_descendants', result_field => 'id' }] },
430 where => { id => $def->owner }
434 my $run_time = 'now';
439 ->add( seconds => interval_to_seconds($def->delay) )
440 ->strftime( '%F %T%z' );
442 if ($def->max_delay) {
443 my @times = sort {$a <=> $b} interval_to_seconds($def->delay), interval_to_seconds($def->max_delay);
444 $filter->{ $def->delay_field } = {
446 DateTime->now->subtract( seconds => $times[1] )->strftime( '%F %T%z' ),
447 DateTime->now->subtract( seconds => $times[0] )->strftime( '%F %T%z' )
451 $filter->{ $def->delay_field } = {
452 '<=' => DateTime->now->subtract( seconds => interval_to_seconds($def->delay) )->strftime( '%F %T%z' )
457 my $class = _fm_class_by_hint($hook_hash{$def->hook}->core_type);
459 # filter where this target has an event (and it's pending, for active hooks)
460 $$filter{'-and'} = [];
461 for my $f ( @$orig_filter_and ) {
462 push @{ $$filter{'-and'} }, $f;
465 my $join = { 'join' => {
468 fkey => $class->Identity,
470 filter => { event_def => $def->id }
474 push @{ $filter->{'-and'} }, { '+atev' => { id => undef } };
476 if ($def->usr_field && $def->opt_in_setting) {
477 push @{ $filter->{'-and'} }, {
481 name => $def->opt_in_setting,
482 usr => { '=' => { '+' . $hook_hash{$def->hook}->core_type => $def->usr_field } },
489 $class =~ s/^Fieldmapper:://o;
491 my $method = 'search_'. $class;
493 # for cleaner logging
494 my $def_id = $def->id;
495 my $hook = $def->hook;
497 $logger->info("trigger: create_batch_events() collecting object IDs for def=$def_id / hook=$hook");
499 my $object_ids = $editor->$method( [$filter, $join], {idlist => 1, timeout => 10800} );
502 $logger->info("trigger: create_batch_events() fetched ".scalar(@$object_ids)." object IDs for def=$def_id / hook=$hook");
504 $logger->warn("trigger: create_batch_events() timeout occurred collecting object IDs for def=$def_id / hook=$hook");
507 for my $o_id (@$object_ids) {
509 my $event = Fieldmapper::action_trigger::event->new();
510 $event->target( $o_id );
511 $event->event_def( $def->id );
512 $event->run_time( $run_time );
513 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
515 $editor->create_action_trigger_event( $event );
517 $client->respond( $event->id );
520 $logger->info("trigger: create_batch_events() successfully created events for def=$def_id / hook=$hook");
523 $logger->info("trigger: create_batch_events() done creating events");
529 __PACKAGE__->register_method(
530 api_name => 'open-ils.trigger.passive.event.autocreate.batch',
531 method => 'create_batch_events',
537 __PACKAGE__->register_method(
538 api_name => 'open-ils.trigger.active.event.autocreate.batch',
539 method => 'create_batch_events',
545 sub fire_single_event {
548 my $event_id = shift;
550 my $e = OpenILS::Application::Trigger::Event->new($event_id);
552 if ($e->validate->valid) {
553 $logger->info("trigger: Event is valid, reacting...");
557 $e->editor->disconnect;
558 OpenILS::Application::Trigger::Event->ClearObjectCache();
562 reacted => $e->reacted,
563 cleanedup => $e->cleanedup,
567 __PACKAGE__->register_method(
568 api_name => 'open-ils.trigger.event.fire',
569 method => 'fire_single_event',
574 sub fire_event_group {
579 my $e = OpenILS::Application::Trigger::EventGroup->new(@$events);
581 if ($e->validate->valid) {
582 $logger->info("trigger: Event group is valid, reacting...");
586 $e->editor->disconnect;
587 OpenILS::Application::Trigger::Event->ClearObjectCache();
591 reacted => $e->reacted,
592 cleanedup => $e->cleanedup,
593 events => [map { $_->event } @{$e->events}]
596 __PACKAGE__->register_method(
597 api_name => 'open-ils.trigger.event_group.fire',
598 method => 'fire_event_group',
603 sub revalidate_event_group_test {
608 my $e = OpenILS::Application::Trigger::EventGroup->new_nochanges(@$events);
610 my $result = $e->revalidate_test;
612 $e->editor->disconnect;
613 OpenILS::Application::Trigger::Event->ClearObjectCache();
617 __PACKAGE__->register_method(
618 api_name => 'open-ils.trigger.event_group.revalidate.test',
619 method => 'revalidate_event_group_test',
623 desc => q/revalidate a group of events.
624 This does not actually update the events (so there will be no change
625 of atev.state or anything else in the database, unless an event's
626 validator makes changes out-of-band).
628 This returns an array of valid event IDs.
631 {name => "events", type => "array", desc => "list of event ids"}
640 my $granularity = shift;
641 my $granflag = shift;
643 my $query = [{ state => 'pending', run_time => {'<' => 'now'} }, { order_by => { atev => [ qw/run_time add_time/] }, 'join' => 'atevdef' }];
645 if (defined $granularity) {
647 $query->[0]->{'+atevdef'} = {granularity => $granularity};
649 $query->[0]->{'+atevdef'} = {'-or' => [ {granularity => $granularity}, {granularity => undef} ] };
652 $query->[0]->{'+atevdef'} = {granularity => undef};
655 return new_editor(xact=>1)->search_action_trigger_event(
656 $query, { idlist=> 1, timeout => 7200, substream => 1 }
659 __PACKAGE__->register_method(
660 api_name => 'open-ils.trigger.event.find_pending',
661 method => 'pending_events',
670 $e_ids = [$e_ids] if (!ref($e_ids));
673 for my $e_id (@$e_ids) {
676 $e = OpenILS::Application::Trigger::Event->new($e_id);
678 $logger->error("trigger: Event creation failed with ".shift());
681 next if !$e or $e->event->state eq 'invalid';
684 $e->build_environment;
686 $logger->error("trigger: Event environment building failed with ".shift());
689 $e->editor->disconnect;
690 $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
691 $client->respond($e);
694 OpenILS::Application::Trigger::Event->ClearObjectCache();
698 __PACKAGE__->register_method(
699 api_name => 'open-ils.trigger.event.gather',
700 method => 'gather_events',
707 my $granularity = shift;
708 my $granflag = shift;
710 my ($events) = $self->method_lookup('open-ils.trigger.event.find_pending')->run($granularity, $granflag);
712 my %groups = ( '*' => [] );
715 $logger->info("trigger: grouped_events found ".scalar(@$events)." pending events to process");
717 $logger->warn("trigger: grouped_events timed out loading pending events");
723 if ($parallel_collect == 1 or @$events == 1) { # use method lookup
724 @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
726 my $self_multi = OpenSRF::MultiSession->new(
727 app => 'open-ils.trigger',
728 cap => $parallel_collect,
729 success_handler => sub {
733 push @fleshed_events,
734 map { OpenILS::Application::Trigger::Event->new($_) }
736 @{ $req->{response} };
740 $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
741 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
743 $self_multi->session_wait(1);
744 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
747 for my $e (@fleshed_events) {
748 if (my $group = $e->event->event_def->group_field) {
750 # split the grouping link steps
751 my @steps = split /\./, $group;
752 my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object
757 $node = $node->$_() for ( @steps );
760 unless($node) { # should not get here, but to be safe..
761 $e->update_state('invalid');
765 # get the grouping value for the grouping object on this event
766 my $ident_value = $node->$group_field();
767 if(ref $ident_value) {
768 my $ident_field = $ident_value->Identity;
769 $ident_value = $ident_value->$ident_field()
772 # push this event onto the event+grouping_value stack
773 $groups{$e->event->event_def->id}{$ident_value} ||= [];
774 push @{ $groups{$e->event->event_def->id}{$ident_value} }, $e;
776 # it's a non-grouped event
777 push @{ $groups{'*'} }, $e;
784 __PACKAGE__->register_method(
785 api_name => 'open-ils.trigger.event.find_pending_by_group',
786 method => 'grouped_events',
793 my $granularity = shift;
794 my $granflag = shift;
796 my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
797 $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
800 if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
801 $self_multi = OpenSRF::MultiSession->new(
802 app => 'open-ils.trigger',
803 cap => $parallel_react,
804 session_hash_function => sub {
806 return $args->{target_id};
808 success_handler => sub {
811 $client->respond( $req->{response}->[0]->content );
816 for my $def ( keys %$groups ) {
818 $logger->info("trigger: run_all_events firing un-grouped events");
819 for my $event ( @{ $$groups{'*'} } ) {
822 $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
823 $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
827 ->method_lookup('open-ils.trigger.event.fire')
832 $logger->error("trigger: event firing failed with ".shift());
835 $logger->info("trigger: run_all_events completed queuing un-grouped events");
836 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
839 my $defgroup = $$groups{$def};
840 $logger->info("trigger: run_all_events firing events for grouped event def=$def");
841 for my $ident ( keys %$defgroup ) {
842 $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
845 $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
846 $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
850 ->method_lookup('open-ils.trigger.event_group.fire')
851 ->run($$defgroup{$ident})
854 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
856 $logger->error("trigger: event firing failed with ".shift());
859 $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
863 $self_multi->session_wait(1) if ($self_multi);
864 $logger->info("trigger: run_all_events completed firing events");
866 $client->respond_complete();
869 __PACKAGE__->register_method(
870 api_name => 'open-ils.trigger.event.run_all_pending',
871 method => 'run_all_events',