I have a Python script which takes as input a list of integers, which I need to work with four integers at a time. Unfortunately, I don’t have control of the input, or I’d have it passed in as a list of four-element tuples. Currently, I’m iterating over it this way:
for i in xrange(0, len(ints), 4):
# dummy op for example code
foo += ints[i] * ints[i + 1] + ints[i + 2] * ints[i + 3]
It looks a lot like “C-think”, though, which makes me suspect there’s a more pythonic way of dealing with this situation. The list is discarded after iterating, so it needn’t be preserved. Perhaps something like this would be better?
from itertools import zip_longest
def grouper(iterable, n, fillvalue=None):
    """Collect data into fixed-length tuples, padding the last one.

    grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    """
    # n references to one shared iterator advance in lockstep, so each
    # zip_longest tuple consumes n consecutive elements.
    return zip_longest(*([iter(iterable)] * n), fillvalue=fillvalue)
Example
In pseudocode to keep the example terse.
grouper('ABCDEFG', 3, 'x') --> 'ABC' 'DEF' 'Gxx'
Note: on Python 2 use izip_longest instead of zip_longest.
回答 1
def chunker(seq, size):return(seq[pos:pos + size]for pos in range(0, len(seq), size))# (in python 2 use xrange() instead of range() to avoid allocating a list)
简单。简单。快速。适用于任何序列:
text ="I am a very, very helpful text"for group in chunker(text,7):print repr(group),# 'I am a ' 'very, v' 'ery hel' 'pful te' 'xt'print'|'.join(chunker(text,10))# I am a ver|y, very he|lpful text
animals =['cat','dog','rabbit','duck','bird','cow','gnu','fish']for group in chunker(animals,3):print group
# ['cat', 'dog', 'rabbit']# ['duck', 'bird', 'cow']# ['gnu', 'fish']
def chunker(seq, size):
    """Yield successive size-length slices of *seq*; the last may be shorter.

    Requires a sliceable sequence (list, str, tuple, ...), not an
    arbitrary iterable.
    """
    for pos in range(0, len(seq), size):
        yield seq[pos:pos + size]
Simple. Easy. Fast. Works with any sequence:
text = "I am a very, very helpful text"
for group in chunker(text, 7):
print repr(group),
# 'I am a ' 'very, v' 'ery hel' 'pful te' 'xt'
print '|'.join(chunker(text, 10))
# I am a ver|y, very he|lpful text
animals = ['cat', 'dog', 'rabbit', 'duck', 'bird', 'cow', 'gnu', 'fish']
for group in chunker(animals, 3):
print group
# ['cat', 'dog', 'rabbit']
# ['duck', 'bird', 'cow']
# ['gnu', 'fish']
回答 2
我是的粉丝
chunk_size=4for i in range(0, len(ints), chunk_size):
chunk = ints[i:i+chunk_size]# process chunk of size <= chunk_size
chunk_size= 4  # number of items processed per iteration
# NOTE(review): `ints` is assumed to be a sequence defined elsewhere;
# this snippet is not runnable in isolation.
for i in range(0, len(ints), chunk_size):
    # The final slice may be shorter than chunk_size.
    chunk = ints[i:i+chunk_size]
    # process chunk of size <= chunk_size
回答 3
import itertools
def chunks(iterable,size):
it = iter(iterable)
chunk = tuple(itertools.islice(it,size))while chunk:yield chunk
chunk = tuple(itertools.islice(it,size))# though this will throw ValueError if the length of ints# isn't a multiple of four:for x1,x2,x3,x4 in chunks(ints,4):
foo += x1 + x2 + x3 + x4
for chunk in chunks(ints,4):
foo += sum(chunk)
另一种方式:
import itertools
def chunks2(iterable,size,filler=None):
it = itertools.chain(iterable,itertools.repeat(filler,size-1))
chunk = tuple(itertools.islice(it,size))while len(chunk)== size:yield chunk
chunk = tuple(itertools.islice(it,size))# x2, x3 and x4 could get the value 0 if the length is not# a multiple of 4.for x1,x2,x3,x4 in chunks2(ints,4,0):
foo += x1 + x2 + x3 + x4
import itertools
def chunks(iterable,size):
    """Yield size-tuples from *iterable*; the final tuple may be shorter."""
    stream = iter(iterable)
    while True:
        # islice returns an empty tuple once the stream is exhausted.
        piece = tuple(itertools.islice(stream, size))
        if not piece:
            break
        yield piece
