标签归档:file

使用Python获取文件的最后n行,类似于tail

问题:使用Python获取文件的最后n行,类似于tail

我正在为Web应用程序编写日志文件查看器,为此,我想在日志文件的各行中进行分页。文件中的项目是基于行的,底部是最新的项目。

因此,我需要一种tail()可以n从底部读取行并支持偏移量的方法。我想到的是这样的:

def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 74
    to_read = n + offset
    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3

这是合理的方法吗?推荐使用偏移量尾部日志文件的推荐方法是什么?

I’m writing a log file viewer for a web application and for that I want to paginate through the lines of the log file. The items in the file are line based with the newest item on the bottom.

So I need a tail() method that can read n lines from the bottom and supports an offset. What I came up with looks like this:

def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 74
    to_read = n + offset
    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3

Is this a reasonable approach? What is the recommended way to tail log files with offsets?


回答 0

这可能比您的要快。不假设行长。一次返回一个文件块,直到找到正确数量的’\ n’字符为止。

def tail( f, lines=20 ):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

我不喜欢关于行长的棘手假设,实际上,您永远都不知道那样的事情。

通常,这将在循环的第一遍或第二遍中定位最后20行。如果您的74个字符实际上是准确的,则将块大小设置为2048,并且几乎立即尾随20行。

另外,我不会消耗大量的大脑卡路里来尝试与物理OS块进行精确对齐。使用这些高级I / O程序包,我怀疑您会发现尝试在OS块边界上对齐会对性能产生任何影响。如果使用较低级别的I / O,则可能会看到加速。


更新

对于Python 3.2及更高版本,请按照字节上的说明进行操作,如在文本文件中(那些在模式字符串中打开而没有“ b”的文件),则仅允许相对于文件开头的查找(exceptions是查找到文件末尾)与seek(0,2)).:

