首頁 >頭條 > 正文

【熱聞】大數據Flink進階(六):Flink入門案例

2023-03-22 05:16:24來源:騰訊云

Flink入門案例

需求:讀取本地數據文件,統計文件中每個單詞出現的次數。

一、IDEA Project創建及配置

本案例編寫Flink代碼選擇語言為Java和Scala,所以這里我們通過IntelliJ IDEA創建一個目錄,其中包括Java項目模塊和Scala項目模塊,將Flink Java api和Flink Scala api分別在不同項目模塊中實現。步驟如下:


【資料圖】

1、打開IDEA,創建空項目

2、在IntelliJ IDEA 中安裝Scala插件

使用IntelliJ IDEA開發Flink,如果使用Scala api 那么還需在IntelliJ IDEA中安裝Scala的插件,如果已經安裝可以忽略此步驟,下圖為以安裝Scala插件。

3、打開Structure,創建項目新模塊

創建Java模塊:

繼續點擊"+",創建Scala模塊:

創建好"FlinkScalaCode"模塊后,右鍵該模塊添加Scala框架支持,并修改該模塊中的"java"src源為"scala":

在"FlinkScalaCode"模塊Maven pom.xml中引入Scala依賴包,這里使用的Scala版本為2.12.10。

  org.scala-lang  scala-library  2.12.10  org.scala-lang  scala-compiler  2.12.10  org.scala-lang  scala-reflect  2.12.10

4、Log4j日志配置

為了方便查看項目運行過程中的日志,需要在兩個項目模塊中配置log4j.properties配置文件,并放在各自項目src/main/resources資源目錄下,沒有resources資源目錄需要手動創建并設置成資源目錄。log4j.properties配置文件內容如下:

log4j.rootLogger=ERROR, consolelog4j.appender.console=org.apache.log4j.ConsoleAppenderlog4j.appender.console.target=System.outlog4j.appender.console.layout=org.apache.log4j.PatternLayoutlog4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss} %p %c{2}: %m%n

復制

并在兩個項目中的Maven pom.xml中添加對應的log4j需要的依賴包,使代碼運行時能正常打印結果:

  org.slf4j  slf4j-log4j12  1.7.36  org.apache.logging.log4j  log4j-to-slf4j  2.17.2

5、分別在兩個項目模塊中導入Flink Maven依賴

