2017-08-30 1 views
1

I neu enter image description hereSCHRITTE Fehler in Pyspark Kommunikation über RPC in RabbitMQ

bin mit RabbitMQ für pyspark Kommunikation von Remote-PC über Zweck Testen RPC.For ich einen Test-Code entwickelt, die mir den Fehler geben ich verfolgt haben RabbitMQ doc Tutorial RPC über pyspark

hier für die Implementierung ist mein Funke RPC-Server-Code

import pika 
from tkinter import* 
from pyspark.sql import SparkSession 
from pyspark import SparkConf,SparkContext 
import json 
import re 



connectionparam=pika.ConnectionParameters(host="localhost") 
connection=pika.BlockingConnection(connectionparam) 

channel=connection.channel() 

channel.queue_declare(queue='rpc_queue') 







spark=SparkSession.builder.config("spark.sql.warehouse.dir", "C:\spark\spark-warehouse")\ 
    \ 
    .appName("TestApp").\ 
    enableHiveSupport().getOrCreate() 

print("success") 
#establishhing chraracter 
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID" 



def queryBuilder(sqlval): 
    print("printing",sqlval) 
    df=spark.sql(sqlval) 
    print("printing data frame table") 
    df.show() 

    resultlist = df.toJSON().collect() 
    dumpdata = re.sub(r"\'", "", str(resultlist)) 
    jsondata = json.dumps(dumpdata) 
    #print(jsondata) 
    return jsondata 


def on_request(ch,method,props, body): 
    n=body 
    print("printing request body ",n) 
    response=queryBuilder(n) 
    ch.basic_publish(exchange='', 
        routing_key=props.reply_to, 
        properties=pika.BasicProperties(correlation_id=props.correlation_id), 
        body=response 
        ) 
    ch.basic_ack(delivery_tag=method.delivery_tag) 


channel.basic_qos(prefetch_count=1) 
channel.basic_consume(on_request,queue='rpc_queue') 
print("[x] Awaiting RPC Request") 

channel.start_consuming() 

master=Tk() 
entryval=Entry(master) 
entryval.grid(row=0,column=1) 
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50) 
mainloop() 

und meine folgenden RPC-Client-Code für Fern pyspark applicati auf

import pika 
import uuid 

class SparkRpcClient(object): 
    def __init__(self): 
     self.connection = pika.BlockingConnection(pika.ConnectionParameters(
       host='localhost')) 

     self.channel = self.connection.channel() 

     result = self.channel.queue_declare(exclusive=True) 
     self.callback_queue = result.method.queue 

     self.channel.basic_consume(self.on_response, no_ack=True, 
            queue=self.callback_queue) 

    def on_response(self, ch, method, props, body): 
     if self.corr_id == props.correlation_id: 
      self.response = body 

    def call(self, querymsg): 
     self.response = None 
     self.corr_id = str(uuid.uuid4()) 
     self.channel.basic_publish(exchange='', 
            routing_key='rpc_queue', 
            properties=pika.BasicProperties(
             reply_to = self.callback_queue, 
             correlation_id = self.corr_id, 
             ), 
            body=querymsg) 
     while self.response is None: 
      self.connection.process_data_events() 
     return int(self.response) 

sparkrpc = SparkRpcClient() 
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID" 


print(" [x] Requesting query") 
response = sparkrpc.call(sqlstring) 
print(" [.] Got %s" % response) 

Mein Server ist erhalten hat bereits die Anfrage Zeichenfolge von Client und es drucken, aber es könnte die SqlString und Rück json Daten nicht funktioniert auf meinem querybuild() Funktion, die zu verarbeiten. Mehr darüber habe ich mehrfach angefordert und es scheint, dass die individuelle Anfrage in der RPC-Warteschlange eingereiht ist, aber nicht gelöscht wurde. Denn wenn ich nur das Server-Skript laufe, bekomme ich denselben Fehler. Vielleicht fehle ich hier etwas, kann mir jemand helfen, es herauszufinden. Ich möchte nur json Daten an den Client Vielen Dank im Voraus Kalyan

Antwort

1

Sie vorbei nicht kompatiblen Typ zurückzukehren (sieht aus wie entweder bytes oder bytearray), wo str erwartet wird.

Sie sollten decode den Inhalt zu Zeichenfolge zuerst.

def queryBuilder(sqlval, enc): 
    ... 
    df = spark.sql(sqlval.decode(enc)) 
    df.show() 
    ... 
+0

danke ich habe gerade Byte-Stream decodiert – Kalyan