例如: f = open('C:/.../../apache_logs.txt', 'rb')

 def tail(f, lines=20):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = []
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            f.seek(0,0)
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count(b'\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = b''.join(reversed(blocks))
    return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

This may be quicker than yours. Makes no assumptions about line length. Backs through the file one block at a time till it’s found the right number of ‘\n’ characters.

def tail( f, lines=20 ):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

I don’t like tricky assumptions about line length when — as a practical matter — you can never know things like that.

Generally, this will locate the last 20 lines on the first or second pass through the loop. If your 74 character thing is actually accurate, you make the block size 2048 and you’ll tail 20 lines almost immediately.

Also, I don’t burn a lot of brain calories trying to finesse alignment with physical OS blocks. Using these high-level I/O packages, I doubt you’ll see any performance consequence of trying to align on OS block boundaries. If you use lower-level I/O, then you might see a speedup.


UPDATE

for Python 3.2 and up, follow the process on bytes as In text files (those opened without a “b” in the mode string), only seeks relative to the beginning of the file are allowed (the exception being seeking to the very file end with seek(0, 2)).:

eg: f = open('C:/.../../apache_logs.txt', 'rb')

 def tail(f, lines=20):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = []
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            f.seek(0,0)
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count(b'\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = b''.join(reversed(blocks))
    return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

回答 1

假设您可以在Python 2上使用类似unix的系统:

import os
def tail(f, n, offset=0):
  stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
  stdin.close()
  lines = stdout.readlines(); stdout.close()
  return lines[:,-offset]

对于python 3,您可以执行以下操作:

import subprocess
def tail(f, n, offset=0):
    proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
    lines = proc.stdout.readlines()
    return lines[:, -offset]

Assumes a unix-like system on Python 2 you can do:

import os
def tail(f, n, offset=0):
  stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
  stdin.close()
  lines = stdout.readlines(); stdout.close()
  return lines[:,-offset]

For python 3 you may do:

import subprocess
def tail(f, n, offset=0):
    proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
    lines = proc.stdout.readlines()
    return lines[:, -offset]

回答 2

这是我的答案。纯Python。使用timeit似乎非常快。拖尾具有100,000行的日志文件的100行:

>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165

这是代码:

import os


def tail(f, lines=1, _buffer=4098):
    """Tail a file and get X lines from the end"""
    # place holder for the lines found
    lines_found = []

    # block counter will be multiplied by buffer
    # to get the block size from the end
    block_counter = -1

    # loop until we find X lines
    while len(lines_found) < lines:
        try:
            f.seek(block_counter * _buffer, os.SEEK_END)
        except IOError:  # either file is too small, or too many lines requested
            f.seek(0)
            lines_found = f.readlines()
            break

        lines_found = f.readlines()

        # we found enough lines, get out
        # Removed this line because it was redundant the while will catch
        # it, I left it for history
        # if len(lines_found) > lines:
        #    break

        # decrement the block counter to get the
        # next X bytes
        block_counter -= 1

    return lines_found[-lines:]

Here is my answer. Pure python. Using timeit it seems pretty fast. Tailing 100 lines of a log file that has 100,000 lines:

>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165

Here is the code:

import os


def tail(f, lines=1, _buffer=4098):
    """Tail a file and get X lines from the end"""
    # place holder for the lines found
    lines_found = []

    # block counter will be multiplied by buffer
    # to get the block size from the end
    block_counter = -1

    # loop until we find X lines
    while len(lines_found) < lines:
        try:
            f.seek(block_counter * _buffer, os.SEEK_END)
        except IOError:  # either file is too small, or too many lines requested
            f.seek(0)
            lines_found = f.readlines()
            break

        lines_found = f.readlines()

        # we found enough lines, get out
        # Removed this line because it was redundant the while will catch
        # it, I left it for history
        # if len(lines_found) > lines:
        #    break

        # decrement the block counter to get the
        # next X bytes
        block_counter -= 1

    return lines_found[-lines:]

回答 3

如果可以读取整个文件,请使用双端队列。

from collections import deque
deque(f, maxlen=n)

在2.6之前,双端队列没有maxlen选项,但实施起来很容易。

import itertools
def maxque(items, size):
    items = iter(items)
    q = deque(itertools.islice(items, size))
    for item in items:
        del q[0]
        q.append(item)
    return q

如果需要从头开始读取文件,请使用疾驰(也就是指数)搜索。

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []
    while len(lines) <= n:
        try:
            f.seek(-pos, 2)
        except IOError:
            f.seek(0)
            break
        finally:
            lines = list(f)
        pos *= 2
    return lines[-n:]

If reading the whole file is acceptable then use a deque.

from collections import deque
deque(f, maxlen=n)

Prior to 2.6, deques didn’t have a maxlen option, but it’s easy enough to implement.

import itertools
def maxque(items, size):
    items = iter(items)
    q = deque(itertools.islice(items, size))
    for item in items:
        del q[0]
        q.append(item)
    return q

If it’s a requirement to read the file from the end, then use a gallop (a.k.a exponential) search.

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []
    while len(lines) <= n:
        try:
            f.seek(-pos, 2)
        except IOError:
            f.seek(0)
            break
        finally:
            lines = list(f)
        pos *= 2
    return lines[-n:]

回答 4

S.Lott的上述回答几乎对我有用,但最终给了我一些局限性。事实证明,它破坏了块边界上的数据,因为数据以相反的顺序保存读取的块。调用”.join(data)时,块顺序错误。这样可以解决此问题。

def tail(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    f - a byte file-like object
    """
    if window == 0:
        return []
    BUFSIZ = 1024
    f.seek(0, 2)
    bytes = f.tell()
    size = window + 1
    block = -1
    data = []
    while size > 0 and bytes > 0:
        if bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            data.insert(0, f.read(BUFSIZ))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            data.insert(0, f.read(bytes))
        linesFound = data[0].count('\n')
        size -= linesFound
        bytes -= BUFSIZ
        block -= 1
    return ''.join(data).splitlines()[-window:]

S.Lott’s answer above almost works for me but ends up giving me partial lines. It turns out that it corrupts data on block boundaries because data holds the read blocks in reversed order. When ”.join(data) is called, the blocks are in the wrong order. This fixes that.

def tail(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    f - a byte file-like object
    """
    if window == 0:
        return []
    BUFSIZ = 1024
    f.seek(0, 2)
    bytes = f.tell()
    size = window + 1
    block = -1
    data = []
    while size > 0 and bytes > 0:
        if bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            data.insert(0, f.read(BUFSIZ))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            data.insert(0, f.read(bytes))
        linesFound = data[0].count('\n')
        size -= linesFound
        bytes -= BUFSIZ
        block -= 1
    return ''.join(data).splitlines()[-window:]

回答 5

我最终使用的代码。我认为这是迄今为止最好的:

def tail(f, n, offset=None):
    """Reads a n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3

The code I ended up using. I think this is the best so far:

def tail(f, n, offset=None):
    """Reads a n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3

回答 6

mmap的简单快速解决方案:

import mmap
import os

def tail(filename, n):
    """Returns last n lines from the filename. No exception handling"""
    size = os.path.getsize(filename)
    with open(filename, "rb") as f:
        # for Windows the mmap parameters are different
        fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
        try:
            for i in xrange(size - 1, -1, -1):
                if fm[i] == '\n':
                    n -= 1
                    if n == -1:
                        break
            return fm[i + 1 if i else 0:].splitlines()
        finally:
            fm.close()

Simple and fast solution with mmap:

import mmap
import os

def tail(filename, n):
    """Returns last n lines from the filename. No exception handling"""
    size = os.path.getsize(filename)
    with open(filename, "rb") as f:
        # for Windows the mmap parameters are different
        fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
        try:
            for i in xrange(size - 1, -1, -1):
                if fm[i] == '\n':
                    n -= 1
                    if n == -1:
                        break
            return fm[i + 1 if i else 0:].splitlines()
        finally:
            fm.close()

回答 7

甚至更清洁的python3兼容版本,不会插入但会附加和反向:

def tail(f, window=1):
    """
    Returns the last `window` lines of file `f` as a list of bytes.
    """
    if window == 0:
        return b''
    BUFSIZE = 1024
    f.seek(0, 2)
    end = f.tell()
    nlines = window + 1
    data = []
    while nlines > 0 and end > 0:
        i = max(0, end - BUFSIZE)
        nread = min(end, BUFSIZE)

        f.seek(i)
        chunk = f.read(nread)
        data.append(chunk)
        nlines -= chunk.count(b'\n')
        end -= nread
    return b'\n'.join(b''.join(reversed(data)).splitlines()[-window:])

像这样使用它:

with open(path, 'rb') as f:
    last_lines = tail(f, 3).decode('utf-8')

An even cleaner python3 compatible version that doesn’t insert but appends & reverses:

def tail(f, window=1):
    """
    Returns the last `window` lines of file `f` as a list of bytes.
    """
    if window == 0:
        return b''
    BUFSIZE = 1024
    f.seek(0, 2)
    end = f.tell()
    nlines = window + 1
    data = []
    while nlines > 0 and end > 0:
        i = max(0, end - BUFSIZE)
        nread = min(end, BUFSIZE)

        f.seek(i)
        chunk = f.read(nread)
        data.append(chunk)
        nlines -= chunk.count(b'\n')
        end -= nread
    return b'\n'.join(b''.join(reversed(data)).splitlines()[-window:])

use it like this:

with open(path, 'rb') as f:
    last_lines = tail(f, 3).decode('utf-8')

回答 8

将@papercrane解决方案更新为python3。使用open(filename, 'rb')和打开文件:

def tail(f, window=20):
    """Returns the last `window` lines of file `f` as a list.
    """
    if window == 0:
        return []

    BUFSIZ = 1024
    f.seek(0, 2)
    remaining_bytes = f.tell()
    size = window + 1
    block = -1
    data = []

    while size > 0 and remaining_bytes > 0:
        if remaining_bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            bunch = f.read(BUFSIZ)
        else:
            # file too small, start from beginning
            f.seek(0, 0)
            # only read what was not read
            bunch = f.read(remaining_bytes)

        bunch = bunch.decode('utf-8')
        data.insert(0, bunch)
        size -= bunch.count('\n')
        remaining_bytes -= BUFSIZ
        block -= 1

    return ''.join(data).splitlines()[-window:]

Update @papercrane solution to python3. Open the file with open(filename, 'rb') and:

def tail(f, window=20):
    """Returns the last `window` lines of file `f` as a list.
    """
    if window == 0:
        return []

    BUFSIZ = 1024
    f.seek(0, 2)
    remaining_bytes = f.tell()
    size = window + 1
    block = -1
    data = []

    while size > 0 and remaining_bytes > 0:
        if remaining_bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            bunch = f.read(BUFSIZ)
        else:
            # file too small, start from beginning
            f.seek(0, 0)
            # only read what was not read
            bunch = f.read(remaining_bytes)

        bunch = bunch.decode('utf-8')
        data.insert(0, bunch)
        size -= bunch.count('\n')
        remaining_bytes -= BUFSIZ
        block -= 1

    return ''.join(data).splitlines()[-window:]

回答 9

根据评论者的要求,将答案发布到我对类似问题的回答上,其中使用相同的技术来改变文件的最后一行,而不仅仅是获取文件。

对于很大的文件,这mmap是最好的方法。为了改善现有mmap答案,此版本可在Windows和Linux之间移植,并且运行速度应更快(尽管如果不对文件大小在GB范围内的32位Python进行一些修改,它将无法正常运行,请参见其他答案以获取处理此问题的提示) ,并进行修改以在Python 2上运行)。

import io  # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap

def skip_back_lines(mm, numlines, startidx):
    '''Factored out to simplify handling of n and offset'''
    for _ in itertools.repeat(None, numlines):
        startidx = mm.rfind(b'\n', 0, startidx)
        if startidx < 0:
            break
    return startidx

def tail(f, n, offset=0):
    # Reopen file in binary mode
    with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # len(mm) - 1 handles files ending w/newline by getting the prior line
        startofline = skip_back_lines(mm, offset, len(mm) - 1)
        if startofline < 0:
            return []  # Offset lines consumed whole file, nothing to return
            # If using a generator function (yield-ing, see below),
            # this should be a plain return, no empty list

        endoflines = startofline + 1  # Slice end to omit offset lines

        # Find start of lines to capture (add 1 to move from newline to beginning of following line)
        startofline = skip_back_lines(mm, n, startofline) + 1

        # Passing True to splitlines makes it return the list of lines without
        # removing the trailing newline (if any), so list mimics f.readlines()
        return mm[startofline:endoflines].splitlines(True)
        # If Windows style \r\n newlines need to be normalized to \n, and input
        # is ASCII compatible, can normalize newlines with:
        # return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)

假设尾行的数目足够小,您可以安全地一次将它们全部读取到内存中;您还可以将其作为生成器函数,并通过将最后一行替换为以下内容来手动读取一行:

        mm.seek(startofline)
        # Call mm.readline n times, or until EOF, whichever comes first
        # Python 3.2 and earlier:
        for line in itertools.islice(iter(mm.readline, b''), n):
            yield line

        # 3.3+:
        yield from itertools.islice(iter(mm.readline, b''), n)

最后,以二进制模式(必须使用mmap)进行读取,因此得到str行(Py2)和bytes行(Py3);如果您想要unicode(Py2)或str(Py3),则可以调整迭代方法为您解码和/或修复换行符:

        lines = itertools.islice(iter(mm.readline, b''), n)
        if f.encoding:  # Decode if the passed file was opened with a specific encoding
            lines = (line.decode(f.encoding) for line in lines)
        if 'b' not in f.mode:  # Fix line breaks if passed file opened in text mode
            lines = (line.replace(os.linesep, '\n') for line in lines)
        # Python 3.2 and earlier:
        for line in lines:
            yield line
        # 3.3+:
        yield from lines

注意:我在无法访问Python进行测试的机器上输入了所有内容。如果我有错字,请告诉我;这与我认为应该起作用的其他答案足够相似,但是这些调整(例如,处理)可能会导致细微的错误。如果有任何错误,请在评论中让我知道。offset

Posting an answer at the behest of commenters on my answer to a similar question where the same technique was used to mutate the last line of a file, not just get it.

For a file of significant size, mmap is the best way to do this. To improve on the existing mmap answer, this version is portable between Windows and Linux, and should run faster (though it won’t work without some modifications on 32 bit Python with files in the GB range, see the other answer for hints on handling this, and for modifying to work on Python 2).

import io  # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap

def skip_back_lines(mm, numlines, startidx):
    '''Factored out to simplify handling of n and offset'''
    for _ in itertools.repeat(None, numlines):
        startidx = mm.rfind(b'\n', 0, startidx)
        if startidx < 0:
            break
    return startidx

def tail(f, n, offset=0):
    # Reopen file in binary mode
    with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # len(mm) - 1 handles files ending w/newline by getting the prior line
        startofline = skip_back_lines(mm, offset, len(mm) - 1)
        if startofline < 0:
            return []  # Offset lines consumed whole file, nothing to return
            # If using a generator function (yield-ing, see below),
            # this should be a plain return, no empty list

        endoflines = startofline + 1  # Slice end to omit offset lines

        # Find start of lines to capture (add 1 to move from newline to beginning of following line)
        startofline = skip_back_lines(mm, n, startofline) + 1

        # Passing True to splitlines makes it return the list of lines without
        # removing the trailing newline (if any), so list mimics f.readlines()
        return mm[startofline:endoflines].splitlines(True)
        # If Windows style \r\n newlines need to be normalized to \n, and input
        # is ASCII compatible, can normalize newlines with:
        # return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)

This assumes the number of lines tailed is small enough you can safely read them all into memory at once; you could also make this a generator function and manually read a line at a time by replacing the final line with:

        mm.seek(startofline)
        # Call mm.readline n times, or until EOF, whichever comes first
        # Python 3.2 and earlier:
        for line in itertools.islice(iter(mm.readline, b''), n):
            yield line

        # 3.3+:
        yield from itertools.islice(iter(mm.readline, b''), n)

Lastly, this read in binary mode (necessary to use mmap) so it gives str lines (Py2) and bytes lines (Py3); if you want unicode (Py2) or str (Py3), the iterative approach could be tweaked to decode for you and/or fix newlines:

        lines = itertools.islice(iter(mm.readline, b''), n)
        if f.encoding:  # Decode if the passed file was opened with a specific encoding
            lines = (line.decode(f.encoding) for line in lines)
        if 'b' not in f.mode:  # Fix line breaks if passed file opened in text mode
            lines = (line.replace(os.linesep, '\n') for line in lines)
        # Python 3.2 and earlier:
        for line in lines:
            yield line
        # 3.3+:
        yield from lines

Note: I typed this all up on a machine where I lack access to Python to test. Please let me know if I typoed anything; this was similar enough to my other answer that I think it should work, but the tweaks (e.g. handling an offset) could lead to subtle errors. Please let me know in the comments if there are any mistakes.


回答 10

我发现上面的Popen是最好的解决方案。它既快速又肮脏,并且适用于Unix机器上的python 2.6,我使用了以下命令

def GetLastNLines(self, n, fileName):
    """
    Name:           Get LastNLines
    Description:        Gets last n lines using Unix tail
    Output:         returns last n lines of a file
    Keyword argument:
    n -- number of last lines to return
    filename -- Name of the file you need to tail into
    """
    p = subprocess.Popen(['tail','-n',str(n),self.__fileName], stdout=subprocess.PIPE)
    soutput, sinput = p.communicate()
    return soutput

soutput将包含代码的最后n行。逐行遍历soutput:

for line in GetLastNLines(50,'myfile.log').split('\n'):
    print line

I found the Popen above to be the best solution. It’s quick and dirty and it works For python 2.6 on Unix machine i used the following

def GetLastNLines(self, n, fileName):
    """
    Name:           Get LastNLines
    Description:        Gets last n lines using Unix tail
    Output:         returns last n lines of a file
    Keyword argument:
    n -- number of last lines to return
    filename -- Name of the file you need to tail into
    """
    p = subprocess.Popen(['tail','-n',str(n),self.__fileName], stdout=subprocess.PIPE)
    soutput, sinput = p.communicate()
    return soutput

soutput will have will contain last n lines of the code. to iterate through soutput line by line do:

for line in GetLastNLines(50,'myfile.log').split('\n'):
    print line

回答 11

基于S.Lott的最高票选答案(08年9月25日在21:43),但针对小文件而固定。

def tail(the_file, lines_2find=20):  
    the_file.seek(0, 2)                         #go to end of file
    bytes_in_file = the_file.tell()             
    lines_found, total_bytes_scanned = 0, 0
    while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned: 
        byte_block = min(1024, bytes_in_file-total_bytes_scanned)
        the_file.seek(-(byte_block+total_bytes_scanned), 2)
        total_bytes_scanned += byte_block
        lines_found += the_file.read(1024).count('\n')
    the_file.seek(-total_bytes_scanned, 2)
    line_list = list(the_file.readlines())
    return line_list[-lines_2find:]

    #we read at least 21 line breaks from the bottom, block by block for speed
    #21 to ensure we don't get a half line

希望这是有用的。

based on S.Lott’s top voted answer (Sep 25 ’08 at 21:43), but fixed for small files.

def tail(the_file, lines_2find=20):  
    the_file.seek(0, 2)                         #go to end of file
    bytes_in_file = the_file.tell()             
    lines_found, total_bytes_scanned = 0, 0
    while lines_2find+1 > lines_found and bytes_in_file > total_bytes_scanned: 
        byte_block = min(1024, bytes_in_file-total_bytes_scanned)
        the_file.seek(-(byte_block+total_bytes_scanned), 2)
        total_bytes_scanned += byte_block
        lines_found += the_file.read(1024).count('\n')
    the_file.seek(-total_bytes_scanned, 2)
    line_list = list(the_file.readlines())
    return line_list[-lines_2find:]

    #we read at least 21 line breaks from the bottom, block by block for speed
    #21 to ensure we don't get a half line

Hope this is useful.


回答 12

pypi上有一些tail的现有实现,您可以使用pip安装:

  • mtFileUtil
  • 多尾
  • log4tailer

根据您的情况,使用这些现有工具之一可能会有好处。

There are some existing implementations of tail on pypi which you can install using pip:

  • mtFileUtil
  • multitail
  • log4tailer

Depending on your situation, there may be advantages to using one of these existing tools.


回答 13

简单:

with open("test.txt") as f:
data = f.readlines()
tail = data[-2:]
print(''.join(tail)

Simple :

with open("test.txt") as f:
data = f.readlines()
tail = data[-2:]
print(''.join(tail)

回答 14

为了提高大文件的效率(在日志文件中通常需要使用tail的情况下很常见),通常希望避免读取整个文件(即使这样做时也没有立即将整个文件读入内存),但是需要以某种方式而不是字符来计算偏移量。一种可能性是通过char用seek()char向后读取,但这非常慢。相反,最好在较大的块中进行处理。

我有一阵子我写的实用程序函数,可以向后读取文件,可以在这里使用。

import os, itertools

def rblocks(f, blocksize=4096):
    """Read file as series of blocks from end of file to start.

    The data itself is in normal order, only the order of the blocks is reversed.
    ie. "hello world" -> ["ld","wor", "lo ", "hel"]
    Note that the file must be opened in binary mode.
    """
    if 'b' not in f.mode.lower():
        raise Exception("File must be opened using binary mode.")
    size = os.stat(f.name).st_size
    fullblocks, lastblock = divmod(size, blocksize)

    # The first(end of file) block will be short, since this leaves 
    # the rest aligned on a blocksize boundary.  This may be more 
    # efficient than having the last (first in file) block be short
    f.seek(-lastblock,2)
    yield f.read(lastblock)

    for i in range(fullblocks-1,-1, -1):
        f.seek(i * blocksize)
        yield f.read(blocksize)

def tail(f, nlines):
    buf = ''
    result = []
    for block in rblocks(f):
        buf = block + buf
        lines = buf.splitlines()

        # Return all lines except the first (since may be partial)
        if lines:
            result.extend(lines[1:]) # First line may not be complete
            if(len(result) >= nlines):
                return result[-nlines:]

            buf = lines[0]

    return ([buf]+result)[-nlines:]


f=open('file_to_tail.txt','rb')
for line in tail(f, 20):
    print line

[编辑]添加了更特定的版本(避免需要反向两次)

For efficiency with very large files (common in logfile situations where you may want to use tail), you generally want to avoid reading the whole file (even if you do do it without reading the whole file into memory at once) However, you do need to somehow work out the offset in lines rather than characters. One possibility is reading backwards with seek() char by char, but this is very slow. Instead, its better to process in larger blocks.

I’ve a utility function I wrote a while ago to read files backwards that can be used here.

import os, itertools

def rblocks(f, blocksize=4096):
    """Read file as series of blocks from end of file to start.

    The data itself is in normal order, only the order of the blocks is reversed.
    ie. "hello world" -> ["ld","wor", "lo ", "hel"]
    Note that the file must be opened in binary mode.
    """
    if 'b' not in f.mode.lower():
        raise Exception("File must be opened using binary mode.")
    size = os.stat(f.name).st_size
    fullblocks, lastblock = divmod(size, blocksize)

    # The first(end of file) block will be short, since this leaves 
    # the rest aligned on a blocksize boundary.  This may be more 
    # efficient than having the last (first in file) block be short
    f.seek(-lastblock,2)
    yield f.read(lastblock)

    for i in range(fullblocks-1,-1, -1):
        f.seek(i * blocksize)
        yield f.read(blocksize)

def tail(f, nlines):
    buf = ''
    result = []
    for block in rblocks(f):
        buf = block + buf
        lines = buf.splitlines()

        # Return all lines except the first (since may be partial)
        if lines:
            result.extend(lines[1:]) # First line may not be complete
            if(len(result) >= nlines):
                return result[-nlines:]

            buf = lines[0]

    return ([buf]+result)[-nlines:]


f=open('file_to_tail.txt','rb')
for line in tail(f, 20):
    print line

[Edit] Added more specific version (avoids need to reverse twice)


回答 15

您可以使用f.seek(0,2)到文件末尾,然后用readline()的以下替换内容逐行读取:

def readline_backwards(self, f):
    backline = ''
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    backline = last
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    f.seek(1, 1)
    return backline

you can go to the end of your file with f.seek(0, 2) and then read off lines one by one with the following replacement for readline():

def readline_backwards(self, f):
    backline = ''
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    backline = last
    last = ''
    while not last == '\n':
        backline = last + backline
        if f.tell() <= 0:
            return backline
        f.seek(-1, 1)
        last = f.read(1)
        f.seek(-1, 1)
    f.seek(1, 1)
    return backline

回答 16

基于Eyecue的答案(2010年6月10日,21:28):此类将head()和tail()方法添加到文件对象。

class File(file):
    def head(self, lines_2find=1):
        self.seek(0)                            #Rewind file
        return [self.next() for x in xrange(lines_2find)]

    def tail(self, lines_2find=1):  
        self.seek(0, 2)                         #go to end of file
        bytes_in_file = self.tell()             
        lines_found, total_bytes_scanned = 0, 0
        while (lines_2find+1 > lines_found and
               bytes_in_file > total_bytes_scanned): 
            byte_block = min(1024, bytes_in_file-total_bytes_scanned)
            self.seek(-(byte_block+total_bytes_scanned), 2)
            total_bytes_scanned += byte_block
            lines_found += self.read(1024).count('\n')
        self.seek(-total_bytes_scanned, 2)
        line_list = list(self.readlines())
        return line_list[-lines_2find:]

用法:

f = File('path/to/file', 'r')
f.head(3)
f.tail(3)

Based on Eyecue answer (Jun 10 ’10 at 21:28): this class add head() and tail() method to file object.

class File(file):
    def head(self, lines_2find=1):
        self.seek(0)                            #Rewind file
        return [self.next() for x in xrange(lines_2find)]

    def tail(self, lines_2find=1):  
        self.seek(0, 2)                         #go to end of file
        bytes_in_file = self.tell()             
        lines_found, total_bytes_scanned = 0, 0
        while (lines_2find+1 > lines_found and
               bytes_in_file > total_bytes_scanned): 
            byte_block = min(1024, bytes_in_file-total_bytes_scanned)
            self.seek(-(byte_block+total_bytes_scanned), 2)
            total_bytes_scanned += byte_block
            lines_found += self.read(1024).count('\n')
        self.seek(-total_bytes_scanned, 2)
        line_list = list(self.readlines())
        return line_list[-lines_2find:]

Usage:

f = File('path/to/file', 'r')
f.head(3)
f.tail(3)

回答 17

如果文件未以\ n结尾或确保读取完整的第一行,则其中的几种解决方案都存在问题。

def tail(file, n=1, bs=1024):
    f = open(file)
    f.seek(-1,2)
    l = 1-f.read(1).count('\n') # If file doesn't end in \n, count it anyway.
    B = f.tell()
    while n >= l and B > 0:
            block = min(bs, B)
            B -= block
            f.seek(B, 0)
            l += f.read(block).count('\n')
    f.seek(B, 0)
    l = min(l,n) # discard first (incomplete) line if l > n
    lines = f.readlines()[-l:]
    f.close()
    return lines

Several of these solutions have issues if the file doesn’t end in \n or in ensuring the complete first line is read.

def tail(file, n=1, bs=1024):
    f = open(file)
    f.seek(-1,2)
    l = 1-f.read(1).count('\n') # If file doesn't end in \n, count it anyway.
    B = f.tell()
    while n >= l and B > 0:
            block = min(bs, B)
            B -= block
            f.seek(B, 0)
            l += f.read(block).count('\n')
    f.seek(B, 0)
    l = min(l,n) # discard first (incomplete) line if l > n
    lines = f.readlines()[-l:]
    f.close()
    return lines

回答 18

这是一个非常简单的实现:

with open('/etc/passwd', 'r') as f:
  try:
    f.seek(0,2)
    s = ''
    while s.count('\n') < 11:
      cur = f.tell()
      f.seek((cur - 10))
      s = f.read(10) + s
      f.seek((cur - 10))
    print s
  except Exception as e:
    f.readlines()

Here is a pretty simple implementation:

with open('/etc/passwd', 'r') as f:
  try:
    f.seek(0,2)
    s = ''
    while s.count('\n') < 11:
      cur = f.tell()
      f.seek((cur - 10))
      s = f.read(10) + s
      f.seek((cur - 10))
    print s
  except Exception as e:
    f.readlines()

回答 19

有一个非常有用的模块可以做到这一点:

from file_read_backwards import FileReadBackwards

with FileReadBackwards("/tmp/file", encoding="utf-8") as frb:

# getting lines by lines starting from the last line up
for l in frb:
    print(l)

There is very useful module that can do this:

from file_read_backwards import FileReadBackwards

with FileReadBackwards("/tmp/file", encoding="utf-8") as frb:

# getting lines by lines starting from the last line up
for l in frb:
    print(l)

回答 20

另一种解决方案

如果您的txt文件如下所示:鼠标蛇猫蜥蜴狼狗

您可以通过简单地在python”’中使用数组索引来反转此文件

contents=[]
def tail(contents,n):
    with open('file.txt') as file:
        for i in file.readlines():
            contents.append(i)

    for i in contents[:n:-1]:
        print(i)

tail(contents,-5)

结果:狗狼蜥蜴猫

Another Solution

if your txt file looks like this: mouse snake cat lizard wolf dog

you could reverse this file by simply using array indexing in python ”’

contents=[]
def tail(contents,n):
    with open('file.txt') as file:
        for i in file.readlines():
            contents.append(i)

    for i in contents[:n:-1]:
        print(i)

tail(contents,-5)

result: dog wolf lizard cat


回答 21

最简单的方法是使用deque

from collections import deque

def tail(filename, n=10):
    with open(filename) as f:
        return deque(f, n)

The simplest way is to use deque:

from collections import deque

def tail(filename, n=10):
    with open(filename) as f:
        return deque(f, n)

回答 22

我不得不从文件的最后一行读取特定值,并偶然发现了该线程。我没有重新发明Python的工作原理,而是得到了一个很小的shell脚本,保存为/ usr / local / bin / get_last_netp:

#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk {'print $14'}

在Python程序中:

from subprocess import check_output

last_netp = int(check_output("/usr/local/bin/get_last_netp"))

I had to read a specific value from the last line of a file, and stumbled upon this thread. Rather than reinventing the wheel in Python, I ended up with a tiny shell script, saved as /usr/local/bin/get_last_netp:

#! /bin/bash
tail -n1 /home/leif/projects/transfer/export.log | awk {'print $14'}

And in the Python program:

from subprocess import check_output

last_netp = int(check_output("/usr/local/bin/get_last_netp"))

回答 23

不是第一个使用双端队列的示例,而是一个更简单的示例。这是一般性的:它适用于任何可迭代的对象,而不仅仅是文件。

#!/usr/bin/env python
import sys
import collections
def tail(iterable, N):
    deq = collections.deque()
    for thing in iterable:
        if len(deq) >= N:
            deq.popleft()
        deq.append(thing)
    for thing in deq:
        yield thing
if __name__ == '__main__':
    for line in tail(sys.stdin,10):
        sys.stdout.write(line)

Not the first example using a deque, but a simpler one. This one is general: it works on any iterable object, not just a file.

#!/usr/bin/env python
import sys
import collections
def tail(iterable, N):
    deq = collections.deque()
    for thing in iterable:
        if len(deq) >= N:
            deq.popleft()
        deq.append(thing)
    for thing in deq:
        yield thing
if __name__ == '__main__':
    for line in tail(sys.stdin,10):
        sys.stdout.write(line)

回答 24

This is my version of tailf

import sys, time, os

filename = 'path to file'

try:
    with open(filename) as f:
        size = os.path.getsize(filename)
        if size < 1024:
            s = size
        else:
            s = 999
        f.seek(-s, 2)
        l = f.read()
        print l
        while True:
            line = f.readline()
            if not line:
                time.sleep(1)
                continue
            print line
except IOError:
    pass
This is my version of tailf

import sys, time, os

filename = 'path to file'

try:
    with open(filename) as f:
        size = os.path.getsize(filename)
        if size < 1024:
            s = size
        else:
            s = 999
        f.seek(-s, 2)
        l = f.read()
        print l
        while True:
            line = f.readline()
            if not line:
                time.sleep(1)
                continue
            print line
except IOError:
    pass

回答 25

import time

attemps = 600
wait_sec = 5
fname = "YOUR_PATH"

with open(fname, "r") as f:
    where = f.tell()
    for i in range(attemps):
        line = f.readline()
        if not line:
            time.sleep(wait_sec)
            f.seek(where)
        else:
            print line, # already has newline
import time

attemps = 600
wait_sec = 5
fname = "YOUR_PATH"

with open(fname, "r") as f:
    where = f.tell()
    for i in range(attemps):
        line = f.readline()
        if not line:
            time.sleep(wait_sec)
            f.seek(where)
        else:
            print line, # already has newline

回答 26

import itertools
fname = 'log.txt'
offset = 5
n = 10
with open(fname) as f:
    n_last_lines = list(reversed([x for x in itertools.islice(f, None)][-(offset+1):-(offset+n+1):-1]))
import itertools
fname = 'log.txt'
offset = 5
n = 10
with open(fname) as f:
    n_last_lines = list(reversed([x for x in itertools.islice(f, None)][-(offset+1):-(offset+n+1):-1]))

回答 27

abc = "2018-06-16 04:45:18.68"
filename = "abc.txt"
with open(filename) as myFile:
    for num, line in enumerate(myFile, 1):
        if abc in line:
            lastline = num
print "last occurance of work at file is in "+str(lastline) 
abc = "2018-06-16 04:45:18.68"
filename = "abc.txt"
with open(filename) as myFile:
    for num, line in enumerate(myFile, 1):
        if abc in line:
            lastline = num
print "last occurance of work at file is in "+str(lastline) 

回答 28

更新由A.Coady给出的答案

适用于python 3

这使用指数搜索,将仅缓冲N来自后面的行,并且非常高效。

import time
import os
import sys

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []

    # set file pointer to end

    f.seek(0, os.SEEK_END)

    isFileSmall = False

    while len(lines) <= n:
        try:
            f.seek(f.tell() - pos, os.SEEK_SET)
        except ValueError as e:
            # lines greater than file seeking size
            # seek to start
            f.seek(0,os.SEEK_SET)
            isFileSmall = True
        except IOError:
            print("Some problem reading/seeking the file")
            sys.exit(-1)
        finally:
            lines = f.readlines()
            if isFileSmall:
                break

        pos *= 2

    print(lines)

    return lines[-n:]




with open("stream_logs.txt") as f:
    while(True):
        time.sleep(0.5)
        print(tail(f,2))

Update for answer given by A.Coady

Works with python 3.

This uses Exponential Search and will buffer only N lines from back and is very efficient.

import time
import os
import sys

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []

    # set file pointer to end

    f.seek(0, os.SEEK_END)

    isFileSmall = False

    while len(lines) <= n:
        try:
            f.seek(f.tell() - pos, os.SEEK_SET)
        except ValueError as e:
            # lines greater than file seeking size
            # seek to start
            f.seek(0,os.SEEK_SET)
            isFileSmall = True
        except IOError:
            print("Some problem reading/seeking the file")
            sys.exit(-1)
        finally:
            lines = f.readlines()
            if isFileSmall:
                break

        pos *= 2

    print(lines)

    return lines[-n:]




with open("stream_logs.txt") as f:
    while(True):
        time.sleep(0.5)
        print(tail(f,2))


回答 29

再次考虑,这可能和这里的一切一样快。

def tail( f, window=20 ):
    lines= ['']*window
    count= 0
    for l in f:
        lines[count%window]= l
        count += 1
    print lines[count%window:], lines[:count%window]

这要简单得多。而且它似乎确实在快速发展。

On second thought, this is probably just as fast as anything here.

def tail( f, window=20 ):
    lines= ['']*window
    count= 0
    for l in f:
        lines[count%window]= l
        count += 1
    print lines[count%window:], lines[:count%window]

It’s a lot simpler. And it does seem to rip along at a good pace.


python多久刷新一次文件?

问题:python多久刷新一次文件?

  1. Python多久刷新一次文件?
  2. Python多久刷新一次到stdout?

我不确定(1)。

至于(2),我相信Python每隔一行都会刷新到stdout。但是,如果将stdout重载为文件,它是否会经常刷新?

  1. How often does Python flush to a file?
  2. How often does Python flush to stdout?

I’m unsure about (1).

As for (2), I believe Python flushes to stdout after every new line. But, if you overload stdout to be to a file, does it flush as often?


回答 0

对于文件操作,Python使用操作系统的默认缓冲,除非您对其进行配置,否则进行配置。您可以指定缓冲区大小,无缓冲或行缓冲。

例如,open函数采用缓冲区大小参数。

http://docs.python.org/library/functions.html#open

“可选的缓冲参数指定文件的所需缓冲区大小:”

  • 0表示未缓冲,
  • 1表示行缓冲,
  • 任何其他正值表示使用(大约)该大小的缓冲区。
  • 负缓冲意味着使用系统默认值,通常对tty设备使用行缓冲,而对于其他文件则使用完全缓冲。
  • 如果省略,则使用系统默认值。

码:

bufsize = 0
f = open('file.txt', 'w', buffering=bufsize)

For file operations, Python uses the operating system’s default buffering unless you configure it do otherwise. You can specify a buffer size, unbuffered, or line buffered.

For example, the open function takes a buffer size argument.

http://docs.python.org/library/functions.html#open

“The optional buffering argument specifies the file’s desired buffer size:”

  • 0 means unbuffered,
  • 1 means line buffered,
  • any other positive value means use a buffer of (approximately) that size.
  • A negative buffering means to use the system default, which is usually line buffered for tty devices and fully buffered for other files.
  • If omitted, the system default is used.

code:

bufsize = 0
f = open('file.txt', 'w', buffering=bufsize)

回答 1

您也可以使用该flush()方法以编程方式强制将缓冲区刷新到文件。

with open('out.log', 'w+') as f:
    f.write('output is ')
    # some work
    s = 'OK.'
    f.write(s)
    f.write('\n')
    f.flush()
    # some other work
    f.write('done\n')
    f.flush()

我发现当尾随输出文件时,这很有用tail -f

You can also force flush the buffer to a file programmatically with the flush() method.

with open('out.log', 'w+') as f:
    f.write('output is ')
    # some work
    s = 'OK.'
    f.write(s)
    f.write('\n')
    f.flush()
    # some other work
    f.write('done\n')
    f.flush()

I have found this useful when tailing an output file with tail -f.


回答 2

我不知道这是否也适用于python,但我认为这取决于您所运行的操作系统。

例如,在Linux上,输出到终端会在换行符上刷新缓冲区,而输出到文件只会在缓冲区已满时刷新(默认情况下)。这是因为较少次数刷新缓冲区更有效,并且用户不太可能注意到输出是否未刷新到文件中的换行符上。

如果需要,您也许可以自动刷新输出。

编辑:我认为您将以这种方式在python中自动刷新(从此处开始

#0 means there is no buffer, so all output
#will be auto-flushed
fsock = open('out.log', 'w', 0)
sys.stdout = fsock
#do whatever
fsock.close()

I don’t know if this applies to python as well, but I think it depends on the operating system that you are running.

On Linux for example, output to terminal flushes the buffer on a newline, whereas for output to files it only flushes when the buffer is full (by default). This is because it is more efficient to flush the buffer fewer times, and the user is less likely to notice if the output is not flushed on a newline in a file.

You might be able to auto-flush the output if that is what you need.

EDIT: I think you would auto-flush in python this way (based from here)

#0 means there is no buffer, so all output
#will be auto-flushed
fsock = open('out.log', 'w', 0)
sys.stdout = fsock
#do whatever
fsock.close()

回答 3

您还可以通过调用io模块中的只读DEFAULT_BUFFER_SIZE属性来检查默认缓冲区大小。

import io
print (io.DEFAULT_BUFFER_SIZE)

You can also check the default buffer size by calling the read only DEFAULT_BUFFER_SIZE attribute from io module.

import io
print (io.DEFAULT_BUFFER_SIZE)

回答 4

这是另一种方法,由OP来选择他更喜欢哪个。

如果将以下代码包含在__init__.py文件中,然后再包含其他任何代码,则打印有消息print和任何错误的消息将不再记录到Ableton的Log.txt中,而是将磁盘上的文件分开:

import sys

path = "/Users/#username#"

errorLog = open(path + "/stderr.txt", "w", 1)
errorLog.write("---Starting Error Log---\n")
sys.stderr = errorLog
stdoutLog = open(path + "/stdout.txt", "w", 1)
stdoutLog.write("---Starting Standard Out Log---\n")
sys.stdout = stdoutLog

(对于Mac,更改#username#为用户文件夹的名称。在Windows上,用户文件夹的路径将具有不同的格式)

当您在文本编辑器中打开文件时,该文件在更改磁盘上的文件时会刷新其内容(例如,Mac:TextEdit不会,但TextWrangler会),您会看到日志实时更新。

鸣谢:这段代码主要是Nathan Ramella从liveAPI控制界面脚本复制的

Here is another approach, up to the OP to choose which one he prefers.

When including the code below in the __init__.py file before any other code, messages printed with print and any errors will no longer be logged to Ableton’s Log.txt but to separate files on your disk:

import sys

path = "/Users/#username#"

errorLog = open(path + "/stderr.txt", "w", 1)
errorLog.write("---Starting Error Log---\n")
sys.stderr = errorLog
stdoutLog = open(path + "/stdout.txt", "w", 1)
stdoutLog.write("---Starting Standard Out Log---\n")
sys.stdout = stdoutLog

(for Mac, change #username# to the name of your user folder. On Windows the path to your user folder will have a different format)

When you open the files in a text editor that refreshes its content when the file on disk is changed (example for Mac: TextEdit does not but TextWrangler does), you will see the logs being updated in real-time.

Credits: this code was copied mostly from the liveAPI control surface scripts by Nathan Ramella


如何检查文件是否为空?

问题:如何检查文件是否为空?

我有一个文本文件。
如何检查是否为空?

I have a text file.
How can I check whether it’s empty or not?


回答 0

>>> import os
>>> os.stat("file").st_size == 0
True
>>> import os
>>> os.stat("file").st_size == 0
True

回答 1

import os    
os.path.getsize(fullpathhere) > 0
import os    
os.path.getsize(fullpathhere) > 0

回答 2

双方getsize()stat()会抛出一个异常,如果该文件不存在。此函数将返回True / False而不抛出(更简单但更不可靠):

import os
def is_non_zero_file(fpath):  
    return os.path.isfile(fpath) and os.path.getsize(fpath) > 0

Both getsize() and stat() will throw an exception if the file does not exist. This function will return True/False without throwing (simpler but less robust):

import os
def is_non_zero_file(fpath):  
    return os.path.isfile(fpath) and os.path.getsize(fpath) > 0

回答 3

如果由于某种原因您已经打开了文件,则可以尝试以下操作:

>>> with open('New Text Document.txt') as my_file:
...     # I already have file open at this point.. now what?
...     my_file.seek(0) #ensure you're at the start of the file..
...     first_char = my_file.read(1) #get the first character
...     if not first_char:
...         print "file is empty" #first character is the empty string..
...     else:
...         my_file.seek(0) #first character wasn't empty, return to start of file.
...         #use file now
...
file is empty

if for some reason you already had the file open you could try this:

>>> with open('New Text Document.txt') as my_file:
...     # I already have file open at this point.. now what?
...     my_file.seek(0) #ensure you're at the start of the file..
...     first_char = my_file.read(1) #get the first character
...     if not first_char:
...         print "file is empty" #first character is the empty string..
...     else:
...         my_file.seek(0) #first character wasn't empty, return to start of file.
...         #use file now
...
file is empty

回答 4

好吧,我将把ghostdog74的答案和评论结合起来,只是为了好玩。

>>> import os
>>> os.stat('c:/pagefile.sys').st_size==0
False

False 表示非空文件。

因此,让我们编写一个函数:

import os

def file_is_empty(path):
    return os.stat(path).st_size==0

Ok so I’ll combine ghostdog74’s answer and the comments, just for fun.

>>> import os
>>> os.stat('c:/pagefile.sys').st_size==0
False

False means a non-empty file.

So let’s write a function:

import os

def file_is_empty(path):
    return os.stat(path).st_size==0

回答 5

如果您将Python3与一起使用,则pathlib可以os.stat()使用Path.stat()方法访问信息,该方法具有属性st_size(文件大小,以字节为单位):

>>> from pathlib import Path 
>>> mypath = Path("path/to/my/file")
>>> mypath.stat().st_size == 0 # True if empty

If you are using Python3 with pathlib you can access os.stat() information using the Path.stat() method, which has the attribute st_size(file size in bytes):

>>> from pathlib import Path 
>>> mypath = Path("path/to/my/file")
>>> mypath.stat().st_size == 0 # True if empty

回答 6

如果您有文件对象,那么

>>> import os
>>> with open('new_file.txt') as my_file:
...     my_file.seek(0, os.SEEK_END) # go to end of file
...     if my_file.tell(): # if current position is truish (i.e != 0)
...         my_file.seek(0) # rewind the file for later use 
...     else:
...         print "file is empty"
... 
file is empty

if you have the file object, then

>>> import os
>>> with open('new_file.txt') as my_file:
...     my_file.seek(0, os.SEEK_END) # go to end of file
...     if my_file.tell(): # if current position is truish (i.e != 0)
...         my_file.seek(0) # rewind the file for later use 
...     else:
...         print "file is empty"
... 
file is empty

回答 7

一个重要的陷阱:使用或函数测试时,压缩的空文件将显示为非零:getsize()stat()

$ python
>>> import os
>>> os.path.getsize('empty-file.txt.gz')
35
>>> os.stat("empty-file.txt.gz").st_size == 0
False

$ gzip -cd empty-file.txt.gz | wc
0 0 0

因此,您应该检查要测试的文件是否已压缩(例如检查文件名后缀),如果是,则将其保释或将其解压缩到临时位置,测试未压缩的文件,然后在完成后将其删除。

An important gotcha: a compressed empty file will appear to be non-zero when tested with getsize() or stat() functions:

$ python
>>> import os
>>> os.path.getsize('empty-file.txt.gz')
35
>>> os.stat("empty-file.txt.gz").st_size == 0
False

$ gzip -cd empty-file.txt.gz | wc
0 0 0

So you should check whether the file to be tested is compressed (e.g. examine the filename suffix) and if so, either bail or uncompress it to a temporary location, test the uncompressed file, and then delete it when done.


回答 8

由于您尚未定义什么是空文件。有些人可能会认为只有空白行的文件也是空文件。因此,如果要检查文件是否仅包含空白行(任何空白字符,’\ r’,’\ n’,’\ t’),则可以按照以下示例进行操作:

Python3

import re

def whitespace_only(file):
    content = open(file, 'r').read()
    if re.search(r'^\s*$', content):
        return True

说明:上面的示例使用正则表达式(regex)来匹配content文件的内容()。
特别是:对于:的正则表达式:^\s*$整体表示文件中是否仅包含空白行和/或空格。
^在行的开头断言位置
\s匹配任何空格字符(等于[\ r \ n \ t \ f \ v])
*量词-尽可能在零和无限次数之间进行匹配,并根据需要返回(贪婪)
$在行尾声明位置

Since you have not defined what an empty file is. Some might consider a file with just blank lines also an empty file. So if you want to check if your file contains only blank lines (any whitespace character, ‘\r’, ‘\n’, ‘\t’), you can follow the example below:

Python3

import re

def whitespace_only(file):
    content = open(file, 'r').read()
    if re.search(r'^\s*$', content):
        return True

Explain: the example above uses regular expression (regex) to match the content (content) of the file.
Specifically: for regex of: ^\s*$ as a whole means if the file contains only blank lines and/or blank spaces.
^ asserts position at start of a line
\s matches any whitespace character (equal to [\r\n\t\f\v ])
* Quantifier — Matches between zero and unlimited times, as many times as possible, giving back as needed (greedy)
$ asserts position at the end of a line


回答 9

如果要检查csv文件是否为空….请尝试

with open('file.csv','a',newline='') as f:
        csv_writer=DictWriter(f,fieldnames=['user_name','user_age','user_email','user_gender','user_type','user_check'])
        if os.stat('file.csv').st_size > 0:
            pass
        else:
            csv_writer.writeheader()

if you want to check csv file is empty or not …….try this

with open('file.csv','a',newline='') as f:
        csv_writer=DictWriter(f,fieldnames=['user_name','user_age','user_email','user_gender','user_type','user_check'])
        if os.stat('file.csv').st_size > 0:
            pass
        else:
            csv_writer.writeheader()

在Python中搜索并替换文件中的一行

问题:在Python中搜索并替换文件中的一行

我想遍历文本文件的内容,进行搜索并替换某些行,然后将结果写回到文件中。我可以先将整个文件加载到内存中,然后再写回去,但这可能不是最好的方法。

在以下代码中,执行此操作的最佳方法是什么?

f = open(file)
for line in f:
    if line.contains('foo'):
        newline = line.replace('foo', 'bar')
        # how to write this newline back to the file

I want to loop over the contents of a text file and do a search and replace on some lines and write the result back to the file. I could first load the whole file in memory and then write it back, but that probably is not the best way to do it.

What is the best way to do this, within the following code?

f = open(file)
for line in f:
    if line.contains('foo'):
        newline = line.replace('foo', 'bar')
        # how to write this newline back to the file

回答 0

我想类似的事情应该做。它基本上将内容写入新文件,并用新文件替换旧文件:

from tempfile import mkstemp
from shutil import move, copymode
from os import fdopen, remove

def replace(file_path, pattern, subst):
    #Create temp file
    fh, abs_path = mkstemp()
    with fdopen(fh,'w') as new_file:
        with open(file_path) as old_file:
            for line in old_file:
                new_file.write(line.replace(pattern, subst))
    #Copy the file permissions from the old file to the new file
    copymode(file_path, abs_path)
    #Remove original file
    remove(file_path)
    #Move new file
    move(abs_path, file_path)

I guess something like this should do it. It basically writes the content to a new file and replaces the old file with the new file:

from tempfile import mkstemp
from shutil import move, copymode
from os import fdopen, remove

def replace(file_path, pattern, subst):
    #Create temp file
    fh, abs_path = mkstemp()
    with fdopen(fh,'w') as new_file:
        with open(file_path) as old_file:
            for line in old_file:
                new_file.write(line.replace(pattern, subst))
    #Copy the file permissions from the old file to the new file
    copymode(file_path, abs_path)
    #Remove original file
    remove(file_path)
    #Move new file
    move(abs_path, file_path)

回答 1

最短的方法可能是使用fileinput模块。例如,以下代码将行号就地添加到文件中:

import fileinput

for line in fileinput.input("test.txt", inplace=True):
    print('{} {}'.format(fileinput.filelineno(), line), end='') # for Python 3
    # print "%d: %s" % (fileinput.filelineno(), line), # for Python 2

这里发生的是:

  1. 原始文件已移至备份文件
  2. 标准输出在循环中重定向到原始文件
  3. 因此,所有print语句都会写回到原始文件中

fileinput有更多的钟声和口哨声。例如,它可以用于自动操作中的所有文件sys.args[1:],而无需显式遍历它们。从Python 3.2开始,它还提供了在with语句中使用的便捷上下文管理器。


虽然fileinput对于一次性脚本非常有用,但我会警惕在实际代码中使用它,因为要承认它不是很易读或不熟悉。在实际(生产)代码中,值得花几行代码来使过程明确,从而使代码可读。

有两种选择:

  1. 该文件不是太大,您可以将其全部读取到内存中。然后关闭文件,以写入模式将其重新打开,然后将修改后的内容写回。
  2. 该文件太大,无法存储在内存中。您可以将其移到一个临时文件中并打开它,逐行阅读,然后写回到原始文件中。请注意,这需要两倍的存储空间。

The shortest way would probably be to use the fileinput module. For example, the following adds line numbers to a file, in-place:

import fileinput

for line in fileinput.input("test.txt", inplace=True):
    print('{} {}'.format(fileinput.filelineno(), line), end='') # for Python 3
    # print "%d: %s" % (fileinput.filelineno(), line), # for Python 2

What happens here is:

  1. The original file is moved to a backup file
  2. The standard output is redirected to the original file within the loop
  3. Thus any print statements write back into the original file

fileinput has more bells and whistles. For example, it can be used to automatically operate on all files in sys.args[1:], without your having to iterate over them explicitly. Starting with Python 3.2 it also provides a convenient context manager for use in a with statement.


While fileinput is great for throwaway scripts, I would be wary of using it in real code because admittedly it’s not very readable or familiar. In real (production) code it’s worthwhile to spend just a few more lines of code to make the process explicit and thus make the code readable.

There are two options:

  1. The file is not overly large, and you can just read it wholly to memory. Then close the file, reopen it in writing mode and write the modified contents back.
  2. The file is too large to be stored in memory; you can move it over to a temporary file and open that, reading it line by line, writing back into the original file. Note that this requires twice the storage.

回答 2

这是另一个经过测试的示例,它将匹配搜索和替换模式:

import fileinput
import sys

def replaceAll(file,searchExp,replaceExp):
    for line in fileinput.input(file, inplace=1):
        if searchExp in line:
            line = line.replace(searchExp,replaceExp)
        sys.stdout.write(line)

使用示例:

replaceAll("/fooBar.txt","Hello\sWorld!$","Goodbye\sWorld.")

Here’s another example that was tested, and will match search & replace patterns:

import fileinput
import sys

def replaceAll(file,searchExp,replaceExp):
    for line in fileinput.input(file, inplace=1):
        if searchExp in line:
            line = line.replace(searchExp,replaceExp)
        sys.stdout.write(line)

Example use:

replaceAll("/fooBar.txt","Hello\sWorld!$","Goodbye\sWorld.")

回答 3

这应该起作用:(就地编辑)

import fileinput

# Does a list of files, and
# redirects STDOUT to the file in question
for line in fileinput.input(files, inplace = 1): 
      print line.replace("foo", "bar"),

This should work: (inplace editing)

import fileinput

# Does a list of files, and
# redirects STDOUT to the file in question
for line in fileinput.input(files, inplace = 1): 
      print line.replace("foo", "bar"),

回答 4

根据Thomas Watnedal的回答。但是,这不能完全回答原始问题的线对线部分。该功能仍可以逐行替换

此实现无需使用临时文件即可替换文件内容,因此文件权限保持不变。

同样,re.sub代替replace,仅允许正则表达式代替纯文本替换。

以单个字符串而不是逐行读取文件可以进行多行匹配和替换。

import re

def replace(file, pattern, subst):
    # Read contents from file as a single string
    file_handle = open(file, 'r')
    file_string = file_handle.read()
    file_handle.close()

    # Use RE package to allow for replacement (also allowing for (multiline) REGEX)
    file_string = (re.sub(pattern, subst, file_string))

    # Write contents to file.
    # Using mode 'w' truncates the file.
    file_handle = open(file, 'w')
    file_handle.write(file_string)
    file_handle.close()

Based on the answer by Thomas Watnedal. However, this does not answer the line-to-line part of the original question exactly. The function can still replace on a line-to-line basis

This implementation replaces the file contents without using temporary files, as a consequence file permissions remain unchanged.

Also re.sub instead of replace, allows regex replacement instead of plain text replacement only.

Reading the file as a single string instead of line by line allows for multiline match and replacement.

import re

def replace(file, pattern, subst):
    # Read contents from file as a single string
    file_handle = open(file, 'r')
    file_string = file_handle.read()
    file_handle.close()

    # Use RE package to allow for replacement (also allowing for (multiline) REGEX)
    file_string = (re.sub(pattern, subst, file_string))

    # Write contents to file.
    # Using mode 'w' truncates the file.
    file_handle = open(file, 'w')
    file_handle.write(file_string)
    file_handle.close()

回答 5

就像lassevk建议的那样,随时随地写出新文件,这是一些示例代码:

fin = open("a.txt")
fout = open("b.txt", "wt")
for line in fin:
    fout.write( line.replace('foo', 'bar') )
fin.close()
fout.close()

As lassevk suggests, write out the new file as you go, here is some example code:

fin = open("a.txt")
fout = open("b.txt", "wt")
for line in fin:
    fout.write( line.replace('foo', 'bar') )
fin.close()
fout.close()

回答 6

如果您想要一个通用函数来将任何文本替换为其他文本,那么这可能是最好的选择,特别是如果您是正则表达式的支持者:

import re
def replace( filePath, text, subs, flags=0 ):
    with open( filePath, "r+" ) as file:
        fileContents = file.read()
        textPattern = re.compile( re.escape( text ), flags )
        fileContents = textPattern.sub( subs, fileContents )
        file.seek( 0 )
        file.truncate()
        file.write( fileContents )

If you’re wanting a generic function that replaces any text with some other text, this is likely the best way to go, particularly if you’re a fan of regex’s:

import re
def replace( filePath, text, subs, flags=0 ):
    with open( filePath, "r+" ) as file:
        fileContents = file.read()
        textPattern = re.compile( re.escape( text ), flags )
        fileContents = textPattern.sub( subs, fileContents )
        file.seek( 0 )
        file.truncate()
        file.write( fileContents )

回答 7

更加Python化的方式是使用上下文管理器,如下面的代码:

from tempfile import mkstemp
from shutil import move
from os import remove

def replace(source_file_path, pattern, substring):
    fh, target_file_path = mkstemp()
    with open(target_file_path, 'w') as target_file:
        with open(source_file_path, 'r') as source_file:
            for line in source_file:
                target_file.write(line.replace(pattern, substring))
    remove(source_file_path)
    move(target_file_path, source_file_path)

您可以在此处找到完整的代码段。

A more pythonic way would be to use context managers like the code below:

from tempfile import mkstemp
from shutil import move
from os import remove

def replace(source_file_path, pattern, substring):
    fh, target_file_path = mkstemp()
    with open(target_file_path, 'w') as target_file:
        with open(source_file_path, 'r') as source_file:
            for line in source_file:
                target_file.write(line.replace(pattern, substring))
    remove(source_file_path)
    move(target_file_path, source_file_path)

You can find the full snippet here.


回答 8

创建一个新文件,将行从旧复制到新,并在将行写入新文件之前进行替换。

Create a new file, copy lines from the old to the new, and do the replacing before you write the lines to the new file.


回答 9

扩展@Kiran的答案(我同意是更简洁和Pythonic的),这增加了编解码器以支持UTF-8的读写:

import codecs 

from tempfile import mkstemp
from shutil import move
from os import remove


def replace(source_file_path, pattern, substring):
    fh, target_file_path = mkstemp()

    with codecs.open(target_file_path, 'w', 'utf-8') as target_file:
        with codecs.open(source_file_path, 'r', 'utf-8') as source_file:
            for line in source_file:
                target_file.write(line.replace(pattern, substring))
    remove(source_file_path)
    move(target_file_path, source_file_path)

Expanding on @Kiran’s answer, which I agree is more succinct and Pythonic, this adds codecs to support the reading and writing of UTF-8:

import codecs 

from tempfile import mkstemp
from shutil import move
from os import remove


def replace(source_file_path, pattern, substring):
    fh, target_file_path = mkstemp()

    with codecs.open(target_file_path, 'w', 'utf-8') as target_file:
        with codecs.open(source_file_path, 'r', 'utf-8') as source_file:
            for line in source_file:
                target_file.write(line.replace(pattern, substring))
    remove(source_file_path)
    move(target_file_path, source_file_path)

回答 10

使用hamishmcn的答案作为模板,我能够在文件中搜索与我的正则表达式匹配的一行并将其替换为空字符串。

import re 

fin = open("in.txt", 'r') # in file
fout = open("out.txt", 'w') # out file
for line in fin:
    p = re.compile('[-][0-9]*[.][0-9]*[,]|[-][0-9]*[,]') # pattern
    newline = p.sub('',line) # replace matching strings with empty string
    print newline
    fout.write(newline)
fin.close()
fout.close()

Using hamishmcn’s answer as a template I was able to search for a line in a file that match my regex and replacing it with empty string.

import re 

fin = open("in.txt", 'r') # in file
fout = open("out.txt", 'w') # out file
for line in fin:
    p = re.compile('[-][0-9]*[.][0-9]*[,]|[-][0-9]*[,]') # pattern
    newline = p.sub('',line) # replace matching strings with empty string
    print newline
    fout.write(newline)
fin.close()
fout.close()

回答 11

fileinput 如先前的答案所述,它非常简单:

import fileinput

def replace_in_file(file_path, search_text, new_text):
    with fileinput.input(file_path, inplace=True) as f:
        for line in f:
            new_line = line.replace(search_text, new_text)
            print(new_line, end='')

说明:

  • fileinput可以接受多个文件,但是我更喜欢在处理每个文件后立即将其关闭。所以,放置单file_pathwith声明。
  • printinplace=True,语句不会打印任何内容,因为STDOUT将其转发到原始文件。
  • end=''in print语句是消除中间的空白新行。

可以如下使用:

file_path = '/path/to/my/file'
replace_in_file(file_path, 'old-text', 'new-text')

fileinput is quite straightforward as mentioned on previous answers:

import fileinput

def replace_in_file(file_path, search_text, new_text):
    with fileinput.input(file_path, inplace=True) as f:
        for line in f:
            new_line = line.replace(search_text, new_text)
            print(new_line, end='')

Explanation:

  • fileinput can accept multiple files, but I prefer to close each single file as soon as it is being processed. So placed single file_path in with statement.
  • print statement does not print anything when inplace=True, because STDOUT is being forwarded to the original file.
  • end='' in print statement is to eliminate intermediate blank new lines.

Can be used as follows:

file_path = '/path/to/my/file'
replace_in_file(file_path, 'old-text', 'new-text')

回答 12

如果您在以下位置删除缩进,它将搜索并替换成多行。参见以下示例。

def replace(file, pattern, subst):
    #Create temp file
    fh, abs_path = mkstemp()
    print fh, abs_path
    new_file = open(abs_path,'w')
    old_file = open(file)
    for line in old_file:
        new_file.write(line.replace(pattern, subst))
    #close temp file
    new_file.close()
    close(fh)
    old_file.close()
    #Remove original file
    remove(file)
    #Move new file
    move(abs_path, file)

if you remove the indent at the like below, it will search and replace in multiple line. See below for example.

def replace(file, pattern, subst):
    #Create temp file
    fh, abs_path = mkstemp()
    print fh, abs_path
    new_file = open(abs_path,'w')
    old_file = open(file)
    for line in old_file:
        new_file.write(line.replace(pattern, subst))
    #close temp file
    new_file.close()
    close(fh)
    old_file.close()
    #Remove original file
    remove(file)
    #Move new file
    move(abs_path, file)

记录器配置以记录到文件并打印到stdout

问题:记录器配置以记录到文件并打印到stdout

我正在使用Python的日志记录模块将一些调试字符串记录到运行良好的文件中。现在,此外,我想使用此模块还将字符串输出到stdout。我该怎么做呢?为了将我的字符串记录到文件中,我使用以下代码:

import logging
import logging.handlers
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
handler = logging.handlers.RotatingFileHandler(
    LOGFILE, maxBytes=(1048576*5), backupCount=7
)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

然后调用记录器功能,例如

logger.debug("I am written to the file")

谢谢您的帮助!

I’m using Python’s logging module to log some debug strings to a file which works pretty well. Now in addition, I’d like to use this module to also print the strings out to stdout. How do I do this? In order to log my strings to a file I use following code:

import logging
import logging.handlers
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
handler = logging.handlers.RotatingFileHandler(
    LOGFILE, maxBytes=(1048576*5), backupCount=7
)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)

and then call a logger function like

logger.debug("I am written to the file")

Thank you for some help here!


回答 0

只需获取根记录器的句柄并添加即可StreamHandler。在StreamHandler写至标准错误。不知道您是否真的需要stdout而不是stderr,但这是我在设置Python记录器时使用的方法,我也添加了FileHandler它。然后,我所有的日志都转到两个地方(这听起来像您想要的)。

import logging
logging.getLogger().addHandler(logging.StreamHandler())

如果要输出到stdout而不是stderr,则只需将其指定给StreamHandler构造函数。

import sys
# ...
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))

您也可以在其中添加一个Formatter,以便所有日志行都有一个公共标题。

即:

import logging
logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s]  %(message)s")
rootLogger = logging.getLogger()

fileHandler = logging.FileHandler("{0}/{1}.log".format(logPath, fileName))
fileHandler.setFormatter(logFormatter)
rootLogger.addHandler(fileHandler)

consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)

打印为以下格式:

2012-12-05 16:58:26,618 [MainThread  ] [INFO ]  my message

Just get a handle to the root logger and add the StreamHandler. The StreamHandler writes to stderr. Not sure if you really need stdout over stderr, but this is what I use when I setup the Python logger and I also add the FileHandler as well. Then all my logs go to both places (which is what it sounds like you want).

import logging
logging.getLogger().addHandler(logging.StreamHandler())

If you want to output to stdout instead of stderr, you just need to specify it to the StreamHandler constructor.

import sys
# ...
logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))

