import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.broadcast.Broadcast import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.classification.LogisticRegressionModel import org.apache.spark.rdd.RDD import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SQLContext import org.apache.spark.sql.functions.udf import org.apache.spark.streaming.{StreamingContext, Milliseconds} import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka.KafkaUtils import org.elasticsearch.spark.sql._ import org.joda.time.DateTime import kafka.serializer.StringDecoder import scala.reflect.runtime.universe /** * Kafkaから配信されるセンサデータから動作種別を判定して、Elasticsearchに格納する. * 動作種別の判定には学習済みのロジスティック回帰モデルを使用する. */ object HumanActivityClassifier { def main(args : Array[String]) { val sparkConf = new SparkConf().setAppName("HumanActivityClassifier") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Milliseconds(5000)) // 中間データの保存場所を設定 ssc.checkpoint("/tmp/checkpoint") // ログレベルを設定 Logger.getRootLogger.setLevel(Level.WARN) // ロジスティック回帰モデルを読み込む val lrModel = LogisticRegressionModel.load(sc, "/model/lrModel") val bLRModel = sc.broadcast(lrModel) // Kafkaからデータを読み出し val kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, Map("bootstrap.servers" -> "kafka01:9092,kafka02:9093"), Set("human-activity")) // トピック名を除去 val seonsorJsonDStream = kafkaDStream.map(_._2) // キャッシュする seonsorJsonDStream.cache() // 動作種別を判定してElasticsearchに出力 seonsorJsonDStream.foreachRDD{ jsonRDD => val sqlContext = SQLContext.getOrCreate(jsonRDD.sparkContext) import sqlContext.implicits._ val jsonDF = jsonRDD.isEmpty match { case true => // 空データ sqlContext.emptyDataFrame case _ => // JSON文字列の入ったRDDからDataFrameを生成 val sensorDataDF = sqlContext.read.json(jsonRDD) // 時刻を変換するUDFを定義 val formatDate = udf((time: String) => formatTime(time)) // 動作種別を判定するUDFを定義 val activitiy = udf((data: String) => predictActivity(bLRModel.value, data)) // 変換を実行 val predictDataDF = sensorDataDF.select( $"user", formatDate($"time") as "datetime", activitiy($"data") as "activity") predictDataDF } // Elasticsearchに格納 jsonDF.saveToEs("spark/predictions") } ssc.start() ssc.awaitTermination() } /** * 時刻を文字列表記に変換する */ private def formatTime(time: String): String = { new DateTime(time.toLong).toString("yyyy/MM/dd HH:mm:ss.SSS") } /** * センサデータを動作種別に変換する */ private def predictActivity(lrModel: LogisticRegressionModel, data: String): String = { // データをスペースで分割してベクトル化 val sensorDataVector = Vectors.dense(data.trim.split("[\\s]+").map(_.toDouble)) // モデルを使用して動作種別を判定 val activityID = lrModel.predict(sensorDataVector) // 文字列に変換 val activitiy = activityID match { case 0 => "WALKING" case 1 => "WALKING_UPSTAIRS" case 2 => "WALKING_DOWNSTAIRS" case 3 => "SITTING" case 4 => "STANDING" case 5 => "LAYING" } activitiy } }