2017-06-27 5 views
0

Jetzt weiß ich für dataframe, konnten wir udf verwenden, aber zurzeit handle ich RDD, Daten. Außer auf Funktion zu verwenden, wie kann ich ein Verfahren, durch das Filter in allen Verfahren in der KlasseWie man äußere Funktion im Filter für RDD Daten anruft

def date_filer_help(date1, date2): 
     date1_arr = date1.split("-") 
     date2_arr = date2.split("-") 
     for i in range(len(date1_arr)): 
      if int(date1_arr[i]) < int(date2_arr[i]): 
       return True 
      elif int(date1_arr[i]) > int(date2_arr[i]): 
       return False 
     return True 

    def date_filter(prev_date, date, end_date): 
     return date_filer_help(prev_date, date) and date_filer_help(date, end_date) 


rdd = sc.textFile(action_file).map(lambda x: x.split(','))\ 
    .filter(lambda x: date_filter("0000-00-00", x[0], "2016-06-30")) 

Ich will schreiben date_filter als statisch in dieser Klasse oder anderer classes.Otherwise, in jedem Verfahren verwendet, zu definieren, I müssen zwei Hilfen wie oben immer wieder neu definieren. Aber es funktioniert nicht, wenn ich im Cluster laufe, wie soll ich das machen?

Antwort

1

Wollen Sie dies:

class DataFilter(object): 
    def __init__(self): 
     self.sc = SparkContext() 

    @staticmethod 
    def date_filer_help(date1, date2): 
     return date1 <= date2 

    @staticmethod 
    def date_filter(prev_date, date, end_date): 
     return DataFilter.date_filer_help(prev_date, date) and DataFilter.date_filer_help(date, end_date) 

    def run(self): 
     rdd = self.sc.parallelize(
      map(lambda x: ((datetime.date(2016, 6, 25) + datetime.timedelta(x)).strftime('%Y-%m-%d'), x), range(10))) 
     result = rdd.filter(lambda x: DataFilter.date_filter("0000-00-00", x[0], "2016-06-30")) 
     return result.collect() 


if __name__ == '__main__': 
    print(DataFilter().run()) 
+0

Jetzt erkenne ich, ist es, weil ich die Haupt in einer anderen Datei und tat nicht sc.addFile laufen. – user3341953

Verwandte Themen