// 【Spark】解析MySQL原始日志数据到HDFS ([Spark] Parse raw MySQL log data into HDFS)

import java.util
import ch.hsr.geohash.GeoHash
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Spark job: parses JSON application-log lines, attaches a geo location (via a geohash
 * dictionary) and a global user id (via an id-mapping dictionary), keeps only events whose
 * province could be resolved, and writes them as a single parquet file.
 */
object Demo04 {

  import scala.collection.JavaConverters._
  import scala.util.Try
  import scala.util.control.NonFatal

  /**
   * Sentinel stored in province/city/district when no geo match is found. The spelling is kept
   * on purpose: it is a data value this job filters on (and downstream readers may too).
   */
  private val UnknownArea = "unkown"

  /** guid carried by an event until (unless) an id-mapping lookup succeeds. */
  private val UnmappedGuid = Long.MinValue

  /**
   * Geohash length used to key the geo dictionary.
   * NOTE(review): must equal the precision the `geomap` parquet was built with — confirm.
   */
  private val GeoHashPrecision = 5

  /**
   * One parsed log event. `guid` starts as `Long.MinValue` and is replaced when an id mapping
   * is found; `province`/`city`/`district` default to the "unkown" sentinel until a geohash
   * match fills them in.
   */
  case class LogData(
      var guid: Long,
      eventid: String,
      event: Map[String, String],
      uid: String,
      imei: String,
      mac: String,
      imsi: String,
      osName: String,
      osVer: String,
      androidId: String,
      resolution: String,
      deviceType: String,
      deviceId: String,
      uuid: String,
      appid: String,
      appVer: String,
      release_ch: String,
      promotion_ch: String,
      areacode: String,
      longtitude: Double,
      latitude: Double,
      carrier: String,
      netType: String,
      cid_sn: String,
      ip: String,
      sessionId: String,
      timestamp: Long,
      var province: String = "unkown",
      var city: String = "unkown",
      var district: String = "unkown"
  )

  /** An identifier counts as present when it is non-blank and not the literal string "null". */
  private def isPresentId(id: String): Boolean =
    StringUtils.isNotBlank(id) && id != "null"

  /**
   * Parses one JSON log line.
   *
   * @param line raw text line from the application log
   * @return `Some(event)` with `guid = Long.MinValue` and unknown area, or `None` when the line
   *         is not valid JSON, misses a required section (event/user/phone/app/loc), has a
   *         missing coordinate or unparsable timestamp, or fails the validity rule (at least
   *         one user/device identifier, a non-blank session id and a non-blank event id)
   */
  private def parseLine(line: String): Option[LogData] =
    try {
      val root = JSON.parseObject(line)
      val eventId = root.getString("eventid")

      // Raw event values may be numbers, booleans or nested objects. Stringify them so the
      // Map[String, String] column really holds strings; an erased cast would only fail later
      // inside the encoder, outside this try, and abort the job.
      val event: Map[String, String] =
        root.getJSONObject("event").getInnerMap().asScala.iterator.map {
          case (key, value) => key -> (if (value == null) null else value.toString)
        }.toMap

      val user = root.getJSONObject("user")
      val phone = user.getJSONObject("phone")
      val app = user.getJSONObject("app")
      val loc = user.getJSONObject("loc")

      val uid = user.getString("uid")
      val imei = phone.getString("imei")
      val mac = phone.getString("mac")
      val imsi = phone.getString("imsi")
      val androidId = phone.getString("androidId")
      val uuid = phone.getString("uuid")
      val sessionId = user.getString("sessionId")
      // getDouble returns a boxed java.lang.Double; unboxing a missing value throws an NPE,
      // which the catch below turns into None.
      val longtitude: Double = loc.getDouble("longtitude")
      val latitude: Double = loc.getDouble("latitude")
      val timestamp = root.getString("timestamp").toLong

      val hasIdentifier = Seq(uid, mac, imei, imsi, androidId, uuid).exists(isPresentId)
      if (hasIdentifier && StringUtils.isNotBlank(sessionId) && StringUtils.isNotBlank(eventId)) {
        Some(LogData(
          guid = UnmappedGuid,
          eventid = eventId,
          event = event,
          uid = uid,
          imei = imei,
          mac = mac,
          imsi = imsi,
          osName = phone.getString("osName"),
          osVer = phone.getString("osVer"),
          androidId = androidId,
          resolution = phone.getString("resolution"),
          deviceType = phone.getString("deviceType"),
          deviceId = phone.getString("deviceId"),
          uuid = uuid,
          appid = app.getString("appid"),
          appVer = app.getString("appVer"),
          release_ch = app.getString("release_ch"),
          promotion_ch = app.getString("promotion_ch"),
          areacode = loc.getString("areacode"),
          longtitude = longtitude,
          latitude = latitude,
          carrier = loc.getString("carrier"),
          netType = loc.getString("netType"),
          cid_sn = loc.getString("cid_sn"),
          ip = loc.getString("ip"),
          sessionId = sessionId,
          timestamp = timestamp
        ))
      } else {
        None
      }
    } catch {
      // Best effort: a malformed line is dropped, but fatal errors and interrupts propagate.
      case NonFatal(_) => None
    }

