
Real-Time Streaming Prediction in Flink with Scala

Flink Version: 1.2.0
Scala Version: 2.11.8

I want to use a DataStream to make predictions with a model in Flink using Scala. I have a DataStream[String] in Flink (Scala) that contains JSON-formatted data from a Kafka source, and I would like to use this stream to run predictions against an already trained flink-ml model. The problem is that all the flink-ml examples use the DataSet API for prediction (the batch-style pattern is sketched below). I am relatively new to Flink and Scala, so any help in the form of a code solution would be appreciated.
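For reference, the prediction pattern shown in the flink-ml examples is batch-only, roughly along these lines (a minimal sketch with made-up trainingData/testingData, not code from this question):

import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.regression.MultipleLinearRegression

object BatchPredictionSketch {
  def main(args: Array[String]): Unit = {
    val benv = ExecutionEnvironment.getExecutionEnvironment

    // made-up training and test data; real features would come from the JSON fields
    val trainingData: DataSet[LabeledVector] =
      benv.fromElements(LabeledVector(1.0, DenseVector(1.0, 2.0)))
    val testingData: DataSet[DenseVector] =
      benv.fromElements(DenseVector(3.0, 4.0))

    val mlr = MultipleLinearRegression()
    mlr.fit(trainingData)                       // training works on a DataSet ...
    val predictions = mlr.predict(testingData)  // ... and so does predict: DataSet[(DenseVector, Double)]

    predictions.print()
  }
}

There is no equivalent predict call that accepts a DataStream, which is exactly the gap described above.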

Input:

{"FC196":"Dormant","FC174":"Yolo","FC195":"Lol","FC176":"4","FC198":"BANKING","FC175":"ABDULMAJEED","FC197":"2017/04/04","FC178":"1","FC177":"CBS","FC199":"INDIVIDUAL","FC179":"SYSTEM","FC190":"OK","FC192":"osName","FC191":"Completed","FC194":"125","FC193":"7","FC203":"A10SBPUB000000000004439900053570","FC205":"1","FC185":"20","FC184":"Transfer","FC187":"2","FC186":"2121","FC189":"abcdef","FC200":"","FC188":"BR01","FC202":"INDIVIDUAL","FC201":"","FC181":"7:00PM","FC180":"2007/04/01","FC183":"11000000","FC182":"INR"} 

Code:

package org.apache.flink.quickstart 

//imports 

import java.util.Properties 

import org.apache.flink.api.scala._ 
import org.apache.flink.ml.recommendation.ALS 
import org.apache.flink.ml.regression.MultipleLinearRegression 
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment 

import scala.util.parsing.json.JSON 

//kafka consumer imports 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 

//kafka json table imports 
import org.apache.flink.table.examples.scala.StreamTableExample 
import org.apache.flink.table.api.TableEnvironment 
import org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSource 
import org.apache.flink.api.java.DataSet 

//JSon4s imports 
import org.json4s.native.JsonMethods 



// Case class 
case class CC(FC196:String,FC174:String,FC195:String,FC176:String,FC198:String,FC175:String,FC197:String,FC178:String,FC177:String,FC199:String,FC179:String,FC190:String,FC192:String,FC191:String,FC194:String,FC193:String,FC203:String,FC205:String,FC185:String,FC184:String,FC187:String,FC186:String,FC189:String,FC200:String,FC188:String,FC202:String,FC201:String,FC181:String,FC180:String,FC183:String,FC182:String) 


object WordCount { 

    implicit val formats = org.json4s.DefaultFormats 

    def main(args: Array[String]) { 

    // json4s default formats, needed to extract JSON into the case class
    implicit lazy val formats = org.json4s.DefaultFormats

    // kafka properties 
    val properties = new Properties() 
    properties.setProperty("bootstrap.servers", "***.**.*.***:9093") 
    properties.setProperty("zookeeper.connect", "***.**.*.***:2181") 
    properties.setProperty("group.id","grouop") 
    properties.setProperty("auto.offset.reset", "earliest") 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
// val tableEnv = TableEnvironment.getTableEnvironment(env) 

    // consume the Kafka topic as strings and keep only records that parse as valid JSON
    val st = env
     .addSource(new FlinkKafkaConsumer09("new", new SimpleStringSchema(), properties))
     .flatMap(raw => JsonMethods.parse(raw).toOption)

    // deserialize each parsed JValue into the CC case class
    val mapped = st.map(_.extract[CC])

    mapped.print() 

    env.execute() 

    } 
} 

Answer


The way to solve this problem is to write a MapFunction which reads the model when the job starts up. The MapFunction then stores the model as part of its internal state, so that it is automatically restored in case of a failure:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PredictionJob {

    public static void main(String[] args) throws Exception {
        // obtain execution environment, run this example in "ingestion time"
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

        DataStream<Value> input = ...; // read from Kafka for example

        DataStream<Prediction> prediction = input.map(new Predictor());

        prediction.print();

        env.execute();
    }

    public static class Predictor implements MapFunction<Value, Prediction>, CheckpointedFunction {

        private transient ListState<Model> modelState;

        private transient Model model;

        @Override
        public Prediction map(Value value) throws Exception {
            return model.predict(value);
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // we don't have to do anything here because we assume the model to be constant
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Model> listStateDescriptor = new ListStateDescriptor<>("model", Model.class);

            modelState = context.getOperatorStateStore().getUnionListState(listStateDescriptor);

            if (context.isRestored()) {
                // restore the model from state
                model = modelState.get().iterator().next();
            } else {
                modelState.clear();

                // read the model from somewhere, e.g. read from a file
                model = ...;

                // update the modelState so that it is checkpointed from now on
                modelState.add(model);
            }
        }
    }

    public static class Model {}

    public static class Value {}

    public static class Prediction {}
}
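Translated into the asker's Scala setup, the same pattern could look roughly like the sketch below. It reuses the CC case class from the question; Model, Prediction, and loadModel are hypothetical placeholders for however the trained flink-ml model is persisted and evaluated, not part of the original answer:

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

// placeholder types standing in for the trained model and its output
class Model extends Serializable {
  def predict(in: CC): Prediction = Prediction(0.0) // dummy scoring logic
}
case class Prediction(score: Double)

class Predictor extends MapFunction[CC, Prediction] with CheckpointedFunction {

  @transient private var modelState: ListState[Model] = _
  @transient private var model: Model = _

  override def map(value: CC): Prediction = model.predict(value)

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // nothing to do: the model is assumed to stay constant after initialization
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Model]("model", classOf[Model])
    modelState = context.getOperatorStateStore.getUnionListState(descriptor)

    if (context.isRestored) {
      // restore the model from state after a failure
      model = modelState.get().iterator().next()
    } else {
      modelState.clear()
      // hypothetical helper: load the trained model, e.g. from a file of weights
      model = loadModel()
      // checkpoint the model from now on
      modelState.add(model)
    }
  }

  private def loadModel(): Model = new Model // placeholder for real loading logic
}

Wired into the existing pipeline from the question, this would be used as val predictions = mapped.map(new Predictor()) followed by predictions.print().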