Ich habe einen Datensatz der FormErhalten zufälligen Satz von Zeilen aus PCollection
user_id, date, other_columns
1, 2017-03-10, ...
2, 2017-03-10, ...
3, 2017-03-10, ...
...
und ich brauche folgendes zu tun: Für jede Zeile im Datensatz ich eine neue Zeile erstellt werden soll, die die enthalten aktuelle Zeile und eine zufällige Teilmenge von N Zeilen für den gleichen Tag an verschiedenen Benutzer folgend entsprechen:
row, other_rows
{'user_id': 1, 'date': '2017-03-10', ...}, [{'user_id': 2,...},...]
{'user_id': 2, 'date': '2017-03-10', ...}, [{'user_id': 1,...},...]
...
ich habe es wie folgt implementiert, aber es ist sehr langsam für große Datensätze, wenn in der Cloud ausgeführt.
dataset
| 'map-to-date' >> beam.Map(lambda x: (x['date'], x))
| 'group-by-date' >> beam.GroupByKey()
| 'generate-output' >> beam.ParDo(GenerateOutputRows())
wo GenerateOutputRows
wie folgt definiert ist:
class GenerateOutputRows(beam.DoFn):
def process(self, element):
(date, rows) = element
for r in rows:
other_users_rows = list(filter(lambda x: x['user_id'] != r['user_id'],
rows))
yield (r, random.sample(other_users_rows, N))
könnte denken Sie an eine andere leistungsfähigere Art und Weise, um das gewünschte Ergebnis zu bekommen?
Müssen Sie dies wirklich für jede Zeile tun? oder nur einmal pro User-Tag? – CasualT
Ja, ich brauche das für jede Zeile. Ich erstelle einen Trainingsdatensatz für ein ML-Modell und jede Zeile wird ein Trainingsbeispiel sein. – pnezis
Wie groß ist Ihr Dataset und welcher ist der langsamere Vorgang? Wie viele Arbeiter benutzt du? Und hast du eine Job ID? – Pablo