反爬虫技术详解
在网络爬虫的世界中,了解反爬虫技术并掌握相应的应对策略,是成为一名优秀爬虫工程师的必备技能。
第1章:常见反爬虫手段
网站为了保护自己的数据,经常会采用各种反爬虫技术。了解这些技术对于开发有效的爬虫至关重要。
1.1 User-Agent检测
很多网站会检查请求头中的User-Agent字段,判断是否为爬虫程序。正常浏览器会发送特定格式的User-Agent,而默认的爬虫请求通常会暴露自己的身份。
📚 故事时间:伪装身份
想象一下,你是一名记者想混进一场重要会议。如果直接说"我是记者",可能会被拒之门外。但如果你穿上正装,拿着笔记本,假装是参会人员,就更容易进入会场。 同样,爬虫也需要伪装成正常的浏览器,通过设置合适的User-Agent来绕过检测。
import requests
url = 'https://www.example.com'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
}
response = requests.get(url, headers=headers)
print(response.status_code)
💡 练习题1:User-Agent轮换
实现一个User-Agent轮换池,每次请求随机选择一个User-Agent发送请求。
📝 参考答案:
import requests
import random
class UserAgentRotator:
def __init__(self):
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:95.0) Gecko/20100101 Firefox/95.0',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.1 Safari/605.1.15'
]
def get_random_user_agent(self):
return random.choice(self.user_agents)
# 使用示例
rotator = UserAgentRotator()
headers = {
'User-Agent': rotator.get_random_user_agent()
}
response = requests.get('https://www.example.com', headers=headers)
print(f"使用的User-Agent: {headers['User-Agent']}")
print(f"响应状态码: {response.status_code}")
💡 实现要点:
- 创建UserAgentRotator类管理多个User-Agent
- 使用random.choice()随机选择User-Agent
- 每次请求都使用不同的User-Agent
- 可以扩展为从文件或API动态加载User-Agent列表
💡 练习题2:检测User-Agent限制
编写一个程序,测试不同User-Agent对访问同一网站的影响,找出哪些User-Agent会被拒绝。
📝 参考答案:
import requests
import time
class UserAgentTester:
def __init__(self, target_url):
self.target_url = target_url
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)',
'python-requests/2.28.1', # 默认requests User-Agent
'Mozilla/5.0 (compatible; Bingbot/2.0; +http://www.bing.com/bingbot.htm)',
'Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)'
]
def test_user_agents(self):
results = {}
for ua in self.user_agents:
headers = {'User-Agent': ua}
try:
start_time = time.time()
response = requests.get(self.target_url, headers=headers, timeout=10)
response_time = time.time() - start_time
results[ua] = {
'status_code': response.status_code,
'response_time': response_time,
'content_length': len(response.content),
'success': response.status_code == 200
}
print(f"User-Agent: {ua[:50]}...")
print(f"状态码: {response.status_code}, 响应时间: {response_time:.2f}s")
print("-" * 50)
# 添加延迟避免被封
time.sleep(1)
except Exception as e:
results[ua] = {
'error': str(e),
'success': False
}
print(f"User-Agent: {ua[:50]}... 失败: {e}")
return results
# 使用示例
tester = UserAgentTester('https://www.example.com')
results = tester.test_user_agents()
# 分析结果
print("\n=== 测试结果分析 ===")
for ua, result in results.items():
if result.get('success'):
print(f"✅ {ua[:30]}... - 成功访问")
else:
print(f"❌ {ua[:30]}... - 被拒绝或失败")
💡 实现要点:
- 测试不同User-Agent对同一网站的访问效果
- 记录状态码、响应时间、内容长度等指标
- 识别哪些User-Agent会被网站拒绝
- 添加延迟避免触发网站的反爬虫机制
💡 练习题3:动态生成User-Agent
创建一个函数,可以根据需要生成不同浏览器、不同操作系统的User-Agent字符串。
📝 参考答案:
import random
class UserAgentGenerator:
def __init__(self):
self.browsers = {
'chrome': {
'name': 'Chrome',
'versions': ['96.0.4664.110', '97.0.4692.71', '98.0.4758.102'],
'webkit_versions': ['537.36', '537.36', '537.36']
},
'firefox': {
'name': 'Firefox',
'versions': ['95.0', '96.0', '97.0', '98.0'],
'gecko_versions': ['20100101', '20100101', '20100101']
},
'safari': {
'name': 'Safari',
'versions': ['605.1.15', '605.1.15', '605.1.15'],
'webkit_versions': ['605.1.15', '605.1.15', '605.1.15']
}
}
self.operating_systems = {
'windows': {
'name': 'Windows NT 10.0',
'archs': ['Win64; x64', 'Win64; x64', 'Win64; x64']
},
'mac': {
'name': 'Macintosh',
'archs': ['Intel Mac OS X 10_15_7', 'Intel Mac OS X 11_6_0', 'Intel Mac OS X 12_2_0']
},
'linux': {
'name': 'X11',
'archs': ['Linux x86_64', 'Linux x86_64', 'Linux x86_64']
}
}
def generate_chrome_ua(self, os_type):
os_info = self.operating_systems[os_type]
browser = self.browsers['chrome']
version = random.choice(browser['versions'])
webkit_version = random.choice(browser['webkit_versions'])
arch = random.choice(os_info['archs'])
return f'Mozilla/5.0 ({os_info["name"]}; {arch}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Chrome/{version} Safari/{webkit_version}'
def generate_firefox_ua(self, os_type):
os_info = self.operating_systems[os_type]
browser = self.browsers['firefox']
version = random.choice(browser['versions'])
gecko_version = random.choice(browser['gecko_versions'])
arch = random.choice(os_info['archs'])
return f'Mozilla/5.0 ({os_info["name"]}; {arch}; rv:{version}) Gecko/{gecko_version} Firefox/{version}'
def generate_safari_ua(self, os_type):
os_info = self.operating_systems[os_type]
browser = self.browsers['safari']
version = random.choice(browser['versions'])
webkit_version = random.choice(browser['webkit_versions'])
arch = random.choice(os_info['archs'])
return f'Mozilla/5.0 ({os_info["name"]}; {arch}) AppleWebKit/{webkit_version} (KHTML, like Gecko) Version/15.1 Safari/{webkit_version}'
def generate_random_ua(self):
browser = random.choice(list(self.browsers.keys()))
os_type = random.choice(list(self.operating_systems.keys()))
if browser == 'chrome':
return self.generate_chrome_ua(os_type)
elif browser == 'firefox':
return self.generate_firefox_ua(os_type)
elif browser == 'safari':
return self.generate_safari_ua(os_type)
# 使用示例
generator = UserAgentGenerator()
print("=== 随机生成User-Agent ===")
for i in range(5):
ua = generator.generate_random_ua()
print(f"{i+1}. {ua}")
print("\n=== 指定浏览器和操作系统 ===")
print("Chrome on Windows:", generator.generate_chrome_ua('windows'))
print("Firefox on Mac:", generator.generate_firefox_ua('mac'))
print("Safari on Linux:", generator.generate_safari_ua('linux'))
💡 实现要点:
- 支持Chrome、Firefox、Safari三种主流浏览器
- 支持Windows、Mac、Linux三种操作系统
- 可以随机生成或指定特定组合
- 生成的User-Agent格式符合真实浏览器标准
- 可以扩展支持更多浏览器和操作系统版本
1.2 IP频率限制
网站会监控同一IP的访问频率,如果短时间内请求过于频繁,会认为是爬虫并进行限制,如返回错误页面、要求验证码或直接封禁IP。
📚 故事时间:银行排队
想象一下,你在银行排队办理业务。如果你频繁地往返于队伍前后,银行保安一定会注意到你并询问你的意图。 同样,网站也会监控异常的访问模式,对于短时间内大量请求的IP会进行限制。
import requests
import time
import random
url = 'https://www.example.com/page/{page}'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36'
}
for page in range(1, 11):
full_url = url.format(page=page)
response = requests.get(full_url, headers=headers)
print(f"页面 {page} 状态码: {response.status_code}")
# 添加随机延迟,模拟人类浏览行为
sleep_time = random.uniform(1, 3)
print(f"等待 {sleep_time:.2f} 秒...")
time.sleep(sleep_time)
💡 练习题1:自适应延迟策略
根据网站响应状态码和响应时间,动态调整请求间隔。当遇到429状态码(请求过多)时,自动增加延迟时间。
📝 参考答案:
import requests
import time
import random
class AdaptiveDelayStrategy:
def __init__(self, base_delay=1, max_delay=10, backoff_factor=2):
self.base_delay = base_delay
self.max_delay = max_delay
self.backoff_factor = backoff_factor
self.current_delay = base_delay
self.last_response_time = None
def get_delay(self, response=None, response_time=None):
"""根据响应情况计算延迟时间"""
# 如果遇到429错误,增加延迟
if response and response.status_code == 429:
self.current_delay = min(
self.current_delay * self.backoff_factor,
self.max_delay
)
print(f"遇到429错误,延迟增加到: {self.current_delay}秒")
# 根据响应时间调整延迟
elif response_time:
if self.last_response_time:
# 如果响应时间变长,稍微增加延迟
if response_time > self.last_response_time * 1.2:
self.current_delay = min(
self.current_delay * 1.1,
self.max_delay
)
# 如果响应时间变短,稍微减少延迟
elif response_time < self.last_response_time * 0.8:
self.current_delay = max(
self.current_delay * 0.9,
self.base_delay
)
self.last_response_time = response_time
# 添加随机性,避免过于规律
final_delay = self.current_delay * random.uniform(0.8, 1.2)
return final_delay
def reset_delay(self):
"""重置延迟到基础值"""
self.current_delay = self.base_delay
# 使用示例
if __name__ == "__main__":
strategy = AdaptiveDelayStrategy(base_delay=1, max_delay=10)
# 模拟请求过程
for i in range(10):
# 模拟不同的响应情况
if i == 3:
# 模拟遇到429错误
delay = strategy.get_delay(response=type('obj', (object,), {'status_code': 429})())
elif i == 5:
# 模拟响应时间变长
delay = strategy.get_delay(response_time=2.5)
else:
# 正常情况
delay = strategy.get_delay(response_time=1.0)
print(f"请求 {i+1}: 延迟 {delay:.2f}秒")
time.sleep(delay)
💡 实现要点:
- 根据HTTP状态码(特别是429)动态调整延迟
- 基于响应时间变化自动优化请求间隔
- 支持指数退避算法,遇到限流时延迟倍增
- 添加随机性避免过于规律的访问模式
- 可配置基础延迟、最大延迟和退避因子
💡 练习题2:指数退避算法
实现一个指数退避算法,当遇到限流时,每次重试的等待时间按照指数增长,但不超过最大等待时间。
📝 参考答案:
import time
import random
class ExponentialBackoff:
def __init__(self, base_delay=1, max_delay=60, max_retries=5, jitter=True):
"""
指数退避算法实现
Args:
base_delay: 基础延迟时间(秒)
max_delay: 最大延迟时间(秒)
max_retries: 最大重试次数
jitter: 是否添加随机抖动
"""
self.base_delay = base_delay
self.max_delay = max_delay
self.max_retries = max_retries
self.jitter = jitter
self.retry_count = 0
def get_delay(self):
"""计算当前重试的延迟时间"""
if self.retry_count >= self.max_retries:
raise Exception(f"已达到最大重试次数: {self.max_retries}")
# 计算指数延迟:base_delay * 2^retry_count
delay = self.base_delay * (2 ** self.retry_count)
# 限制最大延迟
delay = min(delay, self.max_delay)
# 添加随机抖动(可选)
if self.jitter:
delay = delay * random.uniform(0.5, 1.5)
self.retry_count += 1
return delay
def wait(self):
"""等待当前计算的延迟时间"""
delay = self.get_delay()
print(f"第 {self.retry_count} 次重试,等待 {delay:.2f} 秒...")
time.sleep(delay)
def reset(self):
"""重置重试计数器"""
self.retry_count = 0
def execute_with_retry(self, func, *args, **kwargs):
"""
使用指数退避算法执行函数
Args:
func: 要执行的函数
*args, **kwargs: 函数参数
Returns:
函数执行结果
"""
self.reset()
while True:
try:
result = func(*args, **kwargs)
print("执行成功!")
return result
except Exception as e:
print(f"执行失败: {e}")
if self.retry_count >= self.max_retries:
print(f"已达到最大重试次数,放弃重试")
raise
self.wait()
# 使用示例
if __name__ == "__main__":
# 示例1:基本使用
print("=== 基本使用示例 ===")
backoff = ExponentialBackoff(base_delay=1, max_delay=30, max_retries=5)
for i in range(6):
try:
delay = backoff.get_delay()
print(f"重试 {i+1}: 延迟 {delay:.2f}秒")
except Exception as e:
print(e)
# 示例2:模拟API调用
print("\n=== 模拟API调用示例 ===")
def mock_api_call(success_on_attempt=3):
"""模拟API调用,在第3次尝试时成功"""
global attempt_count
attempt_count += 1
if attempt_count < success_on_attempt:
raise Exception("API调用失败")
return f"API调用成功(第{attempt_count}次尝试)"
attempt_count = 0
backoff = ExponentialBackoff(base_delay=1, max_delay=10, max_retries=5)
try:
result = backoff.execute_with_retry(mock_api_call, success_on_attempt=3)
print(result)
except Exception as e:
print(f"最终失败: {e}")
# 示例3:带抖动的退避算法
print("\n=== 带抖动的退避算法 ===")
backoff_jitter = ExponentialBackoff(base_delay=1, max_delay=20, max_retries=4, jitter=True)
for i in range(5):
try:
delay = backoff_jitter.get_delay()
print(f"重试 {i+1}: 延迟 {delay:.2f}秒")
except Exception as e:
print(e)
💡 实现要点:
- 延迟时间按指数增长:base_delay × 2^retry_count
- 支持最大延迟时间限制,避免无限等待
- 可选的随机抖动功能,避免多个客户端同时重试
- 最大重试次数限制,防止无限循环
- 提供便捷的execute_with_retry方法,简化使用
- 支持重置重试计数器,可重复使用
💡 练习题3:访问频率统计
创建一个类来记录和控制单位时间内的请求次数,确保不超过设定的阈值。
1.3 验证码识别
验证码是最常见的反爬虫手段之一。当网站检测到异常访问时,会要求用户输入验证码以证明是人类。常见的验证码类型包括数字字母组合、滑块验证、拼图验证等。
📚 故事时间:身份验证
就像进入某些高安全区域需要指纹或人脸识别一样,网站使用验证码来区分人类用户和自动化程序。 对于爬虫来说,需要学习如何识别这些验证码或者寻找其他绕过方法。
import pytesseract
from PIL import Image
import requests
from io import BytesIO
# 假设我们有一个验证码图片URL
captcha_url = 'https://www.example.com/captcha.jpg'
# 下载验证码图片
response = requests.get(captcha_url)
image = Image.open(BytesIO(response.content))
# 简单预处理:转灰度
image = image.convert('L')
# 使用Tesseract OCR识别
text = pytesseract.image_to_string(image)
print(f"识别到的验证码:{text.strip()}")
# 注意:实际使用前需要安装Tesseract OCR和pytesseract库
# pip install pytesseract pillow requests
💡 练习题1:验证码图片预处理
实现验证码图片的预处理功能,包括二值化、去噪、旋转校正等,提高OCR识别率。
💡 练习题2:自定义验证码生成器
创建一个可以生成类似网站验证码的程序,用于训练和测试你的OCR识别算法。
💡 练习题3:验证码识别结果验证
实现一个验证机制,当OCR识别结果不准确时(例如包含非法字符),尝试重新识别或请求新的验证码。
1.4 JavaScript渲染
现代网站大量使用JavaScript动态生成页面内容。普通的requests库只能获取到初始HTML,无法执行JavaScript,导致无法获取到完整的页面数据。
📚 故事时间:魔术表演
想象一下,你拿到了一张魔术表演的门票,但只看了门票上的文字介绍,而没有亲临现场观看表演。 这样你永远无法欣赏到魔术的精彩之处。同样,对于使用JavaScript渲染的网站,仅获取HTML是不够的,需要执行JavaScript才能看到完整内容。
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import time
# 设置Chrome选项
chrome_options = Options()
chrome_options.add_argument('--headless') # 无头模式
chrome_options.add_argument('--disable-gpu')
chrome_options.add_argument('--window-size=1920,1080')
# 初始化WebDriver
driver = webdriver.Chrome(options=chrome_options)
# 访问网页
driver.get('https://www.example.com/dynamic-content')
# 等待JavaScript执行
time.sleep(3)
# 获取渲染后的页面内容
html_content = driver.page_source
print(html_content)
# 查找特定元素
try:
dynamic_element = driver.find_element_by_css_selector('.dynamic-content')
print(f"动态内容: {dynamic_element.text}")
except Exception as e:
print(f"未找到元素: {e}")
# 关闭浏览器
driver.quit()
💡 练习题1:等待元素加载完成
使用Selenium的显式等待(WebDriverWait)来替代time.sleep,当特定元素加载完成后再进行后续操作。
💡 练习题2:JavaScript交互
使用Selenium执行JavaScript代码来模拟用户交互,例如点击按钮、填写表单等。
💡 练习题3:使用Pyppeteer
尝试使用Pyppeteer(Python版的Puppeteer)来处理JavaScript渲染的页面,比较与Selenium的性能差异。
第2章:验证码平台
当遇到复杂的验证码时,使用OCR库可能无法达到理想的识别率。这时可以考虑使用专业的验证码识别平台(打码平台),通过API接口获取人工识别的结果。
2.1 常见验证码平台
目前市面上有很多专业的验证码识别平台,它们提供API接口,可以帮助爬虫自动识别各种类型的验证码。
| 平台名称 | 支持的验证码类型 | 价格参考 |
|---|---|---|
| 超级鹰 | 数字字母、中文、各类图形验证码 | 约0.003元/张 |
| 联众打码 | 数字字母、中文、各类图形验证码 | 约0.004元/张 |
| 若快打码 | 数字字母、中文、各类图形验证码 | 约0.005元/张 |
| 云打码 | 数字字母、中文、各类图形验证码 | 约0.004元/张 |
📚 故事时间:请人帮忙
当你遇到一道复杂的数学题不会做时,最好的方法是请教专业的老师。 同样,当爬虫遇到无法自动识别的验证码时,可以通过打码平台请人工帮助识别,虽然需要付费,但能够有效解决问题。
2.2 验证码平台API使用
使用验证码平台的一般流程是:注册账号获取API密钥 → 上传验证码图片 → 获取识别结果。下面以超级鹰平台为例,展示如何使用其API。
import requests
from hashlib import md5
import base64
class ChaojiyingClient:
def __init__(self, username, password, soft_id):
self.username = username
self.password = md5(password.encode('utf-8')).hexdigest()
self.soft_id = soft_id
self.base_params = {
'user': self.username,
'pass2': self.password,
'softid': self.soft_id,
}
self.headers = {
'Connection': 'Keep-Alive',
'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
}
def PostPic(self, im, codetype):
# 上传图片并获取识别结果的代码
# ...
pass