Ich habe eine Anforderung, wo ich für eine Datei kontinuierlich an Unix-Speicherort suchen muss. Sobald es verfügbar ist, dann muss ich es analysieren und konvertieren zu einigen Json-Format.Dies muss sein getan mit Spring-Integration - DSL. Es folgt das Stück Code, den ich vom Frühjahr Website bekam aber es zeigt folgende Ausnahme:Spring Integration mit DSL für das Lesen der Datei von Unix-Speicherort
o.s.integration.handler.LoggingHandler: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.processFileChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
Unten ist der Code:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlows
.from(s -> s.file(new File("Y://"))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Transformers.fileToString())
.channel("processFileChannel")
.get();
}
}
Neuer Code:
@SpringBootApplication public class SpringIntegration {
public static void main(String[] args) {
new SpringApplicationBuilder(SpringIntegration.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("ip");
factory.setPort(port);
factory.setUser("username");
factory.setPassword("pwd");
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("remote dir");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
public MessageSource ftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setLocalDirectory(new File("Local directory"));
return source;
}
@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println("File Name : "+message.getPayload());
}
};
}
@ Be an public static StandardIntegrationFlow processFileFlow() { Rückgabe IntegrationFlows .from ("fileInputChannel"). split() .handle ("fileProcessor", "process"). get();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
AcceptOnceFileListFilter<File> filters =new AcceptOnceFileListFilter<>();
FileReadingMessageSource source = new FileReadingMessageSource();
source.setAutoCreateDirectory(true);
source.setDirectory(new File("Local directory"));
source.setFilter(filters);
return source;
}
@Bean
public FileProcessor fileProcessor() {
return new FileProcessor();
}
@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
outbound.setExpectReply(true);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {
String sendToRabbit(String data);
}
}
FileProcessor:
public class FileProcessor {
public void process(Message<String> msg) {
String content = msg.getPayload();
JSONObject jsonObject ;
Map<String, String> dataMap = new HashMap<String, String>();
for(int i=0;i<=content.length();i++){
String userId = content.substring(i+5,i+16);
dataMap = new HashMap<String, String>();
dataMap.put("username", username.trim());
i+=290; //each record of size 290 in file
jsonObject = new JSONObject(dataMap);
System.out.println(jsonObject);
}
}
}
Eigentlich bin ich neu in Spring Integration.Das Konzept ist ein bisschen schwierig, es wird hilfreich sein, wenn Sie mir die Änderungen sagen können, die ich gemäß meiner Anforderung tun muss. – user
Alles, was Sie tun, ist, den Inhalt der Datei an "processFileChannel" zu senden - aber Sie brauchen etwas, um diese Daten zu konsumieren - irgendeinen anderen Fluss, der mit dem Kanal beginnt. –
Bitte finden Sie ein 'EDIT' in meiner Antwort. –