2016-03-25 15 views
4

Ich möchte die Bibliotheken matplotlib.bblpath oder shapely.geometry in pyspark verwenden.Wie bekomme ich Python-Bibliotheken in Pyspark?

Wenn ich versuche, einen von ihnen zu importieren ich die folgenden Fehlermeldung erhalten:

>>> from shapely.geometry import polygon 
Traceback (most recent call last): 
File "<stdin>", line 1, in <module> 
ImportError: No module named shapely.geometry 

Ich weiß, dass das Modul nicht vorhanden ist, aber ich möchte wissen, wie kann diese Pakete auf meine pyspark Bibliotheken gebracht werden.

+1

'shapely' installieren pip –

+0

Ich möchte installiere es in pyspark, nicht in meinem lokalen Rechner. Dieser Befehl funktioniert nicht in der pypspark-Shell. – nakulchawla09

+0

Dies ist ein mögliches Duplikat von http://stackoverflow.com/q/29495435/1711188 –

Antwort

7

im Funken Kontext versuchen, mit:

SparkContext.addPyFile("module.py") # also .zip 

, zitiert aus den docs:

Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future. The path passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI.

+1

Ich bin in der Lage, diese Abhängigkeit hinzuzufügen. Gibt es eine Möglichkeit, dies zu tun, wenn ich einen Funke abschicke? Ich mache eine Spark-Submit von file.py, in dieser Datei sollte ich addPyFile ("module.py") tun oder gibt es eine Möglichkeit, Abhängigkeiten durch Hinzufügen eines Arguments zum Spark-Submit-Befehl – nakulchawla09

+0

