Ich versuche, JSON-Daten von RabbitMQ zu Apache Spark mit Java zu bekommen und einige Echtzeit-Analysen daraus zu machen.Apache Spark - Java, Gruppe Live Stream Daten
Ich bin in der Lage, die Daten und auch einige grundlegende SQL-Abfragen zu bekommen, aber ich bin nicht in der Lage, herauszufinden, die Gruppierung.
Unten finden Sie die JSON I
{"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-101","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
{"DeviceId":"MAC-102","DeviceType":"Simulator-1","data":{"TimeStamp":"26-06-2017 16:43:41","FR":10,"ASSP":20,"Mode":1,"EMode":2,"ProgramNo":2,"Status":3,"Timeinmillisecs":636340922213668165}}
Ich möchte haben sie id von Gerät zu gruppieren. Die Idee ist, dass ich Analysen für einzelne Geräte ausführen und sammeln kann. Unten ist der Beispielcode-Schnipsel, die ich
public static void main(String[] args) {
try {
mconf = new SparkConf();
mconf.setAppName("RabbitMqReceiver");
mconf.setMaster("local[*]");
jssc = new JavaStreamingContext(mconf,Durations.seconds(10));
SparkSession spksess = SparkSession
.builder()
.master("local[*]")
.appName("RabbitMqReceiver2")
.getOrCreate();
SQLContext sqlctxt = new SQLContext(spksess);
JavaReceiverInputDStream<String> jsonData = jssc.receiverStream(
new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));
//jsonData.print();
JavaDStream<String> machineData = jsonData.window(Durations.minutes(1), Durations.seconds(20));
machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> rdd) {
if(!rdd.isEmpty()){
Dataset<Row> data = sqlctxt.read().json(rdd);
//Dataset<Row> data = spksess.read().json(rdd).select("*");
data.createOrReplaceTempView("DeviceData");
data.printSchema();
//data.show(false);
// The below select query works
//Dataset<Row> groupedData = sqlctxt.sql("select * from DeviceData where DeviceId='MAC-101'");
// The below sql fails...
Dataset<Row> groupedData = sqlctxt.sql("select * from DeviceData GROUP BY DeviceId");
groupedData.show();
}
}
});
jssc.start();
jssc.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
versuchen, was ich suche mit den gestreamten Daten zu tun ist, um zu sehen, ob ich die eingehenden Daten in einzelne Eimer schieben kann ...
Können sagen, uns haben die unten eingehenden Daten von rabbitmq,
Was ich tun möchte, ist entweder eine einzige Schlüssel/Wert-basierte Sammlung, die die Geräte-ID als Schlüssel und Liste als Wert haben Oder es könnte jemand Art der individuellen dynamischen Sammlung sein für jede Geräte-ID.
Können wir etwas tun, wie der Code unten (von url - http://backtobazics.com/big-data/spark/apache-spark-groupby-example/)
public class GroupByExample {
public static void main(String[] args) throws Exception {
JavaSparkContext sc = new JavaSparkContext();
// Parallelized with 2 partitions
JavaRDD<String> rddX = sc.parallelize(
Arrays.asList("Joseph", "Jimmy", "Tina",
"Thomas", "James", "Cory",
"Christine", "Jackeline", "Juan"), 3);
JavaPairRDD<Character, Iterable<String>> rddY = rddX.groupBy(word -> word.charAt(0));
System.out.println(rddY.collect());
}
}
So in unserem Fall brauchen wir einen Filter für die Gruppe von WRT passieren DeviceId
Arbeitscode ....
JavaDStream<String> strmData = jssc.receiverStream(
\t \t \t \t \t new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));
\t \t \t
//This is just a sliding window i have kept
JavaDStream<String> machineData = strmData.window(Durations.minutes(1), Durations.seconds(10));
machineData.print();
JavaPairDStream<String, String> pairedData = machineData.mapToPair(s -> new Tuple2<String, String>(s.substring(5, 10) , new String(s)));
\t \t \t
JavaPairDStream<String, Iterable<String>> groupedData = pairedData.groupByKey();
groupedData.print();
Danke, ich hätte über Gruppierungslogik nachdenken sollen, die viel sql selbst geschrieben hat. Wie Sie vorgeschlagen haben, kommt die Abfrage offensichtlich durch, wenn Sie der Abfrage eine Aggregation zugeordnet haben. Ich habe meine Frage mit weiteren Details bearbeitet, bitte lassen Sie mich Ihre Gedanken wissen. –