您的当前位置:首页正文

使用sparksql往kafka推送数据

2020-12-31 来源:步旅网
使用sparksql往kafka推送数据

一、相关配置参数

1.同级目录resource文件夹下配置

brokers_list=kafkaxxx02broker01:9092,kafkaxxx02broker02:9092,kafkaxxx02broker03:9092

2.topic:

last_member_info

3.流程

从hive表中读取相关字段,封装成json格式,抛kafka

二、相关代码(scala)

package kafka

import java.io.InputStream
import java.text.SimpleDateFormat
import java.util.{Date, HashMap, Properties}

import com.google.gson.JsonObject
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
 * Reads member fields (member_id, flag, nick_name, nick_type) from a Hive
 * table for yesterday's partition, serialises each row to a JSON object and
 * publishes the messages to the Kafka topic `last_member_info`.
 *
 * Broker addresses are read from `/conf.properties` on the classpath under
 * the key `brokers_list`.
 */
object DakaMemProducer {

  val prop = new Properties()
  val is: InputStream = this.getClass().getResourceAsStream("/conf.properties")
  // Close the stream after loading (it was leaked in the original version).
  try prop.load(is) finally if (is != null) is.close()

  // Name of the property key holding the comma-separated broker list.
  val environment_broker_list = "brokers_list"

  // NOTE(review): getProperty returns null if the key is missing — the
  // producer construction below would then fail; consider failing fast here.
  private val brokers = prop.getProperty(environment_broker_list)

  // Producer configuration: plain String keys and values.
  private val props = new HashMap[String, Object]()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")

  // Singleton producer per JVM. Because the send happens inside rdd.foreach,
  // each Spark executor JVM initialises this object (and its producer) once.
  private val producer = new KafkaProducer[String, String](this.props)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DakaMemProducer")
    val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

    // Yesterday's date (now minus 24h) formatted as yyyyMMdd for the
    // statis_date partition predicate.
    val date = new Date(new Date().getTime - 86400000L)
    val dateFormat = new SimpleDateFormat("yyyyMMdd")
    val statisDate = dateFormat.format(date)

    // NOTE(review): statisDate is concatenated unquoted — this only matches a
    // numeric partition column; quote it if statis_date is a string column.
    val querySql1 = "select member_id,flag,nick_name,nick_type from xxx_db.xxx_table where statis_date = " + statisDate // 离线数据 (offline/batch data)
    val resultDF1 = spark.sql(querySql1)

    // Runs on the executors: build one JSON message per row and send it.
    resultDF1.rdd.foreach(row => {
      val member_id: String = row.getAs[String]("member_id").toString()
      val flag: String = row.getAs[String]("flag").toString()
      val nick_name: String = row.getAs[String]("nick_name").toString()
      val nick_type: String = row.getAs[String]("nick_type").toString()
      val json = new JsonObject()
      // L64/L66 of the scraped source were truncated mid-call; the values are
      // reconstructed from the matching snake_case fields read above.
      json.addProperty("memberId", member_id)
      json.addProperty("flag", flag)
      json.addProperty("nickName", nick_name)
      json.addProperty("nickType", nick_type)
      kafkaProducerSend(json.toString)
    })

    // BUG(review): this tests the literal key NAME "brokers_list", which can
    // never contain "prd" — the debug show() therefore always runs. It almost
    // certainly meant to inspect `brokers` (the property VALUE). Behaviour
    // kept unchanged pending confirmation of intent.
    if (!environment_broker_list.contains("prd")) {
      resultDF1.show(100)
    }

    /**
     * Sends one message to the `last_member_info` topic.
     * Null payloads are silently skipped. Uses a null key, so messages are
     * distributed across partitions by the producer's default partitioner.
     */
    def kafkaProducerSend(args: String): Unit = {
      if (args != null) {
        val topic = "last_member_info"
        val message = new ProducerRecord[String, String](topic, null, args)
        // NOTE(review): send() is async and its Future/errors are ignored;
        // the producer is also never flushed or closed — messages still
        // buffered at JVM exit may be lost.
        producer.send(message)
      }
    }
  }
}

因篇幅问题不能全部显示,请点此查看更多更全内容