Ich benutze Funken-Streaming, um Daten von Kafka zu streamen, und ich möchte Daten nach Daten in MySql filtern.Funken rdd fliter durch Abfrage mysql
Zum Beispiel habe ich bekommen Daten aus kafka gerade wie:
{"id":1, "data":"abcdefg"}
und es gibt Daten in MySql wie folgt aus:
id | state
1 | "success"
Ich brauche die MySql abzufragen den Zustand der Begriff zu bekommen Ich würde. Ich kann eine Verbindung zu MySql in der Funktion des Filters definieren, und es funktioniert. Der Code wie folgt:
def isSuccess(x):
id = x["id"]
sql = """
SELECT *
FROM Test
WHERE id = "{0}"
""".format(id)
conn = mysql_connection(......)
result = rdbi.query_one(sql)
if result == None:
return False
else:
return True
successRDD = rdd.filter(isSuccess)
Aber es wird Verbindung für jede Zeile der RDD definieren, und wird eine Menge Computerressource verschwenden.
Wie im Filter zu tun?
Ich würde empfehlen, 'mapPartition' Transformation gefolgt von' Filter' zu verwenden, um die wiederholte Verbindung zu MySQL zu lösen. – CoDhEr