2017-06-26 4 views
0

Ich versuche, JSON-Daten von RabbitMQ zu Apache Spark mit Java zu bekommen und einige Echtzeit-Analysen daraus zu machen.Apache Spark - Java, Gruppe Live Stream Daten

Ich bin in der Lage, die Daten und auch einige grundlegende SQL-Abfragen zu bekommen, aber ich bin nicht in der Lage, herauszufinden, die Gruppierung.

Unten finden Sie die JSON I

{"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}} 
 
{"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}} 
 
{"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}} 
 
{"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}

Ich möchte haben sie id von Gerät zu gruppieren. Die Idee ist, dass ich Analysen für einzelne Geräte ausführen und sammeln kann. Unten ist der Beispielcode-Schnipsel, die ich

public static void main(String[] args) { 

     try { 

      mconf = new SparkConf(); 
      mconf.setAppName("RabbitMqReceiver"); 
      mconf.setMaster("local[*]"); 

      jssc = new JavaStreamingContext(mconf,Durations.seconds(10)); 

      SparkSession spksess = SparkSession 
        .builder() 
        .master("local[*]") 
        .appName("RabbitMqReceiver2") 
        .getOrCreate(); 


      SQLContext sqlctxt = new SQLContext(spksess); 

      JavaReceiverInputDStream<String> jsonData = jssc.receiverStream(
        new mqreceiver(StorageLevel.MEMORY_AND_DISK_2())); 

      //jsonData.print(); 

      JavaDStream<String> machineData = jsonData.window(Durations.minutes(1), Durations.seconds(20)); 

      machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() { 

       @Override 
       public void call(JavaRDD<String> rdd) { 
        if(!rdd.isEmpty()){ 

         Dataset<Row> data = sqlctxt.read().json(rdd); 

         //Dataset<Row> data = spksess.read().json(rdd).select("*"); 
         data.createOrReplaceTempView("DeviceData"); 
         data.printSchema(); 
         //data.show(false); 

         // The below select query works 
         //Dataset<Row> groupedData = sqlctxt.sql("select * from DeviceData where DeviceId='MAC-101'"); 

         // The below sql fails... 

         Dataset<Row> groupedData = sqlctxt.sql("select * from DeviceData GROUP BY DeviceId"); 
         groupedData.show(); 


        } 
       } 
      }); 

      jssc.start(); 

      jssc.awaitTermination(); 

     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

versuchen, was ich suche mit den gestreamten Daten zu tun ist, um zu sehen, ob ich die eingehenden Daten in einzelne Eimer schieben kann ...

Können sagen, uns haben die unten eingehenden Daten von rabbitmq,

Was ich tun möchte, ist entweder eine einzige Schlüssel/Wert-basierte Sammlung, die die Geräte-ID als Schlüssel und Liste als Wert haben Oder es könnte jemand Art der individuellen dynamischen Sammlung sein für jede Geräte-ID.

Können wir etwas tun, wie der Code unten (von url - http://backtobazics.com/big-data/spark/apache-spark-groupby-example/)

public class GroupByExample { 
 
     public static void main(String[] args) throws Exception { 
 
      
 
      JavaSparkContext sc = new JavaSparkContext(); 
 
      
 
      // Parallelized with 2 partitions 
 
      JavaRDD<String> rddX = sc.parallelize(
 
        Arrays.asList("Joseph", "Jimmy", "Tina", 
 
          "Thomas", "James", "Cory", 
 
          "Christine", "Jackeline", "Juan"), 3); 
 
      
 
      JavaPairRDD<Character, Iterable<String>> rddY = rddX.groupBy(word -> word.charAt(0)); 
 
      
 
      System.out.println(rddY.collect()); 
 
     } 
 
    }

So in unserem Fall brauchen wir einen Filter für die Gruppe von WRT passieren DeviceId

Arbeitscode ....

JavaDStream<String> strmData = jssc.receiverStream(
 
\t \t \t \t \t new mqreceiver(StorageLevel.MEMORY_AND_DISK_2())); 
 
\t \t \t 
 
//This is just a sliding window i have kept 
 
JavaDStream<String> machineData = strmData.window(Durations.minutes(1), Durations.seconds(10)); 
 
machineData.print(); 
 

 
JavaPairDStream<String, String> pairedData = machineData.mapToPair(s -> new Tuple2<String, String>(s.substring(5, 10) , new String(s))); 
 
\t \t \t 
 
JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey(); 
 
groupedData.print();

Antwort

1

Es ist, weil in Abfragen mit Gruppe, können nur folgende Spalten in select verwendet werden: in Gruppe

von

  • Aggregation von jeder Spalte aufgeführt

    • Spalten Wenn Sie verwenden "*", dann werden alle Spalten in select verwendet - und deshalb schlägt die Abfrage fehl. Ändern Sie die Abfrage zum Beispiel:

      select DeviceId, count(distinct DeviceType) as deviceTypeCount from DeviceData group by DeviceId 
      

      und es wird funktionieren, weil es nur Spalte in der Gruppe verwendet, die durch und Spalten in Aggregationsfunktionen

  • +0

    Danke, ich hätte über Gruppierungslogik nachdenken sollen, die viel sql selbst geschrieben hat. Wie Sie vorgeschlagen haben, kommt die Abfrage offensichtlich durch, wenn Sie der Abfrage eine Aggregation zugeordnet haben. Ich habe meine Frage mit weiteren Details bearbeitet, bitte lassen Sie mich Ihre Gedanken wissen. –

    0

    Die GROUP BY-Anweisung oft mit Aggregatfunktionen verwendet wird (COUNT, MAX , MIN, SUM, AVG), um die Ergebnismenge nach einer oder mehreren Spalten zu gruppieren.

    +0

    Dieser Code wurde im Hauptpost aktualisiert, nur für den Fall, dass jemand das gleiche Problem betrachtet. –