2017-03-12 6 views
0

Ich benutze Python Twisted, um Daten von der Twitter Streaming API zu streamen. Kurz gesagt gibt es zwei Schritte: 1) access_token holen, 2) mit dem access_token eine Anfrage für die Daten stellen. Schritt 1 funktioniert einwandfrei, aber in Schritt 2 bekomme ich den Fehler „Bad Request" (Status 400). Warum ist das so? Ich vermute, es liegt daran, dass Twitter HTTP/1.1 verwendet, Twisted aber standardmäßig HTTP/1.0. Wie kann ich also Verbindungen über HTTP/1.1 herstellen?
(Titel: Twisted – Twitter Streaming API liefert „Bad Request"-Fehler)

EDIT: Hier ist meine Fehlermeldung:

HTTP/1.0 400 Bad Request 
content-length: 0 
date: Sun, 12 Mar 2017 14:57:13 GMT 
server: tsa 
x-connection-hash: dca361a2b4214ad66203e9912b05cf7f 

[Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly. 

.

#!/usr/bin/python 
import oauth2 as oauth 
import urlparse 
import time 
import webbrowser 
from twisted.internet import reactor, protocol, ssl 
from twisted.web import http 


# OAuth application (consumer) credentials -- placeholders, fill in real values.
CONSUMER_KEY = 'xxxx'
CONSUMER_SECRET = 'xxxx'
CONSUMER = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)

# File in which the access token key/secret are cached between runs.
ACCESS_TOKEN_FILE = 'OAUTH_ACCESS_TOKEN'

# Twitter OAuth endpoints, all under the same base URL.
_TWITTER_OAUTH_BASE = 'https://twitter.com/oauth/'
TWITTER_REQUEST_TOKEN_URL = _TWITTER_OAUTH_BASE + 'request_token'
TWITTER_ACCESS_TOKEN_URL = _TWITTER_OAUTH_BASE + 'access_token'
TWITTER_AUTHORIZE_URL = _TWITTER_OAUTH_BASE + 'authorize'

# Streaming endpoint: TwitterStreamer requests it and the OAuth header signs it.
TWITTER_STREAM_API_HOST = 'stream.twitter.com'
TWITTER_STREAM_API_PATH = '/1.1/statuses/sample.json'


class TwitterStreamer(http.HTTPClient):
    """Client protocol that sends one signed GET for the Twitter stream and
    reports each CRLF-terminated body line to its factory.

    Reads ``url``, ``host``, ``agent`` and ``oauth_header`` from
    ``self.factory``. Body lines go to ``factory.tweetReceived(line)`` and
    errors to ``factory.tweetError(err)``.

    ``http.HTTPClient`` parses the status line and headers in its own
    ``lineReceived`` and then hands body bytes to ``handleResponsePart``.
    Overriding ``lineReceived`` would bypass that parsing: ``handleStatus``
    would never run, and the status/header lines themselves would be
    reported as tweets. So the body is split into lines here, in
    ``handleResponsePart``.
    """

    # Set from the status line; body data is ignored unless it was 200.
    _status_ok = False
    # Unterminated tail of the last body chunk, kept until more data arrives.
    _pending = b''

    def connectionMade(self):
        """Send the request line and headers for the streaming request."""
        self.sendCommand('GET', self.factory.url)
        self.sendHeader('Host', self.factory.host)
        self.sendHeader('User-Agent', self.factory.agent)
        self.sendHeader('Authorization', self.factory.oauth_header)
        self.endHeaders()

    def handleStatus(self, version, status, message):
        """Record whether the response is a 200; report anything else."""
        # status is a byte string (plain str on Python 2).
        self._status_ok = status == b'200'
        if not self._status_ok:
            self.factory.tweetError(ValueError("bad status"))

    def handleResponsePart(self, data):
        """Split body bytes on CRLF and forward each complete line."""
        if not self._status_ok:
            # Do not report the body of an error response as stream data.
            return
        lines = (self._pending + data).split(b'\r\n')
        # The last element is an unterminated fragment (b'' when the chunk
        # ended with CRLF); hold it until the rest of the line arrives.
        self._pending = lines.pop()
        for line in lines:
            self.factory.tweetReceived(line)

    def handleResponse(self, response):
        """No-op hook: the body is already consumed by handleResponsePart."""
        pass

    def connectionLost(self, reason):
        """Report the end of the connection (clean or not) to the factory."""
        self.factory.tweetError(reason)


class TwitterStreamerFactory(protocol.ClientFactory):
    """Builds TwitterStreamer connections and prints whatever they report."""

    protocol = TwitterStreamer

    def __init__(self, oauth_header):
        # Request parameters read by TwitterStreamer.connectionMade.
        self.host = TWITTER_STREAM_API_HOST
        self.url = TWITTER_STREAM_API_PATH
        self.agent = 'Twisted/TwitterStreamer'
        self.oauth_header = oauth_header

    def clientConnectionFailed(self, _, reason):
        """Treat a failed connection attempt like any other stream error."""
        self.tweetError(reason)

    def tweetReceived(self, tweet):
        """Handle one line of stream data by printing it."""
        print(tweet)

    def tweetError(self, error):
        """Handle an error or connection loss by printing it."""
        print(error)


def save_access_token(key, secret):
    """Write the access token to ACCESS_TOKEN_FILE.

    The file gets two ``NAME=value`` lines, ACCESS_KEY first and then
    ACCESS_SECRET.
    """
    with open(ACCESS_TOKEN_FILE, 'w') as token_file:
        token_file.writelines([
            "ACCESS_KEY=%s\n" % key,
            "ACCESS_SECRET=%s\n" % secret,
        ])


def load_access_token():
    """Read the access token cached by ``save_access_token``.

    Expects ACCESS_TOKEN_FILE to hold ``ACCESS_KEY=...`` on its first line
    and ``ACCESS_SECRET=...`` on its second.

    :returns: an ``oauth.Token`` built from those two values.
    """
    with open(ACCESS_TOKEN_FILE) as f:
        lines = f.readlines()

    # Split only on the first '=' so values that themselves contain '='
    # are not truncated.
    str_key = lines[0].strip().split('=', 1)[1]
    str_secret = lines[1].strip().split('=', 1)[1]
    return oauth.Token(key=str_key, secret=str_secret)


def fetch_access_token():
    """Return the hard-coded access token as a ``(key, secret)`` tuple.

    No OAuth exchange happens here. The values are placeholders to replace
    with real credentials. The previous body built an unused
    ``oauth.Consumer`` and local CONSUMER_* names that shadowed the module
    constants; both are dropped.
    """
    ACCESS_KEY = "xxxxxxx"
    ACCESS_SECRET = "xxxxxxxxx"
    return (ACCESS_KEY, ACCESS_SECRET)


def build_authorization_header(access_token):
    """Return a signed OAuth ``Authorization`` header value for the stream.

    Signs a GET of ``https://<TWITTER_STREAM_API_HOST><TWITTER_STREAM_API_PATH>``
    with CONSUMER and *access_token* using HMAC-SHA1, prints the header, and
    returns it UTF-8 encoded.
    """
    stream_url = "https://%s%s" % (TWITTER_STREAM_API_HOST, TWITTER_STREAM_API_PATH)

    oauth_params = {}
    oauth_params['oauth_version'] = "1.0"
    oauth_params['oauth_nonce'] = oauth.generate_nonce()
    oauth_params['oauth_timestamp'] = str(int(time.time()))
    oauth_params['oauth_token'] = access_token.key
    oauth_params['oauth_consumer_key'] = CONSUMER.key

    # is_form_encoded=True stops the oauth2 library from adding
    # oauth_body_hash, which Twitter does not accept.
    request = oauth.Request(method="GET", url=stream_url,
                            parameters=oauth_params, is_form_encoded=True)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), CONSUMER, access_token)

    authorization = request.to_header()['Authorization'].encode('utf-8')
    print("Authorization header:")
    print("  header = %s" % authorization)
    return authorization

