函数高级特性

深入理解Python函数的高级用法,掌握高阶函数、装饰器、闭包等强大特性

高阶函数

装饰器

闭包

1. 高阶函数详解

高阶函数是指接受函数作为参数或返回函数的函数。这是Python函数式编程的核心概念。

小贴士:高阶函数让代码更加灵活和可复用,是Python强大功能的体现!

1.1 函数作为参数

# 定义一个简单的操作函数
def add(x, y):
    return x + y

def multiply(x, y):
    return x * y

# 高阶函数:接受函数作为参数
def calculate(operation, x, y):
    """高阶函数示例"""
    return operation(x, y)

# 使用高阶函数
result1 = calculate(add, 5, 3)      # 8
result2 = calculate(multiply, 5, 3)  # 15
print(f"5 + 3 = {result1}")
print(f"5 × 3 = {result2}")

1.2 函数作为返回值

# 创建特定功能的函数
def create_power_function(exponent):
    """返回一个幂函数"""
    def power(base):
        return base ** exponent
    return power

# 使用返回的函数
square = create_power_function(2)   # 平方函数
cube = create_power_function(3)     # 立方函数

print(square(4))  # 16
print(cube(3))    # 27

1.3 Lambda表达式

# 传统函数定义
def square(x):
    return x ** 2

# lambda表达式
square_lambda = lambda x: x ** 2

# 在排序中使用lambda
students = [
    {'name': '张三', 'score': 85},
    {'name': '李四', 'score': 92},
    {'name': '王五', 'score': 78}
]

# 按分数排序
sorted_students = sorted(students, key=lambda x: x['score'], reverse=True)
print(sorted_students)

1.4 内置高阶函数

map()
# 将函数应用于每个元素
numbers = [1, 2, 3, 4, 5]
squared = list(map(lambda x: x**2, numbers))
print(squared)  # [1, 4, 9, 16, 25]
filter()
# 过滤满足条件的元素
numbers = [1, 2, 3, 4, 5, 6]
evens = list(filter(lambda x: x % 2 == 0, numbers))
print(evens)  # [2, 4, 6]
reduce()
# 累积计算
from functools import reduce
numbers = [1, 2, 3, 4, 5]
total = reduce(lambda x, y: x + y, numbers)
print(total)  # 15

2. 装饰器基础

装饰器是一种特殊的高阶函数,可以在不修改原函数代码的情况下增加功能。

注意事项:装饰器是Python的高级特性,初学者可能需要多练习才能掌握!

2.1 简单装饰器

# 定义装饰器
def my_decorator(func):
    def wrapper():
        print("函数执行前的操作")
        func()
        print("函数执行后的操作")
    return wrapper

# 使用装饰器
@my_decorator
def say_hello():
    print("你好!")

say_hello()
# 输出:
# 函数执行前的操作
# 你好!
# 函数执行后的操作

2.2 带参数的装饰器

import time

def timing_decorator(func):
    """计时装饰器"""
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} 执行时间: {end_time - start_time:.4f}秒")
        return result
    return wrapper

@timing_decorator
def slow_function(n):
    """模拟耗时操作"""
    total = 0
    for i in range(n):
        total += i
    return total

result = slow_function(100000)
print(f"结果: {result}")

2.3 实际应用:日志装饰器

def log_function_call(func):
    """记录函数调用的装饰器"""
    def wrapper(*args, **kwargs):
        print(f"调用函数: {func.__name__}")
        print(f"参数: args={args}, kwargs={kwargs}")
        result = func(*args, **kwargs)
        print(f"返回结果: {result}")
        return result
    return wrapper

@log_function_call
def calculate_area(length, width):
    return length * width

calculate_area(5, 3)

3. 闭包概念

闭包是一个函数对象,它记住了定义时的环境,即使在外部作用域已经结束时仍然可以访问这些变量。

3.1 闭包基础

def outer_function(x):
    """外部函数"""
    def inner_function(y):
        """内部函数(闭包)"""
        return x + y
    return inner_function

# 创建闭包
add_5 = outer_function(5)
add_10 = outer_function(10)

print(add_5(3))   # 8
print(add_10(3))  # 13

3.2 闭包的实际应用