# though this will throw ValueError if the length of ints
# isn't a multiple of four:
for x1,x2,x3,x4 in chunks(ints,4):
foo += x1 + x2 + x3 + x4
for chunk in chunks(ints,4):
foo += sum(chunk)
Another way:
import itertools
def chunks2(iterable,size,filler=None):
    """Yield size-tuples from *iterable*, padding the final one with *filler*."""
    # Appending size-1 fillers guarantees the last real element lands in a
    # full-size tuple; a tuple made purely of leftover padding comes out
    # shorter than size and terminates the loop.
    padded = itertools.chain(iterable, itertools.repeat(filler, size - 1))
    while True:
        piece = tuple(itertools.islice(padded, size))
        if len(piece) != size:
            break
        yield piece
# x2, x3 and x4 could get the value 0 if the length is not
# a multiple of 4.
for x1,x2,x3,x4 in chunks2(ints,4,0):
foo += x1 + x2 + x3 + x4
回答 4
from itertools import izip_longest
def chunker(iterable, chunksize, filler):return izip_longest(*[iter(iterable)]*chunksize, fillvalue=filler)
def grouper(size, iterable):
i = iter(iterable)whileTrue:
out =[]try:for _ in range(size):
out.append(i.next())exceptStopIteration:yield out
breakyield out
简单但很慢:每个循环693 us
我可以想出的最佳解决方案islice用于内部循环:
def grouper(size, iterable):
it = iter(iterable)whileTrue:
group = tuple(itertools.islice(it,None, size))ifnot group:breakyield group
def grouper(n, iterable, fillvalue=None):#"grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
args =[iter(iterable)]* n
for i in itertools.izip_longest(fillvalue=fillvalue,*args):if tuple(i)[-1]== fillvalue:yield tuple(v for v in i if v != fillvalue)else:yield i
Using ipython’s %timeit on my mac book air, I get 47.5 us per loop.
However, this really doesn’t work for me since the results are padded to be even sized groups. A solution without the padding is slightly more complicated. The most naive solution might be:
def grouper(size, iterable):
    """Yield lists of up to *size* items from *iterable*; no padding.

    The final list may be shorter than *size*.

    Fixes over the original: uses next(i) instead of the Python-2-only
    i.next(), and no longer yields a spurious empty list when the
    iterable's length is an exact multiple of *size*.
    """
    i = iter(iterable)
    while True:
        out = []
        try:
            for _ in range(size):
                out.append(next(i))
        except StopIteration:
            # Flush a partial final group, but never an empty one.
            if out:
                yield out
            return
        yield out
Simple, but pretty slow: 693 us per loop
The best solution I could come up with uses islice for the inner loop:
def grouper(size, iterable):
    """Yield tuples of up to *size* consecutive items; no padding."""
    src = iter(iterable)
    # An empty tuple signals exhaustion and ends the generator.
    group = tuple(itertools.islice(src, None, size))
    while group:
        yield group
        group = tuple(itertools.islice(src, None, size))
With the same dataset, I get 305 us per loop.
Unable to get a pure solution any faster than that, I provide the following solution with an important caveat: if your input data contains instances of the fill value, you could get a wrong answer.
def grouper(n, iterable, fillvalue=None):
    """grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gx-stripped --> ('G',)

    Groups into n-tuples, then strips padding from the final group.
    Caveat (as in the original answer): values in the data that compare
    equal to *fillvalue* are stripped from the final group too.

    Fix over the original: itertools.izip_longest does not exist on
    Python 3; zip_longest is its Python 3 name.
    """
    args = [iter(iterable)] * n
    for group in itertools.zip_longest(fillvalue=fillvalue, *args):
        if group[-1] == fillvalue:
            # Only the final group can contain padding; drop it.
            yield tuple(v for v in group if v != fillvalue)
        else:
            yield group