if __name__ == '__main__':
    # Make sure an access token is cached on disk. The file is only probed
    # for existence, so close the handle right away instead of leaking it.
    try:
        with open(ACCESS_TOKEN_FILE):
            pass
    except IOError:
        # No saved access token: take the hard-coded credentials from
        # fetch_access_token() and cache them for next time.
        (access_token_key, access_token_secret) = fetch_access_token()
        save_access_token(access_token_key, access_token_secret)

    # Load access token from disk.
    access_token = load_access_token()

    # Build the signed Authorization header from the access token.
    auth_header = build_authorization_header(access_token)

    # Connect over TLS and stream using the Authorization header.
    # NOTE(review): ssl.ClientContextFactory is not expected to verify the
    # server certificate -- consider a verifying TLS option; confirm.
    twsf = TwitterStreamerFactory(auth_header)
    reactor.connectSSL(TWITTER_STREAM_API_HOST, 443, twsf, ssl.ClientContextFactory())
    reactor.run()

UPDATE: Funktionierender Code:

import base64, urllib 
from twisted.internet import reactor 
from twisted.internet.defer import Deferred 
from twisted.protocols import basic 
from twisted.python.failure import DefaultException 
from twisted.web.client import Agent 
from twisted.web.http_headers import Headers 
import json 
import oauth2 as oauth 
import time 
from twisted.web import server,resource 
from twisted.internet import endpoints 
from twisted.web.server import Site 
# Application (consumer) credentials -- placeholders, fill in real values.
CONSUMER_KEY = 'xxxxxxxxxxxx'
CONSUMER_SECRET = 'xxxxxxxxxxxxxx'
CONSUMER = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)

