目录

Spark和Elasticsearch的交互

概述

Spark 和 Elasticsearch 是90%大数据工程师的基础技术栈了,前者不用多说了,就是业界的大数据计算框架,后者则是优秀的搜索框架。

推荐系统

业务上这两个框架经常需要进行交互,常见的场景就是通过 Spark 批处理把应用日志写入 ES 集群来构架,然后通过 Kibana 进行展示。特定场景下 ES 除了可以做搜索引擎以外,还可以作为存储方案。

笔者在两者的交互上使用较多,其中一个有意思的场景是用他们来构建推荐系统的召回层。所谓的召回层,可以理解成根据用户属性,在内容池子中,根据特定策略来挑选符合用户特征的内容(在进行推荐之前,一般还会有一个重排系统,来对召回内容进行打分,来判定推荐的顺序)。

在召回系统中用 ES 的好处是可以利用其搜索引擎的天然属性,来构建复杂的查询语句,假设用户属性很多,召回策略非常复杂的情况下,如果使用 sql 对内容进行查询,可能需要写出很复杂的语句,并且检索效率不会比使用 ES 更好。

在推荐系统中,Spark 和 ES 的交互可以发生在这样的场景。批处理情况下,可以通过 Spark Sql 来计算用户的新增属性,例如通过算法,来为用户画像增加不同维度的特征,例如计算用户对某种内容的偏好程度,并将计算完的特征写入 ES 集群。流处理情况下,Spark Streaming 来对用户实现秒级的画像更新,并更新存储在 ES 中的数据。最后根据召回策略来组合查询的 query 来实现复杂的查询。

下面是我们业务中一个查询示例(特征名和敏感数据进行了处理):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "pr": [
              "-1",
              "-2",
              "3",
              "6",
              "7"
            ]
          }
        },
        {
          "bool": {
            "should": [
              {
                "term": {
                  "vt": {
                    "value": "2"
                  }
                }
              }
            ]
          }
        }
      ]
    }
  },
  "size": 1000,
  "sort": [
    {
      "ok": {
        "missing": "_last",
        "order": "desc"
      }
    }
  ]
}

通过以上 query 可以把符合用户特征的内容召回出来。

交互操作

为了把内容写入 ES 集群,我们可以在本地简单的跑起来一个 Demo。首先,你需要一个本地的 ES 集群,brew install elasticsearch 安装 ES,然后运行 elasticsearch 命令来运行一个默认配置的集群。

/spark%E5%92%8Celasticsearch%E7%9A%84%E4%BA%A4%E4%BA%92/img.png

然后可以通过官方文档的一个 Demo 来尝试一下把数据通过 Spark 写入到集群当中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.elasticsearch.spark._

object Write2Es {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("test").setMaster("local")
    conf.set("es.nodes", "localhost")
    conf.set("es.port", "9200")
    conf.set("es.index.auto.create", "true")
    conf.set("es.mapping.id", "one")
    conf.set("es.write.operation", "upsert")

    val sc = new SparkContext(conf)

    val numbers = Map("one" -> 1, "five" -> 8)
    val numbers1 = Map("one" -> 3)

    sc.makeRDD(Seq(numbers, numbers1)).saveToEs("spark/docsid")
  }
}

写入数据之后通过 RESTful 接口查看数据如下:

/spark%E5%92%8Celasticsearch%E7%9A%84%E4%BA%A4%E4%BA%92/img_1.png

参考资料

  1. Apache Spark support
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。