I really don’t like this answer, but it is significantly faster. 124 us per loop
回答 6
我需要一个可以与集合和生成器一起使用的解决方案。我无法提出任何简短而又漂亮的内容,但至少可以理解。
def chunker(seq, size):
res =[]for el in seq:
res.append(el)if len(res)== size:yield res
res =[]if res:yield res
清单:
>>> list(chunker([i for i in range(10)],3))[[0,1,2],[3,4,5],[6,7,8],[9]]
组:
>>> list(chunker(set([i for i in range(10)]),3))[[0,1,2],[3,4,5],[6,7,8],[9]]
生成器:
>>> list(chunker((i for i in range(10)),3))[[0,1,2],[3,4,5],[6,7,8],[9]]
I needed a solution that would also work with sets and generators. I couldn’t come up with anything very short and pretty, but it’s quite readable at least.
def chunker(seq, size):
    """Yield lists of up to *size* items from any iterable.

    Works on sets and generators too — needs neither len() nor slicing.
    """
    batch = []
    for item in seq:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the short final batch, if any
        yield batch
List:
>>> list(chunker([i for i in range(10)], 3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Set:
>>> list(chunker(set([i for i in range(10)]), 3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Generator:
>>> list(chunker((i for i in range(10)), 3))
[[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
回答 7
与其他建议类似,但不完全相同,我喜欢这样做,因为它简单易读:
it = iter([1,2,3,4,5,6,7,8,9])for chunk in zip(it, it, it, it):print chunk
>>>(1,2,3,4)>>>(5,6,7,8)
In case the length isn’t a multiple of the groupsize it also supports filling (the incomplete last group) or truncating (discarding the incomplete last group) the last one:
from iteration_utilities import grouper
seq = list(range(17))
for group in grouper(seq, 4):
print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)
# (16,)
for group in grouper(seq, 4, fillvalue=None):
print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)
# (16, None, None, None)
for group in grouper(seq, 4, truncate=True):
print(group)
# (0, 1, 2, 3)
# (4, 5, 6, 7)
# (8, 9, 10, 11)
# (12, 13, 14, 15)
Benchmarks
I also decided to compare the run-time of a few of the mentioned approaches. It’s a log-log plot grouping into groups of “10” elements based on a list of varying size. For qualitative results: Lower means faster:
At least in this benchmark the iteration_utilities.grouper performs best. Followed by the approach of Craz.
The benchmark was created with simple_benchmark1. The code used to run this benchmark was:
import iteration_utilities
import itertools
from itertools import zip_longest
# --- Benchmark harness comparing the chunking approaches above.
# NOTE(review): requires the third-party packages `iteration_utilities`
# and `simple_benchmark`; not runnable with the stdlib alone.

def consume_all(it):
    # Drain the iterator completely so the timing covers full iteration.
    return iteration_utilities.consume(it, None)

import simple_benchmark
b = simple_benchmark.BenchmarkBuilder()

@b.add_function()
def grouper(l, n):
    # iteration_utilities.grouper — the C-implemented chunker under test.
    return consume_all(iteration_utilities.grouper(l, n))

def Craz_inner(iterable, n, fillvalue=None):
    # zip_longest recipe: n references to one iterator advance in lockstep.
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

@b.add_function()
def Craz(iterable, n, fillvalue=None):
    return consume_all(Craz_inner(iterable, n, fillvalue))

def nosklo_inner(seq, size):
    # Slicing approach: sequences only (requires len() and slicing).
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

@b.add_function()
def nosklo(seq, size):
    return consume_all(nosklo_inner(seq, size))

def SLott_inner(ints, chunk_size):
    # Explicit slice loop; the final chunk may be shorter than chunk_size.
    for i in range(0, len(ints), chunk_size):
        yield ints[i:i+chunk_size]

@b.add_function()
def SLott(ints, chunk_size):
    return consume_all(SLott_inner(ints, chunk_size))

def MarkusJarderot1_inner(iterable,size):
    # islice-based chunker: stops on the first empty tuple.
    it = iter(iterable)
    chunk = tuple(itertools.islice(it,size))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it,size))

@b.add_function()
def MarkusJarderot1(iterable,size):
    return consume_all(MarkusJarderot1_inner(iterable,size))

def MarkusJarderot2_inner(iterable,size,filler=None):
    # Padded variant: appends size-1 fillers, stops on a short tuple.
    it = itertools.chain(iterable,itertools.repeat(filler,size-1))
    chunk = tuple(itertools.islice(it,size))
    while len(chunk) == size:
        yield chunk
        chunk = tuple(itertools.islice(it,size))

@b.add_function()
def MarkusJarderot2(iterable,size):
    return consume_all(MarkusJarderot2_inner(iterable,size))

@b.add_arguments()
def argument_provider():
    # List sizes 4 .. 2**19, always chunked into groups of 10.
    for exp in range(2, 20):
        size = 2**exp
        yield size, simple_benchmark.MultiArgument([[0] * size, 10])

r = b.run()
1 Disclaimer: I’m the author of the libraries iteration_utilities and simple_benchmark.
_no_pad = object()  # private sentinel: distinguishes "no pad given" from pad=None

def group(it, size, pad=_no_pad):
    """Group *it* into size-tuples using two-argument iter(callable, sentinel).

    Without *pad*: the final tuple may be shorter than *size*.
    With *pad*: the final tuple is padded to full size; the first
    all-padding tuple matches the sentinel and stops iteration.

    Fixes over the original: the sentinel is compared with `is`
    (identity), never `==`, so user-defined __eq__ can't break it, and
    the itertools names are fully qualified (the file never imported
    bare `chain`/`repeat`/`islice`).
    """
    if pad is _no_pad:
        it = iter(it)
        sentinel = ()  # an empty tuple means the stream is exhausted
    else:
        it = itertools.chain(iter(it), itertools.repeat(pad))
        sentinel = (pad,) * size
    return iter(lambda: tuple(itertools.islice(it, size)), sentinel)
回答 12
如果列表很大,执行此操作的最高性能方法是使用生成器:
def get_chunk(iterable, chunk_size):
result =[]for item in iterable:
result.append(item)if len(result)== chunk_size:yield tuple(result)
result =[]if len(result)>0:yield tuple(result)for x in get_chunk([1,2,3,4,5,6,7,8,9,10],3):print x
(1,2,3)(4,5,6)(7,8,9)(10,)
If the list is large, the highest-performing way to do this will be to use a generator:
def get_chunk(iterable, chunk_size):
    """Lazily yield tuples of up to *chunk_size* items from *iterable*."""
    pending = []
    for element in iterable:
        pending.append(element)
        if len(pending) == chunk_size:
            yield tuple(pending)
            pending = []
    # Emit whatever is left over as a short final tuple.
    if len(pending) > 0:
        yield tuple(pending)
for x in get_chunk([1,2,3,4,5,6,7,8,9,10], 3):
print x
(1, 2, 3)
(4, 5, 6)
(7, 8, 9)
(10,)
回答 13
使用小功能和事情确实对我没有吸引力。我更喜欢只使用切片:
data =[...]
chunk_size =10000# or whatever
chunks =[data[i:i+chunk_size]for i in xrange(0,len(data),chunk_size)]for chunk in chunks:...
for k,g in itertools.groupby(xrange(35),lambda x: x/10):for i in g:# do what you need to do with individual items# now do what you need to do with the whole group
我对此的特别兴趣是需要消耗一个生成器才能将最多1000个更改批量提交给gmail API:
messages = a_generator_which_would_not_be_smart_as_a_list
for idx, batch in groupby(messages,lambda x: x/1000):
batch_request =BatchHttpRequest()for message in batch:
batch_request.add(self.service.users().messages().modify(userId='me', id=message['id'], body=msg_labels))
http = httplib2.Http()
self.credentials.authorize(http)
batch_request.execute(http=http)
I checked groupby and it doesn’t convert to list or use len so I (think) this will delay resolution of each value until it is actually used. Sadly none of the available answers (at this time) seemed to offer this variation.
Obviously if you need to handle each item in turn nest a for loop over g:
# NOTE(review): Python 2 example — uses xrange and integer division x/10
# (use range and x//10 on Python 3). As written the inner loop body is
# only comments, so this is illustrative pseudocode, not runnable code.
for k,g in itertools.groupby(xrange(35), lambda x: x/10):
    for i in g:
        # do what you need to do with individual items
    # now do what you need to do with the whole group
My specific interest in this was the need to consume a generator to submit changes in batches of up to 1000 to the gmail API:
# NOTE(review): Gmail-API batching example. `a_generator_which_...`,
# BatchHttpRequest, httplib2, self.service, self.credentials and
# msg_labels are all defined elsewhere — not runnable in isolation.
messages = a_generator_which_would_not_be_smart_as_a_list
# Python 2 integer division: x/1000 gives each run of 1000 consecutive
# values the same group key (use // on Python 3). Presumably the real
# key function counts messages rather than dividing them — TODO confirm.
for idx, batch in groupby(messages, lambda x: x/1000):
    batch_request = BatchHttpRequest()
    for message in batch:
        batch_request.add(self.service.users().messages().modify(userId='me', id=message['id'], body=msg_labels))
    http = httplib2.Http()
    self.credentials.authorize(http)
    batch_request.execute(http=http)
回答 15
使用NumPy很简单:
ints = array([1,2,3,4,5,6,7,8])for int1, int2 in ints.reshape(-1,2):print(int1, int2)
def chunks(it, n, m):"""Make an iterator over m first chunks of size n.
"""
it = iter(it)# Chunks are presented as tuples.return(tuple(next(it)for _ in range(n))for _ in range(m))
Unless I missed something, the following simple solution with generator expressions has not been mentioned. It assumes that both the size and the number of chunks are known (which is often the case), and that no padding is required:
def chunks(it, n, m):
    """Make an iterator over m first chunks of size n.

    Assumes *it* yields at least n*m items; chunks are tuples.

    Fix over the original: calling next(it) inside a generator
    expression raises RuntimeError on exhaustion since Python 3.7
    (PEP 479); itertools.islice yields a short/empty tuple instead,
    matching the pre-3.7 behaviour.
    """
    it = iter(it)
    return (tuple(itertools.islice(it, n)) for _ in range(m))
def chunkiter(iterable, size):def inneriter(first, iterator, size):yield first
for _ in xrange(size -1):yield iterator.next()
it = iter(iterable)whileTrue:yield inneriter(it.next(), it, size)In[2]: i = chunkiter('abcdefgh',3)In[3]:for ii in i:for c in ii:print c,print''...:
a b c
d e f
g h
更新:
由于内循环和外循环从同一个迭代器中提取值而造成的一些弊端:
1)继续无法在外循环中按预期方式工作-继续执行下一个项目而不是跳过一个块。但是,这似乎不是问题,因为在外循环中没有要测试的东西。
2)break不能在内部循环中按预期方式工作-控件将在迭代器中的下一个项目中再次进入内部循环。要跳过整个块,可以将内部迭代器(上面的ii)包装在一个元组中,例如for c in tuple(ii),或者设置一个标志并耗尽迭代器。
1) Easily understandable
2) Works on any iterable, not just sequences (some of the above answers will choke on filehandles)
3) Does not load the chunk into memory all at once
4) Does not make a chunk-long list of references to the same iterator in memory
5) No padding of fill values at the end of the list
That being said, I haven’t timed it so it might be slower than some of the more clever methods, and some of the advantages may be irrelevant given the use case.
def chunkiter(iterable, size):
    """Yield sub-iterators, each stepping over up to *size* items.

    The inner and outer iterators share the underlying stream, so each
    inner iterator must be fully consumed before advancing the outer
    loop (see the caveats in the answer text).

    Fixes over the original: Python-3 next()/range() instead of the
    Python-2-only .next()/xrange(), and StopIteration is caught rather
    than leaked out of a generator (RuntimeError since PEP 479).
    """
    def inneriter(first, iterator, size):
        # One chunk: the pre-fetched head plus up to size-1 more items.
        yield first
        for _ in range(size - 1):
            try:
                yield next(iterator)
            except StopIteration:
                return  # stream ended mid-chunk: short final chunk
    it = iter(iterable)
    while True:
        try:
            head = next(it)
        except StopIteration:
            return  # no more chunks
        yield inneriter(head, it, size)
