Tag archive: logging

Elegant setup of Python logging in Django

Question: Elegant setup of Python logging in Django


I have yet to find a way of setting up Python logging with Django that I’m happy with. My requirements are fairly simple:

  • Different log handlers for different events – that is, I want to be able to log to different files
  • Easy access to loggers in my modules. The module should be able to find its logger with little effort.
  • Should be easily applicable to command-line modules. Parts of the system are stand-alone command line or daemon processes. Logging should be easily usable with these modules.

My current setup is to use a logging.conf file and set up logging in each module I log from. It doesn’t feel right.

Do you have a logging setup that you like? Please detail it: how do you set up the configuration (do you use logging.conf or set it up in code), where/when do you initiate the loggers, and how do you get access to them in your modules, etc.


Answer 0


The best way I’ve found so far is to initialize logging setup in settings.py – nowhere else. You can either use a configuration file or do it programmatically step-by-step – it just depends on your requirements. The key thing is that I usually add the handlers I want to the root logger, using levels and sometimes logging.Filters to get the events I want to the appropriate files, console, syslogs etc. You can of course add handlers to any other loggers too, but there isn’t commonly a need for this in my experience.

In each module, I define a logger using

logger = logging.getLogger(__name__)

and use that for logging events in the module (and, if I want to differentiate further, use a logger which is a child of the logger created above).
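
A minimal sketch of that child-logger idea (the ".payments" suffix is purely illustrative):

import logging

logger = logging.getLogger(__name__)                    # e.g. "myapp.views"
sub_logger = logging.getLogger(__name__ + '.payments')  # a child of the module's logger
sub_logger.info('child loggers reach the parent handlers via propagation')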

If my app is going to be potentially used in a site which doesn’t configure logging in settings.py, I define a NullHandler somewhere as follows:

# someutils.py
import logging

class NullHandler(logging.Handler):
    def emit(self, record):
        pass

null_handler = NullHandler()

and ensure that an instance of it is added to all loggers created in the modules in my apps which use logging. (Note: NullHandler is already in the logging package for Python 3.1, and will be in Python 2.7.) So:

logger = logging.getLogger(__name__)
logger.addHandler(someutils.null_handler)

This is done to ensure that your modules play nicely in a site which doesn’t configure logging in settings.py, and that you don’t get any annoying “No handlers could be found for logger X.Y.Z” messages (which are warnings about potentially misconfigured logging).
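
Since NullHandler ships with the standard library from Python 3.1 and 2.7 onward (as the note above says), on those versions the same protection reduces to:

import logging

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())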

Doing it this way meets your stated requirements:

  • You can set up different log handlers for different events, as you currently do.
  • Easy access to loggers in your modules – use getLogger(__name__).
  • Easily applicable to command-line modules – they also import settings.py.

Update: Note that as of version 1.3, Django now incorporates support for logging.


Answer 1


I know this question is already answered, but as of django >= 1.3 there’s a new logging setting.

Moving from old to new is not automatic, so I thought I’d write it down here.

And of course check out the django doc for more.

This is the basic conf, created by default with django-admin startproject in v1.3 – mileage may vary with later django versions:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
        }
    },
    'loggers': {
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': True,
        }
    }
}

This structure is based upon the standard Python logging dictConfig, which dictates the following blocks:

  • formatters – the corresponding value will be a dict in which each key is a formatter id and each value is a dict describing how to configure the corresponding Formatter instance.
  • filters – the corresponding value will be a dict in which each key is a filter id and each value is a dict describing how to configure the corresponding Filter instance.
  • handlers – the corresponding value will be a dict in which each key is a handler id and each value is a dict describing how to configure the corresponding Handler instance. Each handler has the following keys:

    • class (mandatory). This is the fully qualified name of the handler class.
    • level (optional). The level of the handler.
    • formatter (optional). The id of the formatter for this handler.
    • filters (optional). A list of ids of the filters for this handler.

I usually do at least this:

  • add a .log file
  • configure my apps to write to this log

Which translates into:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
        },
        'simple': {
            'format': '%(levelname)s %(message)s'
        },
    },
    'filters': {
        'require_debug_false': {
            '()': 'django.utils.log.RequireDebugFalse'
        }
    },
    'handlers': {
        'null': {
            'level': 'DEBUG',
            'class': 'django.utils.log.NullHandler',
        },
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'formatter': 'simple'
        },
        # I always add this handler to facilitate separating logs
        'log_file': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': os.path.join(VAR_ROOT, 'logs/django.log'),
            'maxBytes': 16777216,  # 16 MB; must be an int, not a string
            'formatter': 'verbose'
        },
        'mail_admins': {
            'level': 'ERROR',
            'filters': ['require_debug_false'],
            'class': 'django.utils.log.AdminEmailHandler',
            'include_html': True,
        }
    },
    'loggers': {
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': True,
        },
        'apps': {  # I keep all of my apps under an 'apps' folder, but you can also add them one by one, depending on how your virtualenv/paths are set
            'handlers': ['log_file'],
            'level': 'INFO',
            'propagate': True,
        },
    },
    # you can also shortcut 'loggers' and just configure logging for EVERYTHING at once
    'root': {
        'handlers': ['console', 'mail_admins'],
        'level': 'INFO'
    },
}

edit

See “request exceptions are now always logged” and Ticket #16288:

I updated the above sample conf to explicitly include the correct filter for mail_admins so that, by default, emails are not sent when debug is True.

You should add a filter:

'filters': {
    'require_debug_false': {
        '()': 'django.utils.log.RequireDebugFalse'
    }
},

and apply it to the mail_admins handler:

    'mail_admins': {
        'level': 'ERROR',
        'filters': ['require_debug_false'],
        'class': 'django.utils.log.AdminEmailHandler',
        'include_html': True,
    }

Otherwise django.core.handlers.base.handle_uncaught_exception doesn’t pass errors to the ‘django.request’ logger if settings.DEBUG is True.

If you don’t do this in Django 1.5 you’ll get a

DeprecationWarning: You have no filters defined on the ‘mail_admins’ logging handler: adding implicit debug-false-only filter

but things will still work correctly in both django 1.4 and django 1.5.

** end edit **

That conf is strongly inspired by the sample conf in the django doc, but adds the log file part.

I often also do the following:

LOG_LEVEL = 'DEBUG' if DEBUG else 'INFO'

...
    'level': LOG_LEVEL
...

Then in my python code I always add a NullHandler in case no logging conf is defined whatsoever. This avoids warnings about no handler being specified. It is especially useful for libs that are not necessarily called only from Django (ref)

import logging

# Get an instance of a logger
logger = logging.getLogger(__name__)

class NullHandler(logging.Handler):  # in the stdlib as of Python 3.1 / 2.7
    def emit(self, record):
        pass

logger.addHandler(NullHandler())  # addHandler returns None, so there is nothing useful to assign

# here you can also add some local logger should you want: to stdout with streamhandler, or to a local file...

[…]

logger.warning('etc.etc.')

Hope this helps!


Answer 2


We initialize logging in the top-level urls.py by using a logging.ini file.

The location of the logging.ini is provided in settings.py, but that’s all.

Each module then does

logger = logging.getLogger(__name__)

To distinguish testing, development and production instances, we have different logging.ini files. For the most part, we have a “console log” that goes to stderr with errors only. We have an “application log” that uses a regular rolling log file in a logs directory.
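
A minimal sketch of that arrangement; the settings variable name and the .ini path are illustrative, not this answer's actual code:

# urls.py (top level)
import logging.config

from django.conf import settings

# settings.py only supplies the path, e.g. LOGGING_INI = '/etc/myapp/logging.ini'
logging.config.fileConfig(settings.LOGGING_INI)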


Answer 3


I am currently using a logging system, which I created myself. It uses CSV format for logging.

django-csvlog

This project still doesn’t have full documentation, but I am working on it.


Log messages appearing twice with Python logging

Question: Log messages appearing twice with Python logging


I’m using Python logging, and for some reason, all of my messages are appearing twice.

I have a module to configure logging:

# BUG: It's outputting logging messages twice - not sure why - it's not the propagate setting.
def configure_logging(self, logging_file):
    self.logger = logging.getLogger("my_logger")
    self.logger.setLevel(logging.DEBUG)
    self.logger.propagate = 0
    # Format for our loglines
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    # Setup console logging
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(formatter)
    self.logger.addHandler(ch)
    # Setup file logging as well
    fh = logging.FileHandler(LOG_FILENAME)
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(formatter)
    self.logger.addHandler(fh)

Later on, I call this method to configure logging:

if __name__ == '__main__':
    tom = Boy()
    tom.configure_logging(LOG_FILENAME)
    tom.buy_ham()

And then within say, the buy_ham module, I’d call:

self.logger.info('Successfully able to write to %s' % path)

And for some reason, all the messages are appearing twice. I commented out one of the stream handlers, and it’s still the same. Bit of a weird one; not sure why this is happening. I’m assuming I’ve missed something obvious.

Cheers, Victor


Answer 0


You are calling configure_logging twice (maybe in the __init__ method of Boy): getLogger will return the same object, but addHandler does not check whether a similar handler has already been added to the logger.

Try tracing calls to that method and eliminating one of these. Or set up a flag logging_initialized initialized to False in the __init__ method of Boy and change configure_logging to do nothing if logging_initialized is True, and to set it to True after you’ve initialized the logger.
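
A minimal sketch of that guard flag, assuming the Boy class from the question:

class Boy(object):
    def __init__(self):
        self.logging_initialized = False

    def configure_logging(self, logging_file):
        if self.logging_initialized:
            return  # handlers were already added once; don't add them again
        # ... create and add the console/file handlers exactly as in the question ...
        self.logging_initialized = True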

If your program creates several Boy instances, you’ll have to change the way you do things: use a global configure_logging function that adds the handlers, and have the Boy.configure_logging method only initialize the self.logger attribute.

Another way of solving this is by checking the handlers attribute of your logger:

logger = logging.getLogger('my_logger')
if not logger.handlers:
    # create the handlers and call logger.addHandler(logging_handler)

Answer 1


If you are seeing this problem and you’re not adding the handler twice, then see abarnert’s answer here.

From the docs:

Note: If you attach a handler to a logger and one or more of its ancestors, it may emit the same record multiple times. In general, you should not need to attach a handler to more than one logger – if you just attach it to the appropriate logger which is highest in the logger hierarchy, then it will see all events logged by all descendant loggers, provided that their propagate setting is left set to True. A common scenario is to attach handlers only to the root logger, and to let propagation take care of the rest.

So, if you want a custom handler on “test”, and you don’t want its messages also going to the root handler, the answer is simple: turn off its propagate flag:

logger.propagate = False
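
A self-contained sketch of the effect (the logger name and messages are illustrative):

import logging
import sys

logging.basicConfig(stream=sys.stdout)                # installs a handler on the root logger
logger = logging.getLogger('test')
logger.addHandler(logging.StreamHandler(sys.stdout))  # a second handler, on 'test' itself

logger.warning('printed twice')                       # emitted by both handlers

logger.propagate = False
logger.warning('printed once')                        # the root handler no longer sees it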

Answer 2


The handler is added each time you call the method from outside. Try removing the handler after you finish your job:

self.logger.removeHandler(ch)
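
For example, a small sketch pairing the add with the remove, so that repeated calls don't stack handlers:

import logging

logger = logging.getLogger('my_logger')
ch = logging.StreamHandler()
logger.addHandler(ch)
try:
    logger.warning('doing the work')
finally:
    logger.removeHandler(ch)  # the next call can add a fresh handler without duplicating output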

Answer 3


I’m a python newbie, but this seemed to work for me (Python 2.7):

while logger.handlers:
    logger.handlers.pop()

Answer 4


In my case I had to set logger.propagate = False to prevent double printing.

In the code below, if you remove logger.propagate = False then you will see double printing.

import logging
from typing import Optional

_logger: Optional[logging.Logger] = None

def get_logger() -> logging.Logger:
    global _logger
    if _logger is None:
        raise RuntimeError('get_logger call made before logger was setup!')
    return _logger

def set_logger(name:str, level=logging.DEBUG) -> None:
    global _logger
    if _logger is not None:
        raise RuntimeError('_logger is already setup!')
    _logger = logging.getLogger(name)
    _logger.handlers.clear()
    _logger.setLevel(level)
    ch = logging.StreamHandler()
    ch.setLevel(level)
    # warnings.filterwarnings("ignore", "(Possibly )?corrupt EXIF data", UserWarning)
    ch.setFormatter(_get_formatter())
    _logger.addHandler(ch)
    _logger.propagate = False # otherwise root logger prints things again


def _get_formatter() -> logging.Formatter:
    return logging.Formatter(
        '[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s')

Answer 5


A call to logging.debug() calls logging.basicConfig() if there are no root handlers installed. That was happening for me in a test framework where I couldn’t control the order in which test cases fired. My initialization code was then installing the second handler. The default uses logging.BASIC_FORMAT, which I didn’t want.


Answer 6


It seems that if you (accidentally) output something with the logger and then configure it, it is too late. For example, in my code I had

logging.warning("look out)"

...
ch = logging.StreamHandler(sys.stdout)
root = logging.getLogger()
root.addHandler(ch)

root.info("hello")

I would get something like (ignoring the format options)

look out
hello
hello

and everything was written to stdout twice. I believe this is because the first call to logging.warning creates a new handler automatically, and then I explicitly added another handler. The problem went away when I removed the accidental first logging.warning call.
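
A hedged sketch of the configure-first alternative; the implicit basicConfig() inside module-level functions like logging.warning() only runs when the root logger has no handlers yet:

import logging
import sys

root = logging.getLogger()
root.addHandler(logging.StreamHandler(sys.stdout))  # explicit handler, added before any log call
root.setLevel(logging.INFO)

logging.warning('look out')  # emitted once: the implicit basicConfig() sees a handler and does nothing
root.info('hello')           # also emitted once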


Answer 7


I was getting a strange situation where console logs were doubled but my file logs were not. After a ton of digging I figured it out.

Please be aware that third party packages can register loggers. This is something to watch out for (and in some cases can’t be prevented). In many cases third party code checks to see if there are any existing root logger handlers; if there aren’t, it registers a new console handler.

My solution to this was to register my console logger at the root level:

import logging

rootLogger = logging.getLogger()  # note: no name passed in; that's how we grab the root logger
if not rootLogger.handlers:
    ch = logging.StreamHandler()
    ch.setLevel(logging.INFO)
    ch.setFormatter(logging.Formatter('%(process)s|%(levelname)s] %(message)s'))
    rootLogger.addHandler(ch)

Python logging - disable logging from imported modules

Question: Python logging - disable logging from imported modules


I’m using the Python logging module, and would like to disable log messages printed by the third party modules that I import. For example, I’m using something like the following:

import logging

logger = logging.getLogger()
logger.setLevel(level=logging.DEBUG)
fh = logging.StreamHandler()
fh_formatter = logging.Formatter('%(asctime)s %(levelname)s %(lineno)d:%(filename)s(%(process)d) - %(message)s')
fh.setFormatter(fh_formatter)
logger.addHandler(fh)

This prints out my debug messages when I do a logger.debug(“my message!”), but it also prints out the debug messages from any module I import (such as requests, and a number of other things).

I’d like to see only the log messages from modules I’m interested in. Is it possible to make the logging module do this?

Ideally, I’d like to be able to tell the logger to print messages from “ModuleX, ModuleY” and ignore all others.

I looked at the following, but I don’t want to have to disable/enable logging before every call to an imported function: logging – how to ignore imported module logs?


Answer 0


The problem is that calling getLogger without arguments returns the root logger, so when you set the level to logging.DEBUG you are also setting the level for other modules that use that logger.

You can solve this by simply not using the root logger. To do this, just pass a name as an argument, for example the name of your module:

logger = logging.getLogger('my_module_name')
# as before

This will create a new logger, and thus it won’t inadvertently change the logging level for other modules.


Obviously you have to use logger.debug instead of logging.debug since the latter is a convenience function that calls the debug method of the root logger.

This is mentioned in the Advanced Logging Tutorial. It also allows you to know which module triggered the log message in a simple way.


Answer 1


If you’re going to use the python logging package, it’s a common convention to define a logger in every module that uses it.

logger = logging.getLogger(__name__)

Many popular python packages do this, including requests. If a package uses this convention, it’s easy to enable/disable logging for it, because the logger name will be the same name as the package (or will be a child of that logger). You can even log it to the same file as your other loggers.

import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

requests_logger = logging.getLogger('requests')
requests_logger.setLevel(logging.DEBUG)

handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
requests_logger.addHandler(handler)

Answer 2


Not sure if this is appropriate to post, but I was stuck for a long time & wanted to help out anyone with the same issue, as I hadn’t found it anywhere else!

I was getting debug logs from matplotlib despite following the pretty straightforward documentation at the logging advanced tutorial and the troubleshooting. I was initiating my logger in main() of one file and importing a function to create a plot from another file (where I had imported matplotlib).

What worked for me was setting the level of matplotlib’s logger before importing it, rather than after, as I had done for other modules in my main file. This seemed counterintuitive to me, so if anyone has insight into how you can set the config for a logger that hasn’t been imported yet, I’d be curious to find out how this works. Thanks!

In my main file:

import logging
import requests
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logging.getLogger('requests').setLevel(logging.DEBUG)

def main():
  ...

In my plot.py file:

import logging
logging.getLogger('matplotlib').setLevel(logging.WARNING)
import matplotlib.pyplot as plt

def generatePlot():
  ...

Answer 3


@Bakuriu quite elegantly explains the function. Conversely, you can use the getLogger() method to retrieve and reconfigure/disable the unwanted loggers.

I also wanted to add that the logging.config.fileConfig() method accepts a parameter called disable_existing_loggers, which will disable any loggers previously defined (e.g., in imported modules).
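
For example (the .ini file name is illustrative):

import logging.config

# keep any loggers that modules defined before this call
logging.config.fileConfig('logging.ini', disable_existing_loggers=False)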


Answer 4


This disables all existing loggers, such as those created by imported modules, while still using the root logger (and without having to load an external file).

import logging.config

logging.config.dictConfig({
    'version': 1,
    'disable_existing_loggers': True,
})

Note that you need to import all the modules you don’t want logged first! Otherwise their loggers won’t count as “existing loggers” and won’t be disabled. The config will then disable all loggers from those imported modules, which might also make you miss important errors!

For more detailed examples using related configuration options, see https://gist.github.com/st4lk/6287746, a (partially working) example that uses YAML for config with the coloredlog library.


Answer 5


You could use something like:

logging.getLogger("imported_module").setLevel(logging.WARNING)
logging.getLogger("my_own_logger_name").setLevel(logging.DEBUG)

This will set my own module’s log level to DEBUG, while preventing the imported module from using the same level.

Note: "imported_module" can be replaced with imported_module.__name__ (without quotes), and "my_own_logger_name" can be replaced by __name__ if that’s the way you prefer to do it.


Answer 6


I had the same problem. I have a logging_config.py file which I import in all other py files. In the logging_config.py file I set the root logger’s level to ERROR (by default it is WARNING):

import logging
from logging.handlers import RotatingFileHandler

logging.basicConfig(
    handlers=[
        RotatingFileHandler('logs.log', maxBytes=1000, backupCount=2),
        logging.StreamHandler(),  # print to console
    ],
    level=logging.ERROR
)

In other modules I import logging_config.py and declare a new logger and set its level to debug:

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

This way everything I log in my py files is logged, but stuff logged at DEBUG and INFO level by imported modules like urllib, requests, boto3 etc. is not. If there is some error in those imported modules, it is still logged, since I set the root logger’s level to ERROR.


Answer 7


Another thing to consider is the propagate property of the Logger class.

For example, the py-suds library for handling SOAP calls kept producing a huge amount of logs about a module called sxbasics.py, even with its loggers set to ERROR:

logging.getLogger('suds.client').setLevel(logging.ERROR)
logging.getLogger('suds.transport').setLevel(logging.ERROR)
logging.getLogger('suds.xsdschema').setLevel(logging.ERROR)
logging.getLogger('suds.wsdl').setLevel(logging.ERROR)

That is because log propagation is True by default; by setting it to False instead, I recovered 514MB of logs.

import logging
logging.getLogger("suds").propagate = False
logging.getLogger('suds.client').setLevel(logging.ERROR)
logging.getLogger('suds.transport').setLevel(logging.ERROR)
logging.getLogger('suds.xsdschema').setLevel(logging.ERROR)
logging.getLogger('suds.wsdl').setLevel(logging.ERROR)

Printing pprint output using logging

Question: Printing pprint output using logging


I want to use pprint’s output to show a complex data structure, but I would like to output it using the logging module rather than stdout.

ds = [{'hello': 'there'}]
logging.debug( pprint.pprint(ds) ) # outputs as STDOUT

Answer 0


Use pprint.pformat to get a string, and then send it to your logging framework.

import logging
from pprint import pformat

ds = [{'hello': 'there'}]
logging.debug(pformat(ds))

Answer 1


The solution above didn’t quite cut it for me because I’m also using a formatter to add name and levelname when logging. It looks a little untidy:

__main__    : DEBUG   : ['aaaaaaaaaaaaaaaaaaaa',
'bbbbbbbbbbbbbbbbbbbb',
'cccccccccccccccccccc',
'dddddddddddddddddddd']
__main__    : DEBUG   : Some other logging text

There may be a more elegant solution, but this:

for line in pprint.pformat(ds).split('\n'):
    logging.debug(line)

produces something a little nicer:

__main__    : DEBUG   : ['aaaaaaaaaaaaaaaaaaaa',
__main__    : DEBUG   :  'bbbbbbbbbbbbbbbbbbbb',
__main__    : DEBUG   :  'cccccccccccccccccccc',
__main__    : DEBUG   :  'dddddddddddddddddddd']
__main__    : DEBUG   : Some other logging text

How do I disable logging while running unit tests in Python Django?

Question: How do I disable logging while running unit tests in Python Django?


I am using a simple unit test based test runner to test my Django application.

My application itself is configured to use a basic logger in settings.py using:

logging.basicConfig(level=logging.DEBUG)

And in my application code using:

logger = logging.getLogger(__name__)
logger.setLevel(getattr(settings, 'LOG_LEVEL', logging.DEBUG))

However, when running unittests, I’d like to disable logging so that it doesn’t clutter my test result output. Is there a simple way to turn off logging in a global way, so that the application specific loggers aren’t writing stuff out to the console when I run tests?


Answer 0

logging.disable(logging.CRITICAL)

will disable all logging calls with levels less severe than or equal to CRITICAL. Logging can be re-enabled with

logging.disable(logging.NOTSET)

Answer 1


Since you are in Django, you could add these lines to your settings.py:

import sys
import logging

if len(sys.argv) > 1 and sys.argv[1] == 'test':
    logging.disable(logging.CRITICAL)

That way you don’t have to add that line to every setUp() in your tests.

You could also do a couple of handy changes for your test needs this way.

There is another “nicer” or “cleaner” way to add specifics to your tests and that is making your own test runner.

Just create a class like this:

import logging

from django.test.simple import DjangoTestSuiteRunner
from django.conf import settings

class MyOwnTestRunner(DjangoTestSuiteRunner):
    def run_tests(self, test_labels, extra_tests=None, **kwargs):

        # Don't show logging messages while testing
        logging.disable(logging.CRITICAL)

        return super(MyOwnTestRunner, self).run_tests(test_labels, extra_tests, **kwargs)

And now add to your settings.py file:

TEST_RUNNER = "PATH.TO.PYFILE.MyOwnTestRunner"
#(for example, 'utils.mytest_runner.MyOwnTestRunner')

This lets you make one really handy modification that the other approach doesn’t: having Django test just the applications you want. You can do that by changing the test_labels, adding these lines to the test runner:

if not test_labels:
    test_labels = ['my_app1', 'my_app2', ...]

Answer 2


Is there a simple way to turn off logging in a global way, so that the application specific loggers aren’t writing stuff out to the console when I run tests?

The other answers prevent “writing stuff out to the console” by globally setting the logging infrastructure to ignore everything. This works, but I find it too blunt an approach. My approach is to perform a configuration change which does only what’s needed to keep logs from getting out on the console. So I add a custom logging filter to my settings.py:

from logging import Filter

class NotInTestingFilter(Filter):

    def filter(self, record):
        # Although I normally just put this class in the settings.py
        # file, I have my reasons to load settings here. In many
        # cases, you could skip the import and just read the setting
        # from the local symbol space.
        from django.conf import settings

        # TESTING_MODE is some settings variable that tells my code
        # whether the code is running in a testing environment or
        # not. Any test runner I use will load the Django code in a
        # way that makes it True.
        return not settings.TESTING_MODE

And I configure the Django logging to use the filter:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
        'testing': {
            '()': NotInTestingFilter
        }
    },
    'formatters': {
        'verbose': {
            'format': ('%(levelname)s %(asctime)s %(module)s '
                       '%(process)d %(thread)d %(message)s')
        },
    },
    'handlers': {
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',
            'filters': ['testing'],
            'formatter': 'verbose'
        },
    },
    'loggers': {
        'foo': {
            'handlers': ['console'],
            'level': 'DEBUG',
            'propagate': True,
        },
    }
}

End result: when I’m testing, nothing goes to the console, but everything else stays the same.

Why Do This?

I design code that contains logging instructions that are triggered only in specific circumstances and that should output the exact data I need for diagnosis if things go wrong. Therefore I test that they do what they are supposed to do and thus completely disabling logging is not viable for me. I don’t want to find once the software is in production that what I thought would be logged is not logged.

Moreover, some test runners (Nose, for instance) will capture logs during testing and output the relevant part of the log together with a test failure. It is useful in figuring out why a test failed. If logging is completely turned off, then there’s nothing that can be captured.


Answer 3


I like Hassek’s custom test runner idea. It should be noted that DjangoTestSuiteRunner is no longer the default test runner in Django 1.6+; it has been replaced by the DiscoverRunner. For default behaviour, the test runner should be more like:

import logging

from django.test.runner import DiscoverRunner

class NoLoggingTestRunner(DiscoverRunner):
    def run_tests(self, test_labels, extra_tests=None, **kwargs):

        # disable logging below CRITICAL while testing
        logging.disable(logging.CRITICAL)

        return super(NoLoggingTestRunner, self).run_tests(test_labels, extra_tests, **kwargs)

Answer 4


I’ve found that for tests within unittest or a similar framework, the most effective way to safely disable unwanted logging in unit tests is to disable/re-enable it in the setUp/tearDown methods of a particular test case. This lets one target specifically where logs should be disabled. You could also do this explicitly on the logger of the class you’re testing.

import unittest
import logging

class TestMyUnitTest(unittest.TestCase):
    def setUp(self):
        logging.disable(logging.CRITICAL)

    def tearDown(self):
        logging.disable(logging.NOTSET)

Answer 5


I am using a simple method decorator to disable logging only in a particular test method.

import logging

def disable_logging(f):

    def wrapper(*args):
        logging.disable(logging.CRITICAL)
        result = f(*args)
        logging.disable(logging.NOTSET)

        return result

    return wrapper

And then I use it as in the following example:

class ScenarioTestCase(TestCase):

    @disable_logging
    def test_scenario(self):
        pass

Answer 6


There is a pretty clean method to suspend logging in tests: the unittest.mock.patch method.

foo.py:

import logging


logger = logging.getLogger(__name__)

def bar():
    logger.error('There is some error output here!')
    return True

tests.py:

from unittest import mock, TestCase
from foo import bar


class FooBarTestCase(TestCase):
    @mock.patch('foo.logger', mock.Mock())
    def test_bar(self):
        self.assertTrue(bar())

And python3 -m unittest tests will produce no logging output.


Answer 7


Sometimes you want the logs and sometimes not. I have this code in my settings.py

import sys
import logging

if '--no-logs' in sys.argv:
    print('> Disabling logging levels of CRITICAL and below.')
    sys.argv.remove('--no-logs')
    logging.disable(logging.CRITICAL)

So if you run your test with the --no-logs option, you’ll get only the critical logs:

$ python ./manage.py tests --no-logs
> Disabling logging levels of CRITICAL and below.

It’s very helpful if you want to speed up the tests in your continuous integration flow.


Answer 8


If you don’t want to repeatedly turn it on and off in setUp() and tearDown() for unittest (I don’t see the reason for that), you could just do it once per class:

import unittest
import logging

class TestMyUnitTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        logging.disable(logging.CRITICAL)

    @classmethod
    def tearDownClass(cls):
        logging.disable(logging.NOTSET)

Answer 9


In cases where I wish to temporarily suppress a specific logger, I’ve written a little context manager that I’ve found useful:

from contextlib import contextmanager
import logging

@contextmanager
def disable_logger(name):
    """Temporarily disable a specific logger."""
    logger = logging.getLogger(name)
    old_value = logger.disabled
    logger.disabled = True
    try:
        yield
    finally:
        logger.disabled = old_value

You then use it like:

class MyTestCase(TestCase):
    def test_something(self):
        with disable_logger('<logger name>'):
            # code that causes the logger to fire

This has the advantage that the logger is re-enabled (or set back to its prior state) once the with completes.


Answer 10


You can put this in your unit tests’ top-level __init__.py file. This will disable logging globally in the unit test suite.

# tests/unit/__init__.py
import logging

logging.disable(logging.CRITICAL)

Answer 11


In my case I have a settings file settings/test.py created specifically for testing purposes; here’s what it looks like:

from .base import *

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': 'test_db'
    }
}

PASSWORD_HASHERS = (
    'django.contrib.auth.hashers.MD5PasswordHasher',
)

LOGGING = {}

I put the environment variable DJANGO_SETTINGS_MODULE=settings.test in /etc/environment.


Answer 12


If you have different initialiser modules for test, dev and production, then you can disable anything or redirect it in the initialiser. I have local.py, test.py and production.py that all inherit from common.py.

common.py does all the main config, including this snippet:

LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
    'django.server': {
        '()': 'django.utils.log.ServerFormatter',
        'format': '[%(server_time)s] %(message)s',
    },
    'verbose': {
        'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
    },
    'simple': {
        'format': '%(levelname)s %(message)s'
    },
},
'filters': {
    'require_debug_true': {
        '()': 'django.utils.log.RequireDebugTrue',
    },
},
'handlers': {
    'django.server': {
        'level': 'INFO',
        'class': 'logging.StreamHandler',
        'formatter': 'django.server',
    },
    'console': {
        'level': 'DEBUG',
        'class': 'logging.StreamHandler',
        'formatter': 'simple'
    },
    'mail_admins': {
        'level': 'ERROR',
        'class': 'django.utils.log.AdminEmailHandler'
    }
},
'loggers': {
    'django': {
        'handlers': ['console'],
        'level': 'INFO',
        'propagate': True,
    },
    'celery.tasks': {
        'handlers': ['console'],
        'level': 'DEBUG',
        'propagate': True,
    },
    'django.server': {
        'handlers': ['django.server'],
        'level': 'INFO',
        'propagate': False,
    },
},
}

Then in test.py I have this:

console_logger = Common.LOGGING.get('handlers').get('console')
console_logger['class'] = 'logging.FileHandler'
console_logger['filename'] = './unitest.log'

This replaces the console handler with a FileHandler, which means I still get logging but I don't have to touch the production code base.
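
For completeness, here is one way the test.py override above could be wired up; the import lines are an assumption about the project layout, since the answer doesn't show how Common is brought into scope:

from .common import *           # inherit the full configuration from common.py
from . import common as Common  # assumption: expose the module under the name used below

console_logger = Common.LOGGING.get('handlers').get('console')
console_logger['class'] = 'logging.FileHandler'
console_logger['filename'] = './unitest.log'

Since the star-import binds the very same LOGGING dict object, mutating it through Common is enough; Django picks up the modified dict when it configures logging.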


回答 13

如果您使用的是pytest

由于pytest捕获日志消息并仅在失败的测试中显示它们,因此您通常不希望禁用任何日志记录。相反,请使用单独的settings.py文件进行测试(例如test_settings.py),然后添加到其中:

LOGGING_CONFIG = None

这告诉Django完全跳过日志记录的配置。LOGGING设置将被忽略,可以从设置中删除。

使用这种方法,对于通过的测试,您将不会获得任何日志记录,对于失败的测试,您将获得所有可用的日志记录。

测试将使用由pytest设置的日志记录来运行。您可以在pytest的配置(例如tox.ini)中按自己的喜好配置它。要包括调试级别的日志消息,请使用log_level = DEBUG(或相应的命令行参数)。

If you’re using pytest:

Since pytest captures log messages and only displays them for failed tests, you typically don’t want to disable any logging. Instead, use a separate settings.py file for tests (e.g., test_settings.py), and add to it:

LOGGING_CONFIG = None

This tells Django to skip configuring the logging altogether. The LOGGING setting will be ignored and can be removed from the settings.

With this approach, you don’t get any logging for passed tests, and you get all available logging for failed tests.

The tests will run using the logging that was set up by pytest. It can be configured to your liking in the pytest settings (e.g., tox.ini). To include debug level log messages, use log_level = DEBUG (or the corresponding command line argument).
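
A minimal sketch of such a test settings file; the import line is an assumption about where your base settings live:

# test_settings.py
from .settings import *

LOGGING_CONFIG = None  # Django skips configuring logging entirely

and, if you want debug-level records captured by pytest, something like this in tox.ini:

[pytest]
log_level = DEBUG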


Python日志记录:使用毫秒格式的时间

问题:Python日志记录:使用毫秒格式的时间

默认情况下logging.Formatter('%(asctime)s'),使用以下格式打印:

2011-06-09 10:54:40,638

其中638是毫秒。我需要将逗号更改为点:

2011-06-09 10:54:40.638

要格式化时间,我可以使用:

logging.Formatter(fmt='%(asctime)s',datefmt=date_format_str)

但是,文档并没有说明如何设置毫秒的格式。我找到了这个SO问题,其中讨论的是微秒,但是a)我更想要毫秒,b)由于%f的缘故,下面的写法在Python 2.6(我的工作环境)上不起作用:

logging.Formatter(fmt='%(asctime)s',datefmt='%Y-%m-%d,%H:%M:%S.%f')

By default logging.Formatter('%(asctime)s') prints with the following format:

2011-06-09 10:54:40,638

where 638 is the millisecond. I need to change the comma to a dot:

2011-06-09 10:54:40.638

To format the time I can use:

logging.Formatter(fmt='%(asctime)s',datefmt=date_format_str)

however the documentation doesn’t specify how to format milliseconds. I’ve found this SO question which talks about microseconds, but a) I would prefer milliseconds and b) the following doesn’t work on Python 2.6 (which I’m working on) due to the %f:

logging.Formatter(fmt='%(asctime)s',datefmt='%Y-%m-%d,%H:%M:%S.%f')

回答 0

请注意,克雷格·麦克丹尼尔(Craig McDaniel)的解决方案显然更好。


logging.Formatter的formatTime方法如下所示:

def formatTime(self, record, datefmt=None):
    ct = self.converter(record.created)
    if datefmt:
        s = time.strftime(datefmt, ct)
    else:
        t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
        s = "%s,%03d" % (t, record.msecs)
    return s

请注意"%s,%03d"中的逗号。这无法通过指定datefmt来解决,因为ct是一个time.struct_time,而这类对象不记录毫秒。

如果我们更改ct的定义,使其成为datetime对象而不是struct_time,那么(至少在现代版本的Python中)我们就可以调用ct.strftime,从而可以用%f来格式化微秒:

import logging
import datetime as dt

class MyFormatter(logging.Formatter):
    converter=dt.datetime.fromtimestamp
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            s = ct.strftime(datefmt)
        else:
            t = ct.strftime("%Y-%m-%d %H:%M:%S")
            s = "%s,%03d" % (t, record.msecs)
        return s

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

console = logging.StreamHandler()
logger.addHandler(console)

formatter = MyFormatter(fmt='%(asctime)s %(message)s',datefmt='%Y-%m-%d,%H:%M:%S.%f')
console.setFormatter(formatter)

logger.debug('Jackdaws love my big sphinx of quartz.')
# 2011-06-09,07:12:36.553554 Jackdaws love my big sphinx of quartz.

或者,要获取毫秒数,请将逗号更改为小数点,然后省略datefmt参数:

class MyFormatter(logging.Formatter):
    converter=dt.datetime.fromtimestamp
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            s = ct.strftime(datefmt)
        else:
            t = ct.strftime("%Y-%m-%d %H:%M:%S")
            s = "%s.%03d" % (t, record.msecs)
        return s

...
formatter = MyFormatter(fmt='%(asctime)s %(message)s')
...
logger.debug('Jackdaws love my big sphinx of quartz.')
# 2011-06-09 08:14:38.343 Jackdaws love my big sphinx of quartz.

Please note Craig McDaniel’s solution is clearly better.


logging.Formatter’s formatTime method looks like this:

def formatTime(self, record, datefmt=None):
    ct = self.converter(record.created)
    if datefmt:
        s = time.strftime(datefmt, ct)
    else:
        t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
        s = "%s,%03d" % (t, record.msecs)
    return s

Notice the comma in "%s,%03d". This can not be fixed by specifying a datefmt because ct is a time.struct_time and these objects do not record milliseconds.

If we change the definition of ct to make it a datetime object instead of a struct_time, then (at least with modern versions of Python) we can call ct.strftime and then we can use %f to format microseconds:

import logging
import datetime as dt

class MyFormatter(logging.Formatter):
    converter=dt.datetime.fromtimestamp
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            s = ct.strftime(datefmt)
        else:
            t = ct.strftime("%Y-%m-%d %H:%M:%S")
            s = "%s,%03d" % (t, record.msecs)
        return s

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

console = logging.StreamHandler()
logger.addHandler(console)

formatter = MyFormatter(fmt='%(asctime)s %(message)s',datefmt='%Y-%m-%d,%H:%M:%S.%f')
console.setFormatter(formatter)

logger.debug('Jackdaws love my big sphinx of quartz.')
# 2011-06-09,07:12:36.553554 Jackdaws love my big sphinx of quartz.

Or, to get milliseconds, change the comma to a decimal point, and omit the datefmt argument:

class MyFormatter(logging.Formatter):
    converter=dt.datetime.fromtimestamp
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            s = ct.strftime(datefmt)
        else:
            t = ct.strftime("%Y-%m-%d %H:%M:%S")
            s = "%s.%03d" % (t, record.msecs)
        return s

...
formatter = MyFormatter(fmt='%(asctime)s %(message)s')
...
logger.debug('Jackdaws love my big sphinx of quartz.')
# 2011-06-09 08:14:38.343 Jackdaws love my big sphinx of quartz.

回答 1

这也应该工作:

logging.Formatter(fmt='%(asctime)s.%(msecs)03d',datefmt='%Y-%m-%d,%H:%M:%S')

This should work too:

logging.Formatter(fmt='%(asctime)s.%(msecs)03d',datefmt='%Y-%m-%d,%H:%M:%S')
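
As a self-contained check of the %(msecs)03d approach, a minimal script (names here are arbitrary):

import logging

handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    fmt='%(asctime)s.%(msecs)03d %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'))

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

logger.debug('Jackdaws love my big sphinx of quartz.')
# e.g. 2011-06-09 10:54:40.638 Jackdaws love my big sphinx of quartz.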

回答 2

添加msecs是更好的选择,谢谢。下面是我在Blender中结合Python 3.5.3使用它的改写:

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s.%(msecs)03d %(levelname)s:\t%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger(__name__)
log.info("Logging Info")
log.debug("Logging Debug")

Adding msecs was the better option, Thanks. Here is my amendment using this with Python 3.5.3 in Blender

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s.%(msecs)03d %(levelname)s:\t%(message)s', datefmt='%Y-%m-%d %H:%M:%S')
log = logging.getLogger(__name__)
log.info("Logging Info")
log.debug("Logging Debug")

回答 3

我发现的最简单的方法是覆盖default_msec_format:

formatter = logging.Formatter('%(asctime)s')
formatter.default_msec_format = '%s.%03d'

The simplest way I found was to override default_msec_format:

formatter = logging.Formatter('%(asctime)s')
formatter.default_msec_format = '%s.%03d'
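
Hooking that formatter up is ordinary boilerplate; note that default_msec_format is a Python 3 attribute (Python 2.7's Formatter hardcodes the comma), so treat this as a Python 3 sketch:

import logging

formatter = logging.Formatter('%(asctime)s %(message)s')
formatter.default_msec_format = '%s.%03d'  # shadows the class default '%s,%03d'

handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info('hello')
# e.g. 2011-06-09 10:54:40.638 hello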

回答 4

实例化Formatter之后,我通常会设置formatter.converter = gmtime。因此,在这种情况下,为了让@unutbu的答案起作用,您需要:

class MyFormatter(logging.Formatter):
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            s = time.strftime(datefmt, ct)
        else:
            t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
            s = "%s.%03d" % (t, record.msecs)
        return s

After instantiating a Formatter I usually set formatter.converter = gmtime. So in order for @unutbu’s answer to work in this case you’ll need:

class MyFormatter(logging.Formatter):
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            s = time.strftime(datefmt, ct)
        else:
            t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
            s = "%s.%03d" % (t, record.msecs)
        return s
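
For reference, a runnable version of the above with the imports it assumes, wiring in gmtime after instantiation as described:

import logging
import time

class MyFormatter(logging.Formatter):
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            s = time.strftime(datefmt, ct)
        else:
            t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
            s = "%s.%03d" % (t, record.msecs)
        return s

formatter = MyFormatter('%(asctime)s %(message)s')
formatter.converter = time.gmtime  # UTC timestamps, as the answer suggests

handler = logging.StreamHandler()
handler.setFormatter(formatter)

logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.debug('hello')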

回答 5

一个不需要datetime模块且不受其他解决方案限制的简单扩展就是使用简单的字符串替换,如下所示:

import logging
import time

class MyFormatter(logging.Formatter):
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            if "%F" in datefmt:
                msec = "%03d" % record.msecs
                datefmt = datefmt.replace("%F", msec)
            s = time.strftime(datefmt, ct)
        else:
            t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
            s = "%s,%03d" % (t, record.msecs)
        return s

这样,只要用%F表示毫秒,就可以随心所欲地编写所需的日期格式,甚至可以照顾到地区差异。例如:

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

sh = logging.StreamHandler()
log.addHandler(sh)

fm = MyFormatter(fmt='%(asctime)s-%(levelname)s-%(message)s',datefmt='%H:%M:%S.%F')
sh.setFormatter(fm)

log.info("Foo, Bar, Baz")
# 03:26:33.757-INFO-Foo, Bar, Baz

A simple expansion that doesn’t require the datetime module and isn’t handicapped like some other solutions is to use simple string replacement like so:

import logging
import time

class MyFormatter(logging.Formatter):
    def formatTime(self, record, datefmt=None):
        ct = self.converter(record.created)
        if datefmt:
            if "%F" in datefmt:
                msec = "%03d" % record.msecs
                datefmt = datefmt.replace("%F", msec)
            s = time.strftime(datefmt, ct)
        else:
            t = time.strftime("%Y-%m-%d %H:%M:%S", ct)
            s = "%s,%03d" % (t, record.msecs)
        return s

This way a date format can be written however you want, even allowing for region differences, by using %F for milliseconds. For example:

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

sh = logging.StreamHandler()
log.addHandler(sh)

fm = MyFormatter(fmt='%(asctime)s-%(levelname)s-%(message)s',datefmt='%H:%M:%S.%F')
sh.setFormatter(fm)

log.info("Foo, Bar, Baz")
# 03:26:33.757-INFO-Foo, Bar, Baz

回答 6

如果您在使用arrow,或者不介意使用arrow,您可以把python的时间格式替换为arrow的时间格式。

import logging

from arrow.arrow import Arrow


class ArrowTimeFormatter(logging.Formatter):

    def formatTime(self, record, datefmt=None):
        arrow_time = Arrow.fromtimestamp(record.created)

        if datefmt:
            arrow_time = arrow_time.format(datefmt)

        return str(arrow_time)


logger = logging.getLogger(__name__)

default_handler = logging.StreamHandler()
default_handler.setFormatter(ArrowTimeFormatter(
    fmt='%(asctime)s',
    datefmt='YYYY-MM-DD HH:mm:ss.SSS'
))

logger.setLevel(logging.DEBUG)
logger.addHandler(default_handler)

现在,您可以在datefmt属性中使用arrow的所有时间格式。

If you are using arrow or if you don’t mind using arrow. You can substitute python’s time formatting for arrow’s one.

import logging

from arrow.arrow import Arrow


class ArrowTimeFormatter(logging.Formatter):

    def formatTime(self, record, datefmt=None):
        arrow_time = Arrow.fromtimestamp(record.created)

        if datefmt:
            arrow_time = arrow_time.format(datefmt)

        return str(arrow_time)


logger = logging.getLogger(__name__)

default_handler = logging.StreamHandler()
default_handler.setFormatter(ArrowTimeFormatter(
    fmt='%(asctime)s',
    datefmt='YYYY-MM-DD HH:mm:ss.SSS'
))

logger.setLevel(logging.DEBUG)
logger.addHandler(default_handler)

Now you can use all of arrow’s time formatting in datefmt attribute.


回答 7

tl;dr,供在此寻找ISO格式日期的人参考:

不要使用类似’%Y-%m-%d %H:%M:%S.%03d%z’的写法,而应按照@unutbu的说明创建自己的类。下面是一个ISO日期格式的例子:

tl;dr for folks looking here for an ISO formatted date:

instead of using something like ‘%Y-%m-%d %H:%M:%S.%03d%z’, create your own class as @unutbu indicated. Here’s one for iso date format:

import logging
from time import gmtime, strftime

class ISOFormatter(logging.Formatter):
    def formatTime(self, record, datefmt=None):
        t = strftime("%Y-%m-%dT%H:%M:%S", gmtime(record.created))
        z = strftime("%z",gmtime(record.created))
        s = "%s.%03d%s" % (t, record.msecs,z)        
        return s

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

console = logging.StreamHandler()
logger.addHandler(console)

formatter = ISOFormatter(fmt='%(asctime)s - %(module)s - %(levelname)s - %(message)s')
console.setFormatter(formatter)

logger.debug('Jackdaws love my big sphinx of quartz.')
#2020-10-23T17:25:48.310-0800 - <stdin> - DEBUG - Jackdaws love my big sphinx of quartz.


回答 8

到目前为止,以下与python 3完美兼容。

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    datefmt='%Y/%m/%d %H:%M:%S.%03d',
                    filename=self.log_filepath,
                    filemode='w')

提供以下输出

2020/01/11 18:51:19.011 INFO

As of now the following works perfectly with python 3 .

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    datefmt='%Y/%m/%d %H:%M:%S.%03d',
                    filename=self.log_filepath,
                    filemode='w')

gives the following output

2020/01/11 18:51:19.011 INFO


带回溯的日志异常

问题:带回溯的日志异常

如何记录我的Python错误?

try:
    do_something()
except:
    # How can I log my exception here, complete with its traceback?

How can I log my Python errors?

try:
    do_something()
except:
    # How can I log my exception here, complete with its traceback?

回答 0

在except:处理块内部使用logging.exception,可以把当前异常连同回溯信息一起记录下来,并在前面加上一条消息。

import logging
LOG_FILENAME = '/tmp/logging_example.out'
logging.basicConfig(filename=LOG_FILENAME, level=logging.DEBUG)

logging.debug('This message should go to the log file')

try:
    run_my_stuff()
except:
    logging.exception('Got exception on main handler')
    raise

现在查看日志文件/tmp/logging_example.out

DEBUG:root:This message should go to the log file
ERROR:root:Got exception on main handler
Traceback (most recent call last):
  File "/tmp/teste.py", line 9, in <module>
    run_my_stuff()
NameError: name 'run_my_stuff' is not defined

Use logging.exception from within the except: handler/block to log the current exception along with the trace information, prepended with a message.

import logging
LOG_FILENAME = '/tmp/logging_example.out'
logging.basicConfig(filename=LOG_FILENAME, level=logging.DEBUG)

logging.debug('This message should go to the log file')

try:
    run_my_stuff()
except:
    logging.exception('Got exception on main handler')
    raise

Now looking at the log file, /tmp/logging_example.out:

DEBUG:root:This message should go to the log file
ERROR:root:Got exception on main handler
Traceback (most recent call last):
  File "/tmp/teste.py", line 9, in <module>
    run_my_stuff()
NameError: name 'run_my_stuff' is not defined

回答 1

使用exc_info选项可能更好,它可以保留警告或错误的级别标题:

try:
    # code in here
except Exception as e:
    logging.error(e, exc_info=True)

Using the exc_info option may be better, as it lets you keep the warning or error level of the log entry:

try:
    # code in here
except Exception as e:
    logging.error(e, exc_info=True)
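
A quick sketch of the point being made: exc_info=True attaches the current traceback at whatever level you pick, not only ERROR:

import logging

logging.basicConfig(level=logging.DEBUG)

try:
    1 / 0
except Exception as e:
    # the record keeps its WARNING level, but the traceback is appended
    logging.warning('Recoverable problem: %s', e, exc_info=True)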

回答 2

最近,我的工作要求我记录应用程序中的所有回溯/异常。我尝试了其他人在网上发布的许多技术,例如上面那种,但最终选择了另一种方法:覆盖traceback.print_exception。

我在http://www.bbarrows.com/上写了一篇文章,读起来会容易得多,但我也会把它粘贴在这里。

当任务是记录我们的软件在实际环境中可能遇到的所有异常时,我尝试了许多不同的技术来记录Python异常回溯。起初,我以为Python的系统异常钩子sys.excepthook会是插入日志记录代码的理想位置。我当时在尝试类似这样的东西:

import traceback
import StringIO
import logging
import os, sys

def my_excepthook(excType, excValue, traceback, logger=logger):
    logger.error("Logging an uncaught exception",
                 exc_info=(excType, excValue, traceback))

sys.excepthook = my_excepthook  

这适用于主线程,但我很快发现,我的sys.excepthook在进程启动的任何新线程中都不起作用。这是个大问题,因为在这个项目中几乎所有事情都发生在线程里。

在搜索并阅读大量文档之后,我发现最有用的信息来自Python问题跟踪器。

该线程的第一篇帖子展示了一个sys.excepthook不跨线程保持的可运行示例(如下所示)。显然,这是预期的行为。

import sys, threading

def log_exception(*args):
    print 'got exception %s' % (args,)
sys.excepthook = log_exception

def foo():
    a = 1 / 0

threading.Thread(target=foo).start()

该Python Issue线程上的讨论实际上给出了两种建议的hack:要么继承Thread,把run方法包装在我们自己的try/except块中,以捕获并记录异常;要么monkey patch threading.Thread.run,让它在你自己的try/except块中运行并记录异常。

在我看来,第一种继承Thread的方法在代码里显得不太优雅,因为您必须在每个想要日志记录线程的地方导入并使用自定义的Thread类。这最终很麻烦,因为我不得不搜索整个代码库,用这个自定义Thread替换所有普通的Thread。不过,这个Thread在做什么一目了然,如果自定义日志代码出了问题,别人诊断和调试起来也会更容易。自定义的日志记录线程可能像这样:

class TracebackLoggingThread(threading.Thread):
    def run(self):
        try:
            super(TracebackLoggingThread, self).run()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception, e:
            logger = logging.getLogger('')
            logger.exception("Logging an uncaught exception")

第二种方法,即monkey patch threading.Thread.run,很好,因为我只需在__main__之后立即运行一次,就能在所有异常中插入日志记录代码。不过monkey patch调试起来可能很烦人,因为它改变了某些东西的预期行为。Python问题跟踪器上建议的补丁是:

def installThreadExcepthook():
    """
    Workaround for sys.excepthook thread bug
    From
http://spyced.blogspot.com/2007/06/workaround-for-sysexcepthook-bug.html

(https://sourceforge.net/tracker/?func=detail&atid=105470&aid=1230540&group_id=5470).
    Call once from __main__ before creating any threads.
    If using psyco, call psyco.cannotcompile(threading.Thread.run)
    since this replaces a new-style class method.
    """
    init_old = threading.Thread.__init__
    def init(self, *args, **kwargs):
        init_old(self, *args, **kwargs)
        run_old = self.run
        def run_with_except_hook(*args, **kw):
            try:
                run_old(*args, **kw)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                sys.excepthook(*sys.exc_info())
        self.run = run_with_except_hook
    threading.Thread.__init__ = init

直到我开始测试异常日志记录时,我才意识到自己的做法完全错了。

为了测试,我放置了一个

raise Exception("Test")

在我的代码中的某处。但是,调用该方法的另一个方法被包在一个try/except块里,这个块打印出回溯并吞掉了异常。这非常令人沮丧,因为我看到回溯被打印到了STDOUT,却没有被记录下来。于是我决定,记录回溯的一个简单得多的办法,就是monkey patch所有Python代码打印回溯时都会用到的那个函数:traceback.print_exception。我最终得到了类似下面的东西:

def add_custom_print_exception():
    old_print_exception = traceback.print_exception
    def custom_print_exception(etype, value, tb, limit=None, file=None):
        tb_output = StringIO.StringIO()
        traceback.print_tb(tb, limit, tb_output)
        logger = logging.getLogger('customLogger')
        logger.error(tb_output.getvalue())
        tb_output.close()
        old_print_exception(etype, value, tb, limit, file)
    traceback.print_exception = custom_print_exception

这段代码把回溯写到一个字符串缓冲区,并以ERROR级别记录下来。我设置了一个自定义日志处理程序来处理'customLogger'记录器,它会获取ERROR级别的日志并将其发送回来进行分析。

My job recently tasked me with logging all the tracebacks/exceptions from our application. I tried numerous techniques that others had posted online such as the one above but settled on a different approach. Overriding traceback.print_exception.

I have a write up at http://www.bbarrows.com/ That would be much easier to read but Ill paste it in here as well.

When tasked with logging all the exceptions that our software might encounter in the wild I tried a number of different techniques to log our python exception tracebacks. At first I thought that the python system exception hook, sys.excepthook would be the perfect place to insert the logging code. I was trying something similar to:

import traceback
import StringIO
import logging
import os, sys

def my_excepthook(excType, excValue, traceback, logger=logger):
    logger.error("Logging an uncaught exception",
                 exc_info=(excType, excValue, traceback))

sys.excepthook = my_excepthook  

This worked for the main thread but I soon found that my sys.excepthook would not exist across any new threads my process started. This is a huge issue because most everything happens in threads in this project.

After googling and reading plenty of documentation the most helpful information I found was from the Python Issue tracker.

The first post on the thread shows a working example of the sys.excepthook NOT persisting across threads (as shown below). Apparently this is expected behavior.

import sys, threading

def log_exception(*args):
    print 'got exception %s' % (args,)
sys.excepthook = log_exception

def foo():
    a = 1 / 0

threading.Thread(target=foo).start()

The messages on this Python Issue thread really result in 2 suggested hacks. Either subclass Thread and wrap the run method in our own try except block in order to catch and log exceptions or monkey patch threading.Thread.run to run in your own try except block and log the exceptions.

The first method of subclassing Thread seems to me to be less elegant in your code as you would have to import and use your custom Thread class EVERYWHERE you wanted to have a logging thread. This ended up being a hassle because I had to search our entire code base and replace all normal Threads with this custom Thread. However, it was clear as to what this Thread was doing and would be easier for someone to diagnose and debug if something went wrong with the custom logging code. A custom logging thread might look like this:

class TracebackLoggingThread(threading.Thread):
    def run(self):
        try:
            super(TracebackLoggingThread, self).run()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception, e:
            logger = logging.getLogger('')
            logger.exception("Logging an uncaught exception")

The second method of monkey patching threading.Thread.run is nice because I could just run it once right after __main__ and instrument my logging code in all exceptions. Monkey patching can be annoying to debug though as it changes the expected functionality of something. The suggested patch from the Python Issue tracker was:

def installThreadExcepthook():
    """
    Workaround for sys.excepthook thread bug
    From
http://spyced.blogspot.com/2007/06/workaround-for-sysexcepthook-bug.html

(https://sourceforge.net/tracker/?func=detail&atid=105470&aid=1230540&group_id=5470).
    Call once from __main__ before creating any threads.
    If using psyco, call psyco.cannotcompile(threading.Thread.run)
    since this replaces a new-style class method.
    """
    init_old = threading.Thread.__init__
    def init(self, *args, **kwargs):
        init_old(self, *args, **kwargs)
        run_old = self.run
        def run_with_except_hook(*args, **kw):
            try:
                run_old(*args, **kw)
            except (KeyboardInterrupt, SystemExit):
                raise
            except:
                sys.excepthook(*sys.exc_info())
        self.run = run_with_except_hook
    threading.Thread.__init__ = init

It was not until I started testing my exception logging that I realized I was going about it all wrong.

To test I had placed a

raise Exception("Test")

somewhere in my code. However, a method that called this method was wrapped in a try/except block that printed out the traceback and swallowed the exception. This was very frustrating because I saw the traceback being printed to STDOUT but not being logged. I then decided that a much easier method of logging the tracebacks was just to monkey patch the method that all python code uses to print the tracebacks themselves, traceback.print_exception. I ended up with something similar to the following:

def add_custom_print_exception():
    old_print_exception = traceback.print_exception
    def custom_print_exception(etype, value, tb, limit=None, file=None):
        tb_output = StringIO.StringIO()
        traceback.print_tb(tb, limit, tb_output)
        logger = logging.getLogger('customLogger')
        logger.error(tb_output.getvalue())
        tb_output.close()
        old_print_exception(etype, value, tb, limit, file)
    traceback.print_exception = custom_print_exception

This code writes the traceback to a String Buffer and logs it to logging ERROR. I have a custom logging handler set up the ‘customLogger’ logger which takes the ERROR level logs and send them home for analysis.


回答 3

通过把一个处理函数赋值给sys.excepthook,您可以记录主线程上所有未捕获的异常,例如利用Python日志函数的exc_info参数:

import sys
import logging

logging.basicConfig(filename='/tmp/foobar.log')

def exception_hook(exc_type, exc_value, exc_traceback):
    logging.error(
        "Uncaught exception",
        exc_info=(exc_type, exc_value, exc_traceback)
    )

sys.excepthook = exception_hook

raise Exception('Boom')

但是,如果你的程序使用了线程,那么请注意:正如Python问题跟踪器上的Issue 1230540所指出的,用threading.Thread创建的线程内部发生未捕获的异常时,并不会触发sys.excepthook。那里已经有人提出了一些绕过此限制的hack,例如monkey patch Thread.__init__,用另一个run方法覆盖self.run,这个方法把原来的run包在一个try块里,并在except块内部调用sys.excepthook。或者,您也可以自己手动把每个线程的入口点包在try/except里。

You can log all uncaught exceptions on the main thread by assigning a handler to sys.excepthook, perhaps using the exc_info parameter of Python’s logging functions:

import sys
import logging

logging.basicConfig(filename='/tmp/foobar.log')

def exception_hook(exc_type, exc_value, exc_traceback):
    logging.error(
        "Uncaught exception",
        exc_info=(exc_type, exc_value, exc_traceback)
    )

sys.excepthook = exception_hook

raise Exception('Boom')

If your program uses threads, however, then note that threads created using threading.Thread will not trigger sys.excepthook when an uncaught exception occurs inside them, as noted in Issue 1230540 on Python’s issue tracker. Some hacks have been suggested there to work around this limitation, like monkey-patching Thread.__init__ to overwrite self.run with an alternative run method that wraps the original in a try block and calls sys.excepthook from inside the except block. Alternatively, you could just manually wrap the entry point for each of your threads in try/except yourself.
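
A rough sketch of that last, manual approach; the worker function and the wrapper are illustrative names, not part of the answer above:

import logging
import threading

logging.basicConfig(filename='/tmp/foobar.log')

def logged(target):
    def wrapper(*args, **kwargs):
        try:
            target(*args, **kwargs)
        except Exception:
            # record the traceback, then let the thread die as it would have
            logging.exception('Uncaught exception in thread %s',
                              threading.current_thread().name)
            raise
    return wrapper

def worker():  # hypothetical thread entry point
    raise Exception('Boom')

threading.Thread(target=logged(worker)).start()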


回答 4

未捕获的异常消息会进入STDERR,因此,您可以不在Python本身中实现日志记录,而是用运行Python脚本的shell把STDERR重定向到一个文件。在Bash脚本中,可以用输出重定向来做到这一点,如BASH指南中所述。

例子

将错误附加到文件,其他输出到终端:

./test.py 2>> mylog.log

用交错的STDOUT和STDERR输出覆盖文件:

./test.py &> mylog.log

Uncaught exception messages go to STDERR, so instead of implementing your logging in Python itself you could send STDERR to a file using whatever shell you’re using to run your Python script. In a Bash script, you can do this with output redirection, as described in the BASH guide.

Examples

Append errors to file, other output to the terminal:

./test.py 2>> mylog.log

Overwrite file with interleaved STDOUT and STDERR output:

./test.py &> mylog.log

回答 5

这正是我在找的:

import sys
import traceback

exc_type, exc_value, exc_traceback = sys.exc_info()
traceback_in_var = traceback.format_tb(exc_traceback)

参见:

What I was looking for:

import sys
import traceback

exc_type, exc_value, exc_traceback = sys.exc_info()
traceback_in_var = traceback.format_tb(exc_traceback)

See:


回答 6

您可以在任何级别(DEBUG、INFO等)使用记录器获取回溯。请注意,使用logging.exception时,级别为ERROR。

# test_app.py
import sys
import logging

logging.basicConfig(level="DEBUG")

def do_something():
    raise ValueError(":(")

try:
    do_something()
except Exception:
    logging.debug("Something went wrong", exc_info=sys.exc_info())
DEBUG:root:Something went wrong
Traceback (most recent call last):
  File "test_app.py", line 10, in <module>
    do_something()
  File "test_app.py", line 7, in do_something
    raise ValueError(":(")
ValueError: :(

编辑:

这也可以工作(使用python 3.6)

logging.debug("Something went wrong", exc_info=True)

You can get the traceback using a logger, at any level (DEBUG, INFO, …). Note that using logging.exception, the level is ERROR.

# test_app.py
import sys
import logging

logging.basicConfig(level="DEBUG")

def do_something():
    raise ValueError(":(")

try:
    do_something()
except Exception:
    logging.debug("Something went wrong", exc_info=sys.exc_info())
DEBUG:root:Something went wrong
Traceback (most recent call last):
  File "test_app.py", line 10, in <module>
    do_something()
  File "test_app.py", line 7, in do_something
    raise ValueError(":(")
ValueError: :(

EDIT:

This works too (using python 3.6)

logging.debug("Something went wrong", exc_info=True)

回答 7

这是一个使用sys.excepthook的版本:

import logging
import traceback
import sys

logger = logging.getLogger()

def handle_excepthook(type, message, stack):
     logger.error(f'An unhandled exception occurred: {message}. Traceback: {traceback.format_tb(stack)}')

sys.excepthook = handle_excepthook

Here is a version that uses sys.excepthook

import logging
import traceback
import sys

logger = logging.getLogger()

def handle_excepthook(type, message, stack):
     logger.error(f'An unhandled exception occurred: {message}. Traceback: {traceback.format_tb(stack)}')

sys.excepthook = handle_excepthook

回答 8

也许不那么优雅,但更简单:

#!/bin/bash
log="/var/log/yourlog"
/path/to/your/script.py 2>&1 | (while read; do echo "$REPLY" >> $log; done)

maybe not as stylish, but easier:

#!/bin/bash
log="/var/log/yourlog"
/path/to/your/script.py 2>&1 | (while read; do echo "$REPLY" >> $log; done)

回答 9

这是取自python 2.6文档的一个简单示例:

import logging
LOG_FILENAME = '/tmp/logging_example.out'
logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG,)

logging.debug('This message should go to the log file')

Here's a simple example taken from the python 2.6 documentation:

import logging
LOG_FILENAME = '/tmp/logging_example.out'
logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG,)

logging.debug('This message should go to the log file')

如何自定义Python日志记录的时间格式?

问题:如何自定义Python日志记录的时间格式?

我是Python日志记录包的新手,并计划将其用于我的项目。我想根据自己的喜好定制时间格式。这是我从教程中复制的简短代码:

import logging

# create logger
logger = logging.getLogger("logging_tryout2")
logger.setLevel(logging.DEBUG)

# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# create formatter
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")

# add formatter to ch
ch.setFormatter(formatter)

# add ch to logger
logger.addHandler(ch)

# "application" code
logger.debug("debug message")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")
logger.critical("critical message")

这是输出:

2010-07-10 10:46:28,811;DEBUG;debug message
2010-07-10 10:46:28,812;INFO;info message
2010-07-10 10:46:28,812;WARNING;warn message
2010-07-10 10:46:28,812;ERROR;error message
2010-07-10 10:46:28,813;CRITICAL;critical message

我想将时间格式缩短为:“ 2010-07-10 10:46:28”,删除毫秒后缀。我看着Formatter.formatTime,但是很困惑。感谢您为实现我的目标所提供的帮助。谢谢。

I am new to Python’s logging package and plan to use it for my project. I would like to customize the time format to my taste. Here is a short code I copied from a tutorial:

import logging

# create logger
logger = logging.getLogger("logging_tryout2")
logger.setLevel(logging.DEBUG)

# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# create formatter
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")

# add formatter to ch
ch.setFormatter(formatter)

# add ch to logger
logger.addHandler(ch)

# "application" code
logger.debug("debug message")
logger.info("info message")
logger.warn("warn message")
logger.error("error message")
logger.critical("critical message")

And here is the output:

2010-07-10 10:46:28,811;DEBUG;debug message
2010-07-10 10:46:28,812;INFO;info message
2010-07-10 10:46:28,812;WARNING;warn message
2010-07-10 10:46:28,812;ERROR;error message
2010-07-10 10:46:28,813;CRITICAL;critical message

I would like to shorten the time format to just: ‘2010-07-10 10:46:28‘, dropping the mili-second suffix. I looked at the Formatter.formatTime, but confused. I appreciate your help to achieve my goal. Thank you.


回答 0

从有关Formatter类的官方文档中

构造函数采用两个可选参数:消息格式字符串和日期格式字符串。

所以将

# create formatter
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")

改为

# create formatter
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s",
                              "%Y-%m-%d %H:%M:%S")

From the official documentation regarding the Formatter class:

The constructor takes two optional arguments: a message format string and a date format string.

So change

# create formatter
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")

to

# create formatter
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s",
                              "%Y-%m-%d %H:%M:%S")

回答 1

使用logging.basicConfig,以下示例对我有用:

logging.basicConfig(
    filename='HISTORYlistener.log',
    level=logging.DEBUG,
    format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

这样一来,您就可以在一行中完成所有的格式化和配置。生成的日志记录如下所示:

2014-05-26 12:22:52.376 CRITICAL historylistener - main: History log failed to start

Using logging.basicConfig, the following example works for me:

logging.basicConfig(
    filename='HISTORYlistener.log',
    level=logging.DEBUG,
    format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

This allows you to format & config all in one line. A resulting log record looks as follows:

2014-05-26 12:22:52.376 CRITICAL historylistener - main: History log failed to start

回答 2

如果将logging.config.fileConfig与配置文件一起使用,请使用以下内容:

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S

if using logging.config.fileConfig with a configuration file use something like:

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S
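
For context, fileConfig needs the surrounding sections as well; a minimal complete file, with handler and logger names chosen here just to fit the formatter fragment above:

# logging.conf - minimal sketch around the formatter above
[loggers]
keys=root

[handlers]
keys=consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=DEBUG
handlers=consoleHandler

[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=simpleFormatter
args=(sys.stdout,)

[formatter_simpleFormatter]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S

which you then load once at startup with logging.config.fileConfig('logging.conf').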

回答 3

作为对其他答案的补充,下面是Python文档中的指令列表。

Directive   Meaning Notes

%a  Locale’s abbreviated weekday name.
%A  Locale’s full weekday name.
%b  Locale’s abbreviated month name.
%B  Locale’s full month name.
%c  Locale’s appropriate date and time representation.
%d  Day of the month as a decimal number [01,31].
%H  Hour (24-hour clock) as a decimal number [00,23].
%I  Hour (12-hour clock) as a decimal number [01,12].
%j  Day of the year as a decimal number [001,366].
%m  Month as a decimal number [01,12].
%M  Minute as a decimal number [00,59].
%p  Locale’s equivalent of either AM or PM. (1)
%S  Second as a decimal number [00,61]. (2)
%U  Week number of the year (Sunday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Sunday are considered to be in week 0.    (3)
%w  Weekday as a decimal number [0(Sunday),6].
%W  Week number of the year (Monday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Monday are considered to be in week 0.    (3)
%x  Locale’s appropriate date representation.
%X  Locale’s appropriate time representation.
%y  Year without century as a decimal number [00,99].
%Y  Year with century as a decimal number.
%z  Time zone offset indicating a positive or negative time difference from UTC/GMT of the form +HHMM or -HHMM, where H represents decimal hour digits and M represents decimal minute digits [-23:59, +23:59].
%Z  Time zone name (no characters if no time zone exists).
%%  A literal '%' character.

To add to the other answers, here are the variable list from Python Documentation.

Directive   Meaning Notes

%a  Locale’s abbreviated weekday name.   
%A  Locale’s full weekday name.  
%b  Locale’s abbreviated month name.     
%B  Locale’s full month name.    
%c  Locale’s appropriate date and time representation.   
%d  Day of the month as a decimal number [01,31].    
%H  Hour (24-hour clock) as a decimal number [00,23].    
%I  Hour (12-hour clock) as a decimal number [01,12].    
%j  Day of the year as a decimal number [001,366].   
%m  Month as a decimal number [01,12].   
%M  Minute as a decimal number [00,59].  
%p  Locale’s equivalent of either AM or PM. (1)
%S  Second as a decimal number [00,61]. (2)
%U  Week number of the year (Sunday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Sunday are considered to be in week 0.    (3)
%w  Weekday as a decimal number [0(Sunday),6].   
%W  Week number of the year (Monday as the first day of the week) as a decimal number [00,53]. All days in a new year preceding the first Monday are considered to be in week 0.    (3)
%x  Locale’s appropriate date representation.    
%X  Locale’s appropriate time representation.    
%y  Year without century as a decimal number [00,99].    
%Y  Year with century as a decimal number.   
%z  Time zone offset indicating a positive or negative time difference from UTC/GMT of the form +HHMM or -HHMM, where H represents decimal hour digits and M represents decimal minute digits [-23:59, +23:59].  
%Z  Time zone name (no characters if no time zone exists).   
%%  A literal '%' character.     

子流程命令的实时输出

问题:子流程命令的实时输出

我正在用一个python脚本作为流体力学代码的驱动程序。到了运行模拟的时候,我用subprocess.Popen来运行代码,把stdout和stderr的输出收集到subprocess.PIPE中,然后我就可以打印(并保存到日志文件中)输出信息,并检查是否有错误。问题是,我不知道代码的运行进度。如果直接从命令行运行它,它会输出当前迭代、时间、下一个时间步长等信息。

有没有办法既存储输出(用于日志记录和错误检查),又产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初,我是把run_command的输出用管道传给tee,以便把一份副本直接写进日志文件,同时流仍然直接输出到终端。但是那样的话,(据我所知)我就无法保存任何错误信息了。


编辑:

临时解决方案:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端中运行tail -f log.txt(假设log_file = 'log.txt')。

I’m using a python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collect the output from stdout and stderr into a subprocess.PIPE — then I can print (and save to a log-file) the output information, and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration its at, what time, what the next time-step is, etc.

Is there a way to both store the output (for logging and error checking), and also produce a live-streaming output?

The relevant section of my code:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

Originally I was piping the run_command through tee so that a copy went directly to the log-file, and the stream still output directly to the terminal — but that way I can’t store any errors (to my knowlege).


Edit:

Temporary solution:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

then, in another terminal, run tail -f log.txt (s.t. log_file = 'log.txt').


回答 0

您可以通过两种方式来做:或者从read或readline函数创建一个迭代器,然后:

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), ''):  # replace '' with b'' for Python 3
        sys.stdout.write(c)
        f.write(c)

要么

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, ''):  # replace '' with b'' for Python 3
        sys.stdout.write(line)
        f.write(line)

或者,您可以创建一个reader文件和一个writer文件。把writer传给Popen,然后从reader读取:

import io
import time
import subprocess
import sys

filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

这样,数据既会写入test.log,也会出现在标准输出上。

文件方法的唯一优点是您的代码不会阻塞。因此,您可以在此期间做任何想做的事,并随时以非阻塞的方式从reader读取。而使用PIPE时,read和readline函数会一直阻塞,直到分别有一个字符或一行被写入管道为止。

You have two ways of doing this, either by creating an iterator from the read or readline functions and do:

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), ''):  # replace '' with b'' for Python 3
        sys.stdout.write(c)
        f.write(c)

or

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, ''):  # replace '' with b'' for Python 3
        sys.stdout.write(line)
        f.write(line)

Or you can create a reader and a writer file. Pass the writer to the Popen and read from the reader

import io
import time
import subprocess
import sys

filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

This way you will have the data written in the test.log as well as on the standard output.

The only advantage of the file approach is that your code doesn’t block. So you can do whatever you want in the meantime and read whenever you want from the reader in a non-blocking way. When you use PIPE, read and readline functions will block until either one character is written to the pipe or a line is written to the pipe respectively.
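
For Python 3, a text-mode variant of the first snippet above (universal_newlines=True makes the pipe yield str lines instead of bytes):

import subprocess
import sys

with open('test.log', 'w') as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE,
                               universal_newlines=True)
    for line in iter(process.stdout.readline, ''):
        sys.stdout.write(line)
        f.write(line)
    process.wait()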


回答 1

执行摘要(或“tl;dr”版本):最多只有一个subprocess.PIPE时很容易,否则就很难。

现在可能是时候解释一下subprocess.Popen是如何工作的了。

(注意:这是针对Python 2.x的,尽管3.x相似;并且我对Windows变体很模糊。我对POSIX的了解要好得多。)

Popen功能需要同时处理零到三个I / O流。分别以stdinstdout和表示stderr

您可以提供:

  • None,表示您不想重定向该流。它将照常继承。请注意,至少在POSIX系统上,这并不意味着它会使用Python的sys.stdout,而只是Python实际的标准输出;参见文末的演示。
  • 一个int值。这是一个“原始”文件描述符(至少在POSIX中如此)。(附带说明:PIPE和STDOUT在内部实际上是int,但却是“不可能”的描述符-1和-2。)
  • 一个流,实际上是任何具有fileno方法的对象。Popen将通过stream.fileno()找到该流的描述符,然后按int值的方式处理。
  • subprocess.PIPE,指示Python应该创建一个管道。
  • subprocess.STDOUT(仅适用于stderr):告诉Python使用与stdout相同的描述符。仅当您为stdout提供了(非None的)值时才有意义,即便如此,也只有在设置了stdout=subprocess.PIPE时才需要。(否则,您可以直接提供与stdout相同的参数,例如Popen(..., stdout=stream, stderr=stream)。)

最简单的情况(无管道)

如果什么都不重定向(把三个都保留为默认的None,或显式提供None),事情就非常简单。它只需要启动子进程并让它运行。或者,如果您重定向到非PIPE的目标(一个int,或某个流的fileno()),也仍然很容易,因为操作系统会做所有工作。Python只需要启动子进程,把它的stdin、stdout和/或stderr连接到所提供的文件描述符上。

仍然很容易的情况:一个管道

如果仅重定向一个流,事情仍然相当简单。让我们一次挑一个流来看。

假设您想提供一些stdin,但让stdout和stderr不被重定向,或者指向某个文件描述符。作为父进程,您的Python程序只需要用write()把数据送进管道。您可以自己来做,例如:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

或者,您可以把stdin数据传给proc.communicate(),它会替您执行上面所示的stdin.write。没有输出要返回,所以communicate()只剩下一件实际工作:它还会替您关闭管道。(如果不调用proc.communicate(),则必须调用proc.stdin.close()来关闭管道,这样子进程才知道不会再有数据过来。)

假设您想捕获stdout,而不去管stdin和stderr。同样很容易:只需调用proc.stdout.read()(或等效方法),直到没有更多输出为止。由于proc.stdout是一个普通的Python I/O流,您可以在它上面使用所有常见的写法,例如:

for line in proc.stdout:

或者,您还是可以使用proc.communicate(),它会简单地替您完成read()。

如果只想捕获stderr,做法与stdout相同。

在事情变难之前,还有一个技巧。假设您想捕获stdout,同时也捕获stderr,但让它与stdout在同一个管道上:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在这种情况下,subprocess“作弊”了!好吧,它必须这么做,所以也不算真的作弊:它启动子进程时,把子进程的stdout和stderr都引到同一个(单一的)管道描述符上,该描述符反馈给父进程(Python)。在父进程这边,同样只有一个用于读取输出的管道描述符。所有“stderr”输出都会出现在proc.stdout中,而且如果调用proc.communicate(),stderr结果(元组中的第二个值)将是None,而不是字符串。

困难情况:两个或更多管道

当您想使用至少两个管道时,所有的问题就都来了。实际上,subprocess的代码里本身就有这么一段:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

但是,可惜,在这里我们至少创建了两个(也许三个)不同的管道,因此count(None)返回1或0。我们必须用困难的方式来做。

在Windows上,这会使用threading.Thread来累积self.stdout和self.stderr的结果,并让父线程传递self.stdin的输入数据(然后关闭管道)。

在POSIX上,如果poll可用则使用poll,否则使用select,来累积输出并传递stdin输入。所有这些都在(单个)父进程/线程中运行。

这里需要线程或poll/select来避免死锁。例如,假设我们把所有三个流都重定向到了三个单独的管道。再进一步假设,在写入进程被挂起、等待读取进程从另一端“清空”管道之前,能塞进管道的数据量有一个很小的上限。为了说明问题,我们把这个上限设为一个字节。(实际上事情就是这样运作的,只是上限远大于一个字节。)

如果父进程(Python)尝试写入多个字节,比如把'go\n'写到proc.stdin,那么第一个字节进去之后,第二个字节会导致Python进程挂起,等待子进程读取第一个字节、清空管道。

与此同时,假设子进程决定打印一句友好的“Hello! Don't Panic!”问候。H进入了它的stdout管道,但e导致它挂起,等待它的父进程读取那个H、清空stdout管道。

现在我们卡住了:Python进程在睡眠,等着把“go”说完;子进程也在睡眠,等着把“Hello! Don't Panic!”说完。

subprocess.Popen代码避免了线程化或选择/轮询的问题。当字节可以通过管道时,它们就会通过。如果不能,则只有一个线程(而不是整个进程)必须进入睡眠状态;或者,在选择/轮询的情况下,Python进程同时等待“可以写入”或“可用数据”,然后写入该进程的stdin仅在有空间时,并且仅在数据准备就绪时读取其stdout和/或stderr。一旦发送了所有标准输入数据(如果有的话)并且所有标准输出和/或标准错误数据都已存储,则该proc.communicate()代码(实际上_communicate是处理多毛案件的地方)返回。

如果您想在两个不同的管道上同时读取stdout和stderr(无论stdin是否重定向),同样需要避免死锁。这里的死锁情形有所不同:当您正在从stdout提取数据时,子进程往stderr写入了很长的内容(或者反过来),就会发生死锁,但它仍然存在。


演示

我答应过要演示:未重定向时,Python子进程写入的是底层的stdout,而不是sys.stdout。下面是一些代码:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
    print 'start show1'
    save = sys.stdout
    sys.stdout = StringIO()
    print 'sys.stdout being buffered'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    in_stdout = sys.stdout.getvalue()
    sys.stdout = save
    print 'in buffer:', in_stdout

def show2():
    print 'start show2'
    save = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    print 'after redirect sys.stdout'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    sys.stdout = save

show1()
show2()

运行时:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

请注意,如果添加stdout=sys.stdout,第一个例程将失败,因为StringIO对象没有fileno。如果添加stdout=sys.stdout,第二个例程将省略hello,因为sys.stdout已被重定向到os.devnull。

(如果重定向Python的文件描述符1,子进程会遵循该重定向。open(os.devnull, 'w')调用会产生一个fileno()大于2的流。)

Executive Summary (or “tl;dr” version): it’s easy when there’s at most one subprocess.PIPE, otherwise it’s hard.

It may be time to explain a bit about how subprocess.Popen does its thing.

(Caveat: this is for Python 2.x, although 3.x is similar; and I’m quite fuzzy on the Windows variant. I understand the POSIX stuff much better.)

The Popen function needs to deal with zero-to-three I/O streams, somewhat simultaneously. These are denoted stdin, stdout, and stderr as usual.

You can provide:

  • None, indicating that you don’t want to redirect the stream. It will inherit these as usual instead. Note that on POSIX systems, at least, this does not mean it will use Python’s sys.stdout, just Python’s actual stdout; see demo at end.
  • An int value. This is a “raw” file descriptor (in POSIX at least). (Side note: PIPE and STDOUT are actually ints internally, but are “impossible” descriptors, -1 and -2.)
  • A stream—really, any object with a fileno method. Popen will find the descriptor for that stream, using stream.fileno(), and then proceed as for an int value.
  • subprocess.PIPE, indicating that Python should create a pipe.
  • subprocess.STDOUT (for stderr only): tell Python to use the same descriptor as for stdout. This only makes sense if you provided a (non-None) value for stdout, and even then, it is only needed if you set stdout=subprocess.PIPE. (Otherwise you can just provide the same argument you provided for stdout, e.g., Popen(..., stdout=stream, stderr=stream).)

The easiest cases (no pipes)

If you redirect nothing (leave all three as the default None value or supply explicit None), Pipe has it quite easy. It just needs to spin off the subprocess and let it run. Or, if you redirect to a non-PIPE—an int or a stream’s fileno()—it’s still easy, as the OS does all the work. Python just needs to spin off the subprocess, connecting its stdin, stdout, and/or stderr to the provided file descriptors.

The still-easy case: one pipe

If you redirect only one stream, Pipe still has things pretty easy. Let’s pick one stream at a time and watch.

Suppose you want to supply some stdin, but let stdout and stderr go un-redirected, or go to a file descriptor. As the parent process, your Python program simply needs to use write() to send data down the pipe. You can do this yourself, e.g.:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

or you can pass the stdin data to proc.communicate(), which then does the stdin.write shown above. There is no output coming back so communicate() has only one other real job: it also closes the pipe for you. (If you don’t call proc.communicate() you must call proc.stdin.close() to close the pipe, so that the subprocess knows there is no more data coming through.)

Suppose you want to capture stdout but leave stdin and stderr alone. Again, it’s easy: just call proc.stdout.read() (or equivalent) until there is no more output. Since proc.stdout() is a normal Python I/O stream you can use all the normal constructs on it, like:

for line in proc.stdout:

or, again, you can use proc.communicate(), which simply does the read() for you.

If you want to capture only stderr, it works the same as with stdout.

There’s one more trick before things get hard. Suppose you want to capture stdout, and also capture stderr but on the same pipe as stdout:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

In this case, subprocess “cheats”! Well, it has to do this, so it’s not really cheating: it starts the subprocess with both its stdout and its stderr directed into the (single) pipe-descriptor that feeds back to its parent (Python) process. On the parent side, there’s again only a single pipe-descriptor for reading the output. All the “stderr” output shows up in proc.stdout, and if you call proc.communicate(), the stderr result (second value in the tuple) will be None, not a string.

The hard cases: two or more pipes

The problems all come about when you want to use at least two pipes. In fact, the subprocess code itself has this bit:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

But, alas, here we’ve made at least two, and maybe three, different pipes, so the count(None) returns either 1 or 0. We must do things the hard way.

On Windows, this uses threading.Thread to accumulate results for self.stdout and self.stderr, and has the parent thread deliver self.stdin input data (and then close the pipe).

On POSIX, this uses poll if available, otherwise select, to accumulate output and deliver stdin input. All this runs in the (single) parent process/thread.

Threads or poll/select are needed here to avoid deadlock. Suppose, for instance, that we’ve redirected all three streams to three separate pipes. Suppose further that there’s a small limit on how much data can be stuffed into to a pipe before the writing process is suspended, waiting for the reading process to “clean out” the pipe from the other end. Let’s set that small limit to a single byte, just for illustration. (This is in fact how things work, except that the limit is much bigger than one byte.)

If the parent (Python) process tries to write several bytes—say, 'go\n'to proc.stdin, the first byte goes in and then the second causes the Python process to suspend, waiting for the subprocess to read the first byte, emptying the pipe.

Meanwhile, suppose the subprocess decides to print a friendly “Hello! Don’t Panic!” greeting. The H goes into its stdout pipe, but the e causes it to suspend, waiting for its parent to read that H, emptying the stdout pipe.

Now we’re stuck: the Python process is asleep, waiting to finish saying “go”, and the subprocess is also asleep, waiting to finish saying “Hello! Don’t Panic!”.

The subprocess.Popen code avoids this problem with threading-or-select/poll. When bytes can go over the pipes, they go. When they can’t, only a thread (not the whole process) has to sleep—or, in the case of select/poll, the Python process waits simultaneously for “can write” or “data available”, writes to the process’s stdin only when there is room, and reads its stdout and/or stderr only when data are ready. The proc.communicate() code (actually _communicate where the hairy cases are handled) returns once all stdin data (if any) have been sent and all stdout and/or stderr data have been accumulated.

If you want to read both stdout and stderr on two different pipes (regardless of any stdin redirection), you will need to avoid deadlock too. The deadlock scenario here is different—it occurs when the subprocess writes something long to stderr while you’re pulling data from stdout, or vice versa—but it’s still there.


The Demo

I promised to demonstrate that, un-redirected, Python subprocesses write to the underlying stdout, not sys.stdout. So, here is some code:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
    print 'start show1'
    save = sys.stdout
    sys.stdout = StringIO()
    print 'sys.stdout being buffered'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    in_stdout = sys.stdout.getvalue()
    sys.stdout = save
    print 'in buffer:', in_stdout

def show2():
    print 'start show2'
    save = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    print 'after redirect sys.stdout'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    sys.stdout = save

show1()
show2()

When run:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

Note that the first routine will fail if you add stdout=sys.stdout, as a StringIO object has no fileno. The second will omit the hello if you add stdout=sys.stdout since sys.stdout has been redirected to os.devnull.

(If you redirect Python’s file-descriptor-1, the subprocess will follow that redirection. The open(os.devnull, 'w') call produces a stream whose fileno() is greater than 2.)


回答 2

我们还可以使用默认的文件迭代器来读取stdout,而不是使用带有readline()的iter构造。

import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in process.stdout:
    sys.stdout.write(line)

We can also use the default file iterator for reading stdout instead of using iter construct with readline().

import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in process.stdout:
    sys.stdout.write(line)
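
On Python 3 the same idiom can be made text-mode and line-buffered; text=True is the 3.7+ spelling of universal_newlines=True:

import subprocess
import sys

process = subprocess.Popen(your_command, stdout=subprocess.PIPE,
                           universal_newlines=True, bufsize=1)
for line in process.stdout:
    sys.stdout.write(line)
process.wait()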

回答 3

如果您可以使用第三方库,或许可以用类似sarge这样的东西(披露:我是它的维护者)。该库允许以非阻塞方式访问子进程的输出流,它构建在subprocess模块之上。

If you’re able to use third-party libraries, You might be able to use something like sarge (disclosure: I’m its maintainer). This library allows non-blocking access to output streams from subprocesses – it’s layered over the subprocess module.


回答 4

解决方案1:实时并发记录stdoutstderr

一个简单的解决方案,把stdout和stderr并发地、逐行实时记录到日志文件中。

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile):

    with open("mylog.txt", "w") as f:

        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()

        # Write the rest from the buffer
        f.write(stdfile.read())


with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()

解决方案2:一个read_popen_pipes()函数,允许您实时并发地同时迭代两个管道(stdout/stderr)

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(my_cmd, stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    p.poll()

Solution 1: Log stdout AND stderr concurrently in realtime

A simple solution which logs both stdout AND stderr concurrently, line-by-line in realtime into a log file.

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile, f):

    while p.poll() is None:
        f.write(stdfile.readline())
        f.flush()

    # Write the rest from the buffer
    f.write(stdfile.read())


# Share a single file handle between the two reader threads: opening the
# same file twice in "w" mode would make the two writers clobber each other.
with open("mylog.txt", "w") as f, \
     sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout, f)
        r2 = pool.submit(log_popen_pipe, p, p.stderr, f)
        r1.result()
        r2.result()

Solution 2: A function read_popen_pipes() that allows you to iterate over both pipes (stdout/stderr), concurrently in realtime

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            # Drain each queue independently: a single try block would skip
            # the stderr read whenever stdout happened to be empty.
            try:
                out_line = q_stdout.get_nowait()
            except Empty:
                pass

            try:
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    p.poll()


回答 5

一个好的但“重量级”的解决方案是使用Twisted-参见底部。

如果您只愿意接受标准输出,则应该遵循以下原则:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

(如果使用read(),它会尝试读取整个“文件”,这没有用;我们在这里真正需要的是能读出当前管道中已有全部数据的东西)

也可以尝试通过线程来解决这个问题,例如:

import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

现在,我们还可以通过使用两个线程来同时处理stderr。

但是请注意,subprocess文档不建议直接使用这些文件,而是建议使用communicate()(主要是担心死锁,我认为这对上面的情况不是问题),而且这些解决方案有点笨拙,因此看来subprocess模块还不足以胜任这项工作(另请参见:http://www.python.org/dev/peps/pep-3145/),我们需要看看其他方案。

一个更复杂的解决方案是使用Twisted,如下所示:https://twistedmatrix.com/documents/11.1.0/core/howto/process.html

使用Twisted进行此操作的方法是:使用reactor.spawnProcess()创建您的进程,并提供一个ProcessProtocol来异步处理输出。Twisted的示例Python代码在这里:https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py

A good but “heavyweight” solution is to use Twisted – see the bottom.

If you’re willing to live with only stdout, something along those lines should work:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

(If you use read(), it tries to read the entire “file”, which isn’t useful; what we really need here is something that reads all the data that’s in the pipe right now)

One might also try to approach this with threading, e.g.:

import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

Now we could potentially add stderr as well by having two threads.

Note however that the subprocess docs discourage using these files directly and recommend using communicate() (mostly concerned with deadlocks, which I think isn’t an issue above), and the solutions are a little clunky, so it really seems like the subprocess module isn’t quite up to the job (also see: http://www.python.org/dev/peps/pep-3145/ ) and we need to look at something else.

A more involved solution is to use Twisted as shown here: https://twistedmatrix.com/documents/11.1.0/core/howto/process.html

The way you do this with Twisted is to create your process using reactor.spawnProcess() and provide a ProcessProtocol that then processes output asynchronously. The Twisted sample Python code is here: https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py
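
For concreteness, a minimal sketch of the ProcessProtocol approach those links describe (untested here; outReceived, errReceived and processEnded are the standard ProcessProtocol callbacks):

from twisted.internet import protocol, reactor

class StreamingProtocol(protocol.ProcessProtocol):
    """Receives the child's output asynchronously, as it is produced."""

    def outReceived(self, data):
        print(data.decode(errors='replace'), end='')

    def errReceived(self, data):
        print(data.decode(errors='replace'), end='')

    def processEnded(self, reason):
        reactor.stop()

reactor.spawnProcess(StreamingProtocol(), '/bin/ls', ['ls', '-Rl'])
reactor.run()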


回答 6

除了所有这些答案之外,还有一种简单的方法如下:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

只要流是可读的,就循环读取;一旦得到空结果,就停止。

这里的关键是:只要还有输出,readline()就会返回一行(末尾带有\n);如果真的到了末尾,则返回空字符串。

希望这对某人有帮助。

In addition to all these answers, one simple approach could also be as follows:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

Loop over the stream for as long as it is readable, and stop once it gets an empty result. (For a pipe, readable() stays True, so it is really the empty readline() at EOF that ends the loop.)

The key here is that readline() returns a line (with \n at the end) as long as there is output, and an empty string once it’s really at the end.

Hope this helps someone.


回答 7

基于以上所有内容,我建议一个略作修改的版本(python3):

  • while循环调用readline(对我而言,建议的iter解决方案似乎会永远阻塞:Python 3,Windows 7)
  • 经过结构化处理,使得在poll()返回非None之后,不需要重复处理已读取的数据
  • 将stderr合并到stdout中,以便两路输出都能被读取
  • 添加了获取cmd退出值的代码。

代码:

import subprocess
import time
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, universal_newlines=True)
while True:
    rd = proc.stdout.readline()
    print(rd, end='')  # and whatever you want to do...
    if not rd:  # EOF
        returncode = proc.poll()
        if returncode is not None:
            break
        time.sleep(0.1)  # cmd closed stdout, but not exited yet

# You may want to check on ReturnCode here

Based on all the above I suggest a slightly modified version (python3):

  • while loop calling readline (the suggested iter solution seemed to block forever for me – Python 3, Windows 7)
  • structured so the handling of read data does not need to be duplicated after poll() returns non-None
  • stderr piped into stdout so both outputs are read
  • Added code to get exit value of cmd.

Code:

import subprocess
import time
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, universal_newlines=True)
while True:
    rd = proc.stdout.readline()
    print(rd, end='')  # and whatever you want to do...
    if not rd:  # EOF
        returncode = proc.poll()
        if returncode is not None:
            break
        time.sleep(0.1)  # cmd closed stdout, but not exited yet

# You may want to check on ReturnCode here

回答 8

看起来行缓冲输出对您是可行的,在这种情况下,类似下面的代码可能适用。(注意:未经测试。)这只会实时提供子进程的stdout。如果您想同时实时获得stderr和stdout,则必须使用select进行更复杂的操作。

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
    line = proc.stdout.readline()
    print line
    log_file.write(line + '\n')
# Might still be data on stdout at this point.  Grab any
# remainder.
for line in proc.stdout.read().split('\n'):
    print line
    log_file.write(line + '\n')
# Do whatever you want with proc.stderr here...

It looks like line-buffered output will work for you, in which case something like the following might suit. (Caveat: it’s untested.) This will only give the subprocess’s stdout in real time. If you want to have both stderr and stdout in real time, you’ll have to do something more complex with select.

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
    line = proc.stdout.readline()
    print line
    log_file.write(line + '\n')
# Might still be data on stdout at this point.  Grab any
# remainder.
for line in proc.stdout.read().split('\n'):
    print line
    log_file.write(line + '\n')
# Do whatever you want with proc.stderr here...
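
For the “more complex with select” case, here is a minimal POSIX-only sketch using the selectors module to multiplex stdout and stderr without blocking on either (run_command is the same placeholder as above; on Windows, select only works on sockets, so this approach does not apply there):

import selectors
import subprocess
import sys

run_command = ["ls", "-Rl"]  # placeholder command
proc = subprocess.Popen(run_command, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)

sel = selectors.DefaultSelector()
sel.register(proc.stdout, selectors.EVENT_READ)
sel.register(proc.stderr, selectors.EVENT_READ)

open_streams = 2
while open_streams:
    for key, _ in sel.select():
        data = key.fileobj.read1(4096)   # read what is available, don't block
        if not data:                      # EOF on this stream
            sel.unregister(key.fileobj)
            open_streams -= 1
        elif key.fileobj is proc.stdout:
            sys.stdout.buffer.write(data)
        else:
            sys.stderr.buffer.write(data)
proc.wait()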

回答 9

为什么不直接将stdout设置为sys.stdout?而且,如果还需要输出到日志,则可以简单地覆盖f的write方法。

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

Why not set stdout directly to sys.stdout? And if you need to output to a log as well, then you can simply override the write method of f.

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

回答 10

我尝试过的上述所有解决方案,要么无法分离stderr和stdout的输出(需要多个管道),要么在OS管道缓冲区已满时永远阻塞;当您运行的命令输出太快时就会发生这种情况(subprocess手册中关于poll()有相关警告)。我发现的唯一可靠方法是通过select,但这是仅适用于posix的解决方案:

import subprocess
import sys
import os
import select
from errno import EINTR  # needed for the select.error handling below
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()

All of the above solutions I tried failed either to separate stderr and stdout output, (multiple pipes) or blocked forever when the OS pipe buffer was full which happens when the command you are running outputs too fast (there is a warning for this on python poll() manual of subprocess). The only reliable way I found was through select, but this is a posix-only solution:

import subprocess
import sys
import os
import select
from errno import EINTR  # needed for the select.error handling below
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()

回答 11

与先前的答案类似,但是以下解决方案在Windows上使用Python3对我有效,提供了一种实时打印和记录日志的通用方法(getting-realtime-output-using-python):

import subprocess

def print_and_log(command, logFile):
    with open(logFile, 'wb') as f:
        command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

        while True:
            output = command.stdout.readline()
            if not output and command.poll() is not None:
                f.close()
                break
            if output:
                f.write(output)
                print(str(output.strip(), 'utf-8'), flush=True)
        return command.poll()

Similar to previous answers, but the following solution worked for me on Windows, using Python3 to provide a common method to print and log in realtime (getting-realtime-output-using-python):

import subprocess

def print_and_log(command, logFile):
    with open(logFile, 'wb') as f:
        command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

        while True:
            output = command.stdout.readline()
            if not output and command.poll() is not None:
                f.close()
                break
            if output:
                f.write(output)
                print(str(output.strip(), 'utf-8'), flush=True)
        return command.poll()

回答 12

我认为subprocess.communicate方法有点误导:它实际上填充的是您在subprocess.Popen中指定的stdout和stderr。

但是,从您提供给subprocess.Popen的stdout和stderr参数的subprocess.PIPE中读取数据,最终会填满OS管道缓冲区,并使您的应用程序死锁(特别是当您有多个必须使用subprocess的进程/线程时)。

我提出的解决方案是为stdout和stderr提供文件,并读取这些文件的内容,而不是从会死锁的PIPE中读取。这些文件可以是tempfile.NamedTemporaryFile();在subprocess.communicate向其写入的同时,也可以打开它们进行读取。

以下是示例用法:

        try:
            with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

下面是可以直接使用的源代码,其中附有尽可能多的注释来解释它的作用:

如果您使用的是python 2,请确保首先从pypi安装最新版本的subprocess32软件包。


import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing \n
                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode


I think that the subprocess.communicate method is a bit misleading: it actually fills the stdout and stderr that you specify in the subprocess.Popen.

Yet, reading from the subprocess.PIPE that you can provide to the subprocess.Popen‘s stdout and stderr parameters will eventually fill up OS pipe buffers and deadlock your app (especially if you have multiple processes/threads that must use subprocess).

My proposed solution is to provide the stdout and stderr with files – and read the files’ content instead of reading from the deadlocking PIPE. These files can be tempfile.NamedTemporaryFile() – which can also be accessed for reading while they’re being written into by subprocess.communicate.

Below is a sample usage:

        try:
            with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

And this is the source code which is ready to be used with as many comments as I could provide to explain what it does:

If you’re using python 2, please make sure to first install the latest version of the subprocess32 package from pypi.


import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing \n
                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode




回答 13

这是我在一个项目中使用的类。它将子进程的输出重定向到日志。起初,我尝试简单地重写write方法,但这行不通,因为子进程永远不会调用它(重定向发生在文件描述符级别)。因此,我使用了自己的管道,类似于subprocess模块中的做法。这样做的优点是把所有日志记录/打印逻辑都封装在适配器中,并且您只需将记录器实例传递给Popen:subprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

import logging
import os
import threading


class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARN: self.log.warn,
            logging.ERROR: self.log.error,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
        """If the write-filedescriptor is not closed this thread will
        prevent the whole program from exiting. You can use this method
        to clean up after the subprocess has terminated."""
        os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

如果您不需要日志记录而只想使用print(),显然可以删除大部分代码,使这个类更短。您还可以为它添加__enter__和__exit__方法,并在__exit__中调用finished,这样就可以方便地把它用作上下文管理器。

Here is a class which I’m using in one of my projects. It redirects output of a subprocess to the log. At first I tried simply overwriting the write-method but that doesn’t work as the subprocess will never call it (redirection happens on filedescriptor level). So I’m using my own pipe, similar to how it’s done in the subprocess-module. This has the advantage of encapsulating all logging/printing logic in the adapter and you can simply pass instances of the logger to Popen: subprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

import logging
import os
import threading


class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARN: self.log.warn,
            logging.ERROR: self.log.error,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
        """If the write-filedescriptor is not closed this thread will
        prevent the whole program from exiting. You can use this method
        to clean up after the subprocess has terminated."""
        os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

If you don’t need logging but simply want to use print() you can obviously remove large portions of the code and keep the class shorter. You could also expand it with __enter__ and __exit__ methods and call finished in __exit__, so that you could easily use it as a context manager.


回答 14

没有一个Pythonic的解决方案对我有用。事实证明,proc.stdout.read()或类似的调用可能会永远阻塞。

因此,我这样使用tee

subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')

如果您已经在使用shell=True,那么此解决方案会非常方便。

${PIPESTATUS}捕获整个命令链的成功状态(仅在Bash中可用)。如果我省略&& exit ${PIPESTATUS},则它将始终返回零,因为tee从不失败。

可能需要unbuffer才能将每一行立即打印到终端,而不是等待太久、直到“管道缓冲区”被填满。但是,unbuffer会吞掉assert(SIG Abort)的退出状态……

2>&1 还会将stderr记录到文件中。

None of the Pythonic solutions worked for me. It turned out that proc.stdout.read() or similar may block forever.

Therefore, I use tee like this:

subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')

This solution is convenient if you are already using shell=True.

${PIPESTATUS} captures the success status of the entire command chain (only available in Bash). If I omitted the && exit ${PIPESTATUS}, then this would always return zero since tee never fails.

unbuffer might be necessary for printing each line immediately into the terminal, instead of waiting way too long until the “pipe buffer” gets filled. However, unbuffer swallows the exit status of assert (SIG Abort)…

2>&1 also logs stderr to the file.
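
If you would rather stay in Python, a rough equivalent of the shell pipeline above might look like the following sketch (./my_long_running_binary and my_log_file.txt are the same placeholders; the exit status comes from the child itself, so no ${PIPESTATUS} trick is needed):

import subprocess
import sys

with open('my_log_file.txt', 'ab') as log, \
     subprocess.Popen(['./my_long_running_binary'],
                      stdout=subprocess.PIPE,
                      stderr=subprocess.STDOUT) as proc:  # 2>&1 equivalent
    for line in proc.stdout:
        sys.stdout.buffer.write(line)   # tee to the terminal...
        sys.stdout.buffer.flush()
        log.write(line)                 # ...and append to the log file
returncode = proc.returncode  # the context manager waits for the child

Note that Popen as a context manager (Python 3.2+) waits for the process on exit, so proc.returncode is set once the with block ends.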


在Python中记录未捕获的异常

问题:在Python中记录未捕获的异常

您如何让未捕获的异常通过logging模块输出,而不是输出到stderr?

我意识到最好的方法是:

try:
    raise Exception, 'Throwing a boring exception'
except Exception, e:
    logging.exception(e)

但是我的情况是,如果每当异常未被捕获时都能自动调用logging.exception(...),那就太好了。

How do you cause uncaught exceptions to output via the logging module rather than to stderr?

I realize the best way to do this would be:

try:
    raise Exception, 'Throwing a boring exception'
except Exception, e:
    logging.exception(e)

But my situation is such that it would be really nice if logging.exception(...) were invoked automatically whenever an exception isn’t caught.


回答 0

正如Ned所指出的,每当异常被引发且未被捕获时,sys.excepthook就会被调用。其实际含义是:您可以在代码中覆盖sys.excepthook的默认行为,以执行任何所需的操作(包括使用logging.exception)。

作为一个稻草人的例子:

>>> import sys
>>> def foo(exctype, value, tb):
...     print 'My Error Information'
...     print 'Type:', exctype
...     print 'Value:', value
...     print 'Traceback:', tb
... 

覆写sys.excepthook

>>> sys.excepthook = foo

提交一个明显的语法错误(漏掉冒号),并得到自定义的错误信息:

>>> def bar(a, b)
My Error Information
Type: <type 'exceptions.SyntaxError'>
Value: invalid syntax (<stdin>, line 1)
Traceback: None

有关sys.excepthook的更多信息:http://docs.python.org/library/sys.html#sys.excepthook

As Ned pointed out, sys.excepthook is invoked every time an exception is raised and uncaught. The practical implication of this is that in your code you can override the default behavior of sys.excepthook to do whatever you want (including using logging.exception).

As a straw man example:

>>> import sys
>>> def foo(exctype, value, tb):
...     print 'My Error Information'
...     print 'Type:', exctype
...     print 'Value:', value
...     print 'Traceback:', tb
... 

Override sys.excepthook:

>>> sys.excepthook = foo

Commit obvious syntax error (leave out the colon) and get back custom error information:

>>> def bar(a, b)
My Error Information
Type: <type 'exceptions.SyntaxError'>
Value: invalid syntax (<stdin>, line 1)
Traceback: None

For more information about sys.excepthook, read the docs.


回答 1

这是一个完整的小示例,其中还包括其他一些技巧:

import sys
import logging
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

def handle_exception(exc_type, exc_value, exc_traceback):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

sys.excepthook = handle_exception

if __name__ == "__main__":
    raise RuntimeError("Test unhandled")
  • 忽略KeyboardInterrupt,以便控制台python程序可以使用Ctrl + C退出。

  • 完全依靠python的日志记录模块来格式化异常。

  • 将自定义记录器与示例处理程序一起使用。这将未处理的异常更改为转到stdout而不是stderr,但是您可以将相同样式的各种处理程序添加到logger对象。

Here’s a complete small example that also includes a few other tricks:

import sys
import logging
logger = logging.getLogger(__name__)
handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(handler)

def handle_exception(exc_type, exc_value, exc_traceback):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))

sys.excepthook = handle_exception

if __name__ == "__main__":
    raise RuntimeError("Test unhandled")
  • Ignore KeyboardInterrupt so a console python program can exit with Ctrl + C.

  • Rely entirely on python’s logging module for formatting the exception.

  • Use a custom logger with an example handler. This one changes the unhandled exception to go to stdout rather than stderr, but you could add all sorts of handlers in this same style to the logger object.


回答 2

如果异常未被捕获,则会调用sys.excepthook方法:http://docs.python.org/library/sys.html#sys.excepthook

当异常被引发且未被捕获时,解释器会使用三个参数(异常类、异常实例和回溯对象)调用sys.excepthook。在交互式会话中,这恰好发生在控制权返回到提示符之前;在Python程序中,这恰好发生在程序退出之前。可以通过为sys.excepthook指定另一个三参数函数来定制这类顶级异常的处理。

The method sys.excepthook will be invoked if an exception is uncaught: http://docs.python.org/library/sys.html#sys.excepthook

When an exception is raised and uncaught, the interpreter calls sys.excepthook with three arguments, the exception class, exception instance, and a traceback object. In an interactive session this happens just before control is returned to the prompt; in a Python program this happens just before the program exits. The handling of such top-level exceptions can be customized by assigning another three-argument function to sys.excepthook.


回答 3

为什么不:

import sys
import logging
import traceback

def log_except_hook(*exc_info):
    text = "".join(traceback.format_exception(*exc_info))
    logging.error("Unhandled exception: %s", text)

sys.excepthook = log_except_hook

None()

这是使用上面的sys.excepthook时的输出:

$ python tb.py
ERROR:root:Unhandled exception: Traceback (most recent call last):
  File "tb.py", line 11, in <module>
    None()
TypeError: 'NoneType' object is not callable

这是把sys.excepthook注释掉后的输出:

$ python tb.py
Traceback (most recent call last):
  File "tb.py", line 11, in <module>
    None()
TypeError: 'NoneType' object is not callable

唯一的区别是,前者在第一行的开头带有ERROR:root:Unhandled exception:。

Why not:

import sys
import logging
import traceback

def log_except_hook(*exc_info):
    text = "".join(traceback.format_exception(*exc_info))
    logging.error("Unhandled exception: %s", text)

sys.excepthook = log_except_hook

None()

Here is the output with sys.excepthook as seen above:

$ python tb.py
ERROR:root:Unhandled exception: Traceback (most recent call last):
  File "tb.py", line 11, in <module>
    None()
TypeError: 'NoneType' object is not callable

Here is the output with the sys.excepthook commented out:

$ python tb.py
Traceback (most recent call last):
  File "tb.py", line 11, in <module>
    None()
TypeError: 'NoneType' object is not callable

The only difference is that the former has ERROR:root:Unhandled exception: at the beginning of the first line.


回答 4

以Jacinda的答案为基础,但使用记录器对象:

def catchException(logger, typ, value, traceback):
    logger.critical("My Error Information")
    logger.critical("Type: %s" % typ)
    logger.critical("Value: %s" % value)
    logger.critical("Traceback: %s" % traceback)

# Use a partially applied function
func = lambda typ, value, traceback: catchException(logger, typ, value, traceback)
sys.excepthook = func

To build on Jacinda’s answer, but using a logger object:

def catchException(logger, typ, value, traceback):
    logger.critical("My Error Information")
    logger.critical("Type: %s" % typ)
    logger.critical("Value: %s" % value)
    logger.critical("Traceback: %s" % traceback)

# Use a partially applied function
func = lambda typ, value, traceback: catchException(logger, typ, value, traceback)
sys.excepthook = func
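
As a small aside, functools.partial expresses the same partial application a bit more directly than the lambda:

import functools
import sys

# equivalent to the lambda above: pre-binds logger as the first argument
sys.excepthook = functools.partial(catchException, logger)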

回答 5

将您的应用程序入口调用包装在一个try...except块中,这样您就可以捕获并记录(甚至重新引发)所有未捕获的异常。例如,不要这样写:

if __name__ == '__main__':
    main()

而要这样写:

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        logger.exception(e)
        raise

Wrap your app entry call in a try...except block so you’ll be able to catch and log (and perhaps re-raise) all uncaught exceptions. E.g. instead of:

if __name__ == '__main__':
    main()

Do this:

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        logger.exception(e)
        raise

回答 6

也许您可以在模块顶部做些事情,把stderr重定向到一个文件,然后在底部把该文件的内容记录到日志

sock = open('error.log', 'w')               
sys.stderr = sock

doSomething() #makes errors and they will log to error.log

logging.exception(open('error.log', 'r').read() )

Maybe you could do something at the top of a module that redirects stderr to a file, and then log that file at the bottom

sock = open('error.log', 'w')               
sys.stderr = sock

doSomething() #makes errors and they will log to error.log

logging.exception(open('error.log', 'r').read() )

回答 7

尽管@gnu_lorien的回答为我提供了一个很好的起点,但我的程序在发生第一次异常时崩溃。

我提出了一个定制的(和/或改进的)解决方案,它以静默方式记录用@handle_error修饰的函数的异常。

import logging

__author__ = 'ahmed'
logging.basicConfig(filename='error.log', level=logging.DEBUG)


def handle_exception(exc_type, exc_value, exc_traceback):
    import sys
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return
    logging.critical(exc_value.message, exc_info=(exc_type, exc_value, exc_traceback))


def handle_error(func):
    import sys

    def __inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception, e:
            exc_type, exc_value, exc_tb = sys.exc_info()
            handle_exception(exc_type, exc_value, exc_tb)
            # print inside the except block: 'e' only exists when an
            # exception occurred (the original 'finally' would raise
            # NameError on a successful call)
            print(e.message)
    return __inner


@handle_error
def main():
    raise RuntimeError("RuntimeError")


if __name__ == "__main__":
    for _ in xrange(1, 20):
        main()

Although @gnu_lorien’s answer gave me a good starting point, my program crashed on the first exception.

I came up with a customised (and/or improved) solution, which silently logs exceptions of functions that are decorated with @handle_error.

import logging

__author__ = 'ahmed'
logging.basicConfig(filename='error.log', level=logging.DEBUG)


def handle_exception(exc_type, exc_value, exc_traceback):
    import sys
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return
    logging.critical(exc_value.message, exc_info=(exc_type, exc_value, exc_traceback))


def handle_error(func):
    import sys

    def __inner(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception, e:
            exc_type, exc_value, exc_tb = sys.exc_info()
            handle_exception(exc_type, exc_value, exc_tb)
            # print inside the except block: 'e' only exists when an
            # exception occurred (the original 'finally' would raise
            # NameError on a successful call)
            print(e.message)
    return __inner


@handle_error
def main():
    raise RuntimeError("RuntimeError")


if __name__ == "__main__":
    for _ in xrange(1, 20):
        main()

回答 8

为了回答在已接受答案的评论区中讨论的Mr.Zeus的问题,我用下面的方法在交互式控制台中记录未捕获的异常(已通过PyCharm 2018-2019测试)。我发现sys.excepthook在python shell中不起作用,于是我更深入地研究,发现可以改用sys.exc_info。但是,与接受3个参数的sys.excepthook不同,sys.exc_info不接受任何参数。

在这里,我同时使用sys.excepthook和sys.exc_info,通过一个包装函数,在交互式控制台和脚本中都记录未捕获的异常。为了把钩子函数同时附加到这两个函数上,我根据是否传入了参数提供两种不同的接口。

这是代码:

def log_exception(exctype, value, traceback):
    logger.error("Uncaught exception occurred!",
                 exc_info=(exctype, value, traceback))


def attach_hook(hook_func, run_func):
    def inner(*args, **kwargs):
        if not (args or kwargs):
            # This condition is for sys.exc_info
            local_args = run_func()
            hook_func(*local_args)
        else:
            # This condition is for sys.excepthook
            hook_func(*args, **kwargs)
        return run_func(*args, **kwargs)
    return inner


sys.exc_info = attach_hook(log_exception, sys.exc_info)
sys.excepthook = attach_hook(log_exception, sys.excepthook)

日志设置可以在gnu_lorien的答案中找到。

To answer the question from Mr.Zeus discussed in the comment section of the accepted answer, I use this to log uncaught exceptions in an interactive console (tested with PyCharm 2018-2019). I found out sys.excepthook does not work in a python shell, so I looked deeper and found that I could use sys.exc_info instead. However, unlike sys.excepthook, which takes 3 arguments, sys.exc_info takes none.

Here, I use both sys.excepthook and sys.exc_info to log uncaught exceptions both in an interactive console and in a script, via a wrapper function. To attach a hook function to both functions, I have two different interfaces depending on whether arguments are given or not.

Here’s the code:

def log_exception(exctype, value, traceback):
    logger.error("Uncaught exception occurred!",
                 exc_info=(exctype, value, traceback))


def attach_hook(hook_func, run_func):
    def inner(*args, **kwargs):
        if not (args or kwargs):
            # This condition is for sys.exc_info
            local_args = run_func()
            hook_func(*local_args)
        else:
            # This condition is for sys.excepthook
            hook_func(*args, **kwargs)
        return run_func(*args, **kwargs)
    return inner


sys.exc_info = attach_hook(log_exception, sys.exc_info)
sys.excepthook = attach_hook(log_exception, sys.excepthook)

The logging setup can be found in gnu_lorien’s answer.
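
One caveat worth adding: sys.excepthook is not invoked for exceptions raised in other threads. Since Python 3.8 there is a separate threading.excepthook for that; a minimal sketch that routes thread exceptions into logging as well:

import logging
import threading

logger = logging.getLogger(__name__)

def log_thread_exception(args):
    # args carries exc_type, exc_value, exc_traceback and thread (3.8+)
    name = args.thread.name if args.thread else '<unknown>'
    logger.error("Uncaught exception in thread %s", name,
                 exc_info=(args.exc_type, args.exc_value, args.exc_traceback))

threading.excepthook = log_thread_exception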


回答 9

就我而言(使用python 3),采用@Jacinda的答案时没有打印回溯的内容。相反,它只是打印回溯对象本身:<traceback object at 0x7f90299b7b90>。

相反,我这样做:

import sys
import logging
import traceback

def custom_excepthook(exc_type, exc_value, exc_traceback):
    # Do not print exception when user cancels the program
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logging.error("An uncaught exception occurred:")
    logging.error("Type: %s", exc_type)
    logging.error("Value: %s", exc_value)

    if exc_traceback:
        format_exception = traceback.format_tb(exc_traceback)
        for line in format_exception:
            logging.error(repr(line))

sys.excepthook = custom_excepthook

In my case (using python 3), when using @Jacinda’s answer the content of the traceback was not printed. Instead, it just printed the traceback object itself: <traceback object at 0x7f90299b7b90>.

Instead I do:

import sys
import logging
import traceback

def custom_excepthook(exc_type, exc_value, exc_traceback):
    # Do not print exception when user cancels the program
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logging.error("An uncaught exception occurred:")
    logging.error("Type: %s", exc_type)
    logging.error("Value: %s", exc_value)

    if exc_traceback:
        format_exception = traceback.format_tb(exc_traceback)
        for line in format_exception:
            logging.error(repr(line))

sys.excepthook = custom_excepthook