问题:如何将新列添加到Spark DataFrame(使用PySpark)?
我有一个Spark DataFrame(使用PySpark 1.5.1),想添加一个新列。
我已经尝试了以下方法,但没有成功:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
使用此命令也出错:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
那么,如何使用PySpark将新列(基于Python向量)添加到现有DataFrame中?
I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.
I’ve tried the following without any success:
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
Also got an error using this:
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
So how do I add a new column (based on Python vector) to an existing DataFrame with PySpark?
回答 0
您不能将任意列添加到DataFrame
Spark中。只能通过使用文字来创建新列(其他文字类型在如何在Spark DataFrame中添加常量列中进行了描述)。
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
转换现有列:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
包括使用join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
或使用函数/ udf生成:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
在性能方面,pyspark.sql.functions
映射到Catalyst表达式的内置函数()通常优于Python用户定义的函数。
如果要添加任意RDD的内容作为列,则可以
- 将行号添加到现有数据框
- 调用
zipWithIndex
RDD并将其转换为数据帧 - 使用索引作为连接键来连接两者
You cannot add an arbitrary column to a DataFrame
in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
transforming an existing column:
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
included using join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
or generated with function / udf:
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
Performance-wise, built-in functions (pyspark.sql.functions
), which map to Catalyst expression, are usually preferred over Python user defined functions.
If you want to add content of an arbitrary RDD as a column you can
回答 1
要使用UDF添加列:
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return 'cat1'
elif value == 2: return 'cat2'
...
else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
To add a column using a UDF:
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return 'cat1'
elif value == 2: return 'cat2'
...
else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
回答 2
对于Spark 2.0
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
For Spark 2.0
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
回答 3
我们可以通过多种方式在pySpark中添加新列。
让我们首先创建一个简单的DataFrame。
date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())
现在,让我们尝试将列值加倍并将其存储在新列中。PFB很少有不同的方法可以实现相同。
# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()
# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()
# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()
# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()
有关Spark DataFrame函数的更多示例和说明,请访问我的博客。
我希望这有帮助。
There are multiple ways we can add a new column in pySpark.
Let’s first create a simple DataFrame.
date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())
Now let’s try to double the column value and store it in a new column. PFB few different approaches to achieve the same.
# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()
# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()
# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()
# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()
For more examples and explanation on spark DataFrame functions, you can visit my blog.
I hope this helps.
回答 4
您可以udf
在添加时定义一个新的column_name
:
u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
You can define a new udf
when adding a column_name
:
u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
回答 5
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn('new_col', func_name(df.old_col))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn('new_col', func_name(df.old_col))
回答 6
我想提供一个非常相似的用例的通用示例:
用例:我的csv包含:
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
我需要执行一些转换,最终的csv需要看起来像
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
我需要执行此操作,因为这是某些模型定义的架构,并且我需要最终数据与SQL Bulk Inserts等具有互操作性。
所以:
1)我使用spark.read读取原始的csv,并将其称为“ df”。
2)我对数据做了一些处理。
3)我使用此脚本添加空列:
outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))
df = df.select(outcols)
这样,您可以在加载csv之后构造架构(如果必须对许多表执行此操作,也可以对列进行重新排序)。
I would like to offer a generalized example for a very similar use case:
Use Case: I have a csv consisting of:
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
I need to perform some transformations and the final csv needs to look like
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
I need to do this because this is the schema defined by some model and I need for my final data to be interoperable with SQL Bulk Inserts and such things.
so:
1) I read the original csv using spark.read and call it “df”.
2) I do something to the data.
3) I add the null columns using this script:
outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))
df = df.select(outcols)
In this way, you can structure your schema after loading a csv (would also work for reordering columns if you have to do this for many tables).
回答 7
添加列的最简单方法是使用“ withColumn”。由于数据框是使用sqlContext创建的,因此您必须指定架构或默认情况下可以在数据集中使用。如果指定了架构,则每次更改时工作量都会变得很乏味。
您可以考虑以下示例:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default
# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")
# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")
# Check the change
Data.printSchema()
The simplest way to add a column is to use “withColumn”. Since the dataframe is created using sqlContext, you have to specify the schema or by default can be available in the dataset. If the schema is specified, the workload becomes tedious when changing every time.
Below is an example that you can consider:
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default
# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")
# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")
# Check the change
Data.printSchema()
回答 8
我们可以通过以下步骤直接向DataFrame添加其他列:
from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
We can add additional columns to DataFrame directly with below steps:
from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。