并发爬虫技术

学习并发爬虫技术,提高爬虫效率,让你的程序能够同时处理多个请求,充分利用系统资源。

第1章:进程与线程基础

在学习并发爬虫之前,我们需要先了解进程和线程的基本概念,这是并发编程的基础。

1.1 什么是进程?

进程是程序在执行过程中分配和管理资源的基本单位,每个进程都有自己独立的内存空间和系统资源。

📚 故事时间:工厂车间

想象一下,进程就像是工厂里的一个独立车间。每个车间有自己的设备、工人和原材料,它们之间相互独立工作。 车间之间要通信,必须通过特定的渠道。在计算机中,每个进程也有自己的内存空间,进程间通信需要特殊的机制。

1.2 什么是线程?

线程是进程内的一个执行单元,是CPU调度的基本单位。一个进程可以包含多个线程,它们共享进程的内存空间和资源。

📚 故事时间:车间里的工人

如果进程是工厂的车间,那么线程就是车间里的工人。多个工人可以在同一个车间里协作工作,共享车间的设备和原材料。 相比于创建新车间(新进程),雇佣新工人(新线程)的成本更低,而且工人之间交流也更方便。

1.3 进程与线程的区别

了解进程和线程的区别,有助于我们在实际开发中选择合适的并发方式。

特性 进程 线程
资源分配 拥有独立的内存空间和系统资源 共享进程的内存空间和资源
创建销毁开销 较大 较小
通信方式 需要特殊的IPC机制 可以直接共享变量
并发效率 适合CPU密集型任务 适合IO密集型任务

💡 练习题1:进程与线程识别

列出日常生活中5个可以用进程或线程概念类比的场景,并说明为什么。

💡 练习题2:选择合适的并发方式

分析以下任务,判断更适合使用进程还是线程来实现并发:

  • 批量下载网页内容
  • 进行大规模数据计算
  • 同时处理多个用户请求
  • 实时视频处理

第2章:多进程爬虫

多进程爬虫使用Python的multiprocessing模块来实现并发,适合CPU密集型的爬虫任务。

2.1 使用multiprocessing模块

multiprocessing模块提供了创建和管理进程的功能,让我们可以轻松实现多进程并发。

Python代码:基本的多进程爬虫
import requests
import multiprocessing
from bs4 import BeautifulSoup
import time

def fetch_url(url):
    """爬取单个URL的函数"""
    try:
        print(f"进程 {multiprocessing.current_process().name} 正在爬取: {url}")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        # 简单解析页面标题
        soup = BeautifulSoup(response.text, 'html.parser')
        title = soup.title.string if soup.title else '无标题'
        return url, title
    except Exception as e:
        return url, f'爬取失败: {str(e)}'

def main():
    # 待爬取的URL列表
    urls = [
        'https://www.example.com/page1',
        'https://www.example.com/page2',
        'https://www.example.com/page3',
        'https://www.example.com/page4',
        'https://www.example.com/page5',
        'https://www.example.com/page6',
        'https://www.example.com/page7',
        'https://www.example.com/page8'
    ]
    
    # 记录开始时间
    start_time = time.time()
    
    # 创建进程池,最大进程数为4
    with multiprocessing.Pool(processes=4) as pool:
        # 使用进程池并发执行任务
        results = pool.map(fetch_url, urls)
    
    # 打印结果
    for url, title in results:
        print(f"URL: {url}, 标题: {title}")
    
    # 计算耗时
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")

if __name__ == '__main__':
    main()

📚 故事时间:团队分工

想象一下,你有一堆文件需要处理,如果你一个人做需要很长时间。但如果你把这些文件分给4个同事同时处理,那么完成时间会大大缩短。 这就是多进程的原理 - 将任务分给多个进程同时执行,充分利用多核CPU的优势。

💡 练习题1:调整进程池大小

修改上面的代码,尝试不同的进程池大小(如2、4、8),观察执行时间的变化,并分析原因。

💡 练习题2:结果处理优化

改进上面的代码,将爬取的结果保存到CSV文件中,包含URL、标题、爬取时间和HTTP状态码。

2.2 多进程间通信

在多进程爬虫中,有时我们需要在不同进程之间传递数据。multiprocessing提供了多种进程间通信机制。

Python代码:使用Queue进行进程间通信
import multiprocessing
import requests
import time

def producer(queue, urls):
    """生产者进程:爬取页面并将结果放入队列"""
    for url in urls:
        try:
            print(f"生产者进程正在爬取: {url}")
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
            }
            response = requests.get(url, headers=headers, timeout=10)
            # 将结果放入队列
            queue.put((url, response.status_code, len(response.text)))
            time.sleep(0.5)  # 模拟网络延迟
        except Exception as e:
            queue.put((url, '错误', str(e)))
    # 放入结束标志
    queue.put(None)