def counter():
    """创建计数器闭包"""
    count = 0
    
    def increment():
        nonlocal count  # 声明使用外部变量
        count += 1
        return count
    
    return increment

# 使用计数器
counter1 = counter()
print(counter1())  # 1
print(counter1())  # 2
print(counter1())  # 3

# 另一个独立的计数器
counter2 = counter()
print(counter2())  # 1

3.3 工厂函数模式

def create_multiplier(factor):
    """创建乘法器工厂"""
    def multiplier(number):
        return number * factor
    return multiplier

# 创建不同的乘法器
double = create_multiplier(2)
triple = create_multiplier(3)
quadruple = create_multiplier(4)

print(double(5))    # 10
print(triple(5))    # 15
print(quadruple(5)) # 20

4. 高级参数处理

4.1 *args和**kwargs详解

def flexible_function(*args, **kwargs):
    """接受任意数量的位置参数和关键字参数"""
    print("位置参数:", args)
    print("关键字参数:", kwargs)
    
    # 处理参数
    for arg in args:
        print(f"处理位置参数: {arg}")
    
    for key, value in kwargs.items():
        print(f"处理关键字参数: {key} = {value}")

# 使用示例
flexible_function(1, 2, 3, name="小明", age=25, city="北京")

4.2 函数注解

def process_data(data: list, multiplier: float = 1.0) -> list:
    """处理数据的函数,带有类型注解"""
    return [x * multiplier for x in data]

# 使用示例
numbers = [1, 2, 3, 4, 5]
result = process_data(numbers, 2.5)
print(result)  # [2.5, 5.0, 7.5, 10.0, 12.5]

4.3 生成器函数

def fibonacci_generator(n):
    """生成斐波那契数列的生成器"""
    a, b = 0, 1
    count = 0
    while count < n:
        yield a
        a, b = b, a + b
        count += 1

# 使用生成器
fib = fibonacci_generator(10)
print("前10个斐波那契数:")
for num in fib:
    print(num, end=" ")
# 输出: 0 1 1 2 3 5 8 13 21 34

故事化案例:智能数据分析系统

场景:小张是一名数据分析师,需要处理大量的销售数据。他想要创建一个灵活的数据处理系统,能够根据不同的需求快速处理和分析数据。让我们用函数的高级特性来实现这个系统!

1. 数据处理管道

class DataProcessor:
    """智能数据处理器"""
    
    def __init__(self):
        self.processing_steps = []
    
    def add_step(self, step_function):
        """添加处理步骤(高阶函数应用)"""
        self.processing_steps.append(step_function)
        return self
    
    def process(self, data):
        """执行所有处理步骤"""
        result = data
        for step in self.processing_steps:
            result = step(result)
        return result

# 创建处理步骤
def clean_data(data):
    """清理数据"""
    return [x for x in data if x is not None and x != ""]

def normalize_data(data):
    """标准化数据"""
    if not data:
        return []
    max_val = max(data)
    return [x / max_val for x in data]

def add_bonus(data, bonus=0.1):
    """添加奖励(闭包应用)"""
    return [x * (1 + bonus) for x in data]

# 使用数据处理管道
processor = DataProcessor()
processor.add_step(clean_data)
processor.add_step(normalize_data)
processor.add_step(lambda x: add_bonus(x, 0.15))

sales_data = [100, 200, None, 150, 300, ""]
processed_data = processor.process(sales_data)
print("处理后的数据:", processed_data)

2. 性能监控装饰器

import time
import functools