You could also add a Formatter to it so all your log lines have a common header.

ie:

import logging
logFormatter = logging.Formatter("%(asctime)s [%(threadName)-12.12s] [%(levelname)-5.5s]  %(message)s")
rootLogger = logging.getLogger()

fileHandler = logging.FileHandler("{0}/{1}.log".format(logPath, fileName))
fileHandler.setFormatter(logFormatter)
rootLogger.addHandler(fileHandler)

consoleHandler = logging.StreamHandler()
consoleHandler.setFormatter(logFormatter)
rootLogger.addHandler(consoleHandler)

Prints to the format of:

2012-12-05 16:58:26,618 [MainThread  ] [INFO ]  my message

回答 1

logging.basicConfig()handlers从Python 3.3开始可以使用关键字参数,这大大简化了日志记录设置,尤其是在使用同一格式化程序设置多个处理程序时:

handlers–如果指定,则这应该是已经创建的处理程序的迭代,以添加到根记录器。任何尚未设置格式器的处理程序都将被分配此函数中创建的默认格式器。

因此,整个设置可以通过以下单个调用完成:

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)

(或按照原始问题的要求加上import sys+ StreamHandler(sys.stdout)– StreamHandler的默认值是写入stderr。查看LogRecord属性如果要自定义日志格式并添加诸如文件名/行,线程信息等内容,请。)

