2017-12-15 4 views
-2

Ich war etwas mit dem kodierenden Teil in kafka versuchen Strömen, die unterWert ist bereits in ihrem Umfang definiert in Kafka Strom

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> textlines = builder.stream("iostatin2"); 
KStream<String, String> mstream = textlines 
       .mapValues(value -> value.replace("[","")) 
       .mapValues(value -> value.replace("]","")) 
       .mapValues(value -> value.replaceAll("\":\"", "\":")) 
       .mapValues(value -> value.replaceAll("\":", "\":\"")) 
       .mapValues(value -> value.replaceAll("\",\"", ",\"")) 
       .mapValues(value -> value.replaceAll(",\"", "\",\"")) 
       .mapValues(value -> value.replaceAll(":\"\\{", ":\\{")) 
       .mapValues(value -> value.replaceAll("\\}\",", "\\},")) 
       .mapValues(value -> value.replaceAll("\\},\\{" ,"\\}\\},\\{\\{")); 

     textlines.foreach(new ForeachAction<String, String>() { 
        @Override 
        public void apply(String key, String value) { 
         try { 
          textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{"))); 

          Thread.sleep(2000); 
         } catch (InterruptedException e) { 
          e.printStackTrace(); 
         } 

        } 
       }); 

so in der foreachaction() Funktion

textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{"))); 

in dieser gezeigt wird Der Zeilenwert verursacht einen Fehler, da die Variable 'Wert' bereits im Gültigkeitsbereich definiert ist. Also, was soll ich diese Zeile mit ersetzen ... plzz helfen Sie mir ..

+2

Die Variable einfach umbenennen? –

Antwort

1

Wert wird bereits verwendet als Parameter in apply() Methode oben, So Wert ändert in der Zeile textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));

einen anderer Name wie v

textlines.flatMapValues(v -> Arrays.asList(v.split("\\},\\{")));

Verwandte Themen