Skip to content

文件与IO

基本文件操作

在 Python 中,内置了文件(File)对象,在使用文件对象时,首先需要通过内置的 open() 方法创建一个文件对象,然后通过该对象提供的方法进行一些基本文件操作。例如,可以使用文件对象的 write() 方法向文件中写入内容,以及使用 close() 方法关闭文件等。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
open(
    file,
    mode='r',
    buffering=-1,
    encoding=None,
    errors=None,
    newline=None,
    closefd=True,
    opener=None,
)

mode 参数的常用参数值及说明

说明 注意
r 以只读模式打开文件。文件的指针将会放在文件的开头 文件必须存在
rb 以二进制格式打开文件,并且采用只读模式。文件的指针将会放在文件的开头。一般用于非文本文件,如图片、声音等
w / wb 以只写模式打开文件 文件存在,则将其覆盖,否则创建新文件
w+ / wb+ 打开文件后,先清空原有内容,使其变为一个空的文件,对这个空文件有读写权限
a / ab 以追加模式打开一个文件。如果该文件已经存在,文件指针将放在文件的末尾,否则,创建新文件用于写入
a+ / ab+ 以读写模式打开文件。如果该文件已经存在,文件指针将放在文件的末尾,否则,创建新文件用于读写

(1) 以二进制形式打开文件

使用 open() 函数不仅可以以文本的形式打开文本文件,而且可以以二进制形式打开非文本文件,如图片文件、音频文件、视频文件。

1
2
3
4
5
file = open("picture.png", "rb")
print(file)
"""
<_io.BufferedReader name='picture.png'>
"""

创建的是一个 BufferedReader 对象。对于该对象生成后,可以再应用其他的第三方模块进行处理。例如,上面的 BufferedReader 对象是通过打开图文件实现的。那么就可以将其传入到第三方的图像处理库 PIL 的 Image 模块的 open() 方法中,以便于对图片进行处理(如调整大小等)

(2) 打开文件时指定编码方式

1
file = open("nocilantro.txt", "r", encoding="utf-8")

(3) 关闭文件

1
file.close()

close() 方法先刷新缓冲区中还没有写入的信息,然后再关闭文件,这样可以将没有写入文件的内容写入到文件中。在关闭文件后,便不能再进行写入操作了

打开文件时使用 with 语句

打开文件后,要及时将其关闭。如果忘记关闭可能会带来意想不到的问题。另外,如果在打开文件时抛出了异常,那么将导致文件不能被及时关闭。为了更好地避免此类问题的发生,可以使用 with 语句。

1
2
with expression as target:
    with-body

expression: 用于指定一个表达式,这里可以是打开文件的 open() 函数
target: 用于指定一个变量,并且将 expression 的结果保存到该变量中

1
2
with open("message.txt", "w") as file:
    pass

写入文件内容

1
file.write(string)

在调用 write() 方法向文件中写入内容的前提是,打开文件时,指定的打开模式为 w 或者 a

读取文件

(1) 读取指定字符

文件对象提供了 read() 方法读取指定个数的字符

1
file.read([size])

其中,file 为打开的文件对象;size 为可选参数,用于指定要读取的字符个数,如果省略则一次性读取所有内容

1
2
3
with open("message.txt", "r") as file:
    string = file.read(9)
    print(string)

使用 read(size) 方法读取文件时,是从文件的开头读取的。如果想要读取部分内容,可以先使用文件对象的 seek() 方法将文件的指针移动到新的位置,然后再应用 read(size) 方法读取。

1
file.seek(offset[, whence])

offset: 用于指定移动的字符个数,其具体位置与 whence 有关

whence: 用于指定从什么位置开始计算。值为 0 表示从文件头开始计算,值为 1 表示从当前位置开始计算,值为 2 表示从文件尾开始计算,默认为 0

对于 whence 参数,如果在打开文件时,没有使用 b 模式(即 rb),那么只允许从文件头开始计算相对位置

在使用 seek() 方法时,offset 的值是按一个汉字占两个字符、英文和数字各占一个字符计算的,这与 read(size) 方法不同

(2) 读取一行

1
file.readline()

(3) 读取全部行

1
file.readlines()

返回的是一个字符串列表,每个元素为文件的一行内容

(4) 读取全部内容:

1
2
3
with open("./test.py", "r") as f:
    content = f.read()
    print(content)

目录操作

os 模块是 Python 内置的与操作系统功能和文件系统相关的模块,该模块中的语句的执行结果通常与操作系统有关,在不同的操作系统,可能会得到不一样的结果

name: 用于获取操作系统类型

1
2
3
4
5
import os
print(os.name)
"""
posix
"""

如果 os.name 的输出结果为 nt,则表示是 Windows 操作系统;如果是 posix,则表示是 Linux、Unix 或 Mac OS 操作系统

linesep: 用于获取当前操作系统上的换行符

1
2
In [7]: os.linesep
Out[7]: '\n'

sep: 用于获取当前操作系统所使用的路径分隔符

  • os.cpu_count()
  • os.system(command)

os 模块操作目录的函数

函数 说明
getcwd() 返回当前的工作目录
listdir(path) 返回指定路径下的文件和目录信息
mkdir(path[, mode]) 创建目录
mkdirs(path1/paht2....[, mode]) 创建多级目录
rmdir(path) 删除目录
remove(path) 删除文件
removedirs(path1/path2....) 删除多级目录
chdir(path) 把 path 设置为当前工作目录
walk(top[, topdown [, onerror]]) 遍历目录树,该方法返回一个元组,包括所有路径名、所有目录列表和文件列表 3 个元素

os.path 模块也提供了一些操作目录的函数

函数 说明
abspath(path) 用于获取文件或目录的绝对路径
exists(path) 用于判断目录或者文件是否存在
realpath(path) 获取消除 .. 和 . 等符号的真实路径
join(path, name) 将目录与目录或者文件名拼接起来
splitext() 分离文件名和扩展名
basename(path) 从一个目录中提取文件名
dirname(path) 从一个路径中提取文件路径,不包括文件名
isdir(path) 用于判断是否为路径
split(path) 返回以路径和最后一个文件名组成的元组(path, last_part)

使用 rmdir() 函数只能删除空的目录,如果想要删除非空目录,则需要使用 Python 内置的标准模块 shutil 的 rmtree() 函数实现。

os.path.expanduser:

1
2
3
4
5
import os

path = "~/Desktop/hhh.log"
print(os.path.expanduser(path))
# /Users/nocilantro/Desktop/hhh.log

常见到的用法

获取一个.py文件的工作目录:

1
2
3
4
5
import os
import sys

# 首先获得文件路径,然后通过 os.path.dirname 获得目录的路径
workdir = os.path.dirname(os.path.join(os.getcwd(), sys.argv[0]))

常用操作

1
2
3
4
import os
import sys

path = os.path.realpath(os.path.dirname(os.path.join(os.getcwd(), sys.argv[0])))

判断文件是否存在,不存在时创建文件:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# windows 不适用
import os


def check_path(path):
    if not os.path.exists(path):
        print("hhhh")
        os.system(f"touch {path}")

check_path("./woaini.log")
# hhhh

通用:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import os


def check_path(path):
    if not os.path.exists(path):
        print("hhhh")
        with open(path, "w"):
            pass

check_path("./woaini.log")

清空文件内容:

1
2
with open("test.js", "w") as f:
    pass

高级文件操作

函数 说明
accsee(path, accessmode) 获取对文件是否有指定的访问权限(读取/写入/执行权限)。accessmode 的值是 R_OK(读取),W_OK(写入)、X_OK(执行)或 F_OK(存在)。如果有指定的权限,则返回 1,否则返回 0
chmod(paht, mode) 修改 path 指定文件的访问权限
remove(path) 删除 path 指定的文件路径
rename(src, dst) 将文件或目录 src 重命名为 dst
stat(path) 返回 path 指定文件的信息
startfile(path[, operation]) 使用关联的应用程序打开 path 指定的文件

I/O 基类

https://docs.python.org/zh-cn/3/library/io.html?highlight=fileno#i-o-base-classes

fileno()
返回流的底层文件描述符(整数)---如果存在。如果 IO 对象不使用文件描述符,则会引发 OSError 。

1
2
3
4
5
6
7
8
9
import os

info = os.stat("./test.log")
print(info)

with open("./test.log") as f:
    fino = f.fileno()
    print(fino)
    print(os.stat(fino))