  /**
   * Fills in the geo area and the mapped guid of a parsed event.
   *
   * @param bean    parsed event
   * @param geoDict geohash -> (province, city, district)
   * @param idDict  identifier hash -> guid. NOTE(review): keys are assumed to be
   *                `String.hashCode.toLong` of the raw identifier, matching the lookup below.
   * @return a copy of `bean`. The area is left as "unkown" when the coordinates are invalid or
   *         unmapped. `guid` comes from the first present identifier (in the order imei, imsi,
   *         androidId, uuid, mac, uid) that has a mapping, and is otherwise unchanged.
   */
  private def enrich(
      bean: LogData,
      geoDict: collection.Map[String, (String, String, String)],
      idDict: collection.Map[Long, Long]): LogData = {
    // geohash-java rejects coordinates outside ±90/±180 with an exception; treat that as
    // "no location" instead of failing the whole job.
    val area = Try(
      GeoHash.geoHashStringWithCharacterPrecision(bean.latitude, bean.longtitude, GeoHashPrecision)
    ).toOption.flatMap(geoDict.get)

    // Absent identifiers (null/blank/"null") are skipped: hashing null would throw, and hashing
    // "" would match an arbitrary mapping.
    val mappedGuid = Iterator(bean.imei, bean.imsi, bean.androidId, bean.uuid, bean.mac, bean.uid)
      .filter(isPresentId)
      .map(id => idDict.get(id.hashCode.toLong))
      .collectFirst { case Some(guid) => guid }

    val (province, city, district) =
      area.getOrElse((bean.province, bean.city, bean.district))
    bean.copy(
      guid = mappedGuid.getOrElse(bean.guid),
      province = province,
      city = city,
      district = district)
  }

  /**
   * Entry point. Reads the raw log, the geo dictionary and the id-mapping table from fixed HDFS
   * paths, and writes the enriched events (only those with a resolved province) to
   * `hdfs://mycluster/app_log/log_done` as one parquet file.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getName).master("local[*]").getOrCreate()
    import spark.implicits._

    try {
      val geoMap = spark.read.parquet("hdfs://mycluster/app_log/geomap").rdd.map({
        case Row(geo: String, province: String, city: String, district: String)
        => (geo, (province, city, district))
      }).collectAsMap()
      val bcGeo = spark.sparkContext.broadcast(geoMap)

      val idmpMap = spark.read.parquet("hdfs://mycluster/app_log/idmp").rdd.map({
        case Row(id_hashcode: Long, guid: Long) => (id_hashcode, guid)
      }).collectAsMap()
      val bcIdMap = spark.sparkContext.broadcast(idmpMap)

      // flatMap over Option: Spark's product encoder rejects a null top-level object, so bad
      // lines must be dropped here rather than returned as null and filtered afterwards.
      val result = spark.read.textFile("hdfs://mycluster/app_log/app.log")
        .flatMap(line => parseLine(line).toList)
        .map(bean => enrich(bean, bcGeo.value, bcIdMap.value))
        .filter(bean => bean.province != UnknownArea)
        .toDF()

      result.coalesce(1).write.parquet("hdfs://mycluster/app_log/log_done")
    } finally {
      spark.close()
    }
  }
}