并发爬虫技术

学习并发爬虫技术,提高爬虫效率,让你的程序能够同时处理多个请求,充分利用系统资源。

第1章:进程与线程基础

在学习并发爬虫之前,我们需要先了解进程和线程的基本概念,这是并发编程的基础。

1.1 什么是进程?

进程是程序在执行过程中分配和管理资源的基本单位,每个进程都有自己独立的内存空间和系统资源。

📚 故事时间:工厂车间

想象一下,进程就像是工厂里的一个独立车间。每个车间有自己的设备、工人和原材料,它们之间相互独立工作。 车间之间要通信,必须通过特定的渠道。在计算机中,每个进程也有自己的内存空间,进程间通信需要特殊的机制。

1.2 什么是线程?

线程是进程内的一个执行单元,是CPU调度的基本单位。一个进程可以包含多个线程,它们共享进程的内存空间和资源。

📚 故事时间:车间里的工人

如果进程是工厂的车间,那么线程就是车间里的工人。多个工人可以在同一个车间里协作工作,共享车间的设备和原材料。 相比于创建新车间(新进程),雇佣新工人(新线程)的成本更低,而且工人之间交流也更方便。

1.3 进程与线程的区别

了解进程和线程的区别,有助于我们在实际开发中选择合适的并发方式。

特性 进程 线程
资源分配 拥有独立的内存空间和系统资源 共享进程的内存空间和资源
创建销毁开销 较大 较小
通信方式 需要特殊的IPC机制 可以直接共享变量
并发效率 适合CPU密集型任务 适合IO密集型任务

💡 练习题1:进程与线程识别

列出日常生活中5个可以用进程或线程概念类比的场景,并说明为什么。

💡 练习题2:选择合适的并发方式

分析以下任务,判断更适合使用进程还是线程来实现并发:

  • 批量下载网页内容
  • 进行大规模数据计算
  • 同时处理多个用户请求
  • 实时视频处理

第2章:多进程爬虫

多进程爬虫使用Python的multiprocessing模块来实现并发,适合CPU密集型的爬虫任务。

2.1 使用multiprocessing模块

multiprocessing模块提供了创建和管理进程的功能,让我们可以轻松实现多进程并发。

Python代码:基本的多进程爬虫
import requests
import multiprocessing
from bs4 import BeautifulSoup
import time

def fetch_url(url):
 """爬取单个URL的函数"""
 try:
 print(f"进程 {multiprocessing.current_process().name} 正在爬取: {url}")
 headers = {
 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
 }
 response = requests.get(url, headers=headers, timeout=10)
 # 简单解析页面标题
 soup = BeautifulSoup(response.text, 'html.parser')
 title = soup.title.string if soup.title else '无标题'
 return url, title
 except Exception as e:
 return url, f'爬取失败: {str(e)}'

def main():
 # 待爬取的URL列表
 urls = [
 'https://www.example.com/page1',
 'https://www.example.com/page2',
 'https://www.example.com/page3',
 'https://www.example.com/page4',
 'https://www.example.com/page5',
 'https://www.example.com/page6',
 'https://www.example.com/page7',
 'https://www.example.com/page8'
 ]

 # 记录开始时间
 start_time = time.time()

 # 创建进程池,最大进程数为4
 with multiprocessing.Pool(processes=4) as pool:
 # 使用进程池并发执行任务
 results = pool.map(fetch_url, urls)

 # 打印结果
 for url, title in results:
 print(f"URL: {url}, 标题: {title}")

 # 计算耗时
 end_time = time.time()
 print(f"总耗时: {end_time - start_time:.2f}秒")

if __name__ == '__main__':
 main()

📚 故事时间:团队分工

想象一下,你有一堆文件需要处理,如果你一个人做需要很长时间。但如果你把这些文件分给4个同事同时处理,那么完成时间会大大缩短。 这就是多进程的原理 - 将任务分给多个进程同时执行,充分利用多核CPU的优势。

