
奥尼尔 / spark_kafka_es

This repository declares no open-source license file (LICENSE); before using it, check the project description and the upstream dependencies of its code.
spark_kafka 10.00 KB
奥尼尔 committed on 2016-12-30 00:24 . Create spark_kafka
package com.yaochufa.spark.sparkstreaming
import java.beans.Transient
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.{BrokerNotAvailableException, TopicAndPartition}
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{Json, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import model.{EsOrder, MagOrdersDetail, test}
import msql.{mag_order_details, order_price, order_res, order_tuan_price_and_income, package_price, table_wide}
import org.I0Itec.zkclient.ZkClient
import org.apache.commons.lang.StringUtils
import org.apache.commons.lang.time.DateUtils
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark._
import util.JSONUtil
import scala.collection.immutable
/**
* Created by gaolichun on 2016/11/15.
*
* Production parameters:
* ycf-big5.ycf.com:9090,ycf-big9.ycf.com:9090,ycf-big10.ycf.com:9090
* md_app_log
* sparkDire
* ycf-big1.ycf.com:2181,ycf-big6.ycf.com:2181,ycf-big7.ycf.com:2181,ycf-big8.ycf.com:2181,ycf-big9.ycf.com:2181
* 10.205.0.136:9200
* sparkstreaming/order
* 20
* DirectKafkaDemo
* hbase_orders_temp
*/
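// A hedged example submission assembled from the production values listed above; the
// jar name and --master setting below are assumptions, not taken from this repository:
//
//   spark-submit --class com.yaochufa.spark.sparkstreaming.DirectKafkaDemo --master yarn \
//     spark_kafka_es.jar \
//     ycf-big5.ycf.com:9090,ycf-big9.ycf.com:9090,ycf-big10.ycf.com:9090 \
//     md_app_log \
//     sparkDire \
//     ycf-big1.ycf.com:2181,ycf-big6.ycf.com:2181,ycf-big7.ycf.com:2181,ycf-big8.ycf.com:2181,ycf-big9.ycf.com:2181 \
//     10.205.0.136:9200 \
//     sparkstreaming/order \
//     20 \
//     DirectKafkaDemo \
//     hbase_orders_temp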
object DirectKafkaDemo {
case class Order(order_id:String,order_count:Int)
val table_mag_order_details_not_distinct = "mag_order_details_not_distinct"
val table_mag_order_details = "mag_order_details"
val table_package_price = "package_price"
val table_order_price = "order_price"
val table_order_tuan_price_and_income = "order_tuan_price_and_income"
var table_wide = "table_wide"
val table_res = "res"
def main(args: Array[String]) {
if (args.length < 9 ) {
System.err.println( s"""
|Usage: DirectKafkaDemo <brokers> <topics> <groupid> <zKServers> <esNodes> <esResource> <seconds> <appName> <hbaseTableName>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more Kafka topics to consume from
| <groupid> is the Kafka consumer group id
| <zKServers> is the ZooKeeper connect string (used for offset tracking and as the HBase quorum)
| <esNodes> is the Elasticsearch node list (es.nodes)
| <esResource> is the Elasticsearch index/type to write to (es.resource)
| <seconds> is the streaming batch interval in seconds
| <appName> is the Spark application name
| <hbaseTableName> is the HBase table the results are written to
""".stripMargin)
System.exit(1)
}
Logger.getLogger("org").setLevel(Level.WARN)
val Array(brokers, topics, groupId, zKServers, esNodes, esResource, seconds, appName, hbaseTableName) = args
val ssc = setupSsc(brokers, topics, groupId, zKServers, esNodes, esResource, seconds, appName, hbaseTableName)()
ssc.start()
ssc.awaitTermination()
}
def setupSsc(
brokers: String,
topics: String,
groupId: String,
zKServers:String,
esNodes:String,
esResource:String,
seconds: String,
appName: String,
hbaseTableName:String
)(): StreamingContext = {
val sparkConf = new SparkConf().setAppName(appName)
sparkConf.set("es.nodes",esNodes)
sparkConf.set("es.resource",esResource)
sparkConf.set("es.index.auto.create","true")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.streaming.backpressure.enabled","true")
//TODO local mode (uncomment for local testing)
//sparkConf.setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(seconds.toLong))
val topicsSeq = topics.split(",").toSeq
val kafkaParams = Map[String, String](
"metadata.broker.list" -> brokers,
"group.id" -> groupId
)
val zkClient = new ZkClient(zKServers,60000,60000,ZKStringSerializer)
val topic2Partitions = ZkUtils.getPartitionsForTopics(zkClient, topicsSeq)
var fromOffsets: Map[TopicAndPartition, Long] = Map()
// Read the offset each topic partition has been consumed to from ZooKeeper; if ZK has no recorded offset, fall back to the latest offset of that partition on Kafka as the starting point
topic2Partitions.foreach(topic2Partitions => {
val topic:String = topic2Partitions._1
val partitions:Seq[Int] = topic2Partitions._2
val topicDirs = new ZKGroupTopicDirs(groupId ,topic)
partitions.foreach(partition => {
val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
ZkUtils.makeSurePersistentPathExists(zkClient, zkPath)
val untilOffset = zkClient.readData[String](zkPath)
val tp = TopicAndPartition(topic, partition)
// latest (log-end) offset on Kafka, used as the fallback starting point
val lastOffset = getMaxOffset(tp, zkClient)
val offset = try {
if (untilOffset == null || untilOffset == "")
lastOffset
else
untilOffset.toLong
} catch {
case e: Exception => lastOffset
}
fromOffsets += (tp -> offset)
})
})
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String,String)](
ssc, kafkaParams, fromOffsets, messageHandler)
// use Hive SQL
val sqc = new HiveContext(ssc.sparkContext)
// Record the offset range of each TopicAndPartition for this batch (left-closed, right-open), used to update ZooKeeper afterwards
var offsetRanges = Array[OffsetRange]()
messages.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.foreachRDD(rdd => {
// set to 0 when processing of this batch fails, so its offsets are not committed
var canUpdateOffset = 1
val messageRdd = rdd.map(_._2) // RDD of JSON strings (the Kafka message values)
val t = sqc.read.json(messageRdd)
if (t.schema.fields.length > 0) {
// register the DataFrame as a temporary table
t.registerTempTable(table_mag_order_details_not_distinct)
try {
val tDistinct = sqc.sql(sql) // deduplicate/process with SQL (the SQL text is elided in this snippet)
tDistinct.registerTempTable(mag_order_details.table_name)
// ... further SQL transformations elided in this snippet ...
val orderRdd: RDD[EsOrder] = ??? // resulting order data (computation elided in this snippet)
saveToHBase(orderRdd, zKServers, hbaseTableName)
dropTempTable(sqc, // helper that drops the listed temp tables (not shown in this snippet)
Array(
table_mag_order_details
))
} catch {
case e: Exception =>
canUpdateOffset = 0
e.printStackTrace()
}
}
// update each topic partition's offset in ZooKeeper, but only if the batch succeeded
if (canUpdateOffset == 1) {
offsetRanges.foreach(o => {
val topicDirs = new ZKGroupTopicDirs(groupId, o.topic)
val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
})
}
})
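// Hedged note: org.elasticsearch.spark._ is imported and es.nodes/es.resource are set on
// the SparkConf, but the Elasticsearch write itself is not shown in this snippet. With
// those settings, the computed RDD[EsOrder] could be indexed from inside the try block
// above with something like
//   orderRdd.saveToEs(esResource)
// where saveToEs is the implicit RDD method provided by the elasticsearch-spark connector.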
ssc
}
def getMaxOffset(tp:TopicAndPartition,zkClient: ZkClient):Long = {
val request = OffsetRequest(immutable.Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
ZkUtils.getLeaderForPartition(zkClient, tp.topic, tp.partition) match {
case Some(brokerId) => {
ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId)._1 match {
case Some(brokerInfoString) => {
Json.parseFull(brokerInfoString) match {
case Some(m) =>
val brokerInfo = m.asInstanceOf[Map[String, Any]]
val host = brokerInfo.get("host").get.asInstanceOf[String]
val port = brokerInfo.get("port").get.asInstanceOf[Int]
new SimpleConsumer(host, port, 10000, 100000, "getMaxOffset")
.getOffsetsBefore(request)
.partitionErrorAndOffsets(tp)
.offsets
.head
case None =>
throw new BrokerNotAvailableException("Failed to parse broker info for broker id %d from ZooKeeper".format(brokerId))
}
}
case None =>
throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId))
}
}
case None =>
throw new Exception("No broker for partition %s - %s".format(tp.topic, tp.partition))
}
}
def saveToHBase(rdd: RDD[EsOrder], zkQuorum: String, tableName: String) = {
val conf = HBaseConfiguration.create()
//TODO The production ZooKeeper quorum is the same one passed in: ycf-big1.ycf.com:2181,ycf-big6.ycf.com:2181,ycf-big7.ycf.com:2181,ycf-big8.ycf.com:2181,ycf-big9.ycf.com:2181
//conf.set("hbase.zookeeper.quorum", "ycf-big-2:2181,ycf-big-5:2181,ycf-big-6:2181,ycf-big-3:2181,ycf-big-4:2181")
//TODO production
conf.set("hbase.zookeeper.quorum",zkQuorum)
val job = Job.getInstance(conf)
val jobConf = job.getConfiguration
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
rdd.map(
x => {
val put = new Put(Bytes.toBytes(x.order_package_detail_id))
put.add(Bytes.toBytes("message"),Bytes.toBytes("order_id"),Bytes.toBytes(x.order_id))
put.add(Bytes.toBytes("message"),Bytes.toBytes("order_no"),Bytes.toBytes(x.order_no))
put.add(Bytes.toBytes("message"),Bytes.toBytes("created_time"),Bytes.toBytes(x.created_time))
put.add(Bytes.toBytes("message"),Bytes.toBytes("created_time_d"),Bytes.toBytes(x.created_time_d))
(new ImmutableBytesWritable,put)
}
).saveAsNewAPIHadoopDataset(job.getConfiguration)
}
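// Hedged sketch: model.EsOrder is not shown in this snippet. Judging only from the fields
// written above (types assumed to be String), it presumably contains at least the following;
// the real case class very likely carries more order fields:
//   case class EsOrder(order_package_detail_id: String, order_id: String, order_no: String,
//                      created_time: String, created_time_d: String)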
}
https://gitee.com/oneal/spark_kafka_es.git
git@gitee.com:oneal/spark_kafka_es.git