Tag Archives: apache-spark

How to add a constant column in a Spark DataFrame?

Question: How to add a constant column in a Spark DataFrame?


I want to add a column in a DataFrame with some arbitrary value (that is the same for each row). I get an error when I use withColumn as follows:

dt.withColumn('new_column', 10).head(5)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-50-a6d0257ca2be> in <module>()
      1 dt = (messages
      2     .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt")))
----> 3 dt.withColumn('new_column', 10).head(5)

/Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1166         [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
   1167         """
-> 1168         return self.select('*', col.alias(colName))
   1169 
   1170     @ignore_unicode_prefix

AttributeError: 'int' object has no attribute 'alias'

It seems that I can trick the function into working as I want by adding and subtracting one of the other columns (so they add to zero) and then adding the number I want (10 in this case):

dt.withColumn('new_column', dt.messagetype - dt.messagetype + 10).head(5)
[Row(fromuserid=425, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=47019141, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=49746356, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=93506471, messagetype=1, dt=4809600.0, new_column=10),
 Row(fromuserid=80488242, messagetype=1, dt=4809600.0, new_column=10)]

This is supremely hacky, right? I assume there is a more legit way to do this?


Answer 0


Spark 2.2+

Spark 2.2 introduces typedLit to support Seq, Map, and Tuples (SPARK-19254), and the following calls should be supported (Scala):

import org.apache.spark.sql.functions.typedLit

df.withColumn("some_array", typedLit(Seq(1, 2, 3)))
df.withColumn("some_struct", typedLit(("foo", 1, 0.3)))
df.withColumn("some_map", typedLit(Map("key1" -> 1, "key2" -> 2)))

Spark 1.3+ (lit), 1.4+ (array, struct), 2.0+ (map):

The second argument for DataFrame.withColumn should be a Column so you have to use a literal:

from pyspark.sql.functions import lit

df.withColumn('new_column', lit(10))

If you need complex columns you can build these using blocks like array:

from pyspark.sql.functions import array, create_map, struct

df.withColumn("some_array", array(lit(1), lit(2), lit(3)))
df.withColumn("some_struct", struct(lit("foo"), lit(1), lit(.3)))
df.withColumn("some_map", create_map(lit("key1"), lit(1), lit("key2"), lit(2)))

Exactly the same methods can be used in Scala.

import org.apache.spark.sql.functions.{array, lit, map, struct}

df.withColumn("new_column", lit(10))
df.withColumn("map", map(lit("key1"), lit(1), lit("key2"), lit(2)))

To provide names for structs use either alias on each field:

df.withColumn(
    "some_struct",
    struct(lit("foo").alias("x"), lit(1).alias("y"), lit(0.3).alias("z"))
 )

or cast on the whole object

df.withColumn(
    "some_struct", 
    struct(lit("foo"), lit(1), lit(0.3)).cast("struct<x: string, y: integer, z: double>")
 )

It is also possible, although slower, to use a UDF.

Note:

The same constructs can be used to pass constant arguments to UDFs or SQL functions.
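
For example, a minimal PySpark sketch of passing a constant into a UDF and into a built-in function (the DataFrame df, its column x, and the UDF itself are hypothetical):

from pyspark.sql.functions import udf, lit, concat
from pyspark.sql.types import IntegerType

# Hypothetical UDF that adds an offset supplied as a literal column
add_offset = udf(lambda value, offset: value + offset, IntegerType())

df.withColumn("x_plus_5", add_offset(df["x"], lit(5)))                  # constant into a UDF
df.withColumn("labelled", concat(lit("id_"), df["x"].cast("string")))   # constant into a built-in function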


Answer 1


In Spark 2.2 there are two ways to add a constant value to a column in a DataFrame:

1) Using lit

2) Using typedLit.

The difference between the two is that typedLit can also handle parameterized Scala types, e.g. List, Seq, and Map.

Sample DataFrame:

val df = spark.createDataFrame(Seq((0,"a"),(1,"b"),(2,"c"))).toDF("id", "col1")

+---+----+
| id|col1|
+---+----+
|  0|   a|
|  1|   b|
+---+----+

1) Using lit: Adding constant string value in new column named newcol:

import org.apache.spark.sql.functions.lit
val newdf = df.withColumn("newcol",lit("myval"))

Result:

+---+----+------+
| id|col1|newcol|
+---+----+------+
|  0|   a| myval|
|  1|   b| myval|
+---+----+------+

2) Using typedLit:

import org.apache.spark.sql.functions.typedLit
df.withColumn("newcol", typedLit(("sample", 10, .044)))

Result:

+---+----+-----------------+
| id|col1|           newcol|
+---+----+-----------------+
|  0|   a|[sample,10,0.044]|
|  1|   b|[sample,10,0.044]|
|  2|   c|[sample,10,0.044]|
+---+----+-----------------+

How do I add a new column to a Spark DataFrame (using PySpark)?

Question: How do I add a new column to a Spark DataFrame (using PySpark)?


I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column.

I’ve tried the following without any success:

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

Also got an error using this:

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

So how do I add a new column (based on Python vector) to an existing DataFrame with PySpark?


Answer 0


You cannot add an arbitrary column to a DataFrame in Spark. New columns can be created only by using literals (other literal types are described in How to add a constant column in a Spark DataFrame?)

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

transforming an existing column:

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

included using a join:

from pyspark.sql.functions import col

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

or generated with function / udf:

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

Performance-wise, built-in functions (pyspark.sql.functions), which map to Catalyst expressions, are usually preferred over Python user-defined functions.

If you want to add the content of an arbitrary RDD as a column you can

  • add row numbers to the existing data frame
  • call zipWithIndex on the RDD and convert it to a data frame
  • join both using the index as a join key
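
A minimal sketch of that approach (the RDD name rdd and the value column name hours are hypothetical; df is the existing DataFrame and is assumed to have the same number of rows):

from pyspark.sql import Row

# Index the existing DataFrame through its underlying RDD
df_indexed = (df.rdd.zipWithIndex()
    .map(lambda pair: Row(idx=pair[1], **pair[0].asDict()))
    .toDF())

# Index the new values the same way
new_col = (rdd.zipWithIndex()
    .map(lambda pair: Row(idx=pair[1], hours=pair[0]))
    .toDF())

# Join on the shared index and drop it again
df_with_hours = df_indexed.join(new_col, "idx").drop("idx")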


Answer 1


To add a column using a UDF:

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+

Answer 2


For Spark 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))

Answer 3


There are multiple ways we can add a new column in pySpark.

Let’s first create a simple DataFrame.

from pyspark.sql.types import IntegerType

date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())

Now let's try to double the column value and store it in a new column. Below are a few different approaches that achieve the same result.

# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()

# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()

# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()

# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()

For more examples and explanations of Spark DataFrame functions, you can visit my blog.

I hope this helps.


Answer 4


You can define a new udf when adding a column_name:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType
u_f = F.udf(lambda: yourstring, StringType())  # yourstring: a constant defined elsewhere
a.select(u_f().alias('column_name'))

Answer 5

from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))

Answer 6


I would like to offer a generalized example for a very similar use case:

Use Case: I have a csv consisting of:

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

I need to perform some transformations and the final csv needs to look like

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

I need to do this because this is the schema defined by some model and I need for my final data to be interoperable with SQL Bulk Inserts and such things.

so:

1) I read the original csv using spark.read and call it “df”.

2) I do something to the data.

3) I add the null columns using this script:

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

In this way, you can structure your schema after loading a csv (would also work for reordering columns if you have to do this for many tables).


Answer 7


The simplest way to add a column is to use "withColumn". Since the dataframe is created using sqlContext, you either specify the schema or let it be inferred from the dataset by default. If the schema is specified explicitly, keeping it up to date becomes tedious every time it changes.

Below is an example that you can consider:

from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc) # SparkContext will be sc by default 

# Read the dataset of your choice (Already loaded with schema)
Data = sqlContext.read.csv("/path", header = True/False, schema = "infer", sep = "delimiter")

# For instance the data has 30 columns from col1, col2, ... col30. If you want to add a 31st column, you can do so by the following:
Data = Data.withColumn("col31", "Code goes here")

# Check the change 
Data.printSchema()
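
For instance, a minimal way to fill in the "Code goes here" placeholder above with a constant column (the value "default_value" is purely illustrative):

from pyspark.sql.functions import lit

# Concrete example of the placeholder: add a 31st column holding a constant value
Data = Data.withColumn("col31", lit("default_value"))
Data.printSchema()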

Answer 8


We can add additional columns to a DataFrame directly with the steps below:

from pyspark.sql.functions import when
df = spark.createDataFrame([["amit", 30], ["rohit", 45], ["sameer", 50]], ["name", "age"])
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
df.show()

importing pyspark in python shell

Question: importing pyspark in python shell


This is a copy of someone else’s question on another forum that was never answered, so I thought I’d re-ask it here, as I have the same issue. (See http://geekple.com/blogs/feeds/Xgzu7/posts/351703064084736)

I have Spark installed properly on my machine and am able to run python programs with the pyspark modules without error when using ./bin/pyspark as my python interpreter.

However, when I attempt to run the regular Python shell, when I try to import pyspark modules I get this error:

from pyspark import SparkContext

and it says

"No module named pyspark".

How can I fix this? Is there an environment variable I need to set to point Python to the pyspark headers/libraries/etc.? If my spark installation is /spark/, which pyspark paths do I need to include? Or can pyspark programs only be run from the pyspark interpreter?


Answer 0


Here is a simple method (If you don’t bother about how it works!!!)

Use findspark

  1. Go to your python shell

    pip install findspark
    
    import findspark
    findspark.init()
    
  2. import the necessary modules

    from pyspark import SparkContext
    from pyspark import SparkConf
    
  3. Done!!!
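
Putting the three steps above together as one runnable snippet (the master and app name are just example settings):

import findspark
findspark.init()  # locates SPARK_HOME and patches sys.path

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("findspark-demo")
sc = SparkContext(conf=conf)
print(sc.version)
sc.stop()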


Answer 1


If it prints such error:

ImportError: No module named py4j.java_gateway

Please add $SPARK_HOME/python/build to PYTHONPATH:

export SPARK_HOME=/Users/pzhang/apps/spark-1.1.0-bin-hadoop2.4
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$PYTHONPATH

Answer 2


Turns out that the pyspark bin is LOADING python and automatically loading the correct library paths. Check out $SPARK_HOME/bin/pyspark :

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH

I added this line to my .bashrc file and the modules are now correctly found!


Answer 3


Don't run your .py file as: python filename.py. Instead use: spark-submit filename.py.
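
A minimal sketch of a script run that way (the file name and the CSV path are purely illustrative):

# filename.py -- run with: spark-submit filename.py
from pyspark import SparkContext

sc = SparkContext(appName="submit-demo")  # spark-submit supplies master and other conf
print(sc.textFile("file.csv").count())    # any action on a hypothetical input file
sc.stop()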


Answer 4


By exporting the SPARK path and the Py4j path, it started to work:

export SPARK_HOME=/usr/local/Cellar/apache-spark/1.5.1
export PYTHONPATH=$SPARK_HOME/libexec/python:$SPARK_HOME/libexec/python/build:$PYTHONPATH
PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH 
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/build:$PYTHONPATH

So, if you don't want to type these every time you fire up the Python shell, you might want to add them to your .bashrc file.


Answer 5


On Mac, I use Homebrew to install Spark (formula “apache-spark”). Then, I set the PYTHONPATH this way so the Python import works:

export SPARK_HOME=/usr/local/Cellar/apache-spark/1.2.0
export PYTHONPATH=$SPARK_HOME/libexec/python:$SPARK_HOME/libexec/python/build:$PYTHONPATH

Replace the “1.2.0” with the actual apache-spark version on your mac.


Answer 6


For a Spark execution in pyspark two components are required to work together:

  • pyspark python package
  • Spark instance in a JVM

When launching things with spark-submit or pyspark, these scripts will take care of both, i.e. they set up your PYTHONPATH, PATH, etc., so that your script can find pyspark, and they also start the spark instance, configuring according to your params, e.g. --master X

Alternatively, it is possible to bypass these scripts and run your spark application directly in the python interpreter, like python myscript.py. This is especially interesting when spark scripts start to become more complex and eventually receive their own args.

  1. Ensure the pyspark package can be found by the Python interpreter. As already discussed either add the spark/python dir to PYTHONPATH or directly install pyspark using pip install.
  2. Set the parameters of spark instance from your script (those that used to be passed to pyspark).
    • For spark configurations as you'd normally set with --conf they are defined with a config object (or string configs) in SparkSession.builder.config
    • For main options (like --master, or --driver-mem) for the moment you can set them by writing to the PYSPARK_SUBMIT_ARGS environment variable. To make things cleaner and safer you can set it from within Python itself, and spark will read it when starting.
  3. Start the instance, which just requires you to call getOrCreate() from the builder object.

Your script can therefore have something like this:

import os

from pyspark.sql import SparkSession

# spark_main_opts is assumed to hold any main options (e.g. "--master local[4]"); it may be empty
if __name__ == "__main__":
    if spark_main_opts:
        # Set main options, e.g. "--master local[4]"
        os.environ['PYSPARK_SUBMIT_ARGS'] = spark_main_opts + " pyspark-shell"

    # Set spark config
    spark = (SparkSession.builder
             .config("spark.checkpoint.compress", True)
             .config("spark.jars.packages", "graphframes:graphframes:0.5.0-spark2.1-s_2.11")
             .getOrCreate())

Answer 7


To get rid of ImportError: No module named py4j.java_gateway, you need to add the following lines:

import os
import sys


os.environ['SPARK_HOME'] = "D:\python\spark-1.4.1-bin-hadoop2.4"


sys.path.append("D:\python\spark-1.4.1-bin-hadoop2.4\python")
sys.path.append("D:\python\spark-1.4.1-bin-hadoop2.4\python\lib\py4j-0.8.2.1-src.zip")

try:
    from pyspark import SparkContext
    from pyspark import SparkConf

    print ("success")

except ImportError as e:
    print ("error importing spark modules", e)
    sys.exit(1)

Answer 8


On Windows 10 the following worked for me. I added the following environment variables using Settings > Edit environment variables for your account:

SPARK_HOME=C:\Programming\spark-2.0.1-bin-hadoop2.7
PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%

(change “C:\Programming\…” to the folder in which you have installed spark)


Answer 9


For Linux users, the following is the correct (and non-hard-coded) way of including the pyspark library in PYTHONPATH. Both parts of the path are necessary:

  1. The path to the pyspark Python module itself, and
  2. The path to the zipped library that the pyspark module relies on when imported

Notice below that the zipped library version is dynamically determined, so we do not hard-code it.

export PYTHONPATH=${SPARK_HOME}/python/:$(echo ${SPARK_HOME}/python/lib/py4j-*-src.zip):${PYTHONPATH}

Answer 10


I am running a Spark cluster on a CentOS VM, installed from Cloudera yum packages.

I had to set the following variables to run pyspark.

export SPARK_HOME=/usr/lib/spark;
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.9-src.zip:$PYTHONPATH

Answer 11


export PYSPARK_PYTHON=/home/user/anaconda3/bin/python
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

This is what I did to use my Anaconda distribution with Spark. It is independent of the Spark version. You can change the first line to your user's Python bin. Also, as of Spark 2.2.0 PySpark is available as a stand-alone package on PyPI, but I have yet to test it out.


Answer 12


You can get the pyspark path in Python using pip (if you have installed pyspark using pip) as below:

pip show pyspark

Answer 13


I had the same problem.

Also make sure you are using the right Python version and installing with the matching pip version. In my case I had both Python 2.7 and 3.x, and I installed pyspark with

pip2.7 install pyspark

and it worked.


Answer 14


I got this error because the python script I was trying to submit was called pyspark.py (facepalm). The fix was to set my PYTHONPATH as recommended above, rename the script to pyspark_test.py, and clean up the pyspark.pyc that was created based on my script's original name; that cleared this error up.


Answer 15


In the case of DSE (DataStax Cassandra & Spark), the following location needs to be added to PYTHONPATH:

export PYTHONPATH=/usr/share/dse/resources/spark/python:$PYTHONPATH

Then use dse pyspark to get the modules on the path.

dse pyspark

Answer 16


I had this same problem and would add one thing to the proposed solutions above. When using Homebrew on Mac OS X to install Spark, you will need to correct the py4j path address to include libexec in the path (remembering to change the py4j version to the one you have):

PYTHONPATH=$SPARK_HOME/libexec/python/lib/py4j-0.9-src.zip:$PYTHONPATH

Answer 17


In my case it was getting installed into a different Python dist-packages directory (Python 3.5) whereas I was using Python 3.6, so the following helped:

python -m pip install pyspark

Answer 18


You can also create a Docker container with Alpine as the OS and install Python and PySpark as packages. That will have it all containerised.


Filter Pyspark dataframe column with None value

Question: Filter Pyspark dataframe column with None value


I’m trying to filter a PySpark dataframe that has None as a row value:

df.select('dt_mvmt').distinct().collect()

[Row(dt_mvmt=u'2016-03-27'),
 Row(dt_mvmt=u'2016-03-28'),
 Row(dt_mvmt=u'2016-03-29'),
 Row(dt_mvmt=None),
 Row(dt_mvmt=u'2016-03-30'),
 Row(dt_mvmt=u'2016-03-31')]

and I can filter correctly with a string value:

df[df.dt_mvmt == '2016-03-31']
# some results here

but this fails:

df[df.dt_mvmt == None].count()
0
df[df.dt_mvmt != None].count()
0

But there are definitely values on each category. What’s going on?


Answer 0


You can use Column.isNull / Column.isNotNull:

df.where(col("dt_mvmt").isNull())

df.where(col("dt_mvmt").isNotNull())

If you want to simply drop NULL values you can use na.drop with subset argument:

df.na.drop(subset=["dt_mvmt"])

Equality-based comparisons with NULL won't work because in SQL NULL is undefined, so any attempt to compare it with another value returns NULL:

sqlContext.sql("SELECT NULL = NULL").show()
## +-------------+
## |(NULL = NULL)|
## +-------------+
## |         null|
## +-------------+


sqlContext.sql("SELECT NULL != NULL").show()
## +-------------------+
## |(NOT (NULL = NULL))|
## +-------------------+
## |               null|
## +-------------------+

The only valid way to compare a value with NULL is IS / IS NOT, which are equivalent to the isNull / isNotNull method calls.
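
For example, a minimal sketch of the SQL form next to the column-method form (the temporary table name events is hypothetical; registerTempTable matches the sqlContext-era API used above):

df.registerTempTable("events")
sqlContext.sql("SELECT * FROM events WHERE dt_mvmt IS NOT NULL").count()

# equivalent to
df.where(df["dt_mvmt"].isNotNull()).count()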


Answer 1


Try just using the isNotNull function.

df.filter(df.dt_mvmt.isNotNull()).count()

Answer 2


To obtain entries whose values in the dt_mvmt column are not null we have

df.filter("dt_mvmt is not NULL")

and for entries which are null we have

df.filter("dt_mvmt is NULL")

Answer 3


If you want to keep to the Pandas syntax, this worked for me.

df = df[df.dt_mvmt.isNotNull()]

Answer 4


if column = None

COLUMN_OLD_VALUE
----------------
None
1
None
100
20
------------------

Create a temp table on the data frame and use:

sqlContext.sql("select * from tempTable where column_old_value='None' ").show()

So use: column_old_value='None'


Answer 5


There are multiple ways you can remove/filter the null values from a column in DataFrame.

Let's create a simple DataFrame with the code below:

from pyspark.sql.functions import col
from pyspark.sql.types import StringType
date = ['2016-03-27','2016-03-28','2016-03-29', None, '2016-03-30','2016-03-31']
df = spark.createDataFrame(date, StringType())

Now you can try one of the approaches below to filter out the null values.

# Approach - 1
df.filter("value is not null").show()

# Approach - 2
df.filter(col("value").isNotNull()).show()

# Approach - 3
df.filter(df["value"].isNotNull()).show()

# Approach - 4
df.filter(df.value.isNotNull()).show()

# Approach - 5
df.na.drop(subset=["value"]).show()

# Approach - 6
df.dropna(subset=["value"]).show()

# Note: You can also use where function instead of a filter.

You can also check the section “Working with NULL Values” on my blog for more information.

I hope it helps.


Answer 6


PySpark provides various filtering options based on arithmetic, logical and other conditions. The presence of NULL values can hamper further processing. Removing them or statistically imputing them could be a choice.

The set of code below can be considered:

# Dataset is df
# Column name is dt_mvmt
# Before filtering make sure you have the right count of the dataset
df.count() # Some number

# Filter here
df = df.filter(df.dt_mvmt.isNotNull())

# Check the count to ensure there are NULL values present (This is important when dealing with large dataset)
df.count() # Count should be reduced if NULL values are present
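
If you would rather impute than drop, a minimal sketch using fillna (the placeholder date string is purely illustrative):

# Impute instead of dropping: replace NULLs in dt_mvmt with a placeholder value
df = df.fillna({"dt_mvmt": "1900-01-01"})
df.count()  # the count stays the same; the NULLs are now placeholder values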

Answer 7


I would also try:

df = df.dropna(subset=["dt_mvmt"])


Answer 8


If you want to filter out records having a None value in a column, see the example below:

df=spark.createDataFrame([[123,"abc"],[234,"fre"],[345,None]],["a","b"])

Now filter out null value records:

df=df.filter(df.b.isNotNull())

df.show()

If you want to remove those records from the DF, see below:

df1=df.na.drop(subset=['b'])

df1.show()

Answer 9


None/Null is a data type of the class NoneType in PySpark/Python, so the following will not work, as you are trying to compare a NoneType object with a string object.

Wrong way of filtering:

df[df.dt_mvmt == None].count()  # 0
df[df.dt_mvmt != None].count()  # 0

Correct:

df = df.where(col("dt_mvmt").isNotNull())  # returns all records whose dt_mvmt is not None/Null


Load CSV file with Spark

Question: Load CSV file with Spark


I’m new to Spark and I’m trying to read CSV data from a file with Spark. Here’s what I am doing :

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

I would expect this call to give me a list of the two first columns of my file but I’m getting this error :

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

although my CSV file has more than one column.


Answer 0


Are you sure that all the lines have at least 2 columns? Can you try something like the following, just to check?

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Alternatively, you could print the culprit (if any):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()

Answer 1


Spark 2.0.0+

You can use built-in csv data source directly:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

or

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

without including any external dependencies.

Spark < 2.0.0:

Instead of manual parsing, which is far from trivial in a general case, I would recommend spark-csv:

Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path)

And load your data as follows:

df = (sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

It can handle loading, schema inference, dropping malformed lines and doesn’t require passing data from Python to the JVM.

Note:

If you know the schema, it is better to avoid schema inference and pass it to DataFrameReader. Assuming you have three columns – integer, double and string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Answer 2

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")

print(df.collect())

Answer 3


And yet another option consists of reading the CSV file using Pandas and then importing the Pandas DataFrame into Spark.

For example:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)

Answer 4


Simply splitting by comma will also split commas that are within fields (e.g. a,b,"1,2,3",c), so it’s not recommended. zero323’s answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse csvs in base Python with the csv module:

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: As @muon mentioned in the comments, this will treat the header like any other row so you’ll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (make sure not to modify header before the filter evaluates). But at this point, you’re probably better off using a built-in csv parser.
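
Putting the edit together with the snippet above, a sketch of the manual header handling (same rdd as above):

import csv

rdd = sc.textFile("file.csv").mapPartitions(lambda x: csv.reader(x))
header = rdd.first()                          # the header row, parsed like any other line
rdd = rdd.filter(lambda row: row != header)   # drop it from the data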


Answer 5


This is in PYSPARK

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

Then you can check

df.show(5)
df.count()

Answer 6


If you want to load csv as a dataframe then you can do the following:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

It worked fine for me.


Answer 7


This is in-line with what JP Mercier initially suggested about using Pandas, but with a major modification: If you read data into Pandas in chunks, it should be more malleable. Meaning, that you can parse a much larger file than Pandas can actually handle as a single piece and pass it to Spark in smaller sizes. (This also answers the comment about why one would want to use Spark if they can load everything into Pandas anyways.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()

Answer 8


Now, there’s also another option for any general csv file: https://github.com/seahboonsiew/pyspark-csv as follows:

Assume we have the following context

sc = SparkContext
sqlCtx = SQLContext or HiveContext

First, distribute pyspark-csv.py to executors using SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Read csv data via SparkContext and convert it to DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

Answer 9


If your csv data happens to not contain newlines in any of the fields, you can load your data with textFile() and parse it:

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
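
The snippet above uses Python 2 APIs (the StringIO module and reader.next()). A sketch of the same idea on Python 3, assuming that is what you are running (io.StringIO and next() are the standard replacements):

import csv
import io

def loadRecord(line):
    reader = csv.DictReader(io.StringIO(line), fieldnames=["name1", "name2"])
    return next(reader)

input = sc.textFile(inputFile).map(loadRecord)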

Answer 10


If one or more rows in the dataset have fewer or more than 2 columns, this error may arise.
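
If the error comes from reading a ragged CSV, the reader's mode option is one way to cope with such rows. A minimal sketch under that assumption, with a SparkSession named spark and a file data.csv as placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("csv-malformed-rows").getOrCreate()

# DROPMALFORMED discards rows whose column count does not match the expected schema;
# PERMISSIVE (the default) keeps them and fills missing fields with nulls;
# FAILFAST aborts on the first bad row.
df = spark.read.csv("data.csv", header=True, inferSchema=True, mode="DROPMALFORMED")
df.show()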

I am also new to Pyspark and trying to read a CSV file. The following code worked for me:

In this code I am using dataset from kaggle the link is: https://www.kaggle.com/carrie1/ecommerce-data

1. Without mentioning the schema:

from pyspark.sql import SparkSession  
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

Now check the columns: sdfData.columns

Output will be:

['InvoiceNo', 'StockCode', 'Description', 'Quantity', 'InvoiceDate', 'UnitPrice', 'CustomerID', 'Country']

Check the datatype for each column:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

This will give a data frame in which every column has the datatype StringType.

2. With schema: If you know the schema or want to change the datatype of any column in the above table, then use this approach (let's say I have the following columns and want each of them in a particular data type)

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", StringType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", StringType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])

scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

Now check the schema for datatype of each column:

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

Edited: We can use the following line of code as well without mentioning schema explicitly:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

The output is:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

The output will look like this:

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows

回答 11

使用 spark.read.csv 时,我发现 escape='"' 和 multiLine=True 这两个选项能提供最符合 CSV 标准、最一致的解决方案;以我的经验,对从 Google 表格导出的 CSV 文件效果最好。

也就是说:

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)

When using spark.read.csv, I find that the options escape='"' and multiLine=True provide the most consistent handling of the CSV standard, and in my experience they work best with CSV files exported from Google Sheets.

That is,

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)

将Spark DataFrame列转换为python列表

问题:将Spark DataFrame列转换为python列表

我在具有两列mvv和count的数据帧上工作。

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |

我想获得两个包含mvv值和计数值的列表。就像是

mvv = [1,2,3,4]
count = [5,9,3,1]

因此,我尝试了以下代码:第一行应返回python行列表。我想看第一个值:

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)

但是我在第二行收到一条错误消息:

AttributeError:getInt

I work on a dataframe with two column, mvv and count.

+---+-----+
|mvv|count|
+---+-----+
| 1 |  5  |
| 2 |  9  |
| 3 |  3  |
| 4 |  1  |

i would like to obtain two list containing mvv values and count value. Something like

mvv = [1,2,3,4]
count = [5,9,3,1]

So, I tried the following code: The first line should return a python list of row. I wanted to see the first value:

mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)

But I get an error message with the second line:

AttributeError: getInt


回答 0

来看看为什么您这种做法行不通。首先,您试图从 Row 类型中获取整数,您 collect 的输出是这样的:

>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)

如果您采取这样的做法:

>>> firstvalue = mvv_list[0].mvv
Out: 1

您就会得到 mvv 的值。如果您需要整个数组的全部数据,可以这样做:

>>> mvv_array = [int(row.mvv) for row in mvv_list]
>>> mvv_array
Out: [1,2,3,4]

但是,如果对另一列尝试相同的操作,则会得到:

>>> mvv_count = [int(row.count) for row in mvv_count_df.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'

发生这种情况是因为 count 是 Row 的内置方法,而这一列恰好也叫 count。一种解决方法是把 count 这一列重命名为 _count:

>>> mvv_list = mvv_count_df.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]

但是不需要此解决方法,因为您可以使用字典语法访问列:

>>> mvv_array = [int(row['mvv']) for row in mvv_count_df.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_count_df.collect()]

它将最终成功!

See, why this way that you are doing is not working. First, you are trying to get integer from a Row Type, the output of your collect is like this:

>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)

If you take something like this:

>>> firstvalue = mvv_list[0].mvv
Out: 1

You will get the mvv value. If you want all the information of the array you can take something like this:

>>> mvv_array = [int(row.mvv) for row in mvv_list]
>>> mvv_array
Out: [1,2,3,4]

But if you try the same for the other column, you get:

>>> mvv_count = [int(row.count) for row in mvv_count_df.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'

This happens because count is a built-in method, and the column has the same name as count. A workaround is to change the column name of count to _count:

>>> mvv_list = mvv_count_df.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]

But this workaround is not needed, as you can access the column using the dictionary syntax:

>>> mvv_array = [int(row['mvv']) for row in mvv_count_df.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_count_df.collect()]

And it will finally work!


回答 1

下面这一行单行代码就能得到您想要的列表。

mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()

Following one liner gives the list you want.

mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()

回答 2

这将为您提供所有元素作为列表。

mvv_list = list(
    mvv_count_df.select('mvv').toPandas()['mvv']
)

This will give you all the elements as a list.

mvv_list = list(
    mvv_count_df.select('mvv').toPandas()['mvv']
)

回答 3

以下代码将为您提供帮助

mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()

The following code will help you

mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()

回答 4

根据我的数据,我得到了这些基准:

>>> data.select(col).rdd.flatMap(lambda x: x).collect()

0.52秒

>>> [row[col] for row in data.collect()]

0.271秒

>>> list(data.select(col).toPandas()[col])

0.427秒

结果是一样的

On my data I got these benchmarks:

>>> data.select(col).rdd.flatMap(lambda x: x).collect()

0.52 sec

>>> [row[col] for row in data.collect()]

0.271 sec

>>> list(data.select(col).toPandas()[col])

0.427 sec

The result is the same


回答 5

如果出现以下错误:

AttributeError: 'list' object has no attribute 'collect'

此代码将解决您的问题:

mvv_list = mvv_count_df.select('mvv').collect()

mvv_array = [int(i.mvv) for i in mvv_list]

If you get the error below :

AttributeError: ‘list’ object has no attribute ‘collect’

This code will solve your issues :

mvv_list = mvv_count_df.select('mvv').collect()

mvv_array = [int(i.mvv) for i in mvv_list]

回答 6

我做了一次基准分析,发现 list(mvv_count_df.select('mvv').toPandas()['mvv']) 是最快的方法,这让我非常惊讶。

我在一个 5 节点的 i3.xlarge 集群上(每个节点有 30.5 GB 内存和 4 个内核),使用 Spark 2.4.5,对 10 万行和 1 亿行两个数据集分别运行了这些不同的方法。数据只有一列,均匀分布在 20 个 Snappy 压缩的 Parquet 文件中。

这是基准测试结果(运行时间以秒为单位):

+-------------------------------------------------------------+---------+-------------+
|                          Code                               | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect()    |     0.4 | 55.3        |
| list(df.select('col_name').toPandas()['col_name'])          |     0.4 | 17.5        |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()|     0.9 | 69          |
| [row[0] for row in df.select('col_name').collect()]         |     1.0 | OOM         |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] |     1.2 | *           |
+-------------------------------------------------------------+---------+-------------+

* cancelled after 800 seconds

在驱动程序节点上收集数据时要遵循的黄金法则:

  • 尝试用其他方法解决问题。将数据收集到驱动程序节点非常昂贵,无法利用Spark集群的功能,因此应尽可能避免。
  • 收集尽可能少的行。在收集数据之前,对列进行聚合,重复数据删除,过滤和修剪。尽可能少地将数据发送到驱动程序节点。

toPandas 在Spark 2.3中得到了显着改进。如果您使用的Spark版本早于2.3,则可能不是最佳方法。

有关更多详细信息/基准测试结果,请参见此处

I ran a benchmarking analysis and list(mvv_count_df.select('mvv').toPandas()['mvv']) is the fastest method. I’m very surprised.

I ran the different approaches on 100 thousand / 100 million row datasets using a 5 node i3.xlarge cluster (each node has 30.5 GBs of RAM and 4 cores) with Spark 2.4.5. Data was evenly distributed on 20 snappy compressed Parquet files with a single column.

Here’s the benchmarking results (runtimes in seconds):

+-------------------------------------------------------------+---------+-------------+
|                          Code                               | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect()    |     0.4 | 55.3        |
| list(df.select('col_name').toPandas()['col_name'])          |     0.4 | 17.5        |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()|     0.9 | 69          |
| [row[0] for row in df.select('col_name').collect()]         |     1.0 | OOM         |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] |     1.2 | *           |
+-------------------------------------------------------------+---------+-------------+

* cancelled after 800 seconds

Golden rules to follow when collecting data on the driver node:

  • Try to solve the problem with other approaches. Collecting data to the driver node is expensive, doesn’t harness the power of the Spark cluster, and should be avoided whenever possible.
  • Collect as few rows as possible. Aggregate, deduplicate, filter, and prune columns before collecting the data. Send as little data to the driver node as you can.

toPandas was significantly improved in Spark 2.3. It’s probably not the best approach if you’re using a Spark version earlier than 2.3.

See here for more details / benchmarking results.
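
As a side note, the Spark 2.3 speed-up for toPandas comes from Arrow-based conversion, which has to be switched on explicitly. A minimal sketch, assuming pyarrow is installed (the config key shown is the Spark 2.3/2.4 name; in Spark 3.x it is spark.sql.execution.arrow.pyspark.enabled):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-topandas").getOrCreate()

# Enable Arrow-based columnar transfer for toPandas (Spark 2.3+)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Keep the golden rules in mind: select only the column you need before collecting
df = spark.range(0, 100000).withColumnRenamed("id", "mvv")
mvv_list = list(df.select("mvv").toPandas()["mvv"])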


回答 7

一种可能的解决方案是使用 pyspark.sql.functions 中的 collect_list() 函数。它会把该列的所有值聚合成一个 pyspark 数组,collect 时该数组会被转换成 python 列表:

mvv_list   = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0] 

A possible solution is using the collect_list() function from pyspark.sql.functions. This will aggregate all column values into a pyspark array that is converted into a python list when collected:

mvv_list   = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0] 

如何在pyspark中将Dataframe列从String类型更改为Double类型

问题:如何在pyspark中将Dataframe列从String类型更改为Double类型

我有一个列为String的数据框。我想在PySpark中将列类型更改为Double type。

以下是我的方法:

toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))

只是想知道,这是正确的方法,就像通过Logistic回归运行时一样,我遇到了一些错误,所以我想知道,这是麻烦的原因。

I have a dataframe with column as String. I wanted to change the column type to Double type in PySpark.

Following is the way, I did:

toDoublefunc = UserDefinedFunction(lambda x: x,DoubleType())
changedTypedf = joindf.withColumn("label",toDoublefunc(joindf['show']))

Just wanted to know, is this the right way to do it as while running through Logistic Regression, I am getting some error, so I wonder, is this the reason for the trouble.


回答 0

这里不需要 UDF。Column 已经提供了接受 DataType 实例的 cast 方法:

from pyspark.sql.types import DoubleType

changedTypedf = joindf.withColumn("label", joindf["show"].cast(DoubleType()))

或短字符串:

changedTypedf = joindf.withColumn("label", joindf["show"].cast("double"))

规范的字符串名称(也可以支持其他变体)对应于simpleString值。因此对于原子类型:

from pyspark.sql import types 

for t in ['BinaryType', 'BooleanType', 'ByteType', 'DateType', 
          'DecimalType', 'DoubleType', 'FloatType', 'IntegerType', 
           'LongType', 'ShortType', 'StringType', 'TimestampType']:
    print(f"{t}: {getattr(types, t)().simpleString()}")
BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp

例如复杂类型

types.ArrayType(types.IntegerType()).simpleString()   
'array<int>'
types.MapType(types.StringType(), types.IntegerType()).simpleString()
'map<string,int>'

There is no need for a UDF here. Column already provides the cast method with a DataType instance:

from pyspark.sql.types import DoubleType

changedTypedf = joindf.withColumn("label", joindf["show"].cast(DoubleType()))

or short string:

changedTypedf = joindf.withColumn("label", joindf["show"].cast("double"))

where canonical string names (other variations can be supported as well) correspond to simpleString value. So for atomic types:

from pyspark.sql import types 

for t in ['BinaryType', 'BooleanType', 'ByteType', 'DateType', 
          'DecimalType', 'DoubleType', 'FloatType', 'IntegerType', 
           'LongType', 'ShortType', 'StringType', 'TimestampType']:
    print(f"{t}: {getattr(types, t)().simpleString()}")
BinaryType: binary
BooleanType: boolean
ByteType: tinyint
DateType: date
DecimalType: decimal(10,0)
DoubleType: double
FloatType: float
IntegerType: int
LongType: bigint
ShortType: smallint
StringType: string
TimestampType: timestamp

and for example complex types

types.ArrayType(types.IntegerType()).simpleString()   
'array<int>'
types.MapType(types.StringType(), types.IntegerType()).simpleString()
'map<string,int>'

回答 1

保留列名,并通过使用与输入列相同的名称来避免添加额外的列:

changedTypedf = joindf.withColumn("show", joindf["show"].cast(DoubleType()))

Preserve the name of the column and avoid extra column addition by using the same name as input column:

changedTypedf = joindf.withColumn("show", joindf["show"].cast(DoubleType()))

回答 2

已有的答案足以解决问题,但我想分享另一种方法,它可能是在较新版本的 Spark 中才引入的(我不太确定),所以已有的答案没有提到它。

我们可以在 spark 语句中用 col("column_name") 来引用列:

from pyspark.sql.functions import col , column
changedTypedf = joindf.withColumn("show", col("show").cast("double"))

The given answers are enough to deal with the problem, but I want to share another way, which may have been introduced in a newer version of Spark (I am not sure about it), so the given answers didn't cover it.

We can refer to a column in a spark statement with col("column_name"):

from pyspark.sql.functions import col , column
changedTypedf = joindf.withColumn("show", col("show").cast("double"))

回答 3

pyspark版本:

  df = <source data>
  df.printSchema()

  from pyspark.sql.types import *

  # Change column type
  df_new = df.withColumn("myColumn", df["myColumn"].cast(IntegerType()))
  df_new.printSchema()
  df_new.select("myColumn").show()

pyspark version:

  df = <source data>
  df.printSchema()

  from pyspark.sql.types import *

  # Change column type
  df_new = df.withColumn("myColumn", df["myColumn"].cast(IntegerType()))
  df_new.printSchema()
  df_new.select("myColumn").show()

回答 4

解决方案很简单-

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
toDoublefunc = UserDefinedFunction(lambda x: float(x), DoubleType())
changedTypedf = joindf.withColumn("label", toDoublefunc(joindf['show']))

the solution was simple –

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
toDoublefunc = UserDefinedFunction(lambda x: float(x), DoubleType())
changedTypedf = joindf.withColumn("label", toDoublefunc(joindf['show']))

如何在Spark中关闭INFO日志记录?

问题:如何在Spark中关闭INFO日志记录?

我按照 AWS EC2 指南安装了 Spark,可以正常使用 bin/pyspark 脚本启动程序并进入 spark 提示符,也能顺利完成快速入门指南。

但是,我怎么也想不明白,如何在每条命令之后停止所有冗长的 INFO 日志输出。

我在 conf 文件夹下的 log4j.properties 文件里(既包括我启动应用程序所在目录下的,也包括每个节点上的),对下面的代码尝试了几乎所有可能的改法(注释掉、设置为 OFF),都没有任何效果。每条语句执行后,我仍然会看到 INFO 日志被打印出来。

我对应该如何工作感到非常困惑。

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender 
log4j.appender.console.target=System.err     
log4j.appender.console.layout=org.apache.log4j.PatternLayout 
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

这是我使用 SPARK_PRINT_LAUNCH_COMMAND 时得到的完整类路径:

Spark命令:/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/bin/java -cp :/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

spark-env.sh 的内容:

#!/usr/bin/env bash

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

# Options read when launching programs locally with 
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# - SPARK_CLASSPATH=/root/spark-1.0.1-bin-hadoop2/conf/

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
# - SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
# - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: ‘default’)
# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.

# Options for the daemons used in the standalone deploy mode:
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

export SPARK_SUBMIT_CLASSPATH="$FWDIR/conf"

I installed Spark using the AWS EC2 guide and I can launch the program fine using the bin/pyspark script to get to the spark prompt and can also do the Quick Start guide successfully.

However, I cannot for the life of me figure out how to stop all of the verbose INFO logging after each command.

I have tried nearly every possible scenario in the below code (commenting out, setting to OFF) within my log4j.properties file in the conf folder where I launch the application from, as well as on each node, and nothing is doing anything. I still get the logging INFO statements printing after executing each statement.

I am very confused with how this is supposed to work.

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender 
log4j.appender.console.target=System.err     
log4j.appender.console.layout=org.apache.log4j.PatternLayout 
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

Here is my full classpath when I use SPARK_PRINT_LAUNCH_COMMAND:

Spark Command: /Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/bin/java -cp :/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

contents of spark-env.sh:

#!/usr/bin/env bash

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

# Options read when launching programs locally with 
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
# - SPARK_CLASSPATH=/root/spark-1.0.1-bin-hadoop2/conf/

# Options read by executors and drivers running inside the cluster
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
# - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
# - SPARK_CLASSPATH, default classpath entries to append
# - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos

# Options read in YARN client mode
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_EXECUTOR_INSTANCES, Number of workers to start (Default: 2)
# - SPARK_EXECUTOR_CORES, Number of cores for the workers (Default: 1).
# - SPARK_EXECUTOR_MEMORY, Memory per Worker (e.g. 1000M, 2G) (Default: 1G)
# - SPARK_DRIVER_MEMORY, Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)
# - SPARK_YARN_APP_NAME, The name of your application (Default: Spark)
# - SPARK_YARN_QUEUE, The hadoop queue to use for allocation requests (Default: ‘default’)
# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.

# Options for the daemons used in the standalone deploy mode:
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
# - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

export SPARK_SUBMIT_CLASSPATH="$FWDIR/conf"

回答 0

只需在spark目录中执行以下命令:

cp conf/log4j.properties.template conf/log4j.properties

编辑log4j.properties:

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

将第一行的:

log4j.rootCategory=INFO, console

替换为:

log4j.rootCategory=WARN, console

保存并重新启动您的Shell。它适用于OS X上的Spark 1.1.0和Spark 1.5.1。

Just execute this command in the spark directory:

cp conf/log4j.properties.template conf/log4j.properties

Edit log4j.properties:

# Set everything to be logged to the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

Replace at the first line:

log4j.rootCategory=INFO, console

by:

log4j.rootCategory=WARN, console

Save and restart your shell. It works for me for Spark 1.1.0 and Spark 1.5.1 on OS X.


回答 1

受 pyspark/tests.py 的启发,我这样做了:

def quiet_logs(sc):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
    logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

在创建 SparkContext 之后立即调用这个函数,把我测试时记录到 stderr 的行数从 2647 减少到了 163。但创建 SparkContext 本身就会记录 163 行,直到

15/08/25 10:14:16 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0

而且我还不清楚如何以编程方式进行调整。

Inspired by the pyspark/tests.py I did

def quiet_logs(sc):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
    logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

Calling this just after creating SparkContext reduced stderr lines logged for my test from 2647 to 163. However creating the SparkContext itself logs 163, up to

15/08/25 10:14:16 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0

and it’s not clear to me how to adjust those programmatically.


回答 2

在Spark 2.0中,您还可以使用setLogLevel为应用程序动态配置它:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.\
        master('local').\
        appName('foo').\
        getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

pyspark控制台中,默认spark会话将已经可用。

In Spark 2.0 you can also configure it dynamically for your application using setLogLevel:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.\
        master('local').\
        appName('foo').\
        getOrCreate()
    spark.sparkContext.setLogLevel('WARN')

In the pyspark console, a default spark session will already be available.


回答 3

编辑您的 conf/log4j.properties 文件,将以下行:

   log4j.rootCategory=INFO, console

更改为:

    log4j.rootCategory=ERROR, console

另一种方法是:

启动spark-shell并输入以下内容:

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

之后,您将看不到任何日志。

Edit your conf/log4j.properties file and change the following line:

   log4j.rootCategory=INFO, console

to

    log4j.rootCategory=ERROR, console

Another approach would be to :

Fireup spark-shell and type in the following:

import org.apache.log4j.Logger
import org.apache.log4j.Level

Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

You won’t see any logs after that.


回答 4

>>> log4j = sc._jvm.org.apache.log4j
>>> log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)
>>> log4j = sc._jvm.org.apache.log4j
>>> log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

回答 5

对于 PySpark,您还可以在脚本中用 sc.setLogLevel("FATAL") 来设置日志级别。摘自文档:

控制我们的logLevel。这将覆盖所有用户定义的日志设置。有效的日志级别包括:ALL,DEBUG,ERROR,FATAL,INFO,OFF,TRACE,WARN

For PySpark, you can also set the log level in your scripts with sc.setLogLevel("FATAL"). From the docs:

Control our logLevel. This overrides any user-defined log settings. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
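
For context, a minimal sketch of how that call fits into a standalone script (the app name is a placeholder):

from pyspark import SparkContext

sc = SparkContext(appName="quiet-logs")
# Suppresses everything below FATAL for this application; overrides log4j.properties
sc.setLogLevel("FATAL")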


回答 6

您可以使用setLogLevel

val spark = SparkSession
      .builder()
      .config("spark.master", "local[1]")
      .appName("TestLog")
      .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

You can use setLogLevel

val spark = SparkSession
      .builder()
      .config("spark.master", "local[1]")
      .appName("TestLog")
      .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

回答 7

这可能是由于Spark如何计算其类路径。我的直觉是,Hadoop的log4j.properties文件在类路径中出现在Spark之前,从而阻止您的更改生效。

如果您运行

SPARK_PRINT_LAUNCH_COMMAND=1 bin/spark-shell

然后 Spark 就会打印用于启动 shell 的完整类路径;就我而言,输出是:

Spark Command: /usr/lib/jvm/java/bin/java -cp :::/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/root/spark/lib/datanucleus-api-jdo-3.2.1.jar:/root/spark/lib/datanucleus-core-3.2.2.jar:/root/spark/lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path=:/root/ephemeral-hdfs/lib/native/ -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

其中 /root/ephemeral-hdfs/conf 位于类路径的最前面。

我已经提交了一个 issue [SPARK-2913],以便在下一个版本中修复这个问题(我应该很快就会提交一个补丁)。

同时,有两种解决方法:

  • 添加export SPARK_SUBMIT_CLASSPATH="$FWDIR/conf"spark-env.sh
  • 删除(或重命名)/root/ephemeral-hdfs/conf/log4j.properties

This may be due to how Spark computes its classpath. My hunch is that Hadoop’s log4j.properties file is appearing ahead of Spark’s on the classpath, preventing your changes from taking effect.

If you run

SPARK_PRINT_LAUNCH_COMMAND=1 bin/spark-shell

then Spark will print the full classpath used to launch the shell; in my case, I see

Spark Command: /usr/lib/jvm/java/bin/java -cp :::/root/ephemeral-hdfs/conf:/root/spark/conf:/root/spark/lib/spark-assembly-1.0.0-hadoop1.0.4.jar:/root/spark/lib/datanucleus-api-jdo-3.2.1.jar:/root/spark/lib/datanucleus-core-3.2.2.jar:/root/spark/lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize=128m -Djava.library.path=:/root/ephemeral-hdfs/lib/native/ -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark.repl.Main

where /root/ephemeral-hdfs/conf is at the head of the classpath.

I’ve opened an issue [SPARK-2913] to fix this in the next release (I should have a patch out soon).

In the meantime, here’s a couple of workarounds:

  • Add export SPARK_SUBMIT_CLASSPATH="$FWDIR/conf" to spark-env.sh.
  • Delete (or rename) /root/ephemeral-hdfs/conf/log4j.properties.

回答 8

Spark 1.6.2:

log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

Spark 2.x:

spark.sparkContext.setLogLevel('WARN')

(这里的 spark 指的是 SparkSession)

或者是旧方法

在Spark Dir中重命名conf/log4j.properties.templateconf/log4j.properties

在 log4j.properties 中,将 log4j.rootCategory=INFO, console 更改为 log4j.rootCategory=WARN, console

可用的不同日志级别:

  • OFF(最具体,不记录任何日志)
  • FATAL(最具体,数据很少)
  • ERROR - 仅在出现错误时记录
  • WARN - 仅在出现警告或错误时记录
  • INFO(默认)
  • DEBUG - 记录详细步骤(以及上述所有日志)
  • TRACE(最不具体,数据很多)
  • ALL(最不具体,记录所有数据)

Spark 1.6.2:

log4j = sc._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

Spark 2.x:

spark.sparkContext.setLogLevel('WARN')

(spark being the SparkSession)

Alternatively the old methods,

Rename conf/log4j.properties.template to conf/log4j.properties in Spark Dir.

In the log4j.properties, change log4j.rootCategory=INFO, console to log4j.rootCategory=WARN, console

Different log levels available:

  • OFF (most specific, no logging)
  • FATAL (most specific, little data)
  • ERROR – Log only in case of Errors
  • WARN – Log only in case of Warnings or Errors
  • INFO (Default)
  • DEBUG – Log details steps (and all logs stated above)
  • TRACE (least specific, a lot of data)
  • ALL (least specific, all data)

回答 9

编程方式

spark.sparkContext.setLogLevel("WARN")

可用选项

ERROR
WARN 
INFO 

Programmatic way

spark.sparkContext.setLogLevel("WARN")

Available Options

ERROR
WARN 
INFO 

回答 10

我在 Amazon EC2 上使用这种方法,集群有 1 个主节点和 2 个从节点,Spark 版本为 1.2.1。

# Step 1. Change config file on the master node
nano /root/ephemeral-hdfs/conf/log4j.properties

# Before
hadoop.root.logger=INFO,console
# After
hadoop.root.logger=WARN,console

# Step 2. Replicate this change to slaves
~/spark-ec2/copy-dir /root/ephemeral-hdfs/conf/

I used this with Amazon EC2 with 1 master and 2 slaves and Spark 1.2.1.

# Step 1. Change config file on the master node
nano /root/ephemeral-hdfs/conf/log4j.properties

# Before
hadoop.root.logger=INFO,console
# After
hadoop.root.logger=WARN,console

# Step 2. Replicate this change to slaves
~/spark-ec2/copy-dir /root/ephemeral-hdfs/conf/

回答 11

只需将以下参数添加到您的spark-submit命令中

--conf "spark.driver.extraJavaOptions=-Dlog4jspark.root.logger=WARN,console"

这仅暂时覆盖该作业的系统值。从log4j.properties文件中检查确切的属性名称(此处为log4jspark.root.logger)。

希望这会有所帮助,加油!

Simply add below param to your spark-submit command

--conf "spark.driver.extraJavaOptions=-Dlog4jspark.root.logger=WARN,console"

This overrides system value temporarily only for that job. Check exact property name (log4jspark.root.logger here) from log4j.properties file.

Hope this helps, cheers!


回答 12

以下针对scala用户的代码段:

选项1 :

您可以在文件级别添加下面的代码片段:

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.WARN)

选项2:

注意:这将适用于所有正在使用spark会话的应用程序。

import org.apache.spark.sql.SparkSession

  private[this] implicit val spark = SparkSession.builder().master("local[*]").getOrCreate()

spark.sparkContext.setLogLevel("WARN")

选项3:

注意:此配置应添加到您的 log4j.properties 中(可能是 /etc/spark/conf/log4j.properties,即 spark 安装所在的位置,也可能是项目文件夹级别的 log4j.properties),因为您是在模块级别进行更改。这将适用于所有应用程序。

log4j.rootCategory=ERROR, console

恕我直言,选项1是明智的方法,因为可以在文件级别将其关闭。

This below code snippet for scala users :

Option 1 :

Below snippet you can add at the file level

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.WARN)

Option 2 :

Note : which will be applicable for all the application which is using spark session.

import org.apache.spark.sql.SparkSession

  private[this] implicit val spark = SparkSession.builder().master("local[*]").getOrCreate()

spark.sparkContext.setLogLevel("WARN")

Option 3 :

Note : This configuration should be added to your log4j.properties.. (could be like /etc/spark/conf/log4j.properties (where the spark installation is there) or your project folder level log4j.properties) since you are changing at module level. This will be applicable for all the application.

log4j.rootCategory=ERROR, console

IMHO, Option 1 is wise way since it can be switched off at file level.


回答 13

我这样做的方式是:

在我运行spark-submit脚本的位置

$ cp /etc/spark/conf/log4j.properties .
$ nano log4j.properties

更改INFO为所需的日志记录级别,然后运行spark-submit

The way I do it is:

in the location I run the spark-submit script do

$ cp /etc/spark/conf/log4j.properties .
$ nano log4j.properties

change INFO to what ever level of logging you want and then run your spark-submit


回答 14

如果您想继续使用 logging(Python 的日志记录工具),可以尝试把应用程序和 Spark 的日志配置分开:

LoggerManager()
logger = logging.getLogger(__name__)
loggerSpark = logging.getLogger('py4j')
loggerSpark.setLevel('WARNING')

If you want to keep using logging (the logging facility for Python), you can try splitting the configurations for your application and for Spark:

LoggerManager()
logger = logging.getLogger(__name__)
loggerSpark = logging.getLogger('py4j')
loggerSpark.setLevel('WARNING')
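
A self-contained variant that uses only the standard library (LoggerManager in the snippet above is presumably the author's own helper; this sketch does without it):

import logging

logging.basicConfig(level=logging.INFO)               # your application's own log level
logging.getLogger("py4j").setLevel(logging.WARNING)   # quiet the py4j gateway chatter

logger = logging.getLogger(__name__)
logger.info("application logging stays at INFO while py4j is limited to WARNING")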

如何在pyspark中更改数据框列名称?

问题:如何在pyspark中更改数据框列名称?

我有 pandas 背景,习惯把 CSV 文件中的数据读入数据帧,然后用一条简单的命令把列名改成有用的名称:

df.columns = new_column_name_list

但是,这在使用sqlContext创建的pyspark数据帧中无效。我能想到的唯一解决方案是:

df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt")
oldSchema = df.schema
for i,k in enumerate(oldSchema.fields):
  k.name = new_column_name_list[i]
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema)

这基本上是两次定义变量,然后首先推断模式,然后重命名列名,然后使用更新后的模式再次加载数据框。

有没有像我们在大熊猫中那样做的更好,更有效的方法?

我的Spark版本是1.5.0

I come from pandas background and am used to reading data from CSV files into a dataframe and then simply changing the column names to something useful using the simple command:

df.columns = new_column_name_list

However, the same doesn’t work in pyspark dataframes created using sqlContext. The only solution I could figure out to do this easily is the following:

df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt")
oldSchema = df.schema
for i,k in enumerate(oldSchema.fields):
  k.name = new_column_name_list[i]
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema)

This is basically defining the variable twice and inferring the schema first then renaming the column names and then loading the dataframe again with the updated schema.

Is there a better and more efficient way to do this like we do in pandas ?

My spark version is 1.5.0


回答 0

有很多方法可以做到这一点:

  • 选项1.使用selectExpr

    data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                      ["Name", "askdaosdka"])
    data.show()
    data.printSchema()
    
    # Output
    #+-------+----------+
    #|   Name|askdaosdka|
    #+-------+----------+
    #|Alberto|         2|
    #| Dakota|         2|
    #+-------+----------+
    
    #root
    # |-- Name: string (nullable = true)
    # |-- askdaosdka: long (nullable = true)
    
    df = data.selectExpr("Name as name", "askdaosdka as age")
    df.show()
    df.printSchema()
    
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+
    
    #root
    # |-- name: string (nullable = true)
    # |-- age: long (nullable = true)
  • 选项2。使用withColumnRenamed时,请注意,此方法允许您“覆盖”同一列。对于Python3,请替换xrangerange

    from functools import reduce
    
    oldColumns = data.schema.names
    newColumns = ["name", "age"]
    
    df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), data)
    df.printSchema()
    df.show()
  • 选项 3. 使用 alias;在 Scala 中,您还可以使用 as。

    from pyspark.sql.functions import col
    
    data = data.select(col("Name").alias("name"), col("askdaosdka").alias("age"))
    data.show()
    
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+
  • 选项 4. 使用 sqlContext.sql,它允许您对注册为表的 DataFrame 执行 SQL 查询。

    sqlContext.registerDataFrameAsTable(data, "myTable")
    df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")
    
    df2.show()
    
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+

There are many ways to do that:

  • Option 1. Using selectExpr.

    data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                      ["Name", "askdaosdka"])
    data.show()
    data.printSchema()
    
    # Output
    #+-------+----------+
    #|   Name|askdaosdka|
    #+-------+----------+
    #|Alberto|         2|
    #| Dakota|         2|
    #+-------+----------+
    
    #root
    # |-- Name: string (nullable = true)
    # |-- askdaosdka: long (nullable = true)
    
    df = data.selectExpr("Name as name", "askdaosdka as age")
    df.show()
    df.printSchema()
    
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+
    
    #root
    # |-- name: string (nullable = true)
    # |-- age: long (nullable = true)
    
  • Option 2. Using withColumnRenamed, notice that this method allows you to “overwrite” the same column. For Python3, replace xrange with range.

    from functools import reduce
    
    oldColumns = data.schema.names
    newColumns = ["name", "age"]
    
    df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), data)
    df.printSchema()
    df.show()
    
  • Option 3. using alias, in Scala you can also use as.

    from pyspark.sql.functions import col
    
    data = data.select(col("Name").alias("name"), col("askdaosdka").alias("age"))
    data.show()
    
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+
    
  • Option 4. Using sqlContext.sql, which lets you use SQL queries on DataFrames registered as tables.

    sqlContext.registerDataFrameAsTable(data, "myTable")
    df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")
    
    df2.show()
    
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+
    

回答 1

df = df.withColumnRenamed("colName", "newColName")\
       .withColumnRenamed("colName2", "newColName2")

使用这种方式的优势:当列很多而您只想更改其中几个列名时,这会非常方便;在连接含有重复列名的表时也非常有用。

df = df.withColumnRenamed("colName", "newColName")\
       .withColumnRenamed("colName2", "newColName2")

Advantage of this approach: with a long list of columns you may want to change only a few column names, and this can be very convenient in those scenarios. It is also very useful when joining tables with duplicate column names, as the sketch below illustrates.
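
As an illustration of the duplicate-column-name point, a hedged sketch (the orders and customers DataFrames are made up for the example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rename-before-join").getOrCreate()

orders = spark.createDataFrame([(1, 100), (2, 200)], ["id", "amount"])
customers = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])

# Rename the key on one side so the joined result has no ambiguous 'id' column
customers = customers.withColumnRenamed("id", "customer_id")

joined = orders.join(customers, orders["id"] == customers["customer_id"], "inner")
joined.show()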


回答 2

如果要更改所有列名称,请尝试 df.toDF(*cols)

If you want to change all column names, try df.toDF(*cols).
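
For example, a minimal sketch (the DataFrame contents and the new names are made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("toDF-rename").getOrCreate()

df = spark.createDataFrame([("Alberto", 2), ("Dakota", 2)], ["Name", "askdaosdka"])

cols = ["name", "age"]   # one new name per existing column, in order
df = df.toDF(*cols)
df.printSchema()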


回答 3

如果您想对所有列名应用简单的转换,则此代码可以解决问题:(我用下划线替换所有空格)

new_column_name_list= list(map(lambda x: x.replace(" ", "_"), df.columns))

df = df.toDF(*new_column_name_list)

感谢 @user8117731 提供的 toDf 技巧。

In case you would like to apply a simple transformation on all column names, this code does the trick: (I am replacing all spaces with underscore)

new_column_name_list= list(map(lambda x: x.replace(" ", "_"), df.columns))

df = df.toDF(*new_column_name_list)

Thanks to @user8117731 for toDf trick.


回答 4

如果要重命名单个列,并保留其余的原样:

from pyspark.sql.functions import col
new_df = old_df.select(*[col(s).alias(new_name) if s == column_to_change else s for s in old_df.columns])

If you want to rename a single column and keep the rest as it is:

from pyspark.sql.functions import col
new_df = old_df.select(*[col(s).alias(new_name) if s == column_to_change else s for s in old_df.columns])

回答 5

df.withColumnRenamed('age', 'age2')

df.withColumnRenamed('age', 'age2')


回答 6

这是我使用的方法:

创建pyspark会话:

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('changeColNames').getOrCreate()

创建数据框:

df = spark.createDataFrame(data = [('Bob', 5.62,'juice'),  ('Sue',0.85,'milk')], schema = ["Name", "Amount","Item"])

查看具有列名称的df:

df.show()
+----+------+-----+
|Name|Amount| Item|
+----+------+-----+
| Bob|  5.62|juice|
| Sue|  0.85| milk|
+----+------+-----+

用新的列名创建一个列表:

newcolnames = ['NameNew','AmountNew','ItemNew']

更改df的列名:

for c,n in zip(df.columns,newcolnames):
    df=df.withColumnRenamed(c,n)

使用新的列名查看df:

df.show()
+-------+---------+-------+
|NameNew|AmountNew|ItemNew|
+-------+---------+-------+
|    Bob|     5.62|  juice|
|    Sue|     0.85|   milk|
+-------+---------+-------+

this is the approach that I used:

create pyspark session:

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('changeColNames').getOrCreate()

create dataframe:

df = spark.createDataFrame(data = [('Bob', 5.62,'juice'),  ('Sue',0.85,'milk')], schema = ["Name", "Amount","Item"])

view df with column names:

df.show()
+----+------+-----+
|Name|Amount| Item|
+----+------+-----+
| Bob|  5.62|juice|
| Sue|  0.85| milk|
+----+------+-----+

create a list with new column names:

newcolnames = ['NameNew','AmountNew','ItemNew']

change the column names of the df:

for c,n in zip(df.columns,newcolnames):
    df=df.withColumnRenamed(c,n)

view df with new column names:

df.show()
+-------+---------+-------+
|NameNew|AmountNew|ItemNew|
+-------+---------+-------+
|    Bob|     5.62|  juice|
|    Sue|     0.85|   milk|
+-------+---------+-------+

回答 7

我提供了一个易于使用的函数来为pyspark数据帧重命名多个列,以防有人使用它:

def renameCols(df, old_columns, new_columns):
    for old_col,new_col in zip(old_columns,new_columns):
        df = df.withColumnRenamed(old_col,new_col)
    return df

old_columns = ['old_name1','old_name2']
new_columns = ['new_name1', 'new_name2']
df_renamed = renameCols(df, old_columns, new_columns)

请注意,两个列表的长度必须相同。

I made an easy to use function to rename multiple columns for a pyspark dataframe, in case anyone wants to use it:

def renameCols(df, old_columns, new_columns):
    for old_col,new_col in zip(old_columns,new_columns):
        df = df.withColumnRenamed(old_col,new_col)
    return df

old_columns = ['old_name1','old_name2']
new_columns = ['new_name1', 'new_name2']
df_renamed = renameCols(df, old_columns, new_columns)

Be careful, both lists must be the same length.


回答 8

重命名一个列的另一种方法(使用import pyspark.sql.functions as F):

df = df.select( '*', F.col('count').alias('new_count') ).drop('count')

Another way to rename just one column (using import pyspark.sql.functions as F):

df = df.select( '*', F.col('count').alias('new_count') ).drop('count')

回答 9

我用这个:

from pyspark.sql.functions import col
df.select(['vin',col('timeStamp').alias('Date')]).show()

I use this one:

from pyspark.sql.functions import col
df.select(['vin',col('timeStamp').alias('Date')]).show()

回答 10

您可以使用以下函数来重命名数据框的所有列。

def df_col_rename(X, to_rename, replace_with):
    """
    :param X: spark dataframe
    :param to_rename: list of original names
    :param replace_with: list of new names
    :return: dataframe with updated names
    """
    import pyspark.sql.functions as F
    mapping = dict(zip(to_rename, replace_with))
    X = X.select([F.col(c).alias(mapping.get(c, c)) for c in to_rename])
    return X

如果只需要更新几个列名,则可以在replace_with列表中使用相同的列名

重命名所有列

df_col_rename(X,['a', 'b', 'c'], ['x', 'y', 'z'])

重命名一些列

df_col_rename(X,['a', 'b', 'c'], ['a', 'y', 'z'])

You can use the following function to rename all the columns of your dataframe.

def df_col_rename(X, to_rename, replace_with):
    """
    :param X: spark dataframe
    :param to_rename: list of original names
    :param replace_with: list of new names
    :return: dataframe with updated names
    """
    import pyspark.sql.functions as F
    mapping = dict(zip(to_rename, replace_with))
    X = X.select([F.col(c).alias(mapping.get(c, c)) for c in to_rename])
    return X

In case you need to update only a few columns’ names, you can use the same column name in the replace_with list

To rename all columns

df_col_rename(X,['a', 'b', 'c'], ['x', 'y', 'z'])

To rename a some columns

df_col_rename(X,['a', 'b', 'c'], ['a', 'y', 'z'])

回答 11

对于单列重命名,您仍然可以使用toDF()。例如,

df1.selectExpr("SALARY*2").toDF("REVISED_SALARY").show()

For a single column rename, you can still use toDF(). For example,

df1.selectExpr("SALARY*2").toDF("REVISED_SALARY").show()

回答 12

我们可以使用各种方法来重命名列名称。

首先,让我们创建一个简单的DataFrame。

df = spark.createDataFrame([("x", 1), ("y", 2)], 
                                  ["col_1", "col_2"])

现在,让我们尝试将 col_1 重命名为 col_3。下面列出几种做法。

# Approach - 1 : using withColumnRenamed function.
df.withColumnRenamed("col_1", "col_3").show()

# Approach - 2 : using alias function.
df.select(df["col_1"].alias("col3"), "col_2").show()

# Approach - 3 : using selectExpr function.
df.selectExpr("col_1 as col_3", "col_2").show()

# Rename all columns
# Approach - 4 : using toDF function. Here you need to pass the list of all columns present in DataFrame.
df.toDF("col_3", "col_2").show()

这是输出。

+-----+-----+
|col_3|col_2|
+-----+-----+
|    x|    1|
|    y|    2|
+-----+-----+

我希望这有帮助。

We can use various approaches to rename the column name.

First, let create a simple DataFrame.

df = spark.createDataFrame([("x", 1), ("y", 2)], 
                                  ["col_1", "col_2"])

Now let’s try to rename col_1 to col_3. PFB a few approaches to do the same.

# Approach - 1 : using withColumnRenamed function.
df.withColumnRenamed("col_1", "col_3").show()

# Approach - 2 : using alias function.
df.select(df["col_1"].alias("col3"), "col_2").show()

# Approach - 3 : using selectExpr function.
df.selectExpr("col_1 as col_3", "col_2").show()

# Rename all columns
# Approach - 4 : using toDF function. Here you need to pass the list of all columns present in DataFrame.
df.toDF("col_3", "col_2").show()

Here is the output.

+-----+-----+
|col_3|col_2|
+-----+-----+
|    x|    1|
|    y|    2|
+-----+-----+

I hope this helps.