输出结果:

1
2
3
os.stat_result(st_mode=33188, st_ino=67135708, st_dev=16777220, st_nlink=1, st_uid=501, st_gid=20, st_size=0, st_atime=1611815312, st_mtime=1611815312, st_ctime=1611815317)
3
os.stat_result(st_mode=33188, st_ino=67135708, st_dev=16777220, st_nlink=1, st_uid=501, st_gid=20, st_size=0, st_atime=1611815312, st_mtime=1611815312, st_ctime=1611815317)

seek(offset, whence=SEEK_SET):
将流位置修改到给定的字节 offset。
offset 将相对于由 whence 指定的位置进行解析。 whence 的默认值为 SEEK_SET。 whence 的可用值有:
SEEK_SET 或 0 -- 流的开头(默认值);offset 应为零或正值
SEEK_CUR or 1 -- 当前流位置;offset 可以为负值
SEEK_END or 2 -- 流的末尾;offset 通常为负值

tell()
返回当前流的位置。

truncate(size=None):
将流的大小调整为给定的 size 个字节(如果未指定 size 则调整至当前位置)。
当前的流位置不变。 这个调整操作可扩展或减小当前文件大小。
在扩展的情况下,新文件区域的内容取决于具体平台(在大多数系统上,额外的字节会填充为零)。 返回新的文件大小。

获取文件或文件描述符的状态

https://docs.python.org/zh-cn/3/library/os.html#os.stat

os.stat(path, *, dir_fd=None, follow_symlinks=True):
获取文件或文件描述符的状态。
在所给路径上执行等效于stat()系统调用的操作。
path可以是字符串类型,或(直接传入或通过PathLike接口间接传入的)bytes 类型,或打开的文件描述符。
返回一个stat_result对象。

1
2
3
4
5
6
7
8
>>> import os
>>> statinfo = os.stat('somefile.txt')
>>> statinfo
os.stat_result(st_mode=33188, st_ino=7876932, st_dev=234881026,
st_nlink=1, st_uid=501, st_gid=501, st_size=264, st_atime=1297230295,
st_mtime=1297230027, st_ctime=1297230027)
>>> statinfo.st_size
264

class os.stat_result:
本对象的属性大致对应于 stat 结构体成员,主要作为 os.stat()、os.fstat() 和 os.lstat() 的返回值。

属性:
st_mode 文件模式:包括文件类型和文件模式位(即权限位)。
st_ino 与平台有关,但如果不为零,则根据 st_dev 值唯一地标识文件。通常: 在 Unix 上该值表示索引节点号 (inode number)。
在 Windows 上该值表示 文件索引号 。
st_dev 该文件所在设备的标识符。
st_nlink 硬链接的数量。
st_uid 文件所有者的用户 ID。
st_gid 文件所有者的用户组 ID。
st_size 文件大小(以字节为单位),文件可以是常规文件或符号链接。符号链接的大小是它包含的路径的长度,不包括末尾的空字节。

时间戳:
st_atime 最近的访问时间,以秒为单位。
st_mtime 最近的修改时间,以秒为单位。
st_ctime
取决于平台:
在 Unix 上表示最近的元数据更改时间,
在 Windows 上表示创建时间,以秒为单位。

os.fstat(fd)
获取文件描述符 fd 的状态. 返回一个 stat_result 对象。
从 Python 3.3 起,此功能等价于 os.stat(fd)。

os.lstat(path, *, dir_fd=None)
在给定路径上执行本函数,其操作相当于 lstat() 系统调用,类似于 stat() 但不跟踪符号链接。返回值是 stat_result 对象。
在不支持符号链接的平台上,本函数是 stat() 的别名。
从 Python 3.3 起,此功能等价于 os.stat(path, dir_fd=dir_fd, follow_symlinks=False)

os.statvfs(path):
在所给的路径上执行 statvfs() 系统调用。
返回值是一个对象,其属性描述了所给路径上的文件系统,并且与 statvfs 结构体的成员相对应,
即:f_bsize, f_frsize, f_blocks, f_bfree, f_bavail, f_files, f_ffree, f_favail, f_flag, f_namemax, f_fsid。