In [2]: i = chunkiter('abcdefgh', 3)
In [3]: for ii in i:
for c in ii:
print c,
print ''
...:
a b c
d e f
g h
Update:
A couple of drawbacks due to the fact the inner and outer loops are pulling values from the same iterator:
1) continue doesn’t work as expected in the outer loop – it just continues on to the next item rather than skipping a chunk. However, this doesn’t seem like a problem as there’s nothing to test in the outer loop.
2) break doesn’t work as expected in the inner loop – control will wind up in the inner loop again with the next item in the iterator. To skip whole chunks, either wrap the inner iterator (ii above) in a tuple, e.g. for c in tuple(ii), or set a flag and exhaust the iterator.
回答 20
def group_by(iterable, size):"""Group an iterable into lists that don't exceed the size given.
>>> group_by([1,2,3,4,5], 2)
[[1, 2], [3, 4], [5]]
"""
sublist =[]for index, item in enumerate(iterable):if index >0and index % size ==0:yield sublist
sublist =[]
sublist.append(item)if sublist:yield sublist
def group_by(iterable, size):
    """Group an iterable into lists that don't exceed the size given.

    >>> list(group_by([1,2,3,4,5], 2))
    [[1, 2], [3, 4], [5]]
    """
    current = []
    for index, item in enumerate(iterable):
        # Every size-th element starts a new group (except index 0).
        if index > 0 and index % size == 0:
            yield current
            current = []
        current.append(item)
    if current:
        yield current