💡 练习题1:调整进程池大小

修改上面的代码,尝试不同的进程池大小(如2、4、8),观察执行时间的变化,并分析原因。

💡 练习题2:结果处理优化

改进上面的代码,将爬取的结果保存到CSV文件中,包含URL、标题、爬取时间和HTTP状态码。

2.2 多进程间通信

在多进程爬虫中,有时我们需要在不同进程之间传递数据。multiprocessing提供了多种进程间通信机制。

Python代码:使用Queue进行进程间通信
import multiprocessing
import requests
import time

def producer(queue, urls):
 """生产者进程:爬取页面并将结果放入队列"""
 for url in urls:
 try:
 print(f"生产者进程正在爬取: {url}")
 headers = {
 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
 }
 response = requests.get(url, headers=headers, timeout=10)
 # 将结果放入队列
 queue.put((url, response.status_code, len(response.text)))
 time.sleep(0.5) # 模拟网络延迟
 except Exception as e:
 queue.put((url, '错误', str(e)))
 # 放入结束标志
 queue.put(None)

def consumer(queue, result_file):
 """消费者进程:从队列中取出结果并处理"""
 with open(result_file, 'w', encoding='utf-8') as f:
 f.write('URL,状态码,内容长度\n')
 while True:
 # 从队列中取出结果
 item = queue.get()
 # 如果收到结束标志,则退出循环
 if item is None:
 break
 url, status, content = item
 print(f"消费者进程处理: {url}, 状态: {status}")
 # 保存结果到文件
 f.write(f"{url},{status},{content}\n")
 f.flush()

def main():
 # 待爬取的URL列表
 urls = [
 'https://www.example.com/page1',
 'https://www.example.com/page2',
 'https://www.example.com/page3',
 'https://www.example.com/page4',
 'https://www.example.com/page5'
 ]

 # 创建一个队列用于进程间通信
 queue = multiprocessing.Queue()

 # 创建生产者进程
 producer_process = multiprocessing.Process(
 target=producer,
 args=(queue, urls)
 )

 # 创建消费者进程
 consumer_process = multiprocessing.Process(
 target=consumer,
 args=(queue, 'crawl_results.csv')
 )

 # 启动进程
 producer_process.start()
 consumer_process.start()

 # 等待进程结束
 producer_process.join()
 consumer_process.join()

 print("所有任务完成!")

if __name__ == '__main__':
 main()

💡 练习题1:多生产者多消费者模型

修改上面的代码,实现多个生产者进程和多个消费者进程的协作模式,提高爬取效率。

💡 练习题2:使用管道进行通信

尝试使用multiprocessing.Pipe代替Queue实现进程间通信,并比较两种方式的优缺点。

第3章:多线程爬虫

多线程爬虫使用Python的threading模块来实现并发,适合IO密集型的爬虫任务。

3.1 使用threading模块

threading模块提供了创建和管理线程的功能,让我们可以轻松实现多线程并发。

Python代码:基本的多线程爬虫
import requests
import threading
from bs4 import BeautifulSoup
import time
import queue

def fetch_url(url, results):
 """爬取单个URL的函数"""
 try:
 print(f"线程 {threading.current_thread().name} 正在爬取: {url}")
 headers = {
 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
 }
 response = requests.get(url, headers=headers, timeout=10)
 # 简单解析页面标题
 soup = BeautifulSoup(response.text, 'html.parser')
 title = soup.title.string if soup.title else '无标题'
 # 使用线程锁保护结果列表
 with threading.Lock():
 results.append((url, title))
 except Exception as e:
 with threading.Lock():
 results.append((url, f'爬取失败: {str(e)}'))

