2017-06-17 3 views
1

Ist es möglich, eine Implementierung eines Nachrichtensplitters zu haben, der eine IteratorAND benutzerdefinierte Header-Informationen hinzufügen kann?Spring-Integration: Benutzerdefinierter Splitter mit Header-Anreicherung

Zum Beispiel, wenn ich die folgenden Klasse

public class CsvFileToIteratorSplitter extends AbstractMessageSplitter { 

    @Override 
    protected Object splitMessage(Message<?> message) { 
     Object payload = message.getPayload(); 
     Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 

     try { 
      InputStream source = new FileInputStream((File) payload); 
      BufferedReader reader = new BufferedReader(new InputStreamReader(source)); 

      String header = reader.lines().findFirst().orElse(null); 

      return MessageBuilder.withPayload(reader.lines().iterator()) 
        .setHeaderIfAbsent("HEADER", header) 
        .build(); 

     } catch (IOException e) { 
      throw new UncheckedIOException(e); 
     } 
    } 
} 

Dann kann ich zu dem Header hinzufügen, aber die Nutzlast ist eigentlich eine Instanz von Iterator und die Spaltung nicht

Wenn ich so ändern, dass die Klasse habe ist jetzt

public class CsvFileToIteratorSplitter extends AbstractMessageSplitter { 

    @Override 
    protected Object splitMessage(Message<?> message) { 
     log.debug("{}", message.toString()); 

     Object payload = message.getPayload(); 
     Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 

     try { 
      InputStream source = new FileInputStream((File) payload); 
      BufferedReader reader = new BufferedReader(new InputStreamReader(source)); 

      return reader.lines().iterator(); 

     } catch (IOException e) { 
      throw new UncheckedIOException(e); 
     } 
    } 
} 

Die Aufteilung funktioniert, aber ich verliere die Header-Info.

Gibt es eine Möglichkeit, eine funktionierende Aufteilung mit der Möglichkeit, die Kopfzeile hinzuzufügen?

Antwort

1

Sie sollten eine Iterator<MessageBuilder<String>> zurückkehren ...

@SpringBootApplication 
public class So44604817Application { 

    public static void main(String[] args) { 
     ConfigurableApplicationContext context = SpringApplication.run(So44604817Application.class, args); 
     context.getBean("in", MessageChannel.class).send(new GenericMessage<>(new File("/tmp/foo.txt"))); 
     context.close(); 
    } 

    @Bean 
    @Splitter(inputChannel = "in") 
    public MySplitter splitter() { 
     MySplitter splitter = new MySplitter(); 
     splitter.setOutputChannelName("out"); 
     return splitter; 
    } 

    @Bean 
    public MessageChannel out() { 
     return new MessageChannel() { 

      @Override 
      public boolean send(Message<?> message) { 
       return send(message, -1); 
      } 

      @Override 
      public boolean send(Message<?> message, long timeout) { 
       System.out.println(message); 
       return true; 
      } 

     }; 
    } 

    public static class MySplitter extends AbstractMessageSplitter { 

     @SuppressWarnings("resource") 
     @Override 
     protected Object splitMessage(Message<?> message) { 
      Object payload = message.getPayload(); 
      Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload"); 

      try { 
       InputStream source = new FileInputStream((File) payload); 
       final BufferedReader reader = new BufferedReader(new InputStreamReader(source)); 
       final String header = reader.lines().findFirst().orElse(null); 
       final Iterator<String> iterator = reader.lines().iterator(); 
       Iterator<MessageBuilder<String>> builderIterator = new Iterator<MessageBuilder<String>>() { 

        private String next; 

        @Override 
        public boolean hasNext() { 
         if (this.next != null) { // handle multiple hasNext() calls. 
          return true; 
         } 
         if (!iterator.hasNext()) { 
          try { 
           reader.close(); 
          } 
          catch (IOException e) { 
           e.printStackTrace(); 
          } 
          return false; 
         } 
         else { 
          this.next = iterator.next(); 
          // Handle empty last line 
          if (next.length() == 0 && !iterator.hasNext()) { 
           try { 
            reader.close(); 
           } 
           catch (IOException e) { 
            e.printStackTrace(); 
           } 
           return false; 
          } 
          return true; 
         } 
        } 

        @Override 
        public MessageBuilder<String> next() { 
         String line = this.next; 
         this.next = null; 
         return MessageBuilder 
           .withPayload(line).setHeaderIfAbsent("HEADER", header); 
        } 

       }; 
       return builderIterator; 
      } 
      catch (IOException e) { 
       throw new UncheckedIOException(e); 
      } 
     } 

    } 

} 

Beachten Sie, dass Ihre skip(1) nicht korrekt ist, da die erste Zeile wurde aus dem Leser bereits verbraucht.

Mit Datei:

FOO,BAR 
foo,bar 
baz.qux 

Ergebnis:

GenericMessage [payload=foo,bar, headers={sequenceNumber=1, HEADER=FOO,BAR, correlationId=42ce2e1f-5337-1f75-d4fe-0d7f366f76f1, id=94e98261-fd49-b4d0-f6a0-3181b27f145b, sequenceSize=0, timestamp=1497713691192}] 
GenericMessage [payload=baz.qux, headers={sequenceNumber=2, HEADER=FOO,BAR, correlationId=42ce2e1f-5337-1f75-d4fe-0d7f366f76f1, id=c0b1edd6-adb9-3857-cb7c-70f603f376bc, sequenceSize=0, timestamp=1497713691192}] 

JIRA Issue INT-4297 to add this functionality to FileSplitter.

+0

Danke. Funktioniert gut – Pram