Ich möchte einen benutzerdefinierten Message-Handler erstellen, um Checkpoints in Flows zu verwenden. Außerdem werden diese Kontrollpunkte in ElasticSearch gespeichert.Wie man Beans in einem benutzerdefinierten Spring-Integration-Nachrichtenhandler autowire?
habe ich eine Klasse Checkpoint:
@Component
public class Checkpoint {
public static final String TASK_HEADER_KEY = "task";
public static CheckpointMessageHandlerSpec warn(String message) {
return new CheckpointMessageHandlerSpec(new CheckpointHandler("WARN", message));
}
}
// ... methods omitted: error, info etc
Weiter habe ich CheckpointMessageHandlerSpec:
public class CheckpointMessageHandlerSpec extends MessageHandlerSpec<CheckpointMessageHandlerSpec, CheckpointHandler> {
public CheckpointMessageHandlerSpec(CheckpointHandler checkpointHandler) {
this.target = checkpointHandler;
}
public CheckpointMessageHandlerSpec apply(Message<?> message) {
this.target.handleMessage(message);
return _this();
}
@Override
protected CheckpointHandler doGet() {
throw new UnsupportedOperationException();
}
}
CheckpointHandler, in dieser Klasse ich wünschte, die Dinge zu injizieren, wie Dienstleistungen oder Repositorys von Frühlings-Daten:
public class CheckpointHandler extends IntegrationObjectSupport implements MessageHandler {
private String status;
private String message;
// I want inject services or repositories here
public CheckpointHandler(String status, String message) {
this.status = status;
this.message = message;
}
@Override
public void handleMessage(Message<?> message) {
// Test to watch if I have the bean factory. It is always null
this.getBeanFactory();
Expression expression = EXPRESSION_PARSER.parseExpression("'" + this.message + "'");
// Here I intend to persist information of payload/headers with spring-data-elasticSearch repository previously injected
Object obj = expression.getValue(message);
}
}
Schließlich ist ein Beispiel für die Verwendung innerhalb eines Strömungs:
@Bean
public IntegrationFlow checkpointFlow(Checkpoint checkpoint) {
return IntegrationFlows.from(Http.inboundChannelAdapter("/checkpointFlow"))
.enrichHeaders(Collections.singletonMap(Checkpoint.TASK_HEADER_KEY, taskName))
.handle(new AppendMessageHandler())
.wireTap(c -> c.handle(m -> checkpoint.warn("SOMETHING IS HAPPENING HERE. MY PAYLOAD: ' + payload.toString() + '").apply(m)))
.handle(m -> log.info("[LOGGING DEMO] {}" , m.getPayload()))
.get();
}
private class AppendMessageHandler implements GenericHandler {
@Override
public String handle(Object payload, Map headers) {
return new StringBuilder().append(testMessage).toString();
}
}
Was ich vermisse? Ist es möglich, das zu tun? Ich hatte diese Idee nach dieser Frage How to create custom component and add it to flow in spring java dsl?
Vielen Dank!