2017-12-15 2 views
0

I für die Umwandlung von JSON-Arrays zu JSON-Objekten einen Code in normalen java haben und ich brauche diese normalen Java umwandeln zu kafka Streams ... unten ist meinkafka-Streams für die Umwandlung von JSON-Arrays zu JSON-Objekten

import java.io.*; 
import java.util.*; 
import java.lang.*; 
import org.json.simple.JSONArray; 
import org.json.simple.JSONObject; 
import org.json.simple.parser.JSONParser; 
import org.json.simple.parser.ParseException; 
public class JsonParseTest { 
    public static void main(String[] args) { 
     try { 
      JSONParser parser = new JSONParser(); 
      JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json")); 
      for (Object o : jsonArray) { 
//to get the Json object 
       JSONObject snap = (JSONObject) o; 
       System.out.println(snap); 
      } 
} 
catch (FileNotFoundException ex) { 
      ex.printStackTrace(); 
     } catch (IOException ex) { 
      ex.printStackTrace(); 
     } catch (ParseException ex) { 
      ex.printStackTrace(); 
     } catch (NullPointerException ex) { 
      ex.printStackTrace(); 
     } 
    } 
} 

Wenn jemand mir helfen, Code für nur Logikteil in schriftlicher Form i mit, dass coninue kann, ist unter meinem Logikteil

public class JsonParseTest { 
    public static void main(String[] args) { 
     try { 
      JSONParser parser = new JSONParser(); 
      JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json")); 
      for (Object o : jsonArray) { 
//to get the Json object 
       JSONObject snap = (JSONObject) o; 
       System.out.println(snap); 
      } 
} 

für diesen atleast ich brauche Hilfe Wie kann ich der gleiche Code in Kafka Ströme schreiben? kann jemand dabei helfen?

+0

Sie müssen Serializer und Deserializer implementieren, diese [Link] (https://docs.confluent.io/3.0.0/streams/developer-guide.html#data-types-and-serialization) wird Ihnen helfen. – Onkar

Antwort

0

würde ich zwei Optionen empfehlen:

  • Sie implementieren Ihre eigenen Serializer/Deserializer (Here is the javadoc)
  • Sie können Ihren Strom als Strom (String, String) und verwenden Sie einen flatMap verarbeiten zu analysieren jedes Element des Stroms und verwandelt es in einen Strom (String, JSONObject):

    JSONParser parser = new JSONParser(); 
    stringStream.flatMap((k,v) -> { 
        List<KeyValue<String,JSONOBject>> tmp = new ArrayList<KeyValue<String,JSONOBject>>(); 
        JSONArray jsonArray = (JSONArray) parser.parse(v); 
        for (Object o : jsonArray) { 
         JSONObject snap = (JSONObject) o; 
         tmp.add(new KeyValue(k, snap)); 
        }  
        return tmp; 
    }); 
    

Her Ich behandelte die Ausnahme nicht, Sie müssen den Code des Lambda in einen Versuch/Fang einwickeln.

+0

Hey, kannst du meinen obigen Code überprüfen, den ich als Antwort gepostet habe, und meine Zweifel beseitigen –

0

Dank für die Hilfe so sieht nun Code so etwas wie unter dem einige Fehler

zeigt
KStreamBuilder builder = new KStreamBuilder(); 
     KStream<String, String> mstream = builder.stream(Serdes.String(), Serdes.String(), "intopic"); 
     KStream<String, JSONPObject>msstream = mstream.flatMap((k,v) -> { 
      List<KeyValue<String, JSONPObject>> tmp = new ArrayList<KeyValue<String, JSONPObject>>(); 
      JSONParser parser= new JSONParser() ; 
      JsonNode jsonNode = (JsonNode) parser.parse(v); 
      for (Object o : jsonNode){ 
       JSONPObject snap = (JSONPObject) o; 
       tmp.add(new KeyValue(k,snap)); 
      } 
      return tmp; 
     }); 

So Fehler es sind in JSONParser Argumente wirft. Es fordert mich auf, Source, Global und Boolean einzuschließen ... und ein weiterer Fehler ist parser.parse (v); hier zeigt es v hat einen Fehler .... n ist der Code korrekt?

Verwandte Themen