2017-02-16 1 views
2

Ich benutze Dataflow 0.5.5 mit Python und bin in sehr einfachem Code auf den folgenden Fehler gestoßen (Dataflow 0.5.5 – warum hat ein Objekt vom Typ '_UnwindowedValues' kein len()?):

print(len(row_list)) 

row_list ist eine Liste. Genau der gleiche Code, die gleichen Daten und die gleiche Pipeline laufen auf dem DirectRunner einwandfrei, lösen aber auf dem DataflowRunner die folgende Ausnahme aus. Was bedeutet das und wie kann ich es lösen?

job name: `beamapp-root-0216042234-124125` 

    (f14756f20f567f62): Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 544, in do_work 
    work_executor.execute() 
    File "dataflow_worker/executor.py", line 973, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30547) 
    with op.scoped_metrics_container: 
    File "dataflow_worker/executor.py", line 974, in dataflow_worker.executor.MapTaskExecutor.execute (dataflow_worker/executor.c:30495) 
    op.start() 
    File "dataflow_worker/executor.py", line 302, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12149) 
    def start(self): 
    File "dataflow_worker/executor.py", line 303, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:12053) 
    with self.scoped_start_state: 
    File "dataflow_worker/executor.py", line 316, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11968) 
    with self.shuffle_source.reader() as reader: 
    File "dataflow_worker/executor.py", line 320, in dataflow_worker.executor.GroupedShuffleReadOperation.start (dataflow_worker/executor.c:11912) 
    self.output(windowed_value) 
    File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317) 
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) 
    File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021) 
    cython.cast(Operation, consumer).process(windowed_value) 
    File "dataflow_worker/executor.py", line 766, in dataflow_worker.executor.BatchGroupAlsoByWindowsOperation.process (dataflow_worker/executor.c:25558) 
    self.output(wvalue.with_value((k, wvalue.value))) 
    File "dataflow_worker/executor.py", line 152, in dataflow_worker.executor.Operation.output (dataflow_worker/executor.c:6317) 
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) 
    File "dataflow_worker/executor.py", line 85, in dataflow_worker.executor.ConsumerSet.receive (dataflow_worker/executor.c:4021) 
    cython.cast(Operation, consumer).process(windowed_value) 
    File "dataflow_worker/executor.py", line 545, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18474) 
    with self.scoped_process_state: 
    File "dataflow_worker/executor.py", line 546, in dataflow_worker.executor.DoOperation.process (dataflow_worker/executor.c:18428) 
    self.dofn_receiver.receive(o) 
    File "apache_beam/runners/common.py", line 195, in apache_beam.runners.common.DoFnRunner.receive (apache_beam/runners/common.c:5137) 
    self.process(windowed_value) 
    File "apache_beam/runners/common.py", line 262, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:7078) 
    self.reraise_augmented(exn) 
    File "apache_beam/runners/common.py", line 274, in apache_beam.runners.common.DoFnRunner.reraise_augmented (apache_beam/runners/common.c:7467) 
    raise type(exn), args, sys.exc_info()[2] 
    File "apache_beam/runners/common.py", line 258, in apache_beam.runners.common.DoFnRunner.process (apache_beam/runners/common.c:6967) 
    self._dofn_simple_invoker(element) 
    File "apache_beam/runners/common.py", line 198, in apache_beam.runners.common.DoFnRunner._dofn_simple_invoker (apache_beam/runners/common.c:5283) 
    self._process_outputs(element, self.dofn_process(element.value)) 
    File "apache_beam/runners/common.py", line 286, in apache_beam.runners.common.DoFnRunner._process_outputs (apache_beam/runners/common.c:7678) 
    for result in results: 
    File "trip_augmentation_test.py", line 120, in get_osm_way 
TypeError: object of type '_UnwindowedValues' has no len() [while running 'Pull way info from mapserver'] 

Code hier: trip_augmentation_test.py

#!/usr/bin/env python 
# coding: utf-8 

from __future__ import absolute_import 

