2022-08-27
Python from Beginner to Mastery (5): File Handling - 01 - File I/O (File Operations in Python)
1. Reading and Writing Files
Three things deserve attention when reading and writing files: 1. the with context manager; 2. newline handling; 3. encoding (the system default encoding can be obtained with sys.getdefaultencoding()). If you want to bypass the text-encoding layer, you can write to the underlying buffer attribute directly, for example sys.stdout.buffer.write().
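As a quick illustration of these three points, here is a minimal sketch (the file name demo.txt and the sample strings are made up) that uses a with block, passes encoding and newline explicitly, and writes raw bytes through sys.stdout.buffer:

import sys

print(sys.getdefaultencoding())   # the system default encoding

# newline='' disables newline translation, so '\r\n' stays exactly as written
with open('demo.txt', 'wt', encoding='utf-8', newline='') as f:
    f.write('first line\r\nsecond line\n')

with open('demo.txt', 'rt', encoding='utf-8', newline='') as f:
    print(repr(f.read()))

# Bypass the text-encoding layer and write bytes straight to the underlying buffer
sys.stdout.buffer.write('hello bytes\n'.encode('utf-8'))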
1.1 Reading and Writing Text Files
The mode string passed to open() combines a data format with an access mode. Data format: text is 't', binary is 'b' (compressed files opened through gzip.open()/bz2.open() also take text-style modes such as 'rt'/'wt', see 1.3). Access mode: 'r' for reading, 'w' for writing.
file_name = 'test.txt'  # by default the file is looked up in the current directory

# Read the text file
with open(file_name, 'rt') as f:
    f.read()

# The file may already exist, so check for it first
import os

if not os.path.exists(file_name):
    with open(file_name, 'wt') as f:
        f.write('Hello,I am a test.\n')
else:
    print(f'File {file_name} already exists!')
1.2 Reading and Writing Binary Files
# Byte strings: indexing yields integers, not one-character strings
b = b'Hello World'
print(f'binary object b[0] = {b[0]}')

# Reading and writing binary files means encoding/decoding text yourself
with open('test.bin', 'wb') as f:
    text = 'Hello World'
    f.write(text.encode('utf-8'))

with open('test.bin', 'rb') as f:
    data = f.read(16)
    text = data.decode('utf-8')

import array

a_obj = array.array('i', [0, 0, 0, 0, 0, 0, 0, 0])
with open('test.bin', 'rb') as f:
    # readinto() fills an existing buffer in place; the result is platform
    # dependent (element size, byte order), so use it with care
    f.readinto(a_obj)
1.3 Reading and Writing Compressed Files
# gzip compression
import gzip

gz_file, bz_file = "giztext.gz", "bz.gz"
text = 'Hello compressed world\n'

with gzip.open(gz_file, 'wt') as f:
    f.write(text)

# bz2 compression
import bz2

with bz2.open(bz_file, 'wt') as f:
    f.write(text)

# Reading the compressed files back in text mode
with gzip.open(gz_file, 'rt') as f:
    text = f.read()

with bz2.open(bz_file, 'rt') as f:
    text = f.read()

# Choosing the compression level
with gzip.open(gz_file, 'wt', compresslevel=3) as f:
    f.write(text)
1.4 File Encodings
import urllib.request
import io

# Put a text-encoding layer on top of a binary file-like object
url_res = urllib.request.urlopen('http://www.python.org')  # example URL
f_test = io.TextIOWrapper(url_res, encoding='utf-8')
text_val = f_test.read()

# Change the encoding of an already open text-mode file: detach() strips the
# existing text-encoding layer first
import sys

print(f'sys stdout encoding is: {sys.stdout.encoding}')      # utf-8
sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='latin-1')
print(f'sys stdout new encoding is: {sys.stdout.encoding}')  # latin-1

# The layers of the I/O system: a text file is a TextIOWrapper over a
# BufferedWriter over a raw FileIO object
file_read = open('sample.txt', 'w')
print(f'file read: {file_read}')                   # <_io.TextIOWrapper name='sample.txt' mode='w' encoding='UTF-8'>
print(f'file buffer: {file_read.buffer}')          # <_io.BufferedWriter name='sample.txt'>
print(f'file buffer raw: {file_read.buffer.raw}')  # <_io.FileIO name='sample.txt' mode='wb' closefd=True>
1.5 Reading Fixed-Size Records
from functools import partial

RECORD_SIZE = 32

with open('somefile.data', 'rb') as f:
    # Two-argument iter(): keep calling partial(f.read, RECORD_SIZE) until it
    # returns the sentinel b'' (end of file)
    records = iter(partial(f.read, RECORD_SIZE), b'')
    for r in records:
        pass
1.6 Creating Temporary Files
from tempfile import TemporaryFile

# TemporaryFile: an anonymous temporary file with no visible name on the file system
# NamedTemporaryFile: a temporary file that does have a name, available as f.name
with TemporaryFile('w+t') as f:
    # Read/write to the file
    f.write('Hello World\n')
    f.write('Testing\n')

    # Seek back to beginning and read the data
    f.seek(0)
    data = f.read()

f = TemporaryFile('w+t')
# Use the temporary file
f.close()

# ---------------------------------------------------
from tempfile import NamedTemporaryFile

with NamedTemporaryFile('w+t') as f:
    print('filename is:', f.name)

with NamedTemporaryFile('w+t', delete=False) as f:  # delete=False: keep the file after closing
    print('filename is:', f.name)

# ---------------------------------------------------
from tempfile import TemporaryDirectory

with TemporaryDirectory() as dirname:
    print('dirname is:', dirname)  # /var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T/tmp_3lwonjhi
    # Use the directory

import tempfile

print(tempfile.mkstemp())     # (4, '/var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T/tmpi_hjdkd0')
print(tempfile.gettempdir())  # /var/folders/h1/jwyy02nd1hg5p0_pgxg_9w3c0000gn/T

f = NamedTemporaryFile(prefix='mytemp', suffix='.txt', dir='/tmp')
print(f.name)                 # /tmp/mytempng2rx_bg.txt
1.7 Wrapping File Descriptors
A file descriptor is an integer that refers to an I/O channel opened by the operating system. It can be wrapped into a higher-level file object with open(); on sockets, makefile() achieves a similar effect, with somewhat lower performance but better portability. On Unix systems the same trick can be used to wrap pipes (see the sketch after the code below).
import os

file_data = os.open('test.txt', os.O_WRONLY | os.O_CREAT)
# Turn the raw descriptor into a proper file object
test_file = open(file_data, 'wt')
test_file.write('hello world\n')
test_file.close()

from socket import socket, AF_INET, SOCK_STREAM

def echo_client(client_sock, addr):
    print(f'Got connection from {addr}')
    # Make text-mode file wrappers for socket reading/writing
    client_in = open(client_sock.fileno(), 'rt', encoding='latin-1', closefd=False)
    client_out = open(client_sock.fileno(), 'wt', encoding='latin-1', closefd=False)

    # Echo lines back to the client using file I/O
    for line in client_in:
        client_out.write(line)
        client_out.flush()
    client_sock.close()

def echo_server(address):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.bind(address)
    sock.listen(1)
    while True:
        client, addr = sock.accept()
        echo_client(client, addr)

import sys

bstd_out = open(sys.stdout.fileno(), 'wb', closefd=False)
bstd_out.write(b'Hello World\n')
bstd_out.flush()
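The pipe case mentioned above is not shown in the snippet, so here is a minimal, Unix-oriented sketch (the variable names are made up) of wrapping the two descriptors returned by os.pipe() into ordinary file objects:

import os

# os.pipe() returns a (read, write) pair of raw file descriptors
read_fd, write_fd = os.pipe()

# Wrap them with open(); closefd defaults to True, so closing the file
# objects also closes the underlying descriptors
reader = open(read_fd, 'rt')
writer = open(write_fd, 'wt')

writer.write('hello through a pipe\n')
writer.close()        # close the write end so the reader sees EOF

print(reader.read())  # hello through a pipe
reader.close()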
2. File Operations
2.1 Paths
import os

csv_path = '/usr/test/Data/test.csv'
print(f'{csv_path} base name is: {os.path.basename(csv_path)}')  # test.csv
print(f'{csv_path} dir name is: {os.path.dirname(csv_path)}')    # /usr/test/Data
print(f"new path: {os.path.join('tmp', 'data', os.path.basename(csv_path))}")  # tmp/data/test.csv

csv_path = '~/Data/test.csv'
print(f'path expand user is: {os.path.expanduser(csv_path)}')    # /Users/liudong/Data/test.csv
print(f'{csv_path} splitext is: {os.path.splitext(csv_path)}')   # ('~/Data/test', '.csv')
2.2 Testing Files
# Watch out for permission problems here
import os

file_path = '/etc/passwd'
test_path = '/etc/test'

print(f"is {file_path} exists: {os.path.exists(file_path)}")
print(f"is {test_path} a directory: {os.path.isdir(test_path)}")
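Since the comment above warns about permissions, here is a small sketch (reusing the same example path) of checks that are often combined with os.path.exists(); os.access() and the try/except around the metadata calls are the permission-related parts:

import os

file_path = '/etc/passwd'

print(f'is file: {os.path.isfile(file_path)}')
print(f'readable: {os.access(file_path, os.R_OK)}')
print(f'writable: {os.access(file_path, os.W_OK)}')

# Metadata queries raise PermissionError/FileNotFoundError when access is denied
try:
    print(f'size: {os.path.getsize(file_path)}')
    print(f'mtime: {os.path.getmtime(file_path)}')
except (PermissionError, FileNotFoundError) as e:
    print(f'cannot stat {file_path}: {e}')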
2.3 Directory Listings
import os
import os.path
import glob

file_path = '/etc'

# List every entry in the directory
name_list = os.listdir(file_path)
print(f'file list of etc is:\n{name_list}')

# Filtering the listing
dir_name_list = [name for name in os.listdir(file_path)
                 if os.path.isdir(os.path.join(file_path, name))]
py_file_list = [name for name in os.listdir(file_path) if name.endswith('.py')]

py_file_list = glob.glob('*.py')

# Get file sizes and modification dates
name_sz_date = [(name, os.path.getsize(name), os.path.getmtime(name))
                for name in py_file_list]
for name, size, mtime in name_sz_date:
    print(f'name={name}, size={size}, mtime={mtime}')

# Alternative: get the full file metadata via os.stat()
file_metadata = [(name, os.stat(name)) for name in py_file_list]
for name, meta in file_metadata:
    print(f'name={name}, size={meta.st_size}, mtime={meta.st_mtime}')
3. In-Memory File Operations
3.1 Memory-Mapping Files
import os
import mmap

def memory_map(file_name, access=mmap.ACCESS_WRITE):
    size_val = os.path.getsize(file_name)
    fd = os.open(file_name, os.O_RDWR)
    return mmap.mmap(fd, size_val, access=access)

# Create a file of a given size, filled with zero bytes
size = 1000000
with open('test_data', 'wb') as f:
    f.seek(size - 1)
    f.write(b'\x00')

m = memory_map('test_data')
print(f'the len of m is: {len(m)}')  # 1000000
print(f'm split: {m[0:10]}')         # b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
print(f'm[0] is: {m[0]}')            # 0

m[0:11] = b'Hello World'
print(f'close result: {m.close()}')  # None

with open('test_data', 'rb') as f:
    print(f'read content: {f.read(11)}')  # b'Hello World'

# A memoryview cast lets you treat the mapping as, e.g., unsigned integers
m = memory_map('test_data')
v = memoryview(m).cast('I')
v[0] = 7
print(f'point content from m is: {m[0:4]}')  # b'\x07\x00\x00\x00'
m[0:4] = b'\x07\x01\x00\x00'
print(f'v[0] = {v[0]}')                      # 263
3.2 Reading Binary Files into a Buffer
import os.path

def read_into_buffer(file_name):
    buf = bytearray(os.path.getsize(file_name))
    with open(file_name, 'rb') as f:
        # readinto() fills an existing buffer instead of allocating a new one
        # the way read() does, which avoids a lot of extra memory traffic.
        # The caveat is that you should check whether the number of bytes
        # actually read matches the size of the buffer.
        f.readinto(buf)
    return buf

with open('test_file.bin', 'wb') as f:
    f.write(b'Hello World')

buf_read = read_into_buffer('test_file.bin')
print(f'buf read is: {buf_read}')
buf_read[0:5] = b'Hello'
print(f'buf read is: {buf_read}')

with open('new_test_file.bin', 'wb') as f:
    f.write(buf_read)

# Size of each record (adjust value)
record_size = 32

# Compare the number of bytes read against the record size
buf_read = bytearray(record_size)
with open('test_file.bin', 'rb') as f:
    while True:
        n = f.readinto(buf_read)
        if n < record_size:
            break
print(f'buf read is: {buf_read}')

# memoryview gives a zero-copy slice into the same underlying buffer
memory_val = memoryview(buf_read)
memory_val = memory_val[-5:]
print(f'memory value is: {memory_val}')
memory_val[:] = b'WORLD'
print(f'buf read is: {buf_read}')
3.3 Serialization
# Talking to a serial port via the third-party pyserial package
import serial

ser = serial.Serial('/dev/tty.usbmodem641',  # Device name varies
                    baudrate=9600,
                    bytesize=8,
                    parity='N',
                    stopbits=1)
ser.write(b'G1 X50 Y50\r\n')
resp = ser.readline()
import pickle

data_obj = ...  # Some Python object

# dump/dumps write an object out; load/loads are the inverse operations
test_file = open('test_file', 'wb')
pickle.dump(data_obj, test_file)
p_con = pickle.dumps(data_obj)

# Restore from a file
test_file = open('test_file', 'rb')
data_obj = pickle.load(test_file)

# Restore from a string
data_obj = pickle.loads(p_con)

# Several objects can be dumped into, and loaded back from, the same file
test_file = open('some_data', 'wb')
pickle.dump([1, 6, 3, 9], test_file)
pickle.dump('hello,world!', test_file)
pickle.dump({'python', 'java', 'go'}, test_file)
test_file.close()

test_file = open('some_data', 'rb')
print(f'file load is {pickle.load(test_file)}')  # file load is [1, 6, 3, 9]
print(f'file load is {pickle.load(test_file)}')  # file load is hello,world!
print(f'file load is {pickle.load(test_file)}')  # file load is {'go', 'java', 'python'}

# Functions are pickled by reference (module + name), not by value
import math
print(f'pickle function: {pickle.dumps(math.cos)}')
# pickle function: b'\x80\x04\x95\x10\x00\x00\x00\x00\x00\x00\x00\x8c\x04math\x94\x8c\x03cos\x94\x93\x94.'

import time
import threading

# Objects that depend on system state (threads, sockets, open files, ...) cannot be
# pickled directly; __getstate__ and __setstate__ can define what gets serialized
# and how the object is rebuilt.
class Countdown:
    def __init__(self, n):
        self.n = n
        self.thr = threading.Thread(target=self.run)
        self.thr.daemon = True
        self.thr.start()

    def run(self):
        while self.n > 0:
            print(f'T-minus is: {self.n}')  # T-minus is: 30, T-minus is: 29, ...
            self.n -= 1
            time.sleep(5)

    def __getstate__(self):
        return self.n

    def __setstate__(self, n):
        self.__init__(n)

count_down = Countdown(30)
test_file = open('test.p', 'wb')
pickle.dump(count_down, test_file)
test_file.close()

test_file = open('test.p', 'rb')
print(f'load result: {pickle.load(test_file)}')  # load result: <__main__.Countdown object at 0x1037da850>
4. Encoding and Decoding
4.1 Base64
s_obj = b'hello'

import base64

code_obj = base64.b64encode(s_obj)
print(f'b64 encode {s_obj} = {code_obj}')                   # b'aGVsbG8='
print(f'decode {code_obj} = {base64.b64decode(code_obj)}')  # b'hello'

# b64encode returns bytes; decode to ASCII if you need a plain str
code_obj = base64.b64encode(s_obj).decode('ascii')
print(f'encode decode {s_obj} = {code_obj}')                # aGVsbG8=
4.2 Hexadecimal
s = b'hello'

import binascii

h = binascii.b2a_hex(s)
print(f'base: {h}')                          # b'68656c6c6f'
print(f'b2a hex: {binascii.a2b_hex(h)}')     # b'hello'

import base64

h = base64.b16encode(s)
print(f'base: {h}')                          # b'68656C6C6F'
print(f'b16 decode: {base64.b16decode(h)}')  # b'hello'

h = base64.b16encode(s)
print(f'base: {h}')                          # b'68656C6C6F'
print(f"decode: {h.decode('ascii')}")        # 68656C6C6F
5. Advanced Operations
5.1 Copying and Moving Files and Directories
import shutil

# The biggest shortcoming of shutil is that it does not preserve all file metadata

# Copy src to dst. (cp src dst)
shutil.copy(src, dst)

# Copy files, but preserve metadata (cp -p src dst)
shutil.copy2(src, dst)

# Copy directory tree (cp -R src dst)
shutil.copytree(src, dst)

# Move src to dst (mv src dst)
shutil.move(src, dst)

# Copy symlinks as symlinks instead of following them
shutil.copytree(src, dst, symlinks=True)

# Ignoring files during a tree copy
def ignore_pyc_files(dirname, filenames):
    return [name for name in filenames if name.endswith('.pyc')]

shutil.copytree(src, dst, ignore=ignore_pyc_files)
shutil.copytree(src, dst, ignore=shutil.ignore_patterns('*~', '*.pyc'))

try:
    shutil.copytree(src, dst)
except shutil.Error as e:
    for src, dst, msg in e.args[0]:
        # src is source name
        # dst is destination name
        # msg is error message from exception
        print(dst, src, msg)
import os.path

# os.path helpers are handy for building the source and destination paths used with shutil
file_name = '/advanced_programming/chapter13/spam.py'
print(f'base name is: {os.path.basename(file_name)}')
print(f'dir name is: {os.path.dirname(file_name)}')
print(f'file split: {os.path.split(file_name)}')
print(os.path.join('/new/dir', os.path.basename(file_name)))
print(os.path.expanduser('~/chapter13/spam.py'))
5.2 Archives and Compression
If you need more control over the details, use the tarfile, zipfile, gzip, and bz2 modules directly; shutil is only a thin layer on top of them (see the sketch after the snippet below).
import shutil

shutil.unpack_archive('py38.zip')
shutil.make_archive('py38', 'zip', 'test_zip')
print(shutil.get_archive_formats())  # the archive formats that are supported
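For the finer-grained control mentioned above, here is a sketch that drops down to zipfile and tarfile directly; py38.zip and test_zip reuse the names from the snippet above, while backup.tar.gz is a made-up archive name:

import zipfile
import tarfile

# Inspect a zip archive and extract a single member instead of everything
with zipfile.ZipFile('py38.zip', 'r') as zf:
    print(zf.namelist())
    zf.extract(zf.namelist()[0])

# Build a gzip-compressed tar archive from a directory tree
with tarfile.open('backup.tar.gz', 'w:gz') as tf:
    tf.add('test_zip')

# Walk the members of the tar archive
with tarfile.open('backup.tar.gz', 'r:gz') as tf:
    for member in tf.getmembers():
        print(member.name, member.size)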
5.3 Finding Files
import os

def find_file(start, name):
    # os.walk() descends the tree, yielding (current dir, sub-dirs, files) for each directory
    for rel_path, dirs, files in os.walk(start):
        if name in files:
            full_path = os.path.join(start, rel_path, name)
            # abspath/normpath turn the result into a clean absolute path
            print(f'full path is: {os.path.normpath(os.path.abspath(full_path))}')

if __name__ == '__main__':
    find_file('/advanced_programming/chapter13', 'file_input.py')
import os
import time

# Find files modified within the last `seconds` seconds
def modified_within(top, seconds):
    now = time.time()
    for path, dirs, files in os.walk(top):
        for name in files:
            full_path = os.path.join(path, name)
            if not os.path.exists(full_path):
                continue
            m_time = os.path.getmtime(full_path)
            if m_time > (now - seconds):
                print(f'full path is: {full_path}')

if __name__ == '__main__':
    modified_within('/advanced_programming/chapter13', float(1000))