def consumer(queue, result_file):
    """消费者进程:从队列中取出结果并处理"""
    with open(result_file, 'w', encoding='utf-8') as f:
        f.write('URL,状态码,内容长度\n')
        while True:
            # 从队列中取出结果
            item = queue.get()
            # 如果收到结束标志,则退出循环
            if item is None:
                break
            url, status, content = item
            print(f"消费者进程处理: {url}, 状态: {status}")
            # 保存结果到文件
            f.write(f"{url},{status},{content}\n")
            f.flush()

def main():
    # 待爬取的URL列表
    urls = [
        'https://www.example.com/page1',
        'https://www.example.com/page2',
        'https://www.example.com/page3',
        'https://www.example.com/page4',
        'https://www.example.com/page5'
    ]
    
    # 创建一个队列用于进程间通信
    queue = multiprocessing.Queue()
    
    # 创建生产者进程
    producer_process = multiprocessing.Process(
        target=producer, 
        args=(queue, urls)
    )
    
    # 创建消费者进程
    consumer_process = multiprocessing.Process(
        target=consumer, 
        args=(queue, 'crawl_results.csv')
    )
    
    # 启动进程
    producer_process.start()
    consumer_process.start()
    
    # 等待进程结束
    producer_process.join()
    consumer_process.join()
    
    print("所有任务完成!")

if __name__ == '__main__':
    main()

💡 练习题1:多生产者多消费者模型

修改上面的代码,实现多个生产者进程和多个消费者进程的协作模式,提高爬取效率。

💡 练习题2:使用管道进行通信

尝试使用multiprocessing.Pipe代替Queue实现进程间通信,并比较两种方式的优缺点。

第3章:多线程爬虫

多线程爬虫使用Python的threading模块来实现并发,适合IO密集型的爬虫任务。

3.1 使用threading模块

threading模块提供了创建和管理线程的功能,让我们可以轻松实现多线程并发。

Python代码:基本的多线程爬虫
import requests
import threading
from bs4 import BeautifulSoup
import time
import queue

def fetch_url(url, results):
    """爬取单个URL的函数"""
    try:
        print(f"线程 {threading.current_thread().name} 正在爬取: {url}")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
        }
        response = requests.get(url, headers=headers, timeout=10)
        # 简单解析页面标题
        soup = BeautifulSoup(response.text, 'html.parser')
        title = soup.title.string if soup.title else '无标题'
        # 使用线程锁保护结果列表
        with threading.Lock():
            results.append((url, title))
    except Exception as e:
        with threading.Lock():
            results.append((url, f'爬取失败: {str(e)}'))

def main():
    # 待爬取的URL列表
    urls = [
        'https://www.example.com/page1',
        'https://www.example.com/page2',
        'https://www.example.com/page3',
        'https://www.example.com/page4',
        'https://www.example.com/page5',
        'https://www.example.com/page6',
        'https://www.example.com/page7',
        'https://www.example.com/page8'
    ]
    
    # 用于存储结果的列表
    results = []
    # 创建线程列表
    threads = []
    
    # 记录开始时间
    start_time = time.time()
    
    # 创建并启动线程
    for url in urls:
        thread = threading.Thread(target=fetch_url, args=(url, results))
        threads.append(thread)
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()
    
    # 打印结果
    for url, title in results:
        print(f"URL: {url}, 标题: {title}")
    
    # 计算耗时
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")

if __name__ == '__main__':
    main()

📚 故事时间:餐厅服务员

想象一下,在餐厅里,一个服务员可以同时处理多张桌子的客人需求。当客人点餐后,服务员可以先去处理其他客人的请求,而不必一直站在厨房等待菜品做好。 这类似于多线程的工作方式 - 当一个线程等待IO操作时,其他线程可以继续执行,充分利用CPU时间。

💡 练习题1:线程池的使用

修改上面的代码,使用concurrent.futures.ThreadPoolExecutor实现线程池,限制最大线程数量。

💡 练习题2:线程安全的数据采集

设计一个线程安全的数据采集器,可以从多个网站同时采集数据,并将结果汇总到一个共享的数据结构中。

3.2 线程安全与锁机制

在多线程编程中,当多个线程访问共享资源时,可能会出现数据竞争问题。我们需要使用锁机制来保证线程安全。

Python代码:使用线程锁保证数据安全
import threading
import time

# 共享资源
counter = 0
# 创建锁
lock = threading.Lock()

def increment_without_lock():
    """不使用锁的递增操作"""
    global counter
    for _ in range(100000):
        # 读取当前值
        current = counter
        # 模拟一些处理时间
        time.sleep(0)
        # 更新值
        counter = current + 1