import argparse 
import logging 
import json 

import apache_beam as beam 
from apache_beam.utils.options import PipelineOptions 
from apache_beam.utils.options import SetupOptions 


def get_osm_way(pairs_same_group):
    """Query the map server for every trip belonging to one hardware id.

    Written to be applied with ``beam.FlatMap`` to the output of
    ``beam.GroupByKey``.

    Args:
        pairs_same_group: A ``(hardware_id, rows)`` pair. ``rows`` is an
            iterable of dict-like rows providing the keys ``'HardwareId'``,
            ``'TripId'`` and ``'LatLonStrArr'`` (comma-separated ``lat;lon``
            pairs). It does not have to be a list: any iterable is accepted.

    Yields:
        ``(hardware_id, trip_id, response_text)`` for each row, where
        ``response_text`` is ``None`` if the server did not answer with
        HTTP status 200.
    """
    import requests
    from requests.adapters import HTTPAdapter
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    from multiprocessing.pool import ThreadPool

    # Disable InsecureRequestWarning for a cleaner output.
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

    print('processing hardwareid={} trips'.format(pairs_same_group[0]))

    # Materialise the grouped values before calling len(). On DirectRunner
    # they happen to be a list, but on DataflowRunner they are a lazy
    # iterable (_UnwindowedValues) and len() on it raises
    # "TypeError: object of type '_UnwindowedValues' has no len()".
    row_list = list(pairs_same_group[1])
    print(row_list)
    http_request_num = len(row_list)

    with requests.Session() as s:
        # A host name is needed for this persistent HTTP connection.
        s.mount('https://ip address',
                HTTPAdapter(pool_maxsize=http_request_num))
        pool = ThreadPool(processes=1)
        try:
            for row in row_list:
                hardwareid = row['HardwareId']
                tripid = row['TripId']
                latlonArr = row['LatLonStrArr'].split(',')
                print('gps points num: {}'.format(len(latlonArr)))

                cor_array = []
                for latlon in latlonArr:
                    # Each point is encoded as "lat;lon"; the server expects
                    # x = longitude and y = latitude.
                    parts = latlon.split(';')
                    lat = parts[0]
                    lon = parts[1]
                    cor_array.append(
                        '{{"x":"{}","y":"{}"}}'.format(lon, lat))

                url = 'https://<ip address>/functionname?coordinates=[{}]'.format(','.join(cor_array))
                print(url)
                print("Requesting")
                # NOTE(review): thread_get is not defined in this listing;
                # it is expected to exist elsewhere in the module.
                r = pool.apply_async(thread_get, (s, url)).get()
                print("Got response")
                print(r)
                if r.status_code == 200:
                    yield (hardwareid, tripid, r.text)
                else:
                    yield (hardwareid, tripid, None)
        finally:
            # Release the worker thread even if a request fails or the
            # consumer stops iterating early.
            pool.close()
            pool.join()


