If you just want a straightforward non-weighted moving average, you can easily implement it with np.cumsum, which may be faster than FFT-based methods:
EDIT: Corrected an off-by-one indexing error spotted by Bean in the code.
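The cumsum implementation itself is not shown in this excerpt; below is a minimal sketch of the standard approach (the function name is mine, not from the original answer):

```python
import numpy as np

def moving_average(a, n=3):
    # the difference of cumulative sums spaced n apart gives each window's sum
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

print(moving_average(np.arange(10), 3))
# → [1. 2. 3. 4. 5. 6. 7. 8.]
```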
NumPy's lack of a particular domain-specific function is perhaps due to the Core Team's discipline and fidelity to NumPy's prime directive: provide an N-dimensional array type, as well as functions for creating and indexing those arrays. Like many foundational objectives, this one is not small, and NumPy does it brilliantly.
The (much) larger SciPy contains a much larger collection of domain-specific libraries (called subpackages by SciPy devs): for instance, numerical optimization (optimize), signal processing (signal), and integral calculus (integrate).
My guess is that the function you are after is in at least one of the SciPy subpackages (scipy.signal perhaps); however, I would look first in the collection of SciPy scikits, identify the relevant scikit(s), and look for the function of interest there.
Scikits are independently developed packages based on NumPy/SciPy and directed to a particular technical discipline (e.g., scikits-image, scikits-learn, etc.). Several of these (in particular, the awesome OpenOpt for numerical optimization) were highly regarded, mature projects long before choosing to reside under the relatively new scikits rubric. The Scikits homepage linked to above lists about 30 such scikits, though at least several of those are no longer under active development.
Following this advice would lead you to scikits-timeseries; however, that package is no longer under active development. In effect, Pandas has become, AFAIK, the de facto NumPy-based time series library.
Pandas has several functions that can be used to calculate a moving average; the simplest of these is probably rolling_mean, which you use like so:
>>> # the recommended syntax to import pandas
>>> import pandas as PD
>>> import numpy as NP
>>> # prepare some fake data:
>>> # the date-time indices:
>>> t = PD.date_range('1/1/2010', '12/31/2012', freq='D')
>>> # the data:
>>> x = NP.arange(0, t.shape[0])
>>> # combine the data & index into a Pandas 'Series' object
>>> D = PD.Series(x, t)
Now, just call the function rolling_mean passing in the Series object and a window size, which in my example below is 10 days.
>>> d_mva = PD.rolling_mean(D, 10)
>>> # d_mva is the same size as the original Series
>>> d_mva.shape
(1096,)
>>> # though obviously the first w values are NaN where w is the window size
>>> d_mva[:3]
2010-01-01 NaN
2010-01-02 NaN
2010-01-03 NaN
Verify that it worked; for example, compare values 10 to 15 in the original series with the new Series smoothed by the rolling mean.
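With the arange data above, that check is easy to reproduce. A sketch using the modern .rolling API (rolling_mean was removed in later pandas versions):

```python
import numpy as np
import pandas as pd

t = pd.date_range('1/1/2010', '12/31/2012', freq='D')
D = pd.Series(np.arange(0, t.shape[0], dtype=float), t)
d_mva = D.rolling(10).mean()

# original values 10..14 versus the 10-day trailing means
print(D[10:15].to_list())      # → [10.0, 11.0, 12.0, 13.0, 14.0]
print(d_mva[10:15].to_list())  # → [5.5, 6.5, 7.5, 8.5, 9.5]
```

The trailing mean lags the raw ramp by (window - 1) / 2 = 4.5, as expected for linear data.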
The function rolling_mean, along with about a dozen or so other functions, is informally grouped in the Pandas documentation under the rubric moving window functions; a second, related group of functions in Pandas is referred to as exponentially-weighted functions (e.g., ewma, which calculates an exponentially weighted moving average). The fact that this second group is not included in the first (moving window functions) is perhaps because the exponentially-weighted transforms don't rely on a fixed-length window.
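As a sketch in the modern API (rolling_mean and ewma have since been replaced by the .rolling and .ewm accessors; the values below assume the default parameters):

```python
import numpy as np
import pandas as pd

s = pd.Series(np.arange(10, dtype=float))

# fixed-length moving window: the modern spelling of rolling_mean
windowed = s.rolling(window=3).mean()

# exponentially-weighted mean: no fixed window; every past value
# contributes with a geometrically decaying weight
exponential = s.ewm(span=3).mean()

print(windowed.iloc[-1])     # → 8.0 (mean of 7, 8, 9)
print(exponential.iloc[-1])  # close to 8, weighted toward recent values
```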
A simple way to achieve this is by using np.convolve.
The idea behind this is to leverage the way the discrete convolution is computed and use it to return a rolling mean. This can be done by convolving with a sequence of np.ones of a length equal to the sliding window length we want.
In order to do so we could define the following function:
def moving_average(x, w):
return np.convolve(x, np.ones(w), 'valid') / w
This function will take the convolution of the sequence x and a sequence of ones of length w. Note that the chosen mode is valid so that the convolution product is only given for points where the sequences overlap completely.
Some examples:
x = np.array([5,3,8,10,2,1,5,1,0,2])
For a moving average with a window of length 2 we would have:
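The output for that example was dropped from this copy; running the function above reproduces it (each value is a pairwise mean):

```python
import numpy as np

def moving_average(x, w):
    # convolve with a length-w box of ones; 'valid' keeps full overlaps only
    return np.convolve(x, np.ones(w), 'valid') / w

x = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
print(moving_average(x, 2))
# → [4.  5.5 9.  6.  1.5 3.  3.  0.5 1. ]
```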
Let's have a more in-depth look at the way the discrete convolution is being computed.
The following function aims to replicate the way np.convolve is computing the output values:
def mov_avg(x, w):
for m in range(len(x)-(w-1)):
yield sum(np.ones(w) * x[m:m+w]) / w
Which, for the same example above would also yield:
So what is being done at each step is to take the inner product between the array of ones and the current window. In this case the multiplication by np.ones(w) is superfluous given that we are directly taking the sum of the sequence.
Below is an example of how the first outputs are computed so that it is a little clearer. Let's suppose we want a window of w=4:
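The worked w=4 example was elided here; the first two steps can be checked by hand with the generator above:

```python
import numpy as np

def mov_avg(x, w):
    # slide a length-w window and take its inner product with ones(w)
    for m in range(len(x) - (w - 1)):
        yield sum(np.ones(w) * x[m:m+w]) / w

x = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
out = list(mov_avg(x, 4))
print(out[0])  # (5 + 3 + 8 + 10) / 4 → 6.5
print(out[1])  # (3 + 8 + 10 + 2) / 4 → 5.75
```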
Here are a variety of ways to do this, along with some benchmarks. The best methods are versions using optimized code from other libraries. The bottleneck.move_mean method is probably best all around. The scipy.convolve approach is also very fast, extensible, and syntactically and conceptually simple, but doesn’t scale well for very large window values. The numpy.cumsum method is good if you need a pure numpy approach.
Note: Some of these (e.g. bottleneck.move_mean) are not centered, and will shift your data.
import numpy as np
import scipy as sci
import scipy.signal as sig
import pandas as pd
import bottleneck as bn
import time as time
def rollavg_direct(a,n):
'Direct "for" loop'
assert n%2==1
b = a*0.0
for i in range(len(a)) :
b[i]=a[max(i-n//2,0):min(i+n//2+1,len(a))].mean()
return b
def rollavg_comprehension(a,n):
'List comprehension'
assert n%2==1
r,N = int(n/2),len(a)
return np.array([a[max(i-r,0):min(i+r+1,N)].mean() for i in range(N)])
def rollavg_convolve(a,n):
'scipy.convolve'
assert n%2==1
return sci.convolve(a,np.ones(n,dtype='float')/n, 'same')[n//2:-n//2+1]
def rollavg_convolve_edges(a,n):
'scipy.convolve, edge handling'
assert n%2==1
return sci.convolve(a,np.ones(n,dtype='float'), 'same')/sci.convolve(np.ones(len(a)),np.ones(n), 'same')
def rollavg_cumsum(a,n):
'numpy.cumsum'
assert n%2==1
cumsum_vec = np.cumsum(np.insert(a, 0, 0))
return (cumsum_vec[n:] - cumsum_vec[:-n]) / n
def rollavg_cumsum_edges(a,n):
'numpy.cumsum, edge handling'
assert n%2==1
N = len(a)
cumsum_vec = np.cumsum(np.insert(np.pad(a,(n-1,n-1),'constant'), 0, 0))
d = np.hstack((np.arange(n//2+1,n),np.ones(N-n)*n,np.arange(n,n//2,-1)))
return (cumsum_vec[n+n//2:-n//2+1] - cumsum_vec[n//2:-n-n//2]) / d
def rollavg_roll(a,n):
'Numpy array rolling'
assert n%2==1
N = len(a)
rolling_idx = np.mod((N-1)*np.arange(n)[:,None] + np.arange(N), N)
return a[rolling_idx].mean(axis=0)[n-1:]
def rollavg_roll_edges(a,n):
# see https://stackoverflow.com/questions/42101082/fast-numpy-roll
'Numpy array rolling, edge handling'
assert n%2==1
a = np.pad(a,(0,n-1-n//2), 'constant')*np.ones(n)[:,None]
m = a.shape[1]
idx = np.mod((m-1)*np.arange(n)[:,None] + np.arange(m), m) # Rolling index
out = a[np.arange(-n//2,n//2)[:,None], idx]
d = np.hstack((np.arange(1,n),np.ones(m-2*n+1+n//2)*n,np.arange(n,n//2,-1)))
return (out.sum(axis=0)/d)[n//2:]
def rollavg_pandas(a,n):
'Pandas rolling average'
return pd.DataFrame(a).rolling(n, center=True, min_periods=1).mean().to_numpy()
def rollavg_bottlneck(a,n):
'bottleneck.move_mean'
return bn.move_mean(a, window=n, min_count=1)
N = 10**6
a = np.random.rand(N)
functions = [rollavg_direct, rollavg_comprehension, rollavg_convolve,
rollavg_convolve_edges, rollavg_cumsum, rollavg_cumsum_edges,
rollavg_pandas, rollavg_bottlneck, rollavg_roll, rollavg_roll_edges]
print('Small window (n=3)')
%load_ext memory_profiler
for f in functions :
print('\n'+f.__doc__+ ' : ')
%timeit b=f(a,3)
print('\nLarge window (n=1001)')
for f in functions[0:-2] :
print('\n'+f.__doc__+ ' : ')
%timeit b=f(a,1001)
print('\nMemory\n')
print('Small window (n=3)')
N = 10**7
a = np.random.rand(N)
%load_ext memory_profiler
for f in functions[2:] :
print('\n'+f.__doc__+ ' : ')
%memit b=f(a,3)
print('\nLarge window (n=1001)')
for f in functions[2:-2] :
print('\n'+f.__doc__+ ' : ')
%memit b=f(a,1001)
Timing, Small window (n=3)
Direct "for" loop :
4.14 s ± 23.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
List comprehension :
3.96 s ± 27.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.convolve :
1.07 ms ± 26.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
scipy.convolve, edge handling :
4.68 ms ± 9.69 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum :
5.31 ms ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum, edge handling :
8.52 ms ± 11.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Pandas rolling average :
9.85 ms ± 9.63 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
bottleneck.move_mean :
1.3 ms ± 12.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Numpy array rolling :
31.3 ms ± 91.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Numpy array rolling, edge handling :
61.1 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Timing, Large window (n=1001)
Direct "for" loop :
4.67 s ± 34 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
List comprehension :
4.46 s ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.convolve :
103 ms ± 165 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
scipy.convolve, edge handling :
272 ms ± 1.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
numpy.cumsum :
5.19 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum, edge handling :
8.7 ms ± 11.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Pandas rolling average :
9.67 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
bottleneck.move_mean :
1.31 ms ± 15.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
This answer using Pandas is adapted from the one above, as rolling_mean is no longer part of Pandas.
# the recommended syntax to import pandas
import pandas as pd
import numpy as np
# prepare some fake data:
# the date-time indices:
t = pd.date_range('1/1/2010', '12/31/2012', freq='D')
# the data:
x = np.arange(0, t.shape[0])
# combine the data & index into a Pandas 'Series' object
D = pd.Series(x, t)
Now, just call the rolling method on the Series with a window size, which in my example below is 10 days, and take the mean.
d_mva10 = D.rolling(10).mean()
# d_mva is the same size as the original Series
# though obviously the first w values are NaN where w is the window size
d_mva10[:11]
2010-01-01 NaN
2010-01-02 NaN
2010-01-03 NaN
2010-01-04 NaN
2010-01-05 NaN
2010-01-06 NaN
2010-01-07 NaN
2010-01-08 NaN
2010-01-09 NaN
2010-01-10 4.5
2010-01-11 5.5
Freq: D, dtype: float64
import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)
This gives the moving mean along the default (last) axis.
“mm” is the moving mean for “a”.
“window” is the maximum number of entries to consider for the moving mean.
“min_count” is the minimum number of entries to consider for the moving mean (e.g. for the first elements, or if the array has nan values).
The good part is Bottleneck helps to deal with nan values and it’s also very efficient.
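A small sketch of how min_count changes the edge and NaN behavior (the input values here are made up for illustration):

```python
import numpy as np
import bottleneck as bn

a = np.array([1.0, 2.0, np.nan, 4.0, 5.0])

# min_count=1: a single valid value in the window is enough
print(bn.move_mean(a, window=2, min_count=1))
# → [1.  1.5 2.  4.  4.5]

# default (min_count = window): any window touching a NaN yields NaN
print(bn.move_mean(a, window=2))
# → [nan 1.5 nan nan 4.5]
```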
Answer 6
In case you want to take care the edge conditions carefully (compute mean only from available elements at edges), the following function will do the trick.
import numpy as np
def running_mean(x, N):
out = np.zeros_like(x, dtype=np.float64)
dim_len = x.shape[0]
for i in range(dim_len):
if N%2 == 0:
a, b = i - (N-1)//2, i + (N-1)//2 + 2
else:
a, b = i - (N-1)//2, i + (N-1)//2 + 1
#cap indices to min and max indices
a = max(0, a)
b = min(dim_len, b)
out[i] = np.mean(x[a:b])
return out
>>> running_mean(np.array([1,2,3,4]), 2)
array([1.5, 2.5, 3.5, 4. ])
>>> running_mean(np.array([1,2,3,4]), 3)
array([1.5, 2. , 3. , 3.5])
Answer 7
for i in range(len(Data)):
Data[i, 1] = Data[i-lookback:i, 0].sum() / lookback
Try this piece of code. I think it’s simpler and does the job.
lookback is the window of the moving average.
In the Data[i-lookback:i, 0].sum() I have put 0 to refer to the first column of the dataset but you can put any column you like in case you have more than one column.
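A self-contained sketch of the same idea (the Data array and lookback value here are invented for illustration; note that the slice Data[i-lookback:i] excludes row i itself and is incomplete for the first lookback rows):

```python
import numpy as np

lookback = 3
Data = np.zeros((10, 2))
Data[:, 0] = np.arange(10, dtype=float)  # column 0: raw values

for i in range(len(Data)):
    # trailing sum of the previous `lookback` values of column 0
    Data[i, 1] = Data[i - lookback:i, 0].sum() / lookback

# only rows from index `lookback` onward have a full window behind them
print(Data[lookback:, 1])
# → [1. 2. 3. 4. 5. 6. 7.]
```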
I actually wanted a slightly different behavior than the accepted answer. I was building a moving average feature extractor for an sklearn pipeline, so I required that the output of the moving average have the same dimension as the input. What I want is for the moving average to assume the series stays constant, i.e. a moving average of [1,2,3,4,5] with window 2 would give [1.5,2.5,3.5,4.5,5.0].
talib contains a simple moving average tool, as well as other similar averaging tools (e.g. exponential moving average). Below, the method is compared to some of the other solutions.
%timeit pd.Series(np.arange(100000)).rolling(3).mean()
2.53 ms ± 40.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit talib.SMA(real = np.arange(100000.), timeperiod = 3)
348 µs ± 3.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit moving_average(np.arange(100000))
638 µs ± 45.1 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
One caveat is that real must have elements of dtype = float. Otherwise the following error is raised:
Exception: real is not double
Answer 10
Here is a fast implementation using numba (mind the types). Note that the output does contain NaNs for the first window - 1 positions, where the window is incomplete.
import numpy as np
import numba as nb
@nb.jit(nb.float64[:](nb.float64[:],nb.int64),
fastmath=True,nopython=True)
def moving_average( array, window ):
ret = np.cumsum(array)
ret[window:] = ret[window:] - ret[:-window]
ma = ret[window - 1:] / window
n = np.empty(window-1); n.fill(np.nan)
return np.concatenate((n.ravel(), ma.ravel()))
Answer 11
Reverse the array at i, and simply take the mean from i to n.
Use a list comprehension to generate the mini arrays on the fly.
x = np.random.randint(10, size=20)
def moving_average(arr, n):
return [ (arr[:i+1][::-1][:n]).mean() for i, ele in enumerate(arr) ]
n = 5
moving_average(x, n)
I use either the accepted answer's solution, slightly modified to have the same length for output as input, or pandas' version as mentioned in a comment on another answer. I summarize both here with a reproducible example for future reference:
import numpy as np
import pandas as pd
def moving_average(a, n):
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret / n
def moving_average_centered(a, n):
return pd.Series(a).rolling(window=n, center=True).mean().to_numpy()
A = [0, 0, 1, 2, 4, 5, 4]
print(moving_average(A, 3))
# [0. 0. 0.33333333 1. 2.33333333 3.66666667 4.33333333]
print(moving_average_centered(A, 3))
# [nan 0.33333333 1. 2.33333333 3.66666667 4.33333333 nan ]
Comparing the solution below with the one that uses numpy's cumsum, this one takes almost half the time. This is because it does not need to go through the entire array to do the cumsum and then do all the subtraction. Moreover, the cumsum can be "dangerous" if the array is huge and the numbers are huge (possible overflow). Of course, the danger also exists here, but at least only the essential numbers are summed together.
def moving_average(array_numbers, n):
if n > len(array_numbers):
return []
temp_sum = sum(array_numbers[:n])
averages = [temp_sum / float(n)]
for first_index, item in enumerate(array_numbers[n:]):
temp_sum += item - array_numbers[first_index]
averages.append(temp_sum / float(n))
return averages
Is there a SciPy function or NumPy function or module for Python that calculates the running mean of a 1D array given a specific window?
Answer 0
For a short, fast solution that does the whole thing in one loop, without dependencies, the code below works great.
mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
cumsum, moving_aves = [0], []
for i, x in enumerate(mylist, 1):
cumsum.append(cumsum[i-1] + x)
if i>=N:
moving_ave = (cumsum[i] - cumsum[i-N])/N
#can do stuff with moving_ave here
moving_aves.append(moving_ave)
The running mean is a case of the mathematical operation of convolution. For the running mean, you slide a window along the input and compute the mean of the window’s contents. For discrete 1D signals, convolution is the same thing, except instead of the mean you compute an arbitrary linear combination, i.e. multiply each element by a corresponding coefficient and add up the results. Those coefficients, one for each position in the window, are sometimes called the convolution kernel. Now, the arithmetic mean of N values is (x_1 + x_2 + ... + x_N) / N, so the corresponding kernel is (1/N, 1/N, ..., 1/N), and that’s exactly what we get by using np.ones((N,))/N.
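Concretely, convolving with that uniform kernel reproduces the windowed arithmetic mean:

```python
import numpy as np

x = np.array([1.0, 2.0, 6.0, 9.0])
N = 2
kernel = np.ones(N) / N  # (1/N, ..., 1/N)

# each output is (x_i + x_{i+1}) / 2, i.e. the window mean
print(np.convolve(x, kernel, 'valid'))
# → [1.5 4.  7.5]
```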
Edges
The mode argument of np.convolve specifies how to handle the edges. I chose the valid mode here because I think that’s how most people expect the running mean to work, but you may have other priorities. Here is a plot that illustrates the difference between the modes:
import numpy as np
import matplotlib.pyplot as plt
modes = ['full', 'same', 'valid']
for m in modes:
plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m));
plt.axis([-10, 251, -.1, 1.1]);
plt.legend(modes, loc='lower center');
plt.show()
Convolution is much better than the straightforward approach, but (I guess) it uses FFT and is thus quite slow. However, specifically for computing the running mean, the following approach works fine:
In[3]: x = numpy.random.random(100000)
In[4]: N = 1000
In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid')
10 loops, best of 3: 41.4 ms per loop
In[6]: %timeit result2 = running_mean(x, N)
1000 loops, best of 3: 1.04 ms per loop
Note that numpy.allclose(result1, result2) is True, so the two methods are equivalent.
The greater N, the greater the difference in time.
Warning: although cumsum is faster, there will be increased floating point error that may cause your results to be invalid/incorrect/unacceptable.
# demonstrate loss of precision with only 100,000 points
np.random.seed(42)
x = np.random.randn(100000)+1e6
y1 = running_mean_convolve(x, 10)
y2 = running_mean_cumsum(x, 10)
assert np.allclose(y1, y2, rtol=1e-12, atol=0)
the more points you accumulate over, the greater the floating point error (so 1e5 points is noticeable, 1e6 points is more significant, more than 1e6 and you may want to reset the accumulators)
you can cheat by using np.longdouble, but your floating point error will still become significant for a relatively large number of points (around >1e5, but it depends on your data)
you can plot the error and see it increasing relatively fast
the convolve solution is slower but does not have this floating point loss of precision
the uniform_filter1d solution is faster than this cumsum solution AND does not have this floating point loss of precision
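For reference, the uniform_filter1d call mentioned above looks like this (a sketch mirroring the running_mean_uniform_filter1d function used in the benchmark):

```python
import numpy as np
from scipy.ndimage import uniform_filter1d

x = np.arange(10, dtype=float)
N = 3
# origin shifts the window so the result aligns with a trailing mean;
# trimming the last N-1 values matches np.convolve(..., 'valid')
y = uniform_filter1d(x, N, mode='constant', origin=-(N // 2))[:-(N - 1)]
print(y)
# → [1. 2. 3. 4. 5. 6. 7. 8.]
```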
Update: The example below shows the old pandas.rolling_mean function, which has been removed in recent versions of pandas. A modern equivalent of the function call below would be pd.Series(x).rolling(window=N).mean().
pandas is more suitable for this than NumPy or SciPy. Its function rolling_mean does the job conveniently. It also returns a NumPy array when the input is an array.
It is difficult to beat rolling_mean in performance with any custom pure Python implementation. Here is an example performance against two of the proposed solutions:
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: def running_mean(x, N):
...: cumsum = np.cumsum(np.insert(x, 0, 0))
...: return (cumsum[N:] - cumsum[:-N]) / N
...:
In [4]: x = np.random.random(100000)
In [5]: N = 1000
In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid')
10 loops, best of 3: 172 ms per loop
In [7]: %timeit running_mean(x, N)
100 loops, best of 3: 6.72 ms per loop
In [8]: %timeit pd.rolling_mean(x, N)[N-1:]
100 loops, best of 3: 4.74 ms per loop
In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N))
Out[9]: True
There are also nice options as to how to deal with the edge values.
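For example, in the modern API those edge options are the min_periods and center arguments of .rolling (a sketch of the current API; the answer above predates it):

```python
import numpy as np
import pandas as pd

x = pd.Series(np.arange(5, dtype=float))

# default: NaN until a full window is available
print(x.rolling(3).mean().to_list())
# → [nan, nan, 1.0, 2.0, 3.0]

# min_periods=1: average whatever is available near the edges
print(x.rolling(3, min_periods=1).mean().to_list())
# → [0.0, 0.5, 1.0, 2.0, 3.0]

# center=True: the window is centered on each point
print(x.rolling(3, center=True, min_periods=1).mean().to_list())
# → [0.5, 1.0, 2.0, 3.0, 3.5]
```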
Answer 4
You can calculate a running mean with:
import numpy as np
def runningMean(x, N):
y = np.zeros((len(x),))
for ctr in range(len(x)):
y[ctr] = np.sum(x[ctr:(ctr+N)])
return y/N
But it’s slow.
Fortunately, numpy includes a convolve function which we can use to speed things up. The running mean is equivalent to convolving x with a vector that is N long, with all members equal to 1/N. The numpy implementation of convolve includes the starting transient, so you have to remove the first N-1 points:
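The answer's code was dropped from this copy; a sketch of what it likely looked like (the function name here is my own):

```python
import numpy as np

def running_mean_fast(x, N):
    # full convolution has an N-1 point starting transient; drop it
    return np.convolve(x, np.ones(N) / N)[(N - 1):]

x = np.arange(10, dtype=float)
print(running_mean_fast(x, 3)[:8])
# → [1. 2. 3. 4. 5. 6. 7. 8.]
```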
On my machine, the fast version is 20-30 times faster, depending on the length of the input vector and size of the averaging window.
Note that convolve does include a 'same' mode which seems like it should address the starting transient issue, but it splits it between the beginning and end.
Answer 5
Or use a Python module that does the calculation for you. In my tests at Tradewave.net, TA-lib always wins:
import talib as ta
import numpy as np
import pandas as pd
import scipy
from scipy import signal
import time as t
PAIR = info.primary_pair
PERIOD = 30

def initialize():
    storage.reset()
    storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0])

def cumsum_sma(array, period):
    ret = np.cumsum(array, dtype=float)
    ret[period:] = ret[period:] - ret[:-period]
    return ret[period - 1:] / period

def pandas_sma(array, period):
    return pd.rolling_mean(array, period)

def api_sma(array, period):
    # this method is native to Tradewave and does NOT return an array
    return (data[PAIR].ma(PERIOD))

def talib_sma(array, period):
    return ta.MA(array, period)

def convolve_sma(array, period):
    return np.convolve(array, np.ones((period,))/period, mode='valid')

def fftconvolve_sma(array, period):
    return scipy.signal.fftconvolve(
        array, np.ones((period,))/period, mode='valid')

def tick():
    close = data[PAIR].warmup_period('close')
    t1 = t.time()
    sma_api = api_sma(close, PERIOD)
    t2 = t.time()
    sma_cumsum = cumsum_sma(close, PERIOD)
    t3 = t.time()
    sma_pandas = pandas_sma(close, PERIOD)
    t4 = t.time()
    sma_talib = talib_sma(close, PERIOD)
    t5 = t.time()
    sma_convolve = convolve_sma(close, PERIOD)
    t6 = t.time()
    sma_fftconvolve = fftconvolve_sma(close, PERIOD)
    t7 = t.time()
    storage.elapsed[-1] = storage.elapsed[-1] + t2 - t1
    storage.elapsed[-2] = storage.elapsed[-2] + t3 - t2
    storage.elapsed[-3] = storage.elapsed[-3] + t4 - t3
    storage.elapsed[-4] = storage.elapsed[-4] + t5 - t4
    storage.elapsed[-5] = storage.elapsed[-5] + t6 - t5
    storage.elapsed[-6] = storage.elapsed[-6] + t7 - t6
    plot('sma_api', sma_api)
    plot('sma_cumsum', sma_cumsum[-5])
    plot('sma_pandas', sma_pandas[-10])
    plot('sma_talib', sma_talib[-15])
    plot('sma_convolve', sma_convolve[-20])
    plot('sma_fftconvolve', sma_fftconvolve[-25])

def stop():
    log('ticks....: %s' % info.max_ticks)
    log('api......: %.5f' % storage.elapsed[-1])
    log('cumsum...: %.5f' % storage.elapsed[-2])
    log('pandas...: %.5f' % storage.elapsed[-3])
    log('talib....: %.5f' % storage.elapsed[-4])
    log('convolve.: %.5f' % storage.elapsed[-5])
    log('fft......: %.5f' % storage.elapsed[-6])
For a ready-to-use solution, see https://scipy-cookbook.readthedocs.io/items/SignalSmooth.html.
It provides running average with the flat window type. Note that this is a bit more sophisticated than the simple do-it-yourself convolve-method, since it tries to handle the problems at the beginning and the end of the data by reflecting it (which may or may not work in your case…).
To start with, you could try:
import numpy as np
import matplotlib.pyplot as plt

# smooth() is defined on the cookbook page linked above
a = np.random.random(100)
plt.plot(a)
b = smooth(a, window='flat')
plt.plot(b)
Here are 3 functions that let you compare the error/speed of different implementations:

from __future__ import division
import numpy as np
import scipy.ndimage.filters as ndif

def running_mean_convolve(x, N):
    return np.convolve(x, np.ones(N) / float(N), 'valid')

def running_mean_cumsum(x, N):
    cumsum = np.cumsum(np.insert(x, 0, 0))
    return (cumsum[N:] - cumsum[:-N]) / float(N)

def running_mean_uniform_filter1d(x, N):
    return ndif.uniform_filter1d(x, N, mode='constant', origin=-(N//2))[:-(N-1)]
%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same')
100 loops, best of 3: 9.28 ms per loop
%timeit y2 = uniform_filter1d(x, size=N)
10000 loops, best of 3: 191 µs per loop
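As a sanity check (a hedged sketch): away from the array edges, where the two padding conventions differ, the convolve and uniform_filter1d results agree for an odd window size.

```python
import numpy as np
from scipy.ndimage import uniform_filter1d

x = np.random.random(1000)
N = 9  # odd window, so both results are centred identically

y1 = np.convolve(x, np.ones(N) / N, mode='same')
y2 = uniform_filter1d(x, size=N)

# boundary handling differs (zero padding vs. reflection), so compare the interior
assert np.allclose(y1[N:-N], y2[N:-N])
```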
I know this is an old question, but here is a solution that doesn’t use any extra data structures or libraries. It is linear in the number of elements of the input list and I cannot think of any other way to make it more efficient (actually if anyone knows of a better way to allocate the result, please let me know).
NOTE: this would be much faster using a numpy array instead of a list, but I wanted to eliminate all dependencies. Performance could also be improved by multi-threaded execution.
The function assumes that the input list is one dimensional, so be careful.
### Running mean/Moving average
def running_mean(l, N):
sum = 0
result = list( 0 for x in l)
for i in range( 0, N ):
sum = sum + l[i]
result[i] = sum / (i+1)
for i in range( N, len(l) ):
sum = sum - l[i-N] + l[i]
result[i] = sum / N
return result
Example
Assume that we have a list data = [ 1, 2, 3, 4, 5, 6 ] on which we want to compute a rolling mean with period of 3, and that you also want a output list that is the same size of the input one (that’s most often the case).
The first element has index 0, so the rolling mean should be computed on elements of index -2, -1 and 0. Obviously we don't have data[-2] and data[-1] (unless you want to use special boundary conditions), so the code averages only the elements seen so far, dividing by i+1 rather than N while the window is still filling up (from index 0 to N-1).
So, for the first N elements we just keep adding up the elements in an accumulator.
From element N onwards, simple accumulation doesn't work: we expect result[3] = (2 + 3 + 4)/3 = 3, but this is different from (sum + 4)/3 = 3.333.
The way to compute the correct value is to subtract data[0] = 1 from sum+4, thus giving sum + 4 - 1 = 9.
This happens because currently sum = data[0] + data[1] + data[2], but it is also true for every i >= N because, before the subtraction, sum is data[i-N] + ... + data[i-2] + data[i-1].
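The walk-through above can be checked directly; the function is repeated verbatim so the snippet runs standalone:

```python
def running_mean(l, N):
    sum = 0
    result = list(0 for x in l)
    for i in range(0, N):
        sum = sum + l[i]
        result[i] = sum / (i+1)
    for i in range(N, len(l)):
        sum = sum - l[i-N] + l[i]
        result[i] = sum / N
    return result

# period-3 rolling mean of the example data; the first two entries
# average only the values seen so far
data = [1, 2, 3, 4, 5, 6]
print(running_mean(data, 3))  # [1.0, 1.5, 2.0, 3.0, 4.0, 5.0]
```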
I feel this can be elegantly solved using bottleneck
See basic sample below:
import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=100)
mm = bn.move_mean(a, window=5, min_count=1)
“mm” is the moving mean for “a”.
“window” is the max number of entries to consider for moving mean.
“min_count” is min number of entries to consider for moving mean (e.g. for first few elements or if the array has nan values).
The good part is that Bottleneck helps to deal with NaN values, and it's also very efficient.
Answer 10
I haven’t yet checked how fast this is, but you could try:
from collections import deque
cache = deque() # keep track of seen values
n = 10 # window size
A = xrange(100) # some dummy iterable
cum_sum = 0 # initialize cumulative sum
for t, val in enumerate(A, 1):
cache.append(val)
cum_sum += val
if t < n:
avg = cum_sum / float(t)
else: # if window is saturated,
cum_sum -= cache.popleft() # subtract oldest value
avg = cum_sum / float(n)
This is a memory efficient Python 3.2+ solution computing the running average over an iterable of values by leveraging itertools.accumulate.
>>> from itertools import accumulate
>>> values = range(100)
Note that values can be any iterable, including generators or any other object that produces values on the fly.
First, lazily construct the cumulative sum of the values.
>>> cumu_sum = accumulate(values)
Next, enumerate the cumulative sum (starting at 1) and construct a generator that yields the fraction of accumulated values and the current enumeration index.
>>> rolling_avg = (accu/i for i, accu in enumerate(cumu_sum, 1))
You can issue means = list(rolling_avg) if you need all the values in memory at once or call next incrementally.
(Of course, you can also iterate over rolling_avg with a for loop, which will call next implicitly.)
This solution can be written as a function as follows.
from itertools import accumulate
def rolling_avg(iterable):
cumu_sum = accumulate(iterable)
yield from (accu/i for i, accu in enumerate(cumu_sum, 1))
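For example, a quick check of the function above on a short list:

```python
from itertools import accumulate

def rolling_avg(iterable):
    cumu_sum = accumulate(iterable)
    yield from (accu/i for i, accu in enumerate(cumu_sum, 1))

# running average of everything seen so far
print(list(rolling_avg([1, 2, 3, 4])))  # [1.0, 1.5, 2.0, 2.5]
```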
A coroutine to which you can send values at any time
This coroutine consumes values you send it and keeps a running average of the values seen so far.
It is useful when you don't have an iterable of values but acquire the values to be averaged one by one at different times throughout your program's life.
def rolling_avg_coro():
i = 0
total = 0.0
avg = None
while True:
next_value = yield avg
i += 1
total += next_value
avg = total/i
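Remember to prime the coroutine with next() before the first send(); for example (the coroutine is repeated so the snippet runs standalone):

```python
def rolling_avg_coro():
    i = 0
    total = 0.0
    avg = None
    while True:
        next_value = yield avg
        i += 1
        total += next_value
        avg = total/i

coro = rolling_avg_coro()
next(coro)             # prime the coroutine; yields the initial None
print(coro.send(10))   # 10.0
print(coro.send(20))   # 15.0
print(coro.send(6))    # 12.0
```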
Computing the average over a sliding window of size N
This generator-function takes an iterable and a window size N and yields the average over the current values inside the window. It uses a deque, which is a datastructure similar to a list, but optimized for fast modifications (pop, append) at both endpoints.
from collections import deque
from itertools import islice
def sliding_avg(iterable, N):
it = iter(iterable)
window = deque(islice(it, N))
num_vals = len(window)
if num_vals < N:
msg = 'window size {} exceeds total number of values {}'
raise ValueError(msg.format(N, num_vals))
N = float(N) # force floating point division if using Python 2
s = sum(window)
while True:
yield s/N
try:
nxt = next(it)
except StopIteration:
break
s = s - window.popleft() + nxt
window.append(nxt)
A bit late to the party, but I've made my own little function that does NOT wrap around the ends or pad with zeros that are then used to find the average as well. As a further treat, it also re-samples the signal at linearly spaced points. Customize the code at will to get other features.
The method is a simple matrix multiplication with a normalized Gaussian kernel.
def running_mean(y_in, x_in, N_out=101, sigma=1):
'''
Returns running mean as a Bell-curve weighted average at evenly spaced
points. Does NOT wrap signal around, or pad with zeros.
Arguments:
y_in -- y values, the values to be smoothed and re-sampled
x_in -- x values for array
Keyword arguments:
N_out -- NoOf elements in resampled array.
sigma -- 'Width' of Bell-curve in units of param x .
'''
N_in = np.size(y_in)
# Gaussian kernel
x_out = np.linspace(np.min(x_in), np.max(x_in), N_out)
x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out)
gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2))
# Normalize kernel, such that the sum is one along axis 1
normalization = np.tile(np.reshape(np.sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in))
gauss_kernel_normalized = gauss_kernel / normalization
# Perform running average as a linear operation
y_out = gauss_kernel_normalized @ y_in
return y_out, x_out
A simple usage on a sinusoidal signal with added normal distributed noise:
Instead of numpy or scipy, I would recommend pandas to do this more swiftly:
df['data'].rolling(3).mean()
This takes the moving average (MA) of 3 periods of the column “data”. You can also calculate the shifted versions, for example the one that excludes the current cell (shifted one back) can be calculated easily as:
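The sentence above trails off before its code sample; a plausible completion using pandas' shift (a hedged sketch — the column name 'data' is assumed from the earlier example):

```python
import pandas as pd

df = pd.DataFrame({'data': [1, 2, 3, 4, 5, 6]})

# 3-period moving average of the column "data"
ma = df['data'].rolling(3).mean()

# shifted version that excludes the current cell (shifted one back)
ma_shifted = df['data'].shift(1).rolling(3).mean()
```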
There is a comment by mab buried in one of the answers above which has this method. bottleneck has move_mean which is a simple moving average:
import numpy as np
import bottleneck as bn
a = np.arange(10) + np.random.random(10)
mva = bn.move_mean(a, window=2, min_count=1)
min_count is a handy parameter that will basically take the moving average up to that point in your array. If you don’t set min_count, it will equal window, and everything up to window points will be nan.
This question is now even older than when NeXuS wrote about it last month, BUT I like how his code deals with edge cases. However, because it is a “simple moving average,” its results lag behind the data they apply to. I thought that dealing with edge cases in a more satisfying way than NumPy’s modes valid, same, and full could be achieved by applying a similar approach to a convolution() based method.
My contribution uses a central running average to align its results with their data. When there are too few points available for the full-sized window to be used, running averages are computed from successively smaller windows at the edges of the array. [Actually, from successively larger windows, but that’s an implementation detail.]
import numpy as np
def running_mean(l, N):
# Also works for the (strictly invalid) cases when N is even.
if (N//2)*2 == N:
N = N - 1
front = np.zeros(N//2)
back = np.zeros(N//2)
for i in range(1, (N//2)*2, 2):
front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
for i in range(1, (N//2)*2, 2):
back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])
It’s relatively slow because it uses convolve(), and could likely be spruced up quite a lot by a true Pythonista, however, I believe that the idea stands.
import numpy as np

# x is your 1-D data array (may contain NaNs)
N = 10 # number of points to test on each side of point of interest, best if even
padded_x = np.insert(np.insert( np.insert(x, len(x), np.empty(int(N/2))*np.nan), 0, np.empty(int(N/2))*np.nan ),0,0)
n_nan = np.cumsum(np.isnan(padded_x))
cumsum = np.nancumsum(padded_x)
window_sum = cumsum[N+1:] - cumsum[:-(N+1)] - x # subtract value of interest from sum of all values within window
window_n_nan = n_nan[N+1:] - n_nan[:-(N+1)] - np.isnan(x)
window_n_values = (N - window_n_nan)
movavg = (window_sum) / (window_n_values)
This code works for even Ns only. It can be adjusted for odd numbers by changing the np.insert of padded_x and n_nan.
Example output (raw in black, movavg in blue):
This code can be easily adapted to remove all moving average values calculated from fewer than cutoff = 3 non-nan values.
window_n_values = (N - window_n_nan).astype(float) # dtype must be float to set some values to nan
cutoff = 3
window_n_values[window_n_values<cutoff] = np.nan
movavg = (window_sum) / (window_n_values)
Answer 18
Using only the Python standard library (memory efficient)
Just giving another version, using only the standard library and deque. I was quite surprised that most of the answers use pandas or numpy.
from collections import deque

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d) / n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]
For comparison, the recipe from the Python docs:
from collections import deque
import itertools

def moving_average(iterable, n=3):
# moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
# http://en.wikipedia.org/wiki/Moving_average
it = iter(iterable)
d = deque(itertools.islice(it, n-1))
d.appendleft(0)
s = sum(d)
for elem in it:
s += elem - d.popleft()
d.append(elem)
yield s / n
However, that implementation seems to me a bit more complex than it should be. It must be in the standard Python docs for a reason; could someone comment on my implementation versus the one from the standard docs?
Answer 19
Using @Aikude's variables, I wrote a one-liner.
import numpy as np
mylist = [1, 2, 3, 4, 5, 6, 7]
N = 3
mean = [np.mean(mylist[x:x+N]) for x in range(len(mylist)-N+1)]
print(mean)
>>> [2.0, 3.0, 4.0, 5.0, 6.0]
Answer 20
Although there are solutions for this question here, please take a look at my solution. It is very simple and works well.
import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(len(dataset) - window + 1):
    indices = range(t, t + window)
    ma.append(np.average(np.take(dataset, indices)))
ma = np.asarray(ma)
From reading the other answers I don’t think this is what the question asked for, but I got here with the need of keeping a running average of a list of values that was growing in size.
So if you want to keep a list of values that you are acquiring from somewhere (a site, a measuring device, etc.) and the average of the last n values updated, you can use the code below, which minimizes the effort of adding new elements:
class Running_Average(object):
def __init__(self, buffer_size=10):
"""
Create a new Running_Average object.
This object allows the efficient calculation of the average of the last
`buffer_size` numbers added to it.
Examples
--------
>>> a = Running_Average(2)
>>> a.add(1)
>>> a.get()
1.0
>>> a.add(1) # there are two 1 in buffer
>>> a.get()
1.0
>>> a.add(2) # there's a 1 and a 2 in the buffer
>>> a.get()
1.5
>>> a.add(2)
>>> a.get() # now there's only two 2 in the buffer
2.0
"""
self._buffer_size = int(buffer_size) # make sure it's an int
self.reset()
def add(self, new):
"""
Add a new number to the buffer, or replace the oldest one there.
"""
new = float(new) # make sure it's a float
n = len(self._buffer)
if n < self.buffer_size: # still have to add numbers to the buffer.
self._buffer.append(new)
if self._average != self._average: # ~ if isNaN().
self._average = new # no previous numbers, so it's new.
else:
self._average *= n # so it's only the sum of numbers.
self._average += new # add new number.
self._average /= (n+1) # divide by new number of numbers.
else: # buffer full, replace oldest value.
old = self._buffer[self._index] # the previous oldest number.
self._buffer[self._index] = new # replace with new one.
self._index += 1 # update the index and make sure it's...
self._index %= self.buffer_size # ... smaller than buffer_size.
self._average -= old/self.buffer_size # remove old one...
self._average += new/self.buffer_size # ...and add new one...
# ... weighted by the number of elements.
def __call__(self):
"""
Return the moving average value, for the lazy ones who don't want
to write .get .
"""
return self._average
def get(self):
"""
Return the moving average value.
"""
return self()
def reset(self):
"""
Reset the moving average.
If for some reason you don't want to just create a new one.
"""
self._buffer = [] # could use np.empty(self.buffer_size)...
self._index = 0 # and use this to keep track of how many numbers.
self._average = float('nan') # could use np.NaN .
def get_buffer_size(self):
"""
Return current buffer_size.
"""
return self._buffer_size
def set_buffer_size(self, buffer_size):
"""
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Decreasing buffer size:
>>> a.buffer_size = 6
>>> a._buffer # should not access this!!
[9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
>>> a.buffer_size = 2
>>> a._buffer
[13.0, 14.0]
Increasing buffer size:
>>> a.buffer_size = 5
Warning: no older data available!
>>> a._buffer
[13.0, 14.0]
Keeping buffer size:
>>> a = Running_Average(10)
>>> for i in range(15):
... a.add(i)
...
>>> a()
9.5
>>> a._buffer # should not access this!!
[10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
>>> a.buffer_size = 10 # reorders buffer!
>>> a._buffer
[5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
"""
buffer_size = int(buffer_size)
# order the buffer so index is zero again:
new_buffer = self._buffer[self._index:]
new_buffer.extend(self._buffer[:self._index])
self._index = 0
if self._buffer_size < buffer_size:
print('Warning: no older data available!') # should use Warnings!
else:
diff = self._buffer_size - buffer_size
print(diff)
new_buffer = new_buffer[diff:]
self._buffer_size = buffer_size
self._buffer = new_buffer
buffer_size = property(get_buffer_size, set_buffer_size)
And you can test it with, for example:
def graph_test(N=200):
import matplotlib.pyplot as plt
values = list(range(N))
values_average_calculator = Running_Average(N/2)
values_averages = []
for value in values:
values_average_calculator.add(value)
values_averages.append(values_average_calculator())
fig, ax = plt.subplots(1, 1)
ax.plot(values, label='values')
ax.plot(values_averages, label='averages')
ax.grid()
ax.set_xlim(0, N)
ax.set_ylim(0, N)
fig.show()
Which gives:
Answer 22
Another solution just using a standard library and deque:
from collections import deque
import itertools
def moving_average(iterable, n=3):
# http://en.wikipedia.org/wiki/Moving_average
it = iter(iterable)
# create an iterable object from input argument
d = deque(itertools.islice(it, n-1))
# create deque object by slicing iterable
d.appendleft(0)
s = sum(d)
for elem in it:
s += elem - d.popleft()
d.append(elem)
yield s / n
# example on how to use it
for i in moving_average([40, 30, 50, 46, 39, 44]):
print(i)
# 40.0
# 42.0
# 45.0
# 43.0
Answer 23
For educational purposes, let me add two more NumPy solutions (slower than the cumsum solution):
import numpy as np
from numpy.lib.stride_tricks import as_strided
def ra_strides(arr, window):
    ''' Running average using as_strided '''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat '''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices)[::2] / window
All the aforementioned solutions are poor because they lack
- speed, due to a native Python implementation instead of a NumPy vectorized one,
- numerical stability, due to poor use of numpy.cumsum, or
- speed, due to O(len(x) * w) implementations as convolutions.
Given
import numpy
m = 10000
x = numpy.random.rand(m)
w = 1000
Note that x_[:w].sum() equals x[:w-1].sum(). So for the first average the numpy.cumsum(...) adds x[w-1] / w (via x_[w] / w), and subtracts 0 (from x_[0] / w). This results in x[0:w].mean().
Via cumsum, you'll update the second average by additionally adding x[w] / w (via x_[w+1] / w) and subtracting x[0] / w (via x_[1] / w), resulting in x[1:w+1].mean().
This goes on until x[-w:].mean() is reached.
x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w
This solution is vectorized, O(m), readable and numerically stable.
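A quick brute-force check that the one-liner really produces every window mean:

```python
import numpy

m = 10000
x = numpy.random.rand(m)
w = 1000

x_ = numpy.insert(x, 0, 0)
sliding_average = x_[:w].sum() / w + numpy.cumsum(x_[w:] - x_[:-w]) / w

# brute-force reference: the mean over every length-w window
reference = numpy.array([x[i:i + w].mean() for i in range(m - w + 1)])

assert numpy.allclose(sliding_average, reference)
```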
How about a moving average filter? It is also a one-liner and has the advantage that you can easily manipulate the window type if you need something other than the rectangle, i.e., an N-long simple moving average of an array a:
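The sentence above ends before its code; one hedged way to write that one-liner, with the rectangular window easily swapped for another shape such as np.hanning(N):

```python
import numpy as np

a = np.arange(20, dtype=float)
N = 5

# rectangular (flat) window gives a simple moving average;
# replace np.ones(N) with e.g. np.hanning(N) for a weighted one
window = np.ones(N) / N
sma = np.convolve(a, window, mode='valid')
```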
If you do choose to roll your own, rather than use an existing library, please be conscious of floating point error and try to minimize its effects:
class SumAccumulator:
def __init__(self):
self.values = [0]
self.count = 0
def add( self, val ):
self.values.append( val )
self.count = self.count + 1
i = self.count
while i & 0x01:
i = i >> 1
v0 = self.values.pop()
v1 = self.values.pop()
self.values.append( v0 + v1 )
def get_total(self):
return sum( reversed(self.values) )
def get_size( self ):
return self.count
If all your values are roughly the same order of magnitude, then this will help to preserve precision by always adding values of roughly similar magnitudes.
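A minimal usage sketch, with the class repeated from above so the snippet runs standalone: accumulate a stream and divide the total by the count to get the running mean.

```python
class SumAccumulator:
    """Pairwise-style accumulator that keeps partial sums of similar magnitude."""
    def __init__(self):
        self.values = [0]
        self.count = 0

    def add(self, val):
        self.values.append(val)
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append(v0 + v1)

    def get_total(self):
        return sum(reversed(self.values))

    def get_size(self):
        return self.count

# running mean of a stream, using the accumulator for the sum
acc = SumAccumulator()
for v in [0.1] * 1000:
    acc.add(v)
mean = acc.get_total() / acc.get_size()
```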