Python文件读写操作

学习如何使用Python进行文件的读取和写入,掌握数据持久化存储的基础技能

文件管理的故事

小明是一家网上书店的程序员,他需要开发一个图书管理系统,能够记录图书信息、读者借阅记录等。这些数据需要长期保存,以便系统重启后仍然能够使用。 他的任务是学习如何使用Python进行文件操作,实现数据的持久化存储。

📄

文件读写基础

使用open函数进行基本的文件读写操作

🔒

上下文管理器

使用with语句安全地处理文件资源

📊

CSV数据处理

使用csv库读写表格数据

1. open函数的基本使用

在Python中,使用open()函数来打开文件。该函数返回一个文件对象,我们可以通过这个对象来操作文件。

# 基本语法
file = open(文件路径, 模式)

# 示例:打开一个文本文件用于读取
file = open("books.txt", "r")

# 操作完成后,需要关闭文件
file.close()

文件打开模式

模式 描述 如果文件不存在
r 只读模式 抛出异常
w 写入模式(覆盖) 创建新文件
a 追加模式 创建新文件
r+ 读写模式 抛出异常
w+ 读写模式(覆盖) 创建新文件
a+ 读写模式(追加) 创建新文件

2. 文件读取操作

小明需要读取已有的图书信息文件。Python提供了多种读取文件的方法:

read() 方法

一次性读取整个文件的内容

# 读取整个文件
with open("books.txt", "r", encoding="utf-8") as file:
    content = file.read()
    print(content)

readline() 方法

逐行读取文件内容

# 逐行读取
with open("books.txt", "r", encoding="utf-8") as file:
    line = file.readline()
    while line:
        print(line.strip())
        line = file.readline()

readlines() 方法

读取所有行并返回一个列表

# 读取所有行到列表
with open("books.txt", "r", encoding="utf-8") as file:
    lines = file.readlines()
    
# 处理每一行
for line in lines:
    print(line.strip())

3. 文件写入操作

小明需要将新的图书信息写入到文件中。Python提供了几种写入文件的方法:

write() 方法

写入字符串到文件

# 写入文本到文件
with open("new_book.txt", "w", encoding="utf-8") as file:
    file.write("书名:Python编程入门\n")
    file.write("作者:张三\n")
    file.write("价格:45.00")

writelines() 方法

写入字符串列表到文件

# 写入多行文本
lines = [
    "书名:数据分析实战\n",
    "作者:李四\n",
    "价格:68.00"
]

with open("another_book.txt", "w", encoding="utf-8") as file:
    file.writelines(lines)

注意事项

• 使用w模式会覆盖文件的原有内容

• 如果想保留原有内容并添加新内容,使用a模式

• 写入完成后,确保调用close()方法或使用with语句

4. with语句的使用

使用with语句(上下文管理器)是处理文件的推荐方式,它会自动帮助我们关闭文件,即使在处理过程中发生异常。

传统方式 vs with语句

传统方式:

# 传统方式需要手动关闭文件
file = open("books.txt", "r")
try:
    content = file.read()
    print(content)
except Exception as e:
    print(f"发生错误:{e}")
finally:
    file.close()

with语句方式:

# 使用with语句自动管理文件资源
with open("books.txt", "r") as file:
    content = file.read()
    print(content)
# 文件在这里自动关闭

with语句的优势

代码更简洁,不需要显式调用close()

自动处理异常情况,确保文件关闭

提高代码可读性和可维护性

适用于任何支持上下文管理协议的对象

5. CSV文件处理

CSV(逗号分隔值)是一种常见的数据存储格式,特别适合表格数据。Python的csv模块提供了读取和写入CSV文件的功能。

读取CSV文件

import csv

# 读取CSV文件
with open("books.csv", "r", encoding="utf-8", newline="") as file:
    reader = csv.reader(file)
    header = next(reader)  # 读取表头
    print(f"表头:{header}")
    
    # 读取数据行
    for row in reader:
        print(f"书名:{row[0]}, 作者:{row[1]}, 价格:{row[2]}")

使用DictReader

import csv

# 使用DictReader读取CSV文件
with open("books.csv", "r", encoding="utf-8", newline="") as file:
    reader = csv.DictReader(file)
    
    for row in reader:
        print(f"书名:{row['书名']}, 作者:{row['作者']}, 价格:{row['价格']}")

写入CSV文件

import csv

# 写入CSV文件
books = [
    ["Python编程入门", "张三", "45.00"],
    ["数据分析实战", "李四", "68.00"],
    ["Web开发指南", "王五", "59.00"]
]

