How do I use itertools.groupby()?

Question: How do I use itertools.groupby()?

I haven’t been able to find an understandable explanation of how to actually use Python’s itertools.groupby() function. What I’m trying to do is this:

  • Take a list – in this case, the children of an objectified lxml element
  • Divide it into groups based on some criteria
  • Then later iterate over each of these groups separately.

I’ve reviewed the documentation, and the examples, but I’ve had trouble trying to apply them beyond a simple list of numbers.

So, how do I use itertools.groupby()? Is there another technique I should be using? Pointers to good “prerequisite” reading would also be appreciated.


Answer 0

IMPORTANT NOTE: You have to sort your data first, using the same key you group by.


The part I didn’t get is that in the example construction

groups = []
uniquekeys = []
for k, g in groupby(data, keyfunc):
   groups.append(list(g))    # Store group iterator as a list
   uniquekeys.append(k)

k is the current grouping key, and g is an iterator that you can use to iterate over the group defined by that grouping key. In other words, the groupby iterator itself returns iterators.

Here’s an example of that, using clearer variable names:

from itertools import groupby

things = [("animal", "bear"), ("animal", "duck"), ("plant", "cactus"), ("vehicle", "speed boat"), ("vehicle", "school bus")]

for key, group in groupby(things, lambda x: x[0]):
    for thing in group:
        print("A %s is a %s." % (thing[1], key))
    print()

This will give you the output:

A bear is a animal.
A duck is a animal.

A cactus is a plant.

A speed boat is a vehicle.
A school bus is a vehicle.

In this example, things is a list of tuples where the first item in each tuple is the group the second item belongs to.

The groupby() function takes two arguments: (1) the data to group and (2) the function to group it with.

Here, lambda x: x[0] tells groupby() to use the first item in each tuple as the grouping key.

In the above for statement, groupby returns three (key, group iterator) pairs, one for each unique key (the data is already sorted by key, so each key forms a single run). You can use the returned iterator to iterate over each individual item in that group.

Here’s a slightly different example with the same data, using a list comprehension:

for key, group in groupby(things, lambda x: x[0]):
    listOfThings = " and ".join([thing[1] for thing in group])
    print(key + "s: " + listOfThings + ".")

This will give you the output:

animals: bear and duck.
plants: cactus.
vehicles: speed boat and school bus.
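
The examples above work because things is already ordered by category. As a minimal sketch of the sort-first rule, the snippet below shuffles the same data and passes one key function to both sorted() and groupby(). The helper name category and the shuffled ordering are illustrative assumptions, not part of the original answer.

```python
from itertools import groupby

# Same items as above, deliberately out of order (illustrative data).
things = [("vehicle", "school bus"), ("animal", "bear"), ("plant", "cactus"),
          ("vehicle", "speed boat"), ("animal", "duck")]


def category(pair):
    """Key function shared by sorted() and groupby()."""
    return pair[0]


# Sort by the grouping key first so equal keys become adjacent,
# then collect each group into a list while it is still valid.
grouped = {
    key: [name for _, name in group]
    for key, group in groupby(sorted(things, key=category), key=category)
}

print(grouped)
```

Because sorted() is stable, items keep their original relative order inside each group.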


Answer 1

The example on the Python docs is quite straightforward:

groups = []
uniquekeys = []
for k, g in groupby(data, keyfunc):
    groups.append(list(g))      # Store group iterator as a list
    uniquekeys.append(k)

So in your case, data is a list of nodes, keyfunc is where the logic of your criteria function goes and then groupby() groups the data.

You must be careful to sort the data by the same criteria before you call groupby, or it won’t work as expected. groupby actually just iterates through the list, and whenever the key changes it creates a new group.


Answer 2

itertools.groupby is a tool for grouping items.

From the docs, we glean further what it might do:

# [k for k, g in groupby('AAAABBBCCDAABBB')] --> A B C D A B

# [list(g) for k, g in groupby('AAAABBBCCD')] --> AAAA BBB CC D

groupby objects yield key-group pairs where the group is an iterator.

Features

  • A. Group consecutive items together
  • B. Group all occurrences of an item, given a sorted iterable
  • C. Specify how to group items with a key function *

Comparisons

# Define a printer for comparing outputs
>>> import itertools as it
>>> def print_groupby(iterable, keyfunc=None):
...    for k, g in it.groupby(iterable, keyfunc):
...        print("key: '{}'--> group: {}".format(k, list(g)))

# Feature A: group consecutive occurrences
>>> print_groupby("BCAACACAADBBB")
key: 'B'--> group: ['B']
key: 'C'--> group: ['C']
key: 'A'--> group: ['A', 'A']
key: 'C'--> group: ['C']
key: 'A'--> group: ['A']
key: 'C'--> group: ['C']
key: 'A'--> group: ['A', 'A']
key: 'D'--> group: ['D']
key: 'B'--> group: ['B', 'B', 'B']

# Feature B: group all occurrences
>>> print_groupby(sorted("BCAACACAADBBB"))
key: 'A'--> group: ['A', 'A', 'A', 'A', 'A']
key: 'B'--> group: ['B', 'B', 'B', 'B']
key: 'C'--> group: ['C', 'C', 'C']
key: 'D'--> group: ['D']

# Feature C: group by a key function
>>> # keyfunc = lambda s: s.islower()                      # equivalent
>>> def keyfunc(s):
...     """Return a True if a string is lowercase, else False."""   
...     return s.islower()
>>> print_groupby(sorted("bCAaCacAADBbB"), keyfunc)
key: 'False'--> group: ['A', 'A', 'A', 'B', 'B', 'C', 'C', 'D']
key: 'True'--> group: ['a', 'a', 'b', 'b', 'c']

Uses

Note: Several of the latter examples derive from Víctor Terrón’s PyCon talk (in Spanish), “Kung Fu at Dawn with Itertools”. See also the groupby source code, written in C.

* A function where all items are passed through and compared, influencing the result. Other objects with key functions include sorted(), max() and min().


Response

# OP: Yes, you can use `groupby`, e.g. 
[do_something(list(g)) for _, g in groupby(lxml_elements, criteria_func)]
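
To make the one-liner above concrete, here is a hedged sketch that groups the children of an XML element by tag. It uses the standard library's xml.etree.ElementTree as a stand-in for lxml, and the sample document and the by_tag helper are assumptions made up for illustration.

```python
from itertools import groupby
import xml.etree.ElementTree as ET

# Made-up document; a stand-in for the asker's objectified lxml element.
root = ET.fromstring("<root><a>1</a><a>2</a><b>3</b><a>4</a></root>")


def by_tag(element):
    """Grouping criterion: the element's tag name."""
    return element.tag


# Sort the children by the same key so that all <a> elements are adjacent.
children = sorted(list(root), key=by_tag)

groups = {
    tag: [element.text for element in group]
    for tag, group in groupby(children, key=by_tag)
}

print(groups)
```

Each group is turned into a list inside the comprehension, so the groups can be iterated later, which is what the question asks for.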

Answer 3

A neat trick with groupby is to do run-length encoding in one line:

[(c,len(list(cgen))) for c,cgen in groupby(some_string)]

This will give you a list of 2-tuples where the first element is the char and the second is the number of repetitions.

Edit: Note that this is what separates itertools.groupby from the SQL GROUP BY semantics: itertools doesn’t (and in general can’t) sort the iterator in advance, so groups with the same “key” aren’t merged.
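
A small runnable sketch of the run-length encoding idea, with a matching decoder to show the round trip. The function names rle_encode and rle_decode and the sample string are assumptions for illustration.

```python
from itertools import groupby


def rle_encode(text):
    """Return (char, run_length) pairs for consecutive runs in text."""
    return [(char, sum(1 for _ in run)) for char, run in groupby(text)]


def rle_decode(pairs):
    """Rebuild the original string from (char, run_length) pairs."""
    return "".join(char * count for char, count in pairs)


encoded = rle_encode("AAAABBBCCDAA")
print(encoded)
print(rle_decode(encoded))
```

Note that the trailing run of A's gets its own pair: runs are not merged across the string, which is exactly the difference from SQL GROUP BY described above.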


Answer 4

Another example:

import itertools

for key, igroup in itertools.groupby(range(12), lambda x: x // 5):
    print(key, list(igroup))

results in

0 [0, 1, 2, 3, 4]
1 [5, 6, 7, 8, 9]
2 [10, 11]

Note that igroup is an iterator (a sub-iterator as the documentation calls it).

This is useful for chunking a generator:

def chunker(items, chunk_size):
    '''Group items in chunks of chunk_size'''
    for _key, group in itertools.groupby(enumerate(items), lambda x: x[0] // chunk_size):
        yield (g[1] for g in group)

with open('file.txt') as fobj:
    for chunk in chunker(fobj, 100):  # 100 lines per chunk, an arbitrary choice
        process(chunk)
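
As a hedged variant of the chunker above, the sketch below returns each chunk as a list instead of a lazy generator, so a chunk stays usable after the next one has been requested. The in-memory lines list stands in for a file object and is an assumption for illustration.

```python
import itertools


def chunker(items, chunk_size):
    """Yield lists of at most chunk_size consecutive items."""
    numbered = enumerate(items)
    # Items 0..chunk_size-1 get key 0, the next chunk_size items get key 1, ...
    for _key, group in itertools.groupby(numbered, lambda pair: pair[0] // chunk_size):
        yield [item for _index, item in group]


# Stand-in for the lines of a file.
lines = ["line %d" % n for n in range(7)]
chunks = list(chunker(lines, 3))

for chunk in chunks:
    print(chunk)
```

Materializing each chunk costs memory proportional to chunk_size, but it avoids the stale-iterator problem that appears when groups are consumed late.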

Another example of groupby – when the keys are not sorted. In the following example, items in xx are grouped by values in yy. In this case, one set of zeros is output first, followed by a set of ones, followed again by a set of zeros.

xx = range(10)
yy = [0, 0, 0, 1, 1, 1, 0, 0, 0, 0]
for group in itertools.groupby(iter(xx), lambda x: yy[x]):
    print(group[0], list(group[1]))

Produces:

0 [0, 1, 2]
1 [3, 4, 5]
0 [6, 7, 8, 9]

Answer 5

WARNING:

The syntax list(groupby(…)) won’t work the way that you intend. Each group shares the underlying iterator with the groupby object, so advancing groupby to the next key invalidates the previous group. As a result, using

for x in list(groupby(range(10))):
    print(list(x[1]))

will produce:

[]
[]
[]
[]
[]
[]
[]
[]
[]
[9]

Instead of list(groupby(…)), try [(k, list(g)) for k, g in groupby(…)], or if you use that syntax often,

def groupbylist(*args, **kwargs):
    return [(k, list(g)) for k, g in groupby(*args, **kwargs)]

and get access to the groupby functionality while avoiding those pesky (for small data) iterators altogether.
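
A short sketch contrasting the two approaches on a small string. The variable names stale and safe and the sample data are illustrative assumptions. Only the first stale group is checked here, since the exact contents of the last stale group may differ between Python versions.

```python
from itertools import groupby

data = "AAABBC"

# Exhausting groupby first leaves the earlier groups with nothing to yield.
stale = list(groupby(data))
first_key, first_group = stale[0]
print(first_key, list(first_group))  # the key survives, the items do not

# Materialize each group while it is still the current one.
safe = [(key, list(group)) for key, group in groupby(data)]
print(safe)
```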


Answer 6

I would like to give another example where groupby without sorting does not work as expected. Adapted from the example by James Sulak:

from itertools import groupby

things = [("vehicle", "bear"), ("animal", "duck"), ("animal", "cactus"), ("vehicle", "speed boat"), ("vehicle", "school bus")]

for key, group in groupby(things, lambda x: x[0]):
    for thing in group:
        print("A %s is a %s." % (thing[1], key))
    print()

The output is:

A bear is a vehicle.

A duck is a animal.
A cactus is a animal.

A speed boat is a vehicle.
A school bus is a vehicle.

There are two groups with the key vehicle, whereas one would expect only one.


Answer 7

@CaptSolo, I tried your example, but it didn’t work.

from itertools import groupby 
[(c,len(list(cs))) for c,cs in groupby('Pedro Manoel')]

Output:

[('P', 1), ('e', 1), ('d', 1), ('r', 1), ('o', 1), (' ', 1), ('M', 1), ('a', 1), ('n', 1), ('o', 1), ('e', 1), ('l', 1)]

As you can see, there are two o’s and two e’s, but they got into separate groups. That’s when I realized you need to sort the list passed to the groupby function. So, the correct usage would be:

name = list('Pedro Manoel')
name.sort()
[(c,len(list(cs))) for c,cs in groupby(name)]

Output:

[(' ', 1), ('M', 1), ('P', 1), ('a', 1), ('d', 1), ('e', 2), ('l', 1), ('n', 1), ('o', 2), ('r', 1)]

Just remember: if the list is not sorted, groupby will not merge equal items that are not adjacent!
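
If the goal is total counts rather than run lengths, collections.Counter from the standard library is a possible alternative that needs no sorting. The sketch below is an illustration, not part of the original answer, and compares both results on the same name.

```python
from collections import Counter
from itertools import groupby

name = "Pedro Manoel"

# Run lengths: every character here forms its own run of length 1.
runs = [(c, len(list(cs))) for c, cs in groupby(name)]

# Total occurrences per character, independent of position.
totals = Counter(name)

print(runs)
print(sorted(totals.items()))
```

Sorting first and then calling groupby gives the same totals as Counter; the unsorted groupby call answers a different question, namely how long each consecutive run is.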


Answer 8

Sorting and groupby

from itertools import groupby

val = [{'name': 'satyajit', 'address': 'btm', 'pin': 560076}, 
       {'name': 'Mukul', 'address': 'Silk board', 'pin': 560078},
       {'name': 'Preetam', 'address': 'btm', 'pin': 560076}]


for pin, list_data in groupby(sorted(val, key=lambda k: k['pin']), lambda x: x['pin']):
    print(pin)
    for rec in list_data:
        print(rec)

Output:

560076
{'name': 'satyajit', 'address': 'btm', 'pin': 560076}
{'name': 'Preetam', 'address': 'btm', 'pin': 560076}
560078
{'name': 'Mukul', 'address': 'Silk board', 'pin': 560078}

Answer 9

How do I use Python’s itertools.groupby()?

You can use groupby to group things to iterate over. You give groupby an iterable and an optional key function/callable by which to check the items as they come out of the iterable, and it returns an iterator that yields two-tuples: the result of the key callable, and the actual items in another iterable. From the help:

groupby(iterable[, keyfunc]) -> create an iterator which returns
(key, sub-iterator) grouped by each value of key(value).

Here’s an example of groupby using a coroutine to group by a count. It uses a key callable (in this case, coroutine.send) to just spit out the count for however many iterations, along with a grouped sub-iterator of elements:

import itertools


def grouper(iterable, n):
    def coroutine(n):
        yield # queue up coroutine
        for i in itertools.count():
            for j in range(n):
                yield i
    groups = coroutine(n)
    next(groups) # queue up coroutine

    for c, objs in itertools.groupby(iterable, groups.send):
        yield c, list(objs)
    # or instead of materializing a list of objs, just:
    # return itertools.groupby(iterable, groups.send)

list(grouper(range(10), 3))

prints

[(0, [0, 1, 2]), (1, [3, 4, 5]), (2, [6, 7, 8]), (3, [9])]

Answer 10

One useful example that I came across may be helpful:

from itertools import groupby

#user input

myinput = input()

#creating empty list to store output

myoutput = []

for k,g in groupby(myinput):

    myoutput.append((len(list(g)),int(k)))

print(*myoutput)

Sample input: 14445221

Sample output: (1, 1) (3, 4) (1, 5) (2, 2) (1, 1)


Answer 11

This basic implementation helped me understand this function. Hope it helps others as well:

from itertools import groupby

arr = [(1, "A"), (1, "B"), (1, "C"), (2, "D"), (2, "E"), (3, "F")]

for k, g in groupby(arr, lambda x: x[0]):
    print("--", k, "--")
    for tup in g:
        print(tup[1])  # tup[0] == k

Output:

-- 1 --
A
B
C
-- 2 --
D
E
-- 3 --
F

Answer 12

You can write your own groupby function:

def groupby(data):
    kv = {}
    for k, v in data:
        if k not in kv:
            kv[k] = [v]
        else:
            kv[k].append(v)
    return kv

Run on IPython:

In [10]: data = [('a', 1), ('b', 2), ('a', 2)]

In [11]: groupby(data)
Out[11]: {'a': [1, 2], 'b': [2]}
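
A hedged generalization of the same idea: collections.defaultdict with an explicit key function, so the input does not have to be (key, value) pairs and does not need sorting. The name group_all is an assumption chosen to avoid shadowing itertools.groupby.

```python
from collections import defaultdict


def group_all(items, key):
    """Group every item by key(item), regardless of input order."""
    groups = defaultdict(list)
    for item in items:
        groups[key(item)].append(item)
    return dict(groups)


data = [('a', 1), ('b', 2), ('a', 2)]
result = group_all(data, key=lambda pair: pair[0])
print(result)
```

Unlike itertools.groupby, this merges all items with equal keys even when they are not adjacent, at the cost of holding every group in memory.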

Access an arbitrary element in a dictionary in Python

Question: Access an arbitrary element in a dictionary in Python

If mydict is not empty, I access an arbitrary element as:

mydict[mydict.keys()[0]]

Is there any better way to do this?


Answer 0

On Python 3, non-destructively and iteratively:

next(iter(mydict.values()))

On Python 2, non-destructively and iteratively:

mydict.itervalues().next()

If you want it to work in both Python 2 and 3, you can use the six package:

six.next(six.itervalues(mydict))

though at this point it is quite cryptic, and I’d prefer your code.

If you want to remove any item, do:

key, value = mydict.popitem()

Note that “first” may not be an appropriate term here because dict is not an ordered type in Python < 3.6. Dicts preserve insertion order in CPython 3.6 as an implementation detail, and as a language guarantee from Python 3.7 onward.
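
One detail the snippets above leave open is the empty dictionary: next() raises StopIteration when there is nothing to return. A minimal sketch, assuming a hypothetical helper named any_value, that passes a default to next() instead:

```python
def any_value(mapping, default=None):
    """Return some value from mapping, or default if it is empty."""
    # The second argument to next() is returned when the iterator is exhausted.
    return next(iter(mapping.values()), default)


mydict = {"x": 10, "y": 20}
print(any_value(mydict))
print(any_value({}, default="empty"))
```

This does not modify the dictionary, unlike popitem(), and does not build a list of all values.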


Answer 1

If you only need to access one element (being the first by chance, since dicts do not guarantee ordering) you can simply do this in Python 2:

my_dict.keys()[0]     -> key of "first" element
my_dict.values()[0]   -> value of "first" element
my_dict.items()[0]    -> (key, value) tuple of "first" element

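Please note that (to the best of my knowledge) Python only guarantees that successive calls to these methods return the same ordering if the dictionary is not modified in between. Indexing the results directly like this is not supported in Python 3, where these methods return views rather than lists.

In Python 3: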

list(my_dict.keys())[0]     -> key of "first" element
list(my_dict.values())[0]   -> value of "first" element
list(my_dict.items())[0]    -> (key, value) tuple of "first" element

Answer 2

In Python 3, calling

dict.keys()

returns a value of type dict_keys, so trying to get the first key by indexing raises an error:

dict.keys()[0]
TypeError: 'dict_keys' object does not support indexing

In the end, I converted dict.keys() to a list first and got the first member by indexing the list:

list(dict.keys())[0]

Answer 3

to get a key

next(iter(mydict))

to get a value

next(iter(mydict.values()))

to get both

next(iter(mydict.items())) # or next(iter(mydict.viewitems())) in python 2

The first two work in both Python 2 and 3. The last two (values() and items()) are lazy in Python 3, but in Python 2 they build a full list first.


Answer 4

As others mentioned, there is no “first item”, since dictionaries have no guaranteed order (they’re implemented as hash tables). If you want, for example, the value corresponding to the smallest key, thedict[min(thedict)] will do that. If you care about the order in which the keys were inserted, i.e., by “first” you mean “inserted earliest”, then in Python 3.1 you can use collections.OrderedDict, which is also in the forthcoming Python 2.7; for older versions of Python, download, install, and use the ordered dict backport (2.4 and later) which you can find here.

As of Python 3.7, dicts preserve insertion order.
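
A small sketch of the two meanings of “first” discussed above, assuming Python 3.7 or later for the insertion-order part. The dictionary contents are made up for illustration.

```python
# Made-up data: keys inserted in the order pear, apple, fig.
thedict = {"pear": 3, "apple": 1, "fig": 2}

# "First" as in smallest key: min() compares the keys themselves.
value_for_smallest_key = thedict[min(thedict)]

# "First" as in inserted earliest: relies on insertion order (Python 3.7+).
earliest_key = next(iter(thedict))

print(value_for_smallest_key)
print(earliest_key)
```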


Answer 5

How about this? It hasn’t been mentioned here yet.

Python 2 & 3:

a = {"a": 2, "b": 3}
a[list(a)[0]]  # the value of the first element: 2

Answer 6

Ignoring issues surrounding dict ordering, this might be better:

next(dict.itervalues())

This way we avoid item lookup and generating a list of keys that we don’t use.

Python3

next(iter(dict.values()))

Answer 7

In Python 3:

list(dict.values())[0]

Answer 8

You can always do:

for k in sorted(d.keys()):
    print(d[k])

This will give you a consistently sorted set of keys (ordered by comparing the keys themselves, not by hash()) that you can process if the sorting has any meaning to you. That means, for example, that numeric keys are sorted consistently even if you expand the dictionary.

Example (Python 2; in Python 3, sorted() raises TypeError on the mixed int and str keys used below):

# lets create a simple dictionary
d = {1:1, 2:2, 3:3, 4:4, 10:10, 100:100}
print d.keys()
print sorted(d.keys())

# add some other stuff
d['peter'] = 'peter'
d['parker'] = 'parker'
print d.keys()
print sorted(d.keys())

# some more stuff, numeric of different type, this will "mess up" the keys set order
d[0.001] = 0.001
d[3.14] = 'pie'
d[2.71] = 'apple pie'
print d.keys()
print sorted(d.keys())

Note that sorted() returns the keys in sorted order no matter what has been inserted, while d.keys() in Python 2 reflects the order of the underlying hash table.


Answer 9

For both Python 2 and 3:

import six

six.next(six.itervalues(d))

Answer 10

first_key, *rest_keys = mydict

Answer 11

No external libraries, works on both Python 2.7 and 3.x:

>>> list(set({"a":1, "b": 2}.values()))[0]
1

For an arbitrary key, just leave out .values():

>>> list(set({"a":1, "b": 2}))[0]
'a'

Answer 12

Subclassing dict is one method, though not an efficient one. Here, if you supply an integer, it will return d[list(d)[n]]; otherwise it accesses the dictionary as expected:

class mydict(dict):
    def __getitem__(self, value):
        if isinstance(value, int):
            return self.get(list(self)[value])
        else:
            return self.get(value)

d = mydict({'a': 'hello', 'b': 'this', 'c': 'is', 'd': 'a',
            'e': 'test', 'f': 'dictionary', 'g': 'testing'})

d[0]    # 'hello'
d[1]    # 'this'
d['c']  # 'is'

Deleting DataFrame rows in Pandas based on column value

Question: Deleting DataFrame rows in Pandas based on column value

I have the following DataFrame:

             daysago  line_race rating        rw    wrating
 line_date                                                 
 2007-03-31       62         11     56  1.000000  56.000000
 2007-03-10       83         11     67  1.000000  67.000000
 2007-02-10      111          9     66  1.000000  66.000000
 2007-01-13      139         10     83  0.880678  73.096278
 2006-12-23      160         10     88  0.793033  69.786942
 2006-11-09      204          9     52  0.636655  33.106077
 2006-10-22      222          8     66  0.581946  38.408408
 2006-09-29      245          9     70  0.518825  36.317752
 2006-09-16      258         11     68  0.486226  33.063381
 2006-08-30      275          8     72  0.446667  32.160051
 2006-02-11      475          5     65  0.164591  10.698423
 2006-01-13      504          0     70  0.142409   9.968634
 2006-01-02      515          0     64  0.134800   8.627219
 2005-12-06      542          0     70  0.117803   8.246238
 2005-11-29      549          0     70  0.113758   7.963072
 2005-11-22      556          0     -1  0.109852  -0.109852
 2005-11-01      577          0     -1  0.098919  -0.098919
 2005-10-20      589          0     -1  0.093168  -0.093168
 2005-09-27      612          0     -1  0.083063  -0.083063
 2005-09-07      632          0     -1  0.075171  -0.075171
 2005-06-12      719          0     69  0.048690   3.359623
 2005-05-29      733          0     -1  0.045404  -0.045404
 2005-05-02      760          0     -1  0.039679  -0.039679
 2005-04-02      790          0     -1  0.034160  -0.034160
 2005-03-13      810          0     -1  0.030915  -0.030915
 2004-11-09      934          0     -1  0.016647  -0.016647

I need to remove the rows where line_race is equal to 0. What’s the most efficient way to do this?


Answer 0

If I’m understanding correctly, it should be as simple as:

df = df[df.line_race != 0]

Answer 1

But for any future passers-by, it is worth mentioning that df = df[df.line_race != 0] doesn’t do anything when trying to filter for None/missing values.

Does work:

df = df[df.line_race != 0]

Doesn’t do anything:

df = df[df.line_race != None]

Does work:

df = df[df.line_race.notnull()]
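
The three cases above can be checked on a tiny frame. This is a sketch with made-up values (one zero, one missing entry), assuming pandas and NumPy are installed; only the column names come from the question.

```python
import numpy as np
import pandas as pd

# Made-up sample: row 1 has line_race == 0, row 2 has a missing value.
df = pd.DataFrame({"line_race": [11, 0, np.nan, 9],
                   "rating": [56, 70, -1, 66]})

no_zeros = df[df.line_race != 0]           # drops the 0 row, keeps the NaN row
unchanged = df[df.line_race != None]       # comparison with None is True everywhere
not_missing = df[df.line_race.notnull()]   # drops the NaN row

# Combine both conditions with & to drop zeros and missing values together.
both = df[(df.line_race != 0) & df.line_race.notnull()]

print(both)
```

The parentheses around each condition matter, because & binds more tightly than the comparison operators.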

Answer 2

The best way to do this is with boolean masking:

In [56]: df
Out[56]:
     line_date  daysago  line_race  rating    raw  wrating
0   2007-03-31       62         11      56  1.000   56.000
1   2007-03-10       83         11      67  1.000   67.000
2   2007-02-10      111          9      66  1.000   66.000
3   2007-01-13      139         10      83  0.881   73.096
4   2006-12-23      160         10      88  0.793   69.787
5   2006-11-09      204          9      52  0.637   33.106
6   2006-10-22      222          8      66  0.582   38.408
7   2006-09-29      245          9      70  0.519   36.318
8   2006-09-16      258         11      68  0.486   33.063
9   2006-08-30      275          8      72  0.447   32.160
10  2006-02-11      475          5      65  0.165   10.698
11  2006-01-13      504          0      70  0.142    9.969
12  2006-01-02      515          0      64  0.135    8.627
13  2005-12-06      542          0      70  0.118    8.246
14  2005-11-29      549          0      70  0.114    7.963
15  2005-11-22      556          0      -1  0.110   -0.110
16  2005-11-01      577          0      -1  0.099   -0.099
17  2005-10-20      589          0      -1  0.093   -0.093
18  2005-09-27      612          0      -1  0.083   -0.083
19  2005-09-07      632          0      -1  0.075   -0.075
20  2005-06-12      719          0      69  0.049    3.360
21  2005-05-29      733          0      -1  0.045   -0.045
22  2005-05-02      760          0      -1  0.040   -0.040
23  2005-04-02      790          0      -1  0.034   -0.034
24  2005-03-13      810          0      -1  0.031   -0.031
25  2004-11-09      934          0      -1  0.017   -0.017

In [57]: df[df.line_race != 0]
Out[57]:
     line_date  daysago  line_race  rating    raw  wrating
0   2007-03-31       62         11      56  1.000   56.000
1   2007-03-10       83         11      67  1.000   67.000
2   2007-02-10      111          9      66  1.000   66.000
3   2007-01-13      139         10      83  0.881   73.096
4   2006-12-23      160         10      88  0.793   69.787
5   2006-11-09      204          9      52  0.637   33.106
6   2006-10-22      222          8      66  0.582   38.408
7   2006-09-29      245          9      70  0.519   36.318
8   2006-09-16      258         11      68  0.486   33.063
9   2006-08-30      275          8      72  0.447   32.160
10  2006-02-11      475          5      65  0.165   10.698

UPDATE: Now that pandas 0.13 is out, another way to do this is df.query('line_race != 0').


Answer 3

Just to add another solution, particularly useful if you are using the new pandas accessors: the other solutions replace the original DataFrame object and lose the accessors.

df.drop(df.loc[df['line_race']==0].index, inplace=True)

Answer 4

If you want to delete rows based on multiple values of the column, you could use:

df[(df.line_race != 0) & (df.line_race != 10)]

This drops all rows where line_race is 0 or 10.
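
When the list of unwanted values grows, chaining comparisons gets unwieldy. A possible alternative, sketched here on made-up data, is to negate Series.isin():

```python
import pandas as pd

# Hypothetical data for illustration only.
df = pd.DataFrame({"line_race": [11, 0, 10, 9, 0]})

# Keep rows whose line_race is NOT one of the unwanted values.
unwanted = [0, 10]
filtered = df[~df.line_race.isin(unwanted)]

# The chained comparison from the answer gives the same rows.
chained = df[(df.line_race != 0) & (df.line_race != 10)]

print(filtered.line_race.tolist())
```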


Answer 5

The given answer is correct. Nonetheless, as someone above said, you can use df.query('line_race != 0'), which, depending on your problem, can be much faster. Highly recommended.


Answer 6

Although the previous answers are very similar to what I am going to do, using the index directly does not require another indexing method such as .loc. It can be done in a similar but more concise manner:

df.drop(df.index[df['line_race'] == 0], inplace = True)

Answer 7

Another way of doing it. It may not be the most efficient way, as the code looks a bit more complex than the code in the other answers, but it is still an alternative way of doing the same thing.

  df = df.drop(df[df['line_race']==0].index)

Answer 8

Just adding another way, extended to all columns of the DataFrame:

for column in df.columns:
   df = df[df[column]!=0]

Example:

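import numpy as np

def z_score(data, count):
   # Drop rows whose value in any column lies more than `threshold`
   # standard deviations from that column's mean; count the outliers found.
   threshold = 3
   for column in data.columns:
       mean = np.mean(data[column])
       std = np.std(data[column])
       for i in data[column]:
           zscore = (i - mean) / std
           if np.abs(zscore) > threshold:
               count = count + 1
               data = data[data[column] != i]
   return data, count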
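
The column loop that removes zeros can also be written as a single boolean mask. The sketch below uses a small made-up frame and shows that both forms select the same rows:

```python
import pandas as pd

# Hypothetical data for illustration only.
df = pd.DataFrame({"a": [1, 0, 3, 4], "b": [5, 6, 0, 8]})

# Loop form from the answer: filter column by column.
looped = df
for column in looped.columns:
    looped = looped[looped[column] != 0]

# Mask form: keep rows in which every column is non-zero.
vectorized = df[(df != 0).all(axis=1)]

print(vectorized)
```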

Non-blocking read on a subprocess in Python

Question: Non-blocking read on a subprocess in Python

I’m using the subprocess module to start a subprocess and connect to its output stream (stdout). I want to be able to execute non-blocking reads on its stdout. Is there a way to make .readline non-blocking or to check if there is data on the stream before I invoke .readline? I’d like this to be portable or at least work under Windows and Linux.

Here is how I do it for now (it blocks on .readline if no data is available):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

Answer 0

fcntl, select, asyncproc won’t help in this case.

A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:
    line = q.get_nowait()  # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else:  # got line
    pass  # ... do something with line
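
To try the pattern without a real myprogram.exe, here is a self-contained sketch for Python 3. The child process is a hypothetical stand-in (a one-line Python script started through sys.executable), and the polling loop with its counter is added only for the demonstration.

```python
import sys
import time
from queue import Queue, Empty
from subprocess import PIPE, Popen
from threading import Thread

def enqueue_output(out, queue):
    # The blocking readline() calls happen only in this background thread.
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

# Hypothetical stand-in for myprogram.exe: sleeps briefly, then prints one line.
child_code = "import time; time.sleep(0.5); print('hello', flush=True)"
p = Popen([sys.executable, "-c", child_code], stdout=PIPE)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q), daemon=True)
t.start()

lines = []
empty_polls = 0
# Poll until a line arrives, or until the reader is done and the queue is drained.
while not lines and (t.is_alive() or not q.empty()):
    try:
        lines.append(q.get_nowait())  # never blocks
    except Empty:
        empty_polls += 1
        time.sleep(0.05)  # the main thread could do other work here
p.wait()
print("got", lines, "after", empty_polls, "empty polls")
```

The main thread never calls readline() itself, so it stays responsive while the child is silent.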

Answer 1

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn’t solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can’t because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
    try:
        line = sys.stdin.readline()
    except Exception:  # no input available yet
        continue
    handleInput(line)

In my opinion this is a bit cleaner than using the select or signal modules to solve this problem but then again it only works on UNIX…


Answer 2

Python 3.4 introduced a new provisional API for asynchronous IO: the asyncio module.

The approach is similar to the Twisted-based answer by @Bryan Ward: define a protocol, and its methods are called as soon as data is ready:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

See “Subprocess” in the docs.

There is a high-level interface, asyncio.create_subprocess_exec(), that returns Process objects, which allow reading a line asynchronously using the StreamReader.readline() coroutine (with the async/await syntax of Python 3.5+):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() performs the following tasks:

  • start subprocess, redirect its stdout to a pipe
  • read a line from subprocess’ stdout asynchronously
  • kill subprocess
  • wait for it to exit

Each step could be limited by timeout seconds if necessary.
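
One way to put a timeout on the read step is asyncio.wait_for(). The sketch below assumes Python 3.7+ (for asyncio.run). The helper name readline_with_timeout and both child scripts are made up for this illustration.

```python
import asyncio
import sys
from asyncio.subprocess import PIPE

async def readline_with_timeout(timeout, *args):
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)
    try:
        # Give up waiting for a line after `timeout` seconds.
        line = await asyncio.wait_for(process.stdout.readline(), timeout)
    except asyncio.TimeoutError:
        line = None
    try:
        process.kill()
    except ProcessLookupError:
        pass  # the child already exited on its own
    await process.wait()
    return line

# Hypothetical stand-in children: one prints at once, one sleeps first.
fast = "print('ready', flush=True)"
slow = "import time; time.sleep(5); print('late', flush=True)"

fast_line = asyncio.run(readline_with_timeout(5.0, sys.executable, "-c", fast))
slow_line = asyncio.run(readline_with_timeout(0.2, sys.executable, "-c", slow))
print(fast_line, slow_line)
```

The same wrapper could be placed around process.wait() to bound how long the final step may take.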


Answer 3

Try the asyncproc module. For example:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll is not None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print(out)

The module takes care of all the threading as suggested by S.Lott.


Answer 4

You can do this really easily in Twisted. Depending on your existing code base, this might not be that easy to adopt, but if you are building a Twisted application, then things like this become almost trivial. You create a ProcessProtocol class and override the outReceived() method. Twisted (depending on the reactor used) is usually just a big select() loop with callbacks installed to handle data from different file descriptors (often network sockets). So the outReceived() method is simply a callback for handling data coming from STDOUT. A simple example demonstrating this behavior is as follows:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print(data)

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

The Twisted documentation has some good information on this.

If you build your entire application around Twisted, it makes asynchronous communication with other processes, local or remote, really elegant like this. On the other hand, if your program isn’t built on top of Twisted, this isn’t really going to be that helpful. Hopefully this can be helpful to other readers, even if it isn’t applicable for your particular application.


Answer 5

Use select & read(1).

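import select
import subprocess     # no new requirements

def readAllSoFar(proc, retVal=b''):
  # select() on a pipe works on POSIX systems only
  while select.select([proc.stdout], [], [], 0)[0] != []:
    ch = proc.stdout.read(1)
    if not ch:   # EOF: stop, or select() keeps reporting the pipe as readable
      break
    retVal += ch
  return retVal

# bufsize=0 so that no data hides in Python's buffer where select() cannot see it
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE, bufsize=0)
while p.poll() is None:   # poll() returns None while the process is running
  print(readAllSoFar(p))

For readline()-like behavior:

lines = [b'']
while p.poll() is None:
  lines = readAllSoFar(p, lines[-1]).split(b'\n')
  for a in range(len(lines)-1):
    print(lines[a])
lines = readAllSoFar(p, lines[-1]).split(b'\n')
for a in range(len(lines)-1):
  print(lines[a])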

Answer 6

One solution is to have another process perform the read, or to run the read in a thread with a timeout.

Here’s the threaded version of a timeout function:

http://code.activestate.com/recipes/473878/

However, do you need to read the stdout as it’s coming in? Another solution may be to dump the output to a file and wait for the process to finish using p.wait().

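import subprocess

# let the child write straight into the file, then wait for it to finish
with open('myprogram_output.txt', 'w') as f:
    p = subprocess.Popen('myprogram.exe', stdout=f)
    p.wait()

with open('myprogram_output.txt', 'r') as f:
    output = f.read()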
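
If the output file is only needed while the program runs, a temporary file avoids leaving myprogram_output.txt behind. This is a sketch under assumptions: the child is a hypothetical stand-in started through sys.executable, and tempfile.TemporaryFile is a substitution for the named file used in the answer.

```python
import subprocess
import sys
import tempfile

# Hypothetical stand-in for myprogram.exe.
child_code = "print('line 1'); print('line 2')"

with tempfile.TemporaryFile() as f:
    p = subprocess.Popen([sys.executable, "-c", child_code], stdout=f)
    p.wait()    # blocks until the child has finished
    f.seek(0)   # rewind before reading what the child wrote
    output = f.read()

print(output.decode())
```

As in the answer, nothing is read until the child exits, so this only fits cases where the output is not needed as it arrives.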

Answer 7

Disclaimer: this works only with Tornado.

You can do this by setting the fd to be non-blocking and then using ioloop to register callbacks. I have packaged this in an egg called tornado_subprocess, which you can install via PyPI:

easy_install tornado_subprocess

now you can do something like this:

import tornado_subprocess
import tornado.ioloop

def print_res(status, stdout, stderr):
    print(status, stdout, stderr)
    if status == 0:
        print("OK:")
        print(stdout)
    else:
        print("ERROR:")
        print(stderr)

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

you can also use it with a RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

Answer 8

Existing solutions did not work for me (details below). What finally worked was to implement readline using read(1) (based on this answer). The latter does not block:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(myprocess,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Why existing solutions did not work:

  1. Solutions that require readline (including the Queue based ones) always block. It is difficult (impossible?) to kill the thread that executes readline. It only gets killed when the process that created it finishes, but not when the output-producing process is killed.
  2. Mixing low-level fcntl with high-level readline calls may not work properly as anonnn has pointed out.
  3. Using select.poll() is neat, but doesn’t work on Windows according to python docs.
  4. Using third-party libraries seems overkill for this task and adds additional dependencies.

Answer 9

Here is my code, used to catch every output from a subprocess as soon as possible, including partial lines. It pumps stdout and stderr at the same time, in almost the correct order.

Tested and working correctly on Python 2.7 on Linux and Windows.

#!/usr/bin/python
#
# Runner with stdout/stderr catcher
#
from sys import argv
from subprocess import Popen, PIPE
import os, io
from threading import Thread
import Queue
def __main__():
    if (len(argv) > 1) and (argv[-1] == "-sub-"):
        import time, sys
        print "Application runned!"
        time.sleep(2)
        print "Slept 2 second"
        time.sleep(1)
        print "Slept 1 additional second",
        time.sleep(2)
        sys.stderr.write("Stderr output after 5 seconds")
        print "Eol on stdin"
        sys.stderr.write("Eol on stderr\n")
        time.sleep(1)
        print "Wow, we have end of work!",
    else:
        os.environ["PYTHONUNBUFFERED"]="1"
        try:
            p = Popen( argv + ["-sub-"],
                       bufsize=0, # unbuffered
                       stdin=PIPE, stdout=PIPE, stderr=PIPE )
        except WindowsError, W:
            if W.winerror==193:
                p = Popen( argv + ["-sub-"],
                           shell=True, # Try to run via shell
                           bufsize=0, # unbuffered
                           stdin=PIPE, stdout=PIPE, stderr=PIPE )
            else:
                raise
        inp = Queue.Queue()
        sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
        serr = io.open(p.stderr.fileno(), 'rb', closefd=False)
        def Pump(stream, category):
            queue = Queue.Queue()
            def rdr():
                while True:
                    buf = stream.read1(8192)
                    if len(buf)>0:
                        queue.put( buf )
                    else:
                        queue.put( None )
                        return
            def clct():
                active = True
                while active:
                    r = queue.get()
                    try:
                        while True:
                            r1 = queue.get(timeout=0.005)
                            if r1 is None:
                                active = False
                                break
                            else:
                                r += r1
                    except Queue.Empty:
                        pass
                    inp.put( (category, r) )
            for tgt in [rdr, clct]:
                th = Thread(target=tgt)
                th.setDaemon(True)
                th.start()
        Pump(sout, 'stdout')
        Pump(serr, 'stderr')

        while p.poll() is None:
            # App still working
            try:
                chan,line = inp.get(timeout = 1.0)
                if chan=='stdout':
                    print "STDOUT>>", line, "<?<"
                elif chan=='stderr':
                    print " ERROR==", line, "=?="
            except Queue.Empty:
                pass
        print "Finish"

if __name__ == '__main__':
    __main__()

Answer 10

I had this problem when reading from a subprocess.Popen stdout. Here is my non-blocking read solution:

import fcntl
import os

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.read()
    except:
        return ""

# Use example
from subprocess import *
sb = Popen("echo test && sleep 1000", shell=True, stdout=PIPE)
sb.kill()

# sb.stdout.read() # <-- This will block
non_block_read(sb.stdout)
'test\n'
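
On Python 3.5 and later, the same flag can be set with os.set_blocking() instead of calling fcntl directly. The following is only a minimal sketch under that assumption (POSIX pipes); the helper name read_nonblocking and the child command are made up for illustration. It returns None when nothing is available yet, which keeps that case distinct from b'' at end of file.

```python
import os
import subprocess
import sys
import time


def read_nonblocking(fd, size=4096):
    """Return available bytes, b'' at EOF, or None if nothing is ready yet."""
    try:
        return os.read(fd, size)
    except BlockingIOError:
        return None


# Hypothetical child: prints one line, then stays alive for a while.
child_code = "import time; print('test', flush=True); time.sleep(1)"
proc = subprocess.Popen([sys.executable, "-c", child_code], stdout=subprocess.PIPE)
fd = proc.stdout.fileno()
os.set_blocking(fd, False)  # Python 3.5+

# Poll until the first chunk arrives (with a deadline so the demo cannot hang).
first = None
deadline = time.monotonic() + 10
while first is None and time.monotonic() < deadline:
    first = read_nonblocking(fd)
    if first is None:
        time.sleep(0.01)

# The child is still sleeping, so a second read finds nothing and returns at once.
second = read_nonblocking(fd)

proc.wait()
proc.stdout.close()
```

Reading through os.read() on the raw descriptor, rather than through the buffered file object, avoids mixing Python's own buffering with the non-blocking flag.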

Answer 11

This version of non-blocking read doesn’t require special modules and will work out of the box on most Linux distributions.

import os
import sys
import time
import fcntl
import subprocess

def async_read(fd):
    # set non-blocking flag while preserving old flags
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    # read char until EOF hit
    while True:
        try:
            ch = os.read(fd.fileno(), 1)
            # EOF
            if not ch: break
            os.write(sys.stdout.fileno(), ch)  # ch is bytes on Python 3, so write it to the descriptor
        except OSError:
            # no data available on fd yet; sleep briefly instead of spinning the CPU
            time.sleep(0.01)

def shell(args, run_async=True):
    # the flag is named run_async because `async` is a reserved word since Python 3.7
    # merge stderr and stdout
    proc = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    if run_async: async_read(proc.stdout)
    sout, serr = proc.communicate()
    return (sout, serr)

if __name__ == '__main__':
    cmd = 'ping 8.8.8.8'
    sout, serr = shell(cmd.split())

Answer 12

Here is a simple solution based on threads which:

  • works on both Linux and Windows (not relying on select).
  • reads both stdout and stderr asynchronously.
  • doesn’t rely on active polling with arbitrary waiting time (CPU friendly).
  • doesn’t use asyncio (which may conflict with other libraries).
  • runs until the child process terminates.

printer.py

import time
import sys

sys.stdout.write("Hello\n")
sys.stdout.flush()
time.sleep(1)
sys.stdout.write("World!\n")
sys.stdout.flush()
time.sleep(1)
sys.stderr.write("That's an error\n")
sys.stderr.flush()
time.sleep(2)
sys.stdout.write("Actually, I'm fine\n")
sys.stdout.flush()
time.sleep(1)

reader.py

import queue
import subprocess
import sys
import threading


def enqueue_stream(stream, queue, type):
    for line in iter(stream.readline, b''):
        queue.put(str(type) + line.decode('utf-8'))
    stream.close()


def enqueue_process(process, queue):
    process.wait()
    queue.put('x')


# an argument list (rather than a single string) works on both Linux and Windows
p = subprocess.Popen([sys.executable, 'printer.py'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
q = queue.Queue()
to = threading.Thread(target=enqueue_stream, args=(p.stdout, q, 1))
te = threading.Thread(target=enqueue_stream, args=(p.stderr, q, 2))
tp = threading.Thread(target=enqueue_process, args=(p, q))
te.start()
to.start()
tp.start()

while True:
    line = q.get()
    if line[0] == 'x':
        break
    if line[0] == '2':  # stderr
        sys.stdout.write("\033[0;31m")  # ANSI red color
    sys.stdout.write(line[1:])
    if line[0] == '2':
        sys.stdout.write("\033[0m")  # reset ANSI code
    sys.stdout.flush()

tp.join()
to.join()
te.join()

Answer 13

Adding this answer here since it provides ability to set non-blocking pipes on Windows and Unix.

All the ctypes details are thanks to @techtonik’s answer.

Here is a slightly modified version that can be used on both Unix and Windows systems.

  • Python 3 compatible (only a minor change was needed).
  • Includes a POSIX version, and defines the exception to use for either.

This way you can use the same function and exception for Unix and Windows code.

# pipe_non_blocking.py (module)
"""
Example use:

    p = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            )

    pipe_non_blocking_set(p.stdout.fileno())

    try:
        data = os.read(p.stdout.fileno(), 1)
    except PortableBlockingIOError as ex:
        if not pipe_non_blocking_is_error_blocking(ex):
            raise ex
"""


__all__ = (
    "pipe_non_blocking_set",
    "pipe_non_blocking_is_error_blocking",
    "PortableBlockingIOError",
    )

import os


if os.name == "nt":
    def pipe_non_blocking_set(fd):
        # The constant could be defined globally, but defining it here avoids polluting the namespace
        # thanks to: https://stackoverflow.com/questions/34504970
        import msvcrt

        from ctypes import windll, byref, wintypes, WinError, POINTER
        from ctypes.wintypes import HANDLE, DWORD, BOOL

        LPDWORD = POINTER(DWORD)

        PIPE_NOWAIT = wintypes.DWORD(0x00000001)

        def pipe_no_wait(pipefd):
            SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
            SetNamedPipeHandleState.argtypes = [HANDLE, LPDWORD, LPDWORD, LPDWORD]
            SetNamedPipeHandleState.restype = BOOL

            h = msvcrt.get_osfhandle(pipefd)

            res = windll.kernel32.SetNamedPipeHandleState(h, byref(PIPE_NOWAIT), None, None)
            if res == 0:
                print(WinError())
                return False
            return True

        return pipe_no_wait(fd)

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        from ctypes import GetLastError
        ERROR_NO_DATA = 232

        return (GetLastError() == ERROR_NO_DATA)

    PortableBlockingIOError = OSError
else:
    def pipe_non_blocking_set(fd):
        import fcntl
        fl = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        return True

    def pipe_non_blocking_is_error_blocking(ex):
        if not isinstance(ex, PortableBlockingIOError):
            return False
        return True

    PortableBlockingIOError = BlockingIOError

To avoid reading incomplete data, I ended up writing my own readline generator (which returns the byte string for each line).

It’s a generator, so you can, for example…

def non_blocking_readlines(f, chunk=1024):
    """
    Iterate over lines, yielding b'' when nothing is left
    or when new data is not yet available.

    stdout_iter = iter(non_blocking_readlines(process.stdout))

    line = next(stdout_iter)  # will be a line or b''.
    """
    import os

    from .pipe_non_blocking import (
            pipe_non_blocking_set,
            pipe_non_blocking_is_error_blocking,
            PortableBlockingIOError,
            )

    fd = f.fileno()
    pipe_non_blocking_set(fd)

    blocks = []

    while True:
        try:
            data = os.read(fd, chunk)
            if not data:
                # case where reading finishes with no trailing newline
                yield b''.join(blocks)
                blocks.clear()
        except PortableBlockingIOError as ex:
            if not pipe_non_blocking_is_error_blocking(ex):
                raise ex

            yield b''
            continue

        while True:
            n = data.find(b'\n')
            if n == -1:
                break

            yield b''.join(blocks) + data[:n + 1]
            data = data[n + 1:]
            blocks.clear()
        blocks.append(data)

Answer 14

I had the original questioner’s problem, but did not wish to invoke threads. I mixed Jesse’s solution with a direct read() from the pipe and my own buffer handler for line reads (however, my sub-process, ping, always wrote full lines shorter than a system page size). I avoid busy-waiting by only reading in a gobject-registered I/O watch. These days I usually run code within a gobject MainLoop to avoid threads.

import fcntl
import os
import subprocess

import gobject  # the legacy gobject module this answer relies on

def set_up_ping(ip, w):
    # run the sub-process
    # watch the resultant pipe
    p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
    # make stdout a non-blocking file
    fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
    fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
    return stdout_gid # for shutting down

The watcher is

def watch(f, *other):
    print 'reading',f.read()
    return True

And the main program sets up a ping and then calls the gobject main loop.

def main():
    set_up_ping('192.168.1.8', watch)
    # discard gid as unused here
    gobject.MainLoop().run()

Any other work is attached to callbacks in gobject.


Answer 15

Things are a lot better in modern Python.

Here’s a simple child program, “hello.py”:

#!/usr/bin/env python3

while True:
    i = input()
    if i == "quit":
        break
    print(f"hello {i}")

And a program to interact with it:

import asyncio


async def main():
    proc = await asyncio.subprocess.create_subprocess_exec(
        "./hello.py", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE
    )
    proc.stdin.write(b"bob\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"alice\n")
    print(await proc.stdout.read(1024))
    proc.stdin.write(b"quit\n")
    await proc.wait()


asyncio.run(main())

That prints out:

b'hello bob\n'
b'hello alice\n'

Note that the actual pattern, which is also used by almost all of the previous answers, both here and in related questions, is to set the child’s stdout file descriptor to non-blocking and then poll it in some sort of select loop. These days, of course, that loop is provided by asyncio.
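
Because asyncio owns the polling loop, a read that gives up after a deadline can be expressed with asyncio.wait_for. This is only a sketch: the helper read_line, its timeout values, and the child command are illustrative assumptions, not part of the answer above.

```python
import asyncio
import sys


async def read_line(stream, timeout):
    """Return the next line from stream, or None if none arrives in time."""
    try:
        return await asyncio.wait_for(stream.readline(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    # Hypothetical child: one line immediately, a second one a second later.
    child_code = (
        "import time; print('first', flush=True); "
        "time.sleep(1); print('second', flush=True)"
    )
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", child_code, stdout=asyncio.subprocess.PIPE
    )
    first = await read_line(proc.stdout, timeout=10)     # arrives promptly
    missing = await read_line(proc.stdout, timeout=0.1)  # child is still sleeping
    rest = await proc.stdout.read()                      # waits for end of file
    await proc.wait()
    return first, missing, rest


first, missing, rest = asyncio.run(demo())
```

A timed-out read does not consume anything, so the line that arrives later is still returned by the next read.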


Answer 16

The select module helps you determine where the next useful input is.

However, you’re almost always happier with separate threads. One does a blocking read of stdin; another does whatever it is you don’t want blocked.
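
For the select route, a minimal sketch might look like the following. It assumes a POSIX system (select() does not work on pipes on Windows), and the function name collect_output, the timeout value, and the child command are made up for illustration.

```python
import os
import select
import subprocess
import sys


def collect_output(proc, idle_timeout=0.5):
    """Gather the child's stdout, counting how often select() timed out."""
    fd = proc.stdout.fileno()
    chunks = []
    idle_ticks = 0
    while True:
        ready, _, _ = select.select([fd], [], [], idle_timeout)
        if not ready:
            idle_ticks += 1       # nothing to read yet: other work could run here
            continue
        data = os.read(fd, 4096)  # select reported the descriptor as readable
        if not data:              # an empty read means the child closed its end
            break
        chunks.append(data)
    proc.wait()
    return b"".join(chunks), idle_ticks


child_code = "print('ready')"
proc = subprocess.Popen([sys.executable, "-c", child_code], stdout=subprocess.PIPE)
output, idle_ticks = collect_output(proc)
```

The timeout branch is where the caller would do whatever must not be blocked by the read.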


Answer 17

Why bother with threads and queues? Unlike readline(), BufferedReader.read1() won’t block waiting for \r\n; it returns as soon as any output arrives.

#!/usr/bin/python
from subprocess import Popen, PIPE, STDOUT
import io

def __main__():
    try:
        p = Popen( ["ping", "-n", "3", "127.0.0.1"], stdin=PIPE, stdout=PIPE, stderr=STDOUT )
    except: print("Popen failed"); quit()
    sout = io.open(p.stdout.fileno(), 'rb', closefd=False)
    while True:
        buf = sout.read1(1024)
        if len(buf) == 0: break
        print(buf.decode(errors='replace'), end='')  # Python 3 form; buf is bytes

if __name__ == '__main__':
    __main__()

Answer 18

In my case I needed a logging module that catches the output from background applications and augments it (adding time-stamps, colors, etc.).

I ended up with a background thread that does the actual I/O. The following code is only for POSIX platforms. I stripped the non-essential parts.

If someone is going to use this beast for long runs, consider managing open descriptors. In my case it was not a big problem.

# -*- python -*-
import fcntl
import threading
import sys, os, errno
import subprocess

class Logger(threading.Thread):
    def __init__(self, *modules):
        threading.Thread.__init__(self)
        try:
            from select import epoll, EPOLLIN
            self.__poll = epoll()
            self.__evt = EPOLLIN
            self.__to = -1
        except:
            from select import poll, POLLIN
            print('epoll is not available')
            self.__poll = poll()
            self.__evt = POLLIN
            self.__to = 100
        self.__fds = {}
        self.daemon = True
        self.start()

    def run(self):
        while True:
            events = self.__poll.poll(self.__to)
            for fd, ev in events:
                if (ev&self.__evt) != self.__evt:
                    continue
                try:
                    self.__fds[fd].run()
                except Exception as e:
                    print(e)

    def add(self, fd, log):
        assert fd not in self.__fds
        self.__fds[fd] = log
        self.__poll.register(fd, self.__evt)

class log:
    logger = Logger()

    def __init__(self, name):
        self.__name = name
        self.__piped = False

    def fileno(self):
        if self.__piped:
            return self.write
        self.read, self.write = os.pipe()
        fl = fcntl.fcntl(self.read, fcntl.F_GETFL)
        fcntl.fcntl(self.read, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        self.fdRead = os.fdopen(self.read)
        self.logger.add(self.read, self)
        self.__piped = True
        return self.write

    def __run(self, line):
        self.chat(line, nl=False)

    def run(self):
        while True:
            try: line = self.fdRead.readline()
            except IOError as exc:
                if exc.errno == errno.EAGAIN:
                    return
                raise
            self.__run(line)

    def chat(self, line, nl=True):
        if nl: nl = '\n'
        else: nl = ''
        sys.stdout.write('[%s] %s%s' % (self.__name, line, nl))

def system(command, param=[], cwd=None, env=None, input=None, output=None):
    args = [command] + param
    p = subprocess.Popen(args, cwd=cwd, stdout=output, stderr=output, stdin=input, env=env, bufsize=0)
    p.wait()

ls = log('ls')
ls.chat('go')
system("ls", ['-l', '/'], output=ls)

date = log('date')
date.chat('go')
system("date", output=date)

Answer 19

My problem is a bit different, as I wanted to collect both stdout and stderr from a running process, but ultimately the same, since I wanted to render the output in a widget as it’s generated.

I did not want to resort to many of the proposed workarounds using Queues or additional Threads as they should not be necessary to perform such a common task as running another script and collecting its output.

After reading the proposed solutions and the Python docs, I resolved my issue with the implementation below. Yes, it only works on POSIX, as I’m using the select function call.

I agree that the docs are confusing and the implementation is awkward for such a common scripting task. I believe that older versions of python have different defaults for Popen and different explanations so that created a lot of confusion. This seems to work well for both Python 2.7.12 and 3.5.2.

The key was to set bufsize=1 for line buffering and then universal_newlines=True to process as a text file instead of a binary which seems to become the default when setting bufsize=1.

class workerThread(QThread):
   def __init__(self, cmd):
      QThread.__init__(self)
      self.cmd = cmd
      self.result = None           ## return code
      self.error = None            ## flag indicates an error
      self.errorstr = ""           ## info message about the error

   def __del__(self):
      self.wait()
      DEBUG("Thread removed")

   def run(self):
      cmd_list = self.cmd.split(" ")   
      try:
         cmd = subprocess.Popen(cmd_list, bufsize=1, stdin=None
                                        , universal_newlines=True
                                        , stderr=subprocess.PIPE
                                        , stdout=subprocess.PIPE)
      except OSError:
         self.error = 1
         self.errorstr = "Failed to execute " + self.cmd
         ERROR(self.errorstr)
         return                    ## cmd was never created, so stop here
      finally:
         VERBOSE("task started...")
      import select
      while True:
         try:
            r,w,x = select.select([cmd.stdout, cmd.stderr],[],[])
            if cmd.stderr in r:
               line = cmd.stderr.readline()
               if line != "":
                  line = line.strip()
                  self.emit(SIGNAL("update_error(QString)"), line)
            if cmd.stdout in r:
               line = cmd.stdout.readline()
               if line == "":
                  break
               line = line.strip()
               self.emit(SIGNAL("update_output(QString)"), line)
         except IOError:
            pass
      cmd.wait()
      self.result = cmd.returncode
      if self.result < 0:
         self.error = 1
         self.errorstr = "Task terminated by signal " + str(self.result)
         ERROR(self.errorstr)
         return
      if self.result:
         self.error = 1
         self.errorstr = "exit code " + str(self.result)
         ERROR(self.errorstr)
         return
      return

ERROR, DEBUG and VERBOSE are simply macros that print output to the terminal.

This solution is IMHO 99.99% effective as it still uses the blocking readline function, so we assume the sub process is nice and outputs complete lines.
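
Stripped of the Qt parts, the same idea can be sketched as below. This is an illustration under assumptions rather than the code above: the child command and variable names are invented, it is POSIX-only because of select, and unlike the class above it keeps reading until both pipes reach end of file. The caveat from the previous paragraph still applies, since readline() blocks until a full line or EOF arrives.

```python
import select
import subprocess
import sys

# Hypothetical child that writes one line to each stream.
child_code = (
    "import sys; print('to stdout'); sys.stdout.flush(); "
    "print('to stderr', file=sys.stderr)"
)
proc = subprocess.Popen(
    [sys.executable, "-c", child_code],
    bufsize=1,                # line-buffered
    universal_newlines=True,  # text mode, so readline() returns str
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)

labels = {proc.stdout: "stdout", proc.stderr: "stderr"}
open_streams = set(labels)
lines = {"stdout": [], "stderr": []}
while open_streams:
    # Wait until at least one pipe has data or has been closed.
    ready, _, _ = select.select(list(open_streams), [], [])
    for stream in ready:
        line = stream.readline()  # blocks until a full line or EOF
        if line == "":
            open_streams.discard(stream)  # EOF on this pipe
        else:
            lines[labels[stream]].append(line.rstrip("\n"))
proc.wait()
```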

I welcome feedback to improve the solution as I am still new to Python.


Answer 20

I have created a library based on J.F. Sebastian’s solution. You can use it.

https://github.com/cenkalti/what


Answer 21

Working from J.F. Sebastian’s answer and several other sources, I’ve put together a simple subprocess manager. It provides the requested non-blocking reading, as well as running several processes in parallel. It doesn’t use any OS-specific calls (that I’m aware of) and thus should work anywhere.

It’s available from pypi, so just pip install shelljob. Refer to the project page for examples and full docs.


Answer 22

EDIT: This implementation still blocks. Use J.F.Sebastian’s answer instead.

I tried the top answer, but the additional risk and maintenance of thread code was worrisome.

Looking through the io module (and being limited to 2.6), I found BufferedReader. This is my threadless, non-blocking solution.

import io
import time
from subprocess import PIPE, Popen

p = Popen(['myprogram.exe'], stdout=PIPE)

SLEEP_DELAY = 0.001

# Create an io.BufferedReader on the file descriptor for stdout
with io.open(p.stdout.fileno(), 'rb', closefd=False) as buffer:
  while p.poll() is None:
      time.sleep(SLEEP_DELAY)
      while b'\n' in buffer.peek(io.DEFAULT_BUFFER_SIZE):
          line = buffer.readline()
          # do stuff with the line

  # Handle any remaining output after the process has ended
  while buffer.peek():
    line = buffer.readline()
    # do stuff with the line

Answer 23

I recently stumbled upon the same problem: I needed to read one line at a time from a stream (tail running in a subprocess) in non-blocking mode. I wanted to avoid a few problems: burning CPU, reading the stream one byte at a time (as readline did), and so on.

Here is my implementation: https://gist.github.com/grubberr/5501e1a9760c3eab5e0a (it doesn’t support Windows because it uses poll, and it doesn’t handle EOF, but it works well for me).
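
The general shape of such a reader can be sketched as follows. This is not the code from the gist; it is an independent illustration, and the class name PollLineReader, its done property, and the child command are hypothetical. It waits in select.poll() instead of spinning, reads in chunks rather than byte by byte, and, unlike the implementation described above, also flushes the remainder at EOF. Like the gist, it is POSIX-only.

```python
import os
import select
import subprocess
import sys


class PollLineReader:
    """Return complete lines from a pipe without blocking past a timeout."""

    def __init__(self, fileobj):
        self._fd = fileobj.fileno()
        self._poller = select.poll()
        self._poller.register(self._fd, select.POLLIN)
        self._buffer = b""
        self._eof = False

    @property
    def done(self):
        # True once the pipe is closed and every buffered byte was returned.
        return self._eof and not self._buffer

    def readline(self, timeout_ms=100):
        """Return one line (bytes), or None if none is available in time."""
        while b"\n" not in self._buffer and not self._eof:
            if not self._poller.poll(timeout_ms):
                return None                  # timed out without burning CPU
            chunk = os.read(self._fd, 4096)  # read a chunk, not a single byte
            if chunk:
                self._buffer += chunk
            else:
                self._eof = True
        if not self._buffer:
            return None
        # At EOF without a trailing newline, sep is b'' and the rest is returned.
        line, sep, self._buffer = self._buffer.partition(b"\n")
        return line + sep


child_code = "print('one'); print('two')"
proc = subprocess.Popen([sys.executable, "-c", child_code], stdout=subprocess.PIPE)
reader = PollLineReader(proc.stdout)
lines = []
while not reader.done:
    line = reader.readline()
    if line is not None:
        lines.append(line)
proc.wait()
```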


Answer 24

This is an example of running an interactive command in a subprocess, where stdout is made interactive by using a pseudo-terminal. See also: https://stackoverflow.com/a/43012138/3555925

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import select
import termios
import tty
import pty
from subprocess import Popen

command = 'bash'
# command = 'docker run -it --rm centos /bin/bash'.split()

# save original tty setting then set it to raw mode
old_tty = termios.tcgetattr(sys.stdin)
tty.setraw(sys.stdin.fileno())

# open pseudo-terminal to interact with subprocess
master_fd, slave_fd = pty.openpty()

# use os.setsid() to run it in a new session (and process group); otherwise bash job control will not be enabled
p = Popen(command,
          preexec_fn=os.setsid,
          stdin=slave_fd,
          stdout=slave_fd,
          stderr=slave_fd,
          universal_newlines=True)

try:
    while p.poll() is None:
        r, w, e = select.select([sys.stdin, master_fd], [], [])
        if sys.stdin in r:
            d = os.read(sys.stdin.fileno(), 10240)
            os.write(master_fd, d)
        elif master_fd in r:
            o = os.read(master_fd, 10240)
            if o:
                os.write(sys.stdout.fileno(), o)
finally:
    # restore the original tty settings, even if the loop raised
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_tty)

Answer 25

This solution uses the select module to “read any available data” from an IO stream. This function blocks initially until data is available, but then reads only the data that is available and doesn’t block further.

Because it calls select on a file object, this only works on Unix (on Windows, select accepts only sockets).

The code is fully PEP8-compliant.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then reads and returns all available data.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
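
A lighter-weight variant of the same idea, offered as a sketch rather than a drop-in replacement: instead of reading one byte at a time, wait with select and then make a single os.read call on the file descriptor, which returns whatever is currently available. The name read_available_fd, its parameters, and the choice to return None on timeout are assumptions made for illustration. Like the code above, it relies on select working on pipes, so it is Unix-only.

```python
import os
import select


def read_available_fd(fd, max_bytes=65536, timeout=None):
    """Return the bytes currently available on fd.

    Blocks until fd is readable (or until `timeout` seconds pass).
    Returns None on timeout and b"" at end of stream.
    """
    ready, _, _ = select.select([fd], [], [], timeout)
    if not ready:
        return None
    # A single os.read returns at most max_bytes and does not wait for more.
    return os.read(fd, max_bytes)
```

With a subprocess created with stdout=PIPE, this could be called as read_available_fd(p.stdout.fileno()). Mixing it with buffered reads on p.stdout is best avoided, since the buffered reader may already hold data that os.read will not see.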

Answer 26

I also faced the problem described by Jesse and solved it by using select, as Bradley, Andy and others did, but in blocking mode to avoid a busy loop. It uses a dummy pipe as a fake stdin. The select call blocks and waits for either stdin or the pipe to be ready. When a key is pressed, stdin unblocks the select and the key value can be retrieved with read(1). When a different thread writes to the pipe, the pipe unblocks the select, which can be taken as an indication that stdin is no longer needed. Here is some reference code:

import sys
import os
from select import select

# -------------------------------------------------------------------------    
# Set the pipe (fake stdin) to simulate a final key stroke
# which will unblock the select statement
readEnd, writeEnd = os.pipe()
readFile = os.fdopen(readEnd)
writeFile = os.fdopen(writeEnd, "w")

# -------------------------------------------------------------------------
def getKey():

    # Wait for stdin or pipe (fake stdin) to be ready
    dr,dw,de = select([sys.__stdin__, readFile], [], [])

    # If stdin is the one ready then read it and return value
    if sys.__stdin__ in dr:
        return sys.__stdin__.read(1)   # For Windows use ----> getch() from module msvcrt

    # Must finish
    else:
        return None

# -------------------------------------------------------------------------
def breakStdinRead():
    writeFile.write(' ')
    writeFile.flush()

# -------------------------------------------------------------------------
# MAIN CODE

# Get key stroke
key = getKey()

# Keyboard input
if key:
    # ... do your stuff with the key value
    pass

# Faked keystroke
else:
    # ... use of stdin finished
    pass

# -------------------------------------------------------------------------
# OTHER THREAD CODE

breakStdinRead()
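
The wake-up mechanism can be tried in isolation with two pipes, one standing in for stdin and one acting as the wake-up channel. This is a minimal sketch under that assumption; wait_for_input and the variable names are hypothetical, and it is Unix-only because select is used on pipes.

```python
import os
import select
import threading


def wait_for_input(input_fd, wakeup_fd):
    """Block until input_fd has data or wakeup_fd is written to.

    Returns one byte from input_fd, or None if only the wake-up pipe fired.
    """
    ready, _, _ = select.select([input_fd, wakeup_fd], [], [])
    if input_fd in ready:
        return os.read(input_fd, 1)
    return None


input_r, input_w = os.pipe()      # stands in for stdin
wakeup_r, wakeup_w = os.pipe()    # written to by another thread

# Another thread ends the wait after 0.1 s without any "key press".
threading.Timer(0.1, os.write, args=(wakeup_w, b" ")).start()
first = wait_for_input(input_r, wakeup_r)

# Once data arrives on the input pipe, it takes priority over the wake-up.
os.write(input_w, b"k")
second = wait_for_input(input_r, wakeup_r)
```

Because the waiting thread sleeps inside select, it uses no CPU until either descriptor becomes readable.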

Answer 27

Try wexpect, which is the Windows alternative to pexpect.

import wexpect

p = wexpect.spawn('myprogram.exe')
p.expect('.')               # regex pattern matching any character
output_str = p.after

Answer 28

On Unix-like systems and Python 3.5+ there’s os.set_blocking which does exactly what it says.

import os
import time
import subprocess

cmd = 'python3', '-c', 'import time; [(print(i), time.sleep(1)) for i in range(5)]'
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
os.set_blocking(p.stdout.fileno(), False)
start = time.time()
while True:
    # first iteration always produces empty byte string in non-blocking mode
    for i in range(2):    
        line = p.stdout.readline()
        print(i, line)
        time.sleep(0.5)
    if time.time() > start + 5:
        break
p.terminate()

This outputs:

0 b''
1 b'0\n'
0 b''
1 b'1\n'
0 b''
1 b'2\n'
0 b''
1 b'3\n'
0 b''
1 b'4\n'

With the os.set_blocking call commented out, the output is:

0 b'0\n'
1 b'1\n'
0 b'2\n'
1 b'3\n'
0 b'4\n'
1 b''
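
The same non-blocking behaviour can be observed at the file-descriptor level, without a subprocess. The sketch below assumes a Unix-like system and Python 3.5+; try_read is a hypothetical helper. With os.read on a non-blocking descriptor, "no data yet" shows up as BlockingIOError, which keeps it distinct from end of stream (an empty bytes object).

```python
import os


def try_read(fd, max_bytes=4096):
    """Read from a non-blocking fd; return None if nothing is available yet."""
    try:
        return os.read(fd, max_bytes)
    except BlockingIOError:
        return None


read_fd, write_fd = os.pipe()
os.set_blocking(read_fd, False)

nothing_yet = try_read(read_fd)      # pipe is empty
os.write(write_fd, b"0\n")
data = try_read(read_fd)             # data is available now
os.close(write_fd)
at_eof = try_read(read_fd)           # writer closed: end of stream
```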

Answer 29

Here is a module that supports non-blocking reads and background writes in python:

https://pypi.python.org/pypi/python-nonblock

It provides a function, nonblock_read, which reads data from the stream if any is available and otherwise returns an empty string (or None if the stream is closed on the other side and all possible data has been read).

You may also consider the python-subprocess2 module,

https://pypi.python.org/pypi/python-subprocess2

which extends the subprocess module. It adds a method, runInBackground, to the object returned from subprocess.Popen. This starts a thread and returns an object that is populated automatically as output is written to stdout/stderr, without blocking your main thread.

Enjoy!


How to calculate number of days between two given dates?

Question: How to calculate number of days between two given dates?

If I have two dates (ex. '8/18/2008' and '9/26/2008'), what is the best way to get the number of days between these two dates?


Answer 0

If you have two date objects, you can just subtract them, which computes a timedelta object.

from datetime import date

d0 = date(2008, 8, 18)
d1 = date(2008, 9, 26)
delta = d1 - d0
print(delta.days)

The relevant section of the docs: https://docs.python.org/library/datetime.html.

See this answer for another example.


Answer 1

Using the power of datetime:

from datetime import datetime
date_format = "%m/%d/%Y"
a = datetime.strptime('8/18/2008', date_format)
b = datetime.strptime('9/26/2008', date_format)
delta = b - a
print(delta.days)  # that's it

Answer 2

Days until Christmas:

>>> import datetime
>>> today = datetime.date.today()
>>> someday = datetime.date(2008, 12, 25)
>>> diff = someday - today
>>> diff.days
86

More arithmetic here.
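
The snippet above hard-codes the year, so the result goes negative once that Christmas has passed. A small sketch that always targets the next Christmas; the function name days_until_christmas and its today parameter are illustrative and not part of the original answer.

```python
import datetime


def days_until_christmas(today=None):
    """Number of days from `today` to the next 25 December (0 on the day itself)."""
    if today is None:
        today = datetime.date.today()
    christmas = datetime.date(today.year, 12, 25)
    if christmas < today:
        # This year's Christmas has passed; use next year's.
        christmas = datetime.date(today.year + 1, 12, 25)
    return (christmas - today).days
```

Passing the date explicitly, as in days_until_christmas(datetime.date(2008, 9, 30)), makes the function easy to check without depending on the current day.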


Answer 3

You want the datetime module.

>>> from datetime import datetime, timedelta
>>> datetime(2008, 8, 18) - datetime(2008, 9, 26)
datetime.timedelta(days=-39)

Another example:

>>> import datetime 
>>> today = datetime.date.today() 
>>> print(today)
2008-09-01 
>>> last_year = datetime.date(2007, 9, 1) 
>>> print(today - last_year)
366 days, 0:00:00 

As pointed out here


Answer 4

from datetime import datetime
start_date = datetime.strptime('8/18/2008', "%m/%d/%Y")
end_date = datetime.strptime('9/26/2008', "%m/%d/%Y")
print(abs((end_date-start_date).days))

Answer 5

It also can be easily done with arrow:

import arrow

a = arrow.get('2017-05-09')
b = arrow.get('2017-05-11')

delta = (b-a)
print(delta.days)

For reference: http://arrow.readthedocs.io/en/latest/


Answer 6

Without using any library, just pure code:

# Calculate the days between two dates

daysOfMonths = [ 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]

def isLeapYear(year):

    # Pseudocode for this algorithm is found at
    # http://en.wikipedia.org/wiki/Leap_year#Algorithm
    # if (year is not divisible by 4) then (it is a common year)
    # else if (year is not divisible by 100) then (it is a leap year)
    # else if (year is not divisible by 400) then (it is a common year)
    # else (it is a leap year)
    return (year % 4 == 0 and year % 100 != 0) or year % 400 == 0

def Count_Days(year1, month1, day1):
    if month1 ==2:
        if isLeapYear(year1):
            if day1 < daysOfMonths[month1-1]+1:
                return year1, month1, day1+1
            else:
                if month1 ==12:
                    return year1+1,1,1
                else:
                    return year1, month1 +1 , 1
        else: 
            if day1 < daysOfMonths[month1-1]:
                return year1, month1, day1+1
            else:
                if month1 ==12:
                    return year1+1,1,1
                else:
                    return year1, month1 +1 , 1
    else:
        if day1 < daysOfMonths[month1-1]:
             return year1, month1, day1+1
        else:
            if month1 ==12:
                return year1+1,1,1
            else:
                    return year1, month1 +1 , 1


def daysBetweenDates(y1, m1, d1, y2, m2, d2,end_day):

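    # Swap so the first date is the earlier one; compare full dates, not
    # just years, or a same-year pair in reverse order would never terminate
    if (y1, m1, d1) > (y2, m2, d2):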
        m1,m2 = m2,m1
        y1,y2 = y2,y1
        d1,d2 = d2,d1
    days=0
    while(not(m1==m2 and y1==y2 and d1==d2)):
        y1,m1,d1 = Count_Days(y1,m1,d1)
        days+=1
    if end_day:
        days+=1
    return days


# Test Case

def test():
    test_cases = [((2012,1,1,2012,2,28,False), 58), 
                  ((2012,1,1,2012,3,1,False), 60),
                  ((2011,6,30,2012,6,30,False), 366),
                  ((2011,1,1,2012,8,8,False), 585 ),
                  ((1994,5,15,2019,8,31,False), 9239),
                  ((1999,3,24,2018,2,4,False), 6892),
                  ((1999,6,24,2018,8,4,False),6981),
                  ((1995,5,24,2018,12,15,False),8606),
                  ((1994,8,24,2019,12,15,True),9245),
                  ((2019,12,15,1994,8,24,True),9245),
                  ((2019,5,15,1994,10,24,True),8970),
                  ((1994,11,24,2019,8,15,True),9031)]

    for (args, answer) in test_cases:
        result = daysBetweenDates(*args)
        if result != answer:
            print("Test with data:", args, "failed")
        else:
            print("Test case passed!")

test()

Answer 7

Everyone has answered well using datetime; let me answer using pandas:

import pandas as pd

dt = pd.to_datetime('2008/08/18', format='%Y/%m/%d')
dt1 = pd.to_datetime('2008/09/26', format='%Y/%m/%d')

(dt1-dt).days

This gives the answer. If one of the inputs is a DataFrame column, use dt.days in place of days:

(dt1-dt).dt.days

Answer 8

There is also a datetime.toordinal() method that was not mentioned yet:

import datetime
print(datetime.date(2008,9,26).toordinal() - datetime.date(2008,8,18).toordinal())  # 39

https://docs.python.org/3/library/datetime.html#datetime.date.toordinal

date.toordinal()

Return the proleptic Gregorian ordinal of the date, where January 1 of year 1 has ordinal 1. For any date object d, date.fromordinal(d.toordinal()) == d.

Seems well suited for calculating days difference, though not as readable as timedelta.days.


Answer 9

For calculating dates and times, there are several options but I will write the simple way:

from datetime import timedelta, datetime, date
import dateutil.relativedelta

# current time
date_and_time = datetime.now()
date_only = date.today()
time_only = datetime.now().time()

# calculate date and time
result = date_and_time - timedelta(hours=26, minutes=25, seconds=10)

# calculate dates: years (-/+)
result = date_only - dateutil.relativedelta.relativedelta(years=10)

# months
result = date_only - dateutil.relativedelta.relativedelta(months=10)

# days
result = date_only - dateutil.relativedelta.relativedelta(days=10)

# calculate time
result = date_and_time - timedelta(hours=26, minutes=25, seconds=10)
result.time()

Hope it helps


Answer 10

from datetime import date
def d(s):
  [month, day, year] = map(int, s.split('/'))
  return date(year, month, day)
def days(start, end):
  return (d(end) - d(start)).days
print(days('8/18/2008', '9/26/2008'))

This assumes, of course, that you’ve already verified that your dates are in the format r'\d+/\d+/\d+'.
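
If the input has not been validated yet, the check mentioned above can be folded into the parsing step. This is a sketch under that assumption; parse_us_date and days_between are hypothetical names, and the pattern is the one quoted above, applied to the whole string.

```python
import re
from datetime import date

DATE_PATTERN = re.compile(r'\d+/\d+/\d+')


def parse_us_date(text):
    """Parse 'month/day/year' into a date, raising ValueError on bad input."""
    if not DATE_PATTERN.fullmatch(text):
        raise ValueError('expected month/day/year, got %r' % (text,))
    month, day, year = map(int, text.split('/'))
    return date(year, month, day)  # date() itself rejects e.g. month 13


def days_between(start, end):
    """Days from start to end, both given as 'month/day/year' strings."""
    return (parse_us_date(end) - parse_us_date(start)).days
```

The regular expression only checks the shape of the string; out-of-range values such as '13/40/2008' are still caught, because the date constructor raises ValueError for them.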


Answer 11

Here are three ways to approach this problem:

from datetime import datetime

Now = datetime.now()
StartDate = datetime.strptime(str(Now.year) +'-01-01', '%Y-%m-%d')
NumberOfDays = (Now - StartDate)

print(NumberOfDays.days)                     # Starts at 0
print(datetime.now().timetuple().tm_yday)    # Starts at 1
print(Now.strftime('%j'))                    # Starts at 1

Transpose/Unzip Function (inverse of zip)?

Question: Transpose/Unzip Function (inverse of zip)?

I have a list of 2-item tuples and I’d like to convert them to 2 lists where the first contains the first item in each tuple and the second list holds the second item.

For example:

original = [('a', 1), ('b', 2), ('c', 3), ('d', 4)]
# and I want to become...
result = (['a', 'b', 'c', 'd'], [1, 2, 3, 4])

Is there a builtin function that does that?


Answer 0

zip is its own inverse! Provided you use the special * operator.

>>> list(zip(*[('a', 1), ('b', 2), ('c', 3), ('d', 4)]))
[('a', 'b', 'c', 'd'), (1, 2, 3, 4)]

The way this works is by calling zip with the arguments:

zip(('a', 1), ('b', 2), ('c', 3), ('d', 4))

… except the arguments are passed to zip directly (after being converted to a tuple), so there’s no need to worry about the number of arguments getting too big.
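
A short Python 3 sketch of the round trip, where zip returns an iterator and the result has to be materialised explicitly. The variable names are illustrative.

```python
letters = ['a', 'b', 'c', 'd']
numbers = [1, 2, 3, 4]

# Zipping pairs the lists up element by element.
pairs = list(zip(letters, numbers))

# Unpacking the pairs with * passes each pair as a separate argument,
# so zip regroups the first items together and the second items together.
unzipped = list(zip(*pairs))

# zip yields tuples; convert them if lists are needed.
letters_again, numbers_again = (list(column) for column in unzipped)
```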


Answer 1

You could also do

result = ([ a for a,b in original ], [ b for a,b in original ])

It should scale better. Especially if Python makes good on not expanding the list comprehensions unless needed.

(Incidentally, it makes a 2-tuple (pair) of lists, rather than a list of tuples, like zip does.)

If generators instead of actual lists are ok, this would do that:

result = (( a for a,b in original ), ( b for a,b in original ))

The generators don’t munch through the list until you ask for each element, but on the other hand, they do keep references to the original list.


Answer 2

If you have lists that are not the same length, you may not want to use zip as per Patrick's answer. This works:

>>> list(zip(*[('a', 1), ('b', 2), ('c', 3), ('d', 4)]))
[('a', 'b', 'c', 'd'), (1, 2, 3, 4)]

But with lists of different lengths, zip truncates the result to the length of the shortest one:

>>> list(zip(*[('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', )]))
[('a', 'b', 'c', 'd', 'e')]

In Python 2, you can use map with None as the function to fill the missing positions with None:

>>> map(None, *[('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', )])
[('a', 'b', 'c', 'd', 'e'), (1, 2, 3, 4, None)]

zip() is marginally faster though.
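
In Python 3, map(None, ...) is no longer accepted; itertools.zip_longest from the standard library fills the gaps instead. A minimal sketch, with illustrative variable names:

```python
from itertools import zip_longest

ragged = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e',)]

# zip stops at the shortest tuple, so the second column is dropped entirely.
truncated = list(zip(*ragged))

# zip_longest pads the shorter tuples with fillvalue (None by default).
padded = list(zip_longest(*ragged, fillvalue=None))
```

A different fillvalue can be passed when None is a legitimate value in the data.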


Answer 3

I like to use zip(*iterable) (which is the piece of code you’re looking for) in my programs as so:

def unzip(iterable):
    return zip(*iterable)

I find unzip more readable.


Answer 4

>>> original = [('a', 1), ('b', 2), ('c', 3), ('d', 4)]
>>> tuple([list(tup) for tup in zip(*original)])
(['a', 'b', 'c', 'd'], [1, 2, 3, 4])

Gives a tuple of lists as in the question.

list1, list2 = [list(tup) for tup in zip(*original)]

Unpacks the two lists.
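
One edge case worth noting: with an empty input, zip(*original) yields nothing, so the two-name unpacking above raises ValueError. A sketch of a single-pass alternative that also handles that case; unzip_pairs is a hypothetical helper name.

```python
def unzip_pairs(pairs):
    """Split an iterable of 2-item tuples into two lists in a single pass."""
    firsts, seconds = [], []
    for first, second in pairs:
        firsts.append(first)
        seconds.append(second)
    return firsts, seconds


original = [('a', 1), ('b', 2), ('c', 3), ('d', 4)]
list1, list2 = unzip_pairs(original)
empty1, empty2 = unzip_pairs([])
```

Because it consumes its argument only once, this also works when the pairs come from a generator.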


Answer 5

天真的方法

def transpose_finite_iterable(iterable):
    return zip(*iterable)  # `itertools.izip` for Python 2 users

对于(潜在无限)可迭代的有限可迭代(例如list/ tuple/的序列str),效果很好

| |a_00| |a_10| ... |a_n0| |
| |a_01| |a_11| ... |a_n1| |
| |... | |... | ... |... | |
| |a_0i| |a_1i| ... |a_ni| |
| |... | |... | ... |... | |

哪里

  • n in ℕ
  • a_ij对应于-th可迭代的j-th元素i

申请后transpose_finite_iterable我们得到

| |a_00| |a_01| ... |a_0i| ... |
| |a_10| |a_11| ... |a_1i| ... |
| |... | |... | ... |... | ... |
| |a_n0| |a_n1| ... |a_ni| ... |

这种情况的Python示例,其中a_ij == jn == 2

>>> from itertools import count
>>> iterable = [count(), count()]
>>> result = transpose_finite_iterable(iterable)
>>> next(result)
(0, 0)
>>> next(result)
(1, 1)

但是我们不能transpose_finite_iterable再次使用它来返回原始的结构,iterable因为它result是有限迭代的无限迭代(tuple在我们的例子中是s):

>>> transpose_finite_iterable(result)
... hangs ...
Traceback (most recent call last):
  File "...", line 1, in ...
  File "...", line 2, in transpose_finite_iterable
MemoryError

那么我们该如何处理呢?

…这是 deque

看完itertools.teefunction文档后,有一些Python配方可以通过一些修改来帮助解决我们的问题

def transpose_finite_iterables(iterable):
    iterator = iter(iterable)
    try:
        first_elements = next(iterator)
    except StopIteration:
        return ()
    queues = [deque([element])
              for element in first_elements]

    def coordinate(queue):
        while True:
            if not queue:
                try:
                    elements = next(iterator)
                except StopIteration:
                    return
                for sub_queue, element in zip(queues, elements):
                    sub_queue.append(element)
            yield queue.popleft()

    return tuple(map(coordinate, queues))

让我们检查

>>> from itertools import count
>>> iterable = [count(), count()]
>>> result = transpose_finite_iterables(transpose_finite_iterable(iterable))
>>> result
(<generator object transpose_finite_iterables.<locals>.coordinate at ...>, <generator object transpose_finite_iterables.<locals>.coordinate at ...>)
>>> next(result[0])
0
>>> next(result[0])
1

合成

现在我们可以定义通用函数来处理可迭代的可迭代对象,其中一些是有限的,而另一个则可以使用functools.singledispatch装饰器(例如)

from collections import (abc,
                         deque)
from functools import singledispatch


@singledispatch
def transpose(object_):
    """
    Transposes given object.
    """
    raise TypeError('Unsupported object type: {type}.'
                    .format(type=type))


@transpose.register(abc.Iterable)
def transpose_finite_iterables(object_):
    """
    Transposes given iterable of finite iterables.
    """
    iterator = iter(object_)
    try:
        first_elements = next(iterator)
    except StopIteration:
        return ()
    queues = [deque([element])
              for element in first_elements]

    def coordinate(queue):
        while True:
            if not queue:
                try:
                    elements = next(iterator)
                except StopIteration:
                    return
                for sub_queue, element in zip(queues, elements):
                    sub_queue.append(element)
            yield queue.popleft()

    return tuple(map(coordinate, queues))


def transpose_finite_iterable(object_):
    """
    Transposes given finite iterable of iterables.
    """
    yield from zip(*object_)

try:
    transpose.register(abc.Collection, transpose_finite_iterable)
except AttributeError:
    # Python3.5-
    transpose.register(abc.Mapping, transpose_finite_iterable)
    transpose.register(abc.Sequence, transpose_finite_iterable)
    transpose.register(abc.Set, transpose_finite_iterable)

在有限非空可迭代项上的二元运算符类中,可以将其视为自身的逆(数学家称这种函数为“对合”)。


作为singledispatching 的奖励,我们可以处理numpy类似

import numpy as np
...
transpose.register(np.ndarray, np.transpose)

然后像

>>> array = np.arange(4).reshape((2,2))
>>> array
array([[0, 1],
       [2, 3]])
>>> transpose(array)
array([[0, 2],
       [1, 3]])

注意

由于transpose返回迭代器,并且如果有人希望在OP中具有的tuplelist则可以通过map内置函数(例如

>>> original = [('a', 1), ('b', 2), ('c', 3), ('d', 4)]
>>> tuple(map(list, transpose(original)))
(['a', 'b', 'c', 'd'], [1, 2, 3, 4])

广告

我已经添加推广解决方案lz0.5.0版本,可以像使用

>>> from lz.transposition import transpose
>>> list(map(tuple, transpose(zip(range(10), range(10, 20)))))
[(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (10, 11, 12, 13, 14, 15, 16, 17, 18, 19)]

P.S.

没有用于处理潜在无限迭代的潜在无限迭代的解决方案(至少很明显),但是这种情况并不常见。

Naive approach

def transpose_finite_iterable(iterable):
    return zip(*iterable)  # `itertools.izip` for Python 2 users

works fine for a finite iterable (e.g. a sequence such as list/tuple/str) of (potentially infinite) iterables, which can be illustrated like

| |a_00| |a_10| ... |a_n0| |
| |a_01| |a_11| ... |a_n1| |
| |... | |... | ... |... | |
| |a_0i| |a_1i| ... |a_ni| |
| |... | |... | ... |... | |

where

  • n in ℕ,
  • a_ij corresponds to j-th element of i-th iterable,

and after applying transpose_finite_iterable we get

| |a_00| |a_01| ... |a_0i| ... |
| |a_10| |a_11| ... |a_1i| ... |
| |... | |... | ... |... | ... |
| |a_n0| |a_n1| ... |a_ni| ... |

Python example of such case where a_ij == j, n == 2

>>> from itertools import count
>>> iterable = [count(), count()]
>>> result = transpose_finite_iterable(iterable)
>>> next(result)
(0, 0)
>>> next(result)
(1, 1)

But we can’t use transpose_finite_iterable again to return to structure of original iterable because result is an infinite iterable of finite iterables (tuples in our case):

>>> transpose_finite_iterable(result)
... hangs ...
Traceback (most recent call last):
  File "...", line 1, in ...
  File "...", line 2, in transpose_finite_iterable
MemoryError

So how can we deal with this case?

… and here comes the deque

The docs for the itertools.tee function include a Python recipe that, with some modification, can help in our case

from collections import deque

def transpose_finite_iterables(iterable):
    iterator = iter(iterable)
    try:
        first_elements = next(iterator)
    except StopIteration:
        return ()
    queues = [deque([element])
              for element in first_elements]

    def coordinate(queue):
        while True:
            if not queue:
                try:
                    elements = next(iterator)
                except StopIteration:
                    return
                for sub_queue, element in zip(queues, elements):
                    sub_queue.append(element)
            yield queue.popleft()

    return tuple(map(coordinate, queues))

let’s check

>>> from itertools import count
>>> iterable = [count(), count()]
>>> result = transpose_finite_iterables(transpose_finite_iterable(iterable))
>>> result
(<generator object transpose_finite_iterables.<locals>.coordinate at ...>, <generator object transpose_finite_iterables.<locals>.coordinate at ...>)
>>> next(result[0])
0
>>> next(result[0])
1

Synthesis

Now we can define a general function for working with iterables of iterables, some of which are finite and others potentially infinite, using the functools.singledispatch decorator like

from collections import (abc,
                         deque)
from functools import singledispatch


@singledispatch
def transpose(object_):
    """
    Transposes given object.
    """
    raise TypeError('Unsupported object type: {type}.'
                    .format(type=type(object_)))


@transpose.register(abc.Iterable)
def transpose_finite_iterables(object_):
    """
    Transposes given iterable of finite iterables.
    """
    iterator = iter(object_)
    try:
        first_elements = next(iterator)
    except StopIteration:
        return ()
    queues = [deque([element])
              for element in first_elements]

    def coordinate(queue):
        while True:
            if not queue:
                try:
                    elements = next(iterator)
                except StopIteration:
                    return
                for sub_queue, element in zip(queues, elements):
                    sub_queue.append(element)
            yield queue.popleft()

    return tuple(map(coordinate, queues))


def transpose_finite_iterable(object_):
    """
    Transposes given finite iterable of iterables.
    """
    yield from zip(*object_)

try:
    transpose.register(abc.Collection, transpose_finite_iterable)
except AttributeError:
    # Python3.5-
    transpose.register(abc.Mapping, transpose_finite_iterable)
    transpose.register(abc.Sequence, transpose_finite_iterable)
    transpose.register(abc.Set, transpose_finite_iterable)

which can be considered its own inverse (mathematicians call this kind of function an “involution”) in the class of binary operators over finite non-empty iterables.


As a bonus of single dispatch, we can handle numpy arrays like

import numpy as np
...
transpose.register(np.ndarray, np.transpose)

and then use it like

>>> array = np.arange(4).reshape((2,2))
>>> array
array([[0, 1],
       [2, 3]])
>>> transpose(array)
array([[0, 2],
       [1, 3]])

Note

Since transpose returns iterators, anyone who wants a tuple of lists as in the OP can convert the result with the built-in map function, like

>>> original = [('a', 1), ('b', 2), ('c', 3), ('d', 4)]
>>> tuple(map(list, transpose(original)))
(['a', 'b', 'c', 'd'], [1, 2, 3, 4])

Advertisement

I’ve added a generalized solution to the lz package, available from version 0.5.0, which can be used like

>>> from lz.transposition import transpose
>>> list(map(tuple, transpose(zip(range(10), range(10, 20)))))
[(0, 1, 2, 3, 4, 5, 6, 7, 8, 9), (10, 11, 12, 13, 14, 15, 16, 17, 18, 19)]

P.S.

There is no solution (at least no obvious one) for handling a potentially infinite iterable of potentially infinite iterables, but this case is less common.


回答 6

这只是另一种实现方式,但是它对我有很大帮助,所以我在这里写下来:

具有以下数据结构:

X=[1,2,3,4]
Y=['a','b','c','d']
XY=zip(X,Y)

导致:

In: XY
Out: [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]

我认为,将其解压缩并返回原始格式的更Python方式是:

x,y=zip(*XY)

但这返回一个元组,因此如果您需要一个列表,则可以使用:

x,y=(list(x),list(y))

It’s only another way to do it but it helped me a lot so I write it here:

Having this data structure:

X=[1,2,3,4]
Y=['a','b','c','d']
XY=zip(X,Y)

Resulting in (Python 2 output; in Python 3, zip returns an iterator, so wrap it in list() to see the pairs):

In: XY
Out: [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]

In my opinion, the more Pythonic way to unzip it and go back to the original is this:

x,y=zip(*XY)

But this returns tuples, so if you need lists you can use:

x,y=(list(x),list(y))
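
The round trip above can be sketched for Python 3 as follows. This is a minimal sketch, not part of the original answer: the `list(...)` wrapper around `zip` is an addition, needed because `zip` is lazy in Python 3.

```python
X = [1, 2, 3, 4]
Y = ['a', 'b', 'c', 'd']

# In Python 3, zip() is lazy; materialise it so it can be inspected and reused.
XY = list(zip(X, Y))

# Unpacking with * passes each pair as a separate argument, which transposes them.
x, y = zip(*XY)            # x and y are tuples here
x, y = list(x), list(y)    # convert to lists if mutability is needed
```

After this runs, `x` and `y` compare equal to the original `X` and `Y`.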

回答 7

考虑使用more_itertools.unzip

>>> from more_itertools import unzip
>>> original = [('a', 1), ('b', 2), ('c', 3), ('d', 4)]
>>> [list(x) for x in unzip(original)]
[['a', 'b', 'c', 'd'], [1, 2, 3, 4]]     

Consider using more_itertools.unzip:

>>> from more_itertools import unzip
>>> original = [('a', 1), ('b', 2), ('c', 3), ('d', 4)]
>>> [list(x) for x in unzip(original)]
[['a', 'b', 'c', 'd'], [1, 2, 3, 4]]     

回答 8

因为它返回元组(并且可以使用大量内存),所以zip(*zipped)对我来说,这个技巧似乎比有用的还要聪明。

这是一个实际上将为您提供zip反函数的函数。

def unzip(zipped):
    """Inverse of built-in zip function.
    Args:
        zipped: a list of tuples

    Returns:
        a tuple of lists

    Example:
        a = [1, 2, 3]
        b = [4, 5, 6]
        zipped = list(zip(a, b))

        assert zipped == [(1, 4), (2, 5), (3, 6)]

        unzipped = unzip(zipped)

        assert unzipped == ([1, 2, 3], [4, 5, 6])

    """

    unzipped = ()
    if len(zipped) == 0:
        return unzipped

    dim = len(zipped[0])

    for i in range(dim):
        unzipped = unzipped + ([tup[i] for tup in zipped], )

    return unzipped

Since it returns tuples (and can use tons of memory), the zip(*zipped) trick seems more clever than useful, to me.

Here’s a function that will actually give you the inverse of zip.

def unzip(zipped):
    """Inverse of built-in zip function.
    Args:
        zipped: a list of tuples

    Returns:
        a tuple of lists

    Example:
        a = [1, 2, 3]
        b = [4, 5, 6]
        zipped = list(zip(a, b))

        assert zipped == [(1, 4), (2, 5), (3, 6)]

        unzipped = unzip(zipped)

        assert unzipped == ([1, 2, 3], [4, 5, 6])

    """

    unzipped = ()
    if len(zipped) == 0:
        return unzipped

    dim = len(zipped[0])

    for i in range(dim):
        unzipped = unzipped + ([tup[i] for tup in zipped], )

    return unzipped

回答 9

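None of the previous answers efficiently provide the required output, which is a tuple of lists rather than a list of tuples. For the former, you can use tuple with map. Here is the difference: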

res1 = list(zip(*original))              # [('a', 'b', 'c', 'd'), (1, 2, 3, 4)]
res2 = tuple(map(list, zip(*original)))  # (['a', 'b', 'c', 'd'], [1, 2, 3, 4])

此外,大多数以前的解决方案都假定使用Python 2.7,在Python 2.7中zip返回列表而不是迭代器。

对于Python 3.x,您需要将结果传递给诸如listtuple耗尽迭代器的函数。对于内存高效的迭代器,您可以省略外部list和外部tuple调用各自的解决方案。

None of the previous answers efficiently provide the required output, which is a tuple of lists, rather than a list of tuples. For the former, you can use tuple with map. Here’s the difference:

res1 = list(zip(*original))              # [('a', 'b', 'c', 'd'), (1, 2, 3, 4)]
res2 = tuple(map(list, zip(*original)))  # (['a', 'b', 'c', 'd'], [1, 2, 3, 4])

In addition, most of the previous solutions assume Python 2.7, where zip returns a list rather than an iterator.

For Python 3.x, you will need to pass the result to a function such as list or tuple to exhaust the iterator. For memory-efficient iterators, you can omit the outer list and tuple calls for the respective solutions.
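
A small sketch of the eager and lazy variants described above, assuming the same `original` list used elsewhere on this page. Note that `zip(*original)` still unpacks every row as an argument, so only the construction of the output lists is deferred in the lazy form.

```python
original = [('a', 1), ('b', 2), ('c', 3), ('d', 4)]

# Eager: a tuple of lists, built immediately.
eager = tuple(map(list, zip(*original)))

# Lazy: each output list is built only when the iterator is advanced.
lazy = map(list, zip(*original))
first_column = next(lazy)

# Edge case: with empty input there are no columns at all.
empty = tuple(map(list, zip(*[])))
```

The empty case is worth keeping in mind: code that does `x, y = ...` on the result will fail to unpack when the input has no rows.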


回答 10

虽然zip(*seq)非常有用,但可能不适用于很长的序列,因为它将创建要传递的值的元组。例如,我一直在使用具有超过一百万个条目的坐标系,并且发现创建它的速度明显更快序列直接。

通用方法如下所示:

from collections import deque
seq = ((a1, b1, …), (a2, b2, …), …)
width = len(seq[0])
output = [deque(maxlen=len(seq)) for _ in range(width)]  # one independent deque per column (deques cannot be preallocated)
for element in seq:
    for s, item in zip(output, element):
        s.append(item)

但是,根据您要对结果执行的操作,收集的选择可能会产生很大的不同。在我的实际用例中,使用集而不使用内部循环比所有其他方法明显更快。

而且,正如其他人指出的那样,如果您要对数据集执行此操作,则可以改用Numpy或Pandas集合。

While zip(*seq) is very useful, it may be unsuitable for very long sequences, as it has to unpack every element into a tuple of arguments to be passed in. For example, I’ve been working with a coordinate system with over a million entries and found it significantly faster to create the sequences directly.

A generic approach would be something like this:

from collections import deque
seq = ((a1, b1, …), (a2, b2, …), …)
width = len(seq[0])
output = [deque(maxlen=len(seq)) for _ in range(width)]  # one independent deque per column (deques cannot be preallocated)
for element in seq:
    for s, item in zip(output, element):
        s.append(item)

But, depending on what you want to do with the result, the choice of collection can make a big difference. In my actual use case, using sets and no internal loop is noticeably faster than all other approaches.

And, as others have noted, if you are doing this with datasets, it might make sense to use Numpy or Pandas collections instead.
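
A runnable sketch of the column-by-column approach, using made-up sample rows in place of the elided `(a1, b1, …)` data (the values in `seq` are purely illustrative). It also shows why each column needs its own container: multiplying a one-element list repeats a reference to the same object.

```python
from collections import deque

# Hypothetical sample data standing in for the elided coordinate rows.
seq = [(1, 'a', 1.5), (2, 'b', 2.5), (3, 'c', 3.5)]

width = len(seq[0])
# One independent container per column, created by a comprehension.
columns = [deque() for _ in range(width)]

for row in seq:
    for column, item in zip(columns, row):
        column.append(item)

# The shared-reference pitfall, for comparison: both slots are the same deque.
shared = [deque()] * 2
shared[0].append('x')
```

Here `columns` ends up holding three separate deques, while `shared[1]` also contains `'x'` because both entries are one object.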


回答 11

虽然numpy数组和熊猫可能是更可取的,但此函数模仿zip(*args)as时的行为unzip(args)

允许在args迭代值时传递生成器。装饰cls和/或main_cls微管理容器初始化。

def unzip(items, cls=list, main_cls=tuple):
    """Zip function in reverse.

    :param items: Zipped-like iterable.
    :type  items: iterable

    :param cls: Callable that returns iterable with callable append attribute.
        Defaults to `list`.
    :type  cls: callable, optional

    :param main_cls: Callable that returns iterable with callable append
        attribute. Defaults to `tuple`.
    :type  main_cls: callable, optional

    :returns: Unzipped items in instances returned from `cls`, in an instance
        returned from `main_cls`.

    :Example:

        assert unzip(zip(["a","b","c"],[1,2,3])) == (["a","b","c"],[1,2,3])
        assert unzip([("a",1),("b",2),("c",3)]) == (["a","b","c"],[1,2,3])
        assert unzip([("a",1)], deque, list) == [deque(["a"]),deque([1])]
        assert unzip((["a"],["b"]), lambda i: deque(i,1)) == (deque(["b"]),)
    """
    items = iter(items)

    try:
        i = next(items)
    except StopIteration:
        return main_cls()

    unzipped = main_cls(cls([v]) for v in i)

    for i in items:
        for c,v in zip(unzipped,i):
            c.append(v)

    return unzipped

While numpy arrays and pandas may be preferable, this function imitates the behavior of zip(*args) when called as unzip(args).

It accepts generators as args, since it iterates through the values one at a time. Wrap cls and/or main_cls to micro-manage container initialization.

def unzip(items, cls=list, main_cls=tuple):
    """Zip function in reverse.

    :param items: Zipped-like iterable.
    :type  items: iterable

    :param cls: Callable that returns iterable with callable append attribute.
        Defaults to `list`.
    :type  cls: callable, optional

    :param main_cls: Callable that returns iterable with callable append
        attribute. Defaults to `tuple`.
    :type  main_cls: callable, optional

    :returns: Unzipped items in instances returned from `cls`, in an instance
        returned from `main_cls`.

    :Example:

        assert unzip(zip(["a","b","c"],[1,2,3])) == (["a","b","c"],[1,2,3])
        assert unzip([("a",1),("b",2),("c",3)]) == (["a","b","c"],[1,2,3])
        assert unzip([("a",1)], deque, list) == [deque(["a"]),deque([1])]
        assert unzip((["a"],["b"]), lambda i: deque(i,1)) == (deque(["b"]),)
    """
    items = iter(items)

    try:
        i = next(items)
    except StopIteration:
        return main_cls()

    unzipped = main_cls(cls([v]) for v in i)

    for i in items:
        for c,v in zip(unzipped,i):
            c.append(v)

    return unzipped

如何在python中识别未知的日期时间时区

问题:如何在python中识别未知的日期时间时区

我需要做什么

我有一个不带时区的datetime对象,我需要向其添加一个时区,以便能够将其与其他时区可感知的datetime对象进行比较。对于这一旧情况,我不想将我的整个应用程序转换为时区。

我尝试过的

首先,演示该问题:

Python 2.6.1 (r261:67515, Jun 24 2010, 21:47:49) 
[GCC 4.2.1 (Apple Inc. build 5646)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import datetime
>>> import pytz
>>> unaware = datetime.datetime(2011,8,15,8,15,12,0)
>>> unaware
datetime.datetime(2011, 8, 15, 8, 15, 12)
>>> aware = datetime.datetime(2011,8,15,8,15,12,0,pytz.UTC)
>>> aware
datetime.datetime(2011, 8, 15, 8, 15, 12, tzinfo=<UTC>)
>>> aware == unaware
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't compare offset-naive and offset-aware datetimes

首先,我尝试了astimezone:

>>> unaware.astimezone(pytz.UTC)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: astimezone() cannot be applied to a naive datetime
>>>

这次失败并不令人惊讶,因为它实际上是在尝试进行转换。替换似乎是一个更好的选择(根据Python:如何获取“时区感知”的datetime.today()值?):

>>> unaware.replace(tzinfo=pytz.UTC)
datetime.datetime(2011, 8, 15, 8, 15, 12, tzinfo=<UTC>)
>>> unaware == aware
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't compare offset-naive and offset-aware datetimes
>>> 

但是正如您所看到的,replace似乎设置了tzinfo,但没有使对象知道。我准备回过头来解析输入字符串以在解析它之前有一个时区(如果重要的话,我正在使用dateutil进行解析),但这似乎令人难以置信。

另外,我在python 2.6和python 2.7中都尝试过,结果相同。

语境

我正在为某些数据文件编写解析器。我需要支持一种旧格式,其中日期字符串没有时区指示符。我已经修复了数据源,但是我仍然需要支持旧数据格式。由于各种业务BS的原因,不能一次转换旧数据。通常,我不喜欢对默认时区进行硬编码的想法,在这种情况下,这似乎是最好的选择。我完全有把握地知道所有有问题的旧数据都位于UTC中,因此在这种情况下,我准备接受默认设置的风险。

What I need to do

I have a timezone-unaware datetime object, to which I need to add a time zone in order to be able to compare it with other timezone-aware datetime objects. I do not want to convert my entire application to timezone unaware for this one legacy case.

What I’ve Tried

First, to demonstrate the problem:

Python 2.6.1 (r261:67515, Jun 24 2010, 21:47:49) 
[GCC 4.2.1 (Apple Inc. build 5646)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import datetime
>>> import pytz
>>> unaware = datetime.datetime(2011,8,15,8,15,12,0)
>>> unaware
datetime.datetime(2011, 8, 15, 8, 15, 12)
>>> aware = datetime.datetime(2011,8,15,8,15,12,0,pytz.UTC)
>>> aware
datetime.datetime(2011, 8, 15, 8, 15, 12, tzinfo=<UTC>)
>>> aware == unaware
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't compare offset-naive and offset-aware datetimes

First, I tried astimezone:

>>> unaware.astimezone(pytz.UTC)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: astimezone() cannot be applied to a naive datetime
>>>

It’s not terribly surprising this failed, since it’s actually trying to do a conversion. Replace seemed like a better choice (as per Python: How to get a value of datetime.today() that is “timezone aware”?):

>>> unaware.replace(tzinfo=pytz.UTC)
datetime.datetime(2011, 8, 15, 8, 15, 12, tzinfo=<UTC>)
>>> unaware == aware
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't compare offset-naive and offset-aware datetimes
>>> 

But as you can see, replace seems to set the tzinfo, but not make the object aware. I’m getting ready to fall back to doctoring the input string to have a timezone before parsing it (I’m using dateutil for parsing, if that matters), but that seems incredibly kludgy.

Also, I’ve tried this in both python 2.6 and python 2.7, with the same results.

Context

I am writing a parser for some data files. There is an old format I need to support where the date string does not have a timezone indicator. I’ve already fixed the data source, but I still need to support the legacy data format. A one time conversion of the legacy data is not an option for various business BS reasons. While in general, I do not like the idea of hard-coding a default timezone, in this case it seems like the best option. I know with reasonable confidence that all the legacy data in question is in UTC, so I’m prepared to accept the risk of defaulting to that in this case.


回答 0

通常,要使原始的datetime时区感知,请使用localize方法

import datetime
import pytz

unaware = datetime.datetime(2011, 8, 15, 8, 15, 12, 0)
aware = datetime.datetime(2011, 8, 15, 8, 15, 12, 0, pytz.UTC)

now_aware = pytz.utc.localize(unaware)
assert aware == now_aware

对于UTC时区,localize由于没有夏令时计算可处理,因此实际上没有必要使用:

now_aware = unaware.replace(tzinfo=pytz.UTC)

works. (.replace returns a new datetime; it does not modify unaware.)

In general, to make a naive datetime timezone-aware, use the localize method:

import datetime
import pytz

unaware = datetime.datetime(2011, 8, 15, 8, 15, 12, 0)
aware = datetime.datetime(2011, 8, 15, 8, 15, 12, 0, pytz.UTC)

now_aware = pytz.utc.localize(unaware)
assert aware == now_aware

For the UTC timezone, it is not really necessary to use localize since there is no daylight savings time calculation to handle:

now_aware = unaware.replace(tzinfo=pytz.UTC)

works. (.replace returns a new datetime; it does not modify unaware.)
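
The point about `.replace` returning a new object is exactly what tripped up the transcript in the question, where the return value was discarded. A minimal sketch, assuming Python 3 and using the standard library's `timezone.utc` as a stand-in for `pytz.UTC` to avoid a third-party dependency:

```python
from datetime import datetime, timezone

unaware = datetime(2011, 8, 15, 8, 15, 12)
aware = datetime(2011, 8, 15, 8, 15, 12, tzinfo=timezone.utc)

# replace() leaves `unaware` untouched and returns a new, aware object,
# so the result has to be bound to a name.
now_aware = unaware.replace(tzinfo=timezone.utc)

still_naive = unaware.tzinfo is None
same_instant = (now_aware == aware)
```

In Python 3.3 and later, `unaware == aware` evaluates to `False` instead of raising `TypeError`; ordering comparisons such as `<` between naive and aware values still raise.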


回答 1

所有这些示例都使用一个外部模块,但是您可以仅使用datetime模块来达到相同的结果,SO答案中也有介绍:

from datetime import datetime
from datetime import timezone

dt = datetime.now()  # naive local time; only label it UTC if it really is UTC
dt_aware = dt.replace(tzinfo=timezone.utc)  # replace() returns a new datetime

print(dt_aware.isoformat())
'2017-01-12T22:11:31+00:00'

更少的依赖项,没有pytz问题。

注意:如果您希望将其与python3和python2一起使用,则也可以将其用于时区导入(针对UTC进行硬编码):

try:
    from datetime import timezone
    utc = timezone.utc
except ImportError:
    # Python 2 fallback: the stdlib has no timezone class, so define UTC by hand
    from datetime import tzinfo, timedelta
    class UTC(tzinfo):
        def utcoffset(self, dt):
            return timedelta(0)
        def tzname(self, dt):
            return "UTC"
        def dst(self, dt):
            return timedelta(0)
    utc = UTC()

All of these examples use an external module, but you can achieve the same result using just the datetime module, as also presented in this SO answer:

from datetime import datetime
from datetime import timezone

dt = datetime.now()  # naive local time; only label it UTC if it really is UTC
dt_aware = dt.replace(tzinfo=timezone.utc)  # replace() returns a new datetime

print(dt_aware.isoformat())
'2017-01-12T22:11:31+00:00'

Fewer dependencies and no pytz issues.

NOTE: If you wish to use this with python3 and python2, you can use this as well for the timezone import (hardcoded for UTC):

try:
    from datetime import timezone
    utc = timezone.utc
except ImportError:
    # Python 2 fallback: the stdlib has no timezone class, so define UTC by hand
    from datetime import tzinfo, timedelta
    class UTC(tzinfo):
        def utcoffset(self, dt):
            return timedelta(0)
        def tzname(self, dt):
            return "UTC"
        def dst(self, dt):
            return timedelta(0)
    utc = UTC()

回答 2

我曾经使用过从dt_aware到dt_unaware

dt_unaware = dt_aware.replace(tzinfo=None)

和dt_unware到dt_aware

from pytz import timezone
localtz = timezone('Europe/Lisbon')
dt_aware = localtz.localize(dt_unware)

但之前回答也是一个很好的解决方案。

I have used this to go from dt_aware to dt_unaware:

dt_unaware = dt_aware.replace(tzinfo=None)

and this to go from dt_unaware to dt_aware:

from pytz import timezone
localtz = timezone('Europe/Lisbon')
dt_aware = localtz.localize(dt_unaware)

but the previous answer is also a good solution.


回答 3

我在Django中使用以下语句将无意识的时间转换为有意识的时间:

from django.utils import timezone

dt_aware = timezone.make_aware(dt_unaware, timezone.get_current_timezone())

I use this statement in Django to convert a naive datetime to an aware one:

from django.utils import timezone

dt_aware = timezone.make_aware(dt_unaware, timezone.get_current_timezone())

回答 4

我同意之前的回答,如果可以开始使用UTC,也可以。但我认为这也是人们使用tz感知值(其日期时间具有非UTC本地时区)的常见情况

如果只是按名称命名,则可能会推断replace()将适用,并产生正确的日期时间感知对象。不是这种情况。

replace(tzinfo = …)的行为似乎是随机的。因此,它是没有用的。不要使用这个!

本地化是正确使用的功能。例:

localdatetime_aware = tz.localize(datetime_nonaware)

或更完整的示例:

import pytz
from datetime import datetime
pytz.timezone('Australia/Melbourne').localize(datetime.now())

给我一个当前本地时间的时区感知日期时间值:

datetime.datetime(2017, 11, 3, 7, 44, 51, 908574, tzinfo=<DstTzInfo 'Australia/Melbourne' AEDT+11:00:00 DST>)

I agree with the previous answers, and they are fine if you are OK starting in UTC. But I think it is also a common scenario to work with a datetime that is in a non-UTC local timezone.

Going by the name alone, one would probably infer that replace() is applicable and produces the right aware datetime object. This is not the case.

With pytz timezones, replace(tzinfo=…) can look random in its behaviour: it attaches the zone's default offset (often a historical local mean time offset) instead of the offset in effect on that date. Do not use it with pytz zones!

localize is the correct function to use. Example:

localdatetime_aware = tz.localize(datetime_nonaware)

Or a more complete example:

import pytz
from datetime import datetime
pytz.timezone('Australia/Melbourne').localize(datetime.now())

gives me a timezone aware datetime value of the current local time:

datetime.datetime(2017, 11, 3, 7, 44, 51, 908574, tzinfo=<DstTzInfo 'Australia/Melbourne' AEDT+11:00:00 DST>)

回答 5

使用dateutil.tz.tzlocal()来获取时区在你的使用datetime.datetime.now()datetime.datetime.astimezone()

from datetime import datetime
from dateutil import tz

unlocalisedDatetime = datetime.now()

localisedDatetime1 = datetime.now(tz = tz.tzlocal())
localisedDatetime2 = datetime(2017, 6, 24, 12, 24, 36, tzinfo = tz.tzlocal())
localisedDatetime3 = unlocalisedDatetime.astimezone(tz = tz.tzlocal())
localisedDatetime4 = unlocalisedDatetime.replace(tzinfo = tz.tzlocal())

请注意,这datetime.astimezone将首先将您的datetime对象转换为UTC,然后转换为时区,这datetime.replace与使用原始时区信息为进行调用相同None

Use dateutil.tz.tzlocal() to get the timezone in your usage of datetime.datetime.now() and datetime.datetime.astimezone():

from datetime import datetime
from dateutil import tz

unlocalisedDatetime = datetime.now()

localisedDatetime1 = datetime.now(tz = tz.tzlocal())
localisedDatetime2 = datetime(2017, 6, 24, 12, 24, 36, tzinfo = tz.tzlocal())
localisedDatetime3 = unlocalisedDatetime.astimezone(tz = tz.tzlocal())
localisedDatetime4 = unlocalisedDatetime.replace(tzinfo = tz.tzlocal())

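Note that, on a naive datetime, datetime.astimezone (Python 3.6+) assumes the value is in the system's local time and converts it to the requested timezone via UTC. When the requested timezone is the local one, as here, the result is the same as calling datetime.replace on a datetime whose tzinfo is None.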
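
The difference between the two calls becomes visible when the target zone is not the local one. The sketch below is an illustration under stated assumptions: it needs Python 3.6+ and swaps in the standard library's `timezone.utc` for `tz.tzlocal()`, so it is not the answer's own code.

```python
from datetime import datetime, timezone

naive = datetime(2017, 6, 24, 12, 24, 36)

# replace() only attaches the label; the wall-clock fields are unchanged.
labelled = naive.replace(tzinfo=timezone.utc)

# astimezone() on a naive value treats it as system local time and converts
# it, so the wall-clock fields shift by the local UTC offset.
converted = naive.astimezone(timezone.utc)

# The offset the system assigns to this naive value.
local_offset = naive.astimezone().utcoffset()
```

`labelled` and `converted` describe the same instant only on a machine whose local zone is UTC; otherwise they differ by `local_offset`.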


回答 6

这将@Sérgio和@unutbu的答案整理成代码。它将与pytz.timezone对象或IANA时区字符串“兼容” 。

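import pytz

def make_tz_aware(dt, tz='UTC', is_dst=None):
    """Add timezone information to a datetime object, only if it is naive."""
    if dt.tzinfo is not None:
        return dt  # already aware; localize() would raise ValueError
    try:
        tz = pytz.timezone(tz)  # accept an IANA name such as 'Europe/Lisbon'
    except AttributeError:
        pass  # tz is already a pytz timezone object
    return tz.localize(dt, is_dst=is_dst)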

似乎应该做什么datetime.localize()(或.inform().awarify()),接受tz参数的字符串和时区对象,如果未指定时区,则默认为UTC。

This codifies @Sérgio and @unutbu’s answers. It will “just work” with either a pytz.timezone object or an IANA Time Zone string.

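import pytz

def make_tz_aware(dt, tz='UTC', is_dst=None):
    """Add timezone information to a datetime object, only if it is naive."""
    if dt.tzinfo is not None:
        return dt  # already aware; localize() would raise ValueError
    try:
        tz = pytz.timezone(tz)  # accept an IANA name such as 'Europe/Lisbon'
    except AttributeError:
        pass  # tz is already a pytz timezone object
    return tz.localize(dt, is_dst=is_dst)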

This seems like what datetime.localize() (or .inform() or .awarify()) should do: accept both strings and timezone objects for the tz argument, and default to UTC if no time zone is specified.


回答 7

Python 3.9添加了zoneinfo模块,因此现在仅需要标准库!

from zoneinfo import ZoneInfo
from datetime import datetime
unaware = datetime(2020, 10, 31, 12)

附加时区:

>>> unaware.replace(tzinfo=ZoneInfo('Asia/Tokyo'))
datetime.datetime(2020, 10, 31, 12, 0, tzinfo=zoneinfo.ZoneInfo(key='Asia/Tokyo'))
>>> str(_)
'2020-10-31 12:00:00+09:00'

附加系统的本地时区:

>>> unaware.replace(tzinfo=ZoneInfo('localtime'))
datetime.datetime(2020, 10, 31, 12, 0, tzinfo=zoneinfo.ZoneInfo(key='localtime'))
>>> str(_)
'2020-10-31 12:00:00+01:00'

随后,它将正确转换为其他时区:

>>> unaware.replace(tzinfo=ZoneInfo('localtime')).astimezone(ZoneInfo('Asia/Tokyo'))
datetime.datetime(2020, 10, 31, 20, 0, tzinfo=backports.zoneinfo.ZoneInfo(key='Asia/Tokyo'))
>>> str(_)
'2020-10-31 20:00:00+09:00'

可用时区的维基百科列表


有一个backport允许在Python 3.6至3.8中使用

sudo pip install backports.zoneinfo

然后:

from backports.zoneinfo import ZoneInfo

Python 3.9 adds the zoneinfo module, so now only the standard library is needed!

from zoneinfo import ZoneInfo
from datetime import datetime
unaware = datetime(2020, 10, 31, 12)

Attach a timezone:

>>> unaware.replace(tzinfo=ZoneInfo('Asia/Tokyo'))
datetime.datetime(2020, 10, 31, 12, 0, tzinfo=zoneinfo.ZoneInfo(key='Asia/Tokyo'))
>>> str(_)
'2020-10-31 12:00:00+09:00'

Attach the system’s local timezone:

>>> unaware.replace(tzinfo=ZoneInfo('localtime'))
datetime.datetime(2020, 10, 31, 12, 0, tzinfo=zoneinfo.ZoneInfo(key='localtime'))
>>> str(_)
'2020-10-31 12:00:00+01:00'

Subsequently it is properly converted to other timezones:

>>> unaware.replace(tzinfo=ZoneInfo('localtime')).astimezone(ZoneInfo('Asia/Tokyo'))
datetime.datetime(2020, 10, 31, 20, 0, tzinfo=backports.zoneinfo.ZoneInfo(key='Asia/Tokyo'))
>>> str(_)
'2020-10-31 20:00:00+09:00'

Wikipedia list of available time zones


There is a backport to allow use in Python 3.6 to 3.8:

sudo pip install backports.zoneinfo

Then:

from backports.zoneinfo import ZoneInfo

回答 8

以unutbu的答案格式;我制作了一个实用程序模块,以更直观的语法处理此类问题。可以通过pip安装。

import datetime
import saturn

unaware = datetime.datetime(2011, 8, 15, 8, 15, 12, 0)
now_aware = saturn.fix_naive(unaware)

now_aware_madrid = saturn.fix_naive(unaware, 'Europe/Madrid')

In the format of unutbu’s answer: I made a utility module that handles things like this with a more intuitive syntax. It can be installed with pip.

import datetime
import saturn

unaware = datetime.datetime(2011, 8, 15, 8, 15, 12, 0)
now_aware = saturn.fix_naive(unaware)

now_aware_madrid = saturn.fix_naive(unaware, 'Europe/Madrid')

回答 9

对于那些只想使时区知道日期时间的人

import datetime
import pytz

datetime.datetime(2019, 12, 7, tzinfo=pytz.UTC)

For those who just want to create a timezone-aware datetime:

import datetime
import pytz

datetime.datetime(2019, 12, 7, tzinfo=pytz.UTC)

回答 10

对Python来说还很陌生,我遇到了同样的问题。我发现此解决方案非常简单,对我来说也可以正常工作(Python 3.6):

unaware=parser.parse("2020-05-01 0:00:00")
aware=unaware.replace(tzinfo=tz.tzlocal()).astimezone(tz.tzlocal())

I'm quite new to Python and I encountered the same issue. I find this solution quite simple, and it works fine for me (Python 3.6):

from dateutil import parser, tz

unaware=parser.parse("2020-05-01 0:00:00")
aware=unaware.replace(tzinfo=tz.tzlocal()).astimezone(tz.tzlocal())

回答 11

在时区之间切换

import pytz
from datetime import datetime

other_tz = pytz.timezone('Europe/Madrid')

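# From random aware datetime...
# (start from an aware "now"; a naive utcnow() would be treated as local time by astimezone())
aware_datetime = datetime.now(pytz.UTC).astimezone(other_tz)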
>> 2020-05-21 08:28:26.984948+02:00

# 1. Change aware datetime to UTC and remove tzinfo to obtain an unaware datetime
unaware_datetime = aware_datetime.astimezone(pytz.UTC).replace(tzinfo=None)
>> 2020-05-21 06:28:26.984948

# 2. Set tzinfo to UTC directly on an unaware datetime to obtain an utc aware datetime
aware_datetime_utc = unaware_datetime.replace(tzinfo=pytz.UTC)
>> 2020-05-21 06:28:26.984948+00:00

# 3. Convert the aware utc datetime into another timezone
reconverted_aware_datetime = aware_datetime_utc.astimezone(other_tz)
>> 2020-05-21 08:28:26.984948+02:00

# Initial Aware Datetime and Reconverted Aware Datetime are equal
print(aware_datetime == reconverted_aware_datetime)
>> True

Changing between timezones

import pytz
from datetime import datetime

other_tz = pytz.timezone('Europe/Madrid')

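# From random aware datetime...
# (start from an aware "now"; a naive utcnow() would be treated as local time by astimezone())
aware_datetime = datetime.now(pytz.UTC).astimezone(other_tz)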
>> 2020-05-21 08:28:26.984948+02:00

# 1. Change aware datetime to UTC and remove tzinfo to obtain an unaware datetime
unaware_datetime = aware_datetime.astimezone(pytz.UTC).replace(tzinfo=None)
>> 2020-05-21 06:28:26.984948

# 2. Set tzinfo to UTC directly on an unaware datetime to obtain an utc aware datetime
aware_datetime_utc = unaware_datetime.replace(tzinfo=pytz.UTC)
>> 2020-05-21 06:28:26.984948+00:00

# 3. Convert the aware utc datetime into another timezone
reconverted_aware_datetime = aware_datetime_utc.astimezone(other_tz)
>> 2020-05-21 08:28:26.984948+02:00

# Initial Aware Datetime and Reconverted Aware Datetime are equal
print(aware_datetime == reconverted_aware_datetime)
>> True
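
The same round trip can be checked with only the standard library. This is a sketch under stated assumptions: `other_tz` is a hypothetical fixed +02:00 offset standing in for `Europe/Madrid` in summer, and a fixed offset knows nothing about daylight saving rules.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical stand-in for a +02:00 zone; not a real DST-aware timezone.
other_tz = timezone(timedelta(hours=2))

aware = datetime(2020, 5, 21, 8, 28, 26, tzinfo=other_tz)

# 1. Aware -> UTC -> naive (the naive value is implicitly UTC from here on).
naive_utc = aware.astimezone(timezone.utc).replace(tzinfo=None)

# 2. Naive UTC -> aware UTC, by attaching the label only.
aware_utc = naive_utc.replace(tzinfo=timezone.utc)

# 3. Aware UTC -> the other zone, by a real conversion.
round_tripped = aware_utc.astimezone(other_tz)
```

Steps 1 and 2 use `replace` only on values known to be in UTC, which is what makes attaching the label safe.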

移除Python unicode字符串中的重音符号的最佳方法是什么?

问题:移除Python unicode字符串中的重音符号的最佳方法是什么?

我在Python中有一个Unicode字符串,我想删除所有的重音符号(变音符号)。

我在网上发现了一种用Java实现此目的的优雅方法:

  1. 将Unicode字符串转换为长规范化格式(带有单独的字母和变音符号)
  2. 删除Unicode类型为“变音符号”的所有字符。

我是否需要安装pyICU之类的库,还是仅使用python标准库就可以?那python 3呢?

重要说明:我想避免使用带有重音符号到非重音符号的显式映射的代码。

I have a Unicode string in Python, and I would like to remove all the accents (diacritics).

I found on the Web an elegant way to do this in Java:

  1. convert the Unicode string to its long normalized form (with a separate character for letters and diacritics)
  2. remove all the characters whose Unicode type is “diacritic”.

Do I need to install a library such as pyICU or is this possible with just the python standard library? And what about python 3?

Important note: I would like to avoid code with an explicit mapping from accented characters to their non-accented counterpart.


回答 0

Unidecode是正确的答案。它将所有unicode字符串音译为ASCII文本中最接近的可能表示形式。

例:

accented_string = u'Málaga'
# accented_string is of type 'unicode'
import unidecode
unaccented_string = unidecode.unidecode(accented_string)
# unaccented_string contains 'Malaga' and is of type 'str'

Unidecode is the correct answer for this. It transliterates any unicode string into the closest possible representation in ascii text.

Example:

accented_string = u'Málaga'
# accented_string is of type 'unicode'
import unidecode
unaccented_string = unidecode.unidecode(accented_string)
# unaccented_string contains 'Malaga' and is of type 'str'

回答 1

这个怎么样:

import unicodedata
def strip_accents(s):
   return ''.join(c for c in unicodedata.normalize('NFD', s)
                  if unicodedata.category(c) != 'Mn')

这也适用于希腊字母:

>>> strip_accents(u"A \u00c0 \u0394 \u038E")
u'A A \u0394 \u03a5'
>>> 

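The character category “Mn” stands for Nonspacing_Mark, which is similar to unicodedata.combining in MiniQuark’s answer (I didn’t think of unicodedata.combining, but it is probably the better solution, because it’s more explicit).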

请记住,这些操作可能会大大改变文本的含义。口音,Umlauts等不是“装饰”。

How about this:

import unicodedata
def strip_accents(s):
   return ''.join(c for c in unicodedata.normalize('NFD', s)
                  if unicodedata.category(c) != 'Mn')

This works on greek letters, too:

>>> strip_accents(u"A \u00c0 \u0394 \u038E")
u'A A \u0394 \u03a5'
>>> 

The character category “Mn” stands for Nonspacing_Mark, which is similar to unicodedata.combining in MiniQuark’s answer (I didn’t think of unicodedata.combining, but it is probably the better solution, because it’s more explicit).

And keep in mind, these manipulations may significantly alter the meaning of the text. Accents, Umlauts etc. are not “decoration”.
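
A Python 3 sketch of the same idea, using `unicodedata.combining` in place of the category check (the variant this answer itself calls probably better). It also shows a limit of the approach: characters with no canonical decomposition are left untouched.

```python
import unicodedata

def strip_accents(text):
    """Drop combining marks after canonical decomposition (NFD)."""
    decomposed = unicodedata.normalize('NFD', text)
    return ''.join(c for c in decomposed if not unicodedata.combining(c))

spanish = strip_accents('M\u00e1laga')   # "Málaga"
greek = strip_accents('\u038e')          # Greek capital upsilon with tonos
stroked = strip_accents('\u00f8')        # "ø" has no decomposition
```

`spanish` becomes `'Malaga'` and `greek` becomes the plain upsilon `'\u03a5'`, while `stroked` is still `'ø'`, because the stroke is part of the letter and not a separate combining mark.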


回答 2

我刚刚在网上找到了这个答案:

import unicodedata

def remove_accents(input_str):
    nfkd_form = unicodedata.normalize('NFKD', input_str)
    only_ascii = nfkd_form.encode('ASCII', 'ignore')
    return only_ascii

它可以正常工作(例如,对于法语),但是我认为第二步(删除重音符号)比丢弃非ASCII字符要好,因为这对于某些语言(例如希腊文)会失败。最好的解决方案可能是显式删除标记为变音符号的unicode字符。

编辑:这可以解决问题:

import unicodedata

def remove_accents(input_str):
    nfkd_form = unicodedata.normalize('NFKD', input_str)
    return u"".join([c for c in nfkd_form if not unicodedata.combining(c)])

unicodedata.combining(c)如果该字符c可以与前面的字符组合,则返回true ,这主要是如果它是一个变音符。

编辑2remove_accents需要一个unicode字符串,而不是字节字符串。如果您有字节字符串,则必须将其解码为一个unicode字符串,如下所示:

encoding = "utf-8"  # or iso-8859-15, or cp1252, or whatever encoding you use
byte_string = b"caf\xc3\xa9"  # the UTF-8 bytes of "café"; before Python 3 you could simply write "café"
unicode_string = byte_string.decode(encoding)

I just found this answer on the Web:

import unicodedata

def remove_accents(input_str):
    nfkd_form = unicodedata.normalize('NFKD', input_str)
    only_ascii = nfkd_form.encode('ASCII', 'ignore')
    return only_ascii

It works fine (for French, for example), but I think the second step (removing the accents) could be handled better than dropping the non-ASCII characters, because this will fail for some languages (Greek, for example). The best solution would probably be to explicitly remove the unicode characters that are tagged as being diacritics.

Edit: this does the trick:

import unicodedata

def remove_accents(input_str):
    nfkd_form = unicodedata.normalize('NFKD', input_str)
    return u"".join([c for c in nfkd_form if not unicodedata.combining(c)])

unicodedata.combining(c) returns the canonical combining class of the character c. It is non-zero (and therefore truthy) when c is a combining character that attaches to the preceding character, which mainly means when it is a diacritic.
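
To illustrate why the NFKD step has to come first, here is a small sketch for Python 3 (the sample character is only an example): normalization splits an accented letter into a base letter plus a combining mark, and combining() then tells the two apart.

```python
import unicodedata

# '\u00e9' is 'é' as a single precomposed code point.
decomposed = unicodedata.normalize('NFKD', '\u00e9')

# NFKD splits it into the base letter and U+0301 COMBINING ACUTE ACCENT.
print([unicodedata.name(c) for c in decomposed])

# combining() is 0 for the base letter and non-zero for the mark, so the
# filter "not unicodedata.combining(c)" keeps only the base letter.
print([unicodedata.combining(c) for c in decomposed])
```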

Edit 2: remove_accents expects a unicode string, not a byte string. If you have a byte string, then you must decode it into a unicode string like this:

encoding = "utf-8"  # or iso-8859-15, or cp1252, or whatever encoding you use
byte_string = b"caf\xc3\xa9"  # the UTF-8 bytes of "café"; before Python 3 you could simply write "café"
unicode_string = byte_string.decode(encoding)

回答 3

实际上,我正在开发与项目兼容的python 2.6、2.7和3.4,并且必须从免费用户条目中创建ID。

多亏了您,我创建了一个可以实现奇迹的功能。

import re
import unicodedata

def strip_accents(text):
    """
    Strip accents from input String.

    :param text: The input string.
    :type text: String.

    :returns: The processed String.
    :rtype: String.
    """
    try:
        text = unicode(text, 'utf-8')
    except (TypeError, NameError): # unicode is a default on python 3 
        pass
    text = unicodedata.normalize('NFD', text)
    text = text.encode('ascii', 'ignore')
    text = text.decode("utf-8")
    return str(text)

def text_to_id(text):
    """
    Convert input text to id.

    :param text: The input string.
    :type text: String.

    :returns: The processed String.
    :rtype: String.
    """
    text = strip_accents(text.lower())
    text = re.sub('[ ]+', '_', text)
    text = re.sub('[^0-9a-zA-Z_-]', '', text)
    return text

结果:

text_to_id("Montréal, über, 12.89, Mère, Françoise, noël, 889")
>>> 'montreal_uber_1289_mere_francoise_noel_889'

I work on a project that has to stay compatible with Python 2.6, 2.7 and 3.4, and I have to create IDs from free-form user input.

Thanks to you, I have created this function that works wonders.

import re
import unicodedata

def strip_accents(text):
    """
    Strip accents from input String.

    :param text: The input string.
    :type text: String.

    :returns: The processed String.
    :rtype: String.
    """
    try:
        text = unicode(text, 'utf-8')
    except (TypeError, NameError): # unicode is a default on python 3 
        pass
    text = unicodedata.normalize('NFD', text)
    text = text.encode('ascii', 'ignore')
    text = text.decode("utf-8")
    return str(text)

def text_to_id(text):
    """
    Convert input text to id.

    :param text: The input string.
    :type text: String.

    :returns: The processed String.
    :rtype: String.
    """
    text = strip_accents(text.lower())
    text = re.sub('[ ]+', '_', text)
    text = re.sub('[^0-9a-zA-Z_-]', '', text)
    return text

Result:

>>> text_to_id("Montréal, über, 12.89, Mère, Françoise, noël, 889")
'montreal_uber_1289_mere_francoise_noel_889'

回答 4

这不仅处理重音,而且还处理“笔画”(如ø等):

import unicodedata as ud

def rmdiacritics(char):
    '''
    Return the base character of char, by "removing" any
    diacritics like accents or curls and strokes and the like.
    '''
    desc = ud.name(char)
    cutoff = desc.find(' WITH ')
    if cutoff != -1:
        desc = desc[:cutoff]
        try:
            char = ud.lookup(desc)
        except KeyError:
            pass  # removing "WITH ..." produced an invalid name
    return char

这是我能想到的最优雅的方式(alexis在此页的评论中已经提到),尽管我认为这确实不是很优雅。实际上,正如注释中所指出的那样,这更像是一种黑客,因为Unicode名称是–实际上只是名称,它们不能保证其一致性或任何形式。

由于它们的Unicode名称中不包含“ WITH”,因此仍有一些特殊的字母无法对此进行处理,例如转弯和倒转字母。无论如何,这取决于您想做什么。有时我需要重音符号来实现字典的排序顺序。

编辑说明:

合并了注释中的建议(处理查找错误,Python-3代码)。

This handles not only accents, but also “strokes” (as in ø etc.):

import unicodedata as ud

def rmdiacritics(char):
    '''
    Return the base character of char, by "removing" any
    diacritics like accents or curls and strokes and the like.
    '''
    desc = ud.name(char)
    cutoff = desc.find(' WITH ')
    if cutoff != -1:
        desc = desc[:cutoff]
        try:
            char = ud.lookup(desc)
        except KeyError:
            pass  # removing "WITH ..." produced an invalid name
    return char

This is the most elegant way I can think of (and alexis mentioned it in a comment on this page), although I don’t think it is very elegant. In fact, as pointed out in the comments, it’s more of a hack: Unicode names are really just names, with no guarantee that they are consistent.

There are still special letters that are not handled by this, such as turned and inverted letters, since their unicode name does not contain ‘WITH’. It depends on what you want to do anyway. I sometimes needed accent stripping for achieving dictionary sort order.

EDIT NOTE:

Incorporated suggestions from the comments (handling lookup errors, Python-3 code).
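
A possible way to apply this to whole strings, sketched for Python 3. The helper name strip_string is made up for this example, and passing a default to ud.name() is my addition so that unnamed characters such as newlines don't raise ValueError:

```python
import unicodedata as ud

def rmdiacritics(char):
    """Return the base character of char by dropping ' WITH ...' from its Unicode name."""
    desc = ud.name(char, '')  # '' for characters without a name, e.g. '\n' (not in the original)
    cutoff = desc.find(' WITH ')
    if cutoff != -1:
        try:
            char = ud.lookup(desc[:cutoff])
        except KeyError:
            pass  # the shortened name does not exist; keep the character as it is
    return char

def strip_string(text):
    # Hypothetical helper: apply rmdiacritics to every character of a string.
    return ''.join(rmdiacritics(c) for c in text)

print(strip_string('sm\u00f8rrebr\u00f8d'))  # input is 'smørrebrød'
```

Letters such as 'Æ' stay untouched, since their names contain no ' WITH '.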


回答 5

回应@MiniQuark的回答:

我试图读取一个半法语的csv文件(包含重音符号)以及一些最终会变成整数和浮点数的字符串。作为测试,我创建了一个如下所示的test.txt文件:

Montréal, über, 12.89, Mère, Françoise, noël, 889

我必须包括行23使其起作用(在python票证中找到),并包含@Jabba的注释:

import sys 
reload(sys) 
sys.setdefaultencoding("utf-8")
import csv
import unicodedata

def remove_accents(input_str):
    nkfd_form = unicodedata.normalize('NFKD', unicode(input_str))
    return u"".join([c for c in nkfd_form if not unicodedata.combining(c)])

with open('test.txt') as f:
    read = csv.reader(f)
    for row in read:
        for element in row:
            print remove_accents(element)

结果:

Montreal
uber
12.89
Mere
Francoise
noel
889

(注意:我在Mac OS X 10.8.4上并使用Python 2.7.3)

In response to @MiniQuark’s answer:

I was trying to read in a csv file that was half-French (containing accents) and also some strings which would eventually become integers and floats. As a test, I created a test.txt file that looked like this:

Montréal, über, 12.89, Mère, Françoise, noël, 889

I had to include lines 2 and 3 to get it to work (which I found in a python ticket), as well as incorporate @Jabba’s comment:

import sys 
reload(sys) 
sys.setdefaultencoding("utf-8")
import csv
import unicodedata

def remove_accents(input_str):
    nkfd_form = unicodedata.normalize('NFKD', unicode(input_str))
    return u"".join([c for c in nkfd_form if not unicodedata.combining(c)])

with open('test.txt') as f:
    read = csv.reader(f)
    for row in read:
        for element in row:
            print remove_accents(element)

The result:

Montreal
uber
12.89
Mere
Francoise
noel
889

(Note: I am on Mac OS X 10.8.4 and using Python 2.7.3)


回答 6

gensim.utils.deaccent(文本)Gensim -人类主题建模

'Sef chomutovskych komunistu dostal postou bily prasek'

另一个解决方案是unidecode

需要注意的是,用建议的解决方案unicodedata通常只在某些字符去掉口音(例如,它变成'ł''',而不是进入'l')。

gensim.utils.deaccent(text) from Gensim – topic modelling for humans:

'Sef chomutovskych komunistu dostal postou bily prasek'

Another solution is unidecode.

Note that the suggested solution with unicodedata typically removes accents only from some characters (e.g. it turns 'ł' into '', rather than into 'l').
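
The 'ł' case can be checked with a short Python 3 sketch (the sample word is just an example). It compares the two unicodedata variants from the earlier answers: filtering combining marks, and encoding to ASCII with errors ignored.

```python
import unicodedata

word = '\u0142\u00f3d\u017a'  # 'łódź'
nfkd = unicodedata.normalize('NFKD', word)

# Filtering combining marks strips the accents from 'ó' and 'ź' but leaves 'ł'
# alone, because 'ł' has no decomposition into a base letter plus a mark.
without_marks = ''.join(c for c in nfkd if not unicodedata.combining(c))

# Encoding to ASCII with errors='ignore' drops 'ł' altogether.
ascii_only = nfkd.encode('ascii', 'ignore').decode('ascii')

print(without_marks, ascii_only)
```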


回答 7

一些语言结合了变音符号作为语言字母和重音符号来指定重音。

我认为更明确地指定要去除的折光度数更安全:

def strip_accents(string, accents=('COMBINING ACUTE ACCENT', 'COMBINING GRAVE ACCENT', 'COMBINING TILDE')):
    accents = set(map(unicodedata.lookup, accents))
    chars = [c for c in unicodedata.normalize('NFD', string) if c not in accents]
    return unicodedata.normalize('NFC', ''.join(chars))

Some languages use combining diacritics as part of their letters, and separate accent diacritics to mark stress.

I think it is safer to specify explicitly which diacritics you want to strip:

import unicodedata

def strip_accents(string, accents=('COMBINING ACUTE ACCENT', 'COMBINING GRAVE ACCENT', 'COMBINING TILDE')):
    accents = set(map(unicodedata.lookup, accents))
    chars = [c for c in unicodedata.normalize('NFD', string) if c not in accents]
    return unicodedata.normalize('NFC', ''.join(chars))
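
A short usage sketch for Python 3 (the sample strings are mine): marks in the accents tuple are removed, everything else is recomposed by the final NFC step.

```python
import unicodedata

def strip_accents(string, accents=('COMBINING ACUTE ACCENT', 'COMBINING GRAVE ACCENT', 'COMBINING TILDE')):
    accents = set(map(unicodedata.lookup, accents))
    chars = [c for c in unicodedata.normalize('NFD', string) if c not in accents]
    return unicodedata.normalize('NFC', ''.join(chars))

# Acute and tilde are in the default set, so both are stripped.
print(strip_accents('caf\u00e9 ni\u00f1o'))

# The diaeresis is not in the default set, so 'ü' survives...
print(strip_accents('\u00fcber'))

# ...unless it is requested explicitly.
print(strip_accents('\u00fcber', accents=('COMBINING DIAERESIS',)))
```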

将Unicode字符串转换为Python中的字符串(包含多余的符号)

问题:将Unicode字符串转换为Python中的字符串(包含多余的符号)

如何将Unicode字符串(包含额外的字符,如£$等)转换为Python字符串?

How do you convert a Unicode string (containing extra characters like £ $, etc.) into a Python string?


回答 0

title = u"Klüft skräms inför på fédéral électoral große"
import unicodedata
unicodedata.normalize('NFKD', title).encode('ascii','ignore')
'Kluft skrams infor pa federal electoral groe'

回答 1

如果不需要翻译非ASCII字符,则可以使用编码为ASCII:

>>> a=u"aaaàçççñññ"
>>> type(a)
<type 'unicode'>
>>> a.encode('ascii','ignore')
'aaa'
>>> a.encode('ascii','replace')
'aaa???????'
>>>

You can use encode to ASCII if you don’t need to translate the non-ASCII characters:

>>> a=u"aaaàçççñññ"
>>> type(a)
<type 'unicode'>
>>> a.encode('ascii','ignore')
'aaa'
>>> a.encode('ascii','replace')
'aaa???????'
>>>
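
For comparison, a Python 3 sketch of the same idea (the sample string is shortened). Here encode() returns bytes, and besides 'ignore' and 'replace' the standard error handlers 'backslashreplace' and 'xmlcharrefreplace' keep the information instead of discarding it:

```python
text = 'aaa\u00e0\u00e7\u00f1'  # 'aaaàçñ'

ignored = text.encode('ascii', 'ignore')             # non-ASCII characters dropped
replaced = text.encode('ascii', 'replace')           # each one replaced by '?'
escaped = text.encode('ascii', 'backslashreplace')   # replaced by a Python escape
xml_refs = text.encode('ascii', 'xmlcharrefreplace') # replaced by an XML character reference

for result in (ignored, replaced, escaped, xml_refs):
    print(result)
```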

回答 2

>>> text=u'abcd'
>>> str(text)
'abcd'

如果字符串仅包含ascii字符。

>>> text=u'abcd'
>>> str(text)
'abcd'

This works only if the string contains nothing but ASCII characters.


回答 3

如果您有Unicode字符串,并且想要将其写入文件或其他序列化形式,则必须首先将其编码为可以存储的特定表示形式。有几种常见的Unicode编码,例如UTF-16(大多数Unicode字符使用两个字节)或UTF-8(1-4个字节/代码点,取决于字符)等。要将该字符串转换为特定的编码,您可以可以使用:

>>> s= u'£10'
>>> s.encode('utf8')
'\xc2\xa310'
>>> s.encode('utf16')
'\xff\xfe\xa3\x001\x000\x00'

可以将此原始字节字符串写入文件。但是,请注意,当读回它时,您必须知道它所使用的编码并使用相同的编码对其进行解码。

写入文件时,您可以使用编解码器模块摆脱此手动编码/解码过程。因此,要打开将所有Unicode字符串编码为UTF-8的文件,请使用:

import codecs
f = codecs.open('path/to/file.txt','w','utf8')
f.write(my_unicode_string)  # Stored on disk as UTF-8

请注意,正在使用这些文件的其他任何文件,如果要读取它们,都必须了解文件的编码格式。如果您是唯一一个进行读/写的人,那么这不是问题,否则请确保以一种其他任何使用文件都可以理解的形式书写。

在Python 3中,这种形式的文件访问是默认的,并且内置open函数将采用编码参数,并始终与以文本模式打开的文件在Unicode字符串(Python 3中的默认字符串对象)之间进行转换。

If you have a Unicode string, and you want to write this to a file, or other serialised form, you must first encode it into a particular representation that can be stored. There are several common Unicode encodings, such as UTF-16 (uses two bytes for most Unicode characters) or UTF-8 (1-4 bytes / codepoint depending on the character), etc. To convert that string into a particular encoding, you can use:

>>> s= u'£10'
>>> s.encode('utf8')
'\xc2\xa310'
>>> s.encode('utf16')
'\xff\xfe\xa3\x001\x000\x00'

This raw string of bytes can be written to a file. However, note that when reading it back, you must know what encoding it is in and decode it using that same encoding.

When writing to files, you can get rid of this manual encode/decode process by using the codecs module. So, to open a file that encodes all Unicode strings into UTF-8, use:

import codecs
f = codecs.open('path/to/file.txt','w','utf8')
f.write(my_unicode_string)  # Stored on disk as UTF-8

Do note that anything else that is using these files must understand what encoding the file is in if they want to read them. If you are the only one doing the reading/writing this isn’t a problem, otherwise make sure that you write in a form understandable by whatever else uses the files.

In Python 3, this form of file access is the default, and the built-in open function will take an encoding parameter and always translate to/from Unicode strings (the default string object in Python 3) for files opened in text mode.
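
A minimal Python 3 sketch of that last point. The temporary file location is only for the example; the encoding is passed explicitly rather than relying on the platform default.

```python
import os
import tempfile

text = '\u00a310'  # '£10'

# Throwaway file for the example.
path = os.path.join(tempfile.mkdtemp(), 'example.txt')

# Text mode: str goes in, open() encodes it as UTF-8 on the way to disk.
with open(path, 'w', encoding='utf-8') as f:
    f.write(text)

# Binary mode shows the bytes that were actually stored.
with open(path, 'rb') as f:
    raw = f.read()
print(raw)

# Reading in text mode with the same encoding gives the original str back.
with open(path, encoding='utf-8') as f:
    round_trip = f.read()
print(round_trip == text)
```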


回答 4

这是一个例子:

>>> u = u'€€€'
>>> s = u.encode('utf8')
>>> s
'\xe2\x82\xac\xe2\x82\xac\xe2\x82\xac'

Here is an example:

>>> u = u'€€€'
>>> s = u.encode('utf8')
>>> s
'\xe2\x82\xac\xe2\x82\xac\xe2\x82\xac'

回答 5

好吧,如果您愿意/准备切换到Python 3(可能不是由于与某些Python 2代码的向后不兼容),则不必进行任何转换。Python 3中的所有文本均以Unicode字符串表示,这也意味着不再使用该u'<text>'语法。实际上,您还有字节字符串,用于表示数据(可以是编码字符串)。

http://docs.python.org/3.1/whatsnew/3.0.html#text-vs-data-instead-of-unicode-vs-8-bit

(当然,如果您当前使用的是Python 3,则问题可能与您尝试将文本保存到文件中有关。)

Well, if you’re willing/ready to switch to Python 3 (which you may not be due to the backwards incompatibility with some Python 2 code), you don’t have to do any converting; all text in Python 3 is represented with Unicode strings, which also means that there’s no more usage of the u'<text>' syntax. You also have what are, in effect, strings of bytes, which are used to represent data (which may be an encoded string).

http://docs.python.org/3.1/whatsnew/3.0.html#text-vs-data-instead-of-unicode-vs-8-bit

(Of course, if you’re currently using Python 3, then the problem is likely something to do with how you’re attempting to save the text to a file.)


回答 6

这是一个示例代码

import unicodedata    
raw_text = u"here $%6757 dfgdfg"
convert_text = unicodedata.normalize('NFKD', raw_text).encode('ascii','ignore')

Here is an example code

import unicodedata    
raw_text = u"here $%6757 dfgdfg"
convert_text = unicodedata.normalize('NFKD', raw_text).encode('ascii','ignore')

回答 7

文件包含Unicode字符串

\"message\": \"\\u0410\\u0432\\u0442\\u043e\\u0437\\u0430\\u0446\\u0438\\u044f .....\",

为了我

 f = open("56ad62-json.log", encoding="utf-8")
 qq=f.readline() 

 print(qq)                          
 {"log":\"message\": \"\\u0410\\u0432\\u0442\\u043e\\u0440\\u0438\\u0437\\u0430\\u0446\\u0438\\u044f \\u043f\\u043e\\u043b\\u044c\\u0437\\u043e\\u0432\\u0430\\u0442\\u0435\\u043b\\u044f\"}

(qq.encode().decode("unicode-escape").encode().decode("unicode-escape")) 
# '{"log":"message": "Авторизация пользователя"}\n'

The file contains a unicode-escaped string:

\"message\": \"\\u0410\\u0432\\u0442\\u043e\\u0437\\u0430\\u0446\\u0438\\u044f .....\",

This worked for me:

 f = open("56ad62-json.log", encoding="utf-8")
 qq=f.readline() 

 print(qq)                          
 {"log":\"message\": \"\\u0410\\u0432\\u0442\\u043e\\u0440\\u0438\\u0437\\u0430\\u0446\\u0438\\u044f \\u043f\\u043e\\u043b\\u044c\\u0437\\u043e\\u0432\\u0430\\u0442\\u0435\\u043b\\u044f\"}

(qq.encode().decode("unicode-escape").encode().decode("unicode-escape")) 
# '{"log":"message": "Авторизация пользователя"}\n'

回答 8

对于我的情况,没有答案可用。在这里,我有一个包含unichar字符的字符串变量,在此没有解释的encoding-decode起作用。

如果我在航站楼里

echo "no me llama mucho la atenci\u00f3n"

要么

python3
>>> print("no me llama mucho la atenci\u00f3n")

输出正确:

output: no me llama mucho la atención

但是使用脚本加载此字符串变量无法正常工作。

这是对我的案例起作用的,以防万一:

string_to_convert = "no me llama mucho la atenci\u00f3n"
print(json.dumps(json.loads(r'"%s"' % string_to_convert), ensure_ascii=False))
output: no me llama mucho la atención

None of the answers worked in my case, where I had a string variable containing Unicode escape sequences, and none of the encode/decode combinations explained here did the job.

If I run this in a terminal:

echo "no me llama mucho la atenci\u00f3n"

or

python3
>>> print("no me llama mucho la atenci\u00f3n")

The output is correct:

output: no me llama mucho la atención

But it did not work in scripts that loaded this string into a variable.

This is what worked in my case, in case it helps anybody:

import json

string_to_convert = r"no me llama mucho la atenci\u00f3n"  # raw string, so \u00f3 stays a literal escape
print(json.dumps(json.loads(r'"%s"' % string_to_convert), ensure_ascii=False))
output: "no me llama mucho la atención"

处理多个Python版本和PIP?

问题:处理多个Python版本和PIP?

有什么方法可以使pip多个版本的Python正常运行吗?例如,我想用于pip将内容显式安装到站点2.5安装或站点2.6安装中。

例如,使用easy_install,我使用easy_install-2.{5,6}

而且,是的-我了解virtualenv,不是-这不是解决此特定问题的方法。

Is there any way to make pip play well with multiple versions of Python? For example, I want to use pip to explicitly install things to either my site 2.5 installation or my site 2.6 installation.

For example, with easy_install, I use easy_install-2.{5,6}.

And, yes — I know about virtualenv, and no — it’s not a solution to this particular problem.


回答 0

目前的建议是使用python -m pip,这里python是Python的版本,你想使用。这是建议,因为它适用于所有版本的Python和所有形式的virtualenv。例如:

# The system default python:
$ python -m pip install fish

# A virtualenv's python:
$ .env/bin/python -m pip install fish

# A specific version of python:
$ python3.6 -m pip install fish

先前的答案,留给后代:

从0.8版开始,Pip支持pip-{version}。您可以像使用它一样easy_install-{version}

$ pip-2.5 install myfoopackage
$ pip-2.6 install otherpackage
$ pip-2.7 install mybarpackage

编辑:pip更改其架构以使用,pipVERSION而不是pip-VERSION在1.5版中使用。如果有,则应使用以下内容pip >= 1.5

$ pip2.6 install otherpackage
$ pip2.7 install mybarpackage

检查https://github.com/pypa/pip/pull/1053了解更多详细信息


参考文献:

The current recommendation is to use python -m pip, where python is the version of Python you would like to use. This is the recommendation because it works across all versions of Python, and in all forms of virtualenv. For example:

# The system default python:
$ python -m pip install fish

# A virtualenv's python:
$ .env/bin/python -m pip install fish

# A specific version of python:
$ python3.6 -m pip install fish

Previous answer, left for posterity:

Since version 0.8, Pip supports pip-{version}. You can use it the same as easy_install-{version}:

$ pip-2.5 install myfoopackage
$ pip-2.6 install otherpackage
$ pip-2.7 install mybarpackage

EDIT: pip changed its naming scheme to use pipVERSION instead of pip-VERSION in version 1.5. You should use the following if you have pip >= 1.5:

$ pip2.6 install otherpackage
$ pip2.7 install mybarpackage

Check https://github.com/pypa/pip/pull/1053 for more details
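
The same idea carries over to a Python script: a rough sketch, assuming you want pip to target whichever interpreter is running the script. The function names pip_command and install are invented for this example.

```python
import subprocess
import sys

def pip_command(*args):
    """Build a pip command line bound to the interpreter running this script."""
    return [sys.executable, '-m', 'pip'] + list(args)

def install(package):
    # Not called in this sketch; calling it installs the package for sys.executable.
    subprocess.check_call(pip_command('install', package))

print(pip_command('install', 'fish'))
```

Because sys.executable is the full path of the current interpreter, this avoids depending on which pip happens to come first on PATH.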




回答 1

在Windows中,您可以执行使用通过给定的Python版本的PIP模块Python的发射器py.exe如果你选择了Python 3的安装过程中安装它。

py -3 -m pip install packagename
py -2 -m pip install packagename

您甚至可以更加具体,并请求确切的Python子版本:

py -3.6 -m pip install packagename

要获取通过启动器可用的所有已安装Python版本的列表,请运行:

py --list

另外,您可以直接启动所需的Python可执行文件:

C:/path/to/specific/python.exe -m pip install packagename

On Windows, you can execute the pip module using a given Python version through the Python launcher, py.exe, if you chose to install it during Python 3 setup.

py -3 -m pip install packagename
py -2 -m pip install packagename

You can be even more specific and request an exact sub-version of Python:

py -3.6 -m pip install packagename

To get a list of all installed Python versions available through the launcher, run:

py --list

Alternatively, you can launch the desired Python executable directly:

C:/path/to/specific/python.exe -m pip install packagename

回答 2

/path/to/python2.{5,6} /path/to/pip install PackageName 不起作用?

为了使它能够在尚未安装pip的任何python版本上运行,您需要下载pip并执行python*version* setup.py install。例如python3.3 setup.py install。这样可以解决注释中的导入错误。(由@hbdgaf建议)

Doesn’t /path/to/python2.{5,6} /path/to/pip install PackageName work?

For this to work on any Python version that doesn’t already have pip installed, you need to download pip and run python*version* setup.py install, for example python3.3 setup.py install. This resolves the import error mentioned in the comments. (As suggested by @hbdgaf)


回答 3

我默认情况下安装了python 2.6(Amazon EC2 AMI),但是我的应用程序需要python2.7以及一些外部软件包。假设您已经安装了python2.7和默认的python(在我的例子中是2.6)。这是如何为非默认python2.7安装pip和软件包

为您的python版本安装pip:

curl -O https://bootstrap.pypa.io/get-pip.py
python27 get-pip.py

使用特定的pip版本来安装软件包:

pip2.7 install mysql-connector-python --allow-external mysql-connector-python

I had Python 2.6 installed by default (Amazon EC2 AMI), but needed Python 2.7 plus some external packages for my application. Assuming you have already installed Python 2.7 alongside the default Python (2.6 in my case), here is how to install pip and packages for the non-default Python 2.7.

Install pip for your python version:

curl -O https://bootstrap.pypa.io/get-pip.py
python27 get-pip.py

Use specific pip version to install packages:

pip2.7 install mysql-connector-python --allow-external mysql-connector-python

回答 4

它以这种方式在Windows中为我工作:

  1. 我将python文件python.py和pythonw.exe的名称更改为python3.py pythonw3.py

  2. 然后,我在提示符中运行了此命令:

    python3 -m pip install package

It worked for me on Windows this way:

  1. I renamed the Python executables python.exe and pythonw.exe to python3.exe and pythonw3.exe

  2. Then I just ran this command in the prompt:

    python3 -m pip install package


回答 5

其他答案显示了如何在2.X和3.X Python上同时使用pip,但没有显示如何处理多个Python发行版的情况(例如,原始Python和Anaconda Python)

我总共有3个Python版本:原始Python 2.7和Python 3.5以及Anaconda Python 3.5。

这是我将软件包安装到的方法:

  1. 原始Python 3.5

    /usr/bin/python3 -m pip install python-daemon
  2. 原始Python 2.7

    /usr/bin/python -m pip install python-daemon
  3. Anaconda Python 3.5

    python3 -m pip install python-daemon

    要么

    pip3 install python-daemon

    更简单,因为Anaconda在用户环境中覆盖了原始Python二进制文件。

    当然,在anaconda中安装应使用conda命令完成,这只是一个示例。


另外,请确保已为该特定python安装了pip。您可能需要手动安装pip。在Ubuntu 16.04中可以使用:

sudo apt-get install python-pip 

要么

sudo apt-get install python3-pip

Other answers show how to use pip with both Python 2.x and 3.x, but do not show how to handle multiple Python distributions (e.g. the original Python and Anaconda Python).

I have a total of 3 Python versions: original Python 2.7 and Python 3.5 and Anaconda Python 3.5.

Here is how I install a package into:

  1. Original Python 3.5:

    /usr/bin/python3 -m pip install python-daemon
    
  2. Original Python 2.7:

    /usr/bin/python -m pip install python-daemon
    
  3. Anaconda Python 3.5:

    python3 -m pip install python-daemon
    

    or

    pip3 install python-daemon
    

    Simpler, as Anaconda overrides original Python binaries in user environment.

    Of course, installing in anaconda should be done with conda command, this is just an example.


Also, make sure that pip is installed for that specific Python. You might need to install pip manually. This works on Ubuntu 16.04:

sudo apt-get install python-pip 

or

sudo apt-get install python3-pip

回答 6

我本人最近遇到了这个问题,发现在我的同时具有Python 2的Linux系统上,我对Python 3的了解不正确。

首先,您必须确保已为python版本安装了pip:

对于Python 2:

sudo apt-get install python-pip

对于Python 3:

sudo apt-get install python3-pip

然后,要为一个版本的Python或其他版本安装软件包,只需对Python 2使用以下代码:

pip install <package>

或对于Python 3:

pip3 install <package>

I ran into this issue myself recently and found that I wasn’t getting the right pip for Python 3, on my Linux system that also has Python 2.

First you must ensure that you have installed pip for your python version:

For Python 2:

sudo apt-get install python-pip

For Python 3:

sudo apt-get install python3-pip

Then to install packages for one version of Python or the other, simply use the following for Python 2:

pip install <package>

or for Python 3:

pip3 install <package>

回答 7

pip也是python包。因此,将模块安装到特定python版本的最简单方法如下

 python2.7 /usr/bin/pip install foo

要么

python2.7 -m pip install foo

pip is itself a Python package, so the easiest way to install modules for a specific Python version is:

python2.7 /usr/bin/pip install foo

or

python2.7 -m pip install foo

回答 8

因此很明显,有多个版本easy_install pip。这似乎是一个大混乱。无论如何,这就是我在Ubuntu 12.10上为Python 2.7安装Django的目的:

$ sudo easy_install-2.7 pip
Searching for pip
Best match: pip 1.1
Adding pip 1.1 to easy-install.pth file
Installing pip-2.7 script to /usr/local/bin

Using /usr/lib/python2.7/dist-packages
Processing dependencies for pip
Finished processing dependencies for pip

$ sudo pip-2.7 install django
Downloading/unpacking django
  Downloading Django-1.5.1.tar.gz (8.0Mb): 8.0Mb downloaded
  Running setup.py egg_info for package django

    warning: no previously-included files matching '__pycache__' found under directory '*'
    warning: no previously-included files matching '*.py[co]' found under directory '*'
Installing collected packages: django
  Running setup.py install for django
    changing mode of build/scripts-2.7/django-admin.py from 644 to 755

    warning: no previously-included files matching '__pycache__' found under directory '*'
    warning: no previously-included files matching '*.py[co]' found under directory '*'
    changing mode of /usr/local/bin/django-admin.py to 755
Successfully installed django
Cleaning up...

$ python
Python 2.7.3 (default, Sep 26 2012, 21:51:14) 
[GCC 4.7.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import django
>>> 

So apparently there are multiple versions of easy_install and pip. It seems to be a big mess. Anyway, this is what I did to install Django for Python 2.7 on Ubuntu 12.10:

$ sudo easy_install-2.7 pip
Searching for pip
Best match: pip 1.1
Adding pip 1.1 to easy-install.pth file
Installing pip-2.7 script to /usr/local/bin

Using /usr/lib/python2.7/dist-packages
Processing dependencies for pip
Finished processing dependencies for pip

$ sudo pip-2.7 install django
Downloading/unpacking django
  Downloading Django-1.5.1.tar.gz (8.0Mb): 8.0Mb downloaded
  Running setup.py egg_info for package django

    warning: no previously-included files matching '__pycache__' found under directory '*'
    warning: no previously-included files matching '*.py[co]' found under directory '*'
Installing collected packages: django
  Running setup.py install for django
    changing mode of build/scripts-2.7/django-admin.py from 644 to 755

    warning: no previously-included files matching '__pycache__' found under directory '*'
    warning: no previously-included files matching '*.py[co]' found under directory '*'
    changing mode of /usr/local/bin/django-admin.py to 755
Successfully installed django
Cleaning up...

$ python
Python 2.7.3 (default, Sep 26 2012, 21:51:14) 
[GCC 4.7.2] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import django
>>> 

回答 9

从这里:https : //docs.python.org/3/installing/

这是如何为同时安装linux,mac,posix的各种版本安装软件包的方法:

python2   -m pip install SomePackage  # default Python 2
python2.7 -m pip install SomePackage  # specifically Python 2.7
python3   -m pip install SomePackage  # default Python 3
python3.4 -m pip install SomePackage  # specifically Python 3.4
python3.5 -m pip install SomePackage  # specifically Python 3.5
python3.6 -m pip install SomePackage  # specifically Python 3.6

Windows上,将py Python启动器与-m开关结合使用:

py -2   -m pip install SomePackage  # default Python 2
py -2.7 -m pip install SomePackage  # specifically Python 2.7
py -3   -m pip install SomePackage  # default Python 3
py -3.4 -m pip install SomePackage  # specifically Python 3.4

From here: https://docs.python.org/3/installing/

Here is how to install packages for several Python versions installed side by side on Linux, macOS and other POSIX systems:

python2   -m pip install SomePackage  # default Python 2
python2.7 -m pip install SomePackage  # specifically Python 2.7
python3   -m pip install SomePackage  # default Python 3
python3.4 -m pip install SomePackage  # specifically Python 3.4
python3.5 -m pip install SomePackage  # specifically Python 3.5
python3.6 -m pip install SomePackage  # specifically Python 3.6

On Windows, use the py Python launcher in combination with the -m switch:

py -2   -m pip install SomePackage  # default Python 2
py -2.7 -m pip install SomePackage  # specifically Python 2.7
py -3   -m pip install SomePackage  # default Python 3
py -3.4 -m pip install SomePackage  # specifically Python 3.4

回答 10

在Linux,Mac OS X和其他POSIX系统上,结合使用版本化的Python命令和-m开关,以运行以下命令的适当副本pip

python2.7 -m pip install SomePackage
python3.4 -m pip install SomePackage

(也可能提供适当版本的pip命令)

在Windows上,将pyPython启动器与-m开关结合使用:

py -2.7 -m pip install SomePackage  # specifically Python 2.7
py -3.4 -m pip install SomePackage  # specifically Python 3.4

如果遇到错误,请py -3.4尝试:

pip install SomePackage

On Linux, Mac OS X and other POSIX systems, use the versioned Python commands in combination with the -m switch to run the appropriate copy of pip:

python2.7 -m pip install SomePackage
python3.4 -m pip install SomePackage

(appropriately versioned pip commands may also be available)

On Windows, use the py Python launcher in combination with the -m switch:

py -2.7 -m pip install SomePackage  # specifically Python 2.7
py -3.4 -m pip install SomePackage  # specifically Python 3.4

If you get an error for py -3.4, then try:

pip install SomePackage

回答 11

安装多个版本的Python和相应的软件包。

同一Windows机器上的Python版本:2.7,3.4和3.6

安装所有3个版本的Python

  • 使用以下路径安装了Python 2.7、3.4和3.6

在此处输入图片说明

所有3个版本的Python的PATH

  • 确保PATH变量(在System Variables中)包含以下路径-C:\ Python27 \; C:\ Python27 \ Scripts; C:\ Python34 \; C:\ Python34 \ Scripts; C:\ Python36 \; C:\ Python36 \ Scripts \;

重命名版本的可执行文件

  • 将C:\ Python36和C:\ Python34中的python可执行文件名称分别更改为python36和python34。

在此处输入图片说明

检查所有版本的命令提示符:

在此处输入图片说明

为每个版本分别安装软件包

在此处输入图片说明

Installing multiple versions of Python and their respective packages.

Python versions on the same Windows machine: 2.7, 3.4 and 3.6

Installation of all 3 versions of Python:

  • Installed Python 2.7, 3.4 and 3.6 with the paths below

[screenshot]

PATH for all 3 versions of Python:

  • Made sure the PATH variable (in System Variables) includes the paths below – C:\Python27\;C:\Python27\Scripts;C:\Python34\;C:\Python34\Scripts;C:\Python36\;C:\Python36\Scripts\;

Renaming the executables for each version:

  • Changed the python executable name in C:\Python36 and C:\Python34 to python36 and python34 respectively.

[screenshot]

Checked all versions from the command prompt:

[screenshot]

Installing the packages separately for each version:

[screenshot]


回答 12

如果您有多个版本以及多个体系结构(32位,64位),则需要在版本末尾添加-32或-64。

对于Windows,请转到cmd并键入py –list,它将生成您已安装的版本。该列表将如下所示:

Installed Pythons found by py Launcher for Windows
 -3.7-64 *
 -3.7-32
 -3.6-32

以完整命令为例:

py -3.6-32 -m pip install (package)

如果您想更深入,要在特定版本的python上安装特定版本的软件包,请在该软件包之后使用==(version)。举个例子,

py -3.6-32 -m pip install opencv-python==4.1.0.25

If you have multiple versions as well as multiple architectures (32 bit, 64 bit) you will need to add a -32 or -64 at the end of your version.

On Windows, open cmd and type py --list to list the versions you have installed. The list will look like the following:

Installed Pythons found by py Launcher for Windows
 -3.7-64 *
 -3.7-32
 -3.6-32

The full command as an example will be:

py -3.6-32 -m pip install (package)

If you want to go further and install a specific version of a package on a specific version of Python, append ==(version) to the package name. For example:

py -3.6-32 -m pip install opencv-python==4.1.0.25

回答 13

这里的大多数答案都解决了这个问题,但是我想添加一些使我/usr/local在CentOS 7上的python替代安装中不断困惑的东西。当我安装在那里时,由于可以使用pip2.7 install和它会安装模块。但是,我不知道是为什么我新安装的python版本没有看到我要安装的内容。

事实证明,在CentOS 7中,该/usr/bin文件夹中已经有一个python2.7和pip2.7 。要为新的python发行版安装pip,您需要专门告诉sudo转到/usr/local/bin

sudo /usr/local/bin/python2.7 -m ensurepip

这应该在您的/usr/local/bin文件夹中安装了pip2.7 以及您的python版本。诀窍是,当您要安装模块时,您需要修改sudo $PATH变量以包含它,/usr/local/bin或者需要执行

sudo /usr/local/bin/pip2.7 install <module>

如果要安装新模块。我花了一辈子的时间才记住那sudo不是立即看到的/usr/local/bin

Most of the answers here address the issue, but I want to add something that kept confusing me when creating an alternate installation of Python under /usr/local on CentOS 7. When I installed there, pip appeared to be working, since I could run pip2.7 install and it would install modules. What I couldn’t figure out was why my newly installed version of Python wasn’t seeing what I was installing.

It turns out that on CentOS 7 there is already a python2.7 and a pip2.7 in the /usr/bin folder. To install pip for your new Python installation, you need to tell sudo explicitly to use /usr/local/bin:

sudo /usr/local/bin/python2.7 -m ensurepip

This should get pip2.7 installed in your /usr/local/bin folder along with your version of python. The trick is that when you want to install modules, you either need to modify the sudo $PATH variable to include /usr/local/bin or you need to execute

sudo /usr/local/bin/pip2.7 install <module>

It took me forever to remember that sudo wasn’t immediately seeing /usr/local/bin.
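One way to see this kind of mismatch is to ask each interpreter where it lives and where it keeps its packages. The sketch below uses only the standard library; the helper name install_locations and the file name where.py are hypothetical. Running it once as /usr/bin/python2.7 where.py and once as /usr/local/bin/python2.7 where.py should print two different sets of paths if the two installations are really separate.

```python
import sys
import sysconfig


def install_locations():
    """Return where this interpreter lives and where it keeps pure-Python packages."""
    return {
        "executable": sys.executable,
        "prefix": sys.prefix,
        "site_packages": sysconfig.get_paths()["purelib"],
    }


if __name__ == "__main__":
    for name, value in sorted(install_locations().items()):
        print("%s: %s" % (name, value))
```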


Answer 14

Here is my take on the problem. Works for Python3. The main features are:

  • Each Python version is compiled from source
  • All versions are installed locally
  • Does not mangle your system’s default Python installation in any way
  • Each Python version is isolated with virtualenv

The steps are as follows:

  1. If you have several extra python versions installed in some other way, get rid of them, e.g., remove $HOME/.local/lib/python3.x, etc. (also the globally installed ones). Don’t touch your system’s default python3 version though.

  2. Download source for different python versions under the following directory structure:

    $HOME/
        python_versions/ : download Python-*.tgz packages here and "tar xvf" them.  You'll get directories like this:
          Python-3.4.8/
          Python-3.6.5/
          Python-3.x.y/
          ...
    
  3. At each “Python-3.x.y/” directory, do the following (do NOT use “sudo” in any of the steps!):

    mkdir root
    ./configure --prefix=$PWD/root 
    make -j 2
    make install
    virtualenv --no-site-packages -p root/bin/python3.x env
    
  4. At “python_versions/” create files like this:

    env_python3x.bash:
    
    #!/bin/bash
    echo "type deactivate to exit"
    source $HOME/python_versions/Python-3.x.y/env/bin/activate
    
  5. Now, anytime you wish to opt for python3.x, do

    source $HOME/python_versions/env_python3x.bash
    

    to enter the virtualenv

  6. While in the virtualenv, install your favorite python packages with

    pip install --upgrade package_name
    
  7. To exit the virtualenv and python version just type “deactivate”
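On Python 3.3 and later, the standard-library venv module can stand in for the virtualenv command in step 3. This swaps in a different tool from the one used above, so treat it as a sketch under that assumption; the helper name make_env is hypothetical. The environment is bound to whichever interpreter runs the script, so it would be run with the freshly built root/bin/python3.x.

```python
import os
import venv


def make_env(env_dir, with_pip=True):
    """Create a virtual environment at env_dir that ignores system site-packages."""
    venv.create(env_dir, system_site_packages=False, with_pip=with_pip)
    # venv writes pyvenv.cfg at the root of every environment it creates.
    return os.path.join(env_dir, "pyvenv.cfg")


# Example (run with root/bin/python3.x): make_env("env")
```

The resulting env/bin/activate script can then be sourced from env_python3x.bash exactly as in step 4.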

UPDATE

It seems that --no-site-packages is deprecated. There’s an easy fix for this: once you have activated the virtualenv, just point the HOME env variable to somewhere other than your actual home directory, i.e.:

export HOME=some/where/else

A nice way to do this in general is:

  • Create virtualenv
  • Activate virtualenv
  • If you want to “recycle” existing libraries to your virtualenv, softlink them from your existing install, i.e. ln -s $HOME/.local/lib/python3.6/site-packages/numpy $PWD/venv/lib/python3.6/site-packages/
  • Run export PYTHONPATH= and export HOME=/some/other/dir

Now you should have a custom-isolated virtualenv.
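To check from inside Python whether the isolation took effect, something like the following can be run after activating the environment. It is a minimal sketch: in_virtualenv is a hypothetical helper, and it relies on sys.base_prefix (Python 3.3+) or the sys.real_prefix attribute that older virtualenv releases set.

```python
import site
import sys


def in_virtualenv():
    """Return True when running inside a venv or virtualenv environment."""
    if hasattr(sys, "real_prefix"):  # set by older virtualenv releases
        return True
    # Inside an environment sys.prefix points at the environment itself,
    # while sys.base_prefix still points at the base installation.
    return getattr(sys, "base_prefix", sys.prefix) != sys.prefix


if __name__ == "__main__":
    print("inside virtualenv: %s" % in_virtualenv())
    # A false value here means packages under ~/.local are not picked up.
    print("user site enabled: %s" % site.ENABLE_USER_SITE)
```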


Answer 15

Context: Arch Linux

Action:
Install python2-pip:
sudo pacman -S python2-pip

You now have pip2.7:
sudo pip2.7 install boto

Test (in my case I needed ‘boto’):
Run the following commands:

python2
import boto

Success: No error.

Exit: Ctrl+D
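The same import test can be scripted rather than typed into the interactive prompt. This is a small sketch, not part of the original steps; can_import is a hypothetical helper, and it is written to run on both Python 2.7 and Python 3.

```python
import importlib


def can_import(module_name):
    """Return True if module_name can be imported by this interpreter."""
    try:
        importlib.import_module(module_name)
    except ImportError:
        return False
    return True


if __name__ == "__main__":
    print("boto importable: %s" % can_import("boto"))
```

Saved under any file name and run with python2, it reports whether the module is visible to that interpreter.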


Answer 16

For example, if you have set another version (e.g. 3.5) as the default and want to install pip for Python 2.7:

  1. Download the pip tar archive from https://pypi.python.org/pypi/pip
  2. Extract the tar file
  3. cd to the extracted directory
  4. Run sudo python2.7 setup.py install

Answer 17

You can go to, for example, C:\Python2.7\Scripts and then run cmd from that path. After that you can run pip2.7 install yourpackage…

That will install the package for that version of Python.


Answer 18

This is probably the completely wrong thing to do (I’m a Python noob), but I just went in and edited the first line of the pip file:

#!/usr/bin/env python3 <-- I changed this line.

# -*- coding: utf-8 -*-
import re
import sys

from pip._internal import main

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
    sys.exit(main())
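Before editing anything, it can help to see which interpreter the pip script currently points at. The sketch below reads the first line of a script; read_shebang is a hypothetical helper, and it assumes a Unix-like system where pip is a plain text file (on Windows pip is usually an .exe, so this does not apply there).

```python
import io
import shutil


def read_shebang(script_path):
    """Return the interpreter line of a script, or None if it has no shebang."""
    with io.open(script_path, "r", encoding="utf-8", errors="replace") as handle:
        first_line = handle.readline().rstrip("\r\n")
    return first_line if first_line.startswith("#!") else None


if __name__ == "__main__":
    pip_path = shutil.which("pip")  # shutil.which needs Python 3.3+
    if pip_path is None:
        print("no pip found on PATH")
    else:
        print("%s -> %s" % (pip_path, read_shebang(pip_path)))
```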

Answer 19

For Windows specifically: \path\to\python.exe -m pip install PackageName works.
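The same idea can be driven from a script: run pip as a module of one specific interpreter, so the package lands in that interpreter's site-packages. This is a rough sketch; the helper names pip_install_command and pip_install are invented for illustration, and the default target is whichever interpreter runs the script.

```python
import subprocess
import sys


def pip_install_command(package, python=sys.executable):
    """Build the argument list for '<python> -m pip install <package>'."""
    return [python, "-m", "pip", "install", package]


def pip_install(package, python=sys.executable):
    """Run pip under the given interpreter; raises CalledProcessError on failure."""
    subprocess.check_call(pip_install_command(package, python))
```

Passing the full path of another python.exe as the python argument targets that installation instead.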


Answer 20

For Blender:

/usr/bin $ python3.7 -m pip install irc