Ich habe versucht, asynchrone Crawler und synchrone Crawler zu erstellen, und jetzt habe ich ein Problem, die Ergebnisse des Crawlens sind unterschiedlich, aber sie sollen gleich sein (Ergebnisse des Crawlens sind gleich wenn nur die Tiefe 1 ist.Asynchroner Crawler funktioniert nicht richtig [Python]

from bs4 import BeautifulSoup 
import networkx as nx 
import urllib 
import urllib.request 
from urllib.parse import urlparse 
from urllib.parse import urljoin 
import time 
import asyncio 
import aiohttp 
from contextlib import closing 

class Crawler: 

def __init__(self, delay, depth): 
    self.delay = delay 
    self.graph = nx.DiGraph() 
    self.list_of_links = list() 
    self.crawled_urls = list() 
    self.depth = depth 

def validate_url(self, url): 
    """Check if url is valid""" 
    return 'http' in urlparse(url).scheme 

def run(self, async, start_list): 
    if async: 
     t1 = time.time() 
     self.async_crawl(start_list, self.depth) 
     t2 = time.time() 
     print('Async seconds passed: ', t2 - t1) 

     t1 = time.time() 
     for elem in start_list: 
      self.crawl(elem, self.depth) 
     t2 = time.time() 
     print('Sync seconds passed: ', t2 - t1) 
    print('Links crawled: ', len(self.crawled_urls)) 
    print('Edges stored: ', len(self.list_of_links)) 
    print('Depth: ', self.depth) 

def crawl(self, url, depth): 
    if url in self.crawled_urls: 
     return [] 
    if depth and self.validate_url(url): 
     links = self.get_links(url) 
     for link in links: 
      self.list_of_links.append((url, link)) 
      self.crawl(link, depth - 1) 
     return [] 

async def fetch_page(self, session, url): 
    """Get one page.""" 
    if url in self.crawled_urls: 
     return [] 
     with aiohttp.Timeout(10): 
      async with session.get(url) as response: 
       assert response.status == 200 
       new_urls = self.parse_for_links(url, await response.text()) 
       for new_url in new_urls: 
        self.list_of_links.append((url, new_url)) 
       return new_urls 
     return [] 

def async_crawl(self, urls, depth): 
    """Get multiple pages.""" 
    if depth: 
     with closing(asyncio.get_event_loop()) as loop: 
      with aiohttp.ClientSession(loop=loop) as session: 
       tasks = [self.fetch_page(session, url) for url in urls if self.validate_url(url)] 
       new_urls = loop.run_until_complete(asyncio.gather(*tasks)) 
       if new_urls: 
        self.async_crawl(new_urls[0], depth - 1) 

def parse_for_links(self, url, text): 
    soup = BeautifulSoup(text, "html.parser") 
    return [urljoin(url, tag['href']) for tag in soup.findAll('a', href=True)] 

def get_links(self, url): 
     req = urllib.request.urlopen(url) 
     req = map(lambda x: x.decode('utf-8'), req) 
     return self.parse_for_links(url, ''.join(list(req))) 
     return [] 

def reset(self): 
    self.list_of_links = list() 
    self.crawled_urls = list() 
    self.graph = nx.DiGraph() 

def visualize(self): 
    nx.write_gexf(self.graph, "graph.gexf") 

test2 = ['http://www.aclweb.org/anthology/'] 
cr = Crawler(10, 2) 
cr.run(True, test2) 
cr.run(False, test2) 

Wie zum Beispiel werde ich Ihnen eine meiner Testfälle zeigen:

Async seconds passed: 13.632593870162964 
Links crawled: 371 
Edges stored: 15374 
Depth: 2 
Sync seconds passed: 385.6858592033386 
Links crawled: 371 
Edges stored: 102755 
Depth: 2 

Sie warten nie, bis 'self.async_crawl()' in Ihrer rekursiven Definition von 'async_crawl' endet. Außerdem erzeugt Ihr rekursiver Aufruf Tausende von Ereignisschleifen, was wahrscheinlich nicht das ist, was Sie tun möchten. Implementieren Sie diese Funktion stattdessen mit einer Queue und einer Art Mutex, um die Parallelität zu begrenzen. – Blender


@Blender können Sie mir sagen, wo kann ich einige Artikel über die Warteschlange finden? –



beste Art und Weise nutzen Produzenten und Konsumenten für asynchrones Arbeiten Vielleicht.

import asyncio 
    import aiohttp 

    from redd import redis.db.data #just module for take data 

    query = asyncio.Queue() 
    locker = [] 

    async def producer(num): 
     baseurl = redis.db.data 
     while True: 
       url = next(baseurl) 
      except StopIteration: 
       print('Producer {} end'.format(num)) 
       await query.put(url) 

    async def consumer(num): 
     flag = True 

     while flag: 
      url = await query.get() 
      async with aiohttp.ClientSession(loop=loop) as session: 
       async with session.get(url) as response: 
         result = await response.read() 
      if query.empty() and locker[num] is not True: 
       locker[num] = True 
       print('Thread number {} is END: {}'.format(num, locker[num])) 
      if False not in locker: 
       for task in asyncio.Task.all_tasks(): 

    loop = asyncio.get_event_loop() 

    for i in range(2): 

    for i in range(5): 

