2017-05-10 1 views
2

Ich Analyse von Protokollen und ich habe diese Architektur:Elasticsearch Spark Streaming

kafka-> Funken Streaming -> Elasticsearch

Mein Hauptziel ist es Modelle für maschinelles Lernen im Streaming zu erstellen. Ich denke, dass ich zwei Dinge tun:

1) Kafka-> Funken Streaming (ML) -> Elasticsearch

2) Kafka-> Funken Streaming-> Elasticsearch -> Funken Streaming (ML)

-Ich denke, dass die zweite Architektur die beste ist, da Spark-Streaming indizierte Daten direkt verwendet. Was denken Sie? Ist das korrekt? -Können wir Sparks Streaming in Echtzeit mit elasticsearch verbinden? -Wenn wir ein Modell im Spark-Streaming (nach elastischer Suche) erstellen, müssen wir dieses Modell an dieser Stelle verwenden (nach elasticsearch) oder können wir es im Spark-Streaming (directery nach kafka) verwenden? # use == in Echtzeit voraussagen -Das Erstellen von Modellen nach elasticsearch machte unsere Modelle statisch (oder nicht in Echtzeit)

Vielen Dank.

+0

Elasticsearch ist nicht 'Echt Time' System. __Source__: https://www.elastic.co/guide/en/elasticsearch/reference/current/_basic_concepts.html#_near_realtime_nrt – avr

+0

In der Dokumentation sagten sie, dass es 1 Sekunde dauert, um durchsuchbar zu werden, also denkst du, es ist dasselbe transformiert werden, um Funken zu strömen? Sonst denkst du Kafka-> Sparking Streaming (ML) -> elastische Suche ist besser? Vielen Dank für Ihre Antwort. –

+0

Alles jenseits von '1 Sekunde' ist keine Echtzeit mehr. Wenn Sie möchten, dass Ihre Anwendung "Echtzeit" ist, dann kann elasticsearch nicht dazu dienen, egal wo (nach dem Funken-Streaming oder vorher) Sie hineinlegen. – avr

Antwort

0

Sie meinen das?

kafka -> Funken Streaming -> Elasticsearch db

val sqlContext = new SQLContext(sc) 

//kafka group 
val group_id = "receiveScanner" 
// kafka topic 
val topic = Map("testStreaming"-> 1) 
// zk connect 
val zkParams = Map(
    "zookeeper.connect" ->"localhost", 
    "zookeeper.connection.timeout.ms" -> "10000", 
    "group.id" -> group_id) 

// Kafka 
val kafkaConsumer = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,zkParams,topic,StorageLevel.MEMORY_ONLY_SER) 
val receiveData = kafkaConsumer.map(_._2) 
// printer kafka data 
receiveData.print() 
receiveData.foreachRDD{ rdd=> 
    val transform = rdd.map{ line => 
    val data = Json.parse(line) 
    // play json parse 
    val id = (data \ "id").asOpt[Int] match { case Some(x) => x; case None => 0} 
    val name = (data \ "name" ).asOpt[String] match { case Some(x)=> x ; case None => "" } 
    val age = (data \ "age").asOpt[Int] match { case Some(x) => x; case None => 0} 
    val address = (data \ "address" ).asOpt[String] match { case Some(x)=> x ; case None => "" } 
    Row(id,name,age,address) 
    } 

    val transfromrecive = sqlContext.createDataFrame(transform,schameType) 
    import org.apache.spark.sql.functions._ 
    import org.elasticsearch.spark.sql._ 
    //filter age < 20 , to ES database 
    transfromrecive.where(col("age").<(20)).orderBy(col("age").asc) 
    .saveToEs("member/user",Map("es.mapping.id" -> "id")) 
} 

} 

/** * Datenrahmen schame * */

def schameType = StructType(
    StructField("id",IntegerType,false):: 
    StructField("name",StringType,false):: 
    StructField("age",IntegerType,false):: 
    StructField("address",StringType,false):: 
    Nil 
)