Overview
All of Flink's example code lives in the flink-examples module. Let's start with its file structure.
# flink-examples git:(master) tree -L 1
.
├── flink-examples-batch
├── flink-examples-streaming
├── flink-examples-table
├── pom.xml
└── target
4 directories, 1 file
As the listing shows, the examples are split into three parts according to the Flink API they use: batch, streaming, and table.
flink-examples-batch
# examples git:(master) tree
.
└── scala
    ├── clustering
    │   └── KMeans.scala
    ├── graph
    │   ├── ConnectedComponents.scala
    │   ├── DeltaPageRank.scala
    │   ├── EnumTriangles.scala
    │   ├── PageRankBasic.scala
    │   └── TransitiveClosureNaive.scala
    ├── misc
    │   └── PiEstimation.scala
    ├── ml
    │   └── LinearRegression.scala
    ├── relational
    │   ├── TPCHQuery10.scala
    │   ├── TPCHQuery3.scala
    │   └── WebLogAnalysis.scala
    └── wordcount
        └── WordCount.scala
7 directories, 12 files
Locate the Scala code among the batch examples and start with the simplest one, WordCount.
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.examples.java.wordcount.util.WordCountData

object WordCount {
  def main(args: Array[String]): Unit = {
    val params: ParameterTool = ParameterTool.fromArgs(args)

    // set up execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // make parameters available in the web interface
    env.getConfig.setGlobalJobParameters(params)

    // read the input file, or fall back to the built-in sample data
    val text =
      if (params.has("input")) {
        env.readTextFile(params.get("input"))
      } else {
        println("Executing WordCount example with default input data set.")
        println("Use --input to specify file input.")
        env.fromCollection(WordCountData.WORDS)
      }

    // split lines into lowercase words, pair each word with 1,
    // group by the word (field 0) and sum the counts (field 1)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    if (params.has("output")) {
      counts.writeAsCsv(params.get("output"), "\n", " ")
      env.execute("Scala WordCount Example")
    } else {
      println("Printing result to stdout. Use --output to specify output path.")
      // print() triggers execution itself, so env.execute() is not called here
      counts.print()
    }
  }
}
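The program obtains the execution environment through ExecutionEnvironment.getExecutionEnvironment and then reads the input file. If no input parameter is given, it falls back to the sample data in WordCountData.WORDS. The text variable has type DataSet[String]. After reading comes the transformation: flatMap splits each line into lowercase words, map turns each word into a (word, 1) pair, groupBy(0) groups the pairs by word, and sum(1) adds up the counts. The printed result looks like this.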
(after,1)
(and,12)
(arrows,1)
(ay,1)
(be,4)
(bourn,1)
(cast,1)
(coil,1)
(come,1)
(country,1)
(d,4)
.....
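To make the transformation chain easier to follow, here is a minimal sketch of the same steps on plain Scala collections, with no Flink dependency. The object name WordCountSketch, the helper countWords, and the sample sentences are made up for illustration and are not part of flink-examples; a real DataSet job runs distributed, which this sketch does not model.

```scala
object WordCountSketch {
  // Hypothetical helper (not part of flink-examples): mirrors the
  // flatMap -> map -> groupBy(0) -> sum(1) chain on an in-memory Seq.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      // split each line into lowercase words, dropping empty tokens
      .flatMap(line => line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq)
      // pair every word with an initial count of 1
      .map(word => (word, 1))
      // group the pairs by the word, i.e. tuple field 0
      .groupBy(_._1)
      // sum the counts, i.e. tuple field 1, within each group
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Sample input chosen for this sketch only
    val sample = Seq("To be, or not to be", "that is the question")
    countWords(sample).toSeq.sortBy(_._1).foreach(println)
  }
}
```

The tuples are sorted before printing only to make the output stable; the Flink job makes no such ordering promise.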
Next, a classic: PiEstimation, which estimates the value of π.
import org.apache.flink.api.scala._

object PiEstimation {
  def main(args: Array[String]): Unit = {
    val numSamples: Long = if (args.length > 0) args(0).toLong else 1000000

    val env = ExecutionEnvironment.getExecutionEnvironment

    // count how many of the samples would randomly fall into
    // the upper right quadrant of the unit circle
    val count =
      env.generateSequence(1, numSamples)
        .map { sample =>
          val x = Math.random()
          val y = Math.random()
          if (x * x + y * y < 1) 1L else 0L
        }
        .reduce(_ + _)

    // ratio of samples in upper right quadrant vs total samples gives surface of upper
    // right quadrant, times 4 gives surface of whole unit circle, i.e. PI
    val pi = count
      .map(_ * 4.0 / numSamples)

    println("We estimate Pi to be:")
    pi.print()
  }
}
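First, the estimation method. Draw n samples, each consisting of two random numbers x and y between 0 and 1. A sample falls inside the upper right quadrant of the unit circle when x² + y² < 1. That quadrant has area π/4 and the unit square has area 1, so the fraction of samples that land inside approximates π/4, and multiplying the fraction by 4 gives the estimate of π.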
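The method can be tried without a Flink cluster. The following is a minimal single-threaded sketch in plain Scala; the object name PiSketch, the helper estimatePi, and the fixed seed are assumptions made here for reproducibility and do not appear in the Flink example, which uses Math.random() inside a distributed map.

```scala
import scala.util.Random

object PiSketch {
  // Hypothetical helper: estimates Pi from numSamples random points.
  // The seed parameter is an assumption of this sketch, not of the Flink example.
  def estimatePi(numSamples: Long, seed: Long = 42L): Double = {
    val rng = new Random(seed)
    var inside = 0L // samples that fall inside the quarter circle
    var i = 0L
    while (i < numSamples) {
      val x = rng.nextDouble()
      val y = rng.nextDouble()
      if (x * x + y * y < 1) inside += 1
      i += 1
    }
    // fraction inside approximates Pi / 4, so scale by 4
    4.0 * inside / numSamples
  }

  def main(args: Array[String]): Unit = {
    println("We estimate Pi to be:")
    println(estimatePi(1000000L))
  }
}
```

The error of such an estimate shrinks roughly with the square root of the sample count, so more samples buy precision slowly.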
flink-examples-streaming
flink-examples-table
# scala git:(master) ✗ ls
StreamSQLExample.scala StreamTableExample.scala TPCHQuery3Table.scala WordCountSQL.scala WordCountTable.scala
# scala git:(master) ✗ tree
.
├── StreamSQLExample.scala
├── StreamTableExample.scala
├── TPCHQuery3Table.scala
├── WordCountSQL.scala
└── WordCountTable.scala
Let's first look at StreamSQLExample.scala, an example that shows how to run SQL queries on stream tables.
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object StreamSQLExample {
  def main(args: Array[String]): Unit = {
    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // the table environment is obtained by wrapping env with
    // TableEnvironment.getTableEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2)))

    val orderB: DataStream[Order] = env.fromCollection(Seq(
      Order(2L, "pen", 3),
      Order(2L, "rubber", 3),
      Order(4L, "beer", 1)))

    // convert DataStream to Table
    val tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    // register DataStream as Table
    tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = tEnv.sqlQuery(
      s"SELECT * FROM $tableA WHERE amount > 2 UNION ALL " +
        "SELECT * FROM OrderB WHERE amount < 2")

    result.toAppendStream[Order].print()

    env.execute()
  }

  // user-defined case class
  case class Order(user: Long, product: String, amount: Int)
}
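To see which rows the query selects, here is a minimal sketch that applies the same two filters and the UNION ALL to the sample orders using plain Scala collections. The object name UnionSketch and the helper unionAll are made up for illustration; the sketch does not use the Table API, and it says nothing about the order in which a parallel Flink job would print the rows.

```scala
object UnionSketch {
  case class Order(user: Long, product: String, amount: Int)

  // Same sample data as in StreamSQLExample
  val orderA: Seq[Order] = Seq(
    Order(1L, "beer", 3),
    Order(1L, "diaper", 4),
    Order(3L, "rubber", 2))

  val orderB: Seq[Order] = Seq(
    Order(2L, "pen", 3),
    Order(2L, "rubber", 3),
    Order(4L, "beer", 1))

  // Hypothetical helper mirroring the SQL text:
  //   SELECT * FROM tableA WHERE amount > 2
  //   UNION ALL
  //   SELECT * FROM OrderB WHERE amount < 2
  // UNION ALL keeps duplicates, which plain concatenation also does.
  def unionAll(a: Seq[Order], b: Seq[Order]): Seq[Order] =
    a.filter(_.amount > 2) ++ b.filter(_.amount < 2)

  def main(args: Array[String]): Unit =
    unionAll(orderA, orderB).foreach(println)
}
```

With the sample data, two orders from the first stream pass the amount > 2 filter (beer and diaper) and one order from the second stream passes amount < 2 (beer), so three rows are selected.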
Warning
This article was last updated on February 1, 2017. Its content may be out of date, so please refer to it with caution.