2

Diese Frage ist eine Folge zu this one. Ich versuche, Apache Beam zu verwenden, um Daten aus einer Google-Schlüssel-Tabelle zu lesen (und dann einige Datenverarbeitung). Ich schrieb das folgende Mindest Beispiel des Java-SDK:Fehler bei der Verwendung von SpannerIO in Apache-Balken

package com.google.cloud.dataflow.examples; 
import java.io.IOException; 
import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.PipelineResult; 
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO; 
import org.apache.beam.sdk.options.PipelineOptions; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.values.PCollection; 
import com.google.cloud.spanner.Struct; 

public class backup { 

    public static void main(String[] args) throws IOException { 
    PipelineOptions options = PipelineOptionsFactory.create(); 

    Pipeline p = Pipeline.create(options); 
    PCollection<Struct> rows = p.apply(
      SpannerIO.read() 
       .withInstanceId("my_instance") 
       .withDatabaseId("my_db") 
       .withQuery("SELECT t.table_name FROM information_schema.tables AS t") 
       ); 

    PipelineResult result = p.run(); 
    try { 
     result.waitUntilFinish(); 
    } catch (Exception exc) { 
     result.cancel(); 
    } 
    } 
} 

Wenn ich versuche, den Code mit dem DirectRunner auszuführen, erhalte ich die folgende Fehlermeldung:

org.apache.beam.runners.direct .repackaged.com.google.common.util.concurrent.UncheckedExecutionException:

org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

[...] Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

[...] Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.google.cloud.spanner.spi.v1.SpannerErrorInterceptor

Oder mit den DataflowRunner:

org.apache.beam.runners.direct.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

[...] Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

[...] Caused by: java.lang.NoSuchFieldError: internal_static_google_rpc_LocalizedMessage_fieldAccessorTable

In beiden Fällen ist die Fehlermeldung eher kryptisch, und ich konnte keine klaren Ideen finden, was den Fehler bei einer Google-Suche verursacht. Ich konnte auch keine Beispielskripte mit dem SpannerIO-Modul finden.

Ist dieser Fehler auf einen offensichtlichen Fehler in meinem Code zurückzuführen, oder liegt es an einer fehlerhaften Installation der Google Cloud-Tools?

+2

Argh, Sie treffen wahrscheinlich Abhängigkeitskonflikt https://issues.apache.org/jira/browse/BEAM-2837. Es wurde behoben, aber wir müssen auf neue Version des Balkens warten. Sie können Beam Binaries selbst aus der Quelle erstellen oder verwenden Sie diesen Trick in Ihrer pom.xml https://gist.github.com/mairbek/0c770ff7b591e3db58936b0b9294215a –

+1

Oh. Vielen Dank ! Ich schätze, ich probiere den Fehler aus. –

Antwort

1

Sie müssen die ProjectID angeben:

SpannerIO.read() 
      .withProjectId("my_project") 
      .withInstanceId("my_instance") 
      .withDatabaseId("my_db") 

Und Sie müssen die Anmeldeinformationen für Ihr Spanner Projekt einzustellen. Da die API von SpannerIO es nicht erlaubt, benutzerdefinierte Anmeldeinformationen festzulegen, müssen Sie die Anmeldeinformationen für globale Anwendungen mit der Umgebungsvariablen GOOGLE_APPLICATION_CREDENTIALS festlegen.

Sie könnten auch mithilfe von JDBC nach Cloud Spanner lesen (und schreiben). Das Lesen wird wie folgt durchgeführt:

 PCollection<KV<String, Long>> words = p2.apply(JdbcIO.<KV<String, Long>> read() 
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("nl.topicus.jdbc.CloudSpannerDriver", 
        "jdbc:cloudspanner://localhost;Project=my-project-id;Instance=instance-id;Database=database;PvtKeyPath=C:\\Users\\MyUserName\\Documents\\CloudSpannerKeys\\cloudspanner-key.json")) 
      .withQuery("SELECT t.table_name FROM information_schema.tables AS t").withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of())) 
      .withRowMapper(new JdbcIO.RowMapper<KV<String, Long>>() 
      { 
       private static final long serialVersionUID = 1L; 

       @Override 
       public KV<String, Long> mapRow(ResultSet resultSet) throws Exception 
       { 
        return KV.of(resultSet.getString(1), resultSet.getLong(2)); 
       } 
      })); 

Diese Methode ermöglicht es Ihnen auch individuelle Anmeldeinformationen zu verwenden, indem die PvtKeyPath Einstellung. Sie können auch mit JDBC an Google Cloud Spanner schreiben. Sehen Sie sich hier ein Beispiel an: http://www.googlecloudspanner.com/2017/10/google-cloud-spanner-with-apache-beam.html

+0

Ich hatte tatsächlich die "projectID" -Zeile vergessen, obwohl das Hinzufügen den Fehler nicht behob. In der Tat verwende ich das Eclipse Google Cloud Tools-Plugin und bin in meinem Google-Konto angemeldet. Also sollte sich das um die Anmeldeinformationen kümmern? Ich muss möglicherweise versuchen, die JDBC-Version. –

1

Dieses Problem wird höchstwahrscheinlich durch das hier beschriebene Problem der Abhängigkeitskompatibilität verursacht: BEAM-2837. Hier ist eine schneller in einen der Kommentare in dem JIRA Problem beschrieben Abhilfe:

<dependency> 
    <groupId>com.google.api.grpc</groupId> 
    <artifactId>grpc-google-common-protos</artifactId> 
    <version>0.1.9</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.beam</groupId> 
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> 
    <version>${beam.version}</version> 
    <exclusions> 
     <exclusion> 
      <groupId>com.google.api.grpc</groupId> 
      <artifactId>grpc-google-common-protos</artifactId> 
     </exclusion> 
    </exclusions> 
</dependency> 

Explizit die erforderlichen com.google.api.grpc Abhängigkeit definieren und die Version von org.apache.beam auszuschließen.

+0

Danke! Um ehrlich zu sein, endete ich damit, das Python-SDK zu verwenden und ein benutzerdefiniertes ParDo zum Lesen/Schreiben zu einem Schlüssel zu machen. –

Verwandte Themen