Ushas 主要在spark-sql-catalyst和spark-sql-hive模块进行了修改,catalyst主要是负责spark在数据处理中的关系依赖管理,其中对于普通的dataset会通过如下代码,将逻辑计划处理嵌入对象内:
Dataset.ofRows(sparkSession, logicalPlan)
而spark2.0版本以上对于spark-sql的支持则是通过Antlr4进行语法解析,生成语法树,然后通过深度遍历的方式将Unresolved的语法信息处理为resolved信息。 每条spark的sql,都会预先通过SparkSqlParser执行parse,parse添加了antlr4需要的词法以及处理器,然后生成逻辑计划: https://github.com/frankyu8/ushas/blob/ee36eac54b758e39689c7e2c51ea6f3aa5c27555/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L641-L646
parsePlan的内部会执行sql转化logicalplan的操作 https://github.com/frankyu8/ushas/blob/ee36eac54b758e39689c7e2c51ea6f3aa5c27555/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala#L69-L76
ofRows中会触发Analyzer对于逻辑计划的解析,会调用Analyzer里面的batches进行UnresolveLogicalplan 到ResolveLogicalplan的转化(有则转化,无则跳过) https://github.com/frankyu8/ushas/blob/ee36eac54b758e39689c7e2c51ea6f3aa5c27555/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala#L72-L80
Analyzer的具体识别规则如下: https://github.com/frankyu8/ushas/blob/a7066a67ed9c1ad9db6078d68cfff8d28cce6bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L152-L214
Analyzer中的一个batch具体的解析方式为采取后序遍历或者前序遍历,对每个嵌套逻辑计划进行解析
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp
我们为了让logicplan能够有列级血缘的识别能力,首先对逻辑计划的抽象类进行了修改(这里为了不影响logicplan的其他功能,所以进行了额外trait的添加),这样在Analyzer中新添列级血缘解析规则,所有影响因素都控制在额外的trait内,不会影响spark本身的正常逻辑计划解析 https://github.com/frankyu8/ushas/blob/a7066a67ed9c1ad9db6078d68cfff8d28cce6bd4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala#L30-L36
这里[trait]LineageHelper中携带的属性包括_lineageResolved(是否被Rule解析),childrenLineageResolved用于递归判断所有的子逻辑计划是否已经被Rule解析过,markLineageResolved用来标注当前逻辑计划解析成功 https://github.com/frankyu8/ushas/blob/a46317df9396161a257a2388938289926ff7a46a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/lineageCollect/LineageHelper.scala#L24-L45
所有的列级对象都会被lineageChildren 容器进行接手。
逻辑计划,其实是针对算子进行操作的,并没有对算子里面的涉及成员进行操作,比如select a,b,c from table 那么这里只会生成一个RelationLogicplan 和 ProjectLogicplan, 其中 ProjectLogicplan里面对应的projectList(即所有的字段expression)才是我们关心的列级血缘对象,为了不在projectList里面直接对expression进行操作,所以我们预先定义了列级对象(和expression一一对应),简单的记录了每个expression里面我们所关心的属性[extend treeNode],然后放入lineageChildren中。只要确保Analyzer工作时,每次深度遍历,在不参与计算的节点将lineageChildren进行复制,携带到上层节点,在对应需要操作的节点进行关系的判断,即可保证列级字段的正确解析。
列级对象的子类包括ExpressionColumn,RelationColumn以及UnionColumn,ExpressionColumn主要记录的是Project逻辑计划里面的expression,RelationColumn主要记录的是LeafNode里面的从属关系,UnionColumn主要是记录了特殊的列字段关系,因为需要识别到相应的right lineageChildren信息。 https://github.com/frankyu8/ushas/tree/main/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/lineage
因为logicplan是封装在所有的df对象内的,所以每次的df对象进行方法的操作,都会force 逻辑计划进行Analyzer的解析。我们在Analyzer的规则里面加上了自己的列级血缘判断规则 https://github.com/frankyu8/ushas/blob/a46317df9396161a257a2388938289926ff7a46a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L211-L212
我们新增了两个解析规则,一个是针对Relation的解析规则(即字段血缘的叶子节点判断),一个是针对Expression的解析规则(即字段血缘的所有中间关系判断)
这里的Relation为简单的从属关系判断,只是记录了每个字段的attribute,并没有去逻辑计划中寻找catalog
这里的Expression的解析用的是最为基础的map寻址,即上层字段的exprid和下层字段的exprid进行匹配,若匹配中则进行绑定(借用exprid唯一的特性)
针对于hive,目前的开发是拿取了所有的hive信息。sparksession若是启动了enablehive,默认为重写Analyzer,所以我们为了添加对于hive数据源的支持,在Analyzer基类里面新添了tailResolutionRules,并且在继承的analyzer里面对其进行重写
基类的Analyzer为
在 hive 的 sessionStatebuilder 里面重写 Analyzer
idea maven 对spark-catalyst module 进行Gnerated Source Code 生成sqlbase.g4的语法树文件
在跑样例文件时,先设置参数 -DLocal ,再设置 Include with provided scope,用默认本地形式和本地包运行spark
样例文件位置 examples/src/main/scala/org/apache/spark/examples/lineage/SparkLineageExample.scala
打包时,如果需要添加hive的插件支持,需要在spark profile中勾选hive
因为本身就是spark的项目中进行的分离,所以只需要将 spark-hive_2.12-3.1.2.jar,spark-catalyst_2.12-3.1.2.jar 进行替换,即可完成列级血缘的快速部署
准备样例sql : select * from (select substr(a+1,0,1) as c,a+3 as d from (select 1 as a,2 as b))
样例输出:
c#2
+- c#2
+- Alias ( substring(cast((a#0 + 1) as string), 0, 1) AS c#2 )
+- a#0
+- Alias ( 1 AS a#0 )
在spark-shell中如何查看列级血缘(API方法)
df.queryExecution.analyzed.lineageChildren(0).treeString
在pyspark中如何查看列级血缘(API方法)
df._jdf.queryExecution().analyzed().lineageChildren().apply(0).treeString()
优化现有的代码结构,目前的项目代码结构和spark2.4的结构一致,可以对模块进行针对性的修改,将主要代码集中在一起
添加新的模块,目前只提供了spark列级血缘的解析方式,但是对血缘的自动存储和展示这块完全是留给commiter的一片空白区域
现有的rule寻找逻辑优化,目前的寻找对应列关系主要是依托了exprid的全局唯一性,这里因为是遍历寻找,所以会增加耗时,看是否可以进行优化
spark目前是靠UnresolveLogicplan 替换成 resolveLogicplan进行resolve标记,而我们是通过简单的在逻辑计划的lineageresolve中标记为True,并没有给一个规范的样例类,这块若要优化工程量非常大
example中有 通过spark 插件化进行注入的样例,本是希望能够尽少的动源码,在外部进行Rule规则的植入,但是结果不尽如人意,可以看看如何进行插件化的配置
目前是对hive的数据源进行了寻址,也就是说目前的列级血缘可以对hive 的数据源进行准确的Catalog识别,但是别的外接数据源没有做任何的定义,这块可以由commiter进行丰富
Fork 本仓库
新建 Feat_xxx 分支
提交代码
新建 Pull Request
帮助
如何进行issue 和 pr 的关联
https://docs.github.com/cn/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue
如何配置 checkstyle 代码检查
https://blog.csdn.net/qq_31424825/article/details/100050445
计算机编程术语 (中英文对照)
为什么选择将项目进行裁剪,而不是将整个spark代码进行迁移?
目前的需求是支持列级血缘,Ushas也旨在先将列级血缘进行处理和完善,之后不排除会做更多的优化和迭代,逐步的扩大项目架构。 为了精简化打包,快速纳入使用,一次打包只需要5min时间
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。