上面的设置只需要在脚本开始处执行一次即可。您可以稍后在代码库的所有其他位置使用日志记录,如下所示:

logging.info('Useful message')
logging.error('Something bad happened')
...

注意:如果它不起作用,则可能是其他人已经用不同的方式初始化了日志系统。建议logging.root.handlers = []在调用之前先做评论basicConfig()

logging.basicConfig() can take a keyword argument handlers since Python 3.3, which simplifies logging setup a lot, especially when setting up multiple handlers with the same formatter:

handlers – If specified, this should be an iterable of already created handlers to add to the root logger. Any handlers which don’t already have a formatter set will be assigned the default formatter created in this function.

The whole setup can therefore be done with a single call like this:

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("debug.log"),
        logging.StreamHandler()
    ]
)

(Or with import sys + StreamHandler(sys.stdout) per original question’s requirements – the default for StreamHandler is to write to stderr. Look at LogRecord attributes if you want to customize the log format and add things like filename/line, thread info etc.)

The setup above needs to be done only once near the beginning of the script. You can use the logging from all other places in the codebase later like this:

logging.info('Useful message')
logging.error('Something bad happened')
...

Note: If it doesn’t work, someone else has probably already initialized the logging system differently. Comments suggest doing logging.root.handlers = [] before the call to basicConfig().


回答 2

添加不带参数的StreamHandler转到stderr而不是stdout。如果某些其他进程依赖于stdout转储(即,在编写NRPE插件时),则请确保明确指定stdout,否则您可能会遇到一些意想不到的麻烦。

这是一个快速示例,该示例重用了问题中的假定值和LOGFILE:

import logging
from logging.handlers import RotatingFileHandler
from logging import handlers
import sys

log = logging.getLogger('')
log.setLevel(logging.DEBUG)
format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(format)
log.addHandler(ch)

fh = handlers.RotatingFileHandler(LOGFILE, maxBytes=(1048576*5), backupCount=7)
fh.setFormatter(format)
log.addHandler(fh)

Adding a StreamHandler without arguments goes to stderr instead of stdout. If some other process has a dependency on the stdout dump (i.e. when writing an NRPE plugin), then make sure to specify stdout explicitly or you might run into some unexpected troubles.

Here’s a quick example reusing the assumed values and LOGFILE from the question:

import logging
from logging.handlers import RotatingFileHandler
from logging import handlers
import sys

log = logging.getLogger('')
log.setLevel(logging.DEBUG)
format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(format)
log.addHandler(ch)

fh = handlers.RotatingFileHandler(LOGFILE, maxBytes=(1048576*5), backupCount=7)
fh.setFormatter(format)
log.addHandler(fh)

回答 3

在设置任何其他处理程序或记录任何消息之前,可以basicConfig使用stream=sys.stdout作为参数运行,或者手动添加一个StreamHandler将消息推入stdout的根记录器(或与此相关的任何其他记录器)。

Either run basicConfig with stream=sys.stdout as the argument prior to setting up any other handlers or logging any messages, or manually add a StreamHandler that pushes messages to stdout to the root logger (or any other logger you want, for that matter).


回答 4

在多个Python包中反复使用Waterboy的代码后,我最终将其转换为一个很小的独立Python包,您可以在这里找到:

https://github.com/acschaefer/duallog

该代码有据可查且易于使用。只需下载.py文件并将其包含在您的项目中,或通过安装整个软件包pip install duallog

After having used Waterboy’s code over and over in multiple Python packages, I finally cast it into a tiny standalone Python package, which you can find here:

https://github.com/acschaefer/duallog

The code is well documented and easy to use. Simply download the .py file and include it in your project, or install the whole package via pip install duallog.


回答 5

登录stdoutrotating file使用不同的级别和格式:

import logging
import logging.handlers
import sys

if __name__ == "__main__":

    # Change root logger level from WARNING (default) to NOTSET in order for all messages to be delegated.
    logging.getLogger().setLevel(logging.NOTSET)

    # Add stdout handler, with level INFO
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    formater = logging.Formatter('%(name)-13s: %(levelname)-8s %(message)s')
    console.setFormatter(formater)
    logging.getLogger().addHandler(console)

    # Add file rotating handler, with level DEBUG
    rotatingHandler = logging.handlers.RotatingFileHandler(filename='rotating.log', maxBytes=1000, backupCount=5)
    rotatingHandler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    rotatingHandler.setFormatter(formatter)
    logging.getLogger().addHandler(rotatingHandler)

    log = logging.getLogger("app." + __name__)

    log.debug('Debug message, should only appear in the file.')
    log.info('Info message, should appear in file and stdout.')
    log.warning('Warning message, should appear in file and stdout.')
    log.error('Error message, should appear in file and stdout.')

Logging to stdout and rotating file with different levels and formats:

import logging
import logging.handlers
import sys

if __name__ == "__main__":

    # Change root logger level from WARNING (default) to NOTSET in order for all messages to be delegated.
    logging.getLogger().setLevel(logging.NOTSET)

    # Add stdout handler, with level INFO
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    formater = logging.Formatter('%(name)-13s: %(levelname)-8s %(message)s')
    console.setFormatter(formater)
    logging.getLogger().addHandler(console)

    # Add file rotating handler, with level DEBUG
    rotatingHandler = logging.handlers.RotatingFileHandler(filename='rotating.log', maxBytes=1000, backupCount=5)
    rotatingHandler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    rotatingHandler.setFormatter(formatter)
    logging.getLogger().addHandler(rotatingHandler)

    log = logging.getLogger("app." + __name__)

    log.debug('Debug message, should only appear in the file.')
    log.info('Info message, should appear in file and stdout.')
    log.warning('Warning message, should appear in file and stdout.')
    log.error('Error message, should appear in file and stdout.')

回答 6

这是一个完整的包装好的解决方案,基于Waterboy的答案和其他各种来源。它支持同时记录到控制台和日志文件,允许进行不同的日志级别设置,提供彩色输出,并且易于配置(也可以作为Gist使用):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# -------------------------------------------------------------------------------
#                                                                               -
#  Python dual-logging setup (console and log file),                            -
#  supporting different log levels and colorized output                         -
#                                                                               -
#  Created by Fonic <https://github.com/fonic>                                  -
#  Date: 04/05/20                                                               -
#                                                                               -
#  Based on:                                                                    -
#  https://stackoverflow.com/a/13733863/1976617                                 -
#  https://uran198.github.io/en/python/2016/07/12/colorful-python-logging.html  -
#  https://en.wikipedia.org/wiki/ANSI_escape_code#Colors                        -
#                                                                               -
# -------------------------------------------------------------------------------

# Imports
import os
import sys
import logging

# Logging formatter supporting colored output
class LogFormatter(logging.Formatter):

    COLOR_CODES = {
        logging.CRITICAL: "\033[1;35m", # bright/bold magenta
        logging.ERROR:    "\033[1;31m", # bright/bold red
        logging.WARNING:  "\033[1;33m", # bright/bold yellow
        logging.INFO:     "\033[0;37m", # white / light gray
        logging.DEBUG:    "\033[1;30m"  # bright/bold black / dark gray
    }

    RESET_CODE = "\033[0m"

    def __init__(self, color, *args, **kwargs):
        super(LogFormatter, self).__init__(*args, **kwargs)
        self.color = color

    def format(self, record, *args, **kwargs):
        if (self.color == True and record.levelno in self.COLOR_CODES):
            record.color_on  = self.COLOR_CODES[record.levelno]
            record.color_off = self.RESET_CODE
        else:
            record.color_on  = ""
            record.color_off = ""
        return super(LogFormatter, self).format(record, *args, **kwargs)

# Setup logging
def setup_logging(console_log_output, console_log_level, console_log_color, logfile_file, logfile_log_level, logfile_log_color, log_line_template):

    # Create logger
    # For simplicity, we use the root logger, i.e. call 'logging.getLogger()'
    # without name argument. This way we can simply use module methods for
    # for logging throughout the script. An alternative would be exporting
    # the logger, i.e. 'global logger; logger = logging.getLogger("<name>")'
    logger = logging.getLogger()

    # Set global log level to 'debug' (required for handler levels to work)
    logger.setLevel(logging.DEBUG)

    # Create console handler
    console_log_output = console_log_output.lower()
    if (console_log_output == "stdout"):
        console_log_output = sys.stdout
    elif (console_log_output == "stderr"):
        console_log_output = sys.stderr
    else:
        print("Failed to set console output: invalid output: '%s'" % console_log_output)
        return False
    console_handler = logging.StreamHandler(console_log_output)

    # Set console log level
    try:
        console_handler.setLevel(console_log_level.upper()) # only accepts uppercase level names
    except:
        print("Failed to set console log level: invalid level: '%s'" % console_log_level)
        return False

    # Create and set formatter, add console handler to logger
    console_formatter = LogFormatter(fmt=log_line_template, color=console_log_color)
    console_handler.setFormatter(console_formatter)
    logger.addHandler(console_handler)

    # Create log file handler
    try:
        logfile_handler = logging.FileHandler(logfile_file)
    except Exception as exception:
        print("Failed to set up log file: %s" % str(exception))
        return False

    # Set log file log level
    try:
        logfile_handler.setLevel(logfile_log_level.upper()) # only accepts uppercase level names
    except:
        print("Failed to set log file log level: invalid level: '%s'" % logfile_log_level)
        return False

    # Create and set formatter, add log file handler to logger
    logfile_formatter = LogFormatter(fmt=log_line_template, color=logfile_log_color)
    logfile_handler.setFormatter(logfile_formatter)
    logger.addHandler(logfile_handler)

    # Success
    return True

# Main function
def main():

    # Setup logging
    script_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    if (not setup_logging(console_log_output="stdout", console_log_level="warning", console_log_color=True,
                        logfile_file=script_name + ".log", logfile_log_level="debug", logfile_log_color=False,
                        log_line_template="%(color_on)s[%(created)d] [%(threadName)s] [%(levelname)-8s] %(message)s%(color_off)s")):
        print("Failed to setup logging, aborting.")
        return 1

    # Log some messages
    logging.debug("Debug message")
    logging.info("Info message")
    logging.warning("Warning message")
    logging.error("Error message")
    logging.critical("Critical message")

# Call main function
if (__name__ == "__main__"):
    sys.exit(main())

Here is a complete, nicely wrapped solution based on Waterboy’s answer and various other sources. It supports logging to both console and log file, allows for different log level settings, provides colorized output and is easily configurable (also available as Gist):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# -------------------------------------------------------------------------------
#                                                                               -
#  Python dual-logging setup (console and log file),                            -
#  supporting different log levels and colorized output                         -
#                                                                               -
#  Created by Fonic <https://github.com/fonic>                                  -
#  Date: 04/05/20                                                               -
#                                                                               -
#  Based on:                                                                    -
#  https://stackoverflow.com/a/13733863/1976617                                 -
#  https://uran198.github.io/en/python/2016/07/12/colorful-python-logging.html  -
#  https://en.wikipedia.org/wiki/ANSI_escape_code#Colors                        -
#                                                                               -
# -------------------------------------------------------------------------------

# Imports
import os
import sys
import logging

# Logging formatter supporting colored output
class LogFormatter(logging.Formatter):

    COLOR_CODES = {
        logging.CRITICAL: "\033[1;35m", # bright/bold magenta
        logging.ERROR:    "\033[1;31m", # bright/bold red
        logging.WARNING:  "\033[1;33m", # bright/bold yellow
        logging.INFO:     "\033[0;37m", # white / light gray
        logging.DEBUG:    "\033[1;30m"  # bright/bold black / dark gray
    }

    RESET_CODE = "\033[0m"

    def __init__(self, color, *args, **kwargs):
        super(LogFormatter, self).__init__(*args, **kwargs)
        self.color = color

    def format(self, record, *args, **kwargs):
        if (self.color == True and record.levelno in self.COLOR_CODES):
            record.color_on  = self.COLOR_CODES[record.levelno]
            record.color_off = self.RESET_CODE
        else:
            record.color_on  = ""
            record.color_off = ""
        return super(LogFormatter, self).format(record, *args, **kwargs)

# Setup logging
def setup_logging(console_log_output, console_log_level, console_log_color, logfile_file, logfile_log_level, logfile_log_color, log_line_template):

    # Create logger
    # For simplicity, we use the root logger, i.e. call 'logging.getLogger()'
    # without name argument. This way we can simply use module methods for
    # for logging throughout the script. An alternative would be exporting
    # the logger, i.e. 'global logger; logger = logging.getLogger("<name>")'
    logger = logging.getLogger()

    # Set global log level to 'debug' (required for handler levels to work)
    logger.setLevel(logging.DEBUG)

    # Create console handler
    console_log_output = console_log_output.lower()
    if (console_log_output == "stdout"):
        console_log_output = sys.stdout
    elif (console_log_output == "stderr"):
        console_log_output = sys.stderr
    else:
        print("Failed to set console output: invalid output: '%s'" % console_log_output)
        return False
    console_handler = logging.StreamHandler(console_log_output)

    # Set console log level
    try:
        console_handler.setLevel(console_log_level.upper()) # only accepts uppercase level names
    except:
        print("Failed to set console log level: invalid level: '%s'" % console_log_level)
        return False

    # Create and set formatter, add console handler to logger
    console_formatter = LogFormatter(fmt=log_line_template, color=console_log_color)
    console_handler.setFormatter(console_formatter)
    logger.addHandler(console_handler)

    # Create log file handler
    try:
        logfile_handler = logging.FileHandler(logfile_file)
    except Exception as exception:
        print("Failed to set up log file: %s" % str(exception))
        return False

    # Set log file log level
    try:
        logfile_handler.setLevel(logfile_log_level.upper()) # only accepts uppercase level names
    except:
        print("Failed to set log file log level: invalid level: '%s'" % logfile_log_level)
        return False

    # Create and set formatter, add log file handler to logger
    logfile_formatter = LogFormatter(fmt=log_line_template, color=logfile_log_color)
    logfile_handler.setFormatter(logfile_formatter)
    logger.addHandler(logfile_handler)

    # Success
    return True

