批处理任务

依赖

  <!--
    Shared build properties: Java 8 source/target level, UTF-8 for sources and reports,
    and the Flink version / Scala binary suffix referenced by the dependency declarations.
  -->
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <flink.version>1.12.3</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
  </properties>

  <!-- All Flink artifacts share ${flink.version}; Scala-dependent ones carry the ${scala.binary.version} suffix. -->
  <dependencies>
    <!-- Flink Java API artifact (presumably what provides org.apache.flink.api.java.* used by the batch example). -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Streaming API for Java -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Streaming API for Scala -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink client artifact -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- Flink YARN integration artifact -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-yarn_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- log4j 1.x, pinned to an explicit version rather than tied to ${flink.version} -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>

代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word count over a fixed, in-memory set of sentences using the Flink DataSet API.
 */
public class WordCountBatchByJava {

  /**
   * Builds the batch pipeline (source, split, group, sum) and prints the per-word totals.
   *
   * @param args command-line arguments; not used
   * @throws Exception if the job fails while running
   */
  public static void main(String[] args) throws Exception {
    // Obtain the batch execution environment.
    final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

    // Source: three hard-coded sentences.
    final DataSet<String> sentences =
        environment.fromElements("this a book", "i love china", "i am chinese");

    // Transformation: one (word, 1) pair per word, grouped on the word (field 0),
    // with the counts (field 1) summed per group.
    final DataSet<Tuple2<String, Integer>> pairs = sentences.flatMap(new LineSplitter());
    final DataSet<Tuple2<String, Integer>> totals = pairs.groupBy(0).sum(1);

    // Sink: for a batch job, print() already triggers execution internally, so
    // env.execute(...) is deliberately not called afterwards; calling it as well
    // results in an error.
    totals.print();
  }

  /**
   * Splits a line on single spaces and emits a {@code (word, 1)} tuple for every token.
   */
  static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    /**
     * Emits one {@code (word, 1)} tuple per space-separated token of {@code line}.
     *
     * @param line the input sentence
     * @param collector receives the emitted tuples
     */
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) {
      final String[] tokens = line.split(" ");
      for (int i = 0; i < tokens.length; i++) {
        collector.collect(Tuple2.of(tokens[i], 1));
      }
    }
  }
}

编译

# Remove class files left over from a previous build.
rm -rf -- *.class
# Quote the classpath so the shell never tries to glob-expand "lib/*";
# javac expands the class-path wildcard itself to the jars under lib/.
javac -cp ".:lib/*" *.java
# Write the manifest with printf: unlike `echo -e`, it interprets \n in every POSIX shell.
printf 'Manifest-Version: 1.0\nMain-Class: WordCountBatchByJava\n' > MANIFEST.MF
# Package all compiled classes together with the manifest.
jar -cvfm example.jar MANIFEST.MF *.class

运行

  • 注意不能设置 -d 或者 --detached
  • run -t 支持 remote, local, kubernetes-session, yarn-per-job, yarn-session
  • run-application -t 支持 kubernetes-application, yarn-application
| 模式 | 实现类 |
| --- | --- |
| yarn-per-job | org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory |
| yarn-session | org.apache.flink.yarn.executors.YarnSessionClusterExecutor |
| local | org.apache.flink.client.deployment.executors.LocalExecutorFactory |
# Submit the batch word count in yarn-per-job mode. The -D options set the JobManager and
# TaskManager process memory, slots per TaskManager, default parallelism and the YARN
# application name; -c names the entry class inside example.jar.
# -d/--detached is intentionally not passed (this document notes it must not be set here).
/home/dubbo/flink-streaming-platform-agent/flink/bin/flink \
run -t yarn-per-job \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=1024mb \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dparallelism.default=1 \
-Dyarn.application.name=youhl_test_01 \
-c WordCountBatchByJava \
example.jar

运行结果:
image.png

如果用 detached 模式提交,作业不能以 collect、print、printToErr、count 等算子(operator)结尾;以 blocking 模式提交时则没有这些限制。

流处理任务

代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Streaming word count: reads newline-delimited text from a socket and prints, for every
 * 2-second tumbling processing-time window, how often each word occurred.
 */
public class WordCountStreamingByJava {

  /**
   * Builds and runs the streaming job.
   *
   * @param args {@code args[0]} is the socket host, {@code args[1]} the socket port
   * @throws IllegalArgumentException if host or port is missing, or the port is not an integer
   * @throws Exception if the job fails while running
   */
  public static void main(String[] args) throws Exception {
    // Fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      throw new IllegalArgumentException("Usage: WordCountStreamingByJava <host> <port>");
    }

    // Create the execution environment and connect to the text socket.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String host = args[0];
    int port = Integer.parseInt(args[1]);
    System.out.println("Connect to " + host + ":" + port);
    DataStreamSource<String> source = env.socketTextStream(host, port, "\n");

