2016-06-20 7 views
0

Ich verwende Flink Streaming Beispiel, wo Racks zur Verfügung & I durch Rack IDs..following Summe von Temperatur Gruppe berechnet werden soll mein Code:Flink Streaming während Summe Ausnahme werfen Berechnung

static Properties properties=new Properties(); 
    public static Properties getProperties() 
    { 
     properties.setProperty("bootstrap.servers", "54.210.139.57:9092"); 
     properties.setProperty("zookeeper.connect", "54.210.139.57:2181"); 
     //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder"); 
     //properties.setProperty("group.id", "akshay"); 
     properties.setProperty("auto.offset.reset", "earliest"); 
     return properties; 
    } 

@SuppressWarnings("rawtypes") 
public static void main(String[] args) throws Exception 
{ 
    StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 
    Properties props=Program.getProperties(); 
    //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

    /*DataStream<String> dstream=env.addSource(new FlinkKafkaConsumer09<String>("TemperatureEvent",new SimpleStringSchema(), props)); 
    dstream.filter(dstream -> dstream.)*/ 
    DataStream<TemperatureEvent> dstream=env.addSource(new FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new TemperatureEventSchema(), props)); 

    DataStream<TemperatureEvent> ds1=dstream.keyBy("rackId").sum(1); 

    ds1.print(); 
    env.execute("Temperature Consumer"); 
} 

Wenn ich versuche, führen sie diesen Code, wirft es folgende Ausnahme: das Programm mit folgender Ausnahme beendet:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error. 
     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) 
     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) 
     at org.apache.flink.client.program.Client.runBlocking(Client.java:248) 
     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) 
     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) 
     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192) 
     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243) 
Caused by: java.lang.IndexOutOfBoundsException: Not 0th field selected for a simple type (non-tuple, non-array). 
     at org.apache.flink.streaming.util.FieldAccessor.create(FieldAccessor.java:78) 
     at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:39) 
     at org.apache.flink.streaming.api.datastream.KeyedStream.sum(KeyedStream.java:292) 
     at com.yash.main.Program.main(Program.java:38) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) 

I Summe bin mit (1), weil mein 0. param ist rackId & 1. param tempe ist ratur wie in pojo TemperatureEvent wie folgt definiert:

public class TemperatureEvent 
{ 
    private int rackId; 
    private double temperature; 
    private long timeStamp; 

    public TemperatureEvent() 
    { 
    // TODO Auto-generated constructor stub 
    } 

public TemperatureEvent(int rackId, double temperature, long timeStamp) { 
    super(); 
    this.rackId = rackId; 
    this.temperature = temperature; 
    this.timeStamp = timeStamp; 
} 

public int getRackId() { 
    return rackId; 
} 

public void setRackId(int rackId) { 
    this.rackId = rackId; 
} 

public double getTemperature() { 
    return temperature; 
} 

public void setTemperature(double temperature) { 
    this.temperature = temperature; 
} 

public long getTimeStamp() { 
    return timeStamp; 
} 

public void setTimeStamp(long timeStamp) { 
    this.timeStamp = timeStamp; 
} 

@Override 
public String toString() { 
    //return String.format("TemperatureEvent [rackId=%s, temperature=%s, timeStamp=%s]",rackId, temperature, timeStamp); 
      String str=getRackId()+","+temperature+","+getTimeStamp(); 
      return str; 

} 

Was ist die Lösung für dieses Problem? Wie kann ich die Summe der Temperaturen gruppieren nach RackID ??

Antwort

0

Sie können für diese Methode nur indexbasierte Parameter verwenden, wenn es sich bei Ihrem Typ um einen Tupel-Typ handelt. In Ihrem Fall sollte es mit .sum("temperature") funktionieren.

+0

Danke für die Lösung, die mir geholfen hat, das Problem zu lösen ... Nun angenommen, wenn ich "Durchschnitt" statt "Summe" der Temperaturen von RackID in Fensterzeit berechnen möchte, wie kann ich Durchschnitt berechnen ?? – Akki

Verwandte Themen