Tag Archives: apache-spark-sql

How do I add a constant column in a Spark DataFrame?

Question: How do I add a constant column in a Spark DataFrame?

I want to add a column in a DataFrame with some arbitrary value (that is the same for each row). I get an error when I use withColumn as follows:

dt.withColumn('new_column', 10).head(5)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'

It seems that I can trick the function into working as I want by adding and subtracting one of the other columns (so they add to zero) and then adding the number I want (10 in this case):

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=80488242, messagetype=1, dt=4809600.0, new_column=10)]

This is supremely hacky, right? I assume there is a more legit way to do this?


Answer 0

Spark 2.2+

Spark 2.2 introduces typedLit to support Seq, Map, and Tuples (SPARK-19254), and the following calls should be supported (Scala):

import org.apache.spark.sql.functions.typedLit

df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, 0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ (lit), 1.4+ (array, struct), 2.0+ (map):

The second argument for DataFrame.withColumn should be a Column so you have to use a literal:

from pyspark.sql.functions import lit

df.withColumn('new_column', lit(10))

If you need complex columns you can build these using blocks like array:

from pyspark.sql.functions import array, create_map, struct

df.withColumn("some_array", array(lit(1), lit(2), lit(3)))
df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3)))
df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))

Exactly the same methods can be used in Scala.

import org.apache.spark.sql.functions.{array, lit, map, struct}

df.withColumn("new_column", lit(10))
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))

To provide names for structs use either alias on each field:

df.withColumn(
    "some_struct",
    struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
 )

or cast on the whole object

df.withColumn(
    "some_struct", 
    struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
 )

It is also possible, although slower, to use a UDF.

Note:

The same constructs can be used to pass constant arguments to UDFs or SQL functions.
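
For illustration, a minimal PySpark sketch of that note (the UDF, its input column, and the constant are hypothetical, not part of the answer above):

from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType

# hypothetical UDF taking a per-row value plus a constant passed in via lit
add_n = udf(lambda x, n: x + n, IntegerType())
df.withColumn("x_plus_5", add_n(col("some_int_column"), lit(5)))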


Answer 1

In spark 2.2 there are two ways to add constant value in a column in DataFrame:

1) Using lit

2) Using typedLit.

The difference between the two is that typedLit can also handle parameterized scala types e.g. List, Seq, and Map

Sample DataFrame:

val df = spark.createDataFrame(Seq((0,"a"),(1,"b"),(2,"c"))).toDF("id", "col1")

+---+----+
| id|col1|
+---+----+
|  0|   a|
|  1|   b|
+---+----+

1) Using lit: Adding constant string value in new column named newcol:

import org.apache.spark.sql.functions.lit
val newdf = df.withColumn("newcol",lit("myval"))

Result:

+---+----+------+
| id|col1|newcol|
+---+----+------+
|  0|   a| myval|
|  1|   b| myval|
+---+----+------+

2) Using typedLit:

import org.apache.spark.sql.functions.typedLit
df.withColumn("newcol", typedLit(("sample", 10, .044)))

Result:

+---+----+-----------------+
| id|col1|           newcol|
+---+----+-----------------+
|  0|   a|[sample,10,0.044]|
|  1|   b|[sample,10,0.044]|
|  2|   c|[sample,10,0.044]|
+---+----+-----------------+

How do I add a new column to a Spark DataFrame (using PySpark)?

Question: How do I add a new column to a Spark DataFrame (using PySpark)?

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.

I’ve tried the following without any success:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

Also got an error using this:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

So how do I add a new column (based on Python vector) to an existing DataFrame with PySpark?


Answer 0

You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

transforming an existing column:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

included using join:

from pyspark.sql.functions import col

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))
df_with_x6.show()

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or generated with function / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.
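
As a rough illustration of that point (a sketch only, reusing df_with_x7 from above; the "sign" logic is made up), the same derived column can be written either as a Catalyst expression or as a Python UDF:

from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import StringType

# built-in expression, evaluated by Catalyst without leaving the JVM
df_with_x7.withColumn("sign", when(col("x3") > 0, "pos").otherwise("nonpos")).show()

# equivalent Python UDF, which serializes every value through Python and is typically slower
sign_udf = udf(lambda v: "pos" if v > 0 else "nonpos", StringType())
df_with_x7.withColumn("sign", sign_udf(col("x3"))).show()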

If you want to add the content of an arbitrary RDD as a column you can:

  • add row numbers to the existing data frame
  • call zipWithIndex on the RDD and convert it to a data frame
  • join both using the index as a join key
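
A rough sketch of those steps (assumptions: new_values is a hypothetical Python list with exactly one entry per row, in the same order as the DataFrame, and x8 is just an example column name):

# index the existing DataFrame through its underlying RDD
df_indexed = (df_with_x7.rdd
    .zipWithIndex()
    .map(lambda row_idx: row_idx[0] + (row_idx[1],))
    .toDF(df_with_x7.columns + ["_idx"]))

# index the values to attach
new_values = [10, 20]
col_indexed = (sc.parallelize(new_values)
    .zipWithIndex()
    .map(lambda v_idx: (v_idx[1], v_idx[0]))
    .toDF(["_idx", "x8"]))

# join on the index and drop the helper column
df_with_x8 = df_indexed.join(col_indexed, "_idx").drop("_idx")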


Answer 1

To add a column using a UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+

Answer 2

For Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))

Answer 3

There are multiple ways we can add a new column in PySpark.

Let's first create a simple DataFrame.

from pyspark.sql.types import IntegerType

date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())

Now let's try to double the column value and store it in a new column. Please find below a few different approaches to achieve the same.

# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()

# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()

# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()

# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()

For more examples and explanation on spark DataFrame functions, you can visit my blog.

I hope this helps.


Answer 4

You can define a new udf when adding a column_name:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType
u_f = F.udf(lambda: yourstring, StringType())
a.select(u_f().alias('column_name'))

Answer 5

from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))

Answer 6

I would like to offer a generalized example for a very similar use case:

Use Case: I have a csv consisting of:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

I need to perform some transformations and the final csv needs to look like

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

I need to do this because this is the schema defined by some model and I need for my final data to be interoperable with SQL Bulk Inserts and such things.

so:

1) I read the original csv using spark.read and call it “df”.

2) I do something to the data.

3) I add the null columns using this script:

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

In this way, you can structure your schema after loading a csv (would also work for reordering columns if you have to do this for many tables).


Answer 7

The simplest way to add a column is to use "withColumn". Since the dataframe is created using sqlContext, you either specify the schema or let it be inferred from the dataset. If you specify the schema explicitly, updating it every time the data changes becomes tedious.

Below is an example that you can consider:

from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default 

# Read the dataset of your choice (adjust header, schema inference and delimiter to your file)
Data = sqlContext.read.csv("/path", header=True, inferSchema=True, sep=",")

# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")

# Check the change 
Data.printSchema()
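
The "Code goes here" placeholder above stands for any Column expression. As a purely hypothetical instance, a constant 31st column could be added like this:

from pyspark.sql.functions import lit

Data = Data.withColumn("col31", lit("some_default_value"))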

Answer 8

We can add additional columns to a DataFrame directly with the steps below:

from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()
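
If more than two buckets are needed, when calls can be chained before otherwise. A small sketch extending the example above (the extra age cut-off is made up):

from pyspark.sql.functions import when

df = df.withColumn(
    "profile",
    when(df.age >= 60, "Retired")
    .when(df.age >= 40, "Senior")
    .otherwise("Executive"))
df.show()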

Filter Pyspark dataframe column with None value

Question: Filter Pyspark dataframe column with None value

I’m trying to filter a PySpark dataframe that has None as a row value:

df.select('dt_mvmt').distinct().collect()

[Row(dt_mvmt=u'2016-03-27'),
 Row(dt_mvmt=u'2016-03-28'),
 Row(dt_mvmt=u'2016-03-29'),
 Row(dt_mvmt=None),
 Row(dt_mvmt=u'2016-03-30'),
 Row(dt_mvmt=u'2016-03-31')]

and I can filter correctly with a string value:

df[df.dt_mvmt == '2016-03-31']
# some results here

but this fails:

df[df.dt_mvmt == None].count()
0
df[df.dt_mvmt != None].count()
0

But there are definitely values on each category. What’s going on?


Answer 0

You can use Column.isNull / Column.isNotNull:

df.where(col("dt_mvmt").isNull())

df.where(col("dt_mvmt").isNotNull())

If you want to simply drop NULL values you can use na.drop with subset argument:

df.na.drop(subset=["dt_mvmt"])

Equality-based comparisons with NULL won't work, because in SQL NULL is undefined, so any attempt to compare it with another value returns NULL:

sqlContext.sql("SELECT NULL = NULL").show()
## +-------------+
## |(NULL = NULL)|
## +-------------+
## |         null|
## +-------------+


sqlContext.sql("SELECT NULL != NULL").show()
## +-------------------+
## |(NOT (NULL = NULL))|
## +-------------------+
## |               null|
## +-------------------+

The only valid method to compare value with NULL is IS / IS NOT which are equivalent to the isNull / isNotNull method calls.
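
For completeness, the same distinction can be checked directly in SQL with a small sketch like this (reusing the sqlContext from above); IS NULL yields a proper boolean instead of NULL:

sqlContext.sql("SELECT NULL IS NULL, NULL IS NOT NULL").show()
## expected: true in the first column, false in the second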


Answer 1

Try to just use isNotNull function.

df.filter(df.dt_mvmt.isNotNull()).count()

Answer 2

To obtain entries whose values in the dt_mvmt column are not null we have

df.filter("dt_mvmt is not NULL")

and for entries which are null we have

df.filter("dt_mvmt is NULL")

Answer 3

If you want to keep with the Pandas syntax, this worked for me.

df = df[df.dt_mvmt.isNotNull()]

Answer 4

if column = None

COLUMN_OLD_VALUE
----------------
None
1
None
100
20
------------------

Create a temp table on the data frame and use:

sqlContext.sql("select * from tempTable where column_old_value='None' ").show()

So use : column_old_value='None'
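
If the 'None' entries really are literal strings (which is what this answer assumes) rather than SQL NULLs, a minimal sketch of the temp-table route could look like this; the table name is only an example:

df.registerTempTable("tempTable")   # createOrReplaceTempView in Spark 2.x+
sqlContext.sql("select * from tempTable where column_old_value='None' ").show()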


Answer 5

There are multiple ways you can remove/filter the null values from a column in a DataFrame.

Let's create a simple DataFrame with the code below:

from pyspark.sql.functions import col
from pyspark.sql.types import StringType
date = ['2016-03-27','2016-03-28','2016-03-29', None, '2016-03-30','2016-03-31']
df = spark.createDataFrame(date, StringType())

Now you can try one of the approaches below to filter out the null values.

# Approach - 1
df.filter("value is not null").show()

# Approach - 2
df.filter(col("value").isNotNull()).show()

# Approach - 3
df.filter(df["value"].isNotNull()).show()

# Approach - 4
df.filter(df.value.isNotNull()).show()

# Approach - 5
df.na.drop(subset=["value"]).show()

# Approach - 6
df.dropna(subset=["value"]).show()

# Note: You can also use where function instead of a filter.

You can also check the section “Working with NULL Values” on my blog for more information.

I hope it helps.


Answer 6

PySpark provides various filtering options based on arithmetic, logical and other conditions. Presence of NULL values can hamper further processes. Removing them or statistically imputing them could be a choice.

Below set of code can be considered:

# Dataset is df
# Column name is dt_mvmt
# Before filtering make sure you have the right count of the dataset
df.count() # Some number

# Filter here
df = df.filter(df.dt_mvmt.isNotNull())

# Check the count to ensure there are NULL values present (This is important when dealing with large dataset)
df.count() # Count should be reduced if NULL values are present
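
The imputation alternative mentioned above is not shown in the snippet; a minimal sketch using fillna, assuming dt_mvmt is a string column and that a placeholder value is acceptable:

# replace missing dt_mvmt values instead of dropping the rows (placeholder date is an assumption)
df = df.fillna({"dt_mvmt": "1900-01-01"})
df.count()  # the count stays the same, since nulls are replaced rather than removed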

Answer 7

I would also try:

df = df.dropna(subset=["dt_mvmt"])


Answer 8

If you want to filter out records having a None value in a column, then see the example below:

df=spark.createDataFrame([[123,"abc"],[234,"fre"],[345,None]],["a","b"])

Now filter out null value records:

df=df.filter(df.b.isNotNull())

df.show()

If you want to remove those records from DF then see below:

df1=df.na.drop(subset=['b'])

df1.show()

Answer 9

None/Null is a data type of the class NoneType in PySpark/Python, so the comparisons below will not work, because you are trying to compare a NoneType object with a string object.

Wrong way of filtering:

df[df.dt_mvmt == None].count()
0
df[df.dt_mvmt != None].count()
0

Correct way:

df = df.where(col("dt_mvmt").isNotNull())

This returns all records where dt_mvmt is not None/Null.


How to change a Dataframe column from String type to Double type in PySpark

Question: How to change a Dataframe column from String type to Double type in PySpark

I have a dataframe with column as String. I wanted to change the column type to Double type in PySpark.

Following is the way I did it:

toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))

Just wanted to know, is this the right way to do it? While running it through Logistic Regression I am getting some error, so I wonder whether this is the reason for the trouble.


Answer 0

There is no need for a UDF here. Column already provides a cast method with a DataType instance:

from pyspark.sql.types import DoubleType

changedTypedf = joindf.withColumn("label", joindf["show"].cast(DoubleType()))

or short string:

changedTypedf = joindf.withColumn("label", joindf["show"].cast("double"))

where the canonical string names (other variations can be supported as well) correspond to the simpleString value. So for atomic types:

from pyspark.sql import types 

for t in ['BinaryType', 'BooleanType', 'ByteType', 'DateType', 
          'DecimalType', 'DoubleType', 'FloatType', 'IntegerType', 
           'LongType', 'ShortType', 'StringType', 'TimestampType']:
    print(f"{t}: {getattr(types, t)().simpleString()}")
BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp

and for example complex types

types.ArrayType(types.IntegerType()).simpleString()   
'array<int>'
types.MapType(types.StringType(), types.IntegerType()).simpleString()
'map<string,int>'
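
These simpleString names can also be used for nested columns. A hypothetical sketch, assuming a DataFrame df with a column ids of type array<string>:

from pyspark.sql.functions import col

df_casted = df.withColumn("ids", col("ids").cast("array<double>"))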

Answer 1

Preserve the name of the column and avoid extra column addition by using the same name as input column:

changedTypedf = joindf.withColumn("show", joindf["show"].cast(DoubleType()))

Answer 2

The given answers are enough to deal with the problem, but I want to share another way, which may have been introduced in a newer version of Spark (I am not sure about it), and which the given answers do not cover.

We can reach the column in spark statement with col("colum_name") keyword:

from pyspark.sql.functions import col , column
changedTypedf = joindf.withColumn("show", col("show").cast("double"))

Answer 3

pyspark version:

  df = <source data>
  df.printSchema()

  from pyspark.sql.types import *

  # Change column type
  df_new = df.withColumn("myColumn", df["myColumn"].cast(IntegerType()))
  df_new.printSchema()
  df_new.select("myColumn").show()

Answer 4

The solution was simple:

toDoublefunc = UserDefinedFunction(lambda x: float(x),DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))