import java.util
import ch.hsr.geohash.GeoHash
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Row, SparkSession}
object Demo04 {

  import scala.collection.JavaConverters._
  import scala.util.control.NonFatal

  /**
   * One app-log event flattened out of the nested JSON record, plus a resolved guid and an
   * administrative area.
   *
   * `guid` holds `Long.MinValue` (see [[UnresolvedGuid]]) until one of the record's ids is found in
   * the id-mapping dictionary. `province`, `city` and `district` default to "unkown" (spelling
   * preserved from the original code) until filled from the geohash dictionary.
   * The `var` fields are kept for source compatibility; this job updates records via `copy`.
   */
  case class LogData(
      var guid: Long,
      eventid: String,
      event: Map[String, String],
      uid: String,
      imei: String,
      mac: String,
      imsi: String,
      osName: String,
      osVer: String,
      androidId: String,
      resolution: String,
      deviceType: String,
      deviceId: String,
      uuid: String,
      appid: String,
      appVer: String,
      release_ch: String,
      promotion_ch: String,
      areacode: String,
      longtitude: Double,
      latitude: Double,
      carrier: String,
      netType: String,
      cid_sn: String,
      ip: String,
      sessionId: String,
      timestamp: Long,
      var province: String = "unkown",
      var city: String = "unkown",
      var district: String = "unkown"
  )

  /** Number of geohash characters used as the key into the geo dictionary. */
  private val GeoHashPrecision = 5

  /** Marker stored in `guid` when no id could be resolved through the id-mapping dictionary. */
  private val UnresolvedGuid = Long.MinValue

  /**
   * Cleans the raw app log, attaches province/city/district via a geohash lookup and a guid via an
   * id-hash lookup, and writes the result as a single Parquet output.
   *
   * Inputs (HDFS): `app.log` (one JSON object per line), `geomap` (geohash, province, city,
   * district) and `idmp` (id hash code, guid). Output: `log_done` as Parquet.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getName).master("local[*]").getOrCreate()
    import spark.implicits._
    try {
      // Dictionaries are collected to the driver and broadcast. A row that does not match the
      // pattern (null column or unexpected column type) throws MatchError: kept fail-fast so a
      // malformed dictionary is noticed instead of silently producing unenriched output.
      val geoMap: collection.Map[String, (String, String, String)] =
        spark.read.parquet("hdfs://mycluster/app_log/geomap").rdd.map {
          case Row(geo: String, province: String, city: String, district: String) =>
            (geo, (province, city, district))
        }.collectAsMap()
      val bcGeo = spark.sparkContext.broadcast(geoMap)

      val idmpMap: collection.Map[Long, Long] =
        spark.read.parquet("hdfs://mycluster/app_log/idmp").rdd.map {
          case Row(idHash: Long, guid: Long) => (idHash, guid)
        }.collectAsMap()
      val bcIdMap = spark.sparkContext.broadcast(idmpMap)

      val result = spark.read.textFile("hdfs://mycluster/app_log/app.log")
        // flatMap over Option instead of map-to-null + filter: no null record reaches the encoder.
        .flatMap(line => parseLine(line).toList)
        // `.value` is read inside the closure so tasks use the broadcast copy, not a captured map.
        .flatMap(bean => enrich(bean, bcGeo.value, bcIdMap.value).toList)
        .toDF()

      result.coalesce(1).write.parquet("hdfs://mycluster/app_log/log_done")
    } finally {
      // Release the session even when a stage fails.
      spark.close()
    }
  }

  /** True when `s` is non-null and contains at least one non-whitespace character. */
  private def hasText(s: String): Boolean = StringUtils.isNotBlank(s)

