
Exploring Flink Examples

Overview

All of Flink's example code lives in the flink-examples module. Start with its file structure.

# flink-examples git:(master) tree -L 1
.
├── flink-examples-batch
├── flink-examples-streaming
├── flink-examples-table
├── pom.xml
└── target

4 directories, 1 file

The examples are split into three parts by Flink API type: batch, streaming, and table.

# examples git:(master) tree
.
└── scala
    ├── clustering
    │   └── KMeans.scala
    ├── graph
    │   ├── ConnectedComponents.scala
    │   ├── DeltaPageRank.scala
    │   ├── EnumTriangles.scala
    │   ├── PageRankBasic.scala
    │   └── TransitiveClosureNaive.scala
    ├── misc
    │   └── PiEstimation.scala
    ├── ml
    │   └── LinearRegression.scala
    ├── relational
    │   ├── TPCHQuery10.scala
    │   ├── TPCHQuery3.scala
    │   └── WebLogAnalysis.scala
    └── wordcount
        └── WordCount.scala

7 directories, 12 files

Locate the Scala code among the batch examples and start with the simplest one, WordCount.

object WordCount {
  def main(args: Array[String]) {
    val params: ParameterTool = ParameterTool.fromArgs(args)
    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)
    val text =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
      } else {
        println("Executing WordCount example with default input data set.")
        println("Use --input to specify file input.")
        env.fromCollection(WordCountData.WORDS)
      }
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)
    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      counts.print()
    }
  }
}

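The program obtains the execution environment via ExecutionEnvironment.getExecutionEnvironment and then reads the input file; if no input parameter is given, it falls back to the sample data in WordCountData.WORDS. The transformations are applied after the read. The text variable has type DataSet[String]. The final printed output looks like this.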

(after,1)
(and,12)
(arrows,1)
(ay,1)
(be,4)
(bourn,1)
(cast,1)
(coil,1)
(come,1)
(country,1)
(d,4)
.....
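
To see what the flatMap / map / groupBy(0) / sum(1) chain computes without starting Flink, the same steps can be mimicked on a local Scala collection. This is only a rough sketch: WordCountSketch, countWords, and the two input lines are hypothetical names and data introduced here, and groupBy plus a per-group sum merely stand in for the DataSet operators.

```scala
object WordCountSketch {
  // Mirrors the Flink pipeline on a local Seq: tokenize, pair with 1, group by word, sum.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty)) // same tokenizer as the example
      .map(word => (word, 1))                                    // (word, 1) pairs
      .groupBy(_._1)                                             // stands in for groupBy(0)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // stands in for sum(1)

  def main(args: Array[String]): Unit = {
    // Hypothetical input; WordCountData.WORDS is not reproduced here.
    val lines = Seq("To be, or not to be", "that is the question")
    countWords(lines).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Unlike the Flink job, this version runs eagerly in a single JVM; it is meant only to make the shape of the result, (word, count) pairs, easier to follow.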

Next, a classic: PiEstimation, which estimates the value of π.

object PiEstimation {
  def main(args: Array[String]) {
    val numSamples: Long = if (args.length > 0) args(0).toLong else 1000000
    val env = ExecutionEnvironment.getExecutionEnvironment
    // count how many of the samples would randomly fall into
    // the upper right quadrant of the unit circle
    val count =
      env.generateSequence(1, numSamples)
        .map  { sample =>
          val x = Math.random()
          val y = Math.random()
          if (x * x + y * y < 1) 1L else 0L
        }
        .reduce(_ + _)
    // ratio of samples in upper right quadrant vs total samples gives surface of upper
    // right quadrant, times 4 gives surface of whole unit circle, i.e. PI
    val pi = count
      .map ( _ * 4.0 / numSamples)
    println("We estimate Pi to be:")
    pi.print()
  }
}

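First, the estimation method: draw n samples, each consisting of two random numbers between 0 and 1, and compute the fraction of samples whose sum of squares is less than 1. That fraction approximates π/4, the area of the quarter unit circle, so multiplying it by 4 gives an estimate of π.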
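
The same Monte Carlo idea can be tried locally without Flink. The sketch below is an illustration under stated assumptions: PiSketch and estimatePi are hypothetical names, and a seeded scala.util.Random replaces Math.random() purely so that repeated runs give the same estimate.

```scala
import scala.util.Random

object PiSketch {
  // Fraction of random points in the unit square that land inside the quarter circle, times 4.
  def estimatePi(numSamples: Long, rng: Random): Double = {
    var inside = 0L
    var i = 0L
    while (i < numSamples) {
      val x = rng.nextDouble()
      val y = rng.nextDouble()
      if (x * x + y * y < 1) inside += 1 // same test as in PiEstimation
      i += 1
    }
    inside * 4.0 / numSamples
  }

  def main(args: Array[String]): Unit = {
    // The seed 42 is an arbitrary choice made for reproducibility.
    println(estimatePi(1000000L, new Random(42)))
  }
}
```

With one million samples the standard error of the estimate is on the order of 0.002, so the first two decimal places are usually stable while later digits fluctuate from seed to seed.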

# scala git:(master) ✗ ls
StreamSQLExample.scala   StreamTableExample.scala TPCHQuery3Table.scala    WordCountSQL.scala       WordCountTable.scala
#  scala git:(master) ✗ tree
.
├── StreamSQLExample.scala
├── StreamTableExample.scala
├── TPCHQuery3Table.scala
├── WordCountSQL.scala
└── WordCountTable.scala

From the Table examples listed above, start with StreamSQLExample.scala. It shows how to run SQL on tables derived from DataStreams.

object StreamSQLExample {
  def main(args: Array[String]): Unit = {
    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // the Table execution environment is obtained via TableEnvironment.getTableEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2)))
    val orderB: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(2L, "rubber", 3),
      Order(4L, "beer", 1)))
    // convert DataStream to Table
    var tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    // register DataStream as Table
    tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
    // union the two tables
    val result = tEnv.sqlQuery(
      s"SELECT * FROM $tableA WHERE amount > 2 UNION ALL " +
        "SELECT * FROM OrderB WHERE amount < 2")

    result.toAppendStream[Order].print()
    env.execute()
  }
  // custom case class
  case class Order(user: Long, product: String, amount: Int)
}
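
To make the query's result concrete, the UNION ALL of the two filtered selections can be reproduced on plain Scala collections. This is a sketch rather than Flink code: UnionAllSketch and unionAll are hypothetical names, and the order data is copied from the example above.

```scala
object UnionAllSketch {
  case class Order(user: Long, product: String, amount: Int)

  // Equivalent of:
  //   SELECT * FROM tableA WHERE amount > 2 UNION ALL SELECT * FROM OrderB WHERE amount < 2
  def unionAll(orderA: Seq[Order], orderB: Seq[Order]): Seq[Order] =
    orderA.filter(_.amount > 2) ++ orderB.filter(_.amount < 2)

  def main(args: Array[String]): Unit = {
    val orderA = Seq(Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))
    val orderB = Seq(Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))
    // Prints Order(1,beer,3), Order(1,diaper,4), Order(4,beer,1), one per line.
    unionAll(orderA, orderB).foreach(println)
  }
}
```

The streaming job should emit the same three rows, although a parallel run may print them in a different order.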
Warning
This article was last updated on February 1, 2017. Its content may be outdated, so use it with caution.