【Spark】读写MySQL

  • 连接spark命令行,指定jdbc驱动
spark-shell --driver-class-path $HIVE_HOME/lib/mysql-connector-java-5.1.46.jar

 

  • 导包
import org.apache.spark._

import org.apache.spark.sql._

import spark.implicits._

import java.util.Properties

 

  • 声明会话,创建mysql连接配置
// Obtain a SparkSession: getOrCreate() returns the already-active session when
// one exists (as inside spark-shell); appName/master only apply when a new
// session must be created.
val spark = SparkSession
  .builder()
  .appName("test")
  .master("local")
  .getOrCreate()

// JDBC connection properties shared by every MySQL read/write below.
// NOTE(review): credentials are hard-coded for tutorial purposes only.
val prop = new Properties()
prop.setProperty("user", "root")
prop.setProperty("password", "12345")

 

  • 连接表a
// JDBC URL for local MySQL database `data_a` (UTF-8 encoding, UTC server
// timezone, SSL disabled — suitable for local development only).
val conf_a = "jdbc:mysql://localhost:3306/data_a?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false"

// Load `table_a` from data_a as a DataFrame, authenticating with `prop`.
val conn_a = spark.read.jdbc(conf_a,"table_a",prop)

 

  • 连接表b
// JDBC URL for local MySQL database `data_b` (same connection options as data_a).
val conf_b = "jdbc:mysql://localhost:3306/data_b?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false"

// Load `table_b` from data_b as a DataFrame, authenticating with `prop`.
val conn_b = spark.read.jdbc(conf_b,"table_b",prop)

 

  • 创建临时表
// Register each DataFrame as a session-scoped temp view so the join below
// can refer to them by name in Spark SQL.
conn_a.createOrReplaceTempView("a")

conn_b.createOrReplaceTempView("b")

 

  • 创建查询语句
val result = spark.sql("select a.* from a join b on a.cname=b.cname")

 

  • 执行查询,展示结果
result.show()

 

  • 结果写入mysql表格
result.write.mode(SaveMode.Append).jdbc(conf_b,"result",prop)