并发爬虫技术
学习并发爬虫技术,提高爬虫效率,让你的程序能够同时处理多个请求,充分利用系统资源。
第1章:进程与线程基础
在学习并发爬虫之前,我们需要先了解进程和线程的基本概念,这是并发编程的基础。
1.1 什么是进程?
进程是程序在执行过程中分配和管理资源的基本单位,每个进程都有自己独立的内存空间和系统资源。
📚 故事时间:工厂车间
想象一下,进程就像是工厂里的一个独立车间。每个车间有自己的设备、工人和原材料,它们之间相互独立工作。 车间之间要通信,必须通过特定的渠道。在计算机中,每个进程也有自己的内存空间,进程间通信需要特殊的机制。
1.2 什么是线程?
线程是进程内的一个执行单元,是CPU调度的基本单位。一个进程可以包含多个线程,它们共享进程的内存空间和资源。
📚 故事时间:车间里的工人
如果进程是工厂的车间,那么线程就是车间里的工人。多个工人可以在同一个车间里协作工作,共享车间的设备和原材料。 相比于创建新车间(新进程),雇佣新工人(新线程)的成本更低,而且工人之间交流也更方便。
1.3 进程与线程的区别
了解进程和线程的区别,有助于我们在实际开发中选择合适的并发方式。
| 特性 | 进程 | 线程 |
|---|---|---|
| 资源分配 | 拥有独立的内存空间和系统资源 | 共享进程的内存空间和资源 |
| 创建销毁开销 | 较大 | 较小 |
| 通信方式 | 需要特殊的IPC机制 | 可以直接共享变量 |
| 并发效率 | 适合CPU密集型任务 | 适合IO密集型任务 |
💡 练习题1:进程与线程识别
列出日常生活中5个可以用进程或线程概念类比的场景,并说明为什么。
💡 练习题2:选择合适的并发方式
分析以下任务,判断更适合使用进程还是线程来实现并发:
- 批量下载网页内容
- 进行大规模数据计算
- 同时处理多个用户请求
- 实时视频处理
第2章:多进程爬虫
多进程爬虫使用Python的multiprocessing模块来实现并发,适合CPU密集型的爬虫任务。
2.1 使用multiprocessing模块
multiprocessing模块提供了创建和管理进程的功能,让我们可以轻松实现多进程并发。
import requests
import multiprocessing
from bs4 import BeautifulSoup
import time
def fetch_url(url):
"""爬取单个URL的函数"""
try:
print(f"进程 {multiprocessing.current_process().name} 正在爬取: {url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
# 简单解析页面标题
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.title.string if soup.title else '无标题'
return url, title
except Exception as e:
return url, f'爬取失败: {str(e)}'
def main():
# 待爬取的URL列表
urls = [
'https://www.example.com/page1',
'https://www.example.com/page2',
'https://www.example.com/page3',
'https://www.example.com/page4',
'https://www.example.com/page5',
'https://www.example.com/page6',
'https://www.example.com/page7',
'https://www.example.com/page8'
]
# 记录开始时间
start_time = time.time()
# 创建进程池,最大进程数为4
with multiprocessing.Pool(processes=4) as pool:
# 使用进程池并发执行任务
results = pool.map(fetch_url, urls)
# 打印结果
for url, title in results:
print(f"URL: {url}, 标题: {title}")
# 计算耗时
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f}秒")
if __name__ == '__main__':
main()
📚 故事时间:团队分工
想象一下,你有一堆文件需要处理,如果你一个人做需要很长时间。但如果你把这些文件分给4个同事同时处理,那么完成时间会大大缩短。 这就是多进程的原理 - 将任务分给多个进程同时执行,充分利用多核CPU的优势。
💡 练习题1:调整进程池大小
修改上面的代码,尝试不同的进程池大小(如2、4、8),观察执行时间的变化,并分析原因。
💡 练习题2:结果处理优化
改进上面的代码,将爬取的结果保存到CSV文件中,包含URL、标题、爬取时间和HTTP状态码。
2.2 多进程间通信
在多进程爬虫中,有时我们需要在不同进程之间传递数据。multiprocessing提供了多种进程间通信机制。
import multiprocessing
import requests
import time
def producer(queue, urls):
"""生产者进程:爬取页面并将结果放入队列"""
for url in urls:
try:
print(f"生产者进程正在爬取: {url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
# 将结果放入队列
queue.put((url, response.status_code, len(response.text)))
time.sleep(0.5) # 模拟网络延迟
except Exception as e:
queue.put((url, '错误', str(e)))
# 放入结束标志
queue.put(None)
def consumer(queue, result_file):
"""消费者进程:从队列中取出结果并处理"""
with open(result_file, 'w', encoding='utf-8') as f:
f.write('URL,状态码,内容长度\n')
while True:
# 从队列中取出结果
item = queue.get()
# 如果收到结束标志,则退出循环
if item is None:
break
url, status, content = item
print(f"消费者进程处理: {url}, 状态: {status}")
# 保存结果到文件
f.write(f"{url},{status},{content}\n")
f.flush()
def main():
# 待爬取的URL列表
urls = [
'https://www.example.com/page1',
'https://www.example.com/page2',
'https://www.example.com/page3',
'https://www.example.com/page4',
'https://www.example.com/page5'
]
# 创建一个队列用于进程间通信
queue = multiprocessing.Queue()
# 创建生产者进程
producer_process = multiprocessing.Process(
target=producer,
args=(queue, urls)
)
# 创建消费者进程
consumer_process = multiprocessing.Process(
target=consumer,
args=(queue, 'crawl_results.csv')
)
# 启动进程
producer_process.start()
consumer_process.start()
# 等待进程结束
producer_process.join()
consumer_process.join()
print("所有任务完成!")
if __name__ == '__main__':
main()
💡 练习题1:多生产者多消费者模型
修改上面的代码,实现多个生产者进程和多个消费者进程的协作模式,提高爬取效率。
💡 练习题2:使用管道进行通信
尝试使用multiprocessing.Pipe代替Queue实现进程间通信,并比较两种方式的优缺点。
第3章:多线程爬虫
多线程爬虫使用Python的threading模块来实现并发,适合IO密集型的爬虫任务。
3.1 使用threading模块
threading模块提供了创建和管理线程的功能,让我们可以轻松实现多线程并发。
import requests
import threading
from bs4 import BeautifulSoup
import time
import queue
def fetch_url(url, results):
"""爬取单个URL的函数"""
try:
print(f"线程 {threading.current_thread().name} 正在爬取: {url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
}
response = requests.get(url, headers=headers, timeout=10)
# 简单解析页面标题
soup = BeautifulSoup(response.text, 'html.parser')
title = soup.title.string if soup.title else '无标题'
# 使用线程锁保护结果列表
with threading.Lock():
results.append((url, title))
except Exception as e:
with threading.Lock():
results.append((url, f'爬取失败: {str(e)}'))
def main():
# 待爬取的URL列表
urls = [
'https://www.example.com/page1',
'https://www.example.com/page2',
'https://www.example.com/page3',
'https://www.example.com/page4',
'https://www.example.com/page5',
'https://www.example.com/page6',
'https://www.example.com/page7',
'https://www.example.com/page8'
]
# 用于存储结果的列表
results = []
# 创建线程列表
threads = []
# 记录开始时间
start_time = time.time()
# 创建并启动线程
for url in urls:
thread = threading.Thread(target=fetch_url, args=(url, results))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
# 打印结果
for url, title in results:
print(f"URL: {url}, 标题: {title}")
# 计算耗时
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f}秒")
if __name__ == '__main__':
main()
📚 故事时间:餐厅服务员
想象一下,在餐厅里,一个服务员可以同时处理多张桌子的客人需求。当客人点餐后,服务员可以先去处理其他客人的请求,而不必一直站在厨房等待菜品做好。 这类似于多线程的工作方式 - 当一个线程等待IO操作时,其他线程可以继续执行,充分利用CPU时间。
💡 练习题1:线程池的使用
修改上面的代码,使用concurrent.futures.ThreadPoolExecutor实现线程池,限制最大线程数量。
💡 练习题2:线程安全的数据采集
设计一个线程安全的数据采集器,可以从多个网站同时采集数据,并将结果汇总到一个共享的数据结构中。
3.2 线程安全与锁机制
在多线程编程中,当多个线程访问共享资源时,可能会出现数据竞争问题。我们需要使用锁机制来保证线程安全。
import threading
import time
# 共享资源
counter = 0
# 创建锁
lock = threading.Lock()
def increment_without_lock():
"""不使用锁的递增操作"""
global counter
for _ in range(100000):
# 读取当前值
current = counter
# 模拟一些处理时间
time.sleep(0)
# 更新值
counter = current + 1
def increment_with_lock():
"""使用锁的递增操作"""
global counter
for _ in range(100000):
# 获取锁
with lock:
# 读取当前值
current = counter
# 模拟一些处理时间
time.sleep(0)
# 更新值
counter = current + 1
def main():
global counter
# 测试不使用锁的情况
counter = 0
threads = []
for _ in range(10):
thread = threading.Thread(target=increment_without_lock)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"不使用锁的最终结果: {counter} (期望值: 1000000)")
# 测试使用锁的情况
counter = 0
threads = []
for _ in range(10):
thread = threading.Thread(target=increment_with_lock)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
print(f"使用锁的最终结果: {counter} (期望值: 1000000)")
if __name__ == '__main__':
main()
📚 故事时间:共享办公室
想象一下,办公室里有一台共享打印机。如果多个人同时使用打印机,可能会导致纸张卡住或打印混乱。 为了解决这个问题,可以设置一个"正在使用"的标志,当一个人使用打印机时,其他人需要等待。 这就是锁机制的原理 - 确保在同一时间只有一个线程可以访问共享资源。
💡 练习题1:死锁避免
编写一个程序,演示如何避免多线程中的死锁问题。创建两个以上的线程,每个线程需要获取多个锁。
💡 练习题2:读写锁实现
实现一个简单的读写锁,允许多个线程同时读取数据,但写入数据时需要独占访问。
第4章:异步爬虫
异步爬虫使用Python的asyncio库来实现并发,是一种基于事件循环的非阻塞IO模型,效率非常高。
4.1 asyncio基础
asyncio是Python的异步I/O库,提供了基于协程的并发编程支持。
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
async def fetch_url(session, url):
"""异步爬取单个URL的函数"""
try:
print(f"正在爬取: {url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
}
async with session.get(url, headers=headers, timeout=10) as response:
# 等待响应完成
text = await response.text()
# 简单解析页面标题
soup = BeautifulSoup(text, 'html.parser')
title = soup.title.string if soup.title else '无标题'
return url, title
except Exception as e:
return url, f'爬取失败: {str(e)}'
async def main():
# 待爬取的URL列表
urls = [
'https://www.example.com/page1',
'https://www.example.com/page2',
'https://www.example.com/page3',
'https://www.example.com/page4',
'https://www.example.com/page5',
'https://www.example.com/page6',
'https://www.example.com/page7',
'https://www.example.com/page8'
]
# 记录开始时间
start_time = time.time()
# 创建异步会话
async with aiohttp.ClientSession() as session:
# 创建任务列表
tasks = [fetch_url(session, url) for url in urls]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
# 打印结果
for url, title in results:
print(f"URL: {url}, 标题: {title}")
# 计算耗时
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f}秒")
if __name__ == '__main__':
# 运行异步主函数
asyncio.run(main())
📚 故事时间:高效的服务员
想象一个非常高效的服务员,能够同时处理多张桌子。当他为一张桌子点餐后,不是站在厨房等待,而是去服务其他桌子。 当厨房准备好了,他会回来取餐并送到相应的桌子。这种工作方式比传统的多线程方式更高效,因为服务员不需要不断地在不同任务之间切换。 这就是异步编程的原理 - 通过事件循环和协程,避免了线程切换的开销,提高了并发效率。
💡 练习题1:异步任务优先级
修改上面的代码,实现一个带有优先级的异步爬虫,可以优先爬取重要的URL。
💡 练习题2:异步爬虫限速
实现一个限速机制,控制异步爬虫的请求频率,避免对目标服务器造成过大压力。
4.2 异步爬虫实战
下面我们通过一个实际的例子来演示异步爬虫的强大功能。
import asyncio
import aiohttp
import os
import time
from pathlib import Path
async def download_image(session, image_url, save_dir):
"""异步下载单个图片"""
try:
# 获取文件名
filename = os.path.basename(image_url.split('?')[0])
filepath = os.path.join(save_dir, filename)
# 发送请求
async with session.get(image_url, timeout=15) as response:
if response.status == 200:
# 保存图片
with open(filepath, 'wb') as f:
f.write(await response.read())
print(f"成功下载: {filename}")
return True
else:
print(f"下载失败: {image_url}, 状态码: {response.status}")
return False
except Exception as e:
print(f"下载异常: {image_url}, 错误: {str(e)}")
return False
async def main():
# 图片URL列表(示例URL,请替换为实际的图片URL)
image_urls = [
'https://example.com/image1.jpg',
'https://example.com/image2.jpg',
'https://example.com/image3.jpg',
'https://example.com/image4.jpg',
'https://example.com/image5.jpg',
'https://example.com/image6.jpg',
'https://example.com/image7.jpg',
'https://example.com/image8.jpg',
'https://example.com/image9.jpg',
'https://example.com/image10.jpg'
]
# 创建保存目录
save_dir = 'downloaded_images'
Path(save_dir).mkdir(exist_ok=True)
# 记录开始时间
start_time = time.time()
# 限制并发数量
semaphore = asyncio.Semaphore(5) # 最多同时下载5个图片
async with aiohttp.ClientSession() as session:
# 定义一个带限制的下载函数
async def bounded_download(url):
async with semaphore:
return await download_image(session, url, save_dir)
# 创建任务列表
tasks = [bounded_download(url) for url in image_urls]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
# 计算耗时和成功率
end_time = time.time()
success_count = sum(results)
print(f"下载完成!耗时: {end_time - start_time:.2f}秒")
print(f"成功下载: {success_count}/{len(image_urls)} 张图片")
if __name__ == '__main__':
# 运行异步主函数
asyncio.run(main())
💡 练习题1:异步爬虫与数据库集成
实现一个异步爬虫,将爬取的数据异步保存到数据库中(可以使用aiomysql或aiosqlite等异步数据库驱动)。
💡 练习题2:异步爬虫监控系统
开发一个简单的监控系统,可以实时显示异步爬虫的进度、成功率和性能指标。
第5章:并发爬虫最佳实践
在实际应用中,我们需要根据具体情况选择合适的并发方式,并注意一些重要的事项。
5.1 并发方式选择
不同的并发方式有各自的优缺点,我们需要根据任务的特点选择合适的方式。
| 并发方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 多进程 |
|
|
CPU密集型任务,如大量数据处理、图像处理等 |
| 多线程 |
|
|
IO密集型任务,如网络请求、文件操作等 |
| 异步IO |
|
|
高并发IO密集型任务,如大量网络请求 |
📚 故事时间:选择合适的交通工具
想象一下,你需要从北京去上海。如果距离很近,你可能会选择骑自行车;如果距离适中,你可能会选择汽车;如果距离很远,你可能会选择飞机。 同样,在编程中,我们需要根据任务的特点选择合适的并发方式。对于CPU密集型任务,多进程可能是更好的选择;对于IO密集型任务,异步IO可能更高效。
💡 练习题1:性能对比测试
实现同一个爬虫任务的三种版本(多进程、多线程、异步IO),对比它们在不同规模数据下的性能表现。
💡 练习题2:混合并发模式
尝试实现一个混合并发模式的爬虫,结合多进程和异步IO的优势,在多进程中使用异步IO处理请求。
5.2 并发爬虫注意事项
在使用并发爬虫时,我们需要注意一些重要事项,以避免出现问题。
1. 遵守robots.txt规则
在爬取网站之前,一定要查看并遵守网站的robots.txt规则,尊重网站的爬取限制。
2. 控制爬取频率
即使使用并发爬虫,也不要对网站造成过大压力。设置合理的延迟和并发数,避免被封IP。
3. 使用代理IP
对于大规模爬取任务,建议使用代理IP池,避免单个IP被封。
4. 异常处理
完善的异常处理机制是并发爬虫稳定运行的关键。确保每个请求都有适当的超时和错误处理。
5. 资源管理
注意管理系统资源,如内存、文件描述符等。对于长时间运行的爬虫,需要定期清理资源。
💡 练习题1:健壮的并发爬虫设计
设计一个健壮的并发爬虫系统,包含错误重试、IP代理、限速控制等功能。
💡 练习题2:爬虫监控与报警系统
实现一个监控系统,可以实时监控爬虫的运行状态,当出现异常时发送报警。