def increment_with_lock():
    """使用锁的递增操作"""
    global counter
    for _ in range(100000):
        # 获取锁
        with lock:
            # 读取当前值
            current = counter
            # 模拟一些处理时间
            time.sleep(0)
            # 更新值
            counter = current + 1

def main():
    global counter
    
    # 测试不使用锁的情况
    counter = 0
    threads = []
    for _ in range(10):
        thread = threading.Thread(target=increment_without_lock)
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    print(f"不使用锁的最终结果: {counter} (期望值: 1000000)")
    
    # 测试使用锁的情况
    counter = 0
    threads = []
    for _ in range(10):
        thread = threading.Thread(target=increment_with_lock)
        threads.append(thread)
        thread.start()
    
    for thread in threads:
        thread.join()
    
    print(f"使用锁的最终结果: {counter} (期望值: 1000000)")

if __name__ == '__main__':
    main()

📚 故事时间:共享办公室

想象一下,办公室里有一台共享打印机。如果多个人同时使用打印机,可能会导致纸张卡住或打印混乱。 为了解决这个问题,可以设置一个"正在使用"的标志,当一个人使用打印机时,其他人需要等待。 这就是锁机制的原理 - 确保在同一时间只有一个线程可以访问共享资源。

💡 练习题1:死锁避免

编写一个程序,演示如何避免多线程中的死锁问题。创建两个以上的线程,每个线程需要获取多个锁。

💡 练习题2:读写锁实现

实现一个简单的读写锁,允许多个线程同时读取数据,但写入数据时需要独占访问。

第4章:异步爬虫

异步爬虫使用Python的asyncio库来实现并发,是一种基于事件循环的非阻塞IO模型,效率非常高。

4.1 asyncio基础

asyncio是Python的异步I/O库,提供了基于协程的并发编程支持。

Python代码:基本的异步爬虫
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

async def fetch_url(session, url):
    """异步爬取单个URL的函数"""
    try:
        print(f"正在爬取: {url}")
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
        }
        async with session.get(url, headers=headers, timeout=10) as response:
            # 等待响应完成
            text = await response.text()
            # 简单解析页面标题
            soup = BeautifulSoup(text, 'html.parser')
            title = soup.title.string if soup.title else '无标题'
            return url, title
    except Exception as e:
        return url, f'爬取失败: {str(e)}'

async def main():
    # 待爬取的URL列表
    urls = [
        'https://www.example.com/page1',
        'https://www.example.com/page2',
        'https://www.example.com/page3',
        'https://www.example.com/page4',
        'https://www.example.com/page5',
        'https://www.example.com/page6',
        'https://www.example.com/page7',
        'https://www.example.com/page8'
    ]
    
    # 记录开始时间
    start_time = time.time()
    
    # 创建异步会话
    async with aiohttp.ClientSession() as session:
        # 创建任务列表
        tasks = [fetch_url(session, url) for url in urls]
        # 并发执行所有任务
        results = await asyncio.gather(*tasks)
    
    # 打印结果
    for url, title in results:
        print(f"URL: {url}, 标题: {title}")
    
    # 计算耗时
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")

if __name__ == '__main__':
    # 运行异步主函数
    asyncio.run(main())

📚 故事时间:高效的服务员

想象一个非常高效的服务员,能够同时处理多张桌子。当他为一张桌子点餐后,不是站在厨房等待,而是去服务其他桌子。 当厨房准备好了,他会回来取餐并送到相应的桌子。这种工作方式比传统的多线程方式更高效,因为服务员不需要不断地在不同任务之间切换。 这就是异步编程的原理 - 通过事件循环和协程,避免了线程切换的开销,提高了并发效率。

💡 练习题1:异步任务优先级

修改上面的代码,实现一个带有优先级的异步爬虫,可以优先爬取重要的URL。

💡 练习题2:异步爬虫限速

实现一个限速机制,控制异步爬虫的请求频率,避免对目标服务器造成过大压力。

4.2 异步爬虫实战

下面我们通过一个实际的例子来演示异步爬虫的强大功能。

Python代码:异步爬虫实战 - 批量下载图片
import asyncio
import aiohttp
import os
import time
from pathlib import Path

async def download_image(session, image_url, save_dir):
    """异步下载单个图片"""
    try:
        # 获取文件名
        filename = os.path.basename(image_url.split('?')[0])
        filepath = os.path.join(save_dir, filename)
        
        # 发送请求
        async with session.get(image_url, timeout=15) as response:
            if response.status == 200:
                # 保存图片
                with open(filepath, 'wb') as f:
                    f.write(await response.read())
                print(f"成功下载: {filename}")
                return True
            else:
                print(f"下载失败: {image_url}, 状态码: {response.status}")
                return False
    except Exception as e:
        print(f"下载异常: {image_url}, 错误: {str(e)}")
        return False

