2016-10-11 3 views
1

Ich versuche, mqtt als Stream in Apache Funken zu konsumieren die lib verwendet wird, ist Apache bahir spark-sql-streaming-mqtt. Diese Bibliothek verwendet die Paho MQTT-Bibliothek.spark-sql-streaming-mqtt schlechter Benutzer oder Passwort

Ich verwende den lib wie folgt:

val spark = SparkSession 
    .builder 
    .appName("MQTTStreamWordCount") 
    .master("local[4]") 
    .getOrCreate() 

import spark.implicits._ 
// Create DataFrame representing the stream of input lines from connection to mqtt server 
val lines = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") 
    .option("clientId", "sparkTest") 
    .option("username", "user") 
    .option("password", "psw") 
    .option("brokerUrl", "tcp://ip:1883") 
    .option("topic", "/bikes") 
    .option("cleanSession", "true") 
    .load("tcp://ip:1883").as[(String, Timestamp)] 

    val query = lines.select("value").writeStream 
    .outputMode("append") 
    .format("console") 
    .start() 



query.awaitTermination() 

und ich bekomme diese Fehlermeldung: „schlechten Benutzername oder Passwort“.

Aber in einem anderen akka/scala Projekt verwende ich paho-mqtt lib auf dem gleichen Broker, mit dem gleichen Benutzer/psw und es funktioniert gut.

so bin ich mit diesem Fehler verwirrt

Antwort

0

Die Lösung:

  1. Verwenden paho-MQTT lib Version 1.1.0 haben autoReconnect Methode:

    „org.eclipse.paho "% "org.eclipse.paho.client.mqttv3" % "1.1.0"

  2. Ihr eigenes bahir Funken sQL-Streaming-MQTT von github Quelle bauen, da der Authentifizierung nicht vor e Enthaftung. https://github.com/apache/bahir

Verwandte Themen