|
|
<
1、BroadcastState 的介绍
播送形态(Broadcast State)是 Operator State 的一种特别范例。假如我们需求将设置 、划定规矩等低吞吐变乱流播送到下流一切 Task 时,就能够利用 BroadcastState。下流的 Task 吸取那些设置、划定规矩并保留为 BroadcastState,一切Task 中的形态连结分歧,感化于另外一个数据流的策画中。
简朴了解:一个低吞吐量流包含一组划定规矩,我们念对去自另外一个流的一切元素基于此划定规矩停止评价。
场景:静态更新策画划定规矩。
播送形态取其他操纵符形态的区分正在于:
- 它有一个 map 格局,用于界说存储构造
- 它仅对具有播送流战非播送流输进的特定操纵符可用
- 如许的操纵符能够具有不同称号的多个播送形态
2、BroadcastState 操纵流程
3、案例完成
- 从端心读与Json数据做为变乱流
- 从Mysql读与数据做为播送流
- 联系关系播送流战变乱流
- 婚配对应的用户疑息
- package cn.kgc.broadcast
- import java.sql.{Connection, DriverManager, PreparedStatement}
- import com.alibaba.fastjson.JSON
- import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}
- import org.apache.flink.configuration.Configuration
- import org.apache.flink.streaming.api.datastream.BroadcastStream
- import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
- import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
- import org.apache.flink.streaming.api.scala._
- import org.apache.flink.util.Collector
- // (001,'tom',18,'北京',15830010002)
- // 界说样例类 承受 MySQL的用户数据
- case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long)
- // user_id、user_name、user_addrss、behaviour、url
- // 输出数据范例
- case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String)
- // 完成播送ProcessFunction
- class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{
- lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
- // 处置的是日记流中的每条数据
- override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = {
- // {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://www.tb1.com/1.html"}
- val user_id = JSON.parseObject(value).getLong("user_id")
- val behaviour = JSON.parseObject(value).getString("behaviour")
- val url = JSON.parseObject(value).getString("url")
- val mapState = ctx.getBroadcastState(mapStateDes)
- val userInfo = mapState.get(user_id)
- out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url))
- }
- // 处置的是播送流的每一个值
- override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = {
- val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes)
- mapState.put(value._1,value._2)
- }
- }
- class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{
- var conn:Connection = _
- var statement: PreparedStatement = _
- var flag:Boolean = true
- override def open(parameters: Configuration): Unit = {
- conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223")
- statement = conn.prepareStatement("select * from base_user")
- }
- override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = {
- while (flag){
- Thread.sleep(5000)
- val resultSet = statement.executeQuery()
- while (resultSet.next()){
- val id = resultSet.getLong(1)
- val name = resultSet.getString(2)
- val age = resultSet.getInt(3)
- val city = resultSet.getString(4)
- val phone = resultSet.getLong(5)
- ctx.collect(BaseUserInfo(id,name,age,city,phone))
- }
- }
- }
- override def cancel(): Unit = {
- flag = false
- }
- override def close(): Unit = {
- if (statement != null) statement.close()
- if (conn != null) conn.close()
- }
- }
- object BroadcastDemo01 {
- def main(args: Array[String]): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setParallelism(1)
- // 界说为KV,一圆里是为了播送的时分界说为map,另外一圆里是为了做联系关系操纵
- val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc)
- .map(user => (user.id, user))
- val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])
- val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes)
- // 日记JSON数据
- val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314)
- dataInfoDS.connect(broadCastStream)
- .process(new MyBroadcastFunc)
- .print()
- env.execute()
- }
- }
复造代码
免责声明:假如进犯了您的权益,请联络站少,我们会实时删除侵权内乱容,感谢协作! |
1、本网站属于个人的非赢利性网站,转载的文章遵循原作者的版权声明,如果原文没有版权声明,按照目前互联网开放的原则,我们将在不通知作者的情况下,转载文章;如果原文明确注明“禁止转载”,我们一定不会转载。如果我们转载的文章不符合作者的版权声明或者作者不想让我们转载您的文章的话,请您发送邮箱:Cdnjson@163.com提供相关证明,我们将积极配合您!
2、本网站转载文章仅为传播更多信息之目的,凡在本网站出现的信息,均仅供参考。本网站将尽力确保所提供信息的准确性及可靠性,但不保证信息的正确性和完整性,且不对因信息的不正确或遗漏导致的任何损失或损害承担责任。
3、任何透过本网站网页而链接及得到的资讯、产品及服务,本网站概不负责,亦不负任何法律责任。
4、本网站所刊发、转载的文章,其版权均归原作者所有,如其他媒体、网站或个人从本网下载使用,请在转载有关文章时务必尊重该文章的著作权,保留本网注明的“稿件来源”,并自负版权等法律责任。
|