The Executor binding enables applications to offload work to separate threads for processor-intensive tasks or outbound gateway implementations.
Overview
The Executor Bus Binding is a special bus binding that allows applications to provide send work (via a message) to be processed on a separate thread. The executor binding can be used to:
Perform processor-intensive work in a thread other than an application's main dispatch thread
Implement outbound gateways where the executing thread 'pushes' the sent message to an external system
Work done by the processor of an executor bus is acknowledged and therefore Guaranteed across failures.
Implement an ExecutorBusProcessor that handles the processing
Expose it via an ExecutorBusProcessorFactory configured for the bus
Acknowledge completed work via the provided Acknowledger as work is completed
Send the processor work in the form of messages that the processor will complete
Implementing an Executor Bus Processor
An executor bus needs an ExecutorBusProcessor to perform processing, which is supplied to the bus when it is created by its executor bus processor factory.
The Process Method
The ExecutorBusProcessor interface defines a single method:
Lifecycle Integration
Processors that need to open connections to external systems may implement LifecycleAwareExecutorBusProcessor to hook into the bus lifecycle:
onExecutorBusOpen() - Called when the bus is opened
onExecutorBusStart() - Called when the bus is started
onExecutorBusClose() - Called when the bus is closed
Accessing Configuration Properties
Processor-related configuration properties from the bus descriptor can be retrieved from the provider config portion of the binding's descriptor:
Acknowledger
Regardless of whether or not an executor bus channel is configured to be Guaranteed, the bus will pass a non-null Acknowledger to the application and the application must call its acknowledge() method when processing has been completed.
The acknowledger can be called by any thread asynchronously, but its acknowledge() method may only be called once as the Acknowledger implementation is pooled.
Sample Implementation
The following pseudo-code illustrates how an executor bus processor can be implemented:
Implementing a Processor Factory
The processor factory returns instances of a processor for use by the executor bus. The executor bus will create a processor when the bus is created. In an AEP Engine, this will be when an engine is activated.
Executor Bus Processor Dependencies
If the class implementing ExecutorBusProcessor is created through a DI framework or needs access to other objects in your application, consider registering the instance of your processor as a static variable in the bus processor factory. If you have multiple processor instances, you can store them in a static map and use the bus binding descriptor to determine which instance to return.
Useful binding configuration includes:
binding.getUsername(): The name of the engine creating the bus
binding.getName(): The name of the bus as configured in DDL
binding.getAddress(): The address as configured in DDL
binding.getDescriptor().getProperties(): Binding properties as configured in DDL
Example: E-mail Alert Gateway
This example demonstrates creating a gateway that bridges alerts received from Solace out through an email gateway.
Application Code
Configuration
Because the e-mail gateway bus is acknowledging its work after sending each e-mail, this application will guarantee that e-mail alerts will be sent, and by virtue of clustering will be highly available.
/**
* Called by the executor bus to process a message sent through the executor bus.
* <p>
* The implementer may perform the send in the thread calling this method or
* pass the message off to a thread pool that it manages for greater parallelism.
* <p>
* Users of an executor bus should be able to expect ordered processing of messages
* on a per channel basis, so implementations that perform processing on multiple
* threads are encouraged to call MessageView.getMessageChannel() to determine
* the execution channel and process accordingly.
*
* @param view The view to send.
* @param acknowledger The acknowledger or null if no acknowledgement is required for processing this message.
* @param flags Flags provided by the executor bus as hints to the processor.
*/
public void process(MessageView view, Acknowledger acknowledger, int flags) throws Exception;
public class EmailGatewayProcessor implements LifecycleAwareExecutorBusProcessor {
private final Tracer tracer = Tracer.create("email.sender", INFO);
private volatile JavaMailSenderImpl mailer;
/**
* Called by executor bus prior to open.
*/
@Override
public void onExecutorBusOpen(MessageBusBinding binding) throws Exception {
// Get config properties:
Properties config = binding.getDescriptor().getProviderConfig();
mailer = new JavaMailSenderImpl();
mailer.setHost(config.getProperty("smtp_host"));
mailer.setPort(Short.valueOf(config.getProperty("smtp_port")));
mailer.setUsername(config.getProperty("smtp_user"));
mailer.setPassword(config.getProperty("smtp_password"));
mailer.setProtocol(config.getProperty("smtp_protocol"));
// etc...
tracer.log("EmailSender opened", INFO);
}
/**
* Called by executor bus binding on start.
*/
@Override
public void onExecutorBusStart(MessageBusBinding binding) throws Exception {
tracer.log("EmailSender started", INFO);
}
/**
* Called by executor bus binding on close.
*/
@Override
public void onExecutorBusClose(MessageBusBinding binding) throws Exception {
mailer = null;
tracer.log("EmailSender closed", INFO);
}
/**
* Executor bus callback.
*/
@Override
public final void process(MessageView message, Acknowledger acknowledger, int flags) {
try {
final MimeMessage template = mailer.createMimeMessage();
final MimeMessageHelper helper = new MimeMessageHelper(template, true);
helper.setSubject("ALERT: " + message.getClass().getSimpleName());
helper.setText(message.serializeToJson(), false);
// etc...
// send
mailer.send(template);
// acknowledge completion:
acknowledger.acknowledge();
}
catch (Exception e) {
// Acknowledge with a failure (to close the bus):
acknowledger.acknowledge(e);
}
}
}
/**
* Factory for creating EmailGatewayProcessors
*/
public class EmailGatewayProcessorFactory extends AbstractExecutorBusProcessorFactory {
@Override
public ExecutorBusProcessor createExecutorBusProcessor(MessageBusBinding binding) {
return new EmailGatewayProcessor();
}
}
@AppHAPolicy(value = HAPolicy.StateReplication)
public class EmailGatewayApp {
private volatile AepMessageSender messageSender;
@AppInjectionPoint
public void setMessageSender(AepMessageSender messageSender) {
this.messageSender = messageSender;
}
@EventHandler
public void onAlert(MyAppAlertMessage alert) {
// Send a copy of the received alert out through the
// email-alerts channel of the e-mail bus:
messageSender.send("email-alerts", alert.copy());
}
}