  /**
   * Parses one JSON log line into a [[LogData]] with an unresolved guid and default area.
   *
   * Returns `None` (the line is dropped) when the line is not a JSON object, a required nested
   * object (`event`, `user`, `user.phone`, `user.app`, `user.loc`) is missing,
   * `user.loc.longtitude`, `user.loc.latitude` or `timestamp` is absent or not numeric, none of
   * uid/mac/imei/imsi/androidId/uuid has text, or `sessionId`/`eventid` is blank.
   * Dropping malformed lines is deliberate best-effort cleaning; they are not reported.
   */
  private def parseLine(line: String): Option[LogData] =
    try {
      val root = JSON.parseObject(line)
      val user = root.getJSONObject("user")
      val phone = user.getJSONObject("phone")
      val app = user.getJSONObject("app")
      val loc = user.getJSONObject("loc")

      // The inner map holds arbitrary JSON values (numbers, booleans, nested objects). Stringify
      // them here: an unchecked cast to Map[String, String] would defer the ClassCastException to
      // whoever later reads a value as String (e.g. the Dataset encoder), outside this try.
      val event: Map[String, String] = root.getJSONObject("event").getInnerMap.asScala
        .map { case (key, value) => key -> Option(value).map(_.toString).orNull }
        .toMap

      val uid = user.getString("uid")
      val imei = phone.getString("imei")
      val mac = phone.getString("mac")
      val imsi = phone.getString("imsi")
      val androidId = phone.getString("androidId")
      val uuid = phone.getString("uuid")
      val eventId = root.getString("eventid")
      val sessionId = user.getString("sessionId")

      // getDouble returns a boxed value; unboxing a missing one throws and drops the line.
      val longtitude: Double = loc.getDouble("longtitude")
      val latitude: Double = loc.getDouble("latitude")
      val timestamp = root.getString("timestamp").toLong

      val hasAnyId = Seq(uid, mac, imei, imsi, androidId, uuid).exists(hasText)
      if (hasAnyId && hasText(sessionId) && hasText(eventId)) {
        Some(LogData(
          UnresolvedGuid, eventId, event, uid, imei, mac, imsi,
          phone.getString("osName"), phone.getString("osVer"), androidId,
          phone.getString("resolution"), phone.getString("deviceType"), phone.getString("deviceId"),
          uuid,
          app.getString("appid"), app.getString("appVer"),
          app.getString("release_ch"), app.getString("promotion_ch"),
          loc.getString("areacode"), longtitude, latitude,
          loc.getString("carrier"), loc.getString("netType"), loc.getString("cid_sn"),
          loc.getString("ip"), sessionId, timestamp))
      } else {
        None
      }
    } catch {
      case NonFatal(_) => None
    }

  /**
   * Attaches the administrative area and, when possible, the guid to a parsed record.
   *
   * The area is looked up by the [[GeoHashPrecision]]-character geohash of the record's
   * coordinates. Records with no dictionary entry, or with coordinates outside
   * [-90, 90] x [-180, 180] (or NaN), yield `None` and are dropped. The guid comes from the first of
   * imei, imsi, androidId, uuid, mac, uid (in that order) that has text and whose
   * `hashCode.toLong` is a key of `idDict`. If none matches, the record's existing guid is kept.
   */
  private def enrich(
      bean: LogData,
      geoDict: collection.Map[String, (String, String, String)],
      idDict: collection.Map[Long, Long]): Option[LogData] = {
    val lat = bean.latitude
    val lon = bean.longtitude
    // Out-of-range coordinates cannot map to a real area, and the geohash library may reject them
    // by throwing; treat them as "no area" instead of failing the job.
    val inRange = math.abs(lat) <= 90.0 && math.abs(lon) <= 180.0
    val area =
      if (inRange) geoDict.get(GeoHash.geoHashStringWithCharacterPrecision(lat, lon, GeoHashPrecision))
      else None

    area.map { case (province, city, district) =>
      // Null/blank ids are skipped: a null id would throw on hashCode, and "" would probe key 0.
      // NOTE(review): key scheme presumably matches how the idmp table was built; confirm.
      val guid = Seq(bean.imei, bean.imsi, bean.androidId, bean.uuid, bean.mac, bean.uid).iterator
        .filter(hasText)
        .map(id => idDict.get(id.hashCode.toLong))
        .collectFirst { case Some(found) => found }
        .getOrElse(bean.guid)
      bean.copy(guid = guid, province = province, city = city, district = district)
    }
  }
}
import ch.hsr.geohash.GeoHash
import com.alibaba.fastjson.JSON
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{Row, SparkSession}
object Demo04 {