aus dem Spark hinzuzufügen doc (https://spark.apache.org/docs/1.1.0/submitting-applications.html) scheint es machbar, eine py-Datei nach Argument hinzuzufügen (in den Suchpfad einzufügen). Allerdings weiß ich nicht, ob die Submission-API für PySpark in irgendeiner Weise anders ist. – armatita

+0

ok Ich werde es in einem Argument und in meiner Datei versuchen. Beide Möglichkeiten zu sehen, was funktioniert. – nakulchawla09

1

Ist das auf Standalone (dh Laptop/Desktop) oder in einer Cluster-Umgebung (zB AWS EMR)

  1. Wenn auf Ihrem Laptop/Desktop, pip install shapely sollte gut funktionieren. Möglicherweise müssen Sie Ihre Umgebungsvariablen für Ihre Standard-Python-Umgebung (en) überprüfen. Wenn Sie beispielsweise in der Regel Python 3 verwenden, Python 2 jedoch für pyspark verwenden, wären Sie für pyspark nicht formschön verfügbar.

  2. Wenn in einer Cluster-Umgebung wie in AWS EMR, können Sie versuchen:

    import os 
    
    def myfun(x):` 
         os.system("pip install shapely") 
         return x 
    rdd = sc.parallelize([1,2,3,4]) ## assuming 4 worker nodes 
    rdd.map(lambda x: myfun(x)).collect() 
    ## call each cluster to run the code to import the library 
    

„Ich weiß, dass das Modul nicht vorhanden ist, aber ich möchte wissen, wie diese Pakete sein können in meine pyspark-Bibliotheken gebracht. "

Wenn auf EMR pyspark mit den gewünschten anderen Bibliotheken und Konfigurationen vorbereitet werden soll, können Sie diese Einstellungen mit einem Bootstrap-Schritt vornehmen. Abgesehen davon, können Sie eine Bibliothek nicht zu "pyspark" hinzufügen, ohne Spark in Scala zu kompilieren (was ein Ärgernis wäre, wenn Sie mit SBT nicht vertraut sind).

+0

Das Problem dabei ist, dass das Paket auf Knoten 3 nicht installiert werden kann, wenn es verwendet wurde. – user48956

+0

Sie können beim Start Ihres EMR ein Bash-Skript verwenden (hoffentlich verwenden Sie EMR, falls in AWS), um alle benötigten Bibliotheken zu installieren. Dies ist der "Bootstrap-Installationsschritt" – Jon

+0

@ user48956 Sie dürfen keine 3rd-Party-Pakete importieren, die möglicherweise aktualisiert werden, bevor Sie alles, was Sie benötigen, aktualisieren. –

4

So funktioniert es in unserem AWS EMR-Cluster (Es sollte auch in jedem anderen Cluster identisch sein). Ich habe das folgende Shell-Skript und ausgeführt als eine Bootstrap-Aktionen:

#!/bin/bash 
# shapely installation 
wget http://download.osgeo.org/geos/geos-3.5.0.tar.bz2 
tar jxf geos-3.5.0.tar.bz2 
cd geos-3.5.0 && ./configure --prefix=$HOME/geos-bin && make && make install 
sudo cp /home/hadoop/geos-bin/lib/* /usr/lib 
sudo /bin/sh -c 'echo "/usr/lib" >> /etc/ld.so.conf' 
sudo /bin/sh -c 'echo "/usr/lib/local" >> /etc/ld.so.conf' 
sudo /sbin/ldconfig 
sudo /bin/sh -c 'echo -e "\nexport LD_LIBRARY_PATH=/usr/lib" >> /home/hadoop/.bashrc' 
source /home/hadoop/.bashrc 
sudo pip install shapely 
echo "Shapely installation complete" 
pip install https://pypi.python.org/packages/74/84/fa80c5e92854c7456b591f6e797c5be18315994afd3ef16a58694e1b5eb1/Geohash-1.0.tar.gz 
# 
exit 0 

Hinweis: Statt als Bootstrap-Aktionen laufen dieses Skript kann in jedem Knoten in einem Cluster unabhängig ausgeführt werden. Ich habe beide Szenarien getestet.

Es folgt eine Probe pyspark und wohlgeformten Code (Spark-SQL UDF) über Befehle, um sicherzustellen, arbeiten wie erwartet:

Python 2.7.10 (default, Dec 8 2015, 18:25:23) 
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2 
Type "help", "copyright", "credits" or "license" for more information. 
Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /__/.__/\_,_/_/ /_/\_\ version 1.6.1 
     /_/ 

Using Python version 2.7.10 (default, Dec 8 2015 18:25:23) 
SparkContext available as sc, HiveContext available as sqlContext. 
>>> from pyspark.sql.functions import udf 
>>> from pyspark.sql.types import StringType 
>>> from shapely.wkt import loads as load_wkt 
>>> def parse_region(region): 
...  from shapely.wkt import loads as load_wkt 
...  reverse_coordinate = lambda coord: ' '.join(reversed(coord.split(':'))) 
...  coordinate_list = map(reverse_coordinate, region.split(', ')) 
...  if coordinate_list[0] != coordinate_list[-1]: 
...   coordinate_list.append(coordinate_list[0]) 
...  return str(load_wkt('POLYGON ((%s))' % ','.join(coordinate_list)).wkt) 
... 
>>> udf_parse_region=udf(parse_region, StringType()) 
16/09/06 22:18:34 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 
16/09/06 22:18:34 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException 
>>> df = sqlContext.sql('select id, bounds from <schema.table_name> limit 10') 
>>> df2 = df.withColumn('bounds1', udf_parse_region('bounds')) 
>>> df2.first() 
Row(id=u'0089d43a-1b42-4fba-80d6-dda2552ee08e', bounds=u'33.42838509594465:-119.0533447265625, 33.39170168789402:-119.0203857421875, 33.29992542601392:-119.0478515625', bounds1=u'POLYGON ((-119.0533447265625 33.42838509594465, -119.0203857421875 33.39170168789402, -119.0478515625 33.29992542601392, -119.0533447265625 33.42838509594465))') 
>>> 

Danke, Hussain Bohra