def run(argv=None):
    """Build and run the trip-augmentation pipeline.

    The pipeline reads trips from BigQuery, groups them by ``HardwareId``,
    fetches way information for each group via ``get_osm_way``, converts the
    results with ``convert_to_dict`` and writes them to a BigQuery table.

    Args:
        argv: Command-line arguments. ``--input`` and ``--output`` are
            consumed here; every other argument is forwarded to Beam as a
            pipeline option. ``None`` makes argparse read ``sys.argv``.
    """
    parser = argparse.ArgumentParser()
    # The value is passed to BigQuerySource(query=...), so it must be a SQL
    # query rather than a table reference, and the pipeline cannot run
    # without it.
    parser.add_argument(
        '--input',
        required=True,
        help='BigQuery SQL query selecting the trips to process.')
    parser.add_argument(
        '--output',
        required=True,
        help=('Output BigQuery table for results specified as: '
              'PROJECT:DATASET.TABLE or DATASET.TABLE.'))

    known_args, pipeline_args = parser.parse_known_args(argv)
    # Hand Beam only the arguments this script did not consume itself.
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    output_schema = (
        'HardwareId:INTEGER,TripId:INTEGER,OrderBy:INTEGER,IndexRatio:FLOAT,'
        'IsEstimate:BOOLEAN,IsOverRide:BOOLEAN,MaxSpeed:FLOAT,Provider:STRING,'
        'RoadName:STRING,WayId:STRING,LastEdited:TIMESTAMP,WayLatLons:STRING,'
        'BigDataComment:STRING')

    (p
     | 'Read trip from BigQuery' >> beam.io.Read(
         beam.io.BigQuerySource(query=known_args.input))
     | 'Convert' >> beam.Map(lambda row: (row['HardwareId'], row))
     | 'Group devices' >> beam.GroupByKey()
     | 'Pull way info from mapserver' >> beam.FlatMap(get_osm_way)
     | 'Map way info to dictionary' >> beam.FlatMap(convert_to_dict)
     | 'Save to BQ' >> beam.io.Write(beam.io.BigQuerySink(
         known_args.output,
         schema=output_schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

    # Run the pipeline (all operations are deferred until run() is called).
    p.run()


if __name__ == '__main__':
    # Script entry point: raise the root logger to INFO, then start the
    # pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()

Pipeline-Aufruf hier (ich verwende Google Cloud Datalab):

# With --runner 'DirectRunner' instead of 'DataflowRunner' everything works.
# (Kept on its own line: text after a trailing backslash breaks the line
# continuation of the command below.)
!python trip_augmentation_test.py \
--output 'my-project:my-dataset.mytable' \
--input 'SELECT HardwareId,TripId, LatLonStrArr FROM [my-project:my-dataset.mytable] ' \
--project 'my-project' \
--runner 'DataflowRunner' \
--temp_location 'gs://mybucket/tripway_temp' \
--staging_location 'gs://mybucket/tripway_staging' \
--worker_machine_type 'n1-standard-2' \
--profile_cpu True \
--profile_memory True

Folgen

up

Ich habe den Typ von row_list protokolliert. Es stellte sich heraus, dass er im DataflowRunner <class 'apache_beam.transforms.trigger._UnwindowedValues'> ist, während er im DirectRunner list ist. Ist das eine zu erwartende Inkonsistenz?

+0

Können Sie den vollständigen Code für mehr Kontext posten? –

+0

Code hinzugefügt; irrelevante Funktionsdefinitionen habe ich entfernt und einige sensible Details unkenntlich gemacht. – foxwendy

+0

Als Randbemerkung: Sie müssen das ganze Profiling nicht ausführen – es kann Ihren Job verlangsamen. – Pablo

Antwort

5

Hinweis: Dieses Problem ist in Beam 2.0.1 behoben.

Leider ist dies eine runner-spezifische Eigenheit des Dataflow-Runners. In Dataflow liegt das Ergebnis von GroupByKey nicht in Form einer Liste vor und unterstützt len nicht – es ist aber iterierbar.

Unmittelbar vor http_request_num = len(row_list) können Sie das Ergebnis in einen Typ umwandeln, der len unterstützt, z. B.:

# Copy the grouped values into a real list so that len() is available.
row_list = list(pairs_same_group[1]) 
http_request_num = len(row_list) 

Hinweis: Dies ist ein bekanntes Problem (known issue) und wird hoffentlich bald behoben.

+0

Danke @Pablo, ich habe es inzwischen auch selbst herausgefunden, mit ungefähr der gleichen Idee wie deiner ... Heißt das also, dass verschiedene Runner den Code unterschiedlich interpretieren? Wenn das der Fall ist, wäre eine gute Dokumentation hilfreich, die die Besonderheiten der verschiedenen Runner erklärt. Es war eine frustrierende Erfahrung, das durch Ausprobieren herauszufinden. – foxwendy

+0

Yup. Das tut mir leid. Wir werden es hoffentlich bald reparieren oder dokumentieren. – Pablo

Verwandte Themen