def performance_monitor(func_name=None):
    """性能监控装饰器工厂"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            memory_before = len(str(args))  # 简单内存估算
            
            result = func(*args, **kwargs)
            
            end_time = time.time()
            memory_after = len(str(result))
            
            execution_time = end_time - start_time
            memory_used = memory_after - memory_before
            
            print(f"🚀 {func_name or func.__name__} 性能报告:")
            print(f"   执行时间: {execution_time:.4f}秒")
            print(f"   内存变化: {memory_used}字符")
            
            return result
        return wrapper
    return decorator

@performance_monitor("数据分析")
def analyze_sales(sales_data):
    """分析销售数据"""
    total = sum(sales_data)
    average = total / len(sales_data) if sales_data else 0
    max_sale = max(sales_data) if sales_data else 0
    
    return {
        'total': total,
        'average': average,
        'max': max_sale,
        'count': len(sales_data)
    }

# 使用示例
sales = [1000, 1500, 800, 2000, 1200, 1800]
result = analyze_sales(sales)
print("分析结果:", result)

3. 缓存系统

def memoize(func):
    """简单的缓存装饰器"""
    cache = {}
    
    def wrapper(*args):
        if args in cache:
            print(f"从缓存获取: {func.__name__}{args}")
            return cache[args]
        
        result = func(*args)
        cache[args] = result
        print(f"计算并缓存: {func.__name__}{args}")
        return result
    
    return wrapper

@memoize
def expensive_calculation(n):
    """模拟耗时计算"""
    print(f"正在计算 {n}...")
    return n * n + n * 2 + 1

# 使用缓存系统
print(expensive_calculation(5))  # 计算
print(expensive_calculation(5))  # 从缓存获取
print(expensive_calculation(10)) # 计算
print(expensive_calculation(5))  # 从缓存获取

高级练习题(共5个)

练习题1:函数组合器

创建一个函数组合器,可以将多个函数组合成一个管道,前一个函数的输出作为下一个函数的输入。

要求:支持任意数量的函数组合,如compose(f, g, h)(x) = h(g(f(x)))

# 参考答案
def compose(*functions):
    """函数组合器"""
    def composed_function(x):
        result = x
        for func in reversed(functions):
            result = func(result)
        return result
    return composed_function

# 使用示例
double = lambda x: x * 2
add_one = lambda x: x + 1
square = lambda x: x ** 2

composed = compose(square, add_one, double)
print(composed(3))  # ((3 * 2) + 1)² = 49

练习题2:重试装饰器

创建一个装饰器,当函数执行失败时自动重试指定次数。

# 参考答案
def retry(max_attempts=3):
    def decorator(func):
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"尝试 {attempt + 1} 失败: {e}")
                    if attempt == max_attempts - 1:
                        raise
        return wrapper
    return decorator

@retry(max_attempts=3)
def risky_function():
    import random
    if random.random() < 0.8:
        raise ValueError("随机错误")
    return "成功!"

# 测试
print(risky_function())

练习题3:验证装饰器

创建一个参数验证装饰器,在函数执行前验证参数类型和范围。

# 参考答案
def validate_types(**expected_types):
    def decorator(func):
        def wrapper(*args, **kwargs):
            # 验证位置参数
            for i, (arg, expected_type) in enumerate(zip(args, expected_types.values())):
                if not isinstance(arg, expected_type):
                    raise TypeError(f"参数 {i} 应该是 {expected_type}, 但得到 {type(arg)}")
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

@validate_types(x=int, y=int)
def add_numbers(x, y):
    return x + y

# 测试
print(add_numbers(5, 3))  # 正常
# print(add_numbers("5", 3))  # 会抛出TypeError

练习题4:记忆化斐波那契

使用闭包实现带记忆化的斐波那契数列计算。

# 参考答案
def fibonacci_memoized():
    cache = {0: 0, 1: 1}
    
    def fib(n):
        if n in cache:
            return cache[n]
        
        result = fib(n-1) + fib(n-2)
        cache[n] = result
        return result
    
    return fib

# 使用
fib = fibonacci_memoized()
print(fib(10))  # 55
print(fib(50))  # 非常快!

练习题5:事件系统

创建一个简单的事件系统,支持事件注册和触发。

# 参考答案
class EventSystem:
    def __init__(self):
        self.listeners = {}
    
    def on(self, event_name, callback):
        """注册事件监听器"""
        if event_name not in self.listeners:
            self.listeners[event_name] = []
        self.listeners[event_name].append(callback)
    
    def emit(self, event_name, *args, **kwargs):
        """触发事件"""
        if event_name in self.listeners:
            for callback in self.listeners[event_name]:
                callback(*args, **kwargs)

# 使用
events = EventSystem()

@events.on("user_login")
def greet_user(username):
    print(f"欢迎回来,{username}!")

@events.on("user_login")
def log_login(username):
    print(f"用户 {username} 于 {time.strftime('%H:%M:%S')} 登录")

events.emit("user_login", "张三")