Tag Archives: file-io

Delete all files in a directory using Python

Question: Delete all files in a directory using Python

I want to delete all files with the extension .bak in a directory. How can I do that in Python?


Answer 0

Via os.listdir and os.remove:

import os

filelist = [ f for f in os.listdir(mydir) if f.endswith(".bak") ]
for f in filelist:
    os.remove(os.path.join(mydir, f))

Using only a single loop:

for f in os.listdir(mydir):
    if not f.endswith(".bak"):
        continue
    os.remove(os.path.join(mydir, f))

Or via glob.glob:

import glob, os, os.path

filelist = glob.glob(os.path.join(mydir, "*.bak"))
for f in filelist:
    os.remove(f)

Make sure you are in the correct directory first, using os.chdir if necessary.


Answer 1

Use os.chdir to change directory. Use glob.glob to generate a list of file names which end in ‘.bak’. The elements of the list are just strings.

Then you could use os.unlink to remove the files. (PS. os.unlink and os.remove are synonyms for the same function.)

#!/usr/bin/env python
import glob
import os
directory='/path/to/dir'
os.chdir(directory)
files=glob.glob('*.bak')
for filename in files:
    os.unlink(filename)

Answer 2

In Python 3.5, os.scandir is better if you need to check for file attributes or type – see os.DirEntry for properties of the object that’s returned by the function.

import os 

for file in os.scandir(path):
    if file.name.endswith(".bak"):
        os.unlink(file.path)

This also doesn’t require changing directories since each DirEntry already includes the full path to the file.
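On Python 3.6 and later (an assumption beyond what the answer states), scandir can also be used as a context manager so the directory handle is released promptly. A minimal sketch of that variation:

import os

with os.scandir(path) as entries:
    for entry in entries:
        # entry.is_file() avoids unlinking a directory that happens to end in .bak
        if entry.name.endswith(".bak") and entry.is_file():
            os.unlink(entry.path)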


Answer 3

You can create a function. Set maxdepth as you like for traversing subdirectories.

import os

def findNremove(path,pattern,maxdepth=1):
    cpath=path.count(os.sep)
    for r,d,f in os.walk(path):
        if r.count(os.sep) - cpath <maxdepth:
            for files in f:
                if files.endswith(pattern):
                    try:
                        print "Removing %s" % (os.path.join(r,files))
                        #os.remove(os.path.join(r,files))
                    except Exception,e:
                        print e
                    else:
                        print "%s removed" % (os.path.join(r,files))

path=os.path.join("/home","dir1","dir2")
findNremove(path,".bak")

Answer 4

First glob them, then unlink them.
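A minimal sketch of that approach, essentially the same as the glob variant in Answer 0 (mydir is the same hypothetical directory variable used there):

import glob
import os

# Expand the pattern first, then unlink each match.
for path in glob.glob(os.path.join(mydir, "*.bak")):
    os.unlink(path)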


Answer 5

On Linux and macOS you can run simple command to the shell:

subprocess.run('rm /tmp/*.bak', shell=True)
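If you would rather avoid shell=True, a hedged variant expands the pattern in Python and hands the resulting paths to rm (a sketch; rm errors out when given no arguments, hence the guard):

import glob
import subprocess

matches = glob.glob('/tmp/*.bak')
if matches:
    subprocess.run(['rm', '--'] + matches, check=True)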

Answer 6

I realize this is old; however, here would be how to do so using just the os module…

import os

def purgedir(parent):
    for root, dirs, files in os.walk(parent):                                      
        for item in files:
            # Delete subordinate files                                                 
            filespec = os.path.join(root, item)
            if filespec.endswith('.bak'):
                os.unlink(filespec)
        for item in dirs:
            # Recursively perform this operation for subordinate directories   
            purgedir(os.path.join(root, item))

Python recursive folder read

Question: Python recursive folder read

I have a C++/Obj-C background and I am just discovering Python (been writing it for about an hour). I am writing a script to recursively read the contents of text files in a folder structure.

The problem I have is the code I have written will only work for one folder deep. I can see why in the code (see #hardcoded path), I just don’t know how I can move forward with Python since my experience with it is only brand new.

Python Code:

import os
import sys

rootdir = sys.argv[1]

for root, subFolders, files in os.walk(rootdir):

    for folder in subFolders:
        outfileName = rootdir + "/" + folder + "/py-outfile.txt" # hardcoded path
        folderOut = open( outfileName, 'w' )
        print "outfileName is " + outfileName

        for file in files:
            filePath = rootdir + '/' + file
            f = open( filePath, 'r' )
            toWrite = f.read()
            print "Writing '" + toWrite + "' to" + filePath
            folderOut.write( toWrite )
            f.close()

        folderOut.close()

Answer 0

Make sure you understand the three return values of os.walk:

for root, subdirs, files in os.walk(rootdir):

has the following meaning:

  • root: Current path which is “walked through”
  • subdirs: Files in root of type directory
  • files: Files in root (not in subdirs) of type other than directory

And please use os.path.join instead of concatenating with a slash! Your problem is filePath = rootdir + '/' + file – you must concatenate the currently “walked” folder instead of the topmost folder. So that must be filePath = os.path.join(root, file). BTW “file” is a builtin, so you don’t normally use it as variable name.

Another problem are your loops, which should be like this, for example:

import os
import sys

walk_dir = sys.argv[1]

print('walk_dir = ' + walk_dir)

# If your current working directory may change during script execution, it's recommended to
# immediately convert program arguments to an absolute path. Then the variable root below will
# be an absolute path as well. Example:
# walk_dir = os.path.abspath(walk_dir)
print('walk_dir (absolute) = ' + os.path.abspath(walk_dir))

for root, subdirs, files in os.walk(walk_dir):
    print('--\nroot = ' + root)
    list_file_path = os.path.join(root, 'my-directory-list.txt')
    print('list_file_path = ' + list_file_path)

    with open(list_file_path, 'wb') as list_file:
        for subdir in subdirs:
            print('\t- subdirectory ' + subdir)

        for filename in files:
            file_path = os.path.join(root, filename)

            print('\t- file %s (full path: %s)' % (filename, file_path))

            with open(file_path, 'rb') as f:
                f_content = f.read()
                list_file.write(('The file %s contains:\n' % filename).encode('utf-8'))
                list_file.write(f_content)
                list_file.write(b'\n')

If you didn’t know, the with statement for files is a shorthand:

with open('filename', 'rb') as f:
    dosomething()

# is effectively the same as

f = open('filename', 'rb')
try:
    dosomething()
finally:
    f.close()

Answer 1

If you are using Python 3.5 or above, you can get this done in 1 line.

import glob

# root_dir needs a trailing slash (i.e. /root/dir/)
for filename in glob.iglob(root_dir + '**/*.txt', recursive=True):
     print(filename)

As mentioned in the documentation

If recursive is true, the pattern ‘**’ will match any files and zero or more directories and subdirectories.

If you want every file, you can use

import glob

for filename in glob.iglob(root_dir + '**/**', recursive=True):
     print(filename)

Answer 2

I agree with Dave Webb: os.walk will yield an item for each directory in the tree. The point is, you just don’t have to care about subFolders.

Code like this should work:

import os
import sys

rootdir = sys.argv[1]

for folder, subs, files in os.walk(rootdir):
    with open(os.path.join(folder, 'python-outfile.txt'), 'w') as dest:
        for filename in files:
            with open(os.path.join(folder, filename), 'r') as src:
                dest.write(src.read())

Answer 3

TL;DR: This is the equivalent of find -type f to go over all files in all folders below and including the current one:

for currentpath, folders, files in os.walk('.'):
    for file in files:
        print(os.path.join(currentpath, file))

As already mentioned in other answers, os.walk() is the answer, but it could be explained better. It’s quite simple! Let’s walk through this tree:

docs/
└── doc1.odt
pics/
todo.txt

With this code:

for currentpath, folders, files in os.walk('.'):
    print(currentpath)

The currentpath is the current folder it is looking at. This will output:

.
./docs
./pics

So it loops three times, because there are three folders: the current one, docs, and pics. In every loop, it fills the variables folders and files with all folders and files. Let’s show them:

for currentpath, folders, files in os.walk('.'):
    print(currentpath, folders, files)

This shows us:

# currentpath  folders           files
.              ['pics', 'docs']  ['todo.txt']
./pics         []                []
./docs         []                ['doc1.odt']

So in the first line, we see that we are in folder ., that it contains two folders namely pics and docs, and that there is one file, namely todo.txt. You don’t have to do anything to recurse into those folders, because as you see, it recurses automatically and just gives you the files in any subfolders. And any subfolders of that (though we don’t have those in the example).

If you just want to loop through all files, the equivalent of find -type f, you can do this:

for currentpath, folders, files in os.walk('.'):
    for file in files:
        print(os.path.join(currentpath, file))

This outputs:

./todo.txt
./docs/doc1.odt

Answer 4

The pathlib library is really great for working with files. You can do a recursive glob on a Path object like so.

from pathlib import Path

for elem in Path('/path/to/my/files').rglob('*.*'):
    print(elem)
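Note that the pattern *.* only matches names that contain a dot. A hedged variant that collects every regular file regardless of extension (sketch):

from pathlib import Path

# rglob('*') also yields directories, so filter with is_file().
files = [p for p in Path('/path/to/my/files').rglob('*') if p.is_file()]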

Answer 5

If you want a flat list of all paths under a given dir (like find . in the shell):

   files = [ 
       os.path.join(parent, name)
       for (parent, subdirs, files) in os.walk(YOUR_DIRECTORY)
       for name in files + subdirs
   ]

To only include full paths to files under the base dir, leave out + subdirs.
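For example, the files-only variant would look like this (a sketch, keeping the same hypothetical YOUR_DIRECTORY placeholder):

   files_only = [
       os.path.join(parent, name)
       for (parent, subdirs, files) in os.walk(YOUR_DIRECTORY)
       for name in files
   ]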


Answer 6

import glob
import os

root_dir = <root_dir_here>

for filename in glob.iglob(root_dir + '**/**', recursive=True):
    if os.path.isfile(filename):
        with open(filename,'r') as file:
            print(file.read())

**/** is used to get all files recursively, including directories.

if os.path.isfile(filename) is used to check whether the filename variable refers to a file or a directory; if it is a file, we can read it. Here I am printing the file’s contents.


Answer 7

I’ve found the following to be the easiest

from glob import glob
import os

files = [f for f in glob('rootdir/**', recursive=True) if os.path.isfile(f)]

Using glob('some/path/**', recursive=True) gets all files, but also includes directory names. Adding the if os.path.isfile(f) condition filters the list down to existing files only.


Answer 8

Use os.path.join() to construct your paths – it’s neater:

import os
import sys
rootdir = sys.argv[1]
for root, subFolders, files in os.walk(rootdir):
    for folder in subFolders:
        outfileName = os.path.join(root,folder,"py-outfile.txt")
        folderOut = open( outfileName, 'w' )
        print "outfileName is " + outfileName
        for file in files:
            filePath = os.path.join(root,file)
            toWrite = open( filePath).read()
            print "Writing '" + toWrite + "' to" + filePath
            folderOut.write( toWrite )
        folderOut.close()

Answer 9

os.walk does a recursive walk by default. For each directory, starting from the root, it yields a 3-tuple (dirpath, dirnames, filenames).

from os import walk
from os.path import splitext, join

def select_files(root, files):
    """
    simple logic here to filter out interesting files
    .py files in this example
    """

    selected_files = []

    for file in files:
        #do concatenation here to get full path 
        full_path = join(root, file)
        ext = splitext(file)[1]

        if ext == ".py":
            selected_files.append(full_path)

    return selected_files

def build_recursive_dir_tree(path):
    """
    path    -    where to begin folder scan
    """
    selected_files = []

    for root, dirs, files in walk(path):
        selected_files += select_files(root, files)

    return selected_files

Answer 10

Try this:

import os
import sys

for root, subdirs, files in os.walk(path):

    for file in os.listdir(root):

        filePath = os.path.join(root, file)

        if os.path.isdir(filePath):
            pass

        else:
            f = open (filePath, 'r')
            # Do Stuff

Answer 11

I think the problem is that you’re not processing the output of os.walk correctly.

Firstly, change:

filePath = rootdir + '/' + file

to:

filePath = root + '/' + file

rootdir is your fixed starting directory; root is a directory returned by os.walk.

Secondly, you don’t need to indent your file processing loop, as it makes no sense to run this for each subdirectory. You’ll get root set to each subdirectory. You don’t need to process the subdirectories by hand unless you want to do something with the directories themselves.


How to open a file for both reading and writing?

Question: How to open a file for both reading and writing?

Is there a way to open a file for both reading and writing?

As a workaround, I open the file for writing, close it, then open it again for reading. But is there a way to open a file for both reading and writing?


Answer 0

Here’s how you read a file, and then write to it (overwriting any existing data), without closing and reopening:

with open(filename, "r+") as f:
    data = f.read()
    f.seek(0)
    f.write(output)
    f.truncate()

Answer 1

r+ is the canonical mode for reading and writing at the same time. This is no different from using C’s fopen() call, since file() / open() is just a tiny wrapper around it.
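A minimal sketch of r+ in use (hypothetical file name; note that r+ does not create the file, so it must already exist):

with open('settings.txt', 'r+') as f:
    current = f.read()       # read the existing contents
    f.seek(0)                # rewind before writing
    f.write(current.upper())
    f.truncate()             # drop leftover bytes if the new text is shorter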


Answer 2

Summarize the I/O behaviors

|           Mode            |  r   |  r+  |  w   |  w+  |  a   |  a+  |
| :-----------------------: | :--: | :--: | :--: | :--: | :--: | :--: |
|           Read            |  +   |  +   |      |  +   |      |  +   |
|           Write           |      |  +   |  +   |  +   |  +   |  +   |
|          Create           |      |      |  +   |  +   |  +   |  +   |
|   Truncate (overwrite)    |      |      |  +   |  +   |      |      |
| Position at the beginning |  +   |  +   |  +   |  +   |      |      |
|    Position at the end    |      |      |      |      |  +   |  +   |

and the decision branch (illustrated with an image in the original answer).
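A hedged sketch that exercises a few rows of the table (hypothetical file name): w+ creates or truncates and allows reading after a rewind, while a+ appends and starts positioned at the end.

with open('demo.txt', 'w+') as f:   # create/truncate, read and write allowed
    f.write('first\n')
    f.seek(0)
    print(f.read())                 # -> first

with open('demo.txt', 'a+') as f:   # append; position starts at the end
    f.write('second\n')
    f.seek(0)                       # rewind before reading
    print(f.read())                 # -> first, then second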


Answer 3

I have tried something like this and it works as expected:

f = open("c:\\log.log", 'r+b')
f.write("\x5F\x9D\x3E")
f.read(100)
f.close()

Where:

f.read(size) – To read a file’s contents, call f.read(size), which reads some quantity of data and returns it as a string.

And:

f.write(string) writes the contents of string to the file, returning None.

Also if you open Python tutorial about reading and writing files you will find that:

‘r+’ opens the file for both reading and writing.

On Windows, ‘b’ appended to the mode opens the file in binary mode, so there are also modes like ‘rb’, ‘wb’, and ‘r+b’.
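On Python 3 the same idea requires bytes rather than str whenever a ‘b’ mode is used; a hedged rework of the snippet above (hypothetical path that must already exist, since r+ does not create files):

with open('log.bin', 'r+b') as f:
    f.write(b'\x5f\x9d\x3e')   # a bytes literal, not a str, in binary mode
    f.seek(0)                  # rewind before reading back what was written
    print(f.read(100))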


How to open a file using the with statement

Question: How to open a file using the with statement

I’m looking at how to do file input and output in Python. I’ve written the following code to read a list of names (one per line) from a file into another file while checking a name against the names in the file and appending text to the occurrences in the file. The code works. Could it be done better?

I’d wanted to use the with open(... statement for both input and output files but can’t see how they could be in the same block meaning I’d need to store the names in a temporary location.

def filter(txt, oldfile, newfile):
    '''\
    Read a list of names from a file line by line into an output file.
    If a line begins with a particular name, insert a string of text
    after the name before appending the line to the output file.
    '''

    outfile = open(newfile, 'w')
    with open(oldfile, 'r', encoding='utf-8') as infile:
        for line in infile:
            if line.startswith(txt):
                line = line[0:len(txt)] + ' - Truly a great person!\n'
            outfile.write(line)

    outfile.close()
    return # Do I gain anything by including this?

# input the name you want to check against
text = input('Please enter the name of a great person: ')    
letsgo = filter(text,'Spanish', 'Spanish2')

Answer 0

Python allows putting multiple open() statements in a single with. You comma-separate them. Your code would then be:

def filter(txt, oldfile, newfile):
    '''\
    Read a list of names from a file line by line into an output file.
    If a line begins with a particular name, insert a string of text
    after the name before appending the line to the output file.
    '''

    with open(newfile, 'w') as outfile, open(oldfile, 'r', encoding='utf-8') as infile:
        for line in infile:
            if line.startswith(txt):
                line = line[0:len(txt)] + ' - Truly a great person!\n'
            outfile.write(line)

# input the name you want to check against
text = input('Please enter the name of a great person: ')    
letsgo = filter(text,'Spanish', 'Spanish2')

And no, you don’t gain anything by putting an explicit return at the end of your function. You can use return to exit early, but you had it at the end, and the function will exit without it. (Of course with functions that return a value, you use the return to specify the value to return.)

Using multiple open() items with with was not supported in Python 2.5 when the with statement was introduced, or in Python 2.6, but it is supported in Python 2.7 and Python 3.1 or newer.

http://docs.python.org/reference/compound_stmts.html#the-with-statement http://docs.python.org/release/3.1/reference/compound_stmts.html#the-with-statement

If you are writing code that must run in Python 2.5, 2.6 or 3.0, nest the with statements as the other answers suggested or use contextlib.nested.
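For reference, a hedged sketch of the contextlib.nested form those older interpreters accept, reusing the question’s newfile and oldfile parameters (Python 2.5 additionally needs the __future__ import, Python 2’s open() has no encoding parameter, and nested() was removed in Python 3):

from __future__ import with_statement  # Python 2.5 only
import contextlib

with contextlib.nested(open(newfile, 'w'), open(oldfile, 'r')) as (outfile, infile):
    for line in infile:
        outfile.write(line)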


Answer 1

Use nested blocks like this:

with open(newfile, 'w') as outfile:
    with open(oldfile, 'r', encoding='utf-8') as infile:
        # your logic goes right here

Answer 2

You can nest your with blocks. Like this:

with open(newfile, 'w') as outfile:
    with open(oldfile, 'r', encoding='utf-8') as infile:
        for line in infile:
            if line.startswith(txt):
                line = line[0:len(txt)] + ' - Truly a great person!\n'
            outfile.write(line)

This is better than your version because you guarantee that outfile will be closed even if your code encounters exceptions. Obviously you could do that with try/finally, but with is the right way to do this.

Or, as I have just learnt, you can have multiple context managers in a with statement as described by @steveha. That seems to me to be a better option than nesting.

And for your final minor question, the return serves no real purpose. I would remove it.


Answer 3

Sometimes, you might want to open a variable number of files and treat each one the same; you can do this with contextlib:

from contextlib import ExitStack

filenames = ['file1.txt', 'file2.txt', 'file3.txt']

with open('outfile.txt', 'a') as outfile:
    with ExitStack() as stack:
        # enter_context keeps every file open until the stack exits
        file_pointers = [stack.enter_context(open(name, 'r')) for name in filenames]
        for fp in file_pointers:
            outfile.write(fp.read())

Get the last n lines of a file with Python, similar to tail

Question: Get the last n lines of a file with Python, similar to tail

I’m writing a log file viewer for a web application and for that I want to paginate through the lines of the log file. The items in the file are line based with the newest item on the bottom.

So I need a tail() method that can read n lines from the bottom and supports an offset. What I came up with looks like this:

def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 74
    to_read = n + offset
    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3

Is this a reasonable approach? What is the recommended way to tail log files with offsets?


Answer 0

This may be quicker than yours. Makes no assumptions about line length. Backs through the file one block at a time till it’s found the right number of ‘\n’ characters.

def tail( f, lines=20 ):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from the beginning
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

I don’t like tricky assumptions about line length when — as a practical matter — you can never know things like that.

Generally, this will locate the last 20 lines on the first or second pass through the loop. If your 74 character thing is actually accurate, you make the block size 2048 and you’ll tail 20 lines almost immediately.

Also, I don’t burn a lot of brain calories trying to finesse alignment with physical OS blocks. Using these high-level I/O packages, I doubt you’ll see any performance consequence of trying to align on OS block boundaries. If you use lower-level I/O, then you might see a speedup.


UPDATE

For Python 3.2 and up, do the same on bytes, since in text files (those opened without a “b” in the mode string) only seeks relative to the beginning of the file are allowed (the exception being seeking to the very end of the file with seek(0, 2)):

e.g.: f = open('C:/.../../apache_logs.txt', 'rb')

 def tail(f, lines=20):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = []
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            f.seek(0,0)
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count(b'\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = b''.join(reversed(blocks))
    return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:])
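A possible usage sketch for this bytes-based version (hypothetical local file name):

with open('apache_logs.txt', 'rb') as f:
    print(tail(f, lines=20).decode('utf-8', errors='replace'))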

Answer 1

Assuming a unix-like system, on Python 2 you can do:

import os
def tail(f, n, offset=0):
    # f is a file name; n and offset are ints, so convert before building the command
    stdin, stdout = os.popen2("tail -n " + str(n + offset) + " " + f)
    stdin.close()
    lines = stdout.readlines()
    stdout.close()
    return lines[:len(lines) - offset]

For python 3 you may do:

import subprocess
def tail(f, n, offset=0):
    # command-line arguments must be strings
    proc = subprocess.Popen(['tail', '-n', str(n + offset), f], stdout=subprocess.PIPE)
    lines = proc.stdout.readlines()
    return lines[:len(lines) - offset]

Answer 2

Here is my answer. Pure python. Using timeit it seems pretty fast. Tailing 100 lines of a log file that has 100,000 lines:

>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165

Here is the code:

import os


def tail(f, lines=1, _buffer=4098):
    """Tail a file and get X lines from the end"""
    # place holder for the lines found
    lines_found = []

    # block counter will be multiplied by buffer
    # to get the block size from the end
    block_counter = -1

    # loop until we find X lines
    while len(lines_found) < lines:
        try:
            f.seek(block_counter * _buffer, os.SEEK_END)
        except IOError:  # either file is too small, or too many lines requested
            f.seek(0)
            lines_found = f.readlines()
            break

        lines_found = f.readlines()

        # we found enough lines, get out
        # Removed this line because it was redundant the while will catch
        # it, I left it for history
        # if len(lines_found) > lines:
        #    break

        # decrement the block counter to get the
        # next X bytes
        block_counter -= 1

    return lines_found[-lines:]

Answer 3

If reading the whole file is acceptable then use a deque.

from collections import deque
deque(f, maxlen=n)
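Wrapped into the tail() shape the question asks for, that one-liner could look like this sketch (the offset lines are dropped from the end):

from collections import deque

def tail(f, n, offset=0):
    # Keep only the last n + offset lines while streaming through the file,
    # then discard the trailing offset lines.
    lines = deque(f, maxlen=n + offset)
    return list(lines)[:len(lines) - offset]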

Prior to 2.6, deques didn’t have a maxlen option, but it’s easy enough to implement.

import itertools
def maxque(items, size):
    items = iter(items)
    q = deque(itertools.islice(items, size))
    for item in items:
        del q[0]
        q.append(item)
    return q

If it’s a requirement to read the file from the end, then use a gallop (a.k.a exponential) search.

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []
    while len(lines) <= n:
        try:
            f.seek(-pos, 2)
        except IOError:
            f.seek(0)
            break
        finally:
            lines = list(f)
        pos *= 2
    return lines[-n:]

Answer 4

S.Lott’s answer above almost works for me but ends up giving me partial lines. It turns out that it corrupts data on block boundaries because data holds the read blocks in reversed order. When ''.join(data) is called, the blocks are in the wrong order. This fixes that.

def tail(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    f - a byte file-like object
    """
    if window == 0:
        return []
    BUFSIZ = 1024
    f.seek(0, 2)
    bytes = f.tell()
    size = window + 1
    block = -1
    data = []
    while size > 0 and bytes > 0:
        if bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            data.insert(0, f.read(BUFSIZ))
        else:
            # file too small, start from the beginning
            f.seek(0,0)
            # only read what was not read
            data.insert(0, f.read(bytes))
        linesFound = data[0].count('\n')
        size -= linesFound
        bytes -= BUFSIZ
        block -= 1
    return ''.join(data).splitlines()[-window:]

Answer 5

The code I ended up using. I think this is the best so far:

def tail(f, n, offset=None):
    """Reads a n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3
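A possible pagination sketch on top of this version (hypothetical log file name; the file is opened in binary so that seeking from the end also works on Python 3, which means the returned lines are bytes there):

page = 1  # hypothetical page index: page 0 is the newest 20 lines, page 1 the 20 before those
with open('app.log', 'rb') as f:
    lines, has_more = tail(f, 20, offset=20 * page)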

Answer 6

Simple and fast solution with mmap:

import mmap
import os

def tail(filename, n):
    """Returns last n lines from the filename. No exception handling"""
    size = os.path.getsize(filename)
    with open(filename, "rb") as f:
        # for Windows the mmap parameters are different
        fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
        try:
            for i in xrange(size - 1, -1, -1):
                if fm[i] == '\n':
                    n -= 1
                    if n == -1:
                        break
            return fm[i + 1 if i else 0:].splitlines()
        finally:
            fm.close()

Answer 7

An even cleaner Python 3-compatible version that doesn’t insert but appends & reverses:

def tail(f, window=1):
    """
    Returns the last `window` lines of file `f` as a list of bytes.
    """
    if window == 0:
        return b''
    BUFSIZE = 1024
    f.seek(0, 2)
    end = f.tell()
    nlines = window + 1
    data = []
    while nlines > 0 and end > 0:
        i = max(0, end - BUFSIZE)
        nread = min(end, BUFSIZE)

        f.seek(i)
        chunk = f.read(nread)
        data.append(chunk)
        nlines -= chunk.count(b'\n')
        end -= nread
    return b'\n'.join(b''.join(reversed(data)).splitlines()[-window:])

use it like this:

with open(path, 'rb') as f:
    last_lines = tail(f, 3).decode('utf-8')

Answer 8

Update of the @papercrane solution for Python 3. Open the file with open(filename, 'rb') and:

def tail(f, window=20):
    """Returns the last `window` lines of file `f` as a list.
    """
    if window == 0:
        return []

    BUFSIZ = 1024
    f.seek(0, 2)
    remaining_bytes = f.tell()
    size = window + 1
    block = -1
    data = []

    while size > 0 and remaining_bytes > 0:
        if remaining_bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            bunch = f.read(BUFSIZ)
        else:
            # file too small, start from beginning
            f.seek(0, 0)
            # only read what was not read
            bunch = f.read(remaining_bytes)

        bunch = bunch.decode('utf-8')
        data.insert(0, bunch)
        size -= bunch.count('\n')
        remaining_bytes -= BUFSIZ
        block -= 1

    return ''.join(data).splitlines()[-window:]

Answer 9

Posting an answer at the behest of commenters on my answer to a similar question where the same technique was used to mutate the last line of a file, not just get it.

For a file of significant size, mmap is the best way to do this. To improve on the existing mmap answer, this version is portable between Windows and Linux, and should run faster (though it won’t work without some modifications on 32 bit Python with files in the GB range, see the other answer for hints on handling this, and for modifying to work on Python 2).

import io  # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap

def skip_back_lines(mm, numlines, startidx):
    '''Factored out to simplify handling of n and offset'''
    for _ in itertools.repeat(None, numlines):
        startidx = mm.rfind(b'\n', 0, startidx)
        if startidx < 0:
            break
    return startidx

def tail(f, n, offset=0):
    # Reopen file in binary mode
    with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # len(mm) - 1 handles files ending w/newline by getting the prior line
        startofline = skip_back_lines(mm, offset, len(mm) - 1)
        if startofline < 0:
            return []  # Offset lines consumed whole file, nothing to return
            # If using a generator function (yield-ing, see below),
            # this should be a plain return, no empty list

        endoflines = startofline + 1  # Slice end to omit offset lines

        # Find start of lines to capture (add 1 to move from newline to beginning of following line)
        startofline = skip_back_lines(mm, n, startofline) + 1

        # Passing True to splitlines makes it return the list of lines without
        # removing the trailing newline (if any), so list mimics f.readlines()
        return mm[startofline:endoflines].splitlines(True)
        # If Windows style \r\n newlines need to be normalized to \n, and input
        # is ASCII compatible, can normalize newlines with:
        # return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)

This assumes the number of lines tailed is small enough you can safely read them all into memory at once; you could also make this a generator function and manually read a line at a time by replacing the final line with:

        mm.seek(startofline)
        # Call mm.readline n times, or until EOF, whichever comes first
        # Python 3.2 and earlier:
        for line in itertools.islice(iter(mm.readline, b''), n):
            yield line

        # 3.3+:
        yield from itertools.islice(iter(mm.readline, b''), n)

Lastly, this read in binary mode (necessary to use mmap) so it gives str lines (Py2) and bytes lines (Py3); if you want unicode (Py2) or str (Py3), the iterative approach could be tweaked to decode for you and/or fix newlines:

        lines = itertools.islice(iter(mm.readline, b''), n)
        if f.encoding:  # Decode if the passed file was opened with a specific encoding
            lines = (line.decode(f.encoding) for line in lines)
        if 'b' not in f.mode:  # Fix line breaks if passed file opened in text mode
            lines = (line.replace(os.linesep, '\n') for line in lines)
        # Python 3.2 and earlier:
        for line in lines:
            yield line
        # 3.3+:
        yield from lines

Note: I typed this all up on a machine where I lack access to Python to test. Please let me know if I typoed anything; this was similar enough to my other answer that I think it should work, but the tweaks (e.g. handling an offset) could lead to subtle errors. Please let me know in the comments if there are any mistakes.


回答 10

我发现上面用Popen的思路是最好的解决方案。它简单粗暴而且有效。在Unix机器上的Python 2.6中,我使用了以下代码:

import subprocess

def GetLastNLines(self, n, fileName):
    """
    Name:           Get LastNLines
    Description:        Gets last n lines using Unix tail
    Output:         returns last n lines of a file
    Keyword argument:
    n -- number of last lines to return
    fileName -- Name of the file you need to tail into
    """
    p = subprocess.Popen(['tail', '-n', str(n), fileName], stdout=subprocess.PIPE)
    soutput, sinput = p.communicate()
    return soutput

soutput将包含文件的最后n行。要逐行遍历soutput:

for line in GetLastNLines(50,'myfile.log').split('\n'):
    print line

I found the Popen above to be the best solution. It’s quick and dirty and it works For python 2.6 on Unix machine i used the following

import subprocess

def GetLastNLines(self, n, fileName):
    """
    Name:           Get LastNLines
    Description:        Gets last n lines using Unix tail
    Output:         returns last n lines of a file
    Keyword argument:
    n -- number of last lines to return
    fileName -- Name of the file you need to tail into
    """
    p = subprocess.Popen(['tail', '-n', str(n), fileName], stdout=subprocess.PIPE)
    soutput, sinput = p.communicate()
    return soutput

soutput will contain the last n lines of the file. To iterate through soutput line by line do:

for line in GetLastNLines(50,'myfile.log').split('\n'):
    print line
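
补充一个 Python 3 下的等价写法草图(假设系统中有 tail 命令;myfile.log 沿用上面示例中的文件名),思路与上面相同,只是用 subprocess.check_output 代替了手动的 Popen/communicate:

import subprocess

def get_last_n_lines(path, n=10):
    # 调用系统的 tail 命令,返回由最后 n 行组成的列表
    out = subprocess.check_output(['tail', '-n', str(n), path])
    return out.decode('utf-8', errors='replace').splitlines()

for line in get_last_n_lines('myfile.log', 50):
    print(line)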

回答 11

基于S.Lott得票最高的答案(08年9月25日 21:43),但针对小文件做了修正。

def tail(the_file, lines_2find=20):  
    the_file.seek(0, 2)                         #go to end of file
    bytes_in_file = the_file.tell()             
    lines_found, total_bytes_scanned = 0, 0
    while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned: 
        byte_block = min(1024, bytes_in_file-total_bytes_scanned)
        the_file.seek(-(byte_block+total_bytes_scanned), 2)
        total_bytes_scanned += byte_block
        lines_found += the_file.read(1024).count('\n')
    the_file.seek(-total_bytes_scanned, 2)
    line_list = list(the_file.readlines())
    return line_list[-lines_2find:]

    #we read at least 21 line breaks from the bottom, block by block for speed
    #21 to ensure we don't get a half line

希望这是有用的。

based on S.Lott’s top voted answer (Sep 25 ’08 at 21:43), but fixed for small files.

def tail(the_file, lines_2find=20):  
    the_file.seek(0, 2)                         #go to end of file
    bytes_in_file = the_file.tell()             
    lines_found, total_bytes_scanned = 0, 0
    while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned: 
        byte_block = min(1024, bytes_in_file-total_bytes_scanned)
        the_file.seek(-(byte_block+total_bytes_scanned), 2)
        total_bytes_scanned += byte_block
        lines_found += the_file.read(1024).count('\n')
    the_file.seek(-total_bytes_scanned, 2)
    line_list = list(the_file.readlines())
    return line_list[-lines_2find:]

    #we read at least 21 line breaks from the bottom, block by block for speed
    #21 to ensure we don't get a half line

Hope this is useful.


回答 12

pypi上有一些tail的现有实现,您可以使用pip安装:

  • mtFileUtil
  • multitail
  • log4tailer

根据您的情况,使用这些现有工具之一可能会有好处。

There are some existing implementations of tail on pypi which you can install using pip:

  • mtFileUtil
  • multitail
  • log4tailer

Depending on your situation, there may be advantages to using one of these existing tools.


回答 13

简单:

with open("test.txt") as f:
data = f.readlines()
tail = data[-2:]
print(''.join(tail)

Simple :

with open("test.txt") as f:
data = f.readlines()
tail = data[-2:]
print(''.join(tail)

回答 14

为了在处理超大文件时保持效率(这在需要tail的日志文件场景中很常见),通常希望避免读取整个文件(即便没有一次性把整个文件读入内存也是如此),但又需要按行而不是按字符来计算偏移量。一种可能是用seek()逐字符向后读取,但这非常慢。更好的做法是以较大的块为单位进行处理。

我之前写过一个可以向后读取文件的实用函数,可以在这里派上用场。

import os, itertools

def rblocks(f, blocksize=4096):
    """Read file as series of blocks from end of file to start.

    The data itself is in normal order, only the order of the blocks is reversed.
    ie. "hello world" -> ["ld","wor", "lo ", "hel"]
    Note that the file must be opened in binary mode.
    """
    if 'b' not in f.mode.lower():
        raise Exception("File must be opened using binary mode.")
    size = os.stat(f.name).st_size
    fullblocks, lastblock = divmod(size, blocksize)

    # The first(end of file) block will be short, since this leaves 
    # the rest aligned on a blocksize boundary.  This may be more 
    # efficient than having the last (first in file) block be short
    f.seek(-lastblock,2)
    yield f.read(lastblock)

    for i in range(fullblocks-1,-1, -1):
        f.seek(i * blocksize)
        yield f.read(blocksize)

def tail(f, nlines):
    buf = ''
    result = []
    for block in rblocks(f):
        buf = block + buf
        lines = buf.splitlines()

        # Return all lines except the first (since may be partial)
        if lines:
            result.extend(lines[1:]) # First line may not be complete
            if(len(result) >= nlines):
                return result[-nlines:]

            buf = lines[0]

    return ([buf]+result)[-nlines:]


f=open('file_to_tail.txt','rb')
for line in tail(f, 20):
    print line

[编辑]添加了更具体的版本(避免了两次反转的需要)

For efficiency with very large files (common in logfile situations where you may want to use tail), you generally want to avoid reading the whole file (even if you do it without reading the whole file into memory at once). However, you do need to somehow work out the offset in lines rather than characters. One possibility is reading backwards with seek() char by char, but this is very slow. Instead, it's better to process in larger blocks.

I’ve a utility function I wrote a while ago to read files backwards that can be used here.

import os, itertools

def rblocks(f, blocksize=4096):
    """Read file as series of blocks from end of file to start.

    The data itself is in normal order, only the order of the blocks is reversed.
    ie. "hello world" -> ["ld","wor", "lo ", "hel"]
    Note that the file must be opened in binary mode.
    """
    if 'b' not in f.mode.lower():
        raise Exception("File must be opened using binary mode.")
    size = os.stat(f.name).st_size
    fullblocks, lastblock = divmod(size, blocksize)

    # The first(end of file) block will be short, since this leaves 
    # the rest aligned on a blocksize boundary.  This may be more 
    # efficient than having the last (first in file) block be short
    f.seek(-lastblock,2)
    yield f.read(lastblock)

    for i in range(fullblocks-1,-1, -1):
        f.seek(i * blocksize)
        yield f.read(blocksize)

def tail(f, nlines):
    buf = ''
    result = []
    for block in rblocks(f):
        buf = block + buf
        lines = buf.splitlines()

        # Return all lines except the first (since may be partial)
        if lines:
            result.extend(lines[1:]) # First line may not be complete
            if(len(result) >= nlines):
                return result[-nlines:]

            buf = lines[0]

    return ([buf]+result)[-nlines:]


f=open('file_to_tail.txt','rb')
for line in tail(f, 20):
    print line

[Edit] Added more specific version (avoids need to reverse twice)


回答 15

您可以使用f.seek(0, 2)跳到文件末尾,然后用下面这个readline()的替代函数逐行向前读取:

def readline_backwards(self, f):
    backline = ''
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    backline = last
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    f.seek(1, 1)
    return backline

you can go to the end of your file with f.seek(0, 2) and then read off lines one by one with the following replacement for readline():

def readline_backwards(self, f):
    backline = ''
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    backline = last
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    f.seek(1, 1)
    return backline

回答 16

基于Eyecue的答案(2010年6月10日,21:28):此类将head()和tail()方法添加到文件对象。

class File(file):
    def head(self, lines_2find=1):
        self.seek(0)                            #Rewind file
        return [self.next() for x in xrange(lines_2find)]

    def tail(self, lines_2find=1):  
        self.seek(0, 2)                         #go to end of file
        bytes_in_file = self.tell()             
        lines_found, total_bytes_scanned = 0, 0
        while (lines_2find+1 > lines_found and
               bytes_in_file > total_bytes_scanned): 
            byte_block = min(1024, bytes_in_file-total_bytes_scanned)
            self.seek(-(byte_block+total_bytes_scanned), 2)
            total_bytes_scanned += byte_block
            lines_found += self.read(1024).count('\n')
        self.seek(-total_bytes_scanned, 2)
        line_list = list(self.readlines())
        return line_list[-lines_2find:]

用法:

f = File('path/to/file', 'r')
f.head(3)
f.tail(3)

Based on Eyecue's answer (Jun 10 '10 at 21:28): this class adds head() and tail() methods to the file object.

class File(file):
    def head(self, lines_2find=1):
        self.seek(0)                            #Rewind file
        return [self.next() for x in xrange(lines_2find)]

    def tail(self, lines_2find=1):  
        self.seek(0, 2)                         #go to end of file
        bytes_in_file = self.tell()             
        lines_found, total_bytes_scanned = 0, 0
        while (lines_2find+1 > lines_found and
               bytes_in_file > total_bytes_scanned): 
            byte_block = min(1024, bytes_in_file-total_bytes_scanned)
            self.seek(-(byte_block+total_bytes_scanned), 2)
            total_bytes_scanned += byte_block
            lines_found += self.read(1024).count('\n')
        self.seek(-total_bytes_scanned, 2)
        line_list = list(self.readlines())
        return line_list[-lines_2find:]

Usage:

f = File('path/to/file', 'r')
f.head(3)
f.tail(3)
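
需要注意,内置的file类型在Python 3中已被移除,不能再像上面那样继承。在Python 3中可以改用普通函数达到同样的效果(下面是head的一个小示意;tail可以直接复用后面基于deque的回答):

def head(f, lines_2find=1):
    f.seek(0)                                  # 先回到文件开头
    return [next(f) for _ in range(lines_2find)]

with open('path/to/file') as f:
    print(head(f, 3))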

回答 17

如果文件不以\n结尾,或者不能保证完整读取第一行,这里的几种解决方案都会有问题。

def tail(file, n=1, bs=1024):
    f = open(file)
    f.seek(-1,2)
    l = 1-f.read(1).count('\n') # If file doesn't end in \n, count it anyway.
    B = f.tell()
    while n >= l and B > 0:
            block = min(bs, B)
            B -= block
            f.seek(B, 0)
            l += f.read(block).count('\n')
    f.seek(B, 0)
    l = min(l,n) # discard first (incomplete) line if l > n
    lines = f.readlines()[-l:]
    f.close()
    return lines

Several of these solutions have issues if the file doesn’t end in \n or in ensuring the complete first line is read.

def tail(file, n=1, bs=1024):
    f = open(file)
    f.seek(-1,2)
    l = 1-f.read(1).count('\n') # If file doesn't end in \n, count it anyway.
    B = f.tell()
    while n >= l and B > 0:
            block = min(bs, B)
            B -= block
            f.seek(B, 0)
            l += f.read(block).count('\n')
    f.seek(B, 0)
    l = min(l,n) # discard first (incomplete) line if l > n
    lines = f.readlines()[-l:]
    f.close()
    return lines

回答 18

这是一个非常简单的实现:

with open('/etc/passwd', 'r') as f:
  try:
    f.seek(0,2)
    s = ''
    while s.count('\n') < 11:
      cur = f.tell()
      f.seek((cur - 10))
      s = f.read(10) + s
      f.seek((cur - 10))
    print s
  except Exception as e:
    f.readlines()

Here is a pretty simple implementation:

with open('/etc/passwd', 'r') as f:
  try:
    f.seek(0,2)
    s = ''
    while s.count('\n') < 11:
      cur = f.tell()
      f.seek((cur - 10))
      s = f.read(10) + s
      f.seek((cur - 10))
    print s
  except Exception as e:
    f.readlines()

回答 19

有一个非常有用的模块可以做到这一点:

from file_read_backwards import FileReadBackwards

with FileReadBackwards("/tmp/file", encoding="utf-8") as frb:

# getting lines by lines starting from the last line up
for l in frb:
    print(l)

There is very useful module that can do this:

from file_read_backwards import FileReadBackwards

with FileReadBackwards("/tmp/file", encoding="utf-8") as frb:

# getting lines by lines starting from the last line up
for l in frb:
    print(l)

回答 20

另一种解决方案

如果您的txt文件如下所示:鼠标蛇猫蜥蜴狼狗

您可以简单地在Python中使用数组索引来反转这个文件:

contents=[]
def tail(contents,n):
    with open('file.txt') as file:
        for i in file.readlines():
            contents.append(i)

    for i in contents[:n:-1]:
        print(i)

tail(contents,-5)

结果:狗狼蜥蜴猫

Another Solution

if your txt file looks like this: mouse snake cat lizard wolf dog

you could reverse this file by simply using array indexing in Python:

contents=[]
def tail(contents,n):
    with open('file.txt') as file:
        for i in file.readlines():
            contents.append(i)

    for i in contents[:n:-1]:
        print(i)

tail(contents,-5)

result: dog wolf lizard cat


回答 21

最简单的方法是使用deque

from collections import deque

def tail(filename, n=10):
    with open(filename) as f:
        return deque(f, n)

The simplest way is to use deque:

from collections import deque

def tail(filename, n=10):
    with open(filename) as f:
        return deque(f, n)
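
deque(f, n)会在构造时就把文件迭代完,只保留最后n行,因此函数返回后文件可以安全关闭。一个用法小示意(example.log为假设的文件名;deque可以直接当作行的序列来迭代):

for line in tail('example.log', 3):
    print(line, end='')   # 每行已自带换行符,因此关闭print自己的换行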

回答 22

我需要从文件的最后一行读取一个特定的值,然后偶然看到了这个帖子。与其在Python中重新发明轮子,我最后写了一个很小的shell脚本,保存为/usr/local/bin/get_last_netp:

#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk {'print $14'}

在Python程序中:

from subprocess import check_output

last_netp = int(check_output("/usr/local/bin/get_last_netp"))

I had to read a specific value from the last line of a file, and stumbled upon this thread. Rather than reinventing the wheel in Python, I ended up with a tiny shell script, saved as /usr/local/bin/get_last_netp:

#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk {'print $14'}

And in the Python program:

from subprocess import check_output

last_netp = int(check_output("/usr/local/bin/get_last_netp"))
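如果不想单独维护一个shell脚本,也可以直接在Python里组合出同样的命令。下面是一个假设性的小示意,字段编号沿用原答案中awk的$14:

import subprocess

last_line = subprocess.check_output(
    ['tail', '-n', '1', '/home/leif/projects/transfer/export.log'])
# awk 的 $14 对应按空白分割后的第 14 个字段(索引 13)
last_netp = int(last_line.split()[13])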

回答 23

这不是第一个使用双端队列(deque)的示例,但它更简单,而且是通用的:适用于任何可迭代对象,而不仅仅是文件。

#!/usr/bin/env python
import sys
import collections
def tail(iterable, N):
    deq = collections.deque()
    for thing in iterable:
        if len(deq) >= N:
            deq.popleft()
        deq.append(thing)
    for thing in deq:
        yield thing
if __name__ == '__main__':
    for line in tail(sys.stdin,10):
        sys.stdout.write(line)

Not the first example using a deque, but a simpler one. This one is general: it works on any iterable object, not just a file.

#!/usr/bin/env python
import sys
import collections
def tail(iterable, N):
    deq = collections.deque()
    for thing in iterable:
        if len(deq) >= N:
            deq.popleft()
        deq.append(thing)
    for thing in deq:
        yield thing
if __name__ == '__main__':
    for line in tail(sys.stdin,10):
        sys.stdout.write(line)
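
顺带一提,deque本身支持maxlen参数,构造时就会自动丢弃多余的旧元素,可以把上面手动popleft的逻辑省掉(一个等价的小示意):

import collections

def tail(iterable, N):
    # maxlen=N 的 deque 会在追加新元素时自动丢弃最旧的元素
    return collections.deque(iterable, maxlen=N)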

回答 24

This is my version of tailf

import sys, time, os

filename = 'path to file'

try:
    with open(filename) as f:
        size = os.path.getsize(filename)
        if size < 1024:
            s = size
        else:
            s = 999
        f.seek(-s, 2)
        l = f.read()
        print l
        while True:
            line = f.readline()
            if not line:
                time.sleep(1)
                continue
            print line
except IOError:
    pass

回答 25

import time

attemps = 600
wait_sec = 5
fname = "YOUR_PATH"

with open(fname, "r") as f:
    where = f.tell()
    for i in range(attemps):
        line = f.readline()
        if not line:
            time.sleep(wait_sec)
            f.seek(where)
        else:
            print line, # already has newline

回答 26

import itertools
fname = 'log.txt'
offset = 5
n = 10
with open(fname) as f:
    n_last_lines = list(reversed([x for x in itertools.islice(f, None)][-(offset+1):-(offset+n+1):-1]))

回答 27

abc = "2018-06-16 04:45:18.68"
filename = "abc.txt"
with open(filename) as myFile:
    for num, line in enumerate(myFile, 1):
        if abc in line:
            lastline = num
print "last occurance of work at file is in "+str(lastline) 
abc = "2018-06-16 04:45:18.68"
filename = "abc.txt"
with open(filename) as myFile:
    for num, line in enumerate(myFile, 1):
        if abc in line:
            lastline = num
print "last occurance of work at file is in "+str(lastline) 

回答 28

更新由A.Coady给出的答案

适用于python 3

这使用指数搜索,只会从文件末尾缓冲N行,非常高效。

import time
import os
import sys

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []

    # set file pointer to end

    f.seek(0, os.SEEK_END)

    isFileSmall = False

    while len(lines) <= n:
        try:
            f.seek(f.tell() - pos, os.SEEK_SET)
        except ValueError as e:
            # lines greater than file seeking size
            # seek to start
            f.seek(0,os.SEEK_SET)
            isFileSmall = True
        except IOError:
            print("Some problem reading/seeking the file")
            sys.exit(-1)
        finally:
            lines = f.readlines()
            if isFileSmall:
                break

        pos *= 2

    print(lines)

    return lines[-n:]




with open("stream_logs.txt") as f:
    while(True):
        time.sleep(0.5)
        print(tail(f,2))

Update for answer given by A.Coady

Works with python 3.

This uses Exponential Search and will buffer only N lines from back and is very efficient.

import time
import os
import sys

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []

    # set file pointer to end

    f.seek(0, os.SEEK_END)

    isFileSmall = False

    while len(lines) <= n:
        try:
            f.seek(f.tell() - pos, os.SEEK_SET)
        except ValueError as e:
            # lines greater than file seeking size
            # seek to start
            f.seek(0,os.SEEK_SET)
            isFileSmall = True
        except IOError:
            print("Some problem reading/seeking the file")
            sys.exit(-1)
        finally:
            lines = f.readlines()
            if isFileSmall:
                break

        pos *= 2

    print(lines)

    return lines[-n:]




with open("stream_logs.txt") as f:
    while(True):
        time.sleep(0.5)
        print(tail(f,2))


回答 29

转念一想,这可能和这里的任何方案一样快。

def tail( f, window=20 ):
    lines= ['']*window
    count= 0
    for l in f:
        lines[count%window]= l
        count += 1
    print lines[count%window:], lines[:count%window]

这要简单得多,而且它的运行速度似乎相当可观。

On second thought, this is probably just as fast as anything here.

def tail( f, window=20 ):
    lines= ['']*window
    count= 0
    for l in f:
        lines[count%window]= l
        count += 1
    print lines[count%window:], lines[:count%window]

It’s a lot simpler. And it does seem to rip along at a good pace.
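
如果希望这个环形缓冲版本按原始顺序返回行,而不是直接打印两段切片,可以这样拼接(一个小示意):

def tail_ordered(f, window=20):
    lines = [''] * window
    count = 0
    for l in f:
        lines[count % window] = l
        count += 1
    if count < window:
        # 文件行数不足 window 时,缓冲区还没绕回来
        return lines[:count]
    # 环形缓冲区中较旧的行在 count % window 之后,较新的行在其之前
    return lines[count % window:] + lines[:count % window]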


从第2行读取文件或跳过标题行

问题:从第2行读取文件或跳过标题行

如何跳过标题行并开始从第2行读取文件?

How can I skip the header row and start reading a file from line2?


回答 0

with open(fname) as f:
    next(f)
    for line in f:
        #do something
with open(fname) as f:
    next(f)
    for line in f:
        #do something

回答 1

f = open(fname,'r')
lines = f.readlines()[1:]
f.close()
f = open(fname,'r')
lines = f.readlines()[1:]
f.close()

回答 2

如果您想先读取第一行,然后再对文件的其余部分执行一些操作,这段代码会很有帮助。

with open(filename , 'r') as f:
    first_line = f.readline()
    for line in f:
            # Perform some operations

If you want the first line and then you want to perform some operation on file this code will helpful.

with open(filename , 'r') as f:
    first_line = f.readline()
    for line in f:
            # Perform some operations

回答 3

如果切片可以在迭代器上工作…

from itertools import islice
with open(fname) as f:
    for line in islice(f, 1, None):
        pass

If slicing could work on iterators…

from itertools import islice
with open(fname) as f:
    for line in islice(f, 1, None):
        pass

回答 4

f = open(fname).readlines()
firstLine = f.pop(0) #removes the first line
for line in f:
    ...
f = open(fname).readlines()
firstLine = f.pop(0) #removes the first line
for line in f:
    ...

回答 5

为了让读取多个标题行的任务更通用并提高可读性,我会使用方法提取(method extraction)。假设您想对coordinates.txt的前几行进行标记化,以用作标题信息。

coordinates.txt
---------------
Name,Longitude,Latitude,Elevation, Comments
String, Decimal Deg., Decimal Deg., Meters, String
Euler's Town,7.58857,47.559537,0, "Blah"
Faneuil Hall,-71.054773,42.360217,0
Yellowstone National Park,-110.588455,44.427963,0

然后,提取出来的方法允许您指定要对标题信息做什么(在这个例子中,我们只是按逗号对标题行进行标记化并以列表形式返回,但还有空间做更多的事情)。

def __readheader(filehandle, numberheaderlines=1):
    """Reads the specified number of lines and returns the comma-delimited 
    strings on each line as a list"""
    for _ in range(numberheaderlines):
        yield map(str.strip, filehandle.readline().strip().split(','))

with open('coordinates.txt', 'r') as rh:
    # Single header line
    #print next(__readheader(rh))

    # Multiple header lines
    for headerline in __readheader(rh, numberheaderlines=2):
        print headerline  # Or do other stuff with headerline tokens

输出

['Name', 'Longitude', 'Latitude', 'Elevation', 'Comments']
['String', 'Decimal Deg.', 'Decimal Deg.', 'Meters', 'String']

如果coordinates.txt还包含另一个标题行,只需更改numberheaderlines即可。最重要的是,__readheader(rh, numberheaderlines=2)在做什么一目了然,我们也避免了必须去弄清楚或注释说明被采纳答案的作者为什么在代码中使用next()的含糊之处。

To generalize the task of reading multiple header lines and to improve readability I’d use method extraction. Suppose you wanted to tokenize the first three lines of coordinates.txt to use as header information.

Example

coordinates.txt
---------------
Name,Longitude,Latitude,Elevation, Comments
String, Decimal Deg., Decimal Deg., Meters, String
Euler's Town,7.58857,47.559537,0, "Blah"
Faneuil Hall,-71.054773,42.360217,0
Yellowstone National Park,-110.588455,44.427963,0

Then method extraction allows you to specify what you want to do with the header information (in this example we simply tokenize the header lines based on the comma and return it as a list but there’s room to do much more).

def __readheader(filehandle, numberheaderlines=1):
    """Reads the specified number of lines and returns the comma-delimited 
    strings on each line as a list"""
    for _ in range(numberheaderlines):
        yield map(str.strip, filehandle.readline().strip().split(','))

with open('coordinates.txt', 'r') as rh:
    # Single header line
    #print next(__readheader(rh))

    # Multiple header lines
    for headerline in __readheader(rh, numberheaderlines=2):
        print headerline  # Or do other stuff with headerline tokens

Output

['Name', 'Longitude', 'Latitude', 'Elevation', 'Comments']
['String', 'Decimal Deg.', 'Decimal Deg.', 'Meters', 'String']

If coordinates.txt contains another headerline, simply change numberheaderlines. Best of all, it's clear what __readheader(rh, numberheaderlines=2) is doing and we avoid the ambiguity of having to figure out or comment on why the author of the accepted answer uses next() in his code.


回答 6

如果您想从第2行开始读取多个CSV文件,下面的方法非常好用:

import csv

for files in csv_file_list:
        with open(files, 'r') as r: 
            next(r)                  #skip headers             
            rr = csv.reader(r)
            for row in rr:
                #do something

(这是Parfait对另一个问题的回答的一部分)

If you want to read multiple CSV files starting from line 2, this works like a charm

import csv

for files in csv_file_list:
        with open(files, 'r') as r: 
            next(r)                  #skip headers             
            rr = csv.reader(r)
            for row in rr:
                #do something

(this is part of Parfait’s answer to a different question)
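
反过来,如果想利用标题行而不是丢弃它,csv.DictReader会自动把第一行当作字段名。一个小示意,world.csv为假设的文件名:

import csv

with open('world.csv', newline='') as f:
    reader = csv.DictReader(f)   # 第一行自动被当作字段名
    for row in reader:
        print(row)               # row 是以标题为键的字典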


回答 7

# Open a connection to the file
with open('world_dev_ind.csv') as file:

    # Skip the column names
    file.readline()

    # Initialize an empty dictionary: counts_dict
    counts_dict = {}

    # Process only the first 1000 rows
    for j in range(0, 1000):

        # Split the current line into a list: line
        line = file.readline().split(',')

        # Get the value for the first column: first_col
        first_col = line[0]

        # If the column value is in the dict, increment its value
        if first_col in counts_dict.keys():
            counts_dict[first_col] += 1

        # Else, add to the dict and set value to 1
        else:
            counts_dict[first_col] = 1

# Print the resulting dictionary
print(counts_dict)

在Python中读取大文件的惰性方法?

问题:在Python中读取大文件的惰性方法?

我有一个4GB的大文件,当我尝试读取它时,计算机就挂起了。因此,我想逐块读取它,处理完每一块之后,把处理结果存到另一个文件,再读取下一块。

有什么方法可以yield这些片段吗?

我很希望有一个惰性(lazy)的读取方法。

I have a very big file 4GB and when I try to read it my computer hangs. So I want to read it piece by piece and after processing each piece store the processed piece into another file and read next piece.

Is there any method to yield these pieces ?

I would love to have a lazy method.


回答 0

要编写一个惰性函数,只需使用yield

def read_in_chunks(file_object, chunk_size=1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open('really_big_file.dat') as f:
    for piece in read_in_chunks(f):
        process_data(piece)

另一个选择是使用iter和一个辅助函数:

f = open('really_big_file.dat')
def read1k():
    return f.read(1024)

for piece in iter(read1k, ''):
    process_data(piece)

如果文件是基于行的,则文件对象已经是行的惰性生成器:

for line in open('really_big_file.dat'):
    process_data(line)

To write a lazy function, just use yield:

def read_in_chunks(file_object, chunk_size=1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open('really_big_file.dat') as f:
    for piece in read_in_chunks(f):
        process_data(piece)

Another option would be to use iter and a helper function:

f = open('really_big_file.dat')
def read1k():
    return f.read(1024)

for piece in iter(read1k, ''):
    process_data(piece)

If the file is line-based, the file object is already a lazy generator of lines:

for line in open('really_big_file.dat'):
    process_data(line)
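
针对题主"处理完每一块后写入另一个文件"的需求,可以把上面的read_in_chunks生成器和输出文件组合起来。下面是一个小示意,其中transform和processed.dat都是假设的名字:

def transform(piece):
    # 假设的处理函数:这里仅原样返回
    return piece

with open('really_big_file.dat', 'rb') as f_in, open('processed.dat', 'wb') as f_out:
    for piece in read_in_chunks(f_in, chunk_size=1024 * 1024):
        f_out.write(transform(piece))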

回答 1

如果您的计算机,操作系统和python是64位的,则可以使用mmap模块将文件的内容映射到内存中,并使用索引和切片对其进行访问。以下是文档中的示例:

import mmap
with open("hello.txt", "r+") as f:
    # memory-map the file, size 0 means whole file
    map = mmap.mmap(f.fileno(), 0)
    # read content via standard file methods
    print map.readline()  # prints "Hello Python!"
    # read content via slice notation
    print map[:5]  # prints "Hello"
    # update content using slice notation;
    # note that new content must have same size
    map[6:] = " world!\n"
    # ... and read again using standard file methods
    map.seek(0)
    print map.readline()  # prints "Hello  world!"
    # close the map
    map.close()

如果您的计算机、操作系统或python是32位的,则映射大文件可能会占用地址空间的很大一部分,使您的程序因缺少可用内存而无法正常运行。

If your computer, OS and python are 64-bit, then you can use the mmap module to map the contents of the file into memory and access it with indices and slices. Here an example from the documentation:

import mmap
with open("hello.txt", "r+") as f:
    # memory-map the file, size 0 means whole file
    map = mmap.mmap(f.fileno(), 0)
    # read content via standard file methods
    print map.readline()  # prints "Hello Python!"
    # read content via slice notation
    print map[:5]  # prints "Hello"
    # update content using slice notation;
    # note that new content must have same size
    map[6:] = " world!\n"
    # ... and read again using standard file methods
    map.seek(0)
    print map.readline()  # prints "Hello  world!"
    # close the map
    map.close()

If either your computer, OS or python are 32-bit, then mmap-ing large files can reserve large parts of your address space and starve your program of memory.


回答 2

file.readlines()接受一个可选的sizehint参数,它按字节数近似限制一次读入并返回的行的总量,从而间接控制读取的行数。

bigfile = open('bigfilename','r')
tmp_lines = bigfile.readlines(BUF_SIZE)
while tmp_lines:
    process([line for line in tmp_lines])
    tmp_lines = bigfile.readlines(BUF_SIZE)

file.readlines() takes an optional sizehint argument: it reads whole lines totalling approximately that many bytes, which indirectly limits how many lines are returned.

bigfile = open('bigfilename','r')
tmp_lines = bigfile.readlines(BUF_SIZE)
while tmp_lines:
    process([line for line in tmp_lines])
    tmp_lines = bigfile.readlines(BUF_SIZE)

回答 3

已经有很多不错的答案,但是如果您的整个文件都在一行上,并且您仍要处理“行”(与固定大小的块相对),那么这些答案将无济于事。

99%的时间,可以逐行处理文件。然后,按照此答案的建议,您可以将文件对象本身用作延迟生成器:

with open('big.csv') as f:
    for line in f:
        process(line)

然而,有一次我遇到了一个非常非常大的(几乎)单行文件,其中的行分隔符实际上不是'\n',而是'|'。

  • 不能逐行读取,但是我仍然需要逐行处理它。
  • 转换'|''\n'处理前也是不可能的,因为此csv的某些字段包含'\n'(自由文本用户输入)。
  • 还排除了使用csv库的原因,因为至少在lib的早期版本中,已对其进行了硬编码以逐行读取输入

对于这种情况,我创建了以下代码段:

def rows(f, chunksize=1024, sep='|'):
    """
    Read a file where the row separator is '|' lazily.

    Usage:

    >>> with open('big.csv') as f:
    >>>     for r in rows(f):
    >>>         process(row)
    """
    curr_row = ''
    while True:
        chunk = f.read(chunksize)
        if chunk == '': # End of file
            yield curr_row
            break
        while True:
            i = chunk.find(sep)
            if i == -1:
                break
            yield curr_row + chunk[:i]
            curr_row = ''
            chunk = chunk[i+1:]
        curr_row += chunk

我能够成功使用它来解决我的问题。它已通过各种块大小的广泛测试。


测试套件,适合那些想要说服自己的人。

import os

test_file = 'test_file'

def cleanup(func):
    def wrapper(*args, **kwargs):
        func(*args, **kwargs)
        os.unlink(test_file)
    return wrapper

@cleanup
def test_empty(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1_char_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_1_char(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1025_chars_1_row(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1024_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1023):
            f.write('a')
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_1025_chars_1026_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1026

@cleanup
def test_2048_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_2049_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

if __name__ == '__main__':
    for chunksize in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
        test_empty(chunksize)
        test_1_char_2_rows(chunksize)
        test_1_char(chunksize)
        test_1025_chars_1_row(chunksize)
        test_1024_chars_2_rows(chunksize)
        test_1025_chars_1026_rows(chunksize)
        test_2048_chars_2_rows(chunksize)
        test_2049_chars_2_rows(chunksize)

There are already many good answers, but if your entire file is on a single line and you still want to process “rows” (as opposed to fixed-size blocks), these answers will not help you.

99% of the time, it is possible to process files line by line. Then, as suggested in this answer, you can use the file object itself as a lazy generator:

with open('big.csv') as f:
    for line in f:
        process(line)

However, I once ran into a very very big (almost) single line file, where the row separator was in fact not '\n' but '|'.

  • Reading line by line was not an option, but I still needed to process it row by row.
  • Converting '|' to '\n' before processing was also out of the question, because some of the fields of this csv contained '\n' (free text user input).
  • Using the csv library was also ruled out because, at least in early versions of the lib, it is hardcoded to read the input line by line.

For these kind of situations, I created the following snippet:

def rows(f, chunksize=1024, sep='|'):
    """
    Read a file where the row separator is '|' lazily.

    Usage:

    >>> with open('big.csv') as f:
    >>>     for r in rows(f):
    >>>         process(row)
    """
    curr_row = ''
    while True:
        chunk = f.read(chunksize)
        if chunk == '': # End of file
            yield curr_row
            break
        while True:
            i = chunk.find(sep)
            if i == -1:
                break
            yield curr_row + chunk[:i]
            curr_row = ''
            chunk = chunk[i+1:]
        curr_row += chunk

I was able to use it successfully to solve my problem. It has been extensively tested, with various chunk sizes.


Test suite, for those who want to convince themselves.

import os

test_file = 'test_file'

def cleanup(func):
    def wrapper(*args, **kwargs):
        func(*args, **kwargs)
        os.unlink(test_file)
    return wrapper

@cleanup
def test_empty(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1_char_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_1_char(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1025_chars_1_row(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1024_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1023):
            f.write('a')
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_1025_chars_1026_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1026

@cleanup
def test_2048_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_2049_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

if __name__ == '__main__':
    for chunksize in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
        test_empty(chunksize)
        test_1_char_2_rows(chunksize)
        test_1_char(chunksize)
        test_1025_chars_1_row(chunksize)
        test_1024_chars_2_rows(chunksize)
        test_1025_chars_1026_rows(chunksize)
        test_2048_chars_2_rows(chunksize)
        test_2049_chars_2_rows(chunksize)

回答 4

f = ... # file-like object, i.e. supporting read(size) function and 
        # returning empty string '' when there is nothing to read

def chunked(file, chunk_size):
    return iter(lambda: file.read(chunk_size), '')

for data in chunked(f, 65536):
    # process the data

更新:这种方法在 https://stackoverflow.com/a/4566523/38592 中有更详细的解释。

f = ... # file-like object, i.e. supporting read(size) function and 
        # returning empty string '' when there is nothing to read

def chunked(file, chunk_size):
    return iter(lambda: file.read(chunk_size), '')

for data in chunked(f, 65536):
    # process the data

UPDATE: The approach is best explained in https://stackoverflow.com/a/4566523/38592


回答 5

请参阅python的官方文档 https://docs.python.org/zh-cn/3/library/functions.html?#iter

也许这种方法更pythonic:

from functools import partial

"""A file object returned by open() is a iterator with
read method which could specify current read's block size"""
with open('mydata.db', 'r') as f_in:

    part_read = partial(f_in.read, 1024*1024)
    iterator = iter(part_read, '')  # sentinel is '' because the file is opened in text mode

    for index, block in enumerate(iterator, start=1):
        block = process_block(block)    # process block data
        with open(f'{index}.txt', 'w') as f_out:
            f_out.write(block)

Refer to python’s official documentation https://docs.python.org/3/library/functions.html#iter

Maybe this method is more pythonic:

from functools import partial

"""A file object returned by open() is a iterator with
read method which could specify current read's block size"""
with open('mydata.db', 'r') as f_in:

    part_read = partial(f_in.read, 1024*1024)
    iterator = iter(part_read, '')  # sentinel is '' because the file is opened in text mode

    for index, block in enumerate(iterator, start=1):
        block = process_block(block)    # process your block data
        
        with open(f'{index}.txt', 'w') as f_out:
            f_out.write(block)

回答 6

我认为我们可以这样写:

def read_file(path, block_size=1024): 
    with open(path, 'rb') as f: 
        while True: 
            piece = f.read(block_size) 
            if piece: 
                yield piece 
            else: 
                return

for piece in read_file(path):
    process_piece(piece)

I think we can write like this:

def read_file(path, block_size=1024): 
    with open(path, 'rb') as f: 
        while True: 
            piece = f.read(block_size) 
            if piece: 
                yield piece 
            else: 
                return

for piece in read_file(path):
    process_piece(piece)

回答 7

由于声誉太低,我不能发表评论,但是借助file.readlines([sizehint]),SilentGhost的解决方案应该会简单得多。

python文件方法

编辑:SilentGhost说得对,但这应该比下面的做法更好:

s = "" 
for i in xrange(100): 
   s += file.next()

I am not allowed to comment due to my low reputation, but SilentGhost's solution should be much easier with file.readlines([sizehint])

python file methods

edit: SilentGhost is right, but this should be better than:

s = "" 
for i in xrange(100): 
   s += file.next()

回答 8

我处于类似情况。目前尚不清楚您是否知道块大小(以字节为单位)。我通常不知道,但是所需的记录(行)数是已知的:

def get_line():
     with open('4gb_file') as file:
         for i in file:
             yield i

lines_required = 100
gen = get_line()
chunk = [i for i, j in zip(gen, range(lines_required))]

更新:谢谢nosklo。这就是我的意思。它几乎起作用了,只是它丢失了块之间的一行。

chunk = [next(gen) for i in range(lines_required)]

这个技巧不会丢失任何行,但看起来不太美观。

I’m in a somewhat similar situation. It’s not clear whether you know chunk size in bytes; I usually don’t, but the number of records (lines) that is required is known:

def get_line():
     with open('4gb_file') as file:
         for i in file:
             yield i

lines_required = 100
gen = get_line()
chunk = [i for i, j in zip(gen, range(lines_required))]

Update: Thanks nosklo. Here’s what I meant. It almost works, except that it loses a line ‘between’ chunks.

chunk = [next(gen) for i in range(lines_required)]

Does the trick w/o losing any lines, but it doesn’t look very nice.


回答 9

要逐行处理,这是一个很好的解决方案:

  def stream_lines(file_name):
    file = open(file_name)
    while True:
      line = file.readline()
      if not line:
        file.close()
        break
      yield line

只要没有空行。

To process line by line, this is an elegant solution:

  def stream_lines(file_name):
    file = open(file_name)
    while True:
      line = file.readline()
      if not line:
        file.close()
        break
      yield line

As long as there’re no blank lines.


回答 10

您可以使用以下代码。

file_obj = open('big_file') 

open()返回一个文件对象

然后使用os.stat获取大小

file_size = os.stat('big_file').st_size

for i in range( file_size/1024):
    print file_obj.read(1024)

you can use following code.

file_obj = open('big_file') 

open() returns a file object

then use os.stat for getting size

file_size = os.stat('big_file').st_size

for i in range( file_size/1024):
    print file_obj.read(1024)

在Python中读取.mat文件

问题:在Python中读取.mat文件

是否可以在Python中读取二进制MATLAB .mat文件?

我看到有资料说SciPy支持读取.mat文件,但我没有成功。我安装了SciPy 0.7.0,但找不到loadmat()方法。

Is it possible to read binary MATLAB .mat files in Python?

I’ve seen that SciPy has alleged support for reading .mat files, but I’m unsuccessful with it. I installed SciPy version 0.7.0, and I can’t find the loadmat() method.


回答 0

需要先导入scipy.io:

import scipy.io
mat = scipy.io.loadmat('file.mat')

An import is required, import scipy.io

import scipy.io
mat = scipy.io.loadmat('file.mat')
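
loadmat返回一个以MATLAB变量名为键的字典,可以按名称取出对应的NumPy数组。一个小示意,其中'my_array'是假设的变量名:

import scipy.io

mat = scipy.io.loadmat('file.mat')
print(mat.keys())          # 查看文件中有哪些变量
arr = mat['my_array']      # 按变量名取出对应的NumPy数组(假设的名字)
print(arr.shape)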

回答 1

scipy.io.savemat和scipy.io.loadmat都不适用于7.3版本的MATLAB数组。但好消息是,MATLAB 7.3版本的文件就是hdf5数据集,因此可以使用许多工具(包括NumPy)来读取它们。

对于Python,您将需要h5py扩展,该扩展在系统上需要HDF5。

import numpy as np
import h5py
f = h5py.File('somefile.mat','r')
data = f.get('data/variable1')
data = np.array(data) # For converting to a NumPy array

Neither scipy.io.savemat, nor scipy.io.loadmat work for MATLAB arrays version 7.3. But the good part is that MATLAB version 7.3 files are hdf5 datasets. So they can be read using a number of tools, including NumPy.

For Python, you will need the h5py extension, which requires HDF5 on your system.

import numpy as np
import h5py
f = h5py.File('somefile.mat','r')
data = f.get('data/variable1')
data = np.array(data) # For converting to a NumPy array
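
如果不确定文件里有哪些数据集,可以先列出顶层的键,再决定读取哪一个(一个小示意):

import h5py

with h5py.File('somefile.mat', 'r') as f:
    print(list(f.keys()))   # 列出文件顶层包含的变量/数据集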

回答 2

首先将.mat文件另存为:

save('test.mat', '-v7')

之后,在Python中,使用通常的loadmat函数:

import scipy.io as sio
test = sio.loadmat('test.mat')

First save the .mat file as:

save('test.mat', '-v7')

After that, in Python, use the usual loadmat function:

import scipy.io as sio
test = sio.loadmat('test.mat')

回答 3

有一个很好的软件包mat4py,可以很容易地用pip安装:

pip install mat4py

使用起来很简单(从网站上):

从MAT文件加载数据

loadmat函数只使用Python的dict和list对象,将MAT文件中存储的所有变量加载为简单的Python数据结构。数值数组和单元格数组会被转换为按行排序的嵌套列表。数组会被压缩(squeeze),以消除只含一个元素的数组。得到的数据结构由与JSON格式兼容的简单类型组成。

示例:将MAT文件加载到Python数据结构中:

from mat4py import loadmat

data = loadmat('datafile.mat')

变量datadict带有MAT文件中包含的变量和值的a 。

将Python数据结构保存到MAT文件

可以使用savemat函数将Python数据保存到MAT文件中。数据的结构要求与loadmat相同,也就是说,它应该由简单的数据类型组成,例如dict、list、str、int和float。

示例:将Python数据结构保存到MAT文件中:

from mat4py import savemat

savemat('datafile.mat', data)

参数data应为包含这些变量的dict。

There is a nice package called mat4py which can easily be installed using

pip install mat4py

It is straightforward to use (from the website):

Load data from a MAT-file

The function loadmat loads all variables stored in the MAT-file into a simple Python data structure, using only Python’s dict and list objects. Numeric and cell arrays are converted to row-ordered nested lists. Arrays are squeezed to eliminate arrays with only one element. The resulting data structure is composed of simple types that are compatible with the JSON format.

Example: Load a MAT-file into a Python data structure:

from mat4py import loadmat

data = loadmat('datafile.mat')

The variable data is a dict with the variables and values contained in the MAT-file.

Save a Python data structure to a MAT-file

Python data can be saved to a MAT-file, with the function savemat. Data has to be structured in the same way as for loadmat, i.e. it should be composed of simple data types, like dict, list, str, int, and float.

Example: Save a Python data structure to a MAT-file:

from mat4py import savemat

savemat('datafile.mat', data)

The parameter data shall be a dict with the variables.


回答 4

安装了MATLAB 2014b或更高版本后,可以使用适用于PythonMATLAB引擎

import matlab.engine
eng = matlab.engine.start_matlab()
content = eng.load("example.mat", nargout=1)

Having MATLAB 2014b or newer installed, the MATLAB engine for Python could be used:

import matlab.engine
eng = matlab.engine.start_matlab()
content = eng.load("example.mat", nargout=1)

回答 5

读取文件

import scipy.io
mat = scipy.io.loadmat(file_name)

检查MAT变量的类型

print(type(mat))
#OUTPUT - <class 'dict'>

字典中的键是MATLAB变量名,值是分配给这些变量的对象。

Reading the file

import scipy.io
mat = scipy.io.loadmat(file_name)

Inspecting the type of MAT variable

print(type(mat))
#OUTPUT - <class 'dict'>

The keys inside the dictionary are MATLAB variables, and the values are the objects assigned to those variables.


回答 6

MathWorks本身也提供用于PythonMATLAB引擎。如果您有MATLAB,则可能值得考虑(我自己还没有尝试过,但是它具有比仅读取MATLAB文件更多的功能)。但是,我不知道是否允许将其分发给其他用户(如果这些人拥有MATLAB,这可能不是问题。否则,也许NumPy是正确的选择?)。

另外,如果您想自己完成所有底层工作,MathWorks提供了关于该文件格式结构的详细文档(如果链接失效,可以用google搜索matfile_format.pdf或其标题MAT-FILE Format)。它并不像我个人想象的那么复杂,但显然这不是最省事的路线。这也取决于您想支持.mat文件的多少特性。

我编写了一个"小"(约700行)Python脚本,可以读取一些基本的.mat文件。我既不是Python专家,也不是初学者,我花了大约两天时间来编写它(使用上面链接的MathWorks文档)。我学到了很多新东西,而且(大部分时间)相当有趣。由于这个Python脚本是我在工作中写的,恐怕不能公开它……但是我可以在这里给出一些建议:

  • 首先阅读文档。
  • 使用十六进制编辑器(例如HxD),查看您要解析的参考.mat文件。
  • 尝试通过将字节保存到.txt文件并注释每行来弄清楚每个字节的含义。
  • 使用类来保存每个数据元素(如miCOMPRESSEDmiMATRIXmxDOUBLE,或miINT32
  • .mat-files’结构是最佳的用于保存在一个树形数据结构中的数据元素; 每个节点都有一个类和子节点

There is also the MATLAB Engine for Python by MathWorks itself. If you have MATLAB, this might be worth considering (I haven’t tried it myself but it has a lot more functionality than just reading MATLAB files). However, I don’t know if it is allowed to distribute it to other users (it is probably not a problem if those persons have MATLAB. Otherwise, maybe NumPy is the right way to go?).

Also, if you want to do all the basics yourself, MathWorks provides (if the link changes, try to google for matfile_format.pdf or its title MAT-FILE Format) a detailed documentation on the structure of the file format. It’s not as complicated as I personally thought, but obviously, this is not the easiest way to go. It also depends on how many features of the .mat-files you want to support.

I’ve written a “small” (about 700 lines) Python script which can read some basic .mat-files. I’m neither a Python expert nor a beginner and it took me about two days to write it (using the MathWorks documentation linked above). I’ve learned a lot of new stuff and it was quite fun (most of the time). As I’ve written the Python script at work, I’m afraid I cannot publish it… But I can give some advice here:

  • First read the documentation.
  • Use a hex editor (such as HxD) and look into a reference .mat-file you want to parse.
  • Try to figure out the meaning of each byte by saving the bytes to a .txt file and annotate each line.
  • Use classes to save each data element (such as miCOMPRESSED, miMATRIX, mxDOUBLE, or miINT32)
  • The .mat-files’ structure is optimal for saving the data elements in a tree data structure; each node has one class and subnodes

回答 7

from os.path import dirname, join as pjoin
import scipy.io as sio
data_dir = pjoin(dirname(sio.__file__), 'matlab', 'tests', 'data')
mat_fname = pjoin(data_dir, 'testdouble_7.4_GLNX86.mat')
mat_contents = sio.loadmat(mat_fname)

您可以使用上面的代码在Python中读取默认保存的.mat文件。

from os.path import dirname, join as pjoin
import scipy.io as sio
data_dir = pjoin(dirname(sio.__file__), 'matlab', 'tests', 'data')
mat_fname = pjoin(data_dir, 'testdouble_7.4_GLNX86.mat')
mat_contents = sio.loadmat(mat_fname)

You can use above code to read the default saved .mat file in Python.


读取二进制文件并遍历每个字节

问题:读取二进制文件并遍历每个字节

在Python中,如何读取二进制文件并在该文件的每个字节上循环?

In Python, how do I read in a binary file and loop over each byte of that file?


回答 0

Python 2.4及更早版本

f = open("myfile", "rb")
try:
    byte = f.read(1)
    while byte != "":
        # Do stuff with byte.
        byte = f.read(1)
finally:
    f.close()

Python 2.5-2.7

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte != "":
        # Do stuff with byte.
        byte = f.read(1)

请注意,with语句在2.5以下的Python版本中不可用。要在v 2.5中使用它,您需要导入它:

from __future__ import with_statement

在2.6中是不需要的。

Python 3

在Python 3中,情况有点不同。在字节模式下,我们从流中得到的不再是原始字符,而是字节对象,因此需要修改判断条件:

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte != b"":
        # Do stuff with byte.
        byte = f.read(1)

或者像benhoyt说的那样,省略不等判断,利用b""会被求值为False这一事实。这使得代码无需任何更改即可同时兼容2.6和3.x。如果以后从字节模式切换到文本模式或反过来,也可以避免修改判断条件。

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte:
        # Do stuff with byte.
        byte = f.read(1)

python 3.8

从现在开始,借助:=运算符,可以以更短的方式编写以上代码。

with open("myfile", "rb") as f:
    while (byte := f.read(1)):
        # Do stuff with byte.

Python 2.4 and Earlier

f = open("myfile", "rb")
try:
    byte = f.read(1)
    while byte != "":
        # Do stuff with byte.
        byte = f.read(1)
finally:
    f.close()

Python 2.5-2.7

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte != "":
        # Do stuff with byte.
        byte = f.read(1)

Note that the with statement is not available in versions of Python below 2.5. To use it in v 2.5 you’ll need to import it:

from __future__ import with_statement

In 2.6 this is not needed.

Python 3

In Python 3, it’s a bit different. We will no longer get raw characters from the stream in byte mode but byte objects, thus we need to alter the condition:

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte != b"":
        # Do stuff with byte.
        byte = f.read(1)

Or as benhoyt says, skip the not equal and take advantage of the fact that b"" evaluates to false. This makes the code compatible between 2.6 and 3.x without any changes. It would also save you from changing the condition if you go from byte mode to text or the reverse.

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte:
        # Do stuff with byte.
        byte = f.read(1)

python 3.8

From now on thanks to := operator the above code can be written in a shorter way.

with open("myfile", "rb") as f:
    while (byte := f.read(1)):
        # Do stuff with byte.

回答 1

该生成器从文件中产生字节,并分块读取文件:

def bytes_from_file(filename, chunksize=8192):
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(chunksize)
            if chunk:
                for b in chunk:
                    yield b
            else:
                break

# example:
for b in bytes_from_file('filename'):
    do_stuff_with(b)

有关迭代器生成器的信息,请参见Python文档。

This generator yields bytes from a file, reading the file in chunks:

def bytes_from_file(filename, chunksize=8192):
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(chunksize)
            if chunk:
                for b in chunk:
                    yield b
            else:
                break

# example:
for b in bytes_from_file('filename'):
    do_stuff_with(b)

See the Python documentation for information on iterators and generators.
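
需要注意,在Python 3中对bytes对象做迭代得到的是0到255的整数,而不是长度为1的bytes;如果后续处理需要bytes,可以自行转换(一个小示意,沿用上面的函数名):

for b in bytes_from_file('filename'):
    # Python 3 中 b 是 0-255 的整数;如需单字节的 bytes 对象可以再转换
    single = bytes([b])
    do_stuff_with(single)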


回答 2

如果文件不是太大,把它整个放进内存并不成问题:

with open("filename", "rb") as f:
    bytes_read = f.read()
for b in bytes_read:
    process_byte(b)

其中process_byte表示要对传入的字节执行的某些操作。

如果要一次处理一个块:

with open("filename", "rb") as f:
    bytes_read = f.read(CHUNKSIZE)
    while bytes_read:
        for b in bytes_read:
            process_byte(b)
        bytes_read = f.read(CHUNKSIZE)

with语句在Python 2.5及更高版本中可用。

If the file is not too big that holding it in memory is a problem:

with open("filename", "rb") as f:
    bytes_read = f.read()
for b in bytes_read:
    process_byte(b)

where process_byte represents some operation you want to perform on the passed-in byte.

If you want to process a chunk at a time:

with open("filename", "rb") as f:
    bytes_read = f.read(CHUNKSIZE)
    while bytes_read:
        for b in bytes_read:
            process_byte(b)
        bytes_read = f.read(CHUNKSIZE)

The with statement is available in Python 2.5 and greater.


回答 3

要一次一个字节地读取文件(忽略缓冲),可以使用接受两个参数的内置函数iter(callable, sentinel):

with open(filename, 'rb') as file:
    for byte in iter(lambda: file.read(1), b''):
        # Do stuff with byte

它会反复调用file.read(1),直到其返回b''(空字节串)为止。对于大文件,内存不会无限增长。您可以把buffering=0传给open()来禁用缓冲,这可以保证每次迭代只读取一个字节(很慢)。

with语句会自动关闭文件,包括其中的代码抛出异常的情况。

尽管默认情况下存在内部缓冲,但是每次处理一个字节仍然效率低下。例如,以下是该blackhole.py实用程序,它消耗了给出的所有内容:

#!/usr/bin/env python3
"""Discard all input. `cat > /dev/null` analog."""
import sys
from functools import partial
from collections import deque

chunksize = int(sys.argv[1]) if len(sys.argv) > 1 else (1 << 15)
deque(iter(partial(sys.stdin.detach().read, chunksize), b''), maxlen=0)

例:

$ dd if=/dev/zero bs=1M count=1000 | python3 blackhole.py

在我的机器上,当chunksize == 32768时它的处理速度约为1.5 GB/s,而当chunksize == 1时只有约7.5 MB/s;也就是说,一次读取一个字节要慢200倍。如果你需要性能,并且可以把处理逻辑改写成一次处理多个字节,请把这一点考虑进去。

mmap允许您同时把文件当作bytearray和文件对象来使用。如果您需要同时使用这两种接口,它可以作为把整个文件加载到内存中的替代方案。特别是,只需使用普通的for循环,就可以一次一个字节地遍历内存映射文件:

from mmap import ACCESS_READ, mmap

with open(filename, 'rb', 0) as f, mmap(f.fileno(), 0, access=ACCESS_READ) as s:
    for byte in s: # length is equal to the current file size
        # Do stuff with byte

mmap支持切片语法。例如,mm[i:i+len]返回文件中从位置i开始的len个字节。Python 3.2之前mmap不支持上下文管理器协议;在这种情况下需要显式调用mm.close()。使用mmap遍历每个字节比file.read(1)消耗更多内存,但mmap的速度要快一个数量级。

To read a file — one byte at a time (ignoring the buffering) — you could use the two-argument iter(callable, sentinel) built-in function:

with open(filename, 'rb') as file:
    for byte in iter(lambda: file.read(1), b''):
        # Do stuff with byte

It calls file.read(1) until it returns b'' (an empty bytestring). The memory doesn't grow unbounded for large files. You could pass buffering=0 to open() to disable the buffering; it guarantees that only one byte is read per iteration (slow).

with-statement closes the file automatically — including the case when the code underneath raises an exception.

Despite the presence of internal buffering by default, it is still inefficient to process one byte at a time. For example, here’s the blackhole.py utility that eats everything it is given:

#!/usr/bin/env python3
"""Discard all input. `cat > /dev/null` analog."""
import sys
from functools import partial
from collections import deque

chunksize = int(sys.argv[1]) if len(sys.argv) > 1 else (1 << 15)
deque(iter(partial(sys.stdin.detach().read, chunksize), b''), maxlen=0)

Example:

$ dd if=/dev/zero bs=1M count=1000 | python3 blackhole.py

It processes ~1.5 GB/s when chunksize == 32768 on my machine and only ~7.5 MB/s when chunksize == 1; that is, reading one byte at a time is about 200 times slower. Take that into account if you need performance and can rewrite your processing to work on more than one byte at a time.

mmap allows you to treat a file as a bytearray and a file object simultaneously. It can serve as an alternative to loading the whole file into memory if you need access to both interfaces. In particular, you can iterate one byte at a time over a memory-mapped file using just a plain for-loop:

from mmap import ACCESS_READ, mmap

with open(filename, 'rb', 0) as f, mmap(f.fileno(), 0, access=ACCESS_READ) as s:
    for byte in s: # length is equal to the current file size
        # Do stuff with byte

mmap supports the slice notation. For example, mm[i:i+len] returns len bytes from the file starting at position i. The context manager protocol is not supported before Python 3.2; you need to call mm.close() explicitly in this case. Iterating over each byte using mmap consumes more memory than file.read(1), but mmap is an order of magnitude faster.
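
As a small sketch of the slice notation mentioned above (filename is a placeholder and the file is assumed to hold at least a few bytes), the memory-mapped object can be indexed much like a bytearray without reading the whole file:

from mmap import ACCESS_READ, mmap

with open(filename, 'rb', 0) as f, mmap(f.fileno(), 0, access=ACCESS_READ) as mm:
    header = mm[:4]   # first 4 bytes, returned as a bytes object
    tail = mm[-16:]   # last 16 bytes
    third = mm[2]     # a single index returns an int in Python 3
    print(header, tail, third)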


回答 4

用Python读取二进制文件并遍历每个字节

Python 3.5中新增了pathlib模块,它有一个便捷方法,专门用于把文件读取为字节,从而允许我们遍历这些字节。我认为这是一个还不错(虽然有点简单粗暴)的答案:

import pathlib

for byte in pathlib.Path(path).read_bytes():
    print(byte)

有趣的是,这是唯一提及pathlib的答案。

在Python 2中,您可能会这样做(正如Vinay Sajip也建议的那样):

with open(path, 'rb') as file:
    for byte in file.read():
        print(byte)

如果文件太大,无法一次放入内存进行迭代,可以使用带有callable, sentinel签名的内置函数iter来惯用地分块读取(Python 2版本):

with open(path, 'rb') as file:
    callable = lambda: file.read(1024)
    sentinel = bytes() # or b''
    for chunk in iter(callable, sentinel): 
        for byte in chunk:
            print(byte)

(其他几个答案都提到了这一点,但很少有人提供合理的读取大小。)

大文件或缓冲/交互式读取的最佳实践

让我们创建一个函数来做到这一点,包括对Python 3.5+标准库的惯用用法:

from pathlib import Path
from functools import partial
from io import DEFAULT_BUFFER_SIZE

def file_byte_iterator(path):
    """given a path, return an iterator over the file
    that lazily loads the file
    """
    path = Path(path)
    with path.open('rb') as file:
        reader = partial(file.read1, DEFAULT_BUFFER_SIZE)
        file_iterator = iter(reader, bytes())
        for chunk in file_iterator:
            yield from chunk

请注意,我们使用的是file.read1。file.read会阻塞,直到获得所有请求的字节或到达EOF;file.read1让我们避免阻塞,因此它可以更快地返回。其他答案都没有提到这一点。

最佳实践演示:

让我们制作一个包含一兆字节(准确地说是1 MiB)伪随机数据的文件:

import random
import pathlib
path = 'pseudorandom_bytes'
pathobj = pathlib.Path(path)

pathobj.write_bytes(
  bytes(random.randint(0, 255) for _ in range(2**20)))

现在让我们对其进行迭代并在内存中实现它:

>>> l = list(file_byte_iterator(path))
>>> len(l)
1048576

我们可以检查数据的任何部分,例如,最后100个字节和前100个字节:

>>> l[-100:]
[208, 5, 156, 186, 58, 107, 24, 12, 75, 15, 1, 252, 216, 183, 235, 6, 136, 50, 222, 218, 7, 65, 234, 129, 240, 195, 165, 215, 245, 201, 222, 95, 87, 71, 232, 235, 36, 224, 190, 185, 12, 40, 131, 54, 79, 93, 210, 6, 154, 184, 82, 222, 80, 141, 117, 110, 254, 82, 29, 166, 91, 42, 232, 72, 231, 235, 33, 180, 238, 29, 61, 250, 38, 86, 120, 38, 49, 141, 17, 190, 191, 107, 95, 223, 222, 162, 116, 153, 232, 85, 100, 97, 41, 61, 219, 233, 237, 55, 246, 181]
>>> l[:100]
[28, 172, 79, 126, 36, 99, 103, 191, 146, 225, 24, 48, 113, 187, 48, 185, 31, 142, 216, 187, 27, 146, 215, 61, 111, 218, 171, 4, 160, 250, 110, 51, 128, 106, 3, 10, 116, 123, 128, 31, 73, 152, 58, 49, 184, 223, 17, 176, 166, 195, 6, 35, 206, 206, 39, 231, 89, 249, 21, 112, 168, 4, 88, 169, 215, 132, 255, 168, 129, 127, 60, 252, 244, 160, 80, 155, 246, 147, 234, 227, 157, 137, 101, 84, 115, 103, 77, 44, 84, 134, 140, 77, 224, 176, 242, 254, 171, 115, 193, 29]

不要逐行迭代二进制文件

不要执行以下操作:它会拉取任意大小的块,直到遇到换行符为止;当块太小时速度太慢,而且块也可能太大:

    with open(path, 'rb') as file:
        for chunk in file: # text newline iteration - not for bytes
            yield from chunk

上面的写法仅适用于语义上人类可读的文本文件(例如纯文本、代码、标记语言、Markdown等,本质上是以ascii、utf、latin等方式编码的内容),这类文件应该在不带'b'标志的情况下打开。

Reading binary file in Python and looping over each byte

New in Python 3.5 is the pathlib module, which has a convenience method specifically to read in a file as bytes, allowing us to iterate over the bytes. I consider this a decent (if quick and dirty) answer:

import pathlib

for byte in pathlib.Path(path).read_bytes():
    print(byte)

Interesting that this is the only answer to mention pathlib.

In Python 2, you probably would do this (as Vinay Sajip also suggests):

with open(path, 'rb') as file:
    for byte in file.read():
        print(byte)

In the case that the file may be too large to iterate over in-memory, you would chunk it, idiomatically, using the iter function with the callable, sentinel signature – the Python 2 version:

with open(path, 'rb') as file:
    callable = lambda: file.read(1024)
    sentinel = bytes() # or b''
    for chunk in iter(callable, sentinel): 
        for byte in chunk:
            print(byte)

(Several other answers mention this, but few offer a sensible read size.)

Best practice for large files or buffered/interactive reading

Let’s create a function to do this, including idiomatic uses of the standard library for Python 3.5+:

from pathlib import Path
from functools import partial
from io import DEFAULT_BUFFER_SIZE

def file_byte_iterator(path):
    """given a path, return an iterator over the file
    that lazily loads the file
    """
    path = Path(path)
    with path.open('rb') as file:
        reader = partial(file.read1, DEFAULT_BUFFER_SIZE)
        file_iterator = iter(reader, bytes())
        for chunk in file_iterator:
            yield from chunk

Note that we use file.read1. file.read blocks until it gets all the bytes requested of it or EOF. file.read1 allows us to avoid blocking, and it can return more quickly because of this. No other answer mentions this.

Demonstration of best practice usage:

Let’s make a file with a megabyte (actually mebibyte) of pseudorandom data:

import random
import pathlib
path = 'pseudorandom_bytes'
pathobj = pathlib.Path(path)

pathobj.write_bytes(
  bytes(random.randint(0, 255) for _ in range(2**20)))

Now let’s iterate over it and materialize it in memory:

>>> l = list(file_byte_iterator(path))
>>> len(l)
1048576

We can inspect any part of the data, for example, the last 100 and first 100 bytes:

>>> l[-100:]
[208, 5, 156, 186, 58, 107, 24, 12, 75, 15, 1, 252, 216, 183, 235, 6, 136, 50, 222, 218, 7, 65, 234, 129, 240, 195, 165, 215, 245, 201, 222, 95, 87, 71, 232, 235, 36, 224, 190, 185, 12, 40, 131, 54, 79, 93, 210, 6, 154, 184, 82, 222, 80, 141, 117, 110, 254, 82, 29, 166, 91, 42, 232, 72, 231, 235, 33, 180, 238, 29, 61, 250, 38, 86, 120, 38, 49, 141, 17, 190, 191, 107, 95, 223, 222, 162, 116, 153, 232, 85, 100, 97, 41, 61, 219, 233, 237, 55, 246, 181]
>>> l[:100]
[28, 172, 79, 126, 36, 99, 103, 191, 146, 225, 24, 48, 113, 187, 48, 185, 31, 142, 216, 187, 27, 146, 215, 61, 111, 218, 171, 4, 160, 250, 110, 51, 128, 106, 3, 10, 116, 123, 128, 31, 73, 152, 58, 49, 184, 223, 17, 176, 166, 195, 6, 35, 206, 206, 39, 231, 89, 249, 21, 112, 168, 4, 88, 169, 215, 132, 255, 168, 129, 127, 60, 252, 244, 160, 80, 155, 246, 147, 234, 227, 157, 137, 101, 84, 115, 103, 77, 44, 84, 134, 140, 77, 224, 176, 242, 254, 171, 115, 193, 29]
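
A further usage sketch (not in the original answer): because file_byte_iterator is lazy, it can be combined with enumerate to find the offset of the first occurrence of a byte value without materializing the list at all; the target value below is arbitrary:

target = 255
offset = next(
    (i for i, b in enumerate(file_byte_iterator(path)) if b == target),
    None)  # None if the value never occurs
print(offset)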

Don’t iterate by lines for binary files

Don’t do the following – this pulls a chunk of arbitrary size until it gets to a newline character – too slow when the chunks are too small, and possibly too large as well:

    with open(path, 'rb') as file:
        for chunk in file: # text newline iteration - not for bytes
            yield from chunk

The above is only good for what are semantically human readable text files (like plain text, code, markup, markdown etc… essentially anything ascii, utf, latin, etc… encoded) that you should open without the 'b' flag.


回答 5

总结一下chrispy,Skurmedel,Ben Hoyt和Peter Hansen的所有要点,这将是一次处理一个字节的二进制文件的最佳解决方案:

with open("myfile", "rb") as f:
    while True:
        byte = f.read(1)
        if not byte:
            break
        do_stuff_with(ord(byte))

适用于 Python 2.6 及更高版本,原因是:

  • Python 在内部进行缓冲,无需自己按块读取
  • DRY 原则:不要重复写读取的那一行代码
  • with 语句可确保文件被干净地关闭
  • 当没有更多字节时(而不是当字节值为零时),"byte" 的求值结果为 false

或使用J. F. Sebastian的解决方案以提高速度:

from functools import partial

with open(filename, 'rb') as file:
    for byte in iter(partial(file.read, 1), b''):
        # Do stuff with byte

或者,如果您希望像codeape演示的那样把它写成生成器函数:

def bytes_from_file(filename):
    with open(filename, "rb") as f:
        while True:
            byte = f.read(1)
            if not byte:
                break
            yield(ord(byte))

# example:
for b in bytes_from_file('filename'):
    do_stuff_with(b)

To sum up all the brilliant points of chrispy, Skurmedel, Ben Hoyt and Peter Hansen, this would be the optimal solution for processing a binary file one byte at a time:

with open("myfile", "rb") as f:
    while True:
        byte = f.read(1)
        if not byte:
            break
        do_stuff_with(ord(byte))

For Python versions 2.6 and above, because:

  • python buffers internally – no need to read chunks
  • DRY principle – do not repeat the read line
  • with statement ensures a clean file close
  • ‘byte’ evaluates to false when there are no more bytes (not when a byte is zero)

Or use J. F. Sebastian's solution for improved speed:

from functools import partial

with open(filename, 'rb') as file:
    for byte in iter(partial(file.read, 1), b''):
        # Do stuff with byte

Or if you want it as a generator function like demonstrated by codeape:

def bytes_from_file(filename):
    with open(filename, "rb") as f:
        while True:
            byte = f.read(1)
            if not byte:
                break
            yield(ord(byte))

# example:
for b in bytes_from_file('filename'):
    do_stuff_with(b)

回答 6

Python 3,一次读取所有文件:

with open("filename", "rb") as binary_file:
    # Read the whole file at once
    data = binary_file.read()
    print(data)

您可以按任意方式遍历和处理data变量。

Python 3, read all of the file at once:

with open("filename", "rb") as binary_file:
    # Read the whole file at once
    data = binary_file.read()
    print(data)

You can iterate over the data variable however you want.
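
For example, in Python 3 (where iterating a bytes object yields integers) you can then scan or summarize data however you like; a small hedged sketch with a placeholder file name:

with open("filename", "rb") as binary_file:
    data = binary_file.read()

print(len(data))                    # total number of bytes
print(data.count(0))                # how many zero bytes the file contains
print(max(data) if data else None)  # largest byte value, as an int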


回答 7

经过上述所有尝试并使用@Aaron Hall的回答后,在一台运行Windows 10、8 GB内存和32位Python 3.5的计算机上,我对一个约90 MB的文件遇到了内存错误。同事推荐我改用numpy,效果非常好。

到目前为止,读取整个二进制文件(我已经测试过)的最快方法是:

import numpy as np

file = "binary_file.bin"
data = np.fromfile(file, 'u1')

参考

到目前为止,速度比任何其他方法都要快。希望它能对某人有所帮助!

After trying all the above and using the answer from @Aaron Hall, I was getting memory errors for a ~90 MB file on a computer running Windows 10 with 8 GB RAM and 32-bit Python 3.5. A colleague recommended I use numpy instead, and it works wonders.

By far the fastest way to read an entire binary file (of those I have tested) is:

import numpy as np

file = "binary_file.bin"
data = np.fromfile(file, 'u1')

Reference

Multitudes faster than any other methods so far. Hope it helps someone!
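
A hedged follow-up sketch: since np.fromfile returns a NumPy array of uint8 values, vectorized operations such as np.bincount can replace an explicit Python loop over the bytes, which is usually where the real speedup comes from (the file name is a placeholder):

import numpy as np

data = np.fromfile("binary_file.bin", dtype='u1')
histogram = np.bincount(data, minlength=256)  # occurrence count for each byte value 0-255
print(histogram.argmax(), histogram.max())    # most common byte value and its count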


回答 8

如果要读取大量二进制数据,则可能需要考虑struct模块。它被记录为“在C和Python类型之间转换”,但是字节当然是字节,并且是否将它们创建为C类型并不重要。例如,如果您的二进制数据包含两个2个字节的整数和一个4个字节的整数,则可以按以下方式读取它们(示例取自struct文档):

>>> struct.unpack('hhl', b'\x00\x01\x00\x02\x00\x00\x00\x03')
(1, 2, 3)

与显式遍历文件内容相比,您可能会发现这更方便,更快或两者兼而有之。

If you have a lot of binary data to read, you might want to consider the struct module. It is documented as converting “between C and Python types”, but of course, bytes are bytes, and whether those were created as C types does not matter. For example, if your binary data contains two 2-byte integers and one 4-byte integer, you can read them as follows (example taken from struct documentation):

>>> struct.unpack('hhl', b'\x00\x01\x00\x02\x00\x00\x00\x03')
(1, 2, 3)

You might find this more convenient, faster, or both, than explicitly looping over the content of a file.
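
A minimal sketch of that idea (the file name, field layout, and byte order here are illustrative assumptions): the '<' prefix pins the byte order, and struct.calcsize tells you how many bytes each fixed-size record occupies, so you can read and unpack record by record:

import struct

record_fmt = '<HHI'  # two unsigned 16-bit ints plus one unsigned 32-bit int, little-endian
record_size = struct.calcsize(record_fmt)  # 8 bytes for this layout

with open("records.bin", "rb") as f:
    chunk = f.read(record_size)
    while len(chunk) == record_size:
        a, b, c = struct.unpack(record_fmt, chunk)
        # Do stuff with the three fields...
        chunk = f.read(record_size)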


回答 9

这篇文章本身不是该问题的直接答案。相反,它是一个数据驱动的可扩展基准,可用于比较已发布到此问题的许多答案(以及在以后的更现代的Python版本中使用的新功能的变体),因此应该有助于确定哪个具有最佳性能。

在某些情况下,我已经修改了参考答案中的代码,以使其与基准框架兼容。

首先,以下是当前最新版本的Python 2和3的结果:

Fastest to slowest execution speeds with 32-bit Python 2.7.16
  numpy version 1.16.5
  Test file size: 1,024 KiB
  100 executions, best of 3 repetitions

1                  Tcll (array.array) :   3.8943 secs, rel speed   1.00x,   0.00% slower (262.95 KiB/sec)
2  Vinay Sajip (read all into memory) :   4.1164 secs, rel speed   1.06x,   5.71% slower (248.76 KiB/sec)
3            codeape + iter + partial :   4.1616 secs, rel speed   1.07x,   6.87% slower (246.06 KiB/sec)
4                             codeape :   4.1889 secs, rel speed   1.08x,   7.57% slower (244.46 KiB/sec)
5               Vinay Sajip (chunked) :   4.1977 secs, rel speed   1.08x,   7.79% slower (243.94 KiB/sec)
6           Aaron Hall (Py 2 version) :   4.2417 secs, rel speed   1.09x,   8.92% slower (241.41 KiB/sec)
7                     gerrit (struct) :   4.2561 secs, rel speed   1.09x,   9.29% slower (240.59 KiB/sec)
8                     Rick M. (numpy) :   8.1398 secs, rel speed   2.09x, 109.02% slower (125.80 KiB/sec)
9                           Skurmedel :  31.3264 secs, rel speed   8.04x, 704.42% slower ( 32.69 KiB/sec)

Benchmark runtime (min:sec) - 03:26

Fastest to slowest execution speeds with 32-bit Python 3.8.0
  numpy version 1.17.4
  Test file size: 1,024 KiB
  100 executions, best of 3 repetitions

1  Vinay Sajip + "yield from" + "walrus operator" :   3.5235 secs, rel speed   1.00x,   0.00% slower (290.62 KiB/sec)
2                       Aaron Hall + "yield from" :   3.5284 secs, rel speed   1.00x,   0.14% slower (290.22 KiB/sec)
3         codeape + iter + partial + "yield from" :   3.5303 secs, rel speed   1.00x,   0.19% slower (290.06 KiB/sec)
4                      Vinay Sajip + "yield from" :   3.5312 secs, rel speed   1.00x,   0.22% slower (289.99 KiB/sec)
5      codeape + "yield from" + "walrus operator" :   3.5370 secs, rel speed   1.00x,   0.38% slower (289.51 KiB/sec)
6                          codeape + "yield from" :   3.5390 secs, rel speed   1.00x,   0.44% slower (289.35 KiB/sec)
7                                      jfs (mmap) :   4.0612 secs, rel speed   1.15x,  15.26% slower (252.14 KiB/sec)
8              Vinay Sajip (read all into memory) :   4.5948 secs, rel speed   1.30x,  30.40% slower (222.86 KiB/sec)
9                        codeape + iter + partial :   4.5994 secs, rel speed   1.31x,  30.54% slower (222.64 KiB/sec)
10                                        codeape :   4.5995 secs, rel speed   1.31x,  30.54% slower (222.63 KiB/sec)
11                          Vinay Sajip (chunked) :   4.6110 secs, rel speed   1.31x,  30.87% slower (222.08 KiB/sec)
12                      Aaron Hall (Py 2 version) :   4.6292 secs, rel speed   1.31x,  31.38% slower (221.20 KiB/sec)
13                             Tcll (array.array) :   4.8627 secs, rel speed   1.38x,  38.01% slower (210.58 KiB/sec)
14                                gerrit (struct) :   5.0816 secs, rel speed   1.44x,  44.22% slower (201.51 KiB/sec)
15                 Rick M. (numpy) + "yield from" :  11.8084 secs, rel speed   3.35x, 235.13% slower ( 86.72 KiB/sec)
16                                      Skurmedel :  11.8806 secs, rel speed   3.37x, 237.18% slower ( 86.19 KiB/sec)
17                                Rick M. (numpy) :  13.3860 secs, rel speed   3.80x, 279.91% slower ( 76.50 KiB/sec)

Benchmark runtime (min:sec) - 04:47

我还使用更大的10 MiB测试文件(运行了将近一个小时)运行了它,并获得了与上面显示的结果相当的性能结果。

这是用于进行基准测试的代码:

from __future__ import print_function
import array
import atexit
from collections import deque, namedtuple
import io
from mmap import ACCESS_READ, mmap
import numpy as np
from operator import attrgetter
import os
import random
import struct
import sys
import tempfile
from textwrap import dedent
import time
import timeit
import traceback

try:
    xrange
except NameError:  # Python 3
    xrange = range


class KiB(int):
    """ KibiBytes - multiples of the byte units for quantities of information. """
    def __new__(self, value=0):
        return 1024*value


BIG_TEST_FILE = 1  # MiBs or 0 for a small file.
SML_TEST_FILE = KiB(64)
EXECUTIONS = 100  # Number of times each "algorithm" is executed per timing run.
TIMINGS = 3  # Number of timing runs.
CHUNK_SIZE = KiB(8)
if BIG_TEST_FILE:
    FILE_SIZE = KiB(1024) * BIG_TEST_FILE
else:
    FILE_SIZE = SML_TEST_FILE  # For quicker testing.

# Common setup for all algorithms -- prefixed to each algorithm's setup.
COMMON_SETUP = dedent("""
    # Make accessible in algorithms.
    from __main__ import array, deque, get_buffer_size, mmap, np, struct
    from __main__ import ACCESS_READ, CHUNK_SIZE, FILE_SIZE, TEMP_FILENAME
    from functools import partial
    try:
        xrange
    except NameError:  # Python 3
        xrange = range
""")


def get_buffer_size(path):
    """ Determine optimal buffer size for reading files. """
    st = os.stat(path)
    try:
        bufsize = st.st_blksize # Available on some Unix systems (like Linux)
    except AttributeError:
        bufsize = io.DEFAULT_BUFFER_SIZE
    return bufsize

# Utility primarily for use when embedding additional algorithms into benchmark.
VERIFY_NUM_READ = """
    # Verify generator reads correct number of bytes (assumes values are correct).
    bytes_read = sum(1 for _ in file_byte_iterator(TEMP_FILENAME))
    assert bytes_read == FILE_SIZE, \
           'Wrong number of bytes generated: got {:,} instead of {:,}'.format(
                bytes_read, FILE_SIZE)
"""

TIMING = namedtuple('TIMING', 'label, exec_time')

class Algorithm(namedtuple('CodeFragments', 'setup, test')):

    # Default timeit "stmt" code fragment.
    _TEST = """
        #for b in file_byte_iterator(TEMP_FILENAME):  # Loop over every byte.
        #    pass  # Do stuff with byte...
        deque(file_byte_iterator(TEMP_FILENAME), maxlen=0)  # Data sink.
    """

    # Must overload __new__ because (named)tuples are immutable.
    def __new__(cls, setup, test=None):
        """ Dedent (unindent) code fragment string arguments.
        Args:
          `setup` -- Code fragment that defines things used by `test` code.
                     In this case it should define a generator function named
                     `file_byte_iterator()` that will be passed the name of a test file
                     of binary data. This code is not timed.
          `test` -- Code fragment that uses things defined in `setup` code.
                    Defaults to _TEST. This is the code that's timed.
        """
        test =  cls._TEST if test is None else test  # Use default unless one is provided.

        # Uncomment to replace all performance tests with one that verifies the correct
        # number of bytes values are being generated by the file_byte_iterator function.
        #test = VERIFY_NUM_READ

        return tuple.__new__(cls, (dedent(setup), dedent(test)))


algorithms = {

    'Aaron Hall (Py 2 version)': Algorithm("""
        def file_byte_iterator(path):
            with open(path, "rb") as file:
                callable = partial(file.read, 1024)
                sentinel = bytes() # or b''
                for chunk in iter(callable, sentinel):
                    for byte in chunk:
                        yield byte
    """),

    "codeape": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                while True:
                    chunk = f.read(chunksize)
                    if chunk:
                        for b in chunk:
                            yield b
                    else:
                        break
    """),

    "codeape + iter + partial": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                for chunk in iter(partial(f.read, chunksize), b''):
                    for b in chunk:
                        yield b
    """),

    "gerrit (struct)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                fmt = '{}B'.format(FILE_SIZE)  # Reads entire file at once.
                for b in struct.unpack(fmt, f.read()):
                    yield b
    """),

    'Rick M. (numpy)': Algorithm("""
        def file_byte_iterator(filename):
            for byte in np.fromfile(filename, 'u1'):
                yield byte
    """),

    "Skurmedel": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                byte = f.read(1)
                while byte:
                    yield byte
                    byte = f.read(1)
    """),

    "Tcll (array.array)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                arr = array.array('B')
                arr.fromfile(f, FILE_SIZE)  # Reads entire file at once.
                for b in arr:
                    yield b
    """),

    "Vinay Sajip (read all into memory)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                bytes_read = f.read()  # Reads entire file at once.
            for b in bytes_read:
                yield b
    """),

    "Vinay Sajip (chunked)": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                chunk = f.read(chunksize)
                while chunk:
                    for b in chunk:
                        yield b
                    chunk = f.read(chunksize)
    """),

}  # End algorithms

#
# Versions of algorithms that will only work in certain releases (or better) of Python.
#
if sys.version_info >= (3, 3):
    algorithms.update({

        'codeape + iter + partial + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    for chunk in iter(partial(f.read, chunksize), b''):
                        yield from chunk
        """),

        'codeape + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while True:
                        chunk = f.read(chunksize)
                        if chunk:
                            yield from chunk
                        else:
                            break
        """),

        "jfs (mmap)": Algorithm("""
            def file_byte_iterator(filename):
                with open(filename, "rb") as f, \
                     mmap(f.fileno(), 0, access=ACCESS_READ) as s:
                    yield from s
        """),

        'Rick M. (numpy) + "yield from"': Algorithm("""
            def file_byte_iterator(filename):
            #    data = np.fromfile(filename, 'u1')
                yield from np.fromfile(filename, 'u1')
        """),

        'Vinay Sajip + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    chunk = f.read(chunksize)
                    while chunk:
                        yield from chunk  # Added in Py 3.3
                        chunk = f.read(chunksize)
        """),

    })  # End Python 3.3 update.

if sys.version_info >= (3, 5):
    algorithms.update({

        'Aaron Hall + "yield from"': Algorithm("""
            from pathlib import Path

            def file_byte_iterator(path):
                ''' Given a path, return an iterator over the file
                    that lazily loads the file.
                '''
                path = Path(path)
                bufsize = get_buffer_size(path)

                with path.open('rb') as file:
                    reader = partial(file.read1, bufsize)
                    for chunk in iter(reader, bytes()):
                        yield from chunk
        """),

    })  # End Python 3.5 update.

if sys.version_info >= (3, 8, 0):
    algorithms.update({

        'Vinay Sajip + "yield from" + "walrus operator"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while chunk := f.read(chunksize):
                        yield from chunk  # Added in Py 3.3
        """),

        'codeape + "yield from" + "walrus operator"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while chunk := f.read(chunksize):
                        yield from chunk
        """),

    })  # End Python 3.8.0 update.


#### Main ####

def main():
    global TEMP_FILENAME

    def cleanup():
        """ Clean up after testing is completed. """
        try:
            os.remove(TEMP_FILENAME)  # Delete the temporary file.
        except Exception:
            pass

    atexit.register(cleanup)

    # Create a named temporary binary file of pseudo-random bytes for testing.
    fd, TEMP_FILENAME = tempfile.mkstemp('.bin')
    with os.fdopen(fd, 'wb') as file:
         os.write(fd, bytearray(random.randrange(256) for _ in range(FILE_SIZE)))

    # Execute and time each algorithm, gather results.
    start_time = time.time()  # To determine how long testing itself takes.

    timings = []
    for label in algorithms:
        try:
            timing = TIMING(label,
                            min(timeit.repeat(algorithms[label].test,
                                              setup=COMMON_SETUP + algorithms[label].setup,
                                              repeat=TIMINGS, number=EXECUTIONS)))
        except Exception as exc:
            print('{} occurred timing the algorithm: "{}"\n  {}'.format(
                    type(exc).__name__, label, exc))
            traceback.print_exc(file=sys.stdout)  # Redirect to stdout.
            sys.exit(1)
        timings.append(timing)

    # Report results.
    print('Fastest to slowest execution speeds with {}-bit Python {}.{}.{}'.format(
            64 if sys.maxsize > 2**32 else 32, *sys.version_info[:3]))
    print('  numpy version {}'.format(np.version.full_version))
    print('  Test file size: {:,} KiB'.format(FILE_SIZE // KiB(1)))
    print('  {:,d} executions, best of {:d} repetitions'.format(EXECUTIONS, TIMINGS))
    print()

    longest = max(len(timing.label) for timing in timings)  # Len of longest identifier.
    ranked = sorted(timings, key=attrgetter('exec_time')) # Sort so fastest is first.
    fastest = ranked[0].exec_time
    for rank, timing in enumerate(ranked, 1):
        print('{:<2d} {:>{width}} : {:8.4f} secs, rel speed {:6.2f}x, {:6.2f}% slower '
              '({:6.2f} KiB/sec)'.format(
                    rank,
                    timing.label, timing.exec_time, round(timing.exec_time/fastest, 2),
                    round((timing.exec_time/fastest - 1) * 100, 2),
                    (FILE_SIZE/timing.exec_time) / KiB(1),  # per sec.
                    width=longest))
    print()
    mins, secs = divmod(time.time()-start_time, 60)
    print('Benchmark runtime (min:sec) - {:02d}:{:02d}'.format(int(mins),
                                                               int(round(secs))))

main()

This post itself is not a direct answer to the question. What it is instead is a data-driven extensible benchmark that can be used to compare many of the answers (and variations of utilizing new features added in later, more modern, versions of Python) that have been posted to this question — and should therefore be helpful in determining which has the best performance.

In a few cases I’ve modified the code in the referenced answer to make it compatible with the benchmark framework.

First, here are the results for what currently are the latest versions of Python 2 & 3:

Fastest to slowest execution speeds with 32-bit Python 2.7.16
  numpy version 1.16.5
  Test file size: 1,024 KiB
  100 executions, best of 3 repetitions

1                  Tcll (array.array) :   3.8943 secs, rel speed   1.00x,   0.00% slower (262.95 KiB/sec)
2  Vinay Sajip (read all into memory) :   4.1164 secs, rel speed   1.06x,   5.71% slower (248.76 KiB/sec)
3            codeape + iter + partial :   4.1616 secs, rel speed   1.07x,   6.87% slower (246.06 KiB/sec)
4                             codeape :   4.1889 secs, rel speed   1.08x,   7.57% slower (244.46 KiB/sec)
5               Vinay Sajip (chunked) :   4.1977 secs, rel speed   1.08x,   7.79% slower (243.94 KiB/sec)
6           Aaron Hall (Py 2 version) :   4.2417 secs, rel speed   1.09x,   8.92% slower (241.41 KiB/sec)
7                     gerrit (struct) :   4.2561 secs, rel speed   1.09x,   9.29% slower (240.59 KiB/sec)
8                     Rick M. (numpy) :   8.1398 secs, rel speed   2.09x, 109.02% slower (125.80 KiB/sec)
9                           Skurmedel :  31.3264 secs, rel speed   8.04x, 704.42% slower ( 32.69 KiB/sec)

Benchmark runtime (min:sec) - 03:26

Fastest to slowest execution speeds with 32-bit Python 3.8.0
  numpy version 1.17.4
  Test file size: 1,024 KiB
  100 executions, best of 3 repetitions

1  Vinay Sajip + "yield from" + "walrus operator" :   3.5235 secs, rel speed   1.00x,   0.00% slower (290.62 KiB/sec)
2                       Aaron Hall + "yield from" :   3.5284 secs, rel speed   1.00x,   0.14% slower (290.22 KiB/sec)
3         codeape + iter + partial + "yield from" :   3.5303 secs, rel speed   1.00x,   0.19% slower (290.06 KiB/sec)
4                      Vinay Sajip + "yield from" :   3.5312 secs, rel speed   1.00x,   0.22% slower (289.99 KiB/sec)
5      codeape + "yield from" + "walrus operator" :   3.5370 secs, rel speed   1.00x,   0.38% slower (289.51 KiB/sec)
6                          codeape + "yield from" :   3.5390 secs, rel speed   1.00x,   0.44% slower (289.35 KiB/sec)
7                                      jfs (mmap) :   4.0612 secs, rel speed   1.15x,  15.26% slower (252.14 KiB/sec)
8              Vinay Sajip (read all into memory) :   4.5948 secs, rel speed   1.30x,  30.40% slower (222.86 KiB/sec)
9                        codeape + iter + partial :   4.5994 secs, rel speed   1.31x,  30.54% slower (222.64 KiB/sec)
10                                        codeape :   4.5995 secs, rel speed   1.31x,  30.54% slower (222.63 KiB/sec)
11                          Vinay Sajip (chunked) :   4.6110 secs, rel speed   1.31x,  30.87% slower (222.08 KiB/sec)
12                      Aaron Hall (Py 2 version) :   4.6292 secs, rel speed   1.31x,  31.38% slower (221.20 KiB/sec)
13                             Tcll (array.array) :   4.8627 secs, rel speed   1.38x,  38.01% slower (210.58 KiB/sec)
14                                gerrit (struct) :   5.0816 secs, rel speed   1.44x,  44.22% slower (201.51 KiB/sec)
15                 Rick M. (numpy) + "yield from" :  11.8084 secs, rel speed   3.35x, 235.13% slower ( 86.72 KiB/sec)
16                                      Skurmedel :  11.8806 secs, rel speed   3.37x, 237.18% slower ( 86.19 KiB/sec)
17                                Rick M. (numpy) :  13.3860 secs, rel speed   3.80x, 279.91% slower ( 76.50 KiB/sec)

Benchmark runtime (min:sec) - 04:47

I also ran it with a much larger 10 MiB test file (which took nearly an hour to run) and got performance results which were comparable to those shown above.

Here’s the code used to do the benchmarking:

from __future__ import print_function
import array
import atexit
from collections import deque, namedtuple
import io
from mmap import ACCESS_READ, mmap
import numpy as np
from operator import attrgetter
import os
import random
import struct
import sys
import tempfile
from textwrap import dedent
import time
import timeit
import traceback

try:
    xrange
except NameError:  # Python 3
    xrange = range


class KiB(int):
    """ KibiBytes - multiples of the byte units for quantities of information. """
    def __new__(self, value=0):
        return 1024*value


BIG_TEST_FILE = 1  # MiBs or 0 for a small file.
SML_TEST_FILE = KiB(64)
EXECUTIONS = 100  # Number of times each "algorithm" is executed per timing run.
TIMINGS = 3  # Number of timing runs.
CHUNK_SIZE = KiB(8)
if BIG_TEST_FILE:
    FILE_SIZE = KiB(1024) * BIG_TEST_FILE
else:
    FILE_SIZE = SML_TEST_FILE  # For quicker testing.

# Common setup for all algorithms -- prefixed to each algorithm's setup.
COMMON_SETUP = dedent("""
    # Make accessible in algorithms.
    from __main__ import array, deque, get_buffer_size, mmap, np, struct
    from __main__ import ACCESS_READ, CHUNK_SIZE, FILE_SIZE, TEMP_FILENAME
    from functools import partial
    try:
        xrange
    except NameError:  # Python 3
        xrange = range
""")


def get_buffer_size(path):
    """ Determine optimal buffer size for reading files. """
    st = os.stat(path)
    try:
        bufsize = st.st_blksize # Available on some Unix systems (like Linux)
    except AttributeError:
        bufsize = io.DEFAULT_BUFFER_SIZE
    return bufsize

# Utility primarily for use when embedding additional algorithms into benchmark.
VERIFY_NUM_READ = """
    # Verify generator reads correct number of bytes (assumes values are correct).
    bytes_read = sum(1 for _ in file_byte_iterator(TEMP_FILENAME))
    assert bytes_read == FILE_SIZE, \
           'Wrong number of bytes generated: got {:,} instead of {:,}'.format(
                bytes_read, FILE_SIZE)
"""

TIMING = namedtuple('TIMING', 'label, exec_time')

class Algorithm(namedtuple('CodeFragments', 'setup, test')):

    # Default timeit "stmt" code fragment.
    _TEST = """
        #for b in file_byte_iterator(TEMP_FILENAME):  # Loop over every byte.
        #    pass  # Do stuff with byte...
        deque(file_byte_iterator(TEMP_FILENAME), maxlen=0)  # Data sink.
    """

    # Must overload __new__ because (named)tuples are immutable.
    def __new__(cls, setup, test=None):
        """ Dedent (unindent) code fragment string arguments.
        Args:
          `setup` -- Code fragment that defines things used by `test` code.
                     In this case it should define a generator function named
                     `file_byte_iterator()` that will be passed the name of a test file
                     of binary data. This code is not timed.
          `test` -- Code fragment that uses things defined in `setup` code.
                    Defaults to _TEST. This is the code that's timed.
        """
        test =  cls._TEST if test is None else test  # Use default unless one is provided.

        # Uncomment to replace all performance tests with one that verifies the correct
        # number of bytes values are being generated by the file_byte_iterator function.
        #test = VERIFY_NUM_READ

        return tuple.__new__(cls, (dedent(setup), dedent(test)))


algorithms = {

    'Aaron Hall (Py 2 version)': Algorithm("""
        def file_byte_iterator(path):
            with open(path, "rb") as file:
                callable = partial(file.read, 1024)
                sentinel = bytes() # or b''
                for chunk in iter(callable, sentinel):
                    for byte in chunk:
                        yield byte
    """),

    "codeape": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                while True:
                    chunk = f.read(chunksize)
                    if chunk:
                        for b in chunk:
                            yield b
                    else:
                        break
    """),

    "codeape + iter + partial": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                for chunk in iter(partial(f.read, chunksize), b''):
                    for b in chunk:
                        yield b
    """),

    "gerrit (struct)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                fmt = '{}B'.format(FILE_SIZE)  # Reads entire file at once.
                for b in struct.unpack(fmt, f.read()):
                    yield b
    """),

    'Rick M. (numpy)': Algorithm("""
        def file_byte_iterator(filename):
            for byte in np.fromfile(filename, 'u1'):
                yield byte
    """),

    "Skurmedel": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                byte = f.read(1)
                while byte:
                    yield byte
                    byte = f.read(1)
    """),

    "Tcll (array.array)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                arr = array.array('B')
                arr.fromfile(f, FILE_SIZE)  # Reads entire file at once.
                for b in arr:
                    yield b
    """),

    "Vinay Sajip (read all into memory)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                bytes_read = f.read()  # Reads entire file at once.
            for b in bytes_read:
                yield b
    """),

    "Vinay Sajip (chunked)": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                chunk = f.read(chunksize)
                while chunk:
                    for b in chunk:
                        yield b
                    chunk = f.read(chunksize)
    """),

}  # End algorithms

#
# Versions of algorithms that will only work in certain releases (or better) of Python.
#
if sys.version_info >= (3, 3):
    algorithms.update({

        'codeape + iter + partial + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    for chunk in iter(partial(f.read, chunksize), b''):
                        yield from chunk
        """),

        'codeape + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while True:
                        chunk = f.read(chunksize)
                        if chunk:
                            yield from chunk
                        else:
                            break
        """),

        "jfs (mmap)": Algorithm("""
            def file_byte_iterator(filename):
                with open(filename, "rb") as f, \
                     mmap(f.fileno(), 0, access=ACCESS_READ) as s:
                    yield from s
        """),

        'Rick M. (numpy) + "yield from"': Algorithm("""
            def file_byte_iterator(filename):
            #    data = np.fromfile(filename, 'u1')
                yield from np.fromfile(filename, 'u1')
        """),

        'Vinay Sajip + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    chunk = f.read(chunksize)
                    while chunk:
                        yield from chunk  # Added in Py 3.3
                        chunk = f.read(chunksize)
        """),

    })  # End Python 3.3 update.

if sys.version_info >= (3, 5):
    algorithms.update({

        'Aaron Hall + "yield from"': Algorithm("""
            from pathlib import Path

            def file_byte_iterator(path):
                ''' Given a path, return an iterator over the file
                    that lazily loads the file.
                '''
                path = Path(path)
                bufsize = get_buffer_size(path)

                with path.open('rb') as file:
                    reader = partial(file.read1, bufsize)
                    for chunk in iter(reader, bytes()):
                        yield from chunk
        """),

    })  # End Python 3.5 update.

if sys.version_info >= (3, 8, 0):
    algorithms.update({

        'Vinay Sajip + "yield from" + "walrus operator"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while chunk := f.read(chunksize):
                        yield from chunk  # Added in Py 3.3
        """),

        'codeape + "yield from" + "walrus operator"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while chunk := f.read(chunksize):
                        yield from chunk
        """),

    })  # End Python 3.8.0 update.


#### Main ####

def main():
    global TEMP_FILENAME

    def cleanup():
        """ Clean up after testing is completed. """
        try:
            os.remove(TEMP_FILENAME)  # Delete the temporary file.
        except Exception:
            pass

    atexit.register(cleanup)

    # Create a named temporary binary file of pseudo-random bytes for testing.
    fd, TEMP_FILENAME = tempfile.mkstemp('.bin')
    with os.fdopen(fd, 'wb') as file:
         os.write(fd, bytearray(random.randrange(256) for _ in range(FILE_SIZE)))

    # Execute and time each algorithm, gather results.
    start_time = time.time()  # To determine how long testing itself takes.

    timings = []
    for label in algorithms:
        try:
            timing = TIMING(label,
                            min(timeit.repeat(algorithms[label].test,
                                              setup=COMMON_SETUP + algorithms[label].setup,
                                              repeat=TIMINGS, number=EXECUTIONS)))
        except Exception as exc:
            print('{} occurred timing the algorithm: "{}"\n  {}'.format(
                    type(exc).__name__, label, exc))
            traceback.print_exc(file=sys.stdout)  # Redirect to stdout.
            sys.exit(1)
        timings.append(timing)

    # Report results.
    print('Fastest to slowest execution speeds with {}-bit Python {}.{}.{}'.format(
            64 if sys.maxsize > 2**32 else 32, *sys.version_info[:3]))
    print('  numpy version {}'.format(np.version.full_version))
    print('  Test file size: {:,} KiB'.format(FILE_SIZE // KiB(1)))
    print('  {:,d} executions, best of {:d} repetitions'.format(EXECUTIONS, TIMINGS))
    print()

    longest = max(len(timing.label) for timing in timings)  # Len of longest identifier.
    ranked = sorted(timings, key=attrgetter('exec_time')) # Sort so fastest is first.
    fastest = ranked[0].exec_time
    for rank, timing in enumerate(ranked, 1):
        print('{:<2d} {:>{width}} : {:8.4f} secs, rel speed {:6.2f}x, {:6.2f}% slower '
              '({:6.2f} KiB/sec)'.format(
                    rank,
                    timing.label, timing.exec_time, round(timing.exec_time/fastest, 2),
                    round((timing.exec_time/fastest - 1) * 100, 2),
                    (FILE_SIZE/timing.exec_time) / KiB(1),  # per sec.
                    width=longest))
    print()
    mins, secs = divmod(time.time()-start_time, 60)
    print('Benchmark runtime (min:sec) - {:02d}:{:02d}'.format(int(mins),
                                                               int(round(secs))))

main()
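
To benchmark a variant of your own, the pattern the code above uses is to add one more entry to the algorithms dict (before main() runs) whose setup string defines a file_byte_iterator generator; a hedged sketch with a made-up label, reusing names (CHUNK_SIZE, partial) that COMMON_SETUP already provides:

algorithms.update({

    "my variant (example)": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                for chunk in iter(partial(f.read, chunksize), b''):
                    yield from chunk  # requires Python 3.3+
    """),

})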

回答 10

如果您正在寻找快速的东西,这是我多年来一直使用的一种方法:

from array import array

with open( path, 'rb' ) as file:
    data = array( 'B', file.read() ) # buffer the file

# evaluate its data
for byte in data:
    v = byte # int value
    c = chr(byte)

如果要迭代字符而不是整数,可以直接使用data = file.read(),在Python 3中它是一个bytes对象。

If you are looking for something speedy, here's a method I've been using that has worked for years:

from array import array

with open( path, 'rb' ) as file:
    data = array( 'B', file.read() ) # buffer the file

# evaluate its data
for byte in data:
    v = byte # int value
    c = chr(byte)

If you want to iterate over chars instead of ints, you can simply use data = file.read(), which should be a bytes object in Python 3.


读取整个文件是否会使文件句柄保持打开状态?

问题:读取整个文件是否会使文件句柄保持打开状态?

如果您用content = open('Path/to/file', 'r').read()读取整个文件,文件句柄是否会一直保持打开,直到脚本退出?有没有更简洁的方法来读取整个文件?

If you read an entire file with content = open('Path/to/file', 'r').read() is the file handle left open until the script exits? Is there a more concise method to read a whole file?


回答 0

这个问题的答案在某种程度上取决于特定的Python实现。

要了解所有内容,请特别注意实际file对象。在您的代码中,该对象仅在表达式中被提及一次,并且在read()调用返回后立即变得不可访问。

这意味着文件对象是垃圾。剩下的唯一问题是“垃圾收集器何时收集文件对象?”。

在使用引用计数器的CPython中,这种垃圾立即被注意到,因此将立即被收集。这通常不适用于其他python实现。

确保该文件已关闭的一种更好的解决方案是以下模式:

with open('Path/to/file', 'r') as content_file:
    content = content_file.read()

块结束后,它将始终立即关闭文件;即使发生异常。

编辑:在上面提出一个更好的点:

除了在with上下文管理器中会被"自动"调用的file.__exit__()之外,file.close()唯一会被自动调用的其他途径(也就是说,除了你自己显式调用之外)是通过file.__del__()。这就引出了一个问题:__del__()什么时候会被调用?

正确编写的程序不能假定终结器将在程序终止之前的任何时候运行。

https://devblogs.microsoft.com/oldnewthing/20100809-00/?p=13203

特别是:

从不显式销毁对象。但是,当它们变得不可访问时,它们可能会被垃圾回收。允许实现推迟或完全取消垃圾回收 -只要没有收集仍可到达的对象,垃圾回收的实现方式就取决于实现质量。

[…]

CPython当前使用带有循环计数垃圾的(可选)延迟检测的引用计数方案,该方案会在无法访问时立即收集大多数对象,但不能保证收集包含循环引用的垃圾。

https://docs.python.org/3.5/reference/datamodel.html#objects-values-and-types

(强调我的)

但正如它所暗示的那样,其他实现可能有不同的行为。例如,PyPy有6种不同的垃圾回收实现!

The answer to that question depends somewhat on the particular Python implementation.

To understand what this is all about, pay particular attention to the actual file object. In your code, that object is mentioned only once, in an expression, and becomes inaccessible immediately after the read() call returns.

This means that the file object is garbage. The only remaining question is “When will the garbage collector collect the file object?”.

In CPython, which uses a reference counter, this kind of garbage is noticed immediately, and so it will be collected immediately. This is not generally true of other Python implementations.

A better solution, to make sure that the file is closed, is this pattern:

with open('Path/to/file', 'r') as content_file:
    content = content_file.read()

which will always close the file immediately after the block ends; even if an exception occurs.

Edit: To put a finer point on it:

Other than file.__exit__(), which is “automatically” called in a with context manager setting, the only other way that file.close() is automatically called (that is, other than explicitly calling it yourself,) is via file.__del__(). This leads us to the question of when does __del__() get called?

A correctly-written program cannot assume that finalizers will ever run at any point prior to program termination.

https://devblogs.microsoft.com/oldnewthing/20100809-00/?p=13203

In particular:

Objects are never explicitly destroyed; however, when they become unreachable they may be garbage-collected. An implementation is allowed to postpone garbage collection or omit it altogether — it is a matter of implementation quality how garbage collection is implemented, as long as no objects are collected that are still reachable.

[…]

CPython currently uses a reference-counting scheme with (optional) delayed detection of cyclically linked garbage, which collects most objects as soon as they become unreachable, but is not guaranteed to collect garbage containing circular references.

https://docs.python.org/3.5/reference/datamodel.html#objects-values-and-types

(Emphasis mine)

but as it suggests, other implementations may have other behavior. As an example, PyPy has 6 different garbage collection implementations!
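
For context, the with pattern above is roughly equivalent to the explicit try/finally below, which is what makes the "closed even on exceptions" guarantee visible (a sketch for comparison, not a recommendation to prefer this form):

content_file = open('Path/to/file', 'r')
try:
    content = content_file.read()
finally:
    content_file.close()  # runs whether or not read() raised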


回答 1

您可以使用pathlib

对于Python 3.5及更高版本:

from pathlib import Path
contents = Path(file_path).read_text()

对于旧版本的Python,请使用pathlib2

$ pip install pathlib2

然后:

from pathlib2 import Path
contents = Path(file_path).read_text()

这是实际的read_text 实现

def read_text(self, encoding=None, errors=None):
    """
    Open the file in text mode, read it, and close the file.
    """
    with self.open(mode='r', encoding=encoding, errors=errors) as f:
        return f.read()

You can use pathlib.

For Python 3.5 and above:

from pathlib import Path
contents = Path(file_path).read_text()

For older versions of Python use pathlib2:

$ pip install pathlib2

Then:

from pathlib2 import Path
contents = Path(file_path).read_text()

This is the actual read_text implementation:

def read_text(self, encoding=None, errors=None):
    """
    Open the file in text mode, read it, and close the file.
    """
    with self.open(mode='r', encoding=encoding, errors=errors) as f:
        return f.read()

回答 2

好吧,如果您必须逐行读取文件才能使用每一行,则可以使用

with open('Path/to/file', 'r') as f:
    s = f.readline()
    while s:
        # do whatever you want to
        s = f.readline()

甚至更好的方法:

with open('Path/to/file') as f:
    for line in f:
        # do whatever you want to

Well, if you have to read the file line by line to work with each line, you can use

with open('Path/to/file', 'r') as f:
    s = f.readline()
    while s:
        # do whatever you want to
        s = f.readline()

Or, an even better way:

with open('Path/to/file') as f:
    for line in f:
        # do whatever you want to

回答 3

与其把文件内容作为单个字符串检索,不如把它存储为由文件所有行组成的列表,这有时会很方便:

with open('Path/to/file', 'r') as content_file:
    content_list = content_file.read().strip().split("\n")

可以看出,只需在本主题主要答案的基础上链式地添加 .strip().split("\n") 这两个方法即可。

在这里,.strip() 只是去掉整个文件字符串两端的空白和换行符,.split("\n") 则通过在每个换行符 \n 处拆分整个文件字符串来生成实际的列表。

而且,通过这种方式可以把整个文件内容保存在一个变量中,这在某些情况下可能正是需要的,而不必像先前的答案中那样逐行循环读取文件。

Instead of retrieving the file content as a single string, it can be handy to store the content as a list of all lines the file comprises:

with open('Path/to/file', 'r') as content_file:
    content_list = content_file.read().strip().split("\n")

As can be seen, one needs to add the concatenated methods .strip().split("\n") to the main answer in this thread.

Here, .strip() just removes whitespace and newline characters at the ends of the entire file string, and .split("\n") produces the actual list by splitting the entire file string at every newline character \n.

Moreover, this way the entire file content can be stored in a variable, which might be desired in some cases, instead of looping over the file line by line as pointed out in this previous answer.
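
A related sketch: if a list of lines is the goal, str.splitlines() (or simply list(content_file)) avoids the manual strip/split chain and does not leave an empty trailing element when the file ends with a newline:

with open('Path/to/file', 'r') as content_file:
    content_list = content_file.read().splitlines()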