# Streaming endpoint that make_header signs a request for.
TWITTER_STREAM_API_HOST = 'stream.twitter.com'
TWITTER_STREAM_API_PATH = '/1.1/statuses/sample.json'

# File in which the access token key/secret are cached between runs.
ACCESS_TOKEN_FILE = 'OAUTH_ACCESS_TOKEN'


def callback(result):
    """Print one successfully decoded stream item."""
    print(result)


def errback(error):
    """Print an error reported by the stream."""
    print(error)

class StreamingParser(basic.LineReceiver):
    """Body protocol that decodes each CRLF-delimited line as JSON.

    Intended for ``response.deliverBody``. Each decoded object is passed to
    *user_callback*. JSON decode errors and the end of the body are passed
    to *user_errback* when one is given.
    """

    # Bytes literal so the delimiter also matches raw body data on Python 3
    # (identical to '\r\n' on Python 2).
    delimiter = b'\r\n'

    def __init__(self, user_callback, user_errback):
        # user_callback: called with each decoded JSON object.
        # user_errback: called with a Failure; may be None to ignore errors.
        self.user_callback = user_callback
        self.user_errback = user_errback

    def lineReceived(self, line):
        """Decode one line and dispatch it through a Deferred.

        The Deferred routes exceptions raised by user_callback to
        user_errback.
        """
        line = line.strip()
        if not line:
            # An empty line carries no JSON, and json.loads('') would raise
            # and report a spurious error. (Presumably these are the
            # stream's keep-alive newlines.)
            return
        d = Deferred()
        d.addCallback(self.user_callback)
        d.addErrback(self.user_errback)
        print('%s ........' % (line,))
        # Keep the try body to the decode step only.
        try:
            payload = json.loads(line)
        except ValueError as e:
            if self.user_errback:
                d.errback(e)
            return
        d.callback(payload)

    def connectionLost(self, reason):
        """Report the end of the body (for any reason) to user_errback."""
        if self.user_errback:
            d = Deferred()
            d.addErrback(self.user_errback)
            d.errback(DefaultException(reason.getErrorMessage()))