classIteratorExhausted(Exception):passdef translate_StopIteration(iterable, to=IteratorExhausted):for i in iterable:yield i
raise to # StopIteration would get ignored because this is generator,# but custom exception can leave the generator.def custom_zip(*iterables, reductor=tuple):
iterators = tuple(map(translate_StopIteration, iterables))whileTrue:try:yield reductor(next(i)for i in iterators)exceptIteratorExhausted:# when any of iterators get exhausted.break
It’s clever, but it has one disadvantage – it always returns a tuple. How can you get a string instead?
Of course you can write ''.join(chunker(...)), but the temporary tuple is constructed anyway.
You can get rid of the temporary tuple by writing own zip, like this:
class IteratorExhausted(Exception):
    """Raised instead of StopIteration so it can escape a generator."""
    pass

def translate_StopIteration(iterable, to=IteratorExhausted):
    """Re-yield *iterable*, then raise *to* in place of StopIteration."""
    # StopIteration would get swallowed because this is a generator,
    # but a custom exception propagates out normally.
    yield from iterable
    raise to

def custom_zip(*iterables, reductor=tuple):
    """Like zip(), but builds each group with *reductor* (e.g. ''.join),
    avoiding the temporary tuple."""
    iterators = tuple(map(translate_StopIteration, iterables))
    while True:
        try:
            yield reductor(next(i) for i in iterators)
        except IteratorExhausted:  # some iterator ran dry
            return
