目录

antlr4+spark-sql对业务SQL进行解析

概述

在大数据平台开发过程中,会遇到血缘分析,对 SQL 解析并进行权限的鉴权,需要提前对 SQL 进行基本语法校验这些场景都需要对 SQL 进行解析。

常用的SQL解析工具

  1. 阿里Druid: 支持的数据库类型不少,但是解析时需要制定数据库类型,并且在使用中,对Hive的语法解析版本比较老,兼容性不太好
  2. Hive原生SQL解析: 由于在大数据平台进行业务开发时,开发人员写的SQL并一定是完全符合Hive规范的,因为在运行时是先通过Spark进行解析的,所以也并不能完全满足需要
  3. General SQL Parser(未测试)这款工具在解析的时候也是需要指定数据库

最后通过调研之后,决定还是采用 Spark 原生的 SQL 解析,下面对采用 Spark SQL 原生解析进行介绍,并且最终可以做到不依赖 Spark 的任何 Jar 包。

antlr4包依赖

由于 Spark SQL 的解析是通过 antlr4 实现的,所以首先需要添加依赖。

1
2
3
4
5
<dependency>
    <groupId>org.antlr</groupId>
    <artifactId>antlr4-runtime</artifactId>
    <version>4.5.3</version>
</dependency>

拷贝Spark SQL对应的SQL解析代码

在 Spark 源码中找到以下代码,并将这部分代码拷贝到自己的工程中,包名请修改成自己项目包名。

/antlr4-spark-sql%E5%AF%B9%E4%B8%9A%E5%8A%A1sql%E8%BF%9B%E8%A1%8C%E8%A7%A3%E6%9E%90/img.png

拷贝完之后,在对应的包下新建两个类 ANTLRNoCaseStringStream

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.IntStream;

public class ANTLRNoCaseStringStream extends ANTLRInputStream {
   public ANTLRNoCaseStringStream(String input){
        super(input);
   }

    @Override
    public int LA(int i){
        int la = super.LA(i);
        if (la == 0 || la == IntStream.EOF){
            return la;
        } else {
            return Character.toUpperCase(la);
        }
    }
}

MySqlBaseBaseListener 在解析过程中获取需要的数据,此类实现的是获得解析过程中 select 的表和 insert 表,如果需要获取列名等其他的,请参考 SqlBaseBaseListener 类中的方法,来获得自己需要的数据。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MySqlBaseBaseListener extends SqlBaseBaseListener {
    private Map<String, Set<String>> dataBaseTablenameAndOper = new HashMap<>();//用来保存表与操作的对应关系

    public Map<String, Set<String>> getDataBaseTablenameAndOper() {
        return dataBaseTablenameAndOper;
    }

    public void enterQuerySpecification(SqlBaseParser.QuerySpecificationContext ctx) {
        final SqlBaseParser.QuerySpecificationContext baseCtx = ctx;
        ParseTreeWalker queryWalker = new ParseTreeWalker();
        queryWalker.walk(new SqlBaseBaseListener() {
            public void enterTableIdentifier(SqlBaseParser.TableIdentifierContext ctx) {
                if(ctx.table!=null) {
                    String table = ctx.getText().toLowerCase();
                    Set<String> oper;
                    if (dataBaseTablenameAndOper.containsKey(table)) {
                        oper = dataBaseTablenameAndOper.get(table);
                    } else {
                        oper = new HashSet<>();
                    }
                    oper.add("SELECT");
                    dataBaseTablenameAndOper.put(table, oper);
                }
            }
        }, ctx);
    }

    public void enterInsertInto(SqlBaseParser.InsertIntoContext ctx){
        final SqlBaseParser.InsertIntoContext baseCtx = ctx;
        ParseTreeWalker queryWalker = new ParseTreeWalker();
        final Set<String> simpleTables = new HashSet<String>();
        queryWalker.walk(new SqlBaseBaseListener() {
            public void enterTableIdentifier(SqlBaseParser.TableIdentifierContext ctx) {
                if(ctx.table!=null) {
                    String table = ctx.getText().toLowerCase();
                    Set<String> oper;
                    if (dataBaseTablenameAndOper.containsKey(table)) {
                        oper = dataBaseTablenameAndOper.get(table);
                    } else {
                        oper = new HashSet<>();
                    }
                    oper.add("INSERT");
                    dataBaseTablenameAndOper.put(table, oper);
                }
            }
        }, ctx);
    }
}

编写 SQL 解析工具类 SparkSqlUtil

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import com.luckincoffee.datas.spark.sql.catalyst.parser.*;

import java.util.Map;
import java.util.Set;

public class SparkSqlUtil {
    public static Map<String, Set<String>> getDataBaseTablenameAndOper(String sql){
        SqlBaseLexer lexer = new SqlBaseLexer(new ANTLRNoCaseStringStream(sql));

        CommonTokenStream tokenStream = new CommonTokenStream(lexer);
        SqlBaseParser parser = new SqlBaseParser(tokenStream);
        ParseTreeWalker walker = new ParseTreeWalker();
        MySqlBaseBaseListener mySqlBaseBaseListener = new MySqlBaseBaseListener();

        walker.walk(mySqlBaseBaseListener, parser.statement());

        return mySqlBaseBaseListener.getDataBaseTablenameAndOper();
    }
}
警告
本文最后更新于 2017年2月1日,文中内容可能已过时,请谨慎参考。