# Main function
def main():

    # Setup logging
    script_name = os.path.splitext(os.path.basename(sys.argv[0]))[0]
    if (not setup_logging(console_log_output="stdout", console_log_level="warning", console_log_color=True,
                        logfile_file=script_name + ".log", logfile_log_level="debug", logfile_log_color=False,
                        log_line_template="%(color_on)s[%(created)d] [%(threadName)s] [%(levelname)-8s] %(message)s%(color_off)s")):
        print("Failed to setup logging, aborting.")
        return 1

    # Log some messages
    logging.debug("Debug message")
    logging.info("Info message")
    logging.warning("Warning message")
    logging.error("Error message")
    logging.critical("Critical message")

# Call main function
if (__name__ == "__main__"):
    sys.exit(main())

回答 7

对于2.7,请尝试以下操作:

fh = logging.handlers.RotatingFileHandler(LOGFILE, maxBytes=(1048576*5), backupCount=7)

For 2.7, try the following:

fh = logging.handlers.RotatingFileHandler(LOGFILE, maxBytes=(1048576*5), backupCount=7)

如何查看文件中的更改?

问题:如何查看文件中的更改?

我有一个日志文件正在由另一个进程写入,我想监视它的更改。每次发生更改时,我都希望读入新数据以对其进行一些处理。

最好的方法是什么?我希望从PyWin32库中获得某种吸引。我找到了该win32file.FindNextChangeNotification功能,但不知道如何要求它观看特定文件。

如果有人做了这样的事情,我将不胜感激。

[编辑]我应该提到我在寻求不需要轮询的解决方案。

[编辑]诅咒!看来这在映射的网络驱动器上不起作用。我猜想Windows不会像在本地磁盘上那样“听到”文件的任何更新。

I have a log file being written by another process which I want to watch for changes. Each time a change occurs I’d like to read the new data in to do some processing on it.

What’s the best way to do this? I was hoping there’d be some sort of hook from the PyWin32 library. I’ve found the win32file.FindNextChangeNotification function but have no idea how to ask it to watch a specific file.

If anyone’s done anything like this I’d be really grateful to hear how…

[Edit] I should have mentioned that I was after a solution that doesn’t require polling.

[Edit] Curses! It seems this doesn’t work over a mapped network drive. I’m guessing windows doesn’t ‘hear’ any updates to the file the way it does on a local disk.


回答 0

您是否已经看过http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html上的可用文档?如果只需要它在Windows下运行,则第二个示例似乎正是您想要的(如果您将目录的路径与要观看的文件之一交换了)。

否则,轮询将可能是唯一真正与平台无关的选项。

注意:我还没有尝试过任何这些解决方案。

Have you already looked at the documentation available on http://timgolden.me.uk/python/win32_how_do_i/watch_directory_for_changes.html? If you only need it to work under Windows the 2nd example seems to be exactly what you want (if you exchange the path of the directory with the one of the file you want to watch).

Otherwise, polling will probably be the only really platform-independent option.

Note: I haven’t tried any of these solutions.


回答 1

您尝试使用看门狗了吗?

Python API库和Shell实用程序可监视文件系统事件。

目录监视变得容易

  • 跨平台API。
  • 一种外壳程序工具,用于响应目录更改而运行命令。

通过快速入门中的一个简单示例快速入门

Did you try using Watchdog?

Python API library and shell utilities to monitor file system events.

Directory monitoring made easy with

  • A cross-platform API.
  • A shell tool to run commands in response to directory changes.

Get started quickly with a simple example in Quickstart


回答 2

如果轮询对您来说足够好,那么我只看“修改时间”文件的统计信息是否发生了变化。阅读:

os.stat(filename).st_mtime

(还要注意,Windows本机更改事件解决方案并非在所有情况下(例如在网络驱动器上)都适用。)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...

If polling is good enough for you, I’d just watch if the “modified time” file stat changes. To read it:

os.stat(filename).st_mtime

(Also note that the Windows native change event solution does not work in all circumstances, e.g. on network drives.)

import os

class Monkey(object):
    def __init__(self):
        self._cached_stamp = 0
        self.filename = '/path/to/file'

    def ook(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...

回答 3

如果您需要多平台解决方案,请检查QFileSystemWatcher。这里是一个示例代码(未清除):

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)

If you want a multiplatform solution, then check QFileSystemWatcher. Here an example code (not sanitized):

from PyQt4 import QtCore

@QtCore.pyqtSlot(str)
def directory_changed(path):
    print('Directory Changed!!!')

@QtCore.pyqtSlot(str)
def file_changed(path):
    print('File Changed!!!')

fs_watcher = QtCore.QFileSystemWatcher(['/path/to/files_1', '/path/to/files_2', '/path/to/files_3'])

fs_watcher.connect(fs_watcher, QtCore.SIGNAL('directoryChanged(QString)'), directory_changed)
fs_watcher.connect(fs_watcher, QtCore.SIGNAL('fileChanged(QString)'), file_changed)

回答 4

它不能在Windows上运行(也许使用cygwin吗?),但是对于Unix用户,您应该使用“ fcntl”系统调用。这是Python中的示例。如果您需要用C编写(相同的函数名称),则几乎是相同的代码

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)

It should not work on windows (maybe with cygwin ?), but for unix user, you should use the “fcntl” system call. Here is an example in Python. It’s mostly the same code if you need to write it in C (same function names)

import time
import fcntl
import os
import signal

FNAME = "/HOME/TOTO/FILETOWATCH"

def handler(signum, frame):
    print "File %s modified" % (FNAME,)

signal.signal(signal.SIGIO, handler)
fd = os.open(FNAME,  os.O_RDONLY)
fcntl.fcntl(fd, fcntl.F_SETSIG, 0)
fcntl.fcntl(fd, fcntl.F_NOTIFY,
            fcntl.DN_MODIFY | fcntl.DN_CREATE | fcntl.DN_MULTISHOT)

while True:
    time.sleep(10000)

回答 5

查看pyinotify

inotify在较新的linux中替换了dnotify(来自较早的答案),并允许文件级而不是目录级的监视。

Check out pyinotify.

inotify replaces dnotify (from an earlier answer) in newer linuxes and allows file-level rather than directory-level monitoring.


回答 6

在对Tim Golden的脚本进行了一些修改之后,我发现以下内容似乎运行良好:

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

它可能可以处理更多的错误检查,但是对于简单地查看日志文件并在将其吐到屏幕上之前对其进行一些处理,这很好。

感谢大家的投入-很棒的东西!

Well after a bit of hacking of Tim Golden’s script, I have the following which seems to work quite well:

import os

import win32file
import win32con

path_to_watch = "." # look at the current directory
file_to_watch = "test.txt" # look for changes to a file called test.txt

def ProcessNewData( newData ):
    print "Text added: %s"%newData

# Set up the bits we'll need for output
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001
hDir = win32file.CreateFile (
  path_to_watch,
  FILE_LIST_DIRECTORY,
  win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
  None,
  win32con.OPEN_EXISTING,
  win32con.FILE_FLAG_BACKUP_SEMANTICS,
  None
)

# Open the file we're interested in
a = open(file_to_watch, "r")

# Throw away any exising log data
a.read()

# Wait for new data and call ProcessNewData for each new chunk that's written
while 1:
  # Wait for a change to occur
  results = win32file.ReadDirectoryChangesW (
    hDir,
    1024,
    False,
    win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
    None,
    None
  )

  # For each change, check to see if it's updating the file we're interested in
  for action, file in results:
    full_filename = os.path.join (path_to_watch, file)
    #print file, ACTIONS.get (action, "Unknown")
    if file == file_to_watch:
        newText = a.read()
        if newText != "":
            ProcessNewData( newText )

It could probably do with a load more error checking, but for simply watching a log file and doing some processing on it before spitting it out to the screen, this works well.

Thanks everyone for your input – great stuff!


回答 7

为了观看具有轮询和最小依赖性的单个文件,下面是一个基于Deestan(上图)的答案的充实示例:

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going

For watching a single file with polling, and minimal dependencies, here is a fully fleshed-out example, based on answer from Deestan (above):

import os
import sys 
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_file, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self.filename = watch_file
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        stamp = os.stat(self.filename).st_mtime
        if stamp != self._cached_stamp:
            self._cached_stamp = stamp
            # File has changed, so do something...
            print('File changed')
            if self.call_func_on_change is not None:
                self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop        
    def watch(self):
        while self.running: 
            try: 
                # Look for changes
                time.sleep(self.refresh_delay_secs) 
                self.look() 
            except KeyboardInterrupt: 
                print('\nDone') 
                break 
            except FileNotFoundError:
                # Action on file not found
                pass
            except: 
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)

watch_file = 'my_file.txt'

# watcher = Watcher(watch_file)  # simple
watcher = Watcher(watch_file, custom_action, text='yes, changed')  # also call custom action function
watcher.watch()  # start the watch going

回答 8

检查类似问题的回答。您可以在Python中尝试相同的循环。该页面建议:

import time

while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line, # already has newline

另请参见问题tail()使用Python的文件

Check my answer to a similar question. You could try the same loop in Python. This page suggests:

import time

while 1:
    where = file.tell()
    line = file.readline()
    if not line:
        time.sleep(1)
        file.seek(where)
    else:
        print line, # already has newline

Also see the question tail() a file with Python.


回答 9

对我来说,最简单的解决方案是使用看门狗工具watchmedo

现在从https://pypi.python.org/pypi/watchdog获得一个进程,该进程可以查找目录中的sql文件,并在必要时执行它们。

watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.

Simplest solution for me is using watchdog’s tool watchmedo

From https://pypi.python.org/pypi/watchdog I now have a process that looks up the sql files in a directory and executes them if necessary.

watchmedo shell-command \
--patterns="*.sql" \
--recursive \
--command='~/Desktop/load_files_into_mysql_database.sh' \
.

回答 10

好吧,由于您使用的是Python,因此您可以打开一个文件并继续读取其中的行。

f = open('file.log')

如果读取的行不为空,则对其进行处理。

line = f.readline()
if line:
    // Do what you want with the line

您可能会错过继续拨打readlineEOF的权限。在这种情况下,它将仅返回一个空字符串。并且,将某些内容添加到日志文件后,将根据需要从停止的位置继续读取。

如果您正在寻找使用事件或特定库的解决方案,请在问题中进行指定。否则,我认为这种解决方案就可以了。

Well, since you are using Python, you can just open a file and keep reading lines from it.

f = open('file.log')

If the line read is not empty, you process it.

line = f.readline()
if line:
    // Do what you want with the line

You may be missing that it is ok to keep calling readline at the EOF. It will just keep returning an empty string in this case. And when something is appended to the log file, the reading will continue from where it stopped, as you need.

If you are looking for a solution that uses events, or a particular library, please specify this in your question. Otherwise, I think this solution is just fine.


回答 11

这是Kender代码的简化版本,看起来可以起到相同的作用,并且不会导入整个文件:

# Check file for new data.

import time

f = open(r'c:\temp\test.txt', 'r')

while True:

    line = f.readline()
    if not line:
        time.sleep(1)
        print 'Nothing New'
    else:
        print 'Call Function: ', line

Here is a simplified version of Kender’s code that appears to do the same trick and does not import the entire file:

# Check file for new data.

import time

f = open(r'c:\temp\test.txt', 'r')

while True:

    line = f.readline()
    if not line:
        time.sleep(1)
        print 'Nothing New'
    else:
        print 'Call Function: ', line

回答 12

这是Tim Goldan脚本的另一种修改,该脚本可在unix类型上运行,并通过使用dict(file => time)添加了一个简单的文件修改监视程序。

用法:whateverName.py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print('Watching {}..'.format(path_to_watch))

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print('Added: {}'.format(', '.join(added)))
        if removed: print('Removed: {}'.format(', '.join(removed)))
        if modified: print('Modified: {}'.format(', '.join(modified)))

        before = after

This is another modification of Tim Goldan’s script that runs on unix types and adds a simple watcher for file modification by using a dict (file=>time).

usage: whateverName.py path_to_dir_to_watch

#!/usr/bin/env python

import os, sys, time

def files_to_timestamp(path):
    files = [os.path.join(path, f) for f in os.listdir(path)]
    return dict ([(f, os.path.getmtime(f)) for f in files])

if __name__ == "__main__":

    path_to_watch = sys.argv[1]
    print('Watching {}..'.format(path_to_watch))

    before = files_to_timestamp(path_to_watch)

    while 1:
        time.sleep (2)
        after = files_to_timestamp(path_to_watch)

        added = [f for f in after.keys() if not f in before.keys()]
        removed = [f for f in before.keys() if not f in after.keys()]
        modified = []

        for f in before.keys():
            if not f in removed:
                if os.path.getmtime(f) != before.get(f):
                    modified.append(f)

        if added: print('Added: {}'.format(', '.join(added)))
        if removed: print('Removed: {}'.format(', '.join(removed)))
        if modified: print('Modified: {}'.format(', '.join(modified)))

        before = after

回答 13

正如您在由Horst Gutmann指出的Tim Golden的文章中所看到,WIN32比较复杂,并且监视目录,而不是单个文件。

我想建议您研究IronPython,它是一个.NET python实现。借助IronPython,您可以使用所有.NET功能-包括

System.IO.FileSystemWatcher

使用简单的事件接口处理单个文件。

As you can see in Tim Golden’s article, pointed by Horst Gutmann, WIN32 is relatively complex and watches directories, not a single file.

I’d like to suggest you look into IronPython, which is a .NET python implementation. With IronPython you can use all the .NET functionality – including

System.IO.FileSystemWatcher

Which handles single files with a simple Event interface.


回答 14

这是检查文件更改的示例。这样做可能不是最好的方法,但是肯定是很短的方法。

对源进行更改后重新启动应用程序的便捷工具。我在使用pygame玩游戏时做到了这一点,因此我可以看到文件保存后立即发生了效果。

在pygame中使用时,请确保“ while”循环中的内容已放置在您的游戏循环中,也称为update或其他内容。否则,您的应用程序将陷入无限循环,并且您将看不到游戏更新。

file_size_stored = os.stat('neuron.py').st_size

  while True:
    try:
      file_size_current = os.stat('neuron.py').st_size
      if file_size_stored != file_size_current:
        restart_program()
    except: 
      pass

如果您想要我在网上找到的重启代码。这里是。(与问题无关,尽管可以派上用场)

def restart_program(): #restart application
    python = sys.executable
    os.execl(python, python, * sys.argv)

让电子做您想做的事情变得有趣。

This is an example of checking a file for changes. One that may not be the best way of doing it, but it sure is a short way.

Handy tool for restarting application when changes have been made to the source. I made this when playing with pygame so I can see effects take place immediately after file save.

When used in pygame make sure the stuff in the ‘while’ loop is placed in your game loop aka update or whatever. Otherwise your application will get stuck in an infinite loop and you will not see your game updating.

file_size_stored = os.stat('neuron.py').st_size

  while True:
    try:
      file_size_current = os.stat('neuron.py').st_size
      if file_size_stored != file_size_current:
        restart_program()
    except: 
      pass

In case you wanted the restart code which I found on the web. Here it is. (Not relevant to the question, though it could come in handy)

def restart_program(): #restart application
    python = sys.executable
    os.execl(python, python, * sys.argv)

Have fun making electrons do what you want them to do.


回答 15

ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

class myThread (threading.Thread):
    def __init__(self, threadID, fileName, directory, origin):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.fileName = fileName
        self.daemon = True
        self.dir = directory
        self.originalFile = origin
    def run(self):
        startMonitor(self.fileName, self.dir, self.originalFile)

def startMonitor(fileMonitoring,dirPath,originalFile):
    hDir = win32file.CreateFile (
        dirPath,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    # Wait for new data and call ProcessNewData for each new chunk that's
    # written
    while 1:
        # Wait for a change to occur
        results = win32file.ReadDirectoryChangesW (
            hDir,
            1024,
            False,
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
            None,
            None
        )
        # For each change, check to see if it's updating the file we're
        # interested in
        for action, file_M in results:
            full_filename = os.path.join (dirPath, file_M)
            #print file, ACTIONS.get (action, "Unknown")
            if len(full_filename) == len(fileMonitoring) and action == 3:
                #copy to main file
                ...
ACTIONS = {
  1 : "Created",
  2 : "Deleted",
  3 : "Updated",
  4 : "Renamed from something",
  5 : "Renamed to something"
}
FILE_LIST_DIRECTORY = 0x0001

class myThread (threading.Thread):
    def __init__(self, threadID, fileName, directory, origin):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.fileName = fileName
        self.daemon = True
        self.dir = directory
        self.originalFile = origin
    def run(self):
        startMonitor(self.fileName, self.dir, self.originalFile)

def startMonitor(fileMonitoring,dirPath,originalFile):
    hDir = win32file.CreateFile (
        dirPath,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG_BACKUP_SEMANTICS,
        None
    )
    # Wait for new data and call ProcessNewData for each new chunk that's
    # written
    while 1:
        # Wait for a change to occur
        results = win32file.ReadDirectoryChangesW (
            hDir,
            1024,
            False,
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE,
            None,
            None
        )
        # For each change, check to see if it's updating the file we're
        # interested in
        for action, file_M in results:
            full_filename = os.path.join (dirPath, file_M)
            #print file, ACTIONS.get (action, "Unknown")
            if len(full_filename) == len(fileMonitoring) and action == 3:
                #copy to main file
                ...

回答 16

这是一个示例,用于观察每秒写入不超过一行但通常少得多的输入文件。目标是将最后一行(最新写入)追加到指定的输出文件。我已从我的一个项目中复制了此内容,并删除了所有不相关的行。您必须填写或更改缺少的符号。

from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow   # Qt Creator gen'd 

class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)
        Ui_MainWindow.__init__(self)
        self._fileWatcher = QFileSystemWatcher()
        self._fileWatcher.fileChanged.connect(self.fileChanged)

    def fileChanged(self, filepath):
        QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
        # ^^ About to test this, may need more sophisticated solution
        with open(filepath) as file:
            lastLine = list(file)[-1]
        destPath = self._filemap[filepath]['dest file']
        with open(destPath, 'a') as out_file:               # a= append
            out_file.writelines([lastLine])

当然,并不是严格要求包含QMainWindow类。您可以单独使用QFileSystemWatcher。

Here’s an example geared toward watching input files that write no more than one line per second but usually a lot less. The goal is to append the last line (most recent write) to the specified output file. I’ve copied this from one of my projects and just deleted all the irrelevant lines. You’ll have to fill in or change the missing symbols.

from PyQt5.QtCore import QFileSystemWatcher, QSettings, QThread
from ui_main_window import Ui_MainWindow   # Qt Creator gen'd 

class MainWindow(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        QMainWindow.__init__(self, parent)
        Ui_MainWindow.__init__(self)
        self._fileWatcher = QFileSystemWatcher()
        self._fileWatcher.fileChanged.connect(self.fileChanged)

    def fileChanged(self, filepath):
        QThread.msleep(300)    # Reqd on some machines, give chance for write to complete
        # ^^ About to test this, may need more sophisticated solution
        with open(filepath) as file:
            lastLine = list(file)[-1]
        destPath = self._filemap[filepath]['dest file']
        with open(destPath, 'a') as out_file:               # a= append
            out_file.writelines([lastLine])

Of course, the encompassing QMainWindow class is not strictly required, ie. you can use QFileSystemWatcher alone.


回答 17

您还可以使用一个名为repyt的简单库,这是一个示例:

repyt ./app.py

You can also use a simple library called repyt, here is an example:

repyt ./app.py

回答 18

似乎没有人发布fswatch。它是一个跨平台的文件系统监视程序。只需安装,运行并按照提示进行操作即可。

我已经将它与python和golang程序一起使用,并且可以正常工作。

Seems that no one has posted fswatch. It is a cross-platform file system watcher. Just install it, run it and follow the prompts.

I’ve used it with python and golang programs and it just works.


回答 19

相关的@ 4Oh4解决方案可以平滑地更改要查看的文件列表;

import os
import sys
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self._cached_stamp_files = {}
        self.filenames = watch_files
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        for file in self.filenames:
            stamp = os.stat(file).st_mtime
            if not file in self._cached_stamp_files:
                self._cached_stamp_files[file] = 0
            if stamp != self._cached_stamp_files[file]:
                self._cached_stamp_files[file] = stamp
                # File has changed, so do something...
                file_to_read = open(file, 'r')
                value = file_to_read.read()
                print("value from file", value)
                file_to_read.seek(0)
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop
    def watch(self):
        while self.running:
            try:
                # Look for changes
                time.sleep(self.refresh_delay_secs)
                self.look()
            except KeyboardInterrupt:
                print('\nDone')
                break
            except FileNotFoundError:
                # Action on file not found
                pass
            except Exception as e:
                print(e)
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)
    # pass

watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']

# watcher = Watcher(watch_file)  # simple



if __name__ == "__main__":
    watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going

related @4Oh4 solution a smooth change for a list of files to watch;

import os
import sys
import time

class Watcher(object):
    running = True
    refresh_delay_secs = 1

    # Constructor
    def __init__(self, watch_files, call_func_on_change=None, *args, **kwargs):
        self._cached_stamp = 0
        self._cached_stamp_files = {}
        self.filenames = watch_files
        self.call_func_on_change = call_func_on_change
        self.args = args
        self.kwargs = kwargs

    # Look for changes
    def look(self):
        for file in self.filenames:
            stamp = os.stat(file).st_mtime
            if not file in self._cached_stamp_files:
                self._cached_stamp_files[file] = 0
            if stamp != self._cached_stamp_files[file]:
                self._cached_stamp_files[file] = stamp
                # File has changed, so do something...
                file_to_read = open(file, 'r')
                value = file_to_read.read()
                print("value from file", value)
                file_to_read.seek(0)
                if self.call_func_on_change is not None:
                    self.call_func_on_change(*self.args, **self.kwargs)

    # Keep watching in a loop
    def watch(self):
        while self.running:
            try:
                # Look for changes
                time.sleep(self.refresh_delay_secs)
                self.look()
            except KeyboardInterrupt:
                print('\nDone')
                break
            except FileNotFoundError:
                # Action on file not found
                pass
            except Exception as e:
                print(e)
                print('Unhandled error: %s' % sys.exc_info()[0])

# Call this function each time a change happens
def custom_action(text):
    print(text)
    # pass

watch_files = ['/Users/mexekanez/my_file.txt', '/Users/mexekanez/my_file1.txt']

# watcher = Watcher(watch_file)  # simple



if __name__ == "__main__":
    watcher = Watcher(watch_files, custom_action, text='yes, changed')  # also call custom action function
    watcher.watch()  # start the watch going

回答 20

最好和最简单的解决方案是使用pygtail:https ://pypi.python.org/pypi/pygtail

from pygtail import Pygtail
import sys

while True:
    for line in Pygtail("some.log"):
        sys.stdout.write(line)

The best and simplest solution is to use pygtail: https://pypi.python.org/pypi/pygtail

from pygtail import Pygtail
import sys

while True:
    for line in Pygtail("some.log"):
        sys.stdout.write(line)

回答 21

我不知道Windows的任何特定功能。您可以尝试每秒钟/分钟/小时获取文件的MD5哈希值(取决于您需要的速度),并将其与最后一个哈希值进行比较。如果不同,您就知道文件已更改,并读出了最新的行。

I don’t know any Windows specific function. You could try getting the MD5 hash of the file every second/minute/hour (depends on how fast you need it) and compare it to the last hash. When it differs you know the file has been changed and you read out the newest lines.


回答 22

我会尝试这样的事情。

    try:
            f = open(filePath)
    except IOError:
            print "No such file: %s" % filePath
            raw_input("Press Enter to close window")
    try:
            lines = f.readlines()
            while True:
                    line = f.readline()
                    try:
                            if not line:
                                    time.sleep(1)
                            else:
                                    functionThatAnalisesTheLine(line)
                    except Exception, e:
                            # handle the exception somehow (for example, log the trace) and raise the same exception again
                            raw_input("Press Enter to close window")
                            raise e
    finally:
            f.close()

循环检查自上次读取文件以来是否存在新行-如果存在,则将其读取并传递给functionThatAnalisesTheLine函数。如果不是,脚本将等待1秒钟,然后重试该过程。

I’d try something like this.

    try:
            f = open(filePath)
    except IOError:
            print "No such file: %s" % filePath
            raw_input("Press Enter to close window")
    try:
            lines = f.readlines()
            while True:
                    line = f.readline()
                    try:
                            if not line:
                                    time.sleep(1)
                            else:
                                    functionThatAnalisesTheLine(line)
                    except Exception, e:
                            # handle the exception somehow (for example, log the trace) and raise the same exception again
                            raw_input("Press Enter to close window")
                            raise e
    finally:
            f.close()

The loop checks if there is a new line(s) since last time file was read – if there is, it’s read and passed to the functionThatAnalisesTheLine function. If not, script waits 1 second and retries the process.


TypeError:需要类似字节的对象,而在Python3中写入文件时不是’str’

问题:TypeError:需要类似字节的对象,而在Python3中写入文件时不是’str’

我最近已经迁移到Py 3.5。这段代码在Python 2.7中正常工作:

with open(fname, 'rb') as f:
    lines = [x.strip() for x in f.readlines()]

for line in lines:
    tmp = line.strip().lower()
    if 'some-pattern' in tmp: continue
    # ... code

升级到3.5后,我得到了:

TypeError: a bytes-like object is required, not 'str'

最后一行错误(模式搜索代码)。

我试过使用.decode()语句两侧的函数,也尝试过:

if tmp.find('some-pattern') != -1: continue

-无济于事。

我能够很快解决几乎所有的2:3问题,但是这个小小的声明困扰着我。

I’ve very recently migrated to Py 3.5. This code was working properly in Python 2.7:

with open(fname, 'rb') as f:
    lines = [x.strip() for x in f.readlines()]

for line in lines:
    tmp = line.strip().lower()
    if 'some-pattern' in tmp: continue
    # ... code

After upgrading to 3.5, I’m getting the:

TypeError: a bytes-like object is required, not 'str'

error on the last line (the pattern search code).

I’ve tried using the .decode() function on either side of the statement, also tried:

if tmp.find('some-pattern') != -1: continue

– to no avail.

I was able to resolve almost all 2:3 issues quickly, but this little statement is bugging me.


回答 0

您以二进制模式打开文件:

with open(fname, 'rb') as f:

这意味着从文件读取的所有数据都作为bytes对象而不是作为对象返回str。然后,您不能在收容测试中使用字符串:

if 'some-pattern' in tmp: continue

您必须改为使用一个bytes对象进行测试tmp

if b'some-pattern' in tmp: continue

或以文本文件形式打开文件,而不是将'rb'模式替换为'r'

You opened the file in binary mode:

with open(fname, 'rb') as f:

This means that all data read from the file is returned as bytes objects, not str. You cannot then use a string in a containment test:

if 'some-pattern' in tmp: continue

You’d have to use a bytes object to test against tmp instead:

if b'some-pattern' in tmp: continue

or open the file as a textfile instead by replacing the 'rb' mode with 'r'.


回答 1

您可以使用以下方式对字符串进行编码 .encode()

例:

'Hello World'.encode()

You can encode your string by using .encode()

Example:

'Hello World'.encode()

回答 2

就像已经提到的一样,您正在以二进制模式读取文件,然后创建字节列表。在下面的for循环中,您将字符串与字节进行比较,这就是代码失败的地方。

在将字节添加到列表时对字节进行解码应该可以。更改后的代码应如下所示:

with open(fname, 'rb') as f:
    lines = [x.decode('utf8').strip() for x in f.readlines()]

字节类型是在Python 3中引入的,这就是为什么您的代码在Python 2中可以工作的原因。在Python 2中,没有字节的数据类型:

>>> s=bytes('hello')
>>> type(s)
<type 'str'>

Like it has been already mentioned, you are reading the file in binary mode and then creating a list of bytes. In your following for loop you are comparing string to bytes and that is where the code is failing.

Decoding the bytes while adding to the list should work. The changed code should look as follows:

with open(fname, 'rb') as f:
    lines = [x.decode('utf8').strip() for x in f.readlines()]

The bytes type was introduced in Python 3 and that is why your code worked in Python 2. In Python 2 there was no data type for bytes:

>>> s=bytes('hello')
>>> type(s)
<type 'str'>

回答 3

您必须从wb更改为w:

def __init__(self):
    self.myCsv = csv.writer(open('Item.csv', 'wb')) 
    self.myCsv.writerow(['title', 'link'])

def __init__(self):
    self.myCsv = csv.writer(open('Item.csv', 'w'))
    self.myCsv.writerow(['title', 'link'])

更改此设置后,错误消失,但是您无法写入文件(以我为例)。毕竟,我没有答案吗?

来源:如何删除^ M

更改为“ rb”会给我带来另一个错误:io.UnsupportedOperation:写入

You have to change from wb to w:

def __init__(self):
    self.myCsv = csv.writer(open('Item.csv', 'wb')) 
    self.myCsv.writerow(['title', 'link'])

to

def __init__(self):
    self.myCsv = csv.writer(open('Item.csv', 'w'))
    self.myCsv.writerow(['title', 'link'])

After changing this, the error disappears, but you can’t write to the file (in my case). So after all, I don’t have an answer?

Source: How to remove ^M

Changing to ‘rb’ brings me the other error: io.UnsupportedOperation: write


回答 4

对于这个小例子:import socket

mysock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
mysock.connect(('www.py4inf.com', 80))
mysock.send(**b**'GET http://www.py4inf.com/code/romeo.txt HTTP/1.0\n\n')

while True:
    data = mysock.recv(512)
    if ( len(data) < 1 ) :
        break
    print (data);

mysock.close()

在’GET http://www.py4inf.com/code/romeo.txt HTTP / 1.0 \ n \ n’ 之前添加“ b” 解决了我的问题

for this small example: import socket

mysock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
mysock.connect(('www.py4inf.com', 80))
mysock.send(**b**'GET http://www.py4inf.com/code/romeo.txt HTTP/1.0\n\n')

while True:
    data = mysock.recv(512)
    if ( len(data) < 1 ) :
        break
    print (data);

mysock.close()

adding the “b” before ‘GET http://www.py4inf.com/code/romeo.txt HTTP/1.0\n\n’ solved my problem


回答 5

与单引号中给出的硬编码字符串值一起使用encode()函数。

例如:

file.write(answers[i] + '\n'.encode())

要么

line.split(' +++$+++ '.encode())

Use encode() function along with hardcoded String value given in a single quote.

Ex:

file.write(answers[i] + '\n'.encode())

OR

line.split(' +++$+++ '.encode())

回答 6

您以二进制模式打开文件:

以下代码将引发TypeError:需要一个类似字节的对象,而不是’str’。

for line in lines:
    print(type(line))# <class 'bytes'>
    if 'substring' in line:
       print('success')

以下代码将起作用-您必须使用encode()函数:

for line in lines:
    line = line.decode()
    print(type(line))# <class 'str'>
    if 'substring' in line:
       print('success')

You opened the file in binary mode:

The following code will throw a TypeError: a bytes-like object is required, not ‘str’.

for line in lines:
    print(type(line))# <class 'bytes'>
    if 'substring' in line:
       print('success')

The following code will work – you have to use the decode() function:

for line in lines:
    line = line.decode()
    print(type(line))# <class 'str'>
    if 'substring' in line:
       print('success')

回答 7

为什么不尝试以文本形式打开文件?

with open(fname, 'rt') as f:
    lines = [x.strip() for x in f.readlines()]

此外,以下是官方页面上python 3.x的链接:https : //docs.python.org/3/library/io.html 这是开放功能:https : //docs.python.org/3 /library/functions.html#open

如果您确实想将其作为二进制文件处理,则考虑对字符串进行编码。

why not try opening your file as text?

with open(fname, 'rt') as f:
    lines = [x.strip() for x in f.readlines()]

Additionally here is a link for python 3.x on the official page: https://docs.python.org/3/library/io.html And this is the open function: https://docs.python.org/3/library/functions.html#open

If you are really trying to handle it as a binary then consider encoding your string.


回答 8

当我尝试将char(或字符串)转换为时,出现此错误bytes,代码在Python 2.7中是这样的:

# -*- coding: utf-8 -*-
print( bytes('ò') )

这是Python 2.7处理Unicode字符的方式。

这在Python 3.6中不起作用,因为bytes需要一个额外的参数来编码,但这可能有点棘手,因为不同的编码可能会输出不同的结果:

print( bytes('ò', 'iso_8859_1') ) # prints: b'\xf2'
print( bytes('ò', 'utf-8') ) # prints: b'\xc3\xb2'

就我而言,我不得不使用 iso_8859_1在对字节进行编码时来解决问题。

希望这对某人有帮助。

I got this error when I was trying to convert a char (or string) to bytes, the code was something like this with Python 2.7:

# -*- coding: utf-8 -*-
print( bytes('ò') )

This is the way of Python 2.7 when dealing with unicode chars.

This won’t work with Python 3.6, since bytes require an extra argument for encoding, but this can be little tricky, since different encoding may output different result:

print( bytes('ò', 'iso_8859_1') ) # prints: b'\xf2'
print( bytes('ò', 'utf-8') ) # prints: b'\xc3\xb2'

In my case I had to use iso_8859_1 when encoding bytes in order to solve the issue.

Hope this helps someone.


Python中的目录树列表

问题:Python中的目录树列表

如何获取Python给定目录中所有文件(和目录)的列表?

How do I get a list of all files (and directories) in a given directory in Python?


回答 0

这是遍历目录树中每个文件和目录的一种方式:

import os

for dirname, dirnames, filenames in os.walk('.'):
    # print path to all subdirectories first.
    for subdirname in dirnames:
        print(os.path.join(dirname, subdirname))

    # print path to all filenames.
    for filename in filenames:
        print(os.path.join(dirname, filename))

    # Advanced usage:
    # editing the 'dirnames' list will stop os.walk() from recursing into there.
    if '.git' in dirnames:
        # don't go into any .git directories.
        dirnames.remove('.git')

This is a way to traverse every file and directory in a directory tree:

import os

for dirname, dirnames, filenames in os.walk('.'):
    # print path to all subdirectories first.
    for subdirname in dirnames:
        print(os.path.join(dirname, subdirname))

    # print path to all filenames.
    for filename in filenames:
        print(os.path.join(dirname, filename))

    # Advanced usage:
    # editing the 'dirnames' list will stop os.walk() from recursing into there.
    if '.git' in dirnames:
        # don't go into any .git directories.
        dirnames.remove('.git')

回答 1

您可以使用

os.listdir(path)

作为参考和更多的os函数,请看这里:

You can use

os.listdir(path)

For reference and more os functions look here:


回答 2

这是我经常使用的辅助函数:

import os

def listdir_fullpath(d):
    return [os.path.join(d, f) for f in os.listdir(d)]

Here’s a helper function I use quite often:

import os

def listdir_fullpath(d):
    return [os.path.join(d, f) for f in os.listdir(d)]

回答 3

import os

for filename in os.listdir("C:\\temp"):
    print  filename
import os

for filename in os.listdir("C:\\temp"):
    print  filename

回答 4

如果您需要遍历功能,那么还可以使用一个模块。例如:

import glob
glob.glob('./[0-9].*')

将返回类似:

['./1.gif', './2.txt']

请参阅此处的文档。

If you need globbing abilities, there’s a module for that as well. For example:

import glob
glob.glob('./[0-9].*')

will return something like:

['./1.gif', './2.txt']

See the documentation here.


回答 5

尝试这个:

import os
for top, dirs, files in os.walk('./'):
    for nm in files:       
        print os.path.join(top, nm)

Try this:

import os
for top, dirs, files in os.walk('./'):
    for nm in files:       
        print os.path.join(top, nm)

回答 6

对于当前工作目录中的文件,但未指定路径

Python 2.7:

import os
os.listdir(os.getcwd())

Python 3.x:

import os
os.listdir()

感谢Stam Kaly对python 3.x的评论

For files in current working directory without specifying a path

Python 2.7:

import os
os.listdir(os.getcwd())

Python 3.x:

import os
os.listdir()

Thanks to Stam Kaly for comment on python 3.x


回答 7

递归实现

import os

def scan_dir(dir):
    for name in os.listdir(dir):
        path = os.path.join(dir, name)
        if os.path.isfile(path):
            print path
        else:
            scan_dir(path)

A recursive implementation

import os

def scan_dir(dir):
    for name in os.listdir(dir):
        path = os.path.join(dir, name)
        if os.path.isfile(path):
            print path
        else:
            scan_dir(path)

回答 8

我写了一个很长的版本,其中包含了我可能需要的所有选项:http : //sam.nipl.net/code/python/find.py

我想它也适合这里:

#!/usr/bin/env python

import os
import sys

def ls(dir, hidden=False, relative=True):
    nodes = []
    for nm in os.listdir(dir):
        if not hidden and nm.startswith('.'):
            continue
        if not relative:
            nm = os.path.join(dir, nm)
        nodes.append(nm)
    nodes.sort()
    return nodes

def find(root, files=True, dirs=False, hidden=False, relative=True, topdown=True):
    root = os.path.join(root, '')  # add slash if not there
    for parent, ldirs, lfiles in os.walk(root, topdown=topdown):
        if relative:
            parent = parent[len(root):]
        if dirs and parent:
            yield os.path.join(parent, '')
        if not hidden:
            lfiles   = [nm for nm in lfiles if not nm.startswith('.')]
            ldirs[:] = [nm for nm in ldirs  if not nm.startswith('.')]  # in place
        if files:
            lfiles.sort()
            for nm in lfiles:
                nm = os.path.join(parent, nm)
                yield nm

def test(root):
    print "* directory listing, with hidden files:"
    print ls(root, hidden=True)
    print
    print "* recursive listing, with dirs, but no hidden files:"
    for f in find(root, dirs=True):
        print f
    print

if __name__ == "__main__":
    test(*sys.argv[1:])

I wrote a long version, with all the options I might need: http://sam.nipl.net/code/python/find.py

I guess it will fit here too:

#!/usr/bin/env python

import os
import sys

def ls(dir, hidden=False, relative=True):
    nodes = []
    for nm in os.listdir(dir):
        if not hidden and nm.startswith('.'):
            continue
        if not relative:
            nm = os.path.join(dir, nm)
        nodes.append(nm)
    nodes.sort()
    return nodes

def find(root, files=True, dirs=False, hidden=False, relative=True, topdown=True):
    root = os.path.join(root, '')  # add slash if not there
    for parent, ldirs, lfiles in os.walk(root, topdown=topdown):
        if relative:
            parent = parent[len(root):]
        if dirs and parent:
            yield os.path.join(parent, '')
        if not hidden:
            lfiles   = [nm for nm in lfiles if not nm.startswith('.')]
            ldirs[:] = [nm for nm in ldirs  if not nm.startswith('.')]  # in place
        if files:
            lfiles.sort()
            for nm in lfiles:
                nm = os.path.join(parent, nm)
                yield nm

def test(root):
    print "* directory listing, with hidden files:"
    print ls(root, hidden=True)
    print
    print "* recursive listing, with dirs, but no hidden files:"
    for f in find(root, dirs=True):
        print f
    print

if __name__ == "__main__":
    test(*sys.argv[1:])

回答 9

这是另一种选择。

os.scandir(path='.')

它返回os.DirEntry对象的迭代器,该对象与path所给目录中的条目(以及文件属性信息)相对应。

例:

with os.scandir(path) as it:
    for entry in it:
        if not entry.name.startswith('.'):
            print(entry.name)

使用scandir()而不是listdir()可以显着提高还需要文件类型或文件属性信息的代码的性能,因为如果操作系统在扫描目录时提供了os.DirEntry对象,则该信息会公开。所有的os.DirEntry方法都可以执行系统调用,但是is_dir()和is_file()通常只需要系统调用即可进行符号链接。os.DirEntry.stat()在Unix上始终需要系统调用,而在Windows上只需要一个系统调用即可。

Python文档

Here is another option.

os.scandir(path='.')

It returns an iterator of os.DirEntry objects corresponding to the entries (along with file attribute information) in the directory given by path.

Example:

with os.scandir(path) as it:
    for entry in it:
        if not entry.name.startswith('.'):
            print(entry.name)

Using scandir() instead of listdir() can significantly increase the performance of code that also needs file type or file attribute information, because os.DirEntry objects expose this information if the operating system provides it when scanning a directory. All os.DirEntry methods may perform a system call, but is_dir() and is_file() usually only require a system call for symbolic links; os.DirEntry.stat() always requires a system call on Unix but only requires one for symbolic links on Windows.