>>> for i in chunker('12345', 2):
... print(repr(i))
...
('1', '2')
('3', '4')
>>> for i in chunker('12345', 2, ''.join):
... print(repr(i))
...
'12'
'34'
回答 23
我喜欢这种方法。它感觉简单而不是魔术,并且支持所有可迭代的类型,并且不需要导入。
def chunk_iter(iterable, chunk_size):
it = iter(iterable)whileTrue:
chunk = tuple(next(it)for _ in range(chunk_size))ifnot chunk:breakyield chunk
I like this approach. It feels simple and not magical and supports all iterable types and doesn’t require imports.
def chunk_iter(iterable, chunk_size):
    """Yield tuples of up to *chunk_size* items from any iterable.

    Uses no imports; the final tuple may be shorter than chunk_size.

    Fix over the original: tuple(next(it) for _ in range(n)) raises
    RuntimeError when the iterator is exhausted inside the generator
    expression (PEP 479, Python 3.7+); collecting with an explicit
    try/except preserves the intended "short final chunk" behaviour.
    """
    it = iter(iterable)
    while True:
        chunk = []
        for _ in range(chunk_size):
            try:
                chunk.append(next(it))
            except StopIteration:
                break  # stream exhausted mid-chunk
        if not chunk:
            return
        yield tuple(chunk)