def main():
 # 待爬取的URL列表
 urls = [
 'https://www.example.com/page1',
 'https://www.example.com/page2',
 'https://www.example.com/page3',
 'https://www.example.com/page4',
 'https://www.example.com/page5',
 'https://www.example.com/page6',
 'https://www.example.com/page7',
 'https://www.example.com/page8'
 ]

 # 用于存储结果的列表
 results = []
 # 创建线程列表
 threads = []

 # 记录开始时间
 start_time = time.time()

 # 创建并启动线程
 for url in urls:
 thread = threading.Thread(target=fetch_url, args=(url, results))
 threads.append(thread)
 thread.start()

 # 等待所有线程完成
 for thread in threads:
 thread.join()

 # 打印结果
 for url, title in results:
 print(f"URL: {url}, 标题: {title}")

 # 计算耗时
 end_time = time.time()
 print(f"总耗时: {end_time - start_time:.2f}秒")

if __name__ == '__main__':
 main()

📚 故事时间:餐厅服务员

想象一下,在餐厅里,一个服务员可以同时处理多张桌子的客人需求。当客人点餐后,服务员可以先去处理其他客人的请求,而不必一直站在厨房等待菜品做好。 这类似于多线程的工作方式 - 当一个线程等待IO操作时,其他线程可以继续执行,充分利用CPU时间。

💡 练习题1:线程池的使用

修改上面的代码,使用concurrent.futures.ThreadPoolExecutor实现线程池,限制最大线程数量。

💡 练习题2:线程安全的数据采集

设计一个线程安全的数据采集器,可以从多个网站同时采集数据,并将结果汇总到一个共享的数据结构中。

3.2 线程安全与锁机制

在多线程编程中,当多个线程访问共享资源时,可能会出现数据竞争问题。我们需要使用锁机制来保证线程安全。

Python代码:使用线程锁保证数据安全
import threading
import time

# 共享资源
counter = 0
# 创建锁
lock = threading.Lock()

def increment_without_lock():
 """不使用锁的递增操作"""
 global counter
 for _ in range(100000):
 # 读取当前值
 current = counter
 # 模拟一些处理时间
 time.sleep(0)
 # 更新值
 counter = current + 1

def increment_with_lock():
 """使用锁的递增操作"""
 global counter
 for _ in range(100000):
 # 获取锁
 with lock:
 # 读取当前值
 current = counter
 # 模拟一些处理时间
 time.sleep(0)
 # 更新值
 counter = current + 1

def main():
 global counter

 # 测试不使用锁的情况
 counter = 0
 threads = []
 for _ in range(10):
 thread = threading.Thread(target=increment_without_lock)
 threads.append(thread)
 thread.start()

 for thread in threads:
 thread.join()

 print(f"不使用锁的最终结果: {counter} (期望值: 1000000)")

 # 测试使用锁的情况
 counter = 0
 threads = []
 for _ in range(10):
 thread = threading.Thread(target=increment_with_lock)
 threads.append(thread)
 thread.start()

 for thread in threads:
 thread.join()

 print(f"使用锁的最终结果: {counter} (期望值: 1000000)")

if __name__ == '__main__':
 main()

📚 故事时间:共享办公室

想象一下,办公室里有一台共享打印机。如果多个人同时使用打印机,可能会导致纸张卡住或打印混乱。 为了解决这个问题,可以设置一个"正在使用"的标志,当一个人使用打印机时,其他人需要等待。 这就是锁机制的原理 - 确保在同一时间只有一个线程可以访问共享资源。

💡 练习题1:死锁避免

编写一个程序,演示如何避免多线程中的死锁问题。创建两个以上的线程,每个线程需要获取多个锁。

💡 练习题2:读写锁实现

实现一个简单的读写锁,允许多个线程同时读取数据,但写入数据时需要独占访问。

第4章:异步爬虫

异步爬虫使用Python的asyncio库来实现并发,是一种基于事件循环的非阻塞IO模型,效率非常高。

4.1 asyncio基础

asyncio是Python的异步I/O库,提供了基于协程的并发编程支持。

Python代码:基本的异步爬虫
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time

