- 启动 spark-shell 命令行,并指定 MySQL JDBC 驱动
# Start spark-shell with the MySQL JDBC connector available to Spark.
# --driver-class-path puts the jar on the driver JVM's classpath;
# --jars additionally ships it to executors so they can load the driver too.
spark-shell \
  --driver-class-path "$HIVE_HOME/lib/mysql-connector-java-5.1.46.jar" \
  --jars "$HIVE_HOME/lib/mysql-connector-java-5.1.46.jar"
- 导包
import org.apache.spark._
import org.apache.spark.sql._
// Relies on a `spark` session value already being in scope (spark-shell supplies one).
import spark.implicits._
import java.util.Properties
- 声明会话,创建mysql连接配置
// Obtain the session; getOrCreate() returns an already-running session if one exists,
// otherwise it builds a new local one named "test".
val spark = SparkSession.builder().appName("test").master("local").getOrCreate()

// MySQL login passed to every JDBC read/write below.
// NOTE(review): credentials are hard-coded for the walkthrough; do not reuse this pattern outside a demo.
val prop = new Properties()
// setProperty is the String-typed API of Properties (put accepts arbitrary objects).
prop.setProperty("user", "root")
prop.setProperty("password", "12345")
- 读取数据库 data_a 中的表 table_a
// JDBC URL of the `data_a` database on the local MySQL server.
val conf_a = "jdbc:mysql://localhost:3306/data_a?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false"
// DataFrame over `table_a`; `prop` supplies the user/password.
val conn_a = spark.read.jdbc(conf_a, "table_a", prop)
- 读取数据库 data_b 中的表 table_b
// JDBC URL of the `data_b` database on the local MySQL server.
val conf_b = "jdbc:mysql://localhost:3306/data_b?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false"
// DataFrame over `table_b`; `prop` supplies the user/password.
val conn_b = spark.read.jdbc(conf_b, "table_b", prop)
- 创建临时视图
// Register both DataFrames as temporary views so the SQL below can refer to them as `a` and `b`.
conn_a.createOrReplaceTempView("a")
conn_b.createOrReplaceTempView("b")
- 创建查询语句
// Keep every column of `a` for rows whose `cname` also appears in `b` (inner join across the two databases).
val result = spark.sql("select a.* from a join b on a.cname=b.cname")
- 执行查询,展示结果
// Run the query and print the leading rows to the console.
result.show()
- 将结果追加写入 MySQL(data_b 库的 result 表)
// Append the joined rows to table `result` in the `data_b` database (same URL as conf_b).
// SaveMode.Append adds to existing rows, so re-running this line inserts the rows again.
result.write.mode(SaveMode.Append).jdbc(conf_b, "result", prop)