def _get_response(response, callback, errback):
    """Route the body of *response* into a StreamingParser.

    Returns a fresh Deferred that this function never fires. Returning it
    pauses the outer callback chain, so callbacks added after this one do
    not run when the response arrives.
    """
    print('got response......')
    parser = StreamingParser(callback, errback)
    response.deliverBody(parser)
    pending = Deferred()
    return pending

def _shutdown(reason, errback):
    """Pass *reason* to *errback*, then stop the reactor if it is running.

    NOTE(review): stopping the reactor also stops anything else it hosts,
    such as the HTTP server started in ``__main__``. Confirm that this is
    intended.
    """
    failure_chain = Deferred()
    failure_chain.addErrback(errback)
    failure_chain.errback(reason)
    if not reactor.running:
        return
    reactor.stop()

def save_access_token(key, secret, path=None):
    """Persist an access token as two ``NAME=value`` lines.

    Writes ``ACCESS_KEY=<key>`` and then ``ACCESS_SECRET=<secret>``, the
    layout that ``load_access_token`` reads back.

    :param key: access token key.
    :param secret: access token secret.
    :param path: file to write; defaults to ``ACCESS_TOKEN_FILE``.
    """
    if path is None:
        path = ACCESS_TOKEN_FILE
    with open(path, 'w') as f:
        f.write("ACCESS_KEY=%s\n" % key)
        f.write("ACCESS_SECRET=%s\n" % secret)


def load_access_token(path=None):
    """Read an access token written by ``save_access_token``.

    Expects ``ACCESS_KEY=...`` on the first line and ``ACCESS_SECRET=...``
    on the second.

    :param path: file to read; defaults to ``ACCESS_TOKEN_FILE``.
    :returns: an ``oauth.Token`` built from the two values.
    """
    if path is None:
        path = ACCESS_TOKEN_FILE
    with open(path) as f:
        lines = f.readlines()

    # Split only on the first '=' so values containing '=' stay intact.
    str_key = lines[0].strip().split('=', 1)[1]
    str_secret = lines[1].strip().split('=', 1)[1]
    return oauth.Token(key=str_key, secret=str_secret)


def fetch_access_token():
    """Return the hard-coded access token as a ``(key, secret)`` tuple.

    No OAuth exchange happens here. The values are placeholders to replace
    with real credentials. Wrapping them in an ``oauth.Token`` only to read
    them back added nothing, so the strings are returned directly.
    """
    ACCESS_KEY = "xxxxx-xxxx"
    ACCESS_SECRET = "xxxxxxxxxxxx"
    return (ACCESS_KEY, ACCESS_SECRET)


def make_header(access_token):
    """Return a signed OAuth ``Authorization`` header value for the stream.

    Signs a GET of ``https://<TWITTER_STREAM_API_HOST><TWITTER_STREAM_API_PATH>``
    with CONSUMER and *access_token* using HMAC-SHA1, prints the result, and
    returns it UTF-8 encoded.
    """
    stream_url = "https://%s%s" % (TWITTER_STREAM_API_HOST, TWITTER_STREAM_API_PATH)
    oauth_params = dict(
        oauth_version="1.0",
        oauth_nonce=oauth.generate_nonce(),
        oauth_timestamp=str(int(time.time())),
        oauth_token=access_token.key,
        oauth_consumer_key=CONSUMER.key,
    )

    # is_form_encoded=True keeps the oauth2 library from adding oauth_body_hash.
    signed = oauth.Request(method="GET", url=stream_url,
                           parameters=oauth_params, is_form_encoded=True)
    signed.sign_request(oauth.SignatureMethod_HMAC_SHA1(), CONSUMER, access_token)

    header = signed.to_header()['Authorization'].encode('utf-8')
    print("Authorization header:")
    print("  header = %s" % header)
    return header