async def main():
    # 图片URL列表(示例URL,请替换为实际的图片URL)
    image_urls = [
        'https://example.com/image1.jpg',
        'https://example.com/image2.jpg',
        'https://example.com/image3.jpg',
        'https://example.com/image4.jpg',
        'https://example.com/image5.jpg',
        'https://example.com/image6.jpg',
        'https://example.com/image7.jpg',
        'https://example.com/image8.jpg',
        'https://example.com/image9.jpg',
        'https://example.com/image10.jpg'
    ]
    
    # 创建保存目录
    save_dir = 'downloaded_images'
    Path(save_dir).mkdir(exist_ok=True)
    
    # 记录开始时间
    start_time = time.time()
    
    # 限制并发数量
    semaphore = asyncio.Semaphore(5)  # 最多同时下载5个图片
    
    async with aiohttp.ClientSession() as session:
        # 定义一个带限制的下载函数
        async def bounded_download(url):
            async with semaphore:
                return await download_image(session, url, save_dir)
        
        # 创建任务列表
        tasks = [bounded_download(url) for url in image_urls]
        # 并发执行所有任务
        results = await asyncio.gather(*tasks)
    
    # 计算耗时和成功率
    end_time = time.time()
    success_count = sum(results)
    
    print(f"下载完成!耗时: {end_time - start_time:.2f}秒")
    print(f"成功下载: {success_count}/{len(image_urls)} 张图片")

if __name__ == '__main__':
    # 运行异步主函数
    asyncio.run(main())

💡 练习题1:异步爬虫与数据库集成

实现一个异步爬虫,将爬取的数据异步保存到数据库中(可以使用aiomysql或aiosqlite等异步数据库驱动)。

💡 练习题2:异步爬虫监控系统

开发一个简单的监控系统,可以实时显示异步爬虫的进度、成功率和性能指标。

第5章:并发爬虫最佳实践

在实际应用中,我们需要根据具体情况选择合适的并发方式,并注意一些重要的事项。

5.1 并发方式选择

不同的并发方式有各自的优缺点,我们需要根据任务的特点选择合适的方式。

并发方式 优点 缺点 适用场景
多进程
  • 可以充分利用多核CPU
  • 进程间相互独立,一个崩溃不影响其他进程
  • 避免了GIL的限制
  • 创建和切换开销大
  • 进程间通信复杂
  • 内存占用大
CPU密集型任务,如大量数据处理、图像处理等
多线程
  • 创建和切换开销小
  • 线程间通信简单
  • 内存占用小
  • 受GIL限制,CPU密集型任务效率不高
  • 线程安全问题需要特别注意
  • 一个线程崩溃可能影响整个进程
IO密集型任务,如网络请求、文件操作等
异步IO
  • 极高的并发性能
  • 资源占用极少
  • 不需要考虑线程安全问题
  • 编程模型较复杂
  • 需要使用异步友好的库
  • 调试相对困难
高并发IO密集型任务,如大量网络请求

📚 故事时间:选择合适的交通工具

想象一下,你需要从北京去上海。如果距离很近,你可能会选择骑自行车;如果距离适中,你可能会选择汽车;如果距离很远,你可能会选择飞机。 同样,在编程中,我们需要根据任务的特点选择合适的并发方式。对于CPU密集型任务,多进程可能是更好的选择;对于IO密集型任务,异步IO可能更高效。

💡 练习题1:性能对比测试

实现同一个爬虫任务的三种版本(多进程、多线程、异步IO),对比它们在不同规模数据下的性能表现。

💡 练习题2:混合并发模式

尝试实现一个混合并发模式的爬虫,结合多进程和异步IO的优势,在多进程中使用异步IO处理请求。

5.2 并发爬虫注意事项

在使用并发爬虫时,我们需要注意一些重要事项,以避免出现问题。

1. 遵守robots.txt规则

在爬取网站之前,一定要查看并遵守网站的robots.txt规则,尊重网站的爬取限制。

2. 控制爬取频率

即使使用并发爬虫,也不要对网站造成过大压力。设置合理的延迟和并发数,避免被封IP。

3. 使用代理IP

对于大规模爬取任务,建议使用代理IP池,避免单个IP被封。

4. 异常处理

完善的异常处理机制是并发爬虫稳定运行的关键。确保每个请求都有适当的超时和错误处理。

5. 资源管理

注意管理系统资源,如内存、文件描述符等。对于长时间运行的爬虫,需要定期清理资源。

💡 练习题1:健壮的并发爬虫设计

设计一个健壮的并发爬虫系统,包含错误重试、IP代理、限速控制等功能。

💡 练习题2:爬虫监控与报警系统

实现一个监控系统,可以实时监控爬虫的运行状态,当出现异常时发送报警。