python教程—如何将数组(即列表)列转换为向量-Python实用宝典

python教程—如何将数组(即列表)列转换为向量

考虑以下代码片段(假设spark已经被设置为某个SparkSession):注意temperature字段是一个浮动列表。我想把这些列表的花车MLlib向量类型,我想这种转换表达使用基本DataFrame API而不是通过抽样(这是低效的,因为它将所有数据从JVM Python,处理完成在Python中,我们没有得到火花的催化剂优化器的好处,雅达雅达)。我该怎么做呢?具体地说:

问题的简短版本!

考虑以下代码片段(假设spark已经被设置为某个SparkSession):

    from pyspark.sql import Row source_data = [ Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]), Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), ] df = spark.createDataFrame(source_data)

注意,temperature字段是一个浮动列表。我想把这些列表的花车MLlib向量类型,我想这种转换表达使用基本DataFrame API而不是通过抽样(这是低效的,因为它将所有数据从JVM Python,处理完成在Python中,我们没有得到火花的催化剂优化器的好处,雅达雅达)。我该怎么做呢?具体地说:

  1. 有没有办法让演员阵容正常运作?请参阅下面的详细信息(以及解决方案的失败尝试)?或者,还有其他的手术能达到我想要的效果吗?
  2. 下面我建议的两种解决方案(UDF vs .爆炸/重组列表中的项目)中,哪个更有效?或者还有其他一些几乎正确但不完全正确的方法比这两种方法都好吗?

直投是行不通的

这就是我所期望的“正确”解决方案。我想将列的类型从一种类型转换为另一种类型,所以应该使用强制转换。作为上下文,让我提醒您转换为另一种类型的常规方法:

    from pyspark.sql import types df_with_strings = df.select( df["city"], df["temperatures"].cast(types.ArrayType(types.StringType()))), )

现在,例如df_with_strings.collect()[0]["temperature "][1]是'-7.0'。但如果我对ml向量进行强制转换,事情就不那么顺利了:

    from pyspark.ml.linalg import VectorUDT df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

这就产生了一个错误:

    pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;; 'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)] +- LogicalRDD [city#0, temperatures#1] "

呵!有什么办法解决这个问题吗?

可能的选择

备选方案1:使用VectorAssembler

有一个转换器似乎非常适合这项工作: vectorassembler 。它接受一个或多个列,并将它们连接到单个向量中。不幸的是,它只接受向量和浮动列,而不是数组列,所以下面的方法不起作用:

    from pyspark.ml.feature import VectorAssembler assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector") df_fail = assembler.transform(df)

它给出了这样的错误:

    pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

我能想到的最好的方法是将列表分解成多个列,然后使用VectorAssembler将它们重新收集起来:

    from pyspark.ml.feature import VectorAssembler TEMPERATURE_COUNT = 3 assembler_exploded = VectorAssembler( inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], outputCol="temperature_vector" ) df_exploded = df.select( df["city"], *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)] ) converted_df = assembler_exploded.transform(df_exploded) final_df = converted_df.select("city", "temperature_vector")

这似乎是理想的,只是温度_count大于100,有时大于1000。(另一个问题是,如果事先不知道数组的大小,代码会更复杂,不过我的数据不是这样。)并引发实际生成中间数据集,许多列,还是只考虑这一个中间步骤,个别项目通过瞬变(或事实上它优化这一步完全当它看到的只使用这些列是组装成一个向量)?

备选方案2:使用UDF

一个更简单的替代方法是使用UDF进行转换。这让我可以非常直接地用一行代码表达我想要做的事情,而不需要创建一个列数惊人的数据集。但是所有这些数据都必须在Python和JVM之间交换,而且每个单独的数字都必须由Python处理(众所周知,Python在遍历单个数据项时速度很慢)。它是这样的:

    from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql.functions import udf list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT()) df_with_vectors = df.select( df["city"], list_to_vector_udf(df["temperatures"]).alias("temperatures") )

可忽略的言论

这个杂乱无章的问题的其余部分是我在试图找到答案时想到的一些额外的东西。读这篇文章的大多数人可能会跳过它们。

不是一个解决方案:首先使用Vector

在这个简单的例子中,一开始可以使用vector类型创建数据,但是当然,我的数据并不是并行化的Python列表,而是从数据源读取的。但郑重声明,这看起来是这样的:

    from pyspark.ml.linalg import Vectors from pyspark.sql import Row source_data = [ Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])), Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])), ] df = spark.createDataFrame(source_data)

