2017-03-10 2 views
0

Ich benutze Funken-Streaming, um Daten von Kafka zu streamen, und ich möchte Daten nach Daten in MySql filtern.Funken rdd fliter durch Abfrage mysql

Zum Beispiel habe ich bekommen Daten aus kafka gerade wie:

{"id":1, "data":"abcdefg"} 

und es gibt Daten in MySql wie folgt aus:

id | state 
1 | "success" 

Ich brauche die MySql abzufragen den Zustand der Begriff zu bekommen Ich würde. Ich kann eine Verbindung zu MySql in der Funktion des Filters definieren, und es funktioniert. Der Code wie folgt:

def isSuccess(x): 
    id = x["id"] 
    sql = """ 
     SELECT * 
     FROM Test 
     WHERE id = "{0}" 
     """.format(id) 
    conn = mysql_connection(......) 
    result = rdbi.query_one(sql) 
    if result == None: 
     return False 
    else: 
     return True 
successRDD = rdd.filter(isSuccess) 

Aber es wird Verbindung für jede Zeile der RDD definieren, und wird eine Menge Computerressource verschwenden.

Wie im Filter zu tun?

+0

Ich würde empfehlen, 'mapPartition' Transformation gefolgt von' Filter' zu verwenden, um die wiederholte Verbindung zu MySQL zu lösen. – CoDhEr

Antwort

1

Ich schlage vor, Sie gehen für die Verwendung von mapPartition in Apache Spark verfügbar, um die Initialisierung der MySQL-Verbindung für jede RDD zu verhindern.

Dies ist der MySQL-Tabelle, die ich erstellt:

create table test2(id varchar(10), state varchar(10)); 

mit den folgenden Werten:

+------+---------+ 
| id | state | 
+------+---------+ 
| 1 | success | 
| 2 | stopped | 
+------+---------+ 

Verwenden Sie den folgenden PySpark-Code als Referenz:

import MySQLdb 

data1=[["1", "afdasds"],["2","dfsdfada"],["3","dsfdsf"]] #sampe data, in your case streaming data 
rdd = sc.parallelize(data1) 

def func1(data1): 
    con = MySQLdb.connect(host="127.0.0.1", user="root", passwd="yourpassword", db="yourdb") 
    c=con.cursor() 
    c.execute("select * from test2;") 
    data=c.fetchall() 
    dict={} 
    for x in data: 
     dict[x[0]]=x[1] 
    list1=[] 
    for x in data1: 
     if x[0] in dict: 
      list1.append([x[0], x[1], dict[x[0]]]) 
     else: 
      list1.append([x[0], x[1], "none"]) # i assign none if id in table and one received from streaming dont match 
    return iter(list1) 

print rdd.mapPartitions(func1).filter(lambda x: "none" not in x[2]).collect() 

der Ausgang, Ich bekam war:

[['1', 'afdasds', 'success'], ['2', 'dfsdfada', 'stopped']] 
+0

danke! Ich werde es versuchen – xiang