Java 数据流之Broadcast State

网友投稿 275 2022-09-27


Java 数据流之Broadcast State

一、BroadcastState 的介绍

广播状态(Broadcast State)是 Operator State 的一种特殊类型。如果我们需要将配置 、规则等低吞吐事件流广播到下游所有 Task 时,就可以使用 BroadcastState。下游的 Task 接收这些配置、规则并保存为 BroadcastState,所有Task 中的状态保持一致,作用于另一个数据流的计算中。

简单理解:一个低吞吐量流包含一组规则,我们想对来自另一个流的所有元素基于此规则进行评估。

场景:动态更新计算规则。

广播状态与其他操作符状态的区别在于:

它有一个 map 格式,用于定义存储结构

它仅对具有广播流和非广播流输入的特定操作符可用

这样的操作符可以具有不同名称的多个广播状态

二、BroadcastState 操作流程

三、案例实现

从端口读取json数据作为事件流

从mysql读取数据作为广播流

关联广播流和事件流

匹配对应的用户信息

package cn.kgc.brbcYoxRKSoadcast

import java.sql.{Connection, DriverManager, PreparedStatement}

import com.alibaba.fastjson.JSON

import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor}

import org.apache.flink.configuration.Configuration

import org.apache.flink.streaming.api.datastream.BroadcastStream

import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import org.apache.flink.streaming.api.scala._

import org.apache.flink.util.Collector

// (001,'tom',18,'北京',15830010002)

// 定义样例类 接受 MySQL的用户数据

case class BaseUserInfo(id:Long,name:String,age:Int,city:String,phone:Long)

// user_id、user_name、user_addrss、behaviour、url

// 输出数据类型

case class UserVisitInfo(id:Long,name:String,city:String,behaviour:String,url:String)

// 实现广播ProcessFunction

class MyBroadcastFunc extends BroadcastProcessFunction[String,(Long, BaseUserInfo),UserVisitInfo]{

lazy val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])

// 处理的是日志流中的每条数据

override def processElement(value: String, ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#ReadOnlyContext, out: Collector[UserVisitInfo]): Unit = {

// {"user_id":"001","ts":"2021-07-10 11:10:05","behaviour":"browse","url":"https://tb1.com/1.html"}

val user_id = JSON.parseObject(value).getLong("user_id")

val behaviour = JSON.parseObject(value).getString("behaviour")

val url = JSON.parseObject(value).getString("url")

val mapState = ctx.getBroadcastState(mapStateDes)

val userInfo = mapState.get(user_id)

out.collect(UserVisitInfo(user_id,userInfo.name,userInfo.city,behaviour,url))

}

// 处理的是广播流的每个值

override def processBroadcastElement(value: (Long, BaseUserInfo), ctx: BroadcastProcessFunction[String, (Long, BaseUserInfo), UserVisitInfo]#Context, out: Collector[UserVisitInfo]): Unit = {

val mapState: BroadcastState[Long, BaseUserInfo] = ctx.getBroadcastState(mapStateDes)

mapState.put(value._1,value._2)

}

}

class UserSourceFunc extends RichParallelSourceFunction[BaseUserInfo]{

var conn:Connection = _

var statement: PreparedStatement = _

var flag:Boolean = true

override def open(parameters: Configuration): Unit = {

conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC","root","liu911223")

statement = conn.prepareStatement("select * from base_user")

}

override def run(ctx: SourceFunction.SourceContext[BaseUserInfo]): Unit = {

while (flag){

Thread.sleep(5000)

val resultSet = statement.executeQuery()

while (resultSet.next()){

val id = resultSet.getLong(1)

val name = resultSet.getString(2)

val age = resultSet.getInt(3)

val city = resultSet.getString(4)

val phone = resultSet.getLong(5)

ctx.collect(BaseUserInfo(id,name,age,city,phone))

}

}

}

override def cancel(): Unit = {

flag = false

}

override def close(): Unit = {

if (statement != null) statement.close()

if (conn != null) conn.close()

}

}

object BroadcastDemo01 {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

// 定义为KV,一方面是为了广播的时候定义为map,另一方面是为了做关联操作

val userBaseDS: DataStream[(Long, BaseUserInfo)] = env.addSource(new UserSourceFunc)

.map(user => (user.id, user))

val mapStateDes = new MapStateDescriptor[Long, BaseUserInfo]("mapState",classOf[Long],classOf[BaseUserInfo])

val broadCastStream: BroadcastStream[(Long, BaseUserInfo)] = userBaseDS.broadcast(mapStateDes)

// 日志JSON数据

val dataInfoDS: DataStream[String] = env.socketTextStream("master",1314)

dataInfoDS.connect(broadCastStream)

.process(new MyBroadcastFunc)

.print()

env.execute()

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:路由交换-vlan聚合配置(vlan间路由配置)
下一篇:HCNA RIPv2路由协议与认证模式配置(黑翅女奥)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~