Question: Convert a Spark DataFrame column to a Python list

I work on a DataFrame with two columns, mvv and count.

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |
+---+-----+

I would like to obtain two lists containing the mvv values and the count values, something like:

mvv = [1,2,3,4]
count = [5,9,3,1]

So I tried the following code: the first line should return a Python list of rows. I wanted to see the first value:

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)

But I get an error message on the second line:

AttributeError: getInt


Answer 0

Let's see why this approach is not working. First, you are trying to get an integer from a Row type; the output of your collect() looks like this:

>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)

If you do something like this:

>>> firstvalue = mvv_list[0].mvv
Out: 1

You will get the mvv value. If you want all of the values as a list, you can do something like this:

>>> mvv_array = [int(row.mvv) for row in mvv_count_df.collect()]
>>> mvv_array
Out: [1,2,3,4]

But if you try the same for the other column, you get:

>>> mvv_count = [int(row.count) for row in mvv_count_df.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'

This happens because count is a built-in method of Row (inherited from tuple), and the column has the same name. A workaround is to rename the count column to _count:

>>> mvv_count_df = mvv_count_df.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_count_df.collect()]

But this workaround is not needed, because you can access the column using dictionary syntax:

>>> mvv_array = [int(row['mvv']) for row in mvv_count_df.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_count_df.collect()]

And it will finally work!


Answer 1

The following one-liner gives the list you want:

mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()
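
The same pattern works for the count column as well (a straightforward extension of the one-liner above, not part of the original answer):

count = mvv_count_df.select("count").rdd.flatMap(lambda x: x).collect()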

Answer 2

This will give you all the elements as a list.

mvv_list = list(
    mvv_count_df.select('mvv').toPandas()['mvv']
)

Answer 3

The following code will help you:

mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()

Answer 4

On my data I got these benchmarks:

>>> data.select(col).rdd.flatMap(lambda x: x).collect()

0.52 sec

>>> [row[col] for row in data.collect()]

0.271 sec

>>> list(data.select(col).toPandas()[col])

0.427 sec

The result is the same in all three cases.


Answer 5

If you get the error below:

AttributeError: 'list' object has no attribute 'collect'

This code will solve the issue:

mvv_list = mvv_count_df.select('mvv').collect()

mvv_array = [int(i.mvv) for i in mvv_list]

Answer 6

I ran a benchmarking analysis and list(mvv_count_df.select('mvv').toPandas()['mvv']) is the fastest method. I’m very surprised.

I ran the different approaches on 100 thousand / 100 million row datasets using a 5 node i3.xlarge cluster (each node has 30.5 GB of RAM and 4 cores) with Spark 2.4.5. Data was evenly distributed across 20 snappy-compressed Parquet files with a single column.
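
As a rough illustration of that setup, a single-column dataset like the one described could be generated with something like the following (the column name and output path are assumptions, not the author's actual code):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One column, 100 million rows, written out as 20 snappy-compressed Parquet files.
df = spark.range(100 * 1000 * 1000).withColumnRenamed("id", "col_name")
df.repartition(20).write.option("compression", "snappy").parquet("/tmp/bench_data")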

Here’s the benchmarking results (runtimes in seconds):

+-------------------------------------------------------------+---------+-------------+
|                          Code                               | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect()    |     0.4 | 55.3        |
| list(df.select('col_name').toPandas()['col_name'])          |     0.4 | 17.5        |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()|     0.9 | 69          |
| [row[0] for row in df.select('col_name').collect()]         |     1.0 | OOM         |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] |     1.2 | *           |
+-------------------------------------------------------------+---------+-------------+

* cancelled after 800 seconds

Golden rules to follow when collecting data on the driver node:

  • Try to solve the problem with other approaches. Collecting data to the driver node is expensive, doesn’t harness the power of the Spark cluster, and should be avoided whenever possible.
  • Collect as few rows as possible. Aggregate, deduplicate, filter, and prune columns before collecting the data, as sketched below. Send as little data to the driver node as you can.
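
For example, a minimal sketch of trimming the data before it is collected (the column name and filter condition are made up for illustration):

# Deduplicate and filter on the cluster, so only the values
# that are actually needed are sent to the driver.
small_list = [row['mvv'] for row in
              df.select('mvv').distinct().filter('mvv > 0').collect()]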

toPandas was significantly improved in Spark 2.3. It’s probably not the best approach if you’re using a Spark version earlier than 2.3.
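
That improvement largely comes from optional Arrow-based conversion, which in Spark 2.3/2.4 is controlled by a configuration flag and needs pyarrow installed on the driver; a minimal sketch, assuming a SparkSession named spark:

# Enable Arrow-accelerated toPandas() (Spark 2.3/2.4 configuration name).
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
mvv_list = list(df.select('mvv').toPandas()['mvv'])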

See here for more details / benchmarking results.


Answer 7

A possible solution is to use the collect_list() function from pyspark.sql.functions. This will aggregate all the column values into a PySpark array that is converted into a Python list when collected:

from pyspark.sql.functions import collect_list

mvv_list   = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0]