async def fetch_url(session, url):
 """异步爬取单个URL的函数"""
 try:
 print(f"正在爬取: {url}")
 headers = {
 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
 }
 async with session.get(url, headers=headers, timeout=10) as response:
 # 等待响应完成
 text = await response.text()
 # 简单解析页面标题
 soup = BeautifulSoup(text, 'html.parser')
 title = soup.title.string if soup.title else '无标题'
 return url, title
 except Exception as e:
 return url, f'爬取失败: {str(e)}'

async def main():
 # 待爬取的URL列表
 urls = [
 'https://www.example.com/page1',
 'https://www.example.com/page2',
 'https://www.example.com/page3',
 'https://www.example.com/page4',
 'https://www.example.com/page5',
 'https://www.example.com/page6',
 'https://www.example.com/page7',
 'https://www.example.com/page8'
 ]

 # 记录开始时间
 start_time = time.time()

 # 创建异步会话
 async with aiohttp.ClientSession() as session:
 # 创建任务列表
 tasks = [fetch_url(session, url) for url in urls]
 # 并发执行所有任务
 results = await asyncio.gather(*tasks)

 # 打印结果
 for url, title in results:
 print(f"URL: {url}, 标题: {title}")

 # 计算耗时
 end_time = time.time()
 print(f"总耗时: {end_time - start_time:.2f}秒")

if __name__ == '__main__':
 # 运行异步主函数
 asyncio.run(main())

📚 故事时间:高效的服务员

想象一个非常高效的服务员,能够同时处理多张桌子。当他为一张桌子点餐后,不是站在厨房等待,而是去服务其他桌子。 当厨房准备好了,他会回来取餐并送到相应的桌子。这种工作方式比传统的多线程方式更高效,因为服务员不需要不断地在不同任务之间切换。 这就是异步编程的原理 - 通过事件循环和协程,避免了线程切换的开销,提高了并发效率。

💡 练习题1:异步任务优先级

修改上面的代码,实现一个带有优先级的异步爬虫,可以优先爬取重要的URL。

💡 练习题2:异步爬虫限速

实现一个限速机制,控制异步爬虫的请求频率,避免对目标服务器造成过大压力。

4.2 异步爬虫实战

下面我们通过一个实际的例子来演示异步爬虫的强大功能。

Python代码:异步爬虫实战 - 批量下载图片
import asyncio
import aiohttp
import os
import time
from pathlib import Path

async def download_image(session, image_url, save_dir):
 """异步下载单个图片"""
 try:
 # 获取文件名
 filename = os.path.basename(image_url.split('?')[0])
 filepath = os.path.join(save_dir, filename)

 # 发送请求
 async with session.get(image_url, timeout=15) as response:
 if response.status == 200:
 # 保存图片
 with open(filepath, 'wb') as f:
 f.write(await response.read())
 print(f"成功下载: {filename}")
 return True
 else:
 print(f"下载失败: {image_url}, 状态码: {response.status}")
 return False
 except Exception as e:
 print(f"下载异常: {image_url}, 错误: {str(e)}")
 return False

async def main():
 # 图片URL列表(示例URL,请替换为实际的图片URL)
 image_urls = [
 'https://example.com/image1.jpg',
 'https://example.com/image2.jpg',
 'https://example.com/image3.jpg',
 'https://example.com/image4.jpg',
 'https://example.com/image5.jpg',
 'https://example.com/image6.jpg',
 'https://example.com/image7.jpg',
 'https://example.com/image8.jpg',
 'https://example.com/image9.jpg',
 'https://example.com/image10.jpg'
 ]

 # 创建保存目录
 save_dir = 'downloaded_images'
 Path(save_dir).mkdir(exist_ok=True)

 # 记录开始时间
 start_time = time.time()

 # 限制并发数量
 semaphore = asyncio.Semaphore(5) # 最多同时下载5个图片

 async with aiohttp.ClientSession() as session:
 # 定义一个带限制的下载函数
 async def bounded_download(url):
 async with semaphore:
 return await download_image(session, url, save_dir)

 # 创建任务列表
 tasks = [bounded_download(url) for url in image_urls]
 # 并发执行所有任务
 results = await asyncio.gather(*tasks)

 # 计算耗时和成功率
 end_time = time.time()
 success_count = sum(results)

 print(f"下载完成!耗时: {end_time - start_time:.2f}秒")
 print(f"成功下载: {success_count}/{len(image_urls)} 张图片")