f_bsize: 文件系统块大小
f_frsize: 分栈大小
f_blocks: 文件系统数据块总数
f_bfree: 可用块数
f_bavail:非超级用户可获取的块数
f_files: 文件结点总数
f_ffree: 可用文件结点数
f_favail: 非超级用户的可用文件结点数
f_fsid: 文件系统标识 ID
f_flag: 挂载标记
f_namemax: 最大文件长度

1
2
3
import os
info = os.statvfs("./test.log")
print(info)

输出结果:

1
os.statvfs_result(f_bsize=1048576, f_frsize=4096, f_blocks=61202533, f_bfree=21324916, f_bavail=17069780, f_files=2448101320, f_ffree=2444860977, f_favail=2444860977, f_flag=0, f_namemax=255)

StringIO

class io.StringIO(initial_value='', newline='\n'):
一个使用内存文本缓冲的文本流。 它继承自 TextIOBase。
当 close() 方法被调用时将会丢弃文本缓冲区。
缓冲区的初始值可通过提供 initial_value 来设置。
如果启用了行结束符转写,换行将以 write() 所用的方式被编码。 数据流位置将被设为缓冲区的开头。

class io.TextIOBase:
文本流的基类。 该类提供了基于字符和行的流 I/O 接口。 它继承自 IOBase。 该类无公有构造器。

获取文件操作的时间

1
2
3
os.path.getatime(file) 输出文件访问时间
os.path.getctime(file) 输出文件的创建时间
os.path.getmtime(file) 输出文件最近修改时间

例: 获取test.cpp的最近修改时间

1
2
3
4
5
6
7
import os
from datetime import datetime

secs = os.path.getmtime("./test.cpp")
t = datetime.fromtimestamp(secs)
print(t)
# 2020-12-30 22:24:57.273339

csv 模块

https://docs.python.org/zh-cn/3.7/library/csv.html

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import csv

with open("test.csv","w") as csvfile:
    writer = csv.writer(csvfile)
    #先写入columns_name
    writer.writerow(["time", "code", "price"])


with open("test.csv","a") as csvfile:
    writer = csv.writer(csvfile)
    #写入多行用writerows
    writer.writerows([[0, 1, 3], [1, 2, 3], [2, 3, 4]])
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
>>> import csv
>>> with open('names.csv', newline='') as csvfile:
...     reader = csv.DictReader(csvfile)
...     for row in reader:
...         print(row['first_name'], row['last_name'])
...
Eric Idle
John Cleese

>>> print(row)
OrderedDict([('first_name', 'John'), ('last_name', 'Cleese')])

aiofiles

readlines:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import asyncio
import aiofiles

def test():
    fp = open("test.log", "r", 1)
    for line in fp.readlines()[-10:]:
        yield line.rstrip()
    fp.close()

for s in test():
    print(s)


async def async_test():
    fp = await aiofiles.open("test.log", "r", 1)
    lines = await fp.readlines()
    for line in lines[-10:]:
        yield line.rstrip()
    await fp.close()

async def run():
    async for x in async_test():
        print(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

seek:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import aiofiles
import os

def test():
    fp = open("test.log", "r", 1)
    fp.seek(0, os.SEEK_END)
    print(fp.read())
    fp.close()

test()

async def async_test():
    fp = await aiofiles.open("test.log", "r", 1)
    await fp.seek(0, os.SEEK_END)
    print(await fp.read())
    await fp.close()

async def run():
    await async_test()


loop = asyncio.get_event_loop()
loop.run_until_complete(run())

read:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import asyncio
import aiofiles
import os
from functools import partial

def test():
    fp = open("test.log", "r", 1)
    blocksize = os.statvfs("test.log").f_bsize
    print(blocksize)
    for chunk in iter(partial(fp.read, blocksize), ''):
        print(chunk)
    fp.close()

test()

async def async_test():
    fp = await aiofiles.open("test.log", "r", 1)
    blocksize = os.statvfs("test.log").f_bsize
    print(blocksize)
    while True:
        content = await fp.read(blocksize)
        if content == "":
            break
        print(content)
    await fp.close()

async def run():
    await async_test()

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

readline:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import asyncio
import aiofiles

async def async_test():
    fp = await aiofiles.open("test.log", "r", 1)
    while True:
        content = await fp.readline()
        if content == "":
            break
        print(content.rstrip())
    await fp.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(async_test())