with open("new_books.csv", "w", encoding="utf-8", newline="") as file:
    writer = csv.writer(file)
    # 写入表头
    writer.writerow(["书名", "作者", "价格"])
    # 写入数据
    writer.writerows(books)

使用DictWriter

import csv

# 使用DictWriter写入CSV文件
books = [
    {"书名": "Python高级编程", "作者": "赵六", "价格": "88.00"},
    {"书名": "机器学习基础", "作者": "孙七", "价格": "99.00"}
]

with open("another_books.csv", "w", encoding="utf-8", newline="") as file:
    fieldnames = ["书名", "作者", "价格"]
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    
    # 写入表头
    writer.writeheader()
    # 写入数据
    writer.writerows(books)

实战练习

练习题1:图书信息读取

小明收到了一个包含图书信息的文本文件library.txt,每行格式为"书名-作者-价格"。请编写一个程序读取该文件,并统计图书的数量和平均价格。

total_price = 0
book_count = 0

with open("library.txt", "r", encoding="utf-8") as file:
    for line in file:
        # 去除行尾的换行符并分割数据
        parts = line.strip().split("-")
        if len(parts) == 3:
            title, author, price = parts
            try:
                # 将价格转换为浮点数
                total_price += float(price)
                book_count += 1
            except ValueError:
                print(f"警告:'{title}'的价格格式不正确")

# 计算平均价格
if book_count > 0:
    average_price = total_price / book_count
    print(f"图书总数:{book_count}")
    print(f"平均价格:{average_price:.2f}")
else:
    print("没有找到有效的图书信息")

练习题2:图书分类统计

基于上一题,假设图书分类信息也包含在文件中(格式:"书名-作者-价格-分类"),请编写一个程序统计每个分类的图书数量和平均价格,并将统计结果写入到category_stats.txt文件中。

from collections import defaultdict

# 用于存储每个分类的图书信息
category_data = defaultdict(lambda: {'count': 0, 'total_price': 0})

# 读取文件并统计
try:
    with open("library_extended.txt", "r", encoding="utf-8") as file:
        for line in file:
            parts = line.strip().split("-")
            if len(parts) == 4:
                title, author, price, category = parts
                try:
                    price_float = float(price)
                    category_data[category]['count'] += 1
                    category_data[category]['total_price'] += price_float
                except ValueError:
                    print(f"警告:'{title}'的价格格式不正确")

    # 将统计结果写入文件
    with open("category_stats.txt", "w", encoding="utf-8") as output_file:
        output_file.write("分类,图书数量,平均价格\n")
        
        for category, data in category_data.items():
            count = data['count']
            avg_price = data['total_price'] / count if count > 0 else 0
            output_file.write(f"{category},{count},{avg_price:.2f}\n")
            
    print("统计结果已写入category_stats.txt文件")
except FileNotFoundError:
    print("错误:找不到library_extended.txt文件")

练习题3:CSV图书信息转换

小明需要将一个CSV格式的图书信息文件books_data.csv转换为JSON格式。请编写一个程序完成这个转换任务。

扩展:添加错误处理,确保程序在遇到格式不正确的行时不会崩溃。

import csv
import json

try:
    # 读取CSV文件
    books = []
    with open("books_data.csv", "r", encoding="utf-8", newline="") as csv_file:
        reader = csv.DictReader(csv_file)
        row_count = 0
        
        for row in reader:
            try:
                # 处理每一行数据
                book = {
                    "title": row.get("书名", ""),
                    "author": row.get("作者", ""),
                    "price": float(row.get("价格", "0")) if row.get("价格") else 0,
                    "year": int(row.get("出版年份", "0")) if row.get("出版年份") else 0
                }
                books.append(book)
                row_count += 1
            except ValueError as e:
                print(f"警告:第{row_count+1}行数据格式不正确 - {e}")
                row_count += 1

    # 写入JSON文件
    with open("books_data.json", "w", encoding="utf-8") as json_file:
        json.dump(books, json_file, ensure_ascii=False, indent=4)
        
    print(f"成功将{len(books)}条图书信息从CSV转换为JSON格式")
except FileNotFoundError:
    print("错误:找不到books_data.csv文件")
except Exception as e:
    print(f"发生错误:{e}")

练习题4:会员借阅记录管理

小明需要开发一个简单的会员借阅记录管理系统。系统要求能够记录会员借阅和归还图书的信息,并将这些信息保存到CSV文件中。请编写一个程序实现以下功能:

  • 记录会员借阅图书信息(会员ID、图书ID、借阅日期)
  • 记录会员归还图书信息(添加归还日期)
  • 查询特定会员的借阅记录