低效的解决方案:使用map()

一种可能性是使用RDD map()方法将列表转换为向量。这与UDF的想法类似,但它更糟糕,因为序列化等成本是针对每一行中的所有字段,而不仅仅是被操作的字段。郑重声明,这个解决方案是这样的:

    df_with_vectors = df.rdd.map(lambda row: Row( city=row["city"], temperatures=Vectors.dense(row["temperatures"]) )).toDF()

为cast进行的工作区尝试失败

在绝望中,我注意到Vector在内部由一个包含四个字段的结构体表示,但是使用该类型结构体的传统强制转换也不起作用。这里有一个例子(我用udf构建结构,但udf不是重要的部分):

    from pyspark.ml.linalg import Vectors, VectorUDT from pyspark.sql.functions import udf list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType()) df_almost_vector = df.select( df["city"], list_to_almost_vector_udf(df["temperatures"]).alias("temperatures") ) df_with_vectors = df_almost_vector.select( df_almost_vector["city"], df_almost_vector["temperatures"].cast(VectorUDT()) )

这就产生了误差:

    pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;; 'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)] +- Project [city#0, <lambda>(temperatures#1) AS temperatures#5] +- LogicalRDD [city#0, temperatures#1] "

回答

就我个人而言,我会使用Python UDF,不会为其他任何事情操心:

  • 向量不是原生SQL类型,因此无论如何都会有性能开销。特别是这个过程需要两个步骤,数据首先是< a href = " https://stackoverflow.com/a/32454596/6910411 " >从外部转换类型行< / >,然后< a href = " http://apache-spark-developers-list.1001551.n3.nabble.com/What-is-mainly-different-from-a-UDT-and-a-spark-internal-type-that-ExpressionEncoder-recognized-td20370.html " rel = " noreferrer " >从行内部表示使用通用RowEncoder < / >。
  • 任何下游ML管道都比简单的转换要昂贵得多。此外,它需要一个与上述过程相反的过程

但如果你真的想在这里有其他选择,你可以:

  • Scala UDF与Python封装:

    按照项目站点上的说明安装sbt

    创建Scala包,结构如下:

    .
    ├── build.sbt
    └── udfs.scala
    

    编辑构建。sbt(调整以反映Scala和Spark版本):

    scalaVersion := "2.11.8"
    
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.1.0",
      "org.apache.spark" %% "spark-mllib" % "2.1.0"
    )
    

    编辑udfs.scala:

    package com.example.spark.udfs
    
    import org.apache.spark.sql.functions.udf
    import org.apache.spark.ml.linalg.DenseVector
    
    object udfs {
      val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
    }
    

    包:

      sbt package

    并包括(或等价于Scala vers:

      $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

    作为启动shell /提交应用程序时——driver-class-path的参数。

    在PySpark中定义一个包装器:

      from pyspark.sql.column import _to_java_column, _to_seq, Column from pyspark import SparkContext def as_vector(col): sc = SparkContext.getOrCreate() f = sc._jvm.com.example.spark.udfs.udfs.as_vector() return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

    测试:

      with_vec = df.withColumn("vector", as_vector("temperatures")) with_vec.show()
    +--------+------------------+----------------+
    |    city|      temperatures|          vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_vec.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- vector: vector (nullable = true)
    
  • 将数据转储到反映DenseVector模式的JSON格式,并读取回来:

      from pyspark.sql.functions import to_json, from_json, col, struct, lit from pyspark.sql.types import StructType, StructField from pyspark.ml.linalg import VectorUDT json_vec = to_json(struct(struct( lit(1).alias("type"), # type 1 is dense, type 0 is sparse col("temperatures").alias("values") ).alias("v"))) schema = StructType([StructField("v", VectorUDT())]) with_parsed_vector = df.withColumn( "parsed_vector", from_json(json_vec, schema).getItem("v") ) with_parsed_vector.show()
    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    
    with_parsed_vector.printSchema()
    
    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
    

​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

本文由 Python实用宝典 作者:Python实用宝典 发表,其版权均为 Python实用宝典 所有,文章内容系作者个人观点,不代表 Python实用宝典 对观点赞同或支持。如需转载,请注明文章来源。
2

发表评论