  import scala.collection.JavaConverters._
  import scala.util.control.NonFatal

  /**
   * One app-log event flattened out of the nested JSON record, plus a resolved guid and an
   * administrative area.
   *
   * `guid` holds `Long.MinValue` (see [[UnresolvedGuid]]) until one of the record's ids is found in
   * the id-mapping dictionary. `province`, `city` and `district` default to "unkown" (spelling
   * preserved from the original code) until filled from the geohash dictionary.
   * The `var` fields are kept for source compatibility; this job updates records via `copy`.
   */
  case class LogData(
      var guid: Long,
      eventid: String,
      event: Map[String, String],
      uid: String,
      imei: String,
      mac: String,
      imsi: String,
      osName: String,
      osVer: String,
      androidId: String,
      resolution: String,
      deviceType: String,
      deviceId: String,
      uuid: String,
      appid: String,
      appVer: String,
      release_ch: String,
      promotion_ch: String,
      areacode: String,
      longtitude: Double,
      latitude: Double,
      carrier: String,
      netType: String,
      cid_sn: String,
      ip: String,
      sessionId: String,
      timestamp: Long,
      var province: String = "unkown",
      var city: String = "unkown",
      var district: String = "unkown"
  )

  /** Number of geohash characters used as the key into the geo dictionary. */
  private val GeoHashPrecision = 5

  /** Marker stored in `guid` when no id could be resolved through the id-mapping dictionary. */
  private val UnresolvedGuid = Long.MinValue

  /**
   * Cleans the raw app log, attaches province/city/district via a geohash lookup and a guid via an
   * id-hash lookup, and writes the result as a single Parquet output.
   *
   * Inputs (HDFS): `app.log` (one JSON object per line), `geomap` (geohash, province, city,
   * district) and `idmp` (id hash code, guid). Output: `log_done` as Parquet.
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getName).master("local[*]").getOrCreate()
    import spark.implicits._
    try {
      // Dictionaries are collected to the driver and broadcast. A row that does not match the
      // pattern (null column or unexpected column type) throws MatchError: kept fail-fast so a
      // malformed dictionary is noticed instead of silently producing unenriched output.
      val geoMap: collection.Map[String, (String, String, String)] =
        spark.read.parquet("hdfs://mycluster/app_log/geomap").rdd.map {
          case Row(geo: String, province: String, city: String, district: String) =>
            (geo, (province, city, district))
        }.collectAsMap()
      val bcGeo = spark.sparkContext.broadcast(geoMap)

      val idmpMap: collection.Map[Long, Long] =
        spark.read.parquet("hdfs://mycluster/app_log/idmp").rdd.map {
          case Row(idHash: Long, guid: Long) => (idHash, guid)
        }.collectAsMap()
      val bcIdMap = spark.sparkContext.broadcast(idmpMap)

      val result = spark.read.textFile("hdfs://mycluster/app_log/app.log")
        // flatMap over Option instead of map-to-null + filter: no null record reaches the encoder.
        .flatMap(line => parseLine(line).toList)
        // `.value` is read inside the closure so tasks use the broadcast copy, not a captured map.
        .flatMap(bean => enrich(bean, bcGeo.value, bcIdMap.value).toList)
        .toDF()

      result.coalesce(1).write.parquet("hdfs://mycluster/app_log/log_done")
    } finally {
      // Release the session even when a stage fails.
      spark.close()
    }
  }

  /** True when `s` is non-null and contains at least one non-whitespace character. */
  private def hasText(s: String): Boolean = StringUtils.isNotBlank(s)