import csv
import os
from datetime import datetime

def add_borrow_record(member_id, book_id):
    """添加借阅记录"""
    borrow_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    new_record = [member_id, book_id, borrow_date, ""]
    
    # 检查文件是否存在
    file_exists = os.path.exists("borrow_records.csv")
    
    with open("borrow_records.csv", "a", encoding="utf-8", newline="") as file:
        writer = csv.writer(file)
        # 如果文件不存在,写入表头
        if not file_exists:
            writer.writerow(["会员ID", "图书ID", "借阅日期", "归还日期"])
        writer.writerow(new_record)
    
    print(f"已添加借阅记录:会员{member_id}借阅图书{book_id}")

def return_book(member_id, book_id):
    """记录图书归还"""
    records = []
    updated = False
    return_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    
    try:
        with open("borrow_records.csv", "r", encoding="utf-8", newline="") as file:
            reader = csv.reader(file)
            header = next(reader)
            records.append(header)
            
            for row in reader:
                if row[0] == member_id and row[1] == book_id and row[3] == "":
                    row[3] = return_date
                    updated = True
                records.append(row)
        
        if updated:
            with open("borrow_records.csv", "w", encoding="utf-8", newline="") as file:
                writer = csv.writer(file)
                writer.writerows(records)
            print(f"已记录归还:会员{member_id}归还图书{book_id}")
        else:
            print(f"未找到会员{member_id}借阅图书{book_id}的未归还记录")
    except FileNotFoundError:
        print("错误:借阅记录文件不存在")

def query_member_records(member_id):
    """查询会员的借阅记录"""
    found = False
    try:
        with open("borrow_records.csv", "r", encoding="utf-8", newline="") as file:
            reader = csv.DictReader(file)
            print(f"会员{member_id}的借阅记录:")
            print("="*60)
            print(f"{'图书ID':<10}{'借阅日期':<20}{'归还日期':<20}")
            print("="*60)
            
            for row in reader:
                if row["会员ID"] == member_id:
                    return_date = row["归还日期"] if row["归还日期"] else "未归还"
                    print(f"{row['图书ID']:<10}{row['借阅日期']:<20}{return_date:<20}")
                    found = True
            
            if not found:
                print("没有找到该会员的借阅记录")
    except FileNotFoundError:
        print("错误:借阅记录文件不存在")

# 测试功能
if __name__ == "__main__":
    # 添加借阅记录
    add_borrow_record("M001", "B001")
    add_borrow_record("M001", "B002")
    add_borrow_record("M002", "B003")
    
    # 查询会员记录
    query_member_records("M001")
    
    # 归还图书
    return_book("M001", "B001")
    
    # 再次查询会员记录
    query_member_records("M001")

练习题5:文件内容搜索和替换

小明发现图书信息文件中有一些错误需要批量修改。请编写一个程序,能够在指定文件中搜索特定文本并替换为新的文本,同时创建一个备份文件以防万一。

import shutil
import os
def search_and_replace(file_path, search_text, replace_text):
    """在文件中搜索并替换文本"""
    try:
        # 检查文件是否存在
        if not os.path.exists(file_path):
            print(f"错误:文件{file_path}不存在")
            return False
        
        # 创建备份文件
        backup_path = f"{file_path}.bak"
        shutil.copy2(file_path, backup_path)
        print(f"已创建备份文件:{backup_path}")
        
        # 读取原文件内容
        with open(file_path, "r", encoding="utf-8") as file:
            content = file.read()
        
        # 统计替换次数
        original_count = content.count(search_text)
        
        # 进行替换
        new_content = content.replace(search_text, replace_text)
        
        # 写入新内容
        with open(file_path, "w", encoding="utf-8") as file:
            file.write(new_content)
        
        # 输出结果
        replaced_count = original_count - new_content.count(search_text)
        print(f"替换完成:共找到{original_count}处匹配,成功替换{replaced_count}处")
        return True
        
except Exception as e:
        print(f"发生错误:{e}")
        # 如果出现错误,尝试恢复文件
        try:
            if os.path.exists(backup_path):
                shutil.copy2(backup_path, file_path)
                print("已恢复原文件")
        except:
            pass
        return False

# 测试函数
if __name__ == "__main__":
    # 示例:将文件中的"Python"替换为"Python3"
    success = search_and_replace("book_descriptions.txt", "Python", "Python3")
    if success:
        print("搜索替换操作成功完成")
    else:
        print("搜索替换操作失败")