I never want my chunks padded, so that requirement is essential. I find that the ability to work on any iterable is also requirement. Given that, I decided to extend on the accepted answer, https://stackoverflow.com/a/434411/1074659.
Performance takes a slight hit in this approach if padding is not wanted due to the need to compare and filter the padded values. However, for large chunk sizes, this utility is very performant.
#!/usr/bin/env python3
from itertools import zip_longest
_UNDEFINED = object()  # private sentinel: "no fillvalue supplied"

def chunker(iterable, chunksize, fillvalue=_UNDEFINED):
    """
    Collect data into chunks and optionally pad it.

    Without *fillvalue*, the final chunk is truncated rather than
    padded. Performance worsens as `chunksize` approaches 1.

    Inspired by:
        https://docs.python.org/3/library/itertools.html#itertools-recipes

    Fix over the original: the final short chunk is yielded as a tuple,
    not as a lazy `filter` object, so every chunk has a consistent type
    that supports len() and indexing.
    """
    args = [iter(iterable)] * chunksize
    chunks = zip_longest(*args, fillvalue=fillvalue)
    if fillvalue is not _UNDEFINED:
        # Caller chose explicit padding: pass zip_longest's tuples through.
        yield from chunks
        return
    for chunk in chunks:
        if chunk[-1] is _UNDEFINED:
            # Only the final chunk can contain the sentinel; strip it.
            yield tuple(val for val in chunk if val is not _UNDEFINED)
        else:
            yield chunk
回答 25
这是一个没有导入功能的分块器,它支持生成器:
def chunks(seq, size):
it = iter(seq)whileTrue:
ret = tuple(next(it)for _ in range(size))if len(ret)== size:yield ret
else:raiseStopIteration()
使用示例:
>>>def foo():... i =0...whileTrue:... i +=1...yield i
...>>> c = chunks(foo(),3)>>> c.next()(1,2,3)>>> c.next()(4,5,6)>>> list(chunks('abcdefg',2))[('a','b'),('c','d'),('e','f')]
Here is a chunker without imports that supports generators:
def chunks(seq, size):
    """Yield full size-tuples from *seq*; a short final group is dropped.

    Works with generators and uses no imports.

    Fixes over the original: `raise StopIteration()` inside a generator
    is converted to RuntimeError since Python 3.7 (PEP 479) — use
    `return` instead — and next() is no longer called inside a
    generator expression (same PEP 479 hazard).
    """
    it = iter(seq)
    while True:
        ret = []
        for _ in range(size):
            try:
                ret.append(next(it))
            except StopIteration:
                return  # incomplete group: truncate, as the original intended
        yield tuple(ret)
Example of use:
>>> def foo():
... i = 0
... while True:
... i += 1
... yield i
...
>>> c = chunks(foo(), 3)
>>> c.next()
(1, 2, 3)
>>> c.next()
(4, 5, 6)
>>> list(chunks('abcdefg', 2))
[('a', 'b'), ('c', 'd'), ('e', 'f')]
回答 26
在Python 3.8中,您可以使用walrus运算符和itertools.islice。
from itertools import islice
list_ =[i for i in range(10,100)]def chunker(it, size):
iterator = iter(it)while chunk := list(islice(iterator, size)):print(chunk)
With Python 3.8 you can use the walrus operator and itertools.islice.
from itertools import islice
list_ = [i for i in range(10, 100)]
def chunker(it, size):
    """Print *it* as lists of up to *size* items (walrus-operator demo).

    Python 3.8+: `:=` binds the chunk inside the loop condition;
    islice returns an empty (falsy) list at exhaustion, ending the loop.
    """
    iterator = iter(it)
    while chunk := list(islice(iterator, size)):
        print(chunk)