人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動(dòng)態(tài)

Spark SQL 2.4.8 操作 Dataframe的兩種方式

發(fā)布日期:2022-01-30 20:02 | 文章來(lái)源:腳本之家

一、測(cè)試數(shù)據(jù)

7369,SMITH,CLERK,7902,1980/12/17,800,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,20
7839,KING,PRESIDENT,1981/11/17,5000,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,20
7900,JAMES,CLERK,7698,1981/12/3,9500,30
7902,FORD,ANALYST,7566,1981/12/3,3000,20
7934,MILLER,CLERK,7782,1982/1/23,1300,10

二、創(chuàng)建DataFrame

方式一:DSL方式操作

  • 實(shí)例化SparkContext和SparkSession對(duì)象
  • 利用StructType類型構(gòu)建schema,用于定義數(shù)據(jù)的結(jié)構(gòu)信息
  • 通過(guò)SparkContext對(duì)象讀取文件,生成RDD
  • 將RDD[String]轉(zhuǎn)換成RDD[Row]
  • 通過(guò)SparkSession對(duì)象創(chuàng)建dataframe
  • 完整代碼如下:
package com.scala.demo.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
object Demo01 {
  def main(args: Array[String]): Unit = {
    // 1.創(chuàng)建SparkContext和SparkSession對(duì)象
    val sc = new SparkContext(new SparkConf().setAppName("Demo01").setMaster("local[2]"))
    val sparkSession = SparkSession.builder().getOrCreate()
    // 2. 使用StructType來(lái)定義Schema
    val mySchema = StructType(List(
      StructField("empno", DataTypes.IntegerType, false),
      StructField("ename", DataTypes.StringType, false),
      StructField("job", DataTypes.StringType, false),
      StructField("mgr", DataTypes.StringType, false),
      StructField("hiredate", DataTypes.StringType, false),
      StructField("sal", DataTypes.IntegerType, false),
      StructField("comm", DataTypes.StringType, false),
      StructField("deptno", DataTypes.IntegerType, false)
    ))
    // 3. 讀取數(shù)據(jù)
    val empRDD = sc.textFile("file:///D:\\TestDatas\\emp.csv")
    // 4. 將其映射成ROW對(duì)象
    val rowRDD = empRDD.map(line => {
      val strings = line.split(",")
      Row(strings(0).toInt, strings(1), strings(2), strings(3), strings(4), strings(5).toInt,strings(6), strings(7).toInt)
    })
    // 5. 創(chuàng)建DataFrame
    val dataFrame = sparkSession.createDataFrame(rowRDD, mySchema)
    // 6. 展示內(nèi)容 DSL
	dataFrame.groupBy("deptno").sum("sal").as("result").sort("sum(sal)").show()
  }
}

結(jié)果如下:

方式二:SQL方式操作

  • 實(shí)例化SparkContext和SparkSession對(duì)象
  • 創(chuàng)建case class Emp樣例類,用于定義數(shù)據(jù)的結(jié)構(gòu)信息
  • 通過(guò)SparkContext對(duì)象讀取文件,生成RDD[String]
  • 將RDD[String]轉(zhuǎn)換成RDD[Emp]
  • 引入spark隱式轉(zhuǎn)換函數(shù)(必須引入)
  • 將RDD[Emp]轉(zhuǎn)換成DataFrame
  • 將DataFrame注冊(cè)成一張視圖或者臨時(shí)表
  • 通過(guò)調(diào)用SparkSession對(duì)象的sql函數(shù),編寫sql語(yǔ)句
  • 停止資源
  • 具體代碼如下:
package com.scala.demo.sql
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
// 0. 數(shù)據(jù)分析
// 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
// 1. 定義Emp樣例類
case class Emp(empNo:Int,empName:String,job:String,mgr:String,hiredate:String,sal:Int,comm:String,deptNo:Int)
object Demo02 {
  def main(args: Array[String]): Unit = {
    // 2. 讀取數(shù)據(jù)將其映射成Row對(duì)象
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo02"))
    val mapRdd = sc.textFile("file:///D:\\TestDatas\\emp.csv")
      .map(_.split(","))
    val rowRDD:RDD[Emp] = mapRdd.map(line => Emp(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt))
    // 3。創(chuàng)建dataframe
    val spark = SparkSession.builder().getOrCreate()
    // 引入spark隱式轉(zhuǎn)換函數(shù)
    import spark.implicits._
    // 將RDD轉(zhuǎn)成Dataframe
    val dataFrame = rowRDD.toDF
    // 4.2 sql語(yǔ)句操作
    // 1、將dataframe注冊(cè)成一張臨時(shí)表
    dataFrame.createOrReplaceTempView("emp")
    // 2. 編寫sql語(yǔ)句進(jìn)行操作
    spark.sql("select deptNo,sum(sal) as total from emp group by deptNo order by total desc").show()
    // 關(guān)閉資源
    spark.stop()
    sc.stop()
  }
}

結(jié)果如下:

到此這篇關(guān)于Spark SQL 2.4.8 操作 Dataframe的兩種方式的文章就介紹到這了,更多相關(guān)Spark SQL 操作 Dataframe內(nèi)容請(qǐng)搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!

國(guó)外穩(wěn)定服務(wù)器

版權(quán)聲明:本站文章來(lái)源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來(lái)源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來(lái)源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來(lái),僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。

相關(guān)文章

實(shí)時(shí)開(kāi)通

自選配置、實(shí)時(shí)開(kāi)通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問(wèn)服務(wù)

1對(duì)1客戶咨詢顧問(wèn)

在線
客服

在線客服:7*24小時(shí)在線

客服
熱線

400-630-3752
7*24小時(shí)客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部