"FlinkJavaCode"模塊導入Flink Maven依賴如下:

  UTF-8  1.8  1.8  1.16.0  1.7.36  2.17.2        org.apache.flink    flink-clients    ${flink.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

"FlinkScalaCode"模塊導入Flink Maven依賴如下:

  UTF-8  1.8  1.8  1.16.0  1.7.31  2.17.1  2.12.10  2.12        org.apache.flink    flink-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-streaming-scala_${scala.binary.version}    ${flink.version}        org.apache.flink    flink-clients    ${flink.version}          org.scala-lang    scala-library    ${scala.version}        org.scala-lang    scala-compiler    ${scala.version}        org.scala-lang    scala-reflect    ${scala.version}          org.slf4j    slf4j-log4j12    ${slf4j.version}        org.apache.logging.log4j    log4j-to-slf4j    ${log4j.version}  

注意:在后續實現WordCount需求時,Flink Java Api只需要在Maven中導入"flink-clients"依賴包即可,而Flink Scala Api 需要導入以下三個依賴包:

flink-scala_${scala.binary.version}flink-streaming-scala_${scala.binary.version}flink-clients

主要是因為在Flink1.15版本后,Flink添加對opting-out(排除)Scala的支持,如果你只使用Flink的Java api,導入包不必包含scala后綴,如果使用Flink的Scala api,需要選擇匹配的Scala版本。

二、案例數據準備

在項目"MyFlinkCode"中創建"data"目錄,在目錄中創建"words.txt"文件,向文件中寫入以下內容,方便后續使用Flink編寫WordCount實現代碼。

hello Flinkhello MapReducehello Sparkhello Flinkhello Flinkhello Flinkhello Flinkhello Javahello Scalahello Flinkhello Javahello Flinkhello Scalahello Flinkhello Flinkhello Flink

三、案例實現

數據源分為有界和無界之分,有界數據源可以編寫批處理程序,無界數據源可以編寫流式程序。DataSet API用于批處理,DataStream API用于流式處理。

批處理使用ExecutionEnvironment和DataSet,流式處理使用StreamingExecutionEnvironment和DataStream。DataSet和DataStream是Flink中表示數據的特殊類,DataSet處理的數據是有界的,DataStream處理的數據是無界的,這兩個類都是不可變的,一旦創建出來就無法添加或者刪除數據元。

1、Flink 批數據處理案例

Java版本WordCount

使用Flink Java Dataset api實現WordCount具體代碼如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//1.讀取文件DataSource linesDS = env.readTextFile("./data/words.txt");//2.切分單詞FlatMapOperator wordsDS =        linesDS.flatMap((String lines, Collector collector) -> {    String[] arr = lines.split(" ");    for (String word : arr) {        collector.collect(word);    }}).returns(Types.STRING);//3.將單詞轉換成Tuple2 KV 類型MapOperator> kvWordsDS =        wordsDS.map(word -> new Tuple2<>(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.按照key 進行分組處理得到最后結果并打印kvWordsDS.groupBy(0).sum(1).print();

Scala版本WordCount

使用Flink Scala Dataset api實現WordCount具體代碼如下:

//1.準備環境,注意是Scala中對應的Flink環境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2.導入隱式轉換,使用Scala API 時需要隱式轉換來推斷函數操作后的類型import org.apache.flink.api.scala._//3.讀取數據文件val linesDS: DataSet[String] = env.readTextFile("./data/words.txt")//4.進行 WordCount 統計并打印linesDS.flatMap(line => {  line.split(" ")})  .map((_, 1))  .groupBy(0)  .sum(1)  .print()

以上無論是Java api 或者是Scala api 輸出結果如下,顯示的最終結果是統計好的單詞個數。

(hello,15)(Spark,1)(Scala,2)(Java,2)(MapReduce,1)(Flink,10)

2、Flink流式數據處理案例

Java版本WordCount

使用Flink Java DataStream api實現WordCount具體代碼如下:

//1.創建流式處理環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.讀取文件數據DataStreamSource lines = env.readTextFile("./data/words.txt");//3.切分單詞,設置KV格式數據SingleOutputStreamOperator> kvWordsDS =        lines.flatMap((String line, Collector> collector) -> {    String[] words = line.split(" ");    for (String word : words) {        collector.collect(Tuple2.of(word, 1L));    }}).returns(Types.TUPLE(Types.STRING, Types.LONG));//4.分組統計獲取 WordCount 結果kvWordsDS.keyBy(tp->tp.f0).sum(1).print();//5.流式計算中需要最后執行execute方法env.execute();
Scala版本WordCount

使用Flink Scala DataStream api實現WordCount具體代碼如下:

//1.創建環境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//2.導入隱式轉換,使用Scala API 時需要隱式轉換來推斷函數操作后的類型import org.apache.flink.streaming.api.scala._//3.讀取文件val ds: DataStream[String] = env.readTextFile("./data/words.txt")//4.進行wordCount統計ds.flatMap(line=>{line.split(" ")})  .map((_,1))  .keyBy(_._1)  .sum(1)  .print()//5.最后使用execute 方法觸發執行env.execute()

以上輸出結果開頭展示的是處理當前數據的線程,一個Flink應用程序執行時默認的線程數與當前節點cpu的總線程數有關。

3、DataStream BATCH模式

下面使用Java代碼使用DataStream API 的Batch 模式來處理批WordCount代碼,方式如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設置批運行模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);DataStreamSource linesDS = env.readTextFile("./data/words.txt");SingleOutputStreamOperator> wordsDS = linesDS.flatMap(new FlatMapFunction>() {    @Override    public void flatMap(String lines, Collector> out) throws Exception {        String[] words = lines.split(" ");        for (String word : words) {            out.collect(new Tuple2<>(word, 1L));        }    }});wordsDS.keyBy(tp -> tp.f0).sum(1).print();env.execute();

以上代碼運行完成之后結果如下,可以看到結果與批處理結果類似,只是多了對應的處理線程號。

3> (hello,15)8> (Flink,10)8> (Spark,1)7> (Java,2)7> (Scala,2)7> (MapReduce,1)

此外,Stream API 中除了可以設置Batch批處理模式之外,還可以設置 AUTOMATIC、STREAMING模式,STREAMING 模式是流模式,AUTOMATIC模式會根據數據是有界流/無界流自動決定采用BATCH/STREAMING模式來讀取數據,設置方式如下:

//BATCH 設置批處理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);//AUTOMATIC 會根據有界流/無界流自動決定采用BATCH/STREAMING模式env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//STREAMING 設置流處理模式env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

除了在代碼中設置處理模式外,還可以在Flink配置文件(flink-conf.yaml)中設置execution.runtime-mode參數來指定對應的模式,也可以在集群中提交Flink任務時指定execution.runtime-mode來指定,Flink官方建議在提交Flink任務時指定執行模式,這樣減少了代碼配置給Flink Application提供了更大的靈活性,提交任務指定參數如下:

$FLINK_HOME/bin/flink run -Dexecution.runtime-mode=BATCH -c xxx xxx.jar
責任編輯:

標簽:

免責聲明

頭條新聞