文件管理的故事
小明是一家网上书店的程序员,他需要开发一个图书管理系统,能够记录图书信息、读者借阅记录等。这些数据需要长期保存,以便系统重启后仍然能够使用。 他的任务是学习如何使用Python进行文件操作,实现数据的持久化存储。
文件读写基础
使用open函数进行基本的文件读写操作
上下文管理器
使用with语句安全地处理文件资源
CSV数据处理
使用csv库读写表格数据
1. open函数的基本使用
在Python中,使用open()函数来打开文件。该函数返回一个文件对象,我们可以通过这个对象来操作文件。
# 基本语法
file = open(文件路径, 模式)
# 示例:打开一个文本文件用于读取
file = open("books.txt", "r")
# 操作完成后,需要关闭文件
file.close()
文件打开模式
| 模式 | 描述 | 如果文件不存在 |
|---|---|---|
r |
只读模式 | 抛出异常 |
w |
写入模式(覆盖) | 创建新文件 |
a |
追加模式 | 创建新文件 |
r+ |
读写模式 | 抛出异常 |
w+ |
读写模式(覆盖) | 创建新文件 |
a+ |
读写模式(追加) | 创建新文件 |
2. 文件读取操作
小明需要读取已有的图书信息文件。Python提供了多种读取文件的方法:
read() 方法
一次性读取整个文件的内容
# 读取整个文件
with open("books.txt", "r", encoding="utf-8") as file:
content = file.read()
print(content)
readline() 方法
逐行读取文件内容
# 逐行读取
with open("books.txt", "r", encoding="utf-8") as file:
line = file.readline()
while line:
print(line.strip())
line = file.readline()
readlines() 方法
读取所有行并返回一个列表
# 读取所有行到列表
with open("books.txt", "r", encoding="utf-8") as file:
lines = file.readlines()
# 处理每一行
for line in lines:
print(line.strip())
3. 文件写入操作
小明需要将新的图书信息写入到文件中。Python提供了几种写入文件的方法:
write() 方法
写入字符串到文件
# 写入文本到文件
with open("new_book.txt", "w", encoding="utf-8") as file:
file.write("书名:Python编程入门\n")
file.write("作者:张三\n")
file.write("价格:45.00")
writelines() 方法
写入字符串列表到文件
# 写入多行文本
lines = [
"书名:数据分析实战\n",
"作者:李四\n",
"价格:68.00"
]
with open("another_book.txt", "w", encoding="utf-8") as file:
file.writelines(lines)
注意事项
• 使用w模式会覆盖文件的原有内容
• 如果想保留原有内容并添加新内容,使用a模式
• 写入完成后,确保调用close()方法或使用with语句
4. with语句的使用
使用with语句(上下文管理器)是处理文件的推荐方式,它会自动帮助我们关闭文件,即使在处理过程中发生异常。
传统方式 vs with语句
传统方式:
# 传统方式需要手动关闭文件
file = open("books.txt", "r")
try:
content = file.read()
print(content)
except Exception as e:
print(f"发生错误:{e}")
finally:
file.close()
with语句方式:
# 使用with语句自动管理文件资源
with open("books.txt", "r") as file:
content = file.read()
print(content)
# 文件在这里自动关闭
with语句的优势
代码更简洁,不需要显式调用close()
自动处理异常情况,确保文件关闭
提高代码可读性和可维护性
适用于任何支持上下文管理协议的对象
5. CSV文件处理
CSV(逗号分隔值)是一种常见的数据存储格式,特别适合表格数据。Python的csv模块提供了读取和写入CSV文件的功能。
读取CSV文件
import csv
# 读取CSV文件
with open("books.csv", "r", encoding="utf-8", newline="") as file:
reader = csv.reader(file)
header = next(reader) # 读取表头
print(f"表头:{header}")
# 读取数据行
for row in reader:
print(f"书名:{row[0]}, 作者:{row[1]}, 价格:{row[2]}")
使用DictReader
import csv
# 使用DictReader读取CSV文件
with open("books.csv", "r", encoding="utf-8", newline="") as file:
reader = csv.DictReader(file)
for row in reader:
print(f"书名:{row['书名']}, 作者:{row['作者']}, 价格:{row['价格']}")
写入CSV文件
import csv
# 写入CSV文件
books = [
["Python编程入门", "张三", "45.00"],
["数据分析实战", "李四", "68.00"],
["Web开发指南", "王五", "59.00"]
]
with open("new_books.csv", "w", encoding="utf-8", newline="") as file:
writer = csv.writer(file)
# 写入表头
writer.writerow(["书名", "作者", "价格"])
# 写入数据
writer.writerows(books)
使用DictWriter
import csv
# 使用DictWriter写入CSV文件
books = [
{"书名": "Python高级编程", "作者": "赵六", "价格": "88.00"},
{"书名": "机器学习基础", "作者": "孙七", "价格": "99.00"}
]
with open("another_books.csv", "w", encoding="utf-8", newline="") as file:
fieldnames = ["书名", "作者", "价格"]
writer = csv.DictWriter(file, fieldnames=fieldnames)
# 写入表头
writer.writeheader()
# 写入数据
writer.writerows(books)
实战练习
练习题1:图书信息读取
小明收到了一个包含图书信息的文本文件library.txt,每行格式为"书名-作者-价格"。请编写一个程序读取该文件,并统计图书的数量和平均价格。
total_price = 0
book_count = 0
with open("library.txt", "r", encoding="utf-8") as file:
for line in file:
# 去除行尾的换行符并分割数据
parts = line.strip().split("-")
if len(parts) == 3:
title, author, price = parts
try:
# 将价格转换为浮点数
total_price += float(price)
book_count += 1
except ValueError:
print(f"警告:'{title}'的价格格式不正确")
# 计算平均价格
if book_count > 0:
average_price = total_price / book_count
print(f"图书总数:{book_count}")
print(f"平均价格:{average_price:.2f}")
else:
print("没有找到有效的图书信息")
练习题2:图书分类统计
基于上一题,假设图书分类信息也包含在文件中(格式:"书名-作者-价格-分类"),请编写一个程序统计每个分类的图书数量和平均价格,并将统计结果写入到category_stats.txt文件中。
from collections import defaultdict
# 用于存储每个分类的图书信息
category_data = defaultdict(lambda: {'count': 0, 'total_price': 0})
# 读取文件并统计
try:
with open("library_extended.txt", "r", encoding="utf-8") as file:
for line in file:
parts = line.strip().split("-")
if len(parts) == 4:
title, author, price, category = parts
try:
price_float = float(price)
category_data[category]['count'] += 1
category_data[category]['total_price'] += price_float
except ValueError:
print(f"警告:'{title}'的价格格式不正确")
# 将统计结果写入文件
with open("category_stats.txt", "w", encoding="utf-8") as output_file:
output_file.write("分类,图书数量,平均价格\n")
for category, data in category_data.items():
count = data['count']
avg_price = data['total_price'] / count if count > 0 else 0
output_file.write(f"{category},{count},{avg_price:.2f}\n")
print("统计结果已写入category_stats.txt文件")
except FileNotFoundError:
print("错误:找不到library_extended.txt文件")
练习题3:CSV图书信息转换
小明需要将一个CSV格式的图书信息文件books_data.csv转换为JSON格式。请编写一个程序完成这个转换任务。
扩展:添加错误处理,确保程序在遇到格式不正确的行时不会崩溃。
import csv
import json
try:
# 读取CSV文件
books = []
with open("books_data.csv", "r", encoding="utf-8", newline="") as csv_file:
reader = csv.DictReader(csv_file)
row_count = 0
for row in reader:
try:
# 处理每一行数据
book = {
"title": row.get("书名", ""),
"author": row.get("作者", ""),
"price": float(row.get("价格", "0")) if row.get("价格") else 0,
"year": int(row.get("出版年份", "0")) if row.get("出版年份") else 0
}
books.append(book)
row_count += 1
except ValueError as e:
print(f"警告:第{row_count+1}行数据格式不正确 - {e}")
row_count += 1
# 写入JSON文件
with open("books_data.json", "w", encoding="utf-8") as json_file:
json.dump(books, json_file, ensure_ascii=False, indent=4)
print(f"成功将{len(books)}条图书信息从CSV转换为JSON格式")
except FileNotFoundError:
print("错误:找不到books_data.csv文件")
except Exception as e:
print(f"发生错误:{e}")
练习题4:会员借阅记录管理
小明需要开发一个简单的会员借阅记录管理系统。系统要求能够记录会员借阅和归还图书的信息,并将这些信息保存到CSV文件中。请编写一个程序实现以下功能:
- 记录会员借阅图书信息(会员ID、图书ID、借阅日期)
- 记录会员归还图书信息(添加归还日期)
- 查询特定会员的借阅记录
import csv
import os
from datetime import datetime
def add_borrow_record(member_id, book_id):
"""添加借阅记录"""
borrow_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
new_record = [member_id, book_id, borrow_date, ""]
# 检查文件是否存在
file_exists = os.path.exists("borrow_records.csv")
with open("borrow_records.csv", "a", encoding="utf-8", newline="") as file:
writer = csv.writer(file)
# 如果文件不存在,写入表头
if not file_exists:
writer.writerow(["会员ID", "图书ID", "借阅日期", "归还日期"])
writer.writerow(new_record)
print(f"已添加借阅记录:会员{member_id}借阅图书{book_id}")
def return_book(member_id, book_id):
"""记录图书归还"""
records = []
updated = False
return_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
with open("borrow_records.csv", "r", encoding="utf-8", newline="") as file:
reader = csv.reader(file)
header = next(reader)
records.append(header)
for row in reader:
if row[0] == member_id and row[1] == book_id and row[3] == "":
row[3] = return_date
updated = True
records.append(row)
if updated:
with open("borrow_records.csv", "w", encoding="utf-8", newline="") as file:
writer = csv.writer(file)
writer.writerows(records)
print(f"已记录归还:会员{member_id}归还图书{book_id}")
else:
print(f"未找到会员{member_id}借阅图书{book_id}的未归还记录")
except FileNotFoundError:
print("错误:借阅记录文件不存在")
def query_member_records(member_id):
"""查询会员的借阅记录"""
found = False
try:
with open("borrow_records.csv", "r", encoding="utf-8", newline="") as file:
reader = csv.DictReader(file)
print(f"会员{member_id}的借阅记录:")
print("="*60)
print(f"{'图书ID':<10}{'借阅日期':<20}{'归还日期':<20}")
print("="*60)
for row in reader:
if row["会员ID"] == member_id:
return_date = row["归还日期"] if row["归还日期"] else "未归还"
print(f"{row['图书ID']:<10}{row['借阅日期']:<20}{return_date:<20}")
found = True
if not found:
print("没有找到该会员的借阅记录")
except FileNotFoundError:
print("错误:借阅记录文件不存在")
# 测试功能
if __name__ == "__main__":
# 添加借阅记录
add_borrow_record("M001", "B001")
add_borrow_record("M001", "B002")
add_borrow_record("M002", "B003")
# 查询会员记录
query_member_records("M001")
# 归还图书
return_book("M001", "B001")
# 再次查询会员记录
query_member_records("M001")
练习题5:文件内容搜索和替换
小明发现图书信息文件中有一些错误需要批量修改。请编写一个程序,能够在指定文件中搜索特定文本并替换为新的文本,同时创建一个备份文件以防万一。
import shutil
import os
def search_and_replace(file_path, search_text, replace_text):
"""在文件中搜索并替换文本"""
try:
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"错误:文件{file_path}不存在")
return False
# 创建备份文件
backup_path = f"{file_path}.bak"
shutil.copy2(file_path, backup_path)
print(f"已创建备份文件:{backup_path}")
# 读取原文件内容
with open(file_path, "r", encoding="utf-8") as file:
content = file.read()
# 统计替换次数
original_count = content.count(search_text)
# 进行替换
new_content = content.replace(search_text, replace_text)
# 写入新内容
with open(file_path, "w", encoding="utf-8") as file:
file.write(new_content)
# 输出结果
replaced_count = original_count - new_content.count(search_text)
print(f"替换完成:共找到{original_count}处匹配,成功替换{replaced_count}处")
return True
except Exception as e:
print(f"发生错误:{e}")
# 如果出现错误,尝试恢复文件
try:
if os.path.exists(backup_path):
shutil.copy2(backup_path, file_path)
print("已恢复原文件")
except:
pass
return False
# 测试函数
if __name__ == "__main__":
# 示例:将文件中的"Python"替换为"Python3"
success = search_and_replace("book_descriptions.txt", "Python", "Python3")
if success:
print("搜索替换操作成功完成")
else:
print("搜索替换操作失败")