  /**
   * Parses one JSON log line into a [[LogData]] with an unresolved guid and default area.
   *
   * Returns `None` (the line is dropped) when the line is not a JSON object, a required nested
   * object (`event`, `user`, `user.phone`, `user.app`, `user.loc`) is missing,
   * `user.loc.longtitude`, `user.loc.latitude` or `timestamp` is absent or not numeric, none of
   * uid/mac/imei/imsi/androidId/uuid has text, or `sessionId`/`eventid` is blank.
   * Dropping malformed lines is deliberate best-effort cleaning; they are not reported.
   */
  private def parseLine(line: String): Option[LogData] =
    try {
      val root = JSON.parseObject(line)
      val user = root.getJSONObject("user")
      val phone = user.getJSONObject("phone")
      val app = user.getJSONObject("app")
      val loc = user.getJSONObject("loc")

      // The inner map holds arbitrary JSON values (numbers, booleans, nested objects). Stringify
      // them here: an unchecked cast to Map[String, String] would defer the ClassCastException to
      // whoever later reads a value as String (e.g. the Dataset encoder), outside this try.
      val event: Map[String, String] = root.getJSONObject("event").getInnerMap.asScala
        .map { case (key, value) => key -> Option(value).map(_.toString).orNull }
        .toMap

      val uid = user.getString("uid")
      val imei = phone.getString("imei")
      val mac = phone.getString("mac")
      val imsi = phone.getString("imsi")
      val androidId = phone.getString("androidId")
      val uuid = phone.getString("uuid")
      val eventId = root.getString("eventid")
      val sessionId = user.getString("sessionId")

      // getDouble returns a boxed value; unboxing a missing one throws and drops the line.
      val longtitude: Double = loc.getDouble("longtitude")
      val latitude: Double = loc.getDouble("latitude")
      val timestamp = root.getString("timestamp").toLong

      val hasAnyId = Seq(uid, mac, imei, imsi, androidId, uuid).exists(hasText)
      if (hasAnyId && hasText(sessionId) && hasText(eventId)) {
        Some(LogData(
          UnresolvedGuid, eventId, event, uid, imei, mac, imsi,
          phone.getString("osName"), phone.getString("osVer"), androidId,
          phone.getString("resolution"), phone.getString("deviceType"), phone.getString("deviceId"),
          uuid,
          app.getString("appid"), app.getString("appVer"),
          app.getString("release_ch"), app.getString("promotion_ch"),
          loc.getString("areacode"), longtitude, latitude,
          loc.getString("carrier"), loc.getString("netType"), loc.getString("cid_sn"),
          loc.getString("ip"), sessionId, timestamp))
      } else {
        None
      }
    } catch {
      case NonFatal(_) => None
    }

  /**
   * Attaches the administrative area and, when possible, the guid to a parsed record.
   *
   * The area is looked up by the [[GeoHashPrecision]]-character geohash of the record's
   * coordinates. Records with no dictionary entry, or with coordinates outside
   * [-90, 90] x [-180, 180] (or NaN), yield `None` and are dropped. The guid comes from the first of
   * imei, imsi, androidId, uuid, mac, uid (in that order) that has text and whose
   * `hashCode.toLong` is a key of `idDict`. If none matches, the record's existing guid is kept.
   */
  private def enrich(
      bean: LogData,
      geoDict: collection.Map[String, (String, String, String)],
      idDict: collection.Map[Long, Long]): Option[LogData] = {
    val lat = bean.latitude
    val lon = bean.longtitude
    // Out-of-range coordinates cannot map to a real area, and the geohash library may reject them
    // by throwing; treat them as "no area" instead of failing the job.
    val inRange = math.abs(lat) <= 90.0 && math.abs(lon) <= 180.0
    val area =
      if (inRange) geoDict.get(GeoHash.geoHashStringWithCharacterPrecision(lat, lon, GeoHashPrecision))
      else None

    area.map { case (province, city, district) =>
      // Null/blank ids are skipped: a null id would throw on hashCode, and "" would probe key 0.
      // NOTE(review): key scheme presumably matches how the idmp table was built; confirm.
      val guid = Seq(bean.imei, bean.imsi, bean.androidId, bean.uuid, bean.mac, bean.uid).iterator
        .filter(hasText)
        .map(id => idDict.get(id.hashCode.toLong))
        .collectFirst { case Some(found) => found }
        .getOrElse(bean.guid)
      bean.copy(guid = guid, province = province, city = city, district = district)
    }
  }
}