
Trouble with Spark DataFrame groupBy

I'm using the following expression with a PySpark DataFrame:

md = data.filter(data['cluster_id'].like('cluster30')) \
    .select(
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ),
        who_assigned,
        fn.hour(data['request_timestamp']).alias('request_hour'),
        fn.date_format(
            data['request_timestamp'],
            'F').alias('request_day_of_week'),
        fn.lit(data.count()).alias('num_requests'),
        fn.countDistinct(data['user_id']).alias('num_users'),
        fn.avg(data['microseconds']).alias(
            'avg_response_time_microseconds')) \
    .groupBy(
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ),
        who_assigned,
        fn.hour(data['request_timestamp']),
        fn.date_format(
            data['request_timestamp'],
            'F')
    )

and get the following error:

pyspark.sql.utils.AnalysisException: "expression '`request_timestamp`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;". 

As far as I can tell, I'm including everything I need in the groupBy... I'm writing this to mirror the structure of my SQL query, which looks roughly like this:

SELECT 
MAKE_DATE(YEAR(request_timestamp), MONTH(request_timestamp), DAYOFMONTH(request_timestamp)), 
CASE 
    lots of case logic here... 
HOUR(request_timestamp) AS request_hour, 
DATE_FORMAT(request_timestamp, 'F') request_day_of_week, 
COUNT(*) as num_requests, 
COUNT(DISTINCT user_id) num_users, 
AVG(microseconds) AS avg_response_time_microseconds 
FROM 
(SELECT * 
FROM {table} 
WHERE cluster_id LIKE 'cluster30') 
GROUP BY 
MAKE_DATE(YEAR(request_timestamp), MONTH(request_timestamp), DAYOFMONTH(request_timestamp)), 
CASE 
    lots of case logic here... 
HOUR(request_timestamp), 
DATE_FORMAT(request_timestamp, 'F') 

Answer


In Spark, the groupBy comes before the aggregations. In addition, every column passed to groupBy is automatically included in the resulting DataFrame. For your query, the equivalent in the Spark SQL API would be roughly:

import pyspark.sql.functions as fn

# udf_make_date and who_assigned are the UDF and CASE expression
# already defined in your code.
data \
    .filter(data['cluster_id'].like('cluster30')) \
    .groupBy(
        udf_make_date(
            fn.year(data['request_timestamp']),
            fn.month(data['request_timestamp']),
            fn.dayofmonth(data['request_timestamp'])
        ).alias('request_date'),
        who_assigned,
        fn.hour(data['request_timestamp']).alias('request_hour'),
        fn.date_format(
            data['request_timestamp'],
            'F'
        ).alias('request_day_of_week')
    ) \
    .agg(
        fn.countDistinct(data['user_id']).alias('num_users'),
        fn.count('*').alias('num_requests'),
        fn.avg(data['microseconds']).alias('avg_response_time_microseconds')
    )
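
For completeness, here is a minimal, self-contained sketch of the same groupBy-then-agg pattern on toy data. The toy schema, the local SparkSession, and the fn.when(...).otherwise(...) expression standing in for the CASE logic are all assumptions made for illustration, not the asker's actual code:

from pyspark.sql import SparkSession
import pyspark.sql.functions as fn

spark = SparkSession.builder.master('local[1]').getOrCreate()

# Toy rows; the real schema and CASE logic live in the asker's job.
toy = spark.createDataFrame(
    [('cluster30', '2016-08-30 10:15:00', 'u1', 1200),
     ('cluster30', '2016-08-30 10:45:00', 'u2', 1800),
     ('cluster30', '2016-08-30 11:05:00', 'u1', 900)],
    ['cluster_id', 'request_timestamp', 'user_id', 'microseconds'])

ts = toy['request_timestamp'].cast('timestamp')

# Hypothetical stand-in for the CASE expression from the SQL query.
who_assigned = fn.when(toy['user_id'] == 'u1', 'team_a') \
    .otherwise('team_b').alias('who_assigned')

result = toy \
    .filter(toy['cluster_id'].like('cluster30')) \
    .groupBy(
        fn.hour(ts).alias('request_hour'),
        who_assigned
    ) \
    .agg(
        fn.countDistinct(toy['user_id']).alias('num_users'),
        fn.count('*').alias('num_requests'),
        fn.avg(toy['microseconds']).alias('avg_response_time_microseconds')
    )

result.show()

Note that fn.count('*') counts the rows of each group, whereas the original fn.lit(data.count()) stamps every group with the row count of the entire data DataFrame, which is probably not what num_requests is meant to capture.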