2016-08-24 1 views
1

Ich habe eine Anforderung, wo ich für eine Datei kontinuierlich an Unix-Speicherort suchen muss. Sobald es verfügbar ist, dann muss ich es analysieren und konvertieren zu einigen Json-Format.Dies muss sein getan mit Spring-Integration - DSL. Es folgt das Stück Code, den ich vom Frühjahr Website bekam aber es zeigt folgende Ausnahme:Spring Integration mit DSL für das Lesen der Datei von Unix-Speicherort

o.s.integration.handler.LoggingHandler: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.processFileChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers 

Unten ist der Code:

@SpringBootApplication 
public class FileReadingJavaApplication { 

    public static void main(String[] args) { 
     new SpringApplicationBuilder(FileReadingJavaApplication.class) 
      .web(false) 
      .run(args); 
    } 

    @Bean 
    public IntegrationFlow fileReadingFlow() { 
     return IntegrationFlows 
        .from(s -> s.file(new File("Y://")) 
           .patternFilter("*.txt"), 
          e -> e.poller(Pollers.fixedDelay(1000))) 
        .transform(Transformers.fileToString()) 
        .channel("processFileChannel") 
        .get(); 
     } 

} 

Neuer Code:

@SpringBootApplication public class SpringIntegration {

public static void main(String[] args) { 
    new SpringApplicationBuilder(SpringIntegration.class) 
    .web(false) 
    .run(args); 
} 

@Bean 
public SessionFactory<LsEntry> sftpSessionFactory() { 
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); 
    factory.setHost("ip"); 
    factory.setPort(port); 
    factory.setUser("username"); 
    factory.setPassword("pwd"); 
    factory.setAllowUnknownKeys(true); 
    return new CachingSessionFactory<LsEntry>(factory); 
} 

@Bean 
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { 
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); 
    fileSynchronizer.setDeleteRemoteFiles(false); 
    fileSynchronizer.setRemoteDirectory("remote dir"); 
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.txt")); 

    return fileSynchronizer; 
} 
@Bean 
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) 
public MessageSource ftpMessageSource() { 
    SftpInboundFileSynchronizingMessageSource source = 
      new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); 
    source.setLocalFilter(new AcceptOnceFileListFilter<File>()); 
    source.setLocalDirectory(new File("Local directory")); 

    return source; 
} 

@Bean 
@ServiceActivator(inputChannel = "fileInputChannel") 
public MessageHandler handler() { 
    return new MessageHandler() { 


     @Override 
     public void handleMessage(Message<?> message) throws MessagingException { 
      System.out.println("File Name : "+message.getPayload()); 

     } 

    }; 
} 

@ Be an public static StandardIntegrationFlow processFileFlow() { Rückgabe IntegrationFlows .from ("fileInputChannel"). split() .handle ("fileProcessor", "process"). get();

} 

@Bean 
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000")) 
public MessageSource<File> fileReadingMessageSource() { 
    AcceptOnceFileListFilter<File> filters =new AcceptOnceFileListFilter<>(); 

    FileReadingMessageSource source = new FileReadingMessageSource(); 
    source.setAutoCreateDirectory(true); 
    source.setDirectory(new File("Local directory")); 
    source.setFilter(filters); 

    return source; 
} 
@Bean 
public FileProcessor fileProcessor() { 
    return new FileProcessor(); 
} 


@Bean 
    @ServiceActivator(inputChannel = "fileInputChannel") 
    public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) { 
     AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); 
     outbound.setExpectReply(true); 
     outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo' 
     return outbound; 
    } 



    @MessagingGateway(defaultRequestChannel = "amqpOutboundChannel") 
    public interface MyGateway { 
     String sendToRabbit(String data); 

    } 

}

FileProcessor:

public class FileProcessor {

public void process(Message<String> msg) { 
    String content = msg.getPayload(); 
    JSONObject jsonObject ; 
    Map<String, String> dataMap = new HashMap<String, String>(); 
    for(int i=0;i<=content.length();i++){ 
    String userId = content.substring(i+5,i+16); 


    dataMap = new HashMap<String, String>(); 

    dataMap.put("username", username.trim()); 


    i+=290; //each record of size 290 in file 
    jsonObject = new JSONObject(dataMap); 
    System.out.println(jsonObject); 

    } 

} 

}

Antwort

0

Ihr Code korrekt ist, aber eine Ausnahme sagt Ihnen, dass es etwas brauchen was liest Nachrichten vom direkten Kanal "processFileChannel".

Bitte lesen Sie mehr über verschiedene Kanaltypen in der Spring Integration Reference Manual.

EDIT

Eine der ersten Klasse Bürger in Spring Integration ist MessageChannel Abstraktion. Weitere Informationen finden Sie unter EIP.

Die Definition wie .channel("processFileChannel") bedeuten deklarieren DirectChannel. Diese Art von Kanal bedeutet, dass die Nachricht beim Senden akzeptiert wird und die Verarbeitung direkt nur in send Prozess ausgeführt wird. In den rohen Java-Wörtern mag es so klingen: Rufen Sie einen Dienst von einem anderen an. Werfen Sie NPE, wenn der andere nicht autowired wurde.

Also, wenn Sie DirectChannel für die Ausgabe verwenden, sollten Sie irgendwo einen Abonnenten dafür erklären. Ich weiß nicht, was ist Ihre Logik, aber so funktioniert es und keine andere Wahl zu beheben Dispatcher has no subscribers for channel.

Obwohl Sie einen anderen MessageChannel Typ verwenden können. Aber für diesen Zweck sollten Sie mehr Dokument lesen, z. Mark Fishers Spring Integration in Action.

+0

Eigentlich bin ich neu in Spring Integration.Das Konzept ist ein bisschen schwierig, es wird hilfreich sein, wenn Sie mir die Änderungen sagen können, die ich gemäß meiner Anforderung tun muss. – user

+0

Alles, was Sie tun, ist, den Inhalt der Datei an "processFileChannel" zu senden - aber Sie brauchen etwas, um diese Daten zu konsumieren - irgendeinen anderen Fluss, der mit dem Kanal beginnt. –

+0

Bitte finden Sie ein 'EDIT' in meiner Antwort. –

Verwandte Themen