1 package OpenILS::Application::Trigger;
2 use strict; use warnings;
3 use OpenILS::Application;
4 use base qw/OpenILS::Application/;
6 use OpenSRF::EX qw/:try/;
7 use OpenSRF::Utils::JSON;
9 use OpenSRF::AppSession;
10 use OpenSRF::MultiSession;
11 use OpenSRF::Utils::SettingsClient;
12 use OpenSRF::Utils::Logger qw/$logger/;
13 use OpenSRF::Utils qw/:datetime/;
16 use DateTime::Format::ISO8601;
18 use OpenILS::Utils::Fieldmapper;
19 use OpenILS::Utils::CStoreEditor q/:funcs/;
20 use OpenILS::Application::Trigger::Event;
21 use OpenILS::Application::Trigger::EventGroup;
24 my $log = 'OpenSRF::Utils::Logger';
30 my $conf = OpenSRF::Utils::SettingsClient->new;
31 $parallel_collect = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'collect') || 1;
32 $parallel_react = $conf->config_value( apps => 'open-ils.trigger' => app_settings => parallel => 'react') || 1;
37 sub create_active_events_for_object {
43 my $granularity = shift;
44 my $user_data = shift;
46 my $ident = $target->Identity;
47 my $ident_value = $target->$ident();
49 my $editor = new_editor(xact=>1);
51 my $hooks = $editor->search_action_trigger_hook(
53 core_type => $target->json_hint
62 my %hook_hash = map { ($_->key, $_) } @$hooks;
64 my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
65 my $defs = $editor->search_action_trigger_event_definition(
66 { hook => [ keys %hook_hash ],
67 owner => [ map { $_->{id} } @$orgs ],
72 for my $def ( @$defs ) {
73 next if ($granularity && $def->granularity ne $granularity );
75 if ($def->usr_field && $def->opt_in_setting) {
76 my $ufield = $def->usr_field;
77 my $uid = $target->$ufield;
78 $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
80 my $opt_in_setting = $editor->search_actor_user_setting(
82 name => $def->opt_in_setting,
87 next unless (@$opt_in_setting);
90 my $date = DateTime->now;
92 if ($hook_hash{$def->hook}->passive eq 'f') {
94 if (my $dfield = $def->delay_field) {
95 if ($target->$dfield()) {
96 $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
102 $date->add( seconds => interval_to_seconds($def->delay) );
105 my $event = Fieldmapper::action_trigger::event->new();
106 $event->target( $ident_value );
107 $event->event_def( $def->id );
108 $event->run_time( $date->strftime( '%F %T%z' ) );
109 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
111 $editor->create_action_trigger_event( $event );
113 $client->respond( $event->id );
120 __PACKAGE__->register_method(
121 api_name => 'open-ils.trigger.event.autocreate',
122 method => 'create_active_events_for_object',
128 sub create_event_for_object_and_def {
131 my $definitions = shift;
133 my $location = shift;
134 my $user_data = shift;
136 my $ident = $target->Identity;
137 my $ident_value = $target->$ident();
139 my @active = ($self->api_name =~ /inactive/o) ? () : ( active => 't' );
141 my $editor = new_editor(xact=>1);
143 my $orgs = $editor->json_query({ from => [ 'actor.org_unit_ancestors' => $location ] });
144 my $defs = $editor->search_action_trigger_event_definition(
145 { id => $definitions,
146 owner => [ map { $_->{id} } @$orgs ],
151 my $hooks = $editor->search_action_trigger_hook(
152 { key => [ map { $_->hook } @$defs ],
153 core_type => $target->json_hint
157 my %hook_hash = map { ($_->key, $_) } @$hooks;
159 for my $def ( @$defs ) {
161 if ($def->usr_field && $def->opt_in_setting) {
162 my $ufield = $def->usr_field;
163 my $uid = $target->$ufield;
164 $uid = $uid->id if (ref $uid); # fleshed user object, unflesh it
166 my $opt_in_setting = $editor->search_actor_user_setting(
168 name => $def->opt_in_setting,
173 next unless (@$opt_in_setting);
176 my $date = DateTime->now;
178 if ($hook_hash{$def->hook}->passive eq 'f') {
180 if (my $dfield = $def->delay_field) {
181 if ($target->$dfield()) {
182 $date = DateTime::Format::ISO8601->new->parse_datetime( cleanse_ISO8601($target->$dfield) );
188 $date->add( seconds => interval_to_seconds($def->delay) );
191 my $event = Fieldmapper::action_trigger::event->new();
192 $event->target( $ident_value );
193 $event->event_def( $def->id );
194 $event->run_time( $date->strftime( '%F %T%z' ) );
195 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
197 $editor->create_action_trigger_event( $event );
199 $client->respond( $event->id );
206 __PACKAGE__->register_method(
207 api_name => 'open-ils.trigger.event.autocreate.by_definition',
208 method => 'create_event_for_object_and_def',
213 __PACKAGE__->register_method(
214 api_name => 'open-ils.trigger.event.autocreate.by_definition.include_inactive',
215 method => 'create_event_for_object_and_def',
222 # Retrieves events by object, or object type + filter
223 # $object : a target object or object type (class hint)
225 # $filter : an optional hash of filters ... top level keys:
227 # filters on the atev objects, such as states or null-ness of timing
228 # fields. contains the effective default of:
229 # { state => 'pending' }
230 # an example, which overrides the default, and will find
231 # stale 'found' events:
232 # { state => 'found', update_time => { '<' => 'yesterday' } }
235 # filters on the atevdef object. contains the effective default of:
239 # filters on the hook object. no defaults, but there is a pinned,
240 # unchangeable filter based on the passed hint or object type (see
241 # $object above). an example for finding passive events:
245 # filters against the target field on the event. this can contain
246 # either an array of target ids (if you passed an object type, and
247 # not an object) or can contain a json_query that will return exactly
248 # a list of target-type ids. If you pass an object, the pkey value of
249 # that object will be used as a filter in addition to the filter passed
250 # in here. example filter for circs of user 1234 that are open:
251 # { select => { circ => ['id'] },
255 # checkin_time => undef,
257 # { stop_fines => undef },
258 # { stop_fines => { 'not in' => ['LOST','LONGOVERDUE','CLAIMSRETURNED'] } }
262 sub events_by_target {
266 my $filter = shift || {};
267 my $flesh_fields = shift || {};
268 my $flesh_depth = shift || 1;
270 my $obj_class = ref($object) || _fm_class_by_hint($object);
271 my $obj_hint = ref($object) ? _fm_hint_by_class(ref($object)) : $object;
273 my $object_ident_field = $obj_class->Identity;
276 select => { atev => ["id"] },
283 ath => { field => "key", fkey => "hook" }
289 "+ath" => { core_type => $obj_hint },
290 "+atevdef" => { active => 't' },
291 "+atev" => { state => 'pending' }
293 order_by => { "atev" => [ 'run_time', 'add_time' ] }
296 $query->{limit} = $filter->{limit} if defined $filter->{limit};
297 $query->{offset} = $filter->{offset} if defined $filter->{offset};
298 $query->{order_by} = $filter->{order_by} if defined $filter->{order_by};
301 # allow multiple 'target' filters
302 $query->{where}->{'+atev'}->{'-and'} = [];
304 # if we got a real object, filter on its pkey value
305 if (ref($object)) { # pass an object, require that target
306 push @{ $query->{where}->{'+atev'}->{'-and'} },
307 { target => $object->$object_ident_field }
310 # we have a fancy complex target filter or a list of target ids
311 if ($$filter{target}) {
312 push @{ $query->{where}->{'+atev'}->{'-and'} },
313 { target => {in => $$filter{target} } };
316 # pass no target filter or object, you get no events
317 if (!@{ $query->{where}->{'+atev'}->{'-and'} }) {
321 # any hook filters, other than the required core_type filter
322 if ($$filter{hook}) {
323 $query->{where}->{'+ath'}->{$_} = $$filter{hook}{$_}
324 for (grep { $_ ne 'core_type' } keys %{$$filter{hook}});
327 # any event_def filters. defaults to { active => 't' }
328 if ($$filter{event_def}) {
329 $query->{where}->{'+atevdef'}->{$_} = $$filter{event_def}{$_}
330 for (keys %{$$filter{event_def}});
333 # any event filters. defaults to { state => 'pending' }.
334 # don't overwrite '-and' used for multiple target filters above
335 if ($$filter{event}) {
336 $query->{where}->{'+atev'}->{$_} = $$filter{event}{$_}
337 for (grep { $_ ne '-and' } keys %{$$filter{event}});
340 my $e = new_editor(xact=>1);
342 my $events = $e->json_query($query);
344 $flesh_fields->{atev} = ['event_def'] unless $flesh_fields->{atev};
346 for my $id (@$events) {
347 my $event = $e->retrieve_action_trigger_event([
349 {flesh => $flesh_depth, flesh_fields => $flesh_fields}
352 (my $meth = $obj_class) =~ s/^Fieldmapper:://o;
354 $meth = 'retrieve_'.$meth;
356 $event->target($e->$meth($event->target));
357 $client->respond($event);
362 __PACKAGE__->register_method(
363 api_name => 'open-ils.trigger.events_by_target',
364 method => 'events_by_target',
370 sub _fm_hint_by_class {
372 return Fieldmapper->publish_fieldmapper->{$class}->{hint};
375 sub _fm_class_by_hint {
379 Fieldmapper->publish_fieldmapper->{$_}->{hint} eq $hint
380 } keys %{ Fieldmapper->publish_fieldmapper };
385 sub create_batch_events {
389 my $location_field = shift; # where to look for event_def.owner filtering ... circ_lib, for instance, where hook.core_type = circ
390 my $filter = shift || {};
391 my $granularity = shift;
392 my $user_data = shift;
394 my $active = ($self->api_name =~ /active/o) ? 1 : 0;
395 if ($active && !keys(%$filter)) {
396 $log->info("Active batch event creation requires a target filter but none was supplied to create_batch_events");
400 return undef unless ($key && $location_field);
402 my $editor = new_editor(xact=>1);
403 my $hooks = $editor->search_action_trigger_hook(
404 { passive => $active ? 'f' : 't', key => $key }
407 my %hook_hash = map { ($_->key, $_) } @$hooks;
409 my $defs = $editor->search_action_trigger_event_definition(
410 { hook => [ keys %hook_hash ], active => 't' },
413 my $orig_filter_and = [];
414 if ($$filter{'-and'}) {
415 for my $f ( @{ $$filter{'-and'} } ) {
416 push @$orig_filter_and, $f;
420 for my $def ( @$defs ) {
421 next if ($granularity && $def->granularity ne $granularity );
423 my $date = DateTime->now->subtract( seconds => interval_to_seconds($def->delay) );
425 # we may need to do some work to backport this to 1.2
426 $filter->{ $location_field } = { 'in' =>
428 select => { aou => [{ column => 'id', transform => 'actor.org_unit_descendants', result_field => 'id' }] },
430 where => { id => $def->owner }
434 my $run_time = 'now';
439 ->add( seconds => interval_to_seconds($def->delay) )
440 ->strftime( '%F %T%z' );
442 if ($def->max_delay) {
443 my @times = sort {$a <=> $b} interval_to_seconds($def->delay), interval_to_seconds($def->max_delay);
444 $filter->{ $def->delay_field } = {
446 DateTime->now->subtract( seconds => $times[1] )->strftime( '%F %T%z' ),
447 DateTime->now->subtract( seconds => $times[0] )->strftime( '%F %T%z' )
451 $filter->{ $def->delay_field } = {
452 '<=' => DateTime->now->subtract( seconds => interval_to_seconds($def->delay) )->strftime( '%F %T%z' )
457 my $class = _fm_class_by_hint($hook_hash{$def->hook}->core_type);
459 # filter where this target has an event (and it's pending, for active hooks)
460 $$filter{'-and'} = [];
461 for my $f ( @$orig_filter_and ) {
462 push @{ $$filter{'-and'} }, $f;
465 my $join = { 'join' => {
468 fkey => $class->Identity,
470 filter => { event_def => $def->id }
474 push @{ $filter->{'-and'} }, { '+atev' => { id => undef } };
476 if ($def->usr_field && $def->opt_in_setting) {
477 push @{ $filter->{'-and'} }, {
481 name => $def->opt_in_setting,
482 usr => { '=' => { '+' . $hook_hash{$def->hook}->core_type => $def->usr_field } },
489 $class =~ s/^Fieldmapper:://o;
491 my $method = 'search_'. $class;
493 # for cleaner logging
494 my $def_id = $def->id;
495 my $hook = $def->hook;
497 $logger->info("trigger: create_batch_events() collecting object IDs for def=$def_id / hook=$hook");
499 my $object_ids = $editor->$method( [$filter, $join], {idlist => 1, timeout => 10800} );
502 $logger->info("trigger: create_batch_events() fetched ".scalar(@$object_ids)." object IDs for def=$def_id / hook=$hook");
504 $logger->warn("trigger: create_batch_events() timeout occurred collecting object IDs for def=$def_id / hook=$hook");
507 for my $o_id (@$object_ids) {
509 my $event = Fieldmapper::action_trigger::event->new();
510 $event->target( $o_id );
511 $event->event_def( $def->id );
512 $event->run_time( $run_time );
513 $event->user_data( OpenSRF::Utils::JSON->perl2JSON($user_data) ) if (defined($user_data));
515 $editor->create_action_trigger_event( $event );
517 $client->respond( $event->id );
520 $logger->info("trigger: create_batch_events() successfully created events for def=$def_id / hook=$hook");
523 $logger->info("trigger: create_batch_events() done creating events");
529 __PACKAGE__->register_method(
530 api_name => 'open-ils.trigger.passive.event.autocreate.batch',
531 method => 'create_batch_events',
537 __PACKAGE__->register_method(
538 api_name => 'open-ils.trigger.active.event.autocreate.batch',
539 method => 'create_batch_events',
545 sub fire_single_event {
548 my $event_id = shift;
550 my $e = OpenILS::Application::Trigger::Event->new($event_id);
552 if ($e->validate->valid) {
553 $logger->info("trigger: Event is valid, reacting...");
557 $e->editor->disconnect;
558 OpenILS::Application::Trigger::Event->ClearObjectCache();
562 reacted => $e->reacted,
563 cleanedup => $e->cleanedup,
567 __PACKAGE__->register_method(
568 api_name => 'open-ils.trigger.event.fire',
569 method => 'fire_single_event',
574 sub fire_event_group {
579 my $e = OpenILS::Application::Trigger::EventGroup->new(@$events);
581 if ($e->validate->valid) {
582 $logger->info("trigger: Event group is valid, reacting...");
586 $e->editor->disconnect;
587 OpenILS::Application::Trigger::Event->ClearObjectCache();
591 reacted => $e->reacted,
592 cleanedup => $e->cleanedup,
593 events => [map { $_->event } @{$e->events}]
596 __PACKAGE__->register_method(
597 api_name => 'open-ils.trigger.event_group.fire',
598 method => 'fire_event_group',
606 my $granularity = shift;
607 my $granflag = shift;
609 my $query = [{ state => 'pending', run_time => {'<' => 'now'} }, { order_by => { atev => [ qw/run_time add_time/] }, 'join' => 'atevdef' }];
611 if (defined $granularity) {
613 $query->[0]->{'+atevdef'} = {granularity => $granularity};
615 $query->[0]->{'+atevdef'} = {'-or' => [ {granularity => $granularity}, {granularity => undef} ] };
618 $query->[0]->{'+atevdef'} = {granularity => undef};
621 return new_editor(xact=>1)->search_action_trigger_event(
622 $query, { idlist=> 1, timeout => 7200, substream => 1 }
625 __PACKAGE__->register_method(
626 api_name => 'open-ils.trigger.event.find_pending',
627 method => 'pending_events',
636 $e_ids = [$e_ids] if (!ref($e_ids));
639 for my $e_id (@$e_ids) {
642 $e = OpenILS::Application::Trigger::Event->new($e_id);
644 $logger->error("trigger: Event creation failed with ".shift());
650 $e->build_environment;
652 $logger->error("trigger: Event environment building failed with ".shift());
655 $e->editor->disconnect;
656 $e->environment->{EventProcessor} = undef; # remove circular ref for json encoding
657 $client->respond($e);
660 OpenILS::Application::Trigger::Event->ClearObjectCache();
664 __PACKAGE__->register_method(
665 api_name => 'open-ils.trigger.event.gather',
666 method => 'gather_events',
673 my $granularity = shift;
674 my $granflag = shift;
676 my ($events) = $self->method_lookup('open-ils.trigger.event.find_pending')->run($granularity, $granflag);
678 my %groups = ( '*' => [] );
681 $logger->info("trigger: grouped_events found ".scalar(@$events)." pending events to process");
683 $logger->warn("trigger: grouped_events timed out loading pending events");
689 if ($parallel_collect == 1 or @$events == 1) { # use method lookup
690 @fleshed_events = $self->method_lookup('open-ils.trigger.event.gather')->run($events);
692 my $self_multi = OpenSRF::MultiSession->new(
693 app => 'open-ils.trigger',
694 cap => $parallel_collect,
695 success_handler => sub {
699 push @fleshed_events,
700 map { OpenILS::Application::Trigger::Event->new($_) }
702 @{ $req->{response} };
706 $self_multi->request( 'open-ils.trigger.event.gather' => $_ ) for ( @$events );
707 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
709 $self_multi->session_wait(1);
710 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
713 for my $e (@fleshed_events) {
714 if (my $group = $e->event->event_def->group_field) {
716 # split the grouping link steps
717 my @steps = split /\./, $group;
718 my $group_field = pop(@steps); # we didn't flesh to this, it's a field not an object
720 # find the grouping object
721 my $node = $e->target;
722 $node = $node->$_() for ( @steps );
724 # get the grouping value for the grouping object on this event
725 my $ident_value = $node->$group_field();
726 if(ref $ident_value) {
727 my $ident_field = $ident_value->Identity;
728 $ident_value = $ident_value->$ident_field()
731 # push this event onto the event+grouping_value stack
732 $groups{$e->event->event_def->id}{$ident_value} ||= [];
733 push @{ $groups{$e->event->event_def->id}{$ident_value} }, $e;
735 # it's a non-grouped event
736 push @{ $groups{'*'} }, $e;
743 __PACKAGE__->register_method(
744 api_name => 'open-ils.trigger.event.find_pending_by_group',
745 method => 'grouped_events',
752 my $granularity = shift;
753 my $granflag = shift;
755 my ($groups) = $self->method_lookup('open-ils.trigger.event.find_pending_by_group')->run($granularity, $granflag);
756 $client->respond({"status" => "found"}) if (keys(%$groups) > 1 || @{$$groups{'*'}});
759 if ($parallel_react > 1 and (keys(%$groups) > 1 || @{$$groups{'*'}} > 1)) {
760 $self_multi = OpenSRF::MultiSession->new(
761 app => 'open-ils.trigger',
762 cap => $parallel_react,
763 session_hash_function => sub {
765 return $args->{target_id};
767 success_handler => sub {
770 $client->respond( $req->{response}->[0]->content );
775 for my $def ( keys %$groups ) {
777 $logger->info("trigger: run_all_events firing un-grouped events");
778 for my $event ( @{ $$groups{'*'} } ) {
781 $event->environment->{EventProcessor} = undef; # remove circular ref for json encoding
782 $self_multi->request({target_id => $event->id}, 'open-ils.trigger.event.fire', $event);
786 ->method_lookup('open-ils.trigger.event.fire')
791 $logger->error("trigger: event firing failed with ".shift());
794 $logger->info("trigger: run_all_events completed queuing un-grouped events");
795 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
798 my $defgroup = $$groups{$def};
799 $logger->info("trigger: run_all_events firing events for grouped event def=$def");
800 for my $ident ( keys %$defgroup ) {
801 $logger->info("trigger: run_all_events firing group for grouped event def=$def and grp ident $ident");
804 $_->environment->{EventProcessor} = undef for @{$$defgroup{$ident}}; # remove circular ref for json encoding
805 $self_multi->request({target_id => $ident}, 'open-ils.trigger.event_group.fire', $$defgroup{$ident});
809 ->method_lookup('open-ils.trigger.event_group.fire')
810 ->run($$defgroup{$ident})
813 $client->status( new OpenSRF::DomainObject::oilsContinueStatus );
815 $logger->error("trigger: event firing failed with ".shift());
818 $logger->info("trigger: run_all_events completed queuing events for grouped event def=$def");
822 $self_multi->session_wait(1) if ($self_multi);
823 $logger->info("trigger: run_all_events completed firing events");
825 $client->respond_complete();
828 __PACKAGE__->register_method(
829 api_name => 'open-ils.trigger.event.run_all_pending',
830 method => 'run_all_events',