numpy.amax() will find the max value in an array, and numpy.amin() does the same for the min value. If I want to find both max and min, I have to call both functions, which requires passing over the (very big) array twice, which seems slow.
Is there a function in the numpy API that finds both max and min with only a single pass through the data?
No. At the time of this writing, there is no such function. (And yes, if there were such a function, its performance would be significantly better than calling numpy.amin() and numpy.amax() successively on a large array.)
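For concreteness, the two-call baseline that the rest of this thread benchmarks against is simply the following (array size arbitrary):

import numpy as np

a = np.random.rand(1000000)
lo, hi = np.amin(a), np.amax(a)  # two separate C-level passes over the data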
Answer 1
I don’t think that passing over the array twice is a problem. Consider the following pseudo-code:
minval = array[0]
maxval = array[0]
for i in array:
    if i < minval:
        minval = i
    if i > maxval:
        maxval = i
While there is only 1 loop here, there are still 2 checks per element (instead of 2 loops with 1 check each). Really, the only thing you save is the overhead of one loop. If the arrays really are as big as you say, that overhead is small compared to the actual loop's workload. (Note that this is all implemented in C, so the loops are more or less free anyway.)
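If you want to check how that plays out on your own machine, a minimal timing sketch along these lines is enough (numbers are illustrative and will vary with hardware and versions):

import timeit
import numpy as np

a = np.random.rand(1000000)
# two separate C-level reductions: amin, then amax
print(timeit.timeit(lambda: (np.amin(a), np.amax(a)), number=100))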
EDIT: Sorry to the 4 of you who upvoted and had faith in me. You definitely can optimize this.
Here's some Fortran code which can be compiled into a Python module via f2py (maybe a Cython guru can come along and compare this with an optimized C version…):
subroutine minmax1(a, n, amin, amax)
  implicit none
  !f2py intent(hidden) :: n
  !f2py intent(out) :: amin, amax
  !f2py intent(in) :: a
  integer n
  real a(n), amin, amax
  integer i
  amin = a(1)
  amax = a(1)
  do i = 2, n
    if (a(i) > amax) then
      amax = a(i)
    elseif (a(i) < amin) then
      amin = a(i)
    endif
  enddo
end subroutine minmax1

subroutine minmax2(a, n, amin, amax)
  implicit none
  !f2py intent(hidden) :: n
  !f2py intent(out) :: amin, amax
  !f2py intent(in) :: a
  integer n
  real a(n), amin, amax
  amin = minval(a)
  amax = maxval(a)
end subroutine minmax2
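For anyone wanting to try this, a typical f2py workflow might look like the sketch below; the file name minmax.f90 and the module name fminmax are assumptions for illustration, not part of the original answer:

# Build the extension module first, e.g.:
#   f2py -c -m fminmax minmax.f90
import numpy as np
import fminmax  # hypothetical module name produced by the f2py build

a = np.random.rand(1000000).astype(np.float32)  # Fortran 'real' is single precision
amin, amax = fminmax.minmax1(a)  # n is intent(hidden); amin/amax are intent(out)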
I have to say, I don't completely understand it. Comparing just np.min against minmax1 and minmax2 is still a losing battle, so it's not just a memory issue…
Notes: Increasing size by a factor of 10**a and decreasing repeat by a factor of 10**a (keeping the problem size constant) does change the performance, but not in a seemingly consistent way, which shows that there is some interplay between memory performance and function-call overhead in Python. Even a simple min implementation in Fortran beats NumPy's by a factor of approximately 2…
You could use Numba, which is a NumPy-aware dynamic Python compiler using LLVM. The resulting implementation is pretty simple and clear:
import numpy
import numba

@numba.jit
def minmax(x):
    maximum = x[0]
    minimum = x[0]
    for i in x[1:]:
        if i > maximum:
            maximum = i
        elif i < minimum:
            minimum = i
    return (minimum, maximum)

numpy.random.seed(1)
x = numpy.random.rand(1000000)
print(minmax(x) == (x.min(), x.max()))
It should also be faster than NumPy's min() and max() implementations, and all without having to write a single line of C/Fortran code.
Do your own performance tests, as results always depend on your architecture, your data, your package versions…
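As a sketch of such a test in IPython (purely illustrative; your numbers will differ):

%timeit minmax(x)            # the Numba version above
%timeit (x.min(), x.max())   # two separate NumPy reductions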
In general you can reduce the number of comparisons for a minmax algorithm by processing two elements at a time and comparing only the smaller one against the temporary minimum and the bigger one against the temporary maximum. On average this needs only 3/4 of the comparisons of the naive approach (3 comparisons per pair of elements instead of 4).
This could be implemented in C or Fortran (or any other low-level language) and should be almost unbeatable in terms of performance. I'm using Numba to illustrate the principle and get a very fast, dtype-independent implementation:
import numba as nb
import numpy as np

@nb.njit
def minmax(array):
    # Ravel the array and return early if it's empty
    array = array.ravel()
    length = array.size
    if not length:
        return
    # We want to process two elements at once so we need
    # an even sized array, but we preprocess the first and
    # start with the second element, so we want it "odd"
    odd = length % 2
    if not odd:
        length -= 1
    # Initialize min and max with the first item
    minimum = maximum = array[0]
    i = 1
    while i < length:
        # Get the next two items and swap them if necessary
        x = array[i]
        y = array[i+1]
        if x > y:
            x, y = y, x
        # Compare the min with the smaller one and the max
        # with the bigger one
        minimum = min(x, minimum)
        maximum = max(y, maximum)
        i += 2
    # If we had an even sized array we need to compare the
    # one remaining item too.
    if not odd:
        x = array[length]
        minimum = min(x, minimum)
        maximum = max(x, maximum)
    return minimum, maximum
It's definitely faster than the naive approach that Peque presented:
arr = np.random.random(3000000)
assert minmax(arr) == minmax_peque(arr) # warmup and making sure they are identical
%timeit minmax(arr) # 100 loops, best of 3: 2.1 ms per loop
%timeit minmax_peque(arr) # 100 loops, best of 3: 2.75 ms per loop
As expected, the new minmax implementation only takes roughly 3/4 of the time the naive implementation took (2.1 / 2.75 ≈ 0.76).
Answer 5
Just to get some ideas on the numbers one could expect, given the following approaches:
import numpy as np

def extrema_np(arr):
    return np.max(arr), np.min(arr)

import numba as nb

@nb.jit(nopython=True)
def extrema_loop_nb(arr):
    n = arr.size
    max_val = min_val = arr[0]
    for i in range(1, n):
        item = arr[i]
        if item > max_val:
            max_val = item
        elif item < min_val:
            min_val = item
    return max_val, min_val

import numba as nb

@nb.jit(nopython=True)
def extrema_while_nb(arr):
    n = arr.size
    odd = n % 2
    if not odd:
        n -= 1
    max_val = min_val = arr[0]
    i = 1
    while i < n:
        x = arr[i]
        y = arr[i + 1]
        if x > y:
            x, y = y, x
        min_val = min(x, min_val)
        max_val = max(y, max_val)
        i += 2
    if not odd:
        x = arr[n]
        min_val = min(x, min_val)
        max_val = max(x, max_val)
    return max_val, min_val

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import numpy as np

cdef void _extrema_loop_cy(
        long[:] arr,
        size_t n,
        long[:] result):
    cdef size_t i
    cdef long item, max_val, min_val
    max_val = arr[0]
    min_val = arr[0]
    for i in range(1, n):
        item = arr[i]
        if item > max_val:
            max_val = item
        elif item < min_val:
            min_val = item
    result[0] = max_val
    result[1] = min_val

def extrema_loop_cy(arr):
    result = np.zeros(2, dtype=arr.dtype)
    _extrema_loop_cy(arr, arr.size, result)
    return result[0], result[1]

%%cython -c-O3 -c-march=native -a
#cython: language_level=3, boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True, infer_types=True
import numpy as np

cdef void _extrema_while_cy(
        long[:] arr,
        size_t n,
        long[:] result):
    cdef size_t i, odd
    cdef long x, y, max_val, min_val
    odd = n % 2
    if not odd:
        n -= 1
    max_val = min_val = arr[0]
    i = 1
    while i < n:
        x = arr[i]
        y = arr[i + 1]
        if x > y:
            x, y = y, x
        min_val = min(x, min_val)
        max_val = max(y, max_val)
        i += 2
    if not odd:
        x = arr[n]
        min_val = min(x, min_val)
        max_val = max(x, max_val)
    result[0] = max_val
    result[1] = min_val

def extrema_while_cy(arr):
    result = np.zeros(2, dtype=arr.dtype)
    _extrema_while_cy(arr, arr.size, result)
    return result[0], result[1]
(the extrema_loop_*() approaches are similar to the straight loop proposed in the earlier answers, while the extrema_while_*() approaches are based on the pairwise code from above)
The timings (benchmark plots not reproduced here) indicate that the extrema_while_*() approaches are the fastest, with extrema_while_nb() being the fastest overall. In any case, the extrema_loop_nb() and extrema_loop_cy() solutions also outperform the NumPy-only approach (using np.max() and np.min() separately).
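A minimal driver to produce timings of this kind might look like the sketch below. Note that the Cython versions as written take long[:] memoryviews, so the input must be an integer array whose dtype maps to a C long (an assumption that happens to hold for the default np.random.randint() dtype on most 64-bit Linux builds):

import numpy as np

arr = np.random.randint(0, 10000, size=1000000)
for func in (extrema_np, extrema_loop_nb, extrema_while_nb,
             extrema_loop_cy, extrema_while_cy):
    print(func.__name__, func(arr))
# then time each one in IPython, e.g.: %timeit extrema_while_nb(arr)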
Finally, note that none of these is as flexible as np.min()/np.max() (in terms of n-dim support, axis parameter, etc.).
Nobody mentioned numpy.percentile, so I thought I would. If you ask for [0, 100] percentiles, it will give you an array of two elements, the min (0th percentile) and the max (100th percentile).
However, it doesn’t satisfy the OP’s purpose: it’s not faster than min and max separately. That’s probably due to some machinery that would allow for non-extreme percentiles (a harder problem, which should take longer).
In [1]: import numpy
In [2]: a = numpy.random.normal(0, 1, 1000000)
In [3]: %%timeit
...: lo, hi = numpy.amin(a), numpy.amax(a)
...:
100 loops, best of 3: 4.08 ms per loop
In [4]: %%timeit
...: lo, hi = numpy.percentile(a, [0, 100])
...:
100 loops, best of 3: 17.2 ms per loop
In [5]: numpy.__version__
Out[5]: '1.14.4'
A future version of NumPy could put in a special case to skip the normal percentile calculation when only [0, 100] is requested. So without adding anything to the interface, there is a way to ask NumPy for min and max in one call (contrary to what was said in the accepted answer), but the standard implementation of the library doesn't take advantage of this case to make it worthwhile.
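As a sketch of what such a special case might look like (hypothetical code, not NumPy's actual implementation):

import numpy as np

def percentile_with_fast_path(a, q):
    # Hypothetical fast path: skip the general percentile machinery
    # (which involves a partial sort) when only the extremes are requested.
    if list(q) == [0, 100]:
        return np.array([np.min(a), np.max(a)])
    return np.percentile(a, q)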
This is an old thread, but anyway, if anyone ever looks at this again…
When looking for the min and max simultaneously, it is possible to reduce the number of comparisons. If it is floats you are comparing (which I guess it is), this might save you some time, although not in computational complexity.
Instead of (Python code):
_max = ar[0]
_min = ar[0]
for ii in xrange(len(ar)):
    if ar[ii] > _max: _max = ar[ii]
    if ar[ii] < _min: _min = ar[ii]
you can first compare two adjacent values in the array, and then only compare the smaller one against the current minimum and the larger one against the current maximum:
## for an even-sized array
_max = ar[0]
_min = ar[0]
for ii in xrange(0, len(ar), 2):  ## step through the array two values at a time
    f1 = ar[ii]
    f2 = ar[ii+1]
    if (f1 < f2):
        if f1 < _min: _min = f1
        if f2 > _max: _max = f2
    else:
        if f2 < _min: _min = f2
        if f1 > _max: _max = f1
The code here is written in Python; clearly, for speed you would use C, Fortran, or Cython, but this way you do 3 comparisons per iteration, with len(ar)/2 iterations, giving 3/2 * len(ar) comparisons. As opposed to that, doing the comparison "the obvious way" you do 2 comparisons per iteration, leading to 2 * len(ar) comparisons. That saves you 25% of the comparison time.
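For completeness, here is a sketch of the same idea that also handles odd-length arrays (pure Python; the function name is illustrative):

def pairwise_minmax(ar):
    n = len(ar)
    _min = _max = ar[0]
    start = 1 if n % 2 else 0  # odd length: ar[0] is already accounted for
    for ii in range(start, n - 1, 2):
        f1, f2 = ar[ii], ar[ii + 1]
        if f1 > f2:              # comparison 1: order the pair
            f1, f2 = f2, f1
        if f1 < _min: _min = f1  # comparison 2: smaller value vs. current min
        if f2 > _max: _max = f2  # comparison 3: bigger value vs. current max
    return _min, _max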
… but if you look at the source for that function, it simply calls a.min() and a.max() independently, and therefore fails to avoid the performance concerns addressed in this question. :-(
It was worth the effort for me anyway, so I'll propose the most difficult and least elegant solution here for whoever may be interested. My solution is to implement a multi-threaded, one-pass min-max algorithm in C++, and use it to create a Python extension module. The effort requires a bit of overhead for learning how to use the Python and NumPy C/C++ APIs; here I will show the code and give some small explanations and references for whoever wishes to go down this path.
Multi-threaded Min/Max
There is nothing too interesting here. The array is broken into chunks of size length / workers. The min/max of each chunk is calculated in a future, and the results are then scanned for the global min/max.
// mt_np.cc
//
// multi-threaded min/max algorithm

#include <algorithm>
#include <future>
#include <vector>

namespace mt_np {

/*
 * Get {min,max} in interval [begin,end)
 */
template <typename T> std::pair<T, T> min_max(T *begin, T *end) {
  T min{*begin};
  T max{*begin};
  while (++begin < end) {
    if (*begin < min) {
      min = *begin;
      continue;
    } else if (*begin > max) {
      max = *begin;
    }
  }
  return {min, max};
}

/*
 * get {min,max} in interval [begin,end) using #workers for concurrency
 */
template <typename T>
std::pair<T, T> min_max_mt(T *begin, T *end, int workers) {
  const long int chunk_size = std::max((end - begin) / workers, 1l);
  std::vector<std::future<std::pair<T, T>>> min_maxes;
  // fire up the workers
  while (begin < end) {
    T *next = std::min(end, begin + chunk_size);
    min_maxes.push_back(std::async(min_max<T>, begin, next));
    begin = next;
  }
  // retrieve the results
  auto min_max_it = min_maxes.begin();
  auto v{min_max_it->get()};
  T min{v.first};
  T max{v.second};
  while (++min_max_it != min_maxes.end()) {
    v = min_max_it->get();
    min = std::min(min, v.first);
    max = std::max(max, v.second);
  }
  return {min, max};
}
} // namespace mt_np
The Python Extension Module
Here is where things start getting ugly… One way to use C++ code in Python is to implement an extension module. This module can be built and installed using the distutils.core standard module. A complete description of what this entails is covered in the Python documentation: https://docs.python.org/3/extending/extending.html. NOTE: there are certainly other ways to get similar results, to quote https://docs.python.org/3/extending/index.html#extending-index:
This guide only covers the basic tools for creating extensions provided as part of this version of CPython. Third party tools like Cython, cffi, SWIG and Numba offer both simpler and more sophisticated approaches to creating C and C++ extensions for Python.
Essentially, this route is probably more academic than practical. With that being said, what I did next was create a module file, sticking pretty close to the tutorial. This is essentially boilerplate for distutils to know what to do with your code and create a Python module out of it. Before doing any of this it is probably wise to create a Python virtual environment so you don't pollute your system packages (see https://docs.python.org/3/library/venv.html#module-venv).
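A minimal setup.py in the spirit of that tutorial might look like the sketch below; the source-file and module names are illustrative, and the actual CPython wrapper code around mt_np::min_max_mt is not reproduced in this answer:

from distutils.core import setup, Extension

module = Extension("mt_np",
                   sources=["mt_np_module.cc"],  # hypothetical wrapper source
                   extra_compile_args=["-std=c++11"])

setup(name="mt_np",
      version="1.0",
      description="multi-threaded min/max for NumPy arrays",
      ext_modules=[module])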
These results are far less encouraging than the results earlier in the thread, which indicated somewhere around a 3.5x speedup without incorporating multi-threading. Still, the results I achieved are somewhat reasonable: I would expect the overhead of threading to dominate the runtime until the arrays get very large, at which point the performance increase would start to approach a factor of std::thread::hardware_concurrency().
Conclusion
There is certainly room for application-specific optimizations to some NumPy code, it would seem, in particular with regard to multi-threading. Whether or not it is worth the effort is not clear to me, but it certainly seems like a good exercise (or something). I think that perhaps learning some of those "third party tools" like Cython may be a better use of time, but who knows.