object Source2Sink {
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism; it defaults to the number of CPU threads
    env.setParallelism(1)
    /* ########## Read data from a Source ########## */
    /* ########## Transform the data ########## */
    /* ########## Write the data to a Sink ########## */
    // Execute the job
    env.execute()
  }
}
Where does the data come from?
Files, sockets, local collections, a Kafka source, or a custom source (e.g. MySQL)
val localCollectionSource: DataStream[String] = env.fromCollection(List("aa bb", "bb cc", "cc dd", "aa aa"))
val localSequenceSource: DataStream[Long] = env.fromSequence(1, 100)
val localElementsSource: DataStream[String] = env.fromElements("aa bb cc dd ee", "aa bb cc dd ee", "aa bb cc")
val socketSource: DataStream[String] = env.socketTextStream("master", 6666)
val FileSource: DataStream[String] = env.readTextFile("src/main/resources/wc.txt")
val props = new Properties()
props.setProperty("bootstrap.servers","master:9092")
props.setProperty("group.id","wanKafkaSourceTest")
val kafkaSource: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), props))
val jdbc_Source: DataStream[user] = env.addSource(new jdbcSource) // custom Source class extending RichParallelSourceFunction
class jdbcSource extends RichParallelSourceFunction[user] {
  var conn: Connection = _
  var statement: PreparedStatement = _
  var flag: Boolean = true

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false",
      "root", "123456")
    statement = conn.prepareStatement("select * from user")
  }

  override def run(ctx: SourceFunction.SourceContext[user]): Unit = {
    val resultSet: ResultSet = statement.executeQuery()
    while (flag && resultSet.next()) {
      ctx.collect(user(resultSet.getInt(1), resultSet.getString(2)))
    }
  }

  // Stop the emitting loop when the job is cancelled
  override def cancel(): Unit = {
    flag = false
  }

  override def close(): Unit = {
    if (statement != null) statement.close()
    if (conn != null) conn.close()
  }
}
// Transformation: WordCount
val result: DataStream[(String, Int)] = FileSource
.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
.sum(1)
Basic transformations: map, filter, flatMap, keyBy, reduce, aggregations…
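As a minimal sketch of the operators not shown in the WordCount above (assuming the same env as the snippets in this section; names and values are illustrative):

```scala
// Keep even numbers, then compute a running sum under a single key
val nums: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6)
val evenSums: DataStream[(String, Int)] = nums
  .filter(_ % 2 == 0)                     // keeps 2, 4, 6
  .map(n => ("even", n))                  // wrap in a tuple so we can key
  .keyBy(_._1)
  .reduce((a, b) => (a._1, a._2 + b._2))  // emits a rolling sum per key
evenSums.print()
```

Note that reduce on a keyed stream emits one result per input element (the rolling aggregate), not a single final value.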
union
Merges two or more DataStreams into one
All input streams must have the same element type
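A minimal sketch (assuming the env from above; the element values are illustrative):

```scala
// union requires both streams to carry the same element type (String here)
val s1: DataStream[String] = env.fromElements("a", "b")
val s2: DataStream[String] = env.fromElements("c", "d")
val merged: DataStream[String] = s1.union(s2) // one stream containing the elements of both
merged.print()
```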
connect
After two streams are connected, they are merely placed in the same stream
Internally each keeps its own data and type unchanged; the two remain independent
coMap、coFlatMap
Apply map and flatMap to a ConnectedStreams
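A minimal coMap sketch (assuming the env from above; element values are illustrative). Each side of the connected stream keeps its own type until the two mapper functions bring them to a common output type:

```scala
val words: DataStream[String] = env.fromElements("one", "two")
val nums: DataStream[Int] = env.fromElements(1, 2)
val connected: ConnectedStreams[String, Int] = words.connect(nums)
val unified: DataStream[String] = connected.map(
  (s: String) => s"word: $s", // applied to elements of the first stream
  (n: Int) => s"num: $n"      // applied to elements of the second stream
)
unified.print()
```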
Split + Select (in earlier versions) => side outputs
Mark a side output by defining an OutputTag
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  // Define the side-output tag
  val oddOutputTag = new OutputTag[Int]("odd")
  val result: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9)
    // route each element to the main stream or the side output
    .process(new ProcessFunction[Int, Int] {
      override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = {
        if (value % 2 == 0) {
          // emit to the main stream
          out.collect(value)
        } else {
          // emit to the side output
          ctx.output(oddOutputTag, value)
        }
      }
    })
  // main stream
  result.print("even:")
  // fetch the side output from the main stream
  result.getSideOutput(oddOutputTag).print("odd")
  env.execute()
}
Random partitioning
dataStream.shuffle
Round-robin partitioning
dataStream.rebalance
Rescaling (round-robin within local groups of upstream/downstream subtasks)
dataStream.rescale
Send all data to the same (the first) partition
dataStream.global
Custom partitioning
Extend Partitioner and implement the partition method
Once the custom partitioner is defined, apply it with partitionCustom and specify the field the partitioner keys on
dataStream.partitionCustom(customPartitioner, "field_name")
dataStream.partitionCustom(customPartitioner,0)
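The two steps above can be sketched as follows (assuming the env from earlier; ParityPartitioner and the sample data are hypothetical):

```scala
// Route even keys to partition 0 and odd keys to partition 1
class ParityPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int =
    key % 2 // must return a value in [0, numPartitions)
}

// Apply the partitioner; the second argument selects the key from each element
val pairs: DataStream[(Int, String)] = env.fromElements((1, "a"), (2, "b"), (3, "c"))
pairs.partitionCustom(new ParityPartitioner, _._1).print()
```

This sketch assumes positive integer keys and a downstream parallelism of at least 2, so that both partitions exist.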
Flink provides an abstract class or interface for implementing UDFs for each operator: MapFunction, FilterFunction, ProcessFunction, and so on
The "rich" variants can access the runtime context
getRuntimeContext() returns the runtime context, e.g. the function's parallelism, the task name, and state
setRuntimeContext() sets the runtime context
They also have lifecycle methods
open() is the initialization method, called before the operator is invoked
close() is the last method called in the lifecycle, used for cleanup work
e.g. RichMapFunction, RichFlatMapFunction
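A minimal rich-function sketch tying the lifecycle and runtime context together (TaggingMapper is a hypothetical name; it assumes a DataStream[String] to map over):

```scala
class TaggingMapper extends RichMapFunction[String, String] {
  override def open(parameters: Configuration): Unit = {
    // called once per parallel instance, before any map() call
    println(s"open on subtask ${getRuntimeContext.getIndexOfThisSubtask}")
  }

  override def map(value: String): String =
    // the runtime context is available in every method of a rich function
    s"${getRuntimeContext.getTaskName}: $value"

  override def close(): Unit = {
    // called once at the end of the lifecycle; release resources here
  }
}

// usage: someStringStream.map(new TaggingMapper)
```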
Where does the processed data go?
Files, sockets, a Kafka sink, or a custom sink (e.g. MySQL)
result.print()
localSequenceSource.print()
result.map(x=>s"${x._1}_${x._2}\n").writeToSocket("slave1",6666,new SimpleStringSchema())
/*
def writeToSocket(
    hostname: String,
    port: Integer,
    // schema is a generic SerializationSchema with one type parameter
    schema: SerializationSchema[T]): DataStreamSink[T] = {
  stream.writeToSocket(hostname, port, schema)
}
*/
result.writeAsText("src/main/resources/result.txt")
val inputData: DataStream[String] = result.map(x => s"${x._1} ${x._2}")
val kafkaSink = new FlinkKafkaProducer[String]("test1", new SimpleStringSchema(), props)
inputData.addSink(kafkaSink)
jdbc_Source.map(x => user(x.id, x.name)).addSink(new jdbcSink) // custom Sink class extending RichSinkFunction
class jdbcSink extends RichSinkFunction[user] {
  var conn: Connection = _
  var updateStatement: PreparedStatement = _
  var insertStatement: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/test?characterEncoding=utf-8&serverTimezone=UTC&useSSL=false",
      "root", "123456")
    updateStatement = conn.prepareStatement("update user_copy set name = ? where id = ?")
    insertStatement = conn.prepareStatement("insert into user_copy values(?,?)")
  }

  // Upsert: try the update first; if no row matched, fall back to insert
  override def invoke(value: user, context: SinkFunction.Context): Unit = {
    updateStatement.setString(1, value.name)
    updateStatement.setInt(2, value.id)
    updateStatement.executeUpdate()
    if (updateStatement.getUpdateCount == 0) {
      insertStatement.setInt(1, value.id)
      insertStatement.setString(2, value.name)
      insertStatement.executeUpdate()
    }
  }

  override def close(): Unit = {
    if (insertStatement != null) insertStatement.close()
    if (updateStatement != null) updateStatement.close()
    if (conn != null) conn.close()
  }
}