    // Split every line into words. A blank line or leading whitespace makes split()
    // return an empty token; those are skipped so "" is never counted as a word.
    // returns(...) supplies the output type explicitly for the lambda.
    SingleOutputStreamOperator<WordWithCount> s1 = source.flatMap(
        (FlatMapFunction<String, WordWithCount>) (line, collector) -> {
          for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
              collector.collect(new WordWithCount(word, 1));
            }
          }
        }).returns(TypeInformation.of(WordWithCount.class));

    // Key by the word using a KeySelector; the field-expression form keyBy("word")
    // is deprecated in this Flink version.
    KeyedStream<WordWithCount, String> s2 = s1.keyBy(value -> value.word);
    // Assign each record to a 2-second tumbling processing-time window.
    WindowedStream<WordWithCount, String, TimeWindow> s3 = s2.window(
        TumblingProcessingTimeWindows.of(Time.seconds(2)));
    // Sum the "count" field of all records of the same word within a window.
    DataStream<WordWithCount> s4 = s3.sum("count");
    // Sink: print the windowed counts.
    s4.print();
    // Streaming jobs only start once execute() is called.
    env.execute("Flink Streaming Word Count By Java");

  }

  /**
   * A word together with its occurrence count. Kept as a public class with public fields
   * and a no-argument constructor so that {@code sum("count")} can address the field by name.
   */
  public static class WordWithCount {

    public String word;
    public int count;

    /** No-argument constructor. */
    public WordWithCount() {
    }

    /**
     * @param word the word
     * @param count the number of occurrences
     */
    public WordWithCount(String word, int count) {
      this.word = word;
      this.count = count;
    }

    @Override
    public String toString() {
      return "WordWithCount{" +
          "word='" + word + '\'' +
          ", count=" + count +
          '}';
    }
  }
}

运行

# On the targetIp host, listen on port 9999 to simulate the incoming data stream
nc -l 9999
# Launch the streaming job; the two trailing arguments are the socket host and port
/home/dubbo/flink-streaming-platform-agent/flink/bin/flink \
run -t yarn-per-job \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=1024mb \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dparallelism.default=1 \
-Dyarn.application.name=youhl_test_02 \
-c WordCountStreamingByJava \
example.jar ${targetIp} 9999

输入数据流:

山东 天津 北京 河北 河南 山东 上海 北京
山东 海南 青海 西藏 四川 海南

流式计算结果:
image.png

官方代码

SocketWindowWordCountByJava

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Socket window word count: reads newline-delimited text from a socket and prints the
 * per-word counts of every 5-second tumbling processing-time window.
 */
public class SocketWindowWordCountByJava {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line options, e.g. {@code --host xx.xx.xx.xx --port 9999}
   * @throws Exception if the job fails while running
   */
  public static void main(String[] args) throws Exception {
    // Read the command-line options: --host xx.xx.xx.xx --port 9999
    ParameterTool params = ParameterTool.fromArgs(args);
    String host = params.get("host");
    int port = params.getInt("port");
    System.out.println("Connect to " + host + ":" + port);
    // Create the streaming environment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Source: lines of text from the socket.
    DataStream<String> text = env.socketTextStream(host, port, "\n");
    // Split into words, key by word, and reduce the counts per 5-second window.
    DataStream<WordWithCount> windowCounts =
        text.flatMap(new InnerMapper())
            .keyBy(value -> value.word)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .reduce(new InnerReducer());
    windowCounts.print();
    env.execute("Socket Window WordCount");
  }

  /** Splits a line on single whitespace characters and emits every token with a count of 1. */
  private static class InnerMapper implements FlatMapFunction<String, WordWithCount> {

    @Override
    public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
      for (String word : value.split("\\s")) {
        out.collect(new WordWithCount(word, 1L));
      }
    }
  }

  /** Merges two records by keeping the first record's word and adding both counts. */
  private static class InnerReducer implements ReduceFunction<WordWithCount> {

    @Override
    public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
      return new WordWithCount(a.word, a.count + b.count);
    }
  }

  /**
   * A word together with its occurrence count.
   *
   * <p>Declared {@code public}: Flink's POJO rules require a public class with a public
   * no-argument constructor and accessible fields. A non-public class is not recognized
   * as a POJO and is handled as a generic type instead.
   */
  public static class WordWithCount {

    public String word;
    public long count;

    /** No-argument constructor. */
    public WordWithCount() {
    }

    /**
     * @param word the word
     * @param count the number of occurrences
     */
    public WordWithCount(String word, long count) {
      this.word = word;
      this.count = count;
    }

    @Override
    public String toString() {
      return word + " : " + count;
    }
  }
}

SocketWindowWordCountByScala

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/** Socket window word count: per-word counts over 5-second tumbling processing-time windows. */
object SocketWindowWordCountByScala {

  /**
   * Builds and runs the streaming job.
   *
   * @param args command-line options; `--host` and `--port` are read from them
   */
  def main(args: Array[String]): Unit = {
    // Read the socket address from the command line.
    val params = ParameterTool.fromArgs(args)
    val host = params.get("host")
    val port = params.getInt("port")
    println("Connect to " + host + ":" + port)

    // Source: newline-delimited text from the socket.
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val lines: DataStream[String] = env.socketTextStream(host, port, '\n')

    // Turn every line into (word, 1) pairs.
    val pairs = lines
      .flatMap { line => line.split("\\s") }
      .map { word => (word, 1) }

    // Key by the word and add up the counts inside each 5-second window.
    val windowCounts = pairs
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce((left, right) => (left._1, left._2 + right._2))

    // Print through a single sink task, then start the job.
    windowCounts.print().setParallelism(1)
    env.execute("Socket Window WordCount")
  }
}