2017-08-22 4 views
0

Ich versuche, eingehende Ereignisse von einem Socket zu verarbeiten, dann Windowing und die Ereignisdaten aggregieren. Ich habe einen Haken mit dem Fenster gefunden. Es scheint, dass, obwohl ich ein Schema für den DataFrame spezifiziere, es nicht in Spalten übersetzt.Windowing und aggregieren pyspark DataFrame

import sys 
from pyspark.sql.types import StructType, StringType, TimestampType, FloatType, IntegerType, StructField 

from pyspark.sql import SparkSession 
import pyspark.sql.functions as F 


if __name__ == "__main__": 
    # our data currently looks like this (tab separated). 
    # -SYMBOL DATE   PRICE TICKVOL BID   ASK 
    # NQU7 2017-05-28T15:00:00 5800.50 12  5800.50  5800.50 
    # NQU7 2017-05-28T15:00:00 5800.50 1  5800.50  5800.50 
    # NQU7 2017-05-28T15:00:00 5800.50 5  5800.50  5800.50 
    # NQU7 2017-05-28T15:00:00 5800.50 1  5800.50  5800.50 

    if len(sys.argv) != 3: 
     # print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr) 
     exit(-1) 

    spark = SparkSession \ 
     .builder \ 
     .appName("StructuredTickStream") \ 
     .getOrCreate() 
    sc = spark.sparkContext 
    sc.setLogLevel('WARN') 

    # Read all the csv files written atomically in a directory 
    tickSchema = StructType([ 
     StructField("symbol", StringType(), True), 
     StructField("dt", TimestampType(), True), 
     StructField("price", FloatType(), True), 
     StructField("tickvol", IntegerType(), True), 
     StructField("bid", FloatType(), True), 
     StructField("ask", FloatType(), True) 
    ]) 

    events_df = spark \ 
     .readStream \ 
     .option("sep", "\t") \ 
     .option("host", sys.argv[1]) \ 
     .option("port", sys.argv[2]) \ 
     .format("socket") \ 
     .schema(tickSchema) \ 
     .load() 

    events_df.printSchema() 
    print("columns = ", events_df.columns) 

    ohlc_df = events_df \ 
     .groupby(F.window("dt", "5 minutes", "1 minutes")) \ 
     .agg(
      F.first('price').alias('open'), 
      F.max('price').alias('high'), 
      F.min('price').alias('low'), 
      F.last('price').alias('close') 
     ) \ 
     .collect() 


    query = ohlc_df \ 
     .writeStream \ 
     .outputMode("complete") \ 
     .format("console") \ 
     .start() 

    query.awaitTermination() 

Der Ausgang des print("columns = ", events_df.columns) ist ['value'], und der Prozess schlägt mit dem folgenden Spur:

pyspark.sql.utils.AnalysisException: "cannot resolve '`dt`' given input columns: [value];;\n'Aggregate [timewindow('dt, 300000000, 60000000, 0)], [timewindow('dt, 300000000, 60000000, 0) AS window#3, first('price, false) AS open#7, max('price) AS high#9, min('price) AS low#11, last('price, false) AS close#13]\n+- StreamingRelation DataSource([email protected],socket,List(),Some(StructType(StructField(symbol,StringType,true), StructField(dt,TimestampType,true), StructField(price,FloatType,true), StructField(tickvol,IntegerType,true), StructField(bid,FloatType,true), StructField(ask,FloatType,true))),List(),None,Map(sep -> \t, host -> localhost, port -> 9999),None), textSocket, [value#0]\n" 

Jede Idee, was mache ich falsch?

Antwort

-1

Ihr Datenrahmen hat nur eine Spalte value und hier versuchen Sie aus dieser events_df Spalte dt zuzugreifen. Dies ist der Hauptgrund für das Problem.

Below Aussage klar, es zeigt einzelne Spalte value

print("columns = ", events_df.columns) 

Sie benötigen diese

events_df = spark \ 
    .readStream \ 
    .option("sep", "\t") \ 
    .option("host", sys.argv[1]) \ 
    .option("port", sys.argv[2]) \ 
    .format("socket") \ 
    .schema(tickSchema) \ 
    .load() 

prüfen haben, warum es mit nur einer Spalte df zu schaffen.

Verwandte Themen