spark-dataframe基本的结构化操作 最后更新时间:2022年07月04日 ## 基本的结构化操作 ### 模式 **模式定义DataFrame的列名以及列的数据类型,它可以由数据源来定义(称作读时定义),也可以由用户自己手动定义(显式定义)。** 注意:实际应用场景决定了定义schema的方式。应当用于即席分析时,使用读时模式即可。不过,使用读时模式极有可能出现类型读取错误的问题,如long型读成了int类型,这回导致精度损失,所以最好采用显式的schema定义。 示例: ```python spark.read.format("json").load("./wj.json").schema ``` ** 这里是使用了读时模式进行schema的定义。** ** 一个模式由多个字段构成的StructType。** ** 这些字段即为StructField,StructField具有名称、类型、布尔标志(该标志用于标记该列是否可以有缺失值或空值),并且用户可以指定与该列系关联的元数据(metadata)。元数据存储着有关此列的信息(spark的机器学习库中会使用该功能)。** ** 模式还包含其他的StructType(spark的复杂类型)。** 示例: ```python from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType mySchema = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age",IntegerType(),True) ]) ``` ### 列和表达式 #### 列 有很多不同的方法来构造和引用列,其中最常用的两种方法是col和column函数。使用这两个函数需要传入列名。 示例: ```python from pyspark.sql.functions import col, column col("name") column("id") ``` 在scala中有两种更简短的引用行的方法。 示例: ```scala $"name" 'id ``` 符号$将字符串指定为表达式,而符号( ' )指定一个symbol,是scala引用标识符的特殊结构。 ##### 显式列引用 如果需要对dataframe的某一列进行引用,则可以直接在该dataframe上使用col方法。当进行连接操作的时候,如果两个dataframe有着同名列,该方法会非常有用。 示例: ```python df.col("name") ``` #### 表达式 **表达式(expression)是对一个dataframe中某一个记录的一个或多个值的一组转换操作。** **通常情况下,最简单的方法是通过expr函数创建仅仅对一个dataframe列引用的表达式。即单列应用的情况下expr和col等同。** 示例: ```python expr("name") col("name") ``` ##### 列作为表达式 列提供了表达式功能的一个子集。如果使用col,并想对该列执行转换操作,则必须对该列的引用执行这些转换操作。使用表达式时,expr函数实际上可以将字符串解析成转换操作和列引用,也可以在之后传递到下一步的转换操作。 示例: ```python expr("age - 5") col("age")-5 expr("age")-5 ``` 以上三个方法都是相同的转换操作。spark将他们编译为表示顺序操作的逻辑树。 注意: - 列只是表达式。 - 列与对这些列的转换操作被编译后生成的逻辑计划,与解析后的表达式的逻辑计划是一样的。 ##### 访问dataframe的列 想要在程序中访问所有的列,可以使用columns方法来查询 示例: ```python df.columns ``` #### 记录和行 **在sapark中,dataframe的每一行都是一个记录,而记录是row类型的对象。spark使用列表达式操纵row类型对象。** **row对象内部其实是字节数组,但是spark没有提供相应的访问用的接口,所以只能通过列表达式去操纵。** **当使用dataframe时,向驱动器请求行的命令总是返回一个或多个Row类型的行数据。** #### 创建行 spark可以基于已知的每列数值去手动实例化一个Row对象来创建行。 手动创建行必须按照该行所附属的DataFrame的顺序来初始化Row对象。 示例: ```python from pyspapk.spl import Row val MyRow = Row(“kirito”,2017,17) ``` #### 访问行数据 访问行数据的方法同样简单:只需要像访问数组和列表一样,利用下标访问即可。 示例: ```python MyRow[0] ``` ### DataFrame转换操作 DataFrame转换操作的核心为: - 添加行或列 - 删除行或列 - 将一行转换操作为列 - 根据列中的值更改行的顺序 #### 创建DataFrame 可以使用数据源创建DataFrame,也可以创建临时视图。 示例: ```python df = spark.read.format("json").load("lxyd.json") df.createOrReplaceTempiew("dftable") ``` 我们也可以通过获取一组行并将他们转换操作为一个DataFrame来即时创建一个Dataframe。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) myRow: Row = Row("kirito", 1999, 17) myDf = spark.createDataFrame([myRow], mySchame) myDf.show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> |kirito|1999| 17| # --> +------+----+---+ ``` #### select函数和selectExpr函数 select函数和selectExpr函数支持在DataFrame上执行类似数据表的SQL的查询。 ##### 单列查询 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 查询部分 select部分 单列查询 # # # # # # # # # # # # # # # # # # # myDf.select("name").show() # 执行查询 --> # --> +------+ # --> | name| # --> +------+ # --> |kirito| # --> | asina| # --> | allen| # --> +------+ ``` 多列查询 多列查询只需要在select中调用更多的列名即可。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 查询部分 select部分 多列查询 # # # # # # # # # # # # # # # # # # # myDf.select("name", "age").show() # 执行查询 --> # --> +------+---+ # --> | name|age| # --> +------+---+ # --> |kirito| 17| # --> | asina| 16| # --> | allen| 17| # --> +------+---+ ``` ##### 等价互换 DataFrame可以通过多种不同的方式引用列,而且这些方式可以等价互换。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import col, column, expr from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 查询部分 select部分 列引用 # # # # # # # # # # # # # # # # # # # myDf.select(expr("id"), col("name"), column("age")).show() # 执行查询 --> # --> +----+------+---+ # --> | id| name|age| # --> +----+------+---+ # --> |1999|kirito| 17| # --> |2000| asina| 16| # --> |2001| allen| 17| # --> +----+------+---+ ``` 需要注意的是:column对象和字符串类型不能够一起混用,否则会报错。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import col, column, expr from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 查询部分 select部分 混用 # # # # # # # # # # # # # # # # # # # myDf.select(col("name"), "age").show() # 这里在python中执行的时候并没有报错,可能是scala和java中特有的情况。 ``` ##### 操作 expr是相当灵活的引用方式。它能够改引用一列,也能够引用对列进行操作的字符串表达式。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import col, column, expr from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 查询部分 select部分 expr灵活应用 # # # # # # # # # # # # # # # # # # # myDf.select(expr("name as actor")).show(0) # 直接引用字符串表达式,通过as方法进行重命名列 myDf.select(expr("name as actor").alias("name")).show(0) # 也可以通过调用方法来进行操作,通过调用alias方法来进行重命名。 # 执行查询 --> # 执行查询 --> # --> +-----+ # --> |actor| # --> +-----+ # --> +----+ # --> |name| # --> +----+ ``` ##### selectExpr 因为select后面使用expr是非常常见的写法,所以spark有一个有效地描述此操作序列的接口:selectExpr。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import col, column, expr from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 查询部分 select部分 selectExpr的使用 # # # # # # # # # # # # # # # # # # # myDf.selectExpr("name as actor").show() # 执行查询 --> # --> +------+ # --> | actor| # --> +------+ # --> |kirito| # --> | asina| # --> | allen| # --> +------+ myDf.select("name as actor").show() # 执行查询 --> 会报错 # --> pyspark.sql.utils.AnalysisException: "cannot resolve '`name as actor`' given input columns: [name, id, age];;\n'Project ['name as actor]\n+- LogicalRDD [name#0, id#1L, age#2], false\n" ``` #### 转换操作成spark类型(字面量) **有时候需要给spark传递显式的值,它们只是一个值而非新列。**这可能是一个常量,或是接下来需要比较的值。 **传递值的方式是通过字面量(literal)传递。字面量就是表达式。** 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 字面量 # # # # # # # # # # # # # # # # # # # myDf.select("name", lit(1).alias("value")).show() # --> # --> +------+-----+ # --> | name|value| # --> +------+-----+ # --> |kirito| 1| # --> | asina| 1| # --> | allen| 1| # --> +------+-----+ ``` 当需要比较一个值是否大于一个常量或者是程序创建的变量时,就可以使用这个方法。 #### 添加列 使`withColumn`方法可以为DataFrame添加新列,这种方式也更加规范一些。 注意: 1. 常用列对象:' 、$ 、col 、column。 2. withColumn的第二个参数要传入已有列的Column对象,否则会报错。 3. sql.functions.lit()函数,返回的也是列对象,可以传入任意参数值。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import col, column, expr, lit from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 添加列 # # # # # # # # # # # # # # # # # # # myDf.withColumn("atk",lit(1)).show() # --> # --> +------+----+---+---+ # --> | name| id|age|atk| # --> +------+----+---+---+ # --> |kirito|1999| 17| 1| # --> | asina|2000| 16| 1| # --> | allen|2001| 17| 1| # --> +------+----+---+---+ ``` #### 重命名列 在spark中,有两种重命名列的方法,一种是通过withColumn对列重命名,一种是通过withColumnRenamed方法对列重命名。 withColumn: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit,expr,col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 重命名列 # # # # # # # # # # # # # # # # # # # myDf.withColumn("actor", expr("name")).show() # --> # --> +------+----+---+------+ # --> | name| id|age| actor| # --> +------+----+---+------+ # --> |kirito|1999| 17|kirito| # --> | asina|2000| 16| asina| # --> | allen|2001| 17| allen| # --> +------+----+---+------+ ``` withColumnRenamed: ```python # # # # # # # # # # # # # # # # # # # # 重命名列 # # # # # # # # # # # # # # # # # # # myDf.withColumnRenamed("name", "actor").show() # --> # --> +------+----+---+ # --> | actor| id|age| # --> +------+----+---+ # --> |kirito|1999| 17| # --> | asina|2000| 16| # --> | allen|2001| 17| # --> +------+----+---+ ``` #### 保留字与关键字 在spark程序的开发中,遇到列名中包含空格或者连字符等保留字,要处理这些保留字符号意味着要适当地对列名进行转义。 **在spark中,通过使用反引号( ` )字符来实现转义。** 示例: 没有使用转义符: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit,expr,col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("actor name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 查询部分 select部分 表达式转义 # # # # # # # # # # # # # # # # # # # myDf.select(expr("actor name as name")).show() # --> # --> pyspark.sql.utils.ParseException: "\nmismatched input 'as' expecting (line 1, pos 11)\n\n== SQL ==\nactor name as name\n-----------^^^\n" ``` 使用转义符: ```python # # # # # # # # # # # # # # # # # # # # 查询部分 select部分 表达式转义 # # # # # # # # # # # # # # # # # # # myDf.select(expr("`actor name` as name")).show() # --> # --> +------+ # --> | name | # --> +------+ # --> |kirito| # --> | asina| # --> | allen| # --> +------+ ``` 注意: - 显式地引用列,可以直接引用带有保留字符的类。如`col("actor name")`。 - 只需要转义使用保留字符或者关键字的表达式。 #### 区分大小写 spark默认是不区分大小写的,但是可以通过配置`spark.sql.caseSensitive`使spark区分大小写。 示例: ```python set spark.sql.caseSensitive true ``` #### 删除列 spark可以通过select进行类似于sql的查询操作,同样也可以使用drop方法来进行删除列。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit,expr,col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("actor name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 删除列 # # # # # # # # # # # # # # # # # # # myDf.drop("id").show() # --> # --> +----------+---+ # --> |actor name|age| # --> +----------+---+ # --> | kirito| 17| # --> | asina| 16| # --> | allen| 17| # --> +----------+---+ ``` #### 更改列的类型(强制类型转换) 在spark中,可以通过更改列的类型来转换数据类型。所需要使用到的一个方法是`cast`方法。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit,expr,col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 更改列的类型 # # # # # # # # # # # # # # # # # # # myDf.withColumn("name",col("name").cast("long")).show() # --> # --> +----+----+---+ # --> |name| id|age| # --> +----+----+---+ # --> |null|1999| 17| # --> |null|2000| 16| # --> |null|2001| 17| # --> +----+----+---+ ``` #### 过滤行 过滤行只需要创建一个表达式来判断这个表达式是true还是false,然后过滤掉返回值为false的表达式。 spark提供了两种实现过滤的方式,分别为where和filter。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit,expr,col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen], mySchame) # # # # # # # # # # # # # # # # # # # # 过滤列 # # # # # # # # # # # # # # # # # # # myDf.filter(expr("age < 17")).show() # --> # --> +-----+----+---+ # --> | name| id|age| # --> +-----+----+---+ # --> |asina|2000| 16| # --> +-----+----+---+ ``` ```python # # # # # # # # # # # # # # # # # # # # 过滤列 # # # # # # # # # # # # # # # # # # # myDf.where(expr("age > 16")).show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> |kirito|1999| 17| # --> | allen|2001| 17| # --> +------+----+---+ ``` #### 去重 在DataFrame中去重,可以使用distinct方法。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit,expr,col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) shadow: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame) # # # # # # # # # # # # # # # # # # # # 去重 # # # # # # # # # # # # # # # # # # # myDf.select("*").distinct().show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | allen|2001| 17| # --> | asina|2000| 16| # --> |kirito|1999| 17| # --> +------+----+---+ ``` #### 随机抽样 在spark中,可以使用`sample`方法来进行随即抽样的工作。 sample(withReplacement,fraction,seed)有三个参数,一般来说只需要前两个参数。 参数解释: - withReplacement:该参数表示了是否放回抽样数据,true为放回抽样,false则是不放回(无重复样本抽样)。 - fraction:该参数表示了抽取样本占据数据总数的比例,该参数的值在0-1之间。 - seed:该参数用于取定值,即填入该参数后,sample每一次取出的样本都会是一样的。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit,expr,col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) shadow: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame) # # # # # # # # # # # # # # # # # # # # 随即抽样 # # # # # # # # # # # # # # # # # # # seed = 5 withReplacement = False fraction = 0.3 myDf.sample(withReplacement,fraction,seed).show() # --> # --> +-----+----+---+ # --> | name| id|age| # --> +-----+----+---+ # --> |asina|2000| 16| # --> |allen|2001| 17| # --> +-----+----+---+ ``` #### 随机分割 当需要将DataFrame随机分割为多个分片时,可以使用随机分割(randomSplit)。 randomSplit(list(),send)有两个参数,一般只需要用第一个参数即可。 参数解释: - list():表示一个列表,列表中的参数的和必须在0-1之间。 - seed:该参数用于取定值,即填入该参数后,randomSplit每一次分割的结果都会是一样的。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit, expr, col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) shadow: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame) # # # # # # # # # # # # # # # # # # # # 随机分割 # # # # # # # # # # # # # # # # # # # seed = 1 fgDf = myDf.randomSplit([0.25, 0.75],seed) fgDf[0].show() # --> # --> +-----+----+---+ # --> | name| id|age| # --> +-----+----+---+ # --> |allen|2001| 17| # --> +-----+----+---+ ``` #### 连接和追加行(联合操作) DataFrame是一个不可变的数据类型。即,在创建了DataFrame后,不能给DataFrame追加行。不过可以通过两个DataFrame联合操作(union)的方式实现该效果。 常见的作法有两种:一是创建新的临时试图,二是注册成一个数据表。 注意: - 联合操作是基于位置而不是基于schame模式来进行合并的。 - 联合操作不会自动根据列名匹配对齐后再进行合并。 - 两个联合的DataFrame需要完全相同的模式和列数。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit, expr, col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) shadow: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame) youSchame = StructType([ StructField("name",StringType(),False), StructField("id",LongType(),False), StructField("age",IntegerType(),True) ]) Edward: Row = Row("Edward", 2333, 17) Ella: Row = Row("Ella", 2233, 16) Deval: Row = Row("Deval", 1987, 17) edmond: Row = Row("edmond", 1132, 17) youDf = spark.createDataFrame([Edward, Ella, Deval, edmond], youSchame) # # # # # # # # # # # # # # # # # # # # 连接和追加行(联合操作) # # # # # # # # # # # # # # # # # # # newDf = myDf.union(youDf) newDf.show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> |kirito|1999| 17| # --> | asina|2000| 16| # --> | allen|2001| 17| # --> | allen|2001| 17| # --> |Edward|2333| 17| # --> | Ella|2233| 16| # --> | Deval|1987| 17| # --> |edmond|1132| 17| # --> +------+----+---+ ``` 小技巧:在scala中,可以使用=!=运算符,该运算符不仅可以比较字符串,也可以比较表达式。 #### 行排序 在DataFrame中进行排序的时候,可以使用`sort`和`orderBy`方法。这两个方法是互相等价的,执行的方式也一样。**默认设置是按升序排序。**如果要更明确地指定排序方式,则需要使用asc函数和desc函数。 提示: 1. 一个高级的技巧:可以指定空值在排序中的位置。 - asc_nulls_first和desc_nulls_first都表示空值排在最前面,asc_nulls_last和desc_nulls_last都表示空值排在最后面。 2. 出于性能优化的目的,最好在进行别的转换之前,先对每个分区进行内部排序。可以使用`sortWithinPartitions`方法进行这一操作。 注意: - sort和orderBy方法,这两个方法是互相等价的,执行的方式也一样。 - sort和orderBy方法,两个可以接收字符串也可以接收列表达式。 - sort和orderBy方法,两个方法可以接收多列。 - 默认设置是按升序排序。 - 如果要更明确地指定排序方式,则需要使用asc函数和desc函数。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit, expr, col from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) shadow: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame) youSchame = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) Edward: Row = Row("Edward", 2333, 17) Ella: Row = Row("Ella", 2233, 16) Deval: Row = Row("Deval", 1987, 17) edmond: Row = Row("edmond", 1132, 17) youDf = spark.createDataFrame([Edward, Ella, Deval, edmond], youSchame) # # # # # # # # # # # # # # # # # # # # 行排序 sort和orderBy # # # # # # # # # # # # # # # # # # # youDf.sort("age").show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | Ella|2233| 16| # --> |Edward|2333| 17| # --> | Deval|1987| 17| # --> |edmond|1132| 17| # --> +------+----+---+ youDf.orderBy("age").show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | Ella|2233| 16| # --> | Deval|1987| 17| # --> |edmond|1132| 17| # --> |Edward|2333| 17| # --> +------+----+---+ ``` ```python # # # # # # # # # # # # # # # # # # # # 行排序 使用列表达式 # # # # # # # # # # # # # # # # # # # youDf.sort(expr("age")).show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | Ella|2233| 16| # --> |edmond|1132| 17| # --> | Deval|1987| 17| # --> |Edward|2333| 17| # --> +------+----+---+ youDf.orderBy(col("age")).show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | Ella|2233| 16| # --> | Deval|1987| 17| # --> |Edward|2333| 17| # --> |edmond|1132| 17| # --> +------+----+---+ ``` ```python # # # # # # # # # # # # # # # # # # # # 行排序 多列排序 # # # # # # # # # # # # # # # # # # # youDf.sort("age", "name").show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | Ella|2233| 16| # --> | Deval|1987| 17| # --> |Edward|2333| 17| # --> |edmond|1132| 17| # --> +------+----+---+ youDf.orderBy("age", "name").show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | Ella|2233| 16| # --> | Deval|1987| 17| # --> |Edward|2333| 17| # --> |edmond|1132| 17| # --> +------+----+---+ ``` ```python # # # # # # # # # # # # # # # # # # # # 行排序 使用asc和desc # # # # # # # # # # # # # # # # # # # youDf.sort(asc("age")).show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | Ella|2233| 16| # --> |Edward|2333| 17| # --> | Deval|1987| 17| # --> |edmond|1132| 17| # --> +------+----+---+ youDf.orderBy(desc("age")).show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> |edmond|1132| 17| # --> | Deval|1987| 17| # --> |Edward|2333| 17| # --> | Ella|2233| 16| # --> +------+----+---+ youDf.sort(expr("age").desc()).show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> |edmond|1132| 17| # --> |Edward|2333| 17| # --> | Deval|1987| 17| # --> | Ella|2233| 16| # --> +------+----+---+ ``` ```python # # # # # # # # # # # # # # # # # # # # 行排序 分区内部排序 # # # # # # # # # # # # # # # # # # # youDf.repartition(1).sortWithinPartitions("age").show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> | Ella|2233| 16| # --> |Edward|2333| 17| # --> | Deval|1987| 17| # --> |edmond|1132| 17| # --> +------+----+---+ # 注意,这里已经使用了repartition进行了重新分区,现在所进行的排序是一个分区内的。 ``` 注意: - 在pyspark下,`sort(expr("age desc"))`写法排序不会起作用。 #### 限制提取limit方法 在dataframe中,可以使用limit方法来限制每次可提取的数据数量。 提示: - 该方法结合排序方法,可以获取数据排名前多少行或后多少行的数据。 示例: ```python # # # # # # # # # # # # # # # # # # # # 限制提取 # # # # # # # # # # # # # # # # # # # youDf.limit(2).show() # --> # --> +------+----+---+ # --> | name| id|age| # --> +------+----+---+ # --> |Edward|2333| 17| # --> | Ella|2233| 16| # --> +------+----+---+ ``` #### 重划分和合并 ##### 重划分 根据一些经常过滤的数据进行分区,是一项重要的优化方案。**控制跨集群数据的物理布局,包括分区方案和分区数。** 注意: - 重新分区会导致数据的全面洗牌。 - 如果将来的分区数大于当前的分区数,或是需要基于某一组特定的列来进行分区时,通常只能重新分区。 - 可以使用rdd下的方法`getNumPartitions`来获取当前dataframe的分区数。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit, expr, col, asc, desc from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) shadow: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame) youSchame = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) Edward: Row = Row("Edward", 2333, 17) Ella: Row = Row("Ella", 2233, 16) Deval: Row = Row("Deval", 1987, 17) edmond: Row = Row("edmond", 1132, 17) youDf = spark.createDataFrame([Edward, Ella, Deval, edmond], youSchame) # # # # # # # # # # # # # # # # # # # # 重划分和合并 重划分 # # # # # # # # # # # # # # # # # # # print(myDf.rdd.getNumPartitions()) # --> 8 selfDf = myDf.repartition(2) print(selfDf.rdd.getNumPartitions()) # --> 2 ``` ```python # # # # # # # # # # # # # # # # # # # # 重划分和合并 根据特定列进行重划分 # # # # # # # # # # # # # # # # # # # print(myDf.rdd.getNumPartitions()) # --> 8 selfDf = myDf.repartition("age") print(selfDf.rdd.getNumPartitions()) # --> 200 ``` ```python # # # # # # # # # # # # # # # # # # # # 重划分和合并 根据特定列指定分区数量进行重划分 # # # # # # # # # # # # # # # # # # # print(myDf.rdd.getNumPartitions()) # --> 8 selfDf = myDf.repartition(2,"age") print(selfDf.rdd.getNumPartitions()) # --> 2 ``` ##### 合并 合并操作使用coalesece方法。合并操作**不会重新洗牌**,但是会尝试合并分区。 注意: - 合并操作不会使合并的分区大于原本的分区。 示例: ```python # # # # # # # # # # # # # # # # # # # # 重划分和合并 合并 # # # # # # # # # # # # # # # # # # # print(myDf.rdd.getNumPartitions()) # --> 8 selfDf = myDf.repartition("age") print(selfDf.rdd.getNumPartitions()) # --> 200 mySelfDf = selfDf.coalesce(1) print(mySelfDf.rdd.getNumPartitions()) # --> 1 ``` ```python # # # # # # # # # # # # # # # # # # # # 重划分和合并 # # # # # # # # # # # # # # # # # # # print(myDf.rdd.getNumPartitions()) # --> 8 mySelfDf = myDf.coalesce(10) print(mySelfDf.rdd.getNumPartitions()) # --> 8 ``` #### 驱动器获取行 spark的驱动器维护着集群状态,有时候需要让驱动器收集一些数据到本地,以方便在本地处理它们。 这个操作并没有明确的定义,有以下几种方法可以实现这个效果: - collect:该函数会获取整个Dataframe的数据。 - take:该函数会获取前N行,并使用show打印一些行。 - toLocalIterator:该函数是一个迭代器,会将每个分区的数据返回给驱动器。该函数允许以串行的方式一个一个分区地迭代整个数据集。 注意: - 将数据集传递给驱动器的代价很高。当数据量很大的时候调用collect函数,可能会导致驱动器崩溃。如果使用toLocalIterator,并且分区很大,则容易使驱动器节点崩溃并丢失应用程序的状态,代价也是巨大的。因此我们可以一个一个分区进行操作,而不是并行运行。 示例: ```python # 添加此代码 import findspark findspark.init() # 用于查找本机spark配置位置 from pyspark import SparkConf, SparkContext from pyspark.sql import Row, SparkSession from pyspark.sql.functions import lit, expr, col, asc, desc from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType conf = SparkConf().setAppName("createCS").setMaster("local[*]") sc = SparkContext(conf=conf) spark = SparkSession(sc) mySchame: StructType = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) kirito: Row = Row("kirito", 1999, 17) asina: Row = Row("asina", 2000, 16) allen: Row = Row("allen", 2001, 17) shadow: Row = Row("allen", 2001, 17) myDf = spark.createDataFrame([kirito, asina, allen, shadow], mySchame) youSchame = StructType([ StructField("name", StringType(), False), StructField("id", LongType(), False), StructField("age", IntegerType(), True) ]) Edward: Row = Row("Edward", 2333, 17) Ella: Row = Row("Ella", 2233, 16) Deval: Row = Row("Deval", 1987, 17) edmond: Row = Row("edmond", 1132, 17) youDf = spark.createDataFrame([Edward, Ella, Deval, edmond], youSchame) # # # # # # # # # # # # # # # # # # # # 驱动器获取行 # # # # # # # # # # # # # # # # # # # print(type(myDf.take(2))) # --> print(myDf.take(2)) # --> [Row(name='kirito', id=1999, age=17), Row(name='asina', id=2000, age=16)] ```
Comments | NOTHING