if __name__ == '__main__':
 # 运行异步主函数
 asyncio.run(main())

💡 练习题1:异步爬虫与数据库集成

实现一个异步爬虫,将爬取的数据异步保存到数据库中(可以使用aiomysql或aiosqlite等异步数据库驱动)。

💡 练习题2:异步爬虫监控系统

开发一个简单的监控系统,可以实时显示异步爬虫的进度、成功率和性能指标。

第5章:并发爬虫最佳实践

在实际应用中,我们需要根据具体情况选择合适的并发方式,并注意一些重要的事项。

5.1 并发方式选择

不同的并发方式有各自的优缺点,我们需要根据任务的特点选择合适的方式。

并发方式 优点 缺点 适用场景
多进程
  • 可以充分利用多核CPU
  • 进程间相互独立,一个崩溃不影响其他进程
  • 避免了GIL的限制
  • 创建和切换开销大
  • 进程间通信复杂
  • 内存占用大
CPU密集型任务,如大量数据处理、图像处理等
多线程
  • 创建和切换开销小
  • 线程间通信简单
  • 内存占用小
  • 受GIL限制,CPU密集型任务效率不高
  • 线程安全问题需要特别注意
  • 一个线程崩溃可能影响整个进程
IO密集型任务,如网络请求、文件操作等
异步IO
  • 极高的并发性能
  • 资源占用极少
  • 不需要考虑线程安全问题
  • 编程模型较复杂
  • 需要使用异步友好的库
  • 调试相对困难
高并发IO密集型任务,如大量网络请求

📚 故事时间:选择合适的交通工具

想象一下,你需要从北京去上海。如果距离很近,你可能会选择骑自行车;如果距离适中,你可能会选择汽车;如果距离很远,你可能会选择飞机。 同样,在编程中,我们需要根据任务的特点选择合适的并发方式。对于CPU密集型任务,多进程可能是更好的选择;对于IO密集型任务,异步IO可能更高效。

💡 练习题1:性能对比测试

实现同一个爬虫任务的三种版本(多进程、多线程、异步IO),对比它们在不同规模数据下的性能表现。

💡 练习题2:混合并发模式

尝试实现一个混合并发模式的爬虫,结合多进程和异步IO的优势,在多进程中使用异步IO处理请求。

5.2 并发爬虫注意事项

在使用并发爬虫时,我们需要注意一些重要事项,以避免出现问题。

1. 遵守robots.txt规则

在爬取网站之前,一定要查看并遵守网站的robots.txt规则,尊重网站的爬取限制。

2. 控制爬取频率

即使使用并发爬虫,也不要对网站造成过大压力。设置合理的延迟和并发数,避免被封IP。

3. 使用代理IP

对于大规模爬取任务,建议使用代理IP池,避免单个IP被封。

4. 异常处理

完善的异常处理机制是并发爬虫稳定运行的关键。确保每个请求都有适当的超时和错误处理。

5. 资源管理

注意管理系统资源,如内存、文件描述符等。对于长时间运行的爬虫,需要定期清理资源。

💡 练习题1:健壮的并发爬虫设计

设计一个健壮的并发爬虫系统,包含错误重试、IP代理、限速控制等功能。

💡 练习题2:爬虫监控与报警系统

实现一个监控系统,可以实时监控爬虫的运行状态,当出现异常时发送报警。