Python Docs


回答 10

虽然os.listdir()可以很好地生成文件名和目录名列表,但是一旦拥有了这些文件名,就经常想做更多的事情-在Python3中,pathlib使其他琐事变得简单。让我们看一下,看看您是否像我一样喜欢它。

要列出目录内容,请构造一个Path对象并获取迭代器:

In [16]: Path('/etc').iterdir()
Out[16]: <generator object Path.iterdir at 0x110853fc0>

如果我们只想要事物名称列表:

In [17]: [x.name for x in Path('/etc').iterdir()]
Out[17]:
['emond.d',
 'ntp-restrict.conf',
 'periodic',

如果您只想要Dirs:

In [18]: [x.name for x in Path('/etc').iterdir() if x.is_dir()]
Out[18]:
['emond.d',
 'periodic',
 'mach_init.d',

如果您想要该树中所有conf文件的名称:

In [20]: [x.name for x in Path('/etc').glob('**/*.conf')]
Out[20]:
['ntp-restrict.conf',
 'dnsextd.conf',
 'syslog.conf',

如果要在树中的conf文件列表> = 1K中:

In [23]: [x.name for x in Path('/etc').glob('**/*.conf') if x.stat().st_size > 1024]
Out[23]:
['dnsextd.conf',
 'pf.conf',
 'autofs.conf',

解决相对路径变得容易:

In [32]: Path('../Operational Metrics.md').resolve()
Out[32]: PosixPath('/Users/starver/code/xxxx/Operational Metrics.md')

使用路径导航非常清晰(尽管出乎意料):

In [10]: p = Path('.')

In [11]: core = p / 'web' / 'core'

In [13]: [x for x in core.iterdir() if x.is_file()]
Out[13]:
[PosixPath('web/core/metrics.py'),
 PosixPath('web/core/services.py'),
 PosixPath('web/core/querysets.py'),

While os.listdir() is fine for generating a list of file and dir names, frequently you want to do more once you have those names – and in Python3, pathlib makes those other chores simple. Let’s take a look and see if you like it as much as I do.

To list dir contents, construct a Path object and grab the iterator:

In [16]: Path('/etc').iterdir()
Out[16]: <generator object Path.iterdir at 0x110853fc0>

If we want just a list of names of things:

In [17]: [x.name for x in Path('/etc').iterdir()]
Out[17]:
['emond.d',
 'ntp-restrict.conf',
 'periodic',

If you want just the dirs:

In [18]: [x.name for x in Path('/etc').iterdir() if x.is_dir()]
Out[18]:
['emond.d',
 'periodic',
 'mach_init.d',

If you want the names of all conf files in that tree:

In [20]: [x.name for x in Path('/etc').glob('**/*.conf')]
Out[20]:
['ntp-restrict.conf',
 'dnsextd.conf',
 'syslog.conf',

If you want a list of conf files in the tree >= 1K:

In [23]: [x.name for x in Path('/etc').glob('**/*.conf') if x.stat().st_size > 1024]
Out[23]:
['dnsextd.conf',
 'pf.conf',
 'autofs.conf',

Resolving relative paths become easy:

In [32]: Path('../Operational Metrics.md').resolve()
Out[32]: PosixPath('/Users/starver/code/xxxx/Operational Metrics.md')

Navigating with a Path is pretty clear (although unexpected):

In [10]: p = Path('.')

In [11]: core = p / 'web' / 'core'

In [13]: [x for x in core.iterdir() if x.is_file()]
Out[13]:
[PosixPath('web/core/metrics.py'),
 PosixPath('web/core/services.py'),
 PosixPath('web/core/querysets.py'),

回答 11

一个很好的衬垫,可以递归地仅列出文件。我在setup.py package_data指令中使用了此命令:

import os

[os.path.join(x[0],y) for x in os.walk('<some_directory>') for y in x[2]]

我知道这不是问题的答案,但可能会派上用场

A nice one liner to list only the files recursively. I used this in my setup.py package_data directive:

import os

[os.path.join(x[0],y) for x in os.walk('<some_directory>') for y in x[2]]

I know it’s not the answer to the question, but may come in handy


回答 12

对于Python 2

#!/bin/python2

import os

def scan_dir(path):
    print map(os.path.abspath, os.listdir(pwd))

对于Python 3

对于过滤器和地图,您需要使用list()包装它们

#!/bin/python3

import os

def scan_dir(path):
    print(list(map(os.path.abspath, os.listdir(pwd))))

现在的建议是,用生成器表达式或列表推导替换map和filter的用法:

#!/bin/python

import os

def scan_dir(path):
    print([os.path.abspath(f) for f in os.listdir(path)])

For Python 2

#!/bin/python2

import os

def scan_dir(path):
    print map(os.path.abspath, os.listdir(pwd))

For Python 3

For filter and map, you need wrap them with list()

#!/bin/python3

import os

def scan_dir(path):
    print(list(map(os.path.abspath, os.listdir(pwd))))

The recommendation now is that you replace your usage of map and filter with generators expressions or list comprehensions:

#!/bin/python

import os

def scan_dir(path):
    print([os.path.abspath(f) for f in os.listdir(path)])

回答 13

这是一行Pythonic版本:

import os
dir = 'given_directory_name'
filenames = [os.path.join(os.path.dirname(os.path.abspath(__file__)),dir,i) for i in os.listdir(dir)]

此代码列出了给定目录名称中所有文件和目录的完整路径。

Here is a one line Pythonic version:

import os
dir = 'given_directory_name'
filenames = [os.path.join(os.path.dirname(os.path.abspath(__file__)),dir,i) for i in os.listdir(dir)]

This code lists the full path of all files and directories in the given directory name.


回答 14

我知道这是一个老问题。如果您使用的是liunx机器,这是我遇到的一种巧妙方法。

import subprocess
print(subprocess.check_output(["ls", "/"]).decode("utf8"))

I know this is an old question. This is a neat way I came across if you are on a liunx machine.

import subprocess
print(subprocess.check_output(["ls", "/"]).decode("utf8"))

回答 15

#import modules
import os

_CURRENT_DIR = '.'


def rec_tree_traverse(curr_dir, indent):
    "recurcive function to traverse the directory"
    #print "[traverse_tree]"

    try :
        dfList = [os.path.join(curr_dir, f_or_d) for f_or_d in os.listdir(curr_dir)]
    except:
        print "wrong path name/directory name"
        return

    for file_or_dir in dfList:

        if os.path.isdir(file_or_dir):
            #print "dir  : ",
            print indent, file_or_dir,"\\"
            rec_tree_traverse(file_or_dir, indent*2)

        if os.path.isfile(file_or_dir):
            #print "file : ",
            print indent, file_or_dir

    #end if for loop
#end of traverse_tree()

def main():

    base_dir = _CURRENT_DIR

    rec_tree_traverse(base_dir," ")

    raw_input("enter any key to exit....")
#end of main()


if __name__ == '__main__':
    main()
#import modules
import os

_CURRENT_DIR = '.'


def rec_tree_traverse(curr_dir, indent):
    "recurcive function to traverse the directory"
    #print "[traverse_tree]"

    try :
        dfList = [os.path.join(curr_dir, f_or_d) for f_or_d in os.listdir(curr_dir)]
    except:
        print "wrong path name/directory name"
        return

    for file_or_dir in dfList:

        if os.path.isdir(file_or_dir):
            #print "dir  : ",
            print indent, file_or_dir,"\\"
            rec_tree_traverse(file_or_dir, indent*2)

        if os.path.isfile(file_or_dir):
            #print "file : ",
            print indent, file_or_dir

    #end if for loop
#end of traverse_tree()

def main():

    base_dir = _CURRENT_DIR

    rec_tree_traverse(base_dir," ")

    raw_input("enter any key to exit....")
#end of main()


if __name__ == '__main__':
    main()

回答 16

仅供参考,添加扩展名或扩展名文件过滤器os

path = '.'
for dirname, dirnames, filenames in os.walk(path):
    # print path to all filenames with extension py.
    for filename in filenames:
        fname_path = os.path.join(dirname, filename)
        fext = os.path.splitext(fname_path)[1]
        if fext == '.py':
            print fname_path
        else:
            continue

FYI Add a filter of extension or ext file import os

path = '.'
for dirname, dirnames, filenames in os.walk(path):
    # print path to all filenames with extension py.
    for filename in filenames:
        fname_path = os.path.join(dirname, filename)
        fext = os.path.splitext(fname_path)[1]
        if fext == '.py':
            print fname_path
        else:
            continue

回答 17

如果知道的话,我会把它扔进去。通配符搜索的简单而肮脏的方法。

import re
import os

[a for a in os.listdir(".") if re.search("^.*\.py$",a)]

If figured I’d throw this in. Simple and dirty way to do wildcard searches.

import re
import os

[a for a in os.listdir(".") if re.search("^.*\.py$",a)]

回答 18

下面的代码将列出目录和目录中的文件

def print_directory_contents(sPath):
        import os                                       
        for sChild in os.listdir(sPath):                
            sChildPath = os.path.join(sPath,sChild)
            if os.path.isdir(sChildPath):
                print_directory_contents(sChildPath)
            else:
                print(sChildPath)

Below code will list directories and the files within the dir

def print_directory_contents(sPath):
        import os                                       
        for sChild in os.listdir(sPath):                
            sChildPath = os.path.join(sPath,sChild)
            if os.path.isdir(sChildPath):
                print_directory_contents(sChildPath)
            else:
                print(sChildPath)

回答 19

和我一起工作的人是上述萨利赫回答的一种修改版本。

代码如下:

“ dir =’given_directory_name’文件名= [os.listdir(dir)中用于i的os.path.abspath(os.path.join(dir,i))]”

The one worked with me is kind of a modified version from Saleh answer above.

The code is as follows:

“dir = ‘given_directory_name’ filenames = [os.path.abspath(os.path.join(dir,i)) for i in os.listdir(dir)]”


从Python中的另一个文件调用函数

问题:从Python中的另一个文件调用函数

设置:我需要在程序中使用每个函数的.py文件。

在此程序中,我需要从外部文件调用该函数。

我试过了:

from file.py import function(a,b)

但是我得到了错误:

ImportError:没有名为“ file.py”的模块;文件不是包

我该如何解决这个问题?

Set_up: I have a .py file for each function I need to use in a program.

In this program, I need to call the function from the external files.

I’ve tried:

from file.py import function(a,b)

But I get the error:

ImportError: No module named ‘file.py’; file is not a package

How do I fix this problem?


回答 0

file.py导入时无需添加任何内容。只需编写from file import function,然后使用调用函数function(a, b)。之所以可能不起作用,是因为它file是Python的核心模块之一,所以我建议您更改文件名。

请注意,如果您尝试将函数从导入a.py到名为的文件中b.py,则需要确保a.pyb.py处于同一目录中。

There isn’t any need to add file.py while importing. Just write from file import function, and then call the function using function(a, b). The reason why this may not work, is because file is one of Python’s core modules, so I suggest you change the name of your file.

Note that if you’re trying to import functions from a.py to a file called b.py, you will need to make sure that a.py and b.py are in the same directory.


回答 1

首先,您不需要.py

如果您有文件a.py并且内部有一些功能:

def b():
  # Something
  return 1

def c():
  # Something
  return 2

而您要导入它们,z.py您必须编写

from a import b, c

First of all you do not need a .py.

If you have a file a.py and inside you have some functions:

def b():
  # Something
  return 1

def c():
  # Something
  return 2

And you want to import them in z.py you have to write

from a import b, c

回答 2

您可以通过2种方式执行此操作。首先只是从file.py导入所需的特定功能。为此使用

from file import function

另一种方法是导入整个文件

import file as fl

然后您可以使用以下命令在file.py中调用任何函数

fl.function(a,b)

You can do this in 2 ways. First is just to import the specific function you want from file.py. To do this use

from file import function

Another way is to import the entire file

import file as fl

Then you can call any function inside file.py using

fl.function(a,b)

回答 3

如果您不能或不想在正在使用的同一目录中使用该函数,也可以从其他目录中调用该函数。您可以通过两种方式来做到这一点(也许还有更多选择,但这是对我有用的选择)。

备选方案1临时更改您的工作目录

import os

os.chdir("**Put here the directory where you have the file with your function**")

from file import function

os.chdir("**Put here the directory where you were working**")

选择2将具有功能的目录添加到sys.path

import sys

sys.path.append("**Put here the directory where you have the file with your function**")

from file import function

You can call the function from a different directory as well, in case you cannot or do not want to have the function in the same directory you are working. You can do this in two ways (perhaps there are more alternatives, but these are the ones that have worked for me).

Alternative 1 Temporarily change your working directory

import os

os.chdir("**Put here the directory where you have the file with your function**")

from file import function

os.chdir("**Put here the directory where you were working**")

Alternative 2 Add the directory where you have your function to sys.path

import sys

sys.path.append("**Put here the directory where you have the file with your function**")

from file import function

回答 4

如果您的文件位于不同的包结构中,并且您想从其他包中调用它,则可以按照以下方式调用它:

假设您在python项目中具有以下包结构:

Python包和文件结构

com.my.func.DifferentFunction-python文件中,您具有一些功能,例如:

def add(arg1, arg2):
    return arg1 + arg2

def sub(arg1, arg2) :
    return arg1 - arg2

def mul(arg1, arg2) :
    return arg1 * arg2

您想从中调用不同的函数Example3.py,然后按照以下方式进行操作:

Example3.py文件中定义导入语句以导入所有功能

from com.my.func.DifferentFunction import *

或定义要导入的每个函数名称

from com.my.func.DifferentFunction import add, sub, mul

然后Example3.py可以调用函数执行:

num1 = 20
num2 = 10

print("\n add : ", add(num1,num2))
print("\n sub : ", sub(num1,num2))
print("\n mul : ", mul(num1,num2))

输出:

 add :  30

 sub :  10

 mul :  200

If your file is in the different package structure and you want to call it from a different package, then you can call it in that fashion:

Let’s say you have following package structure in your python project:

Python package and file structure

in – com.my.func.DifferentFunction python file you have some function, like:

def add(arg1, arg2):
    return arg1 + arg2

def sub(arg1, arg2) :
    return arg1 - arg2

def mul(arg1, arg2) :
    return arg1 * arg2

And you want to call different functions from Example3.py, then following way you can do it:

Define import statement in Example3.py – file for import all function

from com.my.func.DifferentFunction import *

or define each function name which you want to import

from com.my.func.DifferentFunction import add, sub, mul

Then in Example3.py you can call function for execute:

num1 = 20
num2 = 10

print("\n add : ", add(num1,num2))
print("\n sub : ", sub(num1,num2))
print("\n mul : ", mul(num1,num2))

Output:

 add :  30

 sub :  10

 mul :  200

回答 5

遇到了相同的功能,但我必须执行以下操作才能使其正常工作。

如果看到“ ModuleNotFoundError:未命名模块”,则可能需要在文件名前面加点号(。),如下所示;

.file导入功能

Came across the same feature but I had to do the below to make it work.

If you are seeing ‘ModuleNotFoundError: No module named’, you probably need the dot(.) in front of the filename as below;

from .file import funtion


回答 6

首先以.py格式保存文件(例如my_example.py)。如果该文件具有功能,

def xyz():

        --------

        --------

def abc():

        --------

        --------

在调用函数中,您只需要键入以下几行。

文件名:my_example2.py

===========================

import my_example.py


a = my_example.xyz()

b = my_example.abc()

===========================

First save the file in .py format (for example, my_example.py). And if that file have functions,

def xyz():

        --------

        --------

def abc():

        --------

        --------

In the calling function you just have to type the below lines.

file_name: my_example2.py

============================

import my_example.py


a = my_example.xyz()

b = my_example.abc()

============================


回答 7

将模块重命名为“文件”以外的名称。

然后还要确保在调用函数时:

1)如果要导入整个模块,则在调用它时要重申模块名称:

import module
module.function_name()

要么

import pizza
pizza.pizza_function()

2)或如果您要导入特定功能,带别名的功能或所有使用*的功能,则无需重复模块名称:

from pizza import pizza_function
pizza_function()

要么

from pizza import pizza_function as pf
pf()

要么

from pizza import *
pizza_function()

Rename the module to something other than ‘file’.

Then also be sure when you are calling the function that:

1)if you are importing the entire module, you reiterate the module name when calling it:

import module
module.function_name()

or

import pizza
pizza.pizza_function()

2)or if you are importing specific functions, functions with an alias, or all functions using *, you don’t reiterate the module name:

from pizza import pizza_function
pizza_function()

or

from pizza import pizza_function as pf
pf()

or

from pizza import *
pizza_function()

回答 8

.py文件中的函数(可以(当然)可以在不同目录中)可以通过首先写入目录然后输入不带.py扩展名的文件名来简单地导入:

from directory_name.file_name import function_name

后来被使用: function_name()

Functions from .py file (can (of course) be in different directory) can be simply imported by writing directories first and then the file name without .py extension:

from directory_name.file_name import function_name

And later be used: function_name()


回答 9

在MathMethod.Py内部。

def Add(a,b):
   return a+b 

def subtract(a,b):
  return a-b

内部Main.Py

import MathMethod as MM 
  print(MM.Add(200,1000))

输出:1200

Inside MathMethod.Py.

def Add(a,b):
   return a+b 

def subtract(a,b):
  return a-b

Inside Main.Py

import MathMethod as MM 
  print(MM.Add(200,1000))

Output:1200


回答 10

您不必添加file.py

只需将文件与文件导入位置保持在相同位置即可。然后只需导入您的函数:

from file import a, b

You don’t have to add file.py.

Just keep the file in the same location with the file from where you want to import it. Then just import your functions:

from file import a, b

回答 11

您应该将文件与要导入的Python文件放在同一位置。“从文件导入功能”也足够。

You should have the file at the same location as that of the Python files you are trying to import. Also ‘from file import function’ is enough.


回答 12

如果要导入此文件,请在文件名前附加一个点(。),该文件与运行代码的目录相同。

例如,我正在运行一个名为a.py的文件,我想导入一个名为addFun的方法,该方法是用b.py编写的,而b.py在同一目录中

从.b import addFun

append a dot(.) in front of a file name if you want to import this file which is in the same directory where you are running your code.

For example, i’m running a file named a.py and i want to import a method named addFun which is written in b.py, and b.py is there in the same directory

from .b import addFun


回答 13

假设您要调用的文件是anotherfile.py,并且您要调用的方法是method1,然后先导入文件,然后再导入方法

from anotherfile import method1

如果method1是类的一部分,则将该类设为class1,则

from anotherfile import class1

然后创建一个class1对象,假设对象名称是ob1,然后

ob1 = class1()
ob1.method1()

Suppose the file you want to call is anotherfile.py and the method you want to call is method1, then first import the file and then the method

from anotherfile import method1

if method1 is part of a class, let the class be class1, then

from anotherfile import class1

then create an object of class1, suppose the object name is ob1, then

ob1 = class1()
ob1.method1()

回答 14

就我而言,我命名了文件helper.scrap.py,直到更改为helper.py

in my case i named my file helper.scrap.py and couldn’t make it work until i changed to helper.py


如何获取当前正在执行的文件的路径和名称?

问题:如何获取当前正在执行的文件的路径和名称?

我有调用其他脚本文件的脚本,但是我需要获取该进程中当前正在运行的文件的文件路径。

例如,假设我有三个文件。使用execfile

  • script_1.py来电script_2.py
  • 依次script_2.py调用script_3.py

我怎样才能获得的文件名和路径script_3.py从内部代码script_3.py,而无需从传递这些信息作为参数script_2.py

(执行os.getcwd()将返回原始启动脚本的文件路径,而不是当前文件的路径。)

I have scripts calling other script files but I need to get the filepath of the file that is currently running within the process.

For example, let’s say I have three files. Using execfile:

  • script_1.py calls script_2.py.
  • In turn, script_2.py calls script_3.py.

How can I get the file name and path of script_3.py, from code within script_3.py, without having to pass that information as arguments from script_2.py?

(Executing os.getcwd() returns the original starting script’s filepath not the current file’s.)


回答 0

p1.py:

execfile("p2.py")

p2.py:

import inspect, os
print (inspect.getfile(inspect.currentframe()) # script filename (usually with path)
print (os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))) # script directory

p1.py:

execfile("p2.py")

p2.py:

import inspect, os
print (inspect.getfile(inspect.currentframe()) # script filename (usually with path)
print (os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))) # script directory

回答 1

__file__

正如其他人所说。您可能还想使用os.path.realpath消除符号链接:

import os

os.path.realpath(__file__)
__file__

as others have said. You may also want to use os.path.realpath to eliminate symlinks:

import os

os.path.realpath(__file__)

回答 2

更新2018-11-28:

以下是使用Python 2和3进行实验的摘要。

main.py-运行foo.py
foo.py-运行lib / bar.py
lib / bar.py-打印文件路径表达式

| Python | Run statement       | Filepath expression                    |
|--------+---------------------+----------------------------------------|
|      2 | execfile            | os.path.abspath(inspect.stack()[0][1]) |
|      2 | from lib import bar | __file__                               |
|      3 | exec                | (wasn't able to obtain it)             |
|      3 | import lib.bar      | __file__                               |

对于Python 2,切换到软件包以便可以使用更为清晰from lib import bar-只需将空__init__.py文件添加到两个文件夹中即可。

对于Python 3,execfile不存在-最接近的替代方法是exec(open(<filename>).read()),尽管这会影响堆栈框架。使用起来最简单,import foo而且import lib.bar-无需__init__.py文件。

另请参见import和execfile之间的区别


原始答案:

这是基于该线程答案的实验-Windows上的Python 2.7.10。

基于堆栈的堆栈似乎是唯一可以提供可靠结果的堆栈。后两个语法最短,即-

print os.path.abspath(inspect.stack()[0][1])                   # C:\filepaths\lib\bar.py
print os.path.dirname(os.path.abspath(inspect.stack()[0][1]))  # C:\filepaths\lib

这些是作为功​​能添加到sys中的!归功于@Usagi和@pablog

基于以下三个文件,并从其文件夹运行main.py python main.py(也尝试了具有绝对路径并从另一个文件夹调用的execfile)。

C:\ filepaths \ main.py:execfile('foo.py')
C:\ filepaths \ foo.py:execfile('lib/bar.py')
C:\ filepaths \ lib \ bar.py:

import sys
import os
import inspect

print "Python " + sys.version
print

print __file__                                        # main.py
print sys.argv[0]                                     # main.py
print inspect.stack()[0][1]                           # lib/bar.py
print sys.path[0]                                     # C:\filepaths
print

print os.path.realpath(__file__)                      # C:\filepaths\main.py
print os.path.abspath(__file__)                       # C:\filepaths\main.py
print os.path.basename(__file__)                      # main.py
print os.path.basename(os.path.realpath(sys.argv[0])) # main.py
print

print sys.path[0]                                     # C:\filepaths
print os.path.abspath(os.path.split(sys.argv[0])[0])  # C:\filepaths
print os.path.dirname(os.path.abspath(__file__))      # C:\filepaths
print os.path.dirname(os.path.realpath(sys.argv[0]))  # C:\filepaths
print os.path.dirname(__file__)                       # (empty string)
print

print inspect.getfile(inspect.currentframe())         # lib/bar.py

print os.path.abspath(inspect.getfile(inspect.currentframe())) # C:\filepaths\lib\bar.py
print os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) # C:\filepaths\lib
print

print os.path.abspath(inspect.stack()[0][1])          # C:\filepaths\lib\bar.py
print os.path.dirname(os.path.abspath(inspect.stack()[0][1]))  # C:\filepaths\lib
print

Update 2018-11-28:

Here is a summary of experiments with Python 2 and 3. With

main.py – runs foo.py
foo.py – runs lib/bar.py
lib/bar.py – prints filepath expressions

| Python | Run statement       | Filepath expression                    |
|--------+---------------------+----------------------------------------|
|      2 | execfile            | os.path.abspath(inspect.stack()[0][1]) |
|      2 | from lib import bar | __file__                               |
|      3 | exec                | (wasn't able to obtain it)             |
|      3 | import lib.bar      | __file__                               |

For Python 2, it might be clearer to switch to packages so can use from lib import bar – just add empty __init__.py files to the two folders.

For Python 3, execfile doesn’t exist – the nearest alternative is exec(open(<filename>).read()), though this affects the stack frames. It’s simplest to just use import foo and import lib.bar – no __init__.py files needed.

See also Difference between import and execfile


Original Answer:

Here is an experiment based on the answers in this thread – with Python 2.7.10 on Windows.

The stack-based ones are the only ones that seem to give reliable results. The last two have the shortest syntax, i.e. –

print os.path.abspath(inspect.stack()[0][1])                   # C:\filepaths\lib\bar.py
print os.path.dirname(os.path.abspath(inspect.stack()[0][1]))  # C:\filepaths\lib

Here’s to these being added to sys as functions! Credit to @Usagi and @pablog

Based on the following three files, and running main.py from its folder with python main.py (also tried execfiles with absolute paths and calling from a separate folder).

C:\filepaths\main.py: execfile('foo.py')
C:\filepaths\foo.py: execfile('lib/bar.py')
C:\filepaths\lib\bar.py:

import sys
import os
import inspect

print "Python " + sys.version
print

print __file__                                        # main.py
print sys.argv[0]                                     # main.py
print inspect.stack()[0][1]                           # lib/bar.py
print sys.path[0]                                     # C:\filepaths
print

print os.path.realpath(__file__)                      # C:\filepaths\main.py
print os.path.abspath(__file__)                       # C:\filepaths\main.py
print os.path.basename(__file__)                      # main.py
print os.path.basename(os.path.realpath(sys.argv[0])) # main.py
print

print sys.path[0]                                     # C:\filepaths
print os.path.abspath(os.path.split(sys.argv[0])[0])  # C:\filepaths
print os.path.dirname(os.path.abspath(__file__))      # C:\filepaths
print os.path.dirname(os.path.realpath(sys.argv[0]))  # C:\filepaths
print os.path.dirname(__file__)                       # (empty string)
print

print inspect.getfile(inspect.currentframe())         # lib/bar.py

print os.path.abspath(inspect.getfile(inspect.currentframe())) # C:\filepaths\lib\bar.py
print os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) # C:\filepaths\lib
print

print os.path.abspath(inspect.stack()[0][1])          # C:\filepaths\lib\bar.py
print os.path.dirname(os.path.abspath(inspect.stack()[0][1]))  # C:\filepaths\lib
print

回答 3

我认为这更干净:

import inspect
print inspect.stack()[0][1]

并获得与以下信息相同的信息:

print inspect.getfile(inspect.currentframe())

其中[0]是堆栈中的当前帧(堆栈的顶部),[1]是文件名,请增加以在堆栈中向后移动,即

print inspect.stack()[1][1]

将是调用当前框架的脚本的文件名。另外,使用[-1]将使您到达堆栈的底部,即原始调用脚本。

I think this is cleaner:

import inspect
print inspect.stack()[0][1]

and gets the same information as:

print inspect.getfile(inspect.currentframe())

Where [0] is the current frame in the stack (top of stack) and [1] is for the file name, increase to go backwards in the stack i.e.

print inspect.stack()[1][1]

would be the file name of the script that called the current frame. Also, using [-1] will get you to the bottom of the stack, the original calling script.


回答 4

import os
os.path.dirname(__file__) # relative directory path
os.path.abspath(__file__) # absolute file path
os.path.basename(__file__) # the file name only
import os
os.path.dirname(__file__) # relative directory path
os.path.abspath(__file__) # absolute file path
os.path.basename(__file__) # the file name only

回答 5

如果您的脚本仅包含一个文件,则标记为“最佳”的建议都是正确的。

如果要从可能作为模块导入的文件中找出可执行文件的名称(即,传递给当前程序的python解释器的根文件),则需要执行此操作(假设此文件位于文件中)名为foo.py):

import inspect

print inspect.stack()[-1][1]

因为[-1]堆栈上的最后一件事()是进入堆栈的第一件事(堆栈是LIFO / FILO数据结构)。

然后在文件bar.py中,如果您import foo将输出bar.py,而不是foo.py,它将是所有这些值:

  • __file__
  • inspect.getfile(inspect.currentframe())
  • inspect.stack()[0][1]

The suggestions marked as best are all true if your script consists of only one file.

If you want to find out the name of the executable (i.e. the root file passed to the python interpreter for the current program) from a file that may be imported as a module, you need to do this (let’s assume this is in a file named foo.py):

import inspect

print inspect.stack()[-1][1]

Because the last thing ([-1]) on the stack is the first thing that went into it (stacks are LIFO/FILO data structures).

Then in file bar.py if you import foo it’ll print bar.py, rather than foo.py, which would be the value of all of these:

  • __file__
  • inspect.getfile(inspect.currentframe())
  • inspect.stack()[0][1]

回答 6

import os
print os.path.basename(__file__)

这只会给我们文件名。即如果文件的绝对路径为c:\ abcd \ abc.py,则第二行将打印abc.py

import os
print os.path.basename(__file__)

this will give us the filename only. i.e. if abspath of file is c:\abcd\abc.py then 2nd line will print abc.py


回答 7

您还不清楚“进程中当前正在运行的文件的文件路径”是什么意思。 sys.argv[0]通常包含由Python解释器调用的脚本的位置。查看sys文档以获取更多详细信息。

正如@Tim和@Pat Notz指出的那样,__file__属性提供了对

从模块加载文件的文件(如果从文件加载的文件)

It’s not entirely clear what you mean by “the filepath of the file that is currently running within the process”. sys.argv[0] usually contains the location of the script that was invoked by the Python interpreter. Check the sys documentation for more details.

As @Tim and @Pat Notz have pointed out, the __file__ attribute provides access to

the file from which the module was loaded, if it was loaded from a file


回答 8

我有一个必须在Windows环境下工作的脚本。这段代码是我完成的:

import os,sys
PROJECT_PATH = os.path.abspath(os.path.split(sys.argv[0])[0])

这是一个不明智的决定。但这不需要外部库,这对我来说是最重要的。

I have a script that must work under windows environment. This code snipped is what I’ve finished with:

import os,sys
PROJECT_PATH = os.path.abspath(os.path.split(sys.argv[0])[0])

it’s quite a hacky decision. But it requires no external libraries and it’s the most important thing in my case.


回答 9

尝试这个,

import os
os.path.dirname(os.path.realpath(__file__))

Try this,

import os
os.path.dirname(os.path.realpath(__file__))

回答 10

import os
os.path.dirname(os.path.abspath(__file__))

无需检查或任何其他库。

当我必须导入脚本(从与执行脚本所在的目录不同的目录)导入脚本时,此方法对我有用,该脚本使用与导入脚本位于同一文件夹中的配置文件。

import os
os.path.dirname(os.path.abspath(__file__))

No need for inspect or any other library.

This worked for me when I had to import a script (from a different directory then the executed script), that used a configuration file residing in the same folder as the imported script.


回答 11

__file__属性适用于包含主要执行代码的文件以及导入的模块。

参见https://web.archive.org/web/20090918095828/http://pyref.infogami.com/__file__

The __file__ attribute works for both the file containing the main execution code as well as imported modules.

See https://web.archive.org/web/20090918095828/http://pyref.infogami.com/__file__


回答 12

import sys

print sys.path[0]

这将打印当前正在执行的脚本的路径

import sys

print sys.path[0]

this would print the path of the currently executing script


回答 13

我认为这__file__ 听起来像您可能还想签出inspect模块

I think it’s just __file__ Sounds like you may also want to checkout the inspect module.


回答 14

您可以使用 inspect.stack()

import inspect,os
inspect.stack()[0]  => (<frame object at 0x00AC2AC0>, 'g:\\Python\\Test\\_GetCurrentProgram.py', 15, '<module>', ['print inspect.stack()[0]\n'], 0)
os.path.abspath (inspect.stack()[0][1]) => 'g:\\Python\\Test\\_GetCurrentProgram.py'

You can use inspect.stack()

import inspect,os
inspect.stack()[0]  => (<frame object at 0x00AC2AC0>, 'g:\\Python\\Test\\_GetCurrentProgram.py', 15, '<module>', ['print inspect.stack()[0]\n'], 0)
os.path.abspath (inspect.stack()[0][1]) => 'g:\\Python\\Test\\_GetCurrentProgram.py'

回答 15

由于Python 3相当主流,因此我想提供一个pathlib答案,因为我认为它现在可能是访问文件和路径信息的更好工具。

from pathlib import Path

current_file: Path = Path(__file__).resolve()

如果要查找当前文件的目录,则只需添加.parent以下Path()语句即可:

current_path: Path = Path(__file__).parent.resolve()

Since Python 3 is fairly mainstream, I wanted to include a pathlib answer, as I believe that it is probably now a better tool for accessing file and path information.

from pathlib import Path

current_file: Path = Path(__file__).resolve()

If you are seeking the directory of the current file, it is as easy as adding .parent to the Path() statement:

current_path: Path = Path(__file__).parent.resolve()

回答 16

import sys
print sys.argv[0]
import sys
print sys.argv[0]

回答 17

这应该工作:

import os,sys
filename=os.path.basename(os.path.realpath(sys.argv[0]))
dirname=os.path.dirname(os.path.realpath(sys.argv[0]))

This should work:

import os,sys
filename=os.path.basename(os.path.realpath(sys.argv[0]))
dirname=os.path.dirname(os.path.realpath(sys.argv[0]))

回答 18

print(__file__)
print(__import__("pathlib").Path(__file__).parent)
print(__file__)
print(__import__("pathlib").Path(__file__).parent)

回答 19

获取执行脚本的目录

 print os.path.dirname( inspect.getfile(inspect.currentframe()))

To get directory of executing script

 print os.path.dirname( inspect.getfile(inspect.currentframe()))

回答 20

我一直只使用“当前工作目录”或CWD的os功能。这是标准库的一部分,非常容易实现。这是一个例子:

    import os
    base_directory = os.getcwd()

I have always just used the os feature of Current Working Directory, or CWD. This is part of the standard library, and is very easy to implement. Here is an example:

    import os
    base_directory = os.getcwd()

回答 21

我在__file__中使用了此方法,
os.path.abspath(__file__)
但是有一个小技巧,它在第一次运行代码时返回.py文件,下一次运行给出* .pyc文件的名称,
所以我使用:
inspect.getfile(inspect.currentframe())

sys._getframe().f_code.co_filename

I used the approach with __file__
os.path.abspath(__file__)
but there is a little trick, it returns the .py file when the code is run the first time, next runs give the name of *.pyc file
so I stayed with:
inspect.getfile(inspect.currentframe())
or
sys._getframe().f_code.co_filename


回答 22

我编写了一个函数,该函数考虑了Eclipse 调试器unittest。它返回您启动的第一个脚本的文件夹。您可以选择指定__file__ var,但主要的事情是您不必在所有调用层次结构中共享此变量

也许您可以处理其他我没看到的特殊情况,但是对我来说还可以。

import inspect, os
def getRootDirectory(_file_=None):
    """
    Get the directory of the root execution file
    Can help: http://stackoverflow.com/questions/50499/how-do-i-get-the-path-and-name-of-the-file-that-is-currently-executing
    For eclipse user with unittest or debugger, the function search for the correct folder in the stack
    You can pass __file__ (with 4 underscores) if you want the caller directory
    """
    # If we don't have the __file__ :
    if _file_ is None:
        # We get the last :
        rootFile = inspect.stack()[-1][1]
        folder = os.path.abspath(rootFile)
        # If we use unittest :
        if ("/pysrc" in folder) & ("org.python.pydev" in folder):
            previous = None
            # We search from left to right the case.py :
            for el in inspect.stack():
                currentFile = os.path.abspath(el[1])
                if ("unittest/case.py" in currentFile) | ("org.python.pydev" in currentFile):
                    break
                previous = currentFile
            folder = previous
        # We return the folder :
        return os.path.dirname(folder)
    else:
        # We return the folder according to specified __file__ :
        return os.path.dirname(os.path.realpath(_file_))

I wrote a function which take into account eclipse debugger and unittest. It return the folder of the first script you launch. You can optionally specify the __file__ var, but the main thing is that you don’t have to share this variable across all your calling hierarchy.

Maybe you can handle others stack particular cases I didn’t see, but for me it’s ok.

import inspect, os
def getRootDirectory(_file_=None):
    """
    Get the directory of the root execution file
    Can help: http://stackoverflow.com/questions/50499/how-do-i-get-the-path-and-name-of-the-file-that-is-currently-executing
    For eclipse user with unittest or debugger, the function search for the correct folder in the stack
    You can pass __file__ (with 4 underscores) if you want the caller directory
    """
    # If we don't have the __file__ :
    if _file_ is None:
        # We get the last :
        rootFile = inspect.stack()[-1][1]
        folder = os.path.abspath(rootFile)
        # If we use unittest :
        if ("/pysrc" in folder) & ("org.python.pydev" in folder):
            previous = None
            # We search from left to right the case.py :
            for el in inspect.stack():
                currentFile = os.path.abspath(el[1])
                if ("unittest/case.py" in currentFile) | ("org.python.pydev" in currentFile):
                    break
                previous = currentFile
            folder = previous
        # We return the folder :
        return os.path.dirname(folder)
    else:
        # We return the folder according to specified __file__ :
        return os.path.dirname(os.path.realpath(_file_))

回答 23

要保持跨平台(macOS / Windows / Linux)的迁移一致性,请尝试:

path = r'%s' % os.getcwd().replace('\\','/')

To keep the migration consistency across platforms (macOS/Windows/Linux), try:

path = r'%s' % os.getcwd().replace('\\','/')


回答 24

最简单的方法是:

script_1.py中:

import subprocess
subprocess.call(['python3',<path_to_script_2.py>])

script_2.py中:

sys.argv[0]

PS:我已经尝试过execfile,但是由于它以字符串形式读取script_2.py,所以sys.argv[0]返回了<string>

Simplest way is:

in script_1.py:

import subprocess
subprocess.call(['python3',<path_to_script_2.py>])

in script_2.py:

sys.argv[0]

P.S.: I’ve tried execfile, but since it reads script_2.py as a string, sys.argv[0] returned <string>.


回答 25

这是我所使用的,因此我可以将我的代码无处不在。__name__总是被定义,但是__file__仅当代码作为文件运行时才被定义(例如,不在IDLE / iPython中)。

    if '__file__' in globals():
        self_name = globals()['__file__']
    elif '__file__' in locals():
        self_name = locals()['__file__']
    else:
        self_name = __name__

或者,可以这样写:

self_name = globals().get('__file__', locals().get('__file__', __name__))

Here is what I use so I can throw my code anywhere without issue. __name__ is always defined, but __file__ is only defined when the code is run as a file (e.g. not in IDLE/iPython).

    if '__file__' in globals():
        self_name = globals()['__file__']
    elif '__file__' in locals():
        self_name = locals()['__file__']
    else:
        self_name = __name__

Alternatively, this can be written as:

self_name = globals().get('__file__', locals().get('__file__', __name__))

回答 26

这些答案大多数都是用Python 2.x或更早版本编写的。在Python 3.x中,print函数的语法已更改为需要括号,即print()。

因此,Python 2.x中来自user13993的较早的高分答案:

import inspect, os
print inspect.getfile(inspect.currentframe()) # script filename (usually with path)
print os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) # script directory

在Python 3.x中成为:

import inspect, os
print(inspect.getfile(inspect.currentframe())) # script filename (usually with path)
print(os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) ) # script directory

Most of these answers were written in Python version 2.x or earlier. In Python 3.x the syntax for the print function has changed to require parentheses, i.e. print().

So, this earlier high score answer from user13993 in Python 2.x:

import inspect, os
print inspect.getfile(inspect.currentframe()) # script filename (usually with path)
print os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) # script directory

Becomes in Python 3.x:

import inspect, os
print(inspect.getfile(inspect.currentframe())) # script filename (usually with path)
print(os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) ) # script directory

回答 27

如果您只想要文件名而没有,./或者.py可以尝试此操作

filename = testscript.py
file_name = __file__[2:-3]

file_name 将打印测试脚本,您可以通过更改[]中的索引来生成所需的任何内容

if you want just the filename without ./ or .py you can try this

filename = testscript.py
file_name = __file__[2:-3]

file_name will print testscript you can generate whatever you want by changing the index inside []


回答 28

import os

import wx


# return the full path of this file
print(os.getcwd())

icon = wx.Icon(os.getcwd() + '/img/image.png', wx.BITMAP_TYPE_PNG, 16, 16)

# put the icon on the frame
self.SetIcon(icon)
import os

import wx


# return the full path of this file
print(os.getcwd())

icon = wx.Icon(os.getcwd() + '/img/image.png', wx.BITMAP_TYPE_PNG, 16, 16)

# put the icon on the frame
self.SetIcon(icon)