Flink-常用Source與Sink的使用匯總整理

 2023-12-12 阅读 28 评论 0

摘要:Flink-常用Source與Sink的使用匯總整理基礎結構Source本地數據源端口數據源文件數據源Kafka-Source自定義JDBC-Source數據處理SingleDataStreamMultiDataStream側輸出流分區算子UDFFunctionRichFunctionSink本地Sink端口Sink文件SinkKafka-Sink自定義JDBC-Sink 基礎結構 obje

Flink-常用Source與Sink的使用匯總整理

  • 基礎結構
  • Source
    • 本地數據源
    • 端口數據源
    • 文件數據源
    • Kafka-Source
    • 自定義JDBC-Source
  • 數據處理
    • SingleDataStream
    • MultiDataStream
    • 側輸出流
    • 分區算子
    • UDF
      • Function
      • RichFunction
  • Sink
    • 本地Sink
    • 端口Sink
    • 文件Sink
    • Kafka-Sink
    • 自定義JDBC-Sink

基礎結構

請添加圖片描述

object Source2Sink {def main(args: Array[String]): Unit = {//獲取環境對象val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//設置并行度,默認為計算機線程數env.setParallelism(1)/*##############從Source獲取數據##############*//*#############對數據的轉化操作#############*//*############數據輸出到Sink############*///執行env.execute()}
}

Source

數據源是什么?

文件、端口、本地、Kafka-Source、自定義source-Mysql

本地數據源

val localCollectionSource: DataStream[String] = env.fromCollection(List("aa bb", "bb cc", "cc dd", "aa aa"))
val localSequenceSource: DataStream[Long] = env.fromSequence(1, 100)
val localElementsSource: DataStream[String] = env.fromElements("aa bb cc dd ee", "aa bb cc dd ee", "aa bb cc")

端口數據源

val socketSource: DataStream[String] = env.socketTextStream("master", 6666)

文件數據源

val FileSource: DataStream[String] = env.readTextFile("src/main/resources/wc.txt")

Kafka-Source

val props = new Properties()
props.setProperty("bootstrap.servers","master:9092")
props.setProperty("group.id","wanKafkaSourceTest")
val kafkaSource: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props))

自定義JDBC-Source

val jdbc_Source: DataStream[user] = env.addSource(new jdbcSource)//自定義Source類繼承自RichParallelSourceFunction
class jdbcSource extends RichParallelSourceFunction[user]{var conn:Connection =_var statement:PreparedStatement = _var flag:Boolean = trueoverride def open(parameters: Configuration): Unit = {conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false","root","123456")statement = conn.prepareStatement("select * from user")}override def run(ctx: SourceFunction.SourceContext[user]): Unit = {val resultSet: ResultSet = statement.executeQuery()while (resultSet.next()) {ctx.collect(user(resultSet.getInt(1),resultSet.getString(2)))}}override def cancel(): Unit = {}override def close(): Unit = {if(statement!=null)statement.close()if(conn!=null)conn.close()}
}

數據處理

//數據轉換 WordCount
val result: DataStream[(String, Int)] = FileSource
.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
.sum(1)

SingleDataStream

map、filter、flatMap、keyBy、reduce、aggregations…

MultiDataStream

  1. union

    對兩個或者兩個以上的 DataStream 進行合并操作
    需要保證兩個數據集的格式是一致的

請添加圖片描述

  1. connect

    兩份數據流被 Connect 之后,只是被放在了同一個流中
    內部依然保持各自的數據和形式不發生變化,兩份數據相互獨立

請添加圖片描述

  1. coMap、coFlatMap

    對ConnectedStreams進行map和flatmap

請添加圖片描述

  1. Split+Select(之前版本) => 側輸出流

    通過定義OutputTag標記側輸出流
    請添加圖片描述

側輸出流

def main(args: Array[String]): Unit = {val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//定義側輸出流標簽val oddOutputTag = new OutputTag[Int]("odd")val result: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9)//側輸出流.process(new ProcessFunction[Int, Int] {override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {if (value % 2 == 0) {//輸出到主流out.collect(value)} else {//輸出到側出流ctx.output(oddOutputTag, value)}}})//主流result.print("偶數:")//從主流中獲取側輸出流result.getSideOutput(oddOutputTag).print("奇數")env.execute()
}

分區算子

  1. 隨機分區

    dataStream.shuffle 
    
  2. 循環分區

    dataStream.rebalance 
    
  3. 調節分區

    dataStream.rescale 
    
  4. 數據發往同一個分區

    dataStream.global 
    
  5. 自定義分區

    繼承Partitioner,實現partition方法

    自定義分區器定義好了以后,需要調用partitionCustom來應用分區器,并指定分區器使用到的字段

    dataStream.partitionCustom(customPartitioner,”filed_name”)
    dataStream.partitionCustom(customPartitioner,0) 
    

UDF

Function

Flink 包含了各類算子實現UDF函數的抽象類或者接口: MapFunction, FilterFunction, ProcessFunction 等等

RichFunction

  • 可以獲取運行環境的上下文

    getRuntimeContext()獲取運行時上下文,例如函數執行的并行度,任務的名字,以及 state 狀態等

    etRuntimeContext()設置運行時上下文

  • 擁有生命周期方法

    open()初始化方法,當一個算子被調用之前 open()會被調用

    close()生命周期中最后調用的方法,做一些清理工作

如:RichMapFunction、RichFlatMapFunction

Sink

處理完的數據去哪?

文件、端口、Kafaka-Sink、自定義Sink-Mysql

本地Sink

result.print()
localSequenceSource.print()

端口Sink

result.map(x=>s"${x._1}_${x._2}\n").writeToSocket("slave1",6666,new SimpleStringSchema())
/*
def writeToSocket(hostname: String,port: Integer,//schema 泛型,一個參數schema: SerializationSchema[T]): DataStreamSink[T] = { 
stream.writeToSocket(hostname, port, schema)
}
*/

文件Sink

result.writeAsText("src/main/resources/result.txt")

Kafka-Sink

val inputData: DataStream[String] = result.map(x => s"${x._1} ${x._2}")
val kafkaSink = new FlinkKafkaProducer[String]("test1", new SimpleStringSchema(), props)
inputData.addSink(kafkaSink)

自定義JDBC-Sink

jdbc_Source.map(x=>user(x.id,x.name)).addSink(new jdbcSink)//自定義Sink類繼承自RichParallelSourceFunction
class jdbcSink extends RichSinkFunction[user]{var conn:Connection =_var updateStatement:PreparedStatement=_var insertStatement:PreparedStatement=_override def open(parameters: Configuration): Unit = {conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false","root","123456")updateStatement=conn.prepareStatement("update user_copy set name = ? where id = ?")insertStatement=conn.prepareStatement("insert into user_copy values(?,?)")}override def invoke(value: user, context: SinkFunction.Context): Unit = {updateStatement.setString(1,value.name)updateStatement.setInt(2,value.id)updateStatement.executeUpdate()if(updateStatement.getUpdateCount==0){insertStatement.setInt(1,value.id)insertStatement.setString(2,value.name)insertStatement.executeUpdate()}}override def close(): Unit = {if(insertStatement!=null)insertStatement.close()if(updateStatement!=null)updateStatement.close()if(conn!=null)conn.close()}
}

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/196323.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息