Ich versuche, eingehende Ereignisse von einem Socket zu verarbeiten, dann Windowing und die Ereignisdaten aggregieren. Ich habe einen Haken mit dem Fenster gefunden. Es scheint, dass, obwohl ich ein Schema für den DataFrame spezifiziere, es nicht in Spalten übersetzt.Windowing und aggregieren pyspark DataFrame
import sys
from pyspark.sql.types import StructType, StringType, TimestampType, FloatType, IntegerType, StructField
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
if __name__ == "__main__":
# our data currently looks like this (tab separated).
# -SYMBOL DATE PRICE TICKVOL BID ASK
# NQU7 2017-05-28T15:00:00 5800.50 12 5800.50 5800.50
# NQU7 2017-05-28T15:00:00 5800.50 1 5800.50 5800.50
# NQU7 2017-05-28T15:00:00 5800.50 5 5800.50 5800.50
# NQU7 2017-05-28T15:00:00 5800.50 1 5800.50 5800.50
if len(sys.argv) != 3:
# print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
spark = SparkSession \
.builder \
.appName("StructuredTickStream") \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
# Read all the csv files written atomically in a directory
tickSchema = StructType([
StructField("symbol", StringType(), True),
StructField("dt", TimestampType(), True),
StructField("price", FloatType(), True),
StructField("tickvol", IntegerType(), True),
StructField("bid", FloatType(), True),
StructField("ask", FloatType(), True)
])
events_df = spark \
.readStream \
.option("sep", "\t") \
.option("host", sys.argv[1]) \
.option("port", sys.argv[2]) \
.format("socket") \
.schema(tickSchema) \
.load()
events_df.printSchema()
print("columns = ", events_df.columns)
ohlc_df = events_df \
.groupby(F.window("dt", "5 minutes", "1 minutes")) \
.agg(
F.first('price').alias('open'),
F.max('price').alias('high'),
F.min('price').alias('low'),
F.last('price').alias('close')
) \
.collect()
query = ohlc_df \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
Der Ausgang des print("columns = ", events_df.columns)
ist ['value']
, und der Prozess schlägt mit dem folgenden Spur:
pyspark.sql.utils.AnalysisException: "cannot resolve '`dt`' given input columns: [value];;\n'Aggregate [timewindow('dt, 300000000, 60000000, 0)], [timewindow('dt, 300000000, 60000000, 0) AS window#3, first('price, false) AS open#7, max('price) AS high#9, min('price) AS low#11, last('price, false) AS close#13]\n+- StreamingRelation DataSource([email protected],socket,List(),Some(StructType(StructField(symbol,StringType,true), StructField(dt,TimestampType,true), StructField(price,FloatType,true), StructField(tickvol,IntegerType,true), StructField(bid,FloatType,true), StructField(ask,FloatType,true))),List(),None,Map(sep -> \t, host -> localhost, port -> 9999),None), textSocket, [value#0]\n"
Jede Idee, was mache ich falsch?