python教程—如何使用Spark查找中位数和分位数-Python实用宝典

python教程—如何使用Spark查找中位数和分位数

如何使用分布式方法、IPython和Spark找到整数的RDD的中值?RDD大约有700,000个元素,因此太大而无法收集和找到中间值。

如何使用分布式方法、IPython和Spark找到整数的RDD的中值?RDD大约有700,000个元素,因此太大而无法收集和找到中间值。

这个问题和这个问题相似。然而,这个问题的答案是使用Scala,我不知道。

使用Scala答案的思想,我尝试用Python编写类似的答案。

我知道我首先要对RDD排序。我不知道该怎么做。我看到sortBy(根据给定的keyfunc对这个RDD排序)和sortByKey(对这个RDD排序,假定它由(key, value)对组成)方法。我认为两者都使用键值,而我的RDD只有整数元素。

  1. 首先,我想做myrdd。sortBy(λx, x) ?
  2. 接下来,我将找到rdd (rdd.count())的长度。
  3. 最后,我想找到位于rdd中心的元素或两个元素。我也需要这个方法的帮助。

编辑:

我有个主意。也许我可以索引我的RDD,然后key = index和value = element。然后我可以试着按值排序?我不知道这是否可能,因为只有sortByKey方法。

回答

火花2.0 +:

您可以使用approxQuantile方法实现Greenwald-Khanna算法:

Python <强> < /强>:

    df.approxQuantile("x", [0.5], 0.25)

Scala <强> < /强>:

    df.stat.approxQuantile("x", Array(0.5), 0.25)

其中最后一个参数是相对误差。数值越低,计算结果越精确,计算费用越高。

由于Spark 2.2 ( Spark -14352)支持多列估计:

    df.approxQuantile(["x", "y", "z"], [0.5], 0.25)

    df.approxQuantile(Array("x", "y", "z"), Array(0.5), 0.25)

火花& lt;2.0

Python <强> < /强>

正如我在评论中提到的,这很可能不值得大惊小怪。如果数据相对较小,比如你的情况,那么只需在本地收集和计算中位数:

    import numpy as np np.random.seed(323) rdd = sc.parallelize(np.random.randint(1000000, size=700000)) %time np.median(rdd.collect()) np.array(rdd.collect()).nbytes

在我那台用了几年的旧电脑上,它大约只需要0.01秒,内存大约5.5MB。

如果数据要大得多,排序将是一个限制因素,因此与其获得确切的值,不如在本地采样、收集和计算。但如果你真的想让a使用Spark这样的东西,应该会奏效(如果我没有搞砸任何事情):

    from numpy import floor import time def quantile(rdd, p, sample=None, seed=None): """Compute a quantile of order p ∈ [0, 1] :rdd a numeric rdd :p quantile(between 0 and 1) :sample fraction of and rdd to use. If not provided we use a whole dataset :seed random number generator seed to be used with sample """ assert 0 <= p <= 1 assert sample is None or 0 < sample <= 1 seed = seed if seed is not None else time.time() rdd = rdd if sample is None else rdd.sample(False, sample, seed) rddSortedWithIndex = (rdd. sortBy(lambda x: x). zipWithIndex(). map(lambda (x, i): (i, x)). cache()) n = rddSortedWithIndex.count() h = (n - 1) * p rddX, rddXPlusOne = ( rddSortedWithIndex.lookup(x)[0] for x in int(floor(h)) + np.array([0L, 1L])) return rddX + (h - floor(h)) * (rddXPlusOne - rddX)

和一些测试:

    np.median(rdd.collect()), quantile(rdd, 0.5) ## (500184.5, 500184.5) np.percentile(rdd.collect(), 25), quantile(rdd, 0.25) ## (250506.75, 250506.75) np.percentile(rdd.collect(), 75), quantile(rdd, 0.75) (750069.25, 750069.25)

最后定义中值:

    from functools import partial median = partial(quantile, p=0.5)

到目前为止还不错,但是在没有任何网络通信的本地模式下需要4.66 s。也许有办法改善这一点,但为什么还要麻烦呢?

<强>语言无关 (Hive UDAF):

如果使用HiveContext,还可以使用Hive UDAFs。用积分值:

    rdd.map(lambda x: (float(x), )).toDF(["x"]).registerTempTable("df") sqlContext.sql("SELECT percentile_approx(x, 0.5) FROM df")

连续值:

    sqlContext.sql("SELECT percentile(x, 0.5) FROM df")

在percentile_approx中,您可以传递一个额外的参数,该参数确定要使用的记录数量。

​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

本文由 Python实用宝典 作者:Python实用宝典 发表,其版权均为 Python实用宝典 所有,文章内容系作者个人观点,不代表 Python实用宝典 对观点赞同或支持。如需转载,请注明文章来源。
0

发表评论