def start_streaming():
    """Open the Twitter sample stream and feed it to callback/errback.

    If no token file exists, one is created from ``fetch_access_token``.
    The function then signs the request with ``make_header``, issues it with
    an ``Agent``, and chains ``_get_response`` and ``_shutdown`` onto the
    result.

    :returns: the ``Agent.request`` Deferred. Callers may ignore it.
    """
    print('streaming started...........')
    # Only probing for existence; close the handle immediately.
    try:
        with open(ACCESS_TOKEN_FILE):
            pass
    except IOError:
        access_token_key, access_token_secret = fetch_access_token()
        save_access_token(access_token_key, access_token_secret)

    access_token = load_access_token()
    auth_header = make_header(access_token)
    # Build the URL from the same constants make_header signs, so the
    # requested URL and the signed URL cannot drift apart.
    url = 'https://%s%s' % (TWITTER_STREAM_API_HOST, TWITTER_STREAM_API_PATH)
    headers = Headers({
        'User-Agent': ['TwistedSTreamReciever'],
        'Authorization': [auth_header]})
    agent = Agent(reactor)
    d = agent.request('GET', url, headers, None)
    d.addCallback(_get_response, callback, errback)
    d.addBoth(_shutdown, errback)
    return d

class _Stream(resource.Resource):
    """Leaf web resource whose GET starts a Twitter stream."""

    isLeaf = True

    def render_GET(self, request):
        """Start streaming and immediately return an HTML acknowledgement.

        Do not sleep here. render_GET runs on the reactor's thread, so a
        blocking time.sleep() would freeze the whole server, including the
        stream it just started.
        """
        start_streaming()
        # NOTE(review): to stop the stream later, keep a reference to the
        # StreamingParser given to response.deliverBody() and call its
        # transport.loseConnection(). Schedule that with reactor.callLater()
        # rather than sleeping.
        return "<html>streaming started...........%s</html>" % (time.ctime(),)


if __name__ == "__main__":
    # Serve _Stream on TCP port 8880; a GET request starts streaming.
    # Named `root`, not `resource`, so it does not rebind the imported
    # twisted.web.resource module.
    root = _Stream()
    factory = Site(root)
    endpoint = endpoints.TCP4ServerEndpoint(reactor, 8880)
    endpoint.listen(factory)
    reactor.run()
+2

Warum verwenden Sie 'twisted.web.http.HTTPClient'? Verwenden Sie stattdessen "Agent". –

+0

@Jean-PaulCalderone Danke für die Antwort. Ich werde versuchen, Agent zu verwenden. – anekix

+0

Ich habe es mit 'Agent' zum Laufen gebracht. Ich habe außerdem einen HTTP-Endpunkt definiert, um das Streaming zu starten. Jetzt brauche ich nur noch einen kleinen Hinweis, wie ich das Streaming stoppen kann (vielleicht indem ich die Verbindung des 'Agent' schließe). Ich habe meinen aktualisierten Code als Referenz hinzugefügt. – anekix

Antwort

0

Um das Lesen einer bestimmten Streaming-Antwort aufzugeben, verwenden Sie die Methode transport.loseConnection des Protokolls, an das der Body geliefert wird. Das scheint hier nötig zu sein, denn ich vermute, dass diese Twitter-Streams nie von selbst enden. Die Methode schließt die Verbindung, die zu dieser Anfrage/Antwort gehört; HTTP bietet keine andere Möglichkeit, auf eine Antwort zu verzichten. Zum Beispiel:

def _get_response(response, callback, errback): 
    """Deliver the response body to a StreamingParser and remember the parser.

    Recording the parser makes it possible to stop the stream later through
    its .transport.

    NOTE(review): ``stream_name`` and ``save_stream_by_name`` are not defined
    in this snippet. The surrounding code must supply them (for example, a
    dict keyed by stream name).
    """
    print 'got response......' 
    proto = StreamingParser(callback, errback) 
    # Keep the body protocol: its transport is what loseConnection() closes.
    save_stream_by_name(stream_name, proto) 
    response.deliverBody(proto) 
    return Deferred() 

Wenn Sie mit diesem Stream fertig sind:

# Stop a stream: close the connection of the body protocol saved for it.
# NOTE(review): pop_stream_by_name and stream_name are the caller's own
# bookkeeping and are not defined in the snippets on this page.
pop_stream_by_name(stream_name).transport.loseConnection()