How to write a publishing Service plugin that uses a typed invoke strategy¶
This guide extends the basic publishing service example by delivering events via a strongly-typed interface on the event processor. Instead of always calling onEvent(Object), the service configures a custom EventToInvokeStrategy and a CallBackType bound to an interface. Only processors that implement that interface will be targeted, and the strategy invokes the typed method.
What you will learn:
- Define a listener interface that processors can implement (e.g., PublishingServiceListener)
- Configure a service to use CallBackType.forClass(PublishingServiceListener.class)
- Provide an EventToInvokeStrategy that calls listener.onServiceEvent(String)
- Subscribe from the processor via @ServiceRegistered and service.subscribe()
References in this repository:
- Typed service: PublishingService.java
- Listener interface: PublishingServiceListener.java
- Typed processor: PublishingServiceTypedSubscriberHandler.java
- End-to-end test: PublishingServiceTypedInvokeExampleTest.java
- Base service support: AbstractEventSourceService.java
1) Define a listener interface¶
package com.fluxtion.server.example;
public interface PublishingServiceListener {
void onServiceEvent(String event);
}
2) Implement a typed publishing service¶
Extend AbstractEventSourceService
- CallBackType.forClass(PublishingServiceListener.class) as the callback type
- A supplier of a custom EventToInvokeStrategy that calls the typed method
package com.fluxtion.server.example;
import com.fluxtion.server.dispatch.AbstractEventToInvocationStrategy;
import com.fluxtion.server.service.CallBackType;
import com.fluxtion.server.service.extension.AbstractEventSourceService;
import com.fluxtion.runtime.StaticEventProcessor;
public class PublishingServiceTyped extends AbstractEventSourceService<String> {
public PublishingServiceTyped(String name) {
super(name,
CallBackType.forClass(PublishingServiceListener.class),
TypedInvokeStrategy::new);
}
public void publish(String event) {
if (output != null) {
output.publish(event);
}
}
static class TypedInvokeStrategy extends AbstractEventToInvocationStrategy {
@Override
protected void dispatchEvent(Object event, StaticEventProcessor eventProcessor) {
if (eventProcessor instanceof PublishingServiceListener listener && event instanceof String s) {
listener.onServiceEvent(s);
} else {
eventProcessor.onEvent(event);
}
}
@Override
protected boolean isValidTarget(StaticEventProcessor eventProcessor) {
return eventProcessor instanceof PublishingServiceListener;
}
}
}
3) Implement a processor that subscribes and implements the listener¶
package com.fluxtion.server.example;
import com.fluxtion.runtime.DefaultEventProcessor;
import com.fluxtion.runtime.annotations.runtime.ServiceRegistered;
import com.fluxtion.runtime.node.ObjectEventHandlerNode;
import com.fluxtion.runtime.output.MessageSink;
public class PublishingServiceTypedSubscriberHandler extends DefaultEventProcessor
implements PublishingServiceListener {
private final TypedHandler typedHandler;
public PublishingServiceTypedSubscriberHandler(TypedHandler typedHandler) {
super(typedHandler);
this.typedHandler = typedHandler;
}
@Override
public void onServiceEvent(String event) {
typedHandler.onServiceEvent(event);
}
public static class TypedHandler extends ObjectEventHandlerNode implements PublishingServiceListener {
private PublishingServiceTyped service;
private MessageSink<String> sink;
@ServiceRegistered
public void wire(PublishingServiceTyped service, String name) {
this.service = service;
}
@ServiceRegistered
public void sink(MessageSink<String> sink, String name) {
this.sink = sink;
}
@Override
public void start() {
if (service != null) {
service.subscribe();
}
}
@Override
public void tearDown() {
// No-op
}
@Override
public void onServiceEvent(String event) {
if (sink != null) {
sink.accept(event);
}
}
}
}
4) Wire and test¶
The test boots a server, registers the typed service and a processor that implements the listener, then publishes events:
PublishingServiceTyped pubService = new PublishingServiceTyped("pubServiceTyped");
PublishingServiceTypedSubscriberHandler handler = new PublishingServiceTypedSubscriberHandler(new PublishingServiceTypedSubscriberHandler.TypedHandler());
EventProcessorGroupConfig processorGroup = EventProcessorGroupConfig.builder()
.agentName("processor-agent")
.put("typed-subscriber-processor", EventProcessorConfig.builder().handler(handler).build())
.build();
ServiceConfig<PublishingServiceTyped> svcCfg = ServiceConfig.<PublishingServiceTyped>builder()
.service(pubService)
.serviceClass(PublishingServiceTyped.class)
.name("pubServiceTyped")
//.agent("service-agent", new BusySpinIdleStrategy()) // optional: uncomment to give the service its own agent
.build();
AppConfig appConfig = AppConfig.builder()
.addProcessorGroup(processorGroup)
.addService(svcCfg)
.build();
FluxtionServer server = FluxtionServer.bootServer(appConfig, rec -> {});
pubService.publish("t1");
pubService.publish("t2");
The processor receives these via onServiceEvent and can forward them to a sink.
Why use typed invokes?¶
- Explicit contracts between service and processor via an interface
- Compile-time safety for callback signatures
- Ability to filter eligible processors by interface
- Coexists with onEvent fallback if desired