0

Ich habe einen Fall ähnlich wie diese:Einfügen von Arrays in Elasticsearch über PySpark

Beispiel Datenrahmen:

from pyspark.sql.types import * 
schema = StructType([ # schema 
    StructField("id", StringType(), True), 
    StructField("email", ArrayType(StringType()), True)]) 
df = spark.createDataFrame([{"id": "id1"}, 
          {"id": "id2", "email": None}, 
          {"id": "id3","email": ["[email protected]"]}, 
          {"id": "id4", "email": ["[email protected]", "[email protected]"]}], 
          schema=schema) 
df.show(truncate=False) 
+---+------------------------------------+ 
|id |email        | 
+---+------------------------------------+ 
|id1|null        | 
|id2|null        | 
|id3|[[email protected]]     | 
|id4|[[email protected], [email protected]]| 
+---+------------------------------------+ 

Ich möchte diese Daten in Elasticsearch einzufügen, so weit, wie ich recherchiert, ich habe zur Transformation in die Indizierung Format:

def parseTest(r): 
    if r['email'] is None: 
     return r['id'],{"id":r['id']} 
    else: 
     return r['id'],{"id":r['id'],"email":r['email']} 
df2 = df.rdd.map(lambda row: parseTest(row)) 
df2.top(4) 
[('id4', {'email': ['[email protected]', '[email protected]'], 'id': 'id4'}), 
('id3', {'email': ['[email protected]'], 'id': 'id3'}), 
('id2', {'id': 'id2'}), 
('id1', {'id': 'id1'})] 

Dann füge ich versuche:

es_conf = {"es.nodes" : "node1.com,node2.com", 
      "es.resource": "index/type"} 
df2.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf) 

Und ich bekomme diese:

org.apache.spark.SparkException: Daten vom Typ java.util.ArrayList nicht

Spark v 2.1.0 
ES v 2.4.4 

Ohne die email Feld verwendet werden, es funktioniert gut, ich fand einige vorgeschlagene Lösung mit der es.output.json: true und , aber es schien für die Version 5 zu sein, so versuchte ich in einem anderen Cluster ich habe mit ES v5

df3 = df2.map(json.dumps) 
df3.top(4) 
['["id4", {"email": ["[email protected]", "[email protected]"], "id": "id4"}]', 
'["id3", {"email": ["[email protected]"], "id": "id3"}]', 
'["id2", {"id": "id2"}]', 
'["id1", {"id": "id1"}]'] 
es_conf2 = {"es.nodes" : "anothernode1.com,anothernode2.com", 
      "es.output.json": "true", 
      "es.resource": "index/type"} 
df3.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf2) 

Dann erhalte ich:

RDD Element vom Typ java.lang.String nicht

Spark v 2.1.0 
ES v 5.2.0 

feelsbadman

Antwort

0

verwendet werden, fand ich eine andere Art und Weise die gleiche Arbeit zu tun , indem Sie die write Methode eines Datenrahmenobjekts verwenden.

also nach dem ersten Abschnitt:

from pyspark.sql.types import * 
schema = StructType([ # schema 
    StructField("id", StringType(), True), 
    StructField("email", ArrayType(StringType()), True)]) 
df = spark.createDataFrame([{"id": "id1"}, 
          {"id": "id2", "email": None}, 
          {"id": "id3","email": ["[email protected]"]}, 
          {"id": "id4", "email": ["[email protected]ail.com", "[email protected]"]}], 
          schema=schema) 
df.show(truncate=False) 
+---+------------------------------------+ 
|id |email        | 
+---+------------------------------------+ 
|id1|null        | 
|id2|null        | 
|id3|[[email protected]]     | 
|id4|[[email protected], [email protected]]| 
+---+------------------------------------+ 

Sie müssen nur:

df.write\ 
    .format("org.elasticsearch.spark.sql")\ 
    .option("es.nodes","node1.com,node2.com")\ 
    .option("es.resource","index/type")\ 
    .option("es.mapping.id", "id")\ 
    .save() 

Keine Notwendigkeit, in eine RDD zu verwandeln oder in irgendeiner Art und Weise zu modifizieren.

Verwandte Themen