Linkis 新引擎实现分享

Linkis 新引擎实现分享

在社区大佬的帮助下,我们完成了 0.11 版本的开发,实现了 ElasticSearch 和 Presto 引擎。具体的开发文档可以参考: Linkis引擎开发文档

执行引擎架构的选择

目前 Linkis 的架构可以分为两种,一种是 Entrance-EngineManger-Engine 的模式,一种是 Entrance 模式。统一执行服务的架构可以参考官方文档: Linkis-UJES设计文档

Entrance 服务作为执行的入口,主要负责任务的持久化工作,日志的输出,进行脚本的校验和变量替换,并与 Engine、EngineManager 服务交互,向可用的 Engine 发送执行任务的请求,或者向 EngineManager 发送启动 Engine 的请求。

EngineManager 服务主要负责 Engine 的启动,进行 Engine 请求资源的请求与释放,并持续监控 Engine 的状态。

Engine 服务负责任务的具体执行,包括了任务执行的一些初始化操作、任务脚本的切分、任务的执行、任务的进度监控和结果集的保存等工作。

Spark、Hive 引擎是 Entrance-EngineManger-Engine 模式实现,在这个模式中 Engine 作为 Spark 、Hive 任务的 Driver 端,向外暴露接口可持续的接受 Entrance 发来的请求,完成任务的执行。这个模式中不仅实现了多租户的任务隔离,还提供了单用户的引擎复用,尽量减少 Engine 的启动,大大提高了执行的效率。

上面的各个服务可以看到每个服务的职责非常的明确,不过多个服务也让整个的架构变的比较重,有一些轻量的执行没有必要通过 Entrance-EngineManger-Engine 模式进行实现。例如 Linkis JDBC 引擎的实现就是通过 Entrance 的模式。JDBC 引擎的职责就是作为 JDBC 连接的客户端向服务端发送请求,并进行连接的维护。JDBC 连接的维护是比较轻量级的,而且 JDBC 连接的复用也不是根据平台用户进行区分的,所以单独为每个用户启动一个引擎是没有必要的。

ElasticSearch 和 Presto 的客户端实际上就是 Http Client,所以 ElasticSearch 和 Presto 引擎的实现也应该是比较轻量的,最终我们实现的 ElasticSearch 和 Presto 引擎也是通过 Entrance 的模式实现的。

引擎资源控制

Linkis 的资源管理服务,用来管理用户、系统的资源和并发的控制,实现新的引擎需要考虑到引擎资源相关接口的实现。具体架构可参考:Linkis RM设计文档

Entrance-EngineManger-Engine 模式资源控制

Entrance-EngineManger-Engine 的模式,资源相关主要需要下面两个实现:

  1. EngineManger 注册资源
    Linkis RM设计文档中可以看到,EngineManger 作为 Engine 资源的管理者,需要先向 ResourceManger 进行管理资源的注册。
    Linkis 已经将 EngineManger 注册资源的逻辑进行了抽象,实现的时候只需要在 SpringConfiguration 中进行配置创建 resources 的 spring bean 对象,可以参考 SparkEngineManagerSpringConfiguration 的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// com.webank.wedatasphere.linkis.enginemanager.configuration.SparkEngineManagerSpringConfiguration

@Configuration
class SparkEngineManagerSpringConfiguration {

@Bean(Array("resources"))
def createResource(): ModuleInfo = {
val totalResource = new DriverAndYarnResource(
new LoadInstanceResource(ENGINE_MANAGER_MAX_MEMORY_AVAILABLE.getValue.toLong,
ENGINE_MANAGER_MAX_CORES_AVAILABLE.getValue, ENGINE_MANAGER_MAX_CREATE_INSTANCES.getValue),
null
)

val protectedResource = new DriverAndYarnResource(
new LoadInstanceResource(ENGINE_MANAGER_PROTECTED_MEMORY.getValue.toLong, ENGINE_MANAGER_PROTECTED_CORES.getValue,
ENGINE_MANAGER_PROTECTED_INSTANCES.getValue),
null
)

ModuleInfo(Sender.getThisServiceInstance, totalResource, protectedResource, ResourceRequestPolicy.DriverAndYarn)
}
// ...
}
  1. EngineResourceFactory 实现
    EngineManager 创建 Engine 的时候需要先向 ResourceManger 去请求资源,所以新引擎需要提供 EngineResourceFactory 的实现,用来初始化创新 Engine 所需要的资源,再向 ResourceManger 进行请求。
    Linkis 中提供了 AbstractEngineResourceFactory 的抽象,实现的时候只需要从 AbstractEngineResourceFactory 继承。具体可参考 SparkEngineResourceFactory 的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// com.webank.wedatasphere.linkis.enginemanager.configuration.SparkEngineResourceFactory

@Component("engineResourceFactory")
class SparkEngineResourceFactory extends AbstractEngineResourceFactory {

override protected def getRequestResource(properties: java.util.Map[String, String]): DriverAndYarnResource = {
val executorNum = DWC_SPARK_EXECUTOR_INSTANCES.getValue(properties)
new DriverAndYarnResource(
new LoadInstanceResource(ByteTimeUtils.byteStringAsBytes(DWC_SPARK_DRIVER_MEMORY.getValue(properties) + "G"),
DWC_SPARK_DRIVER_CORES,
1),
new YarnResource(ByteTimeUtils.byteStringAsBytes(DWC_SPARK_EXECUTOR_MEMORY.getValue(properties) * executorNum + "G"),
DWC_SPARK_EXECUTOR_CORES.getValue(properties) * executorNum,
0,
DWC_QUEUE_NAME.getValue(properties))
)
}
}

Entrance 模式并发控制

Lnkis 中将 Engine 的实例数作为资源的一种,目前用户请求的并发是通过 Engine 的实例数进行控制的,那么在 Entrance 的模式下,就没有很好的对用户的并发进行控制。

在 ElasticSearch 和 Presto 的实现中,我们参考了 EngineManager 的资源控制,将并发数作为资源的一种,在 Entrance 启动时进行模块资源注册。将每个执行作为一个实例,执行发生时先进行资源的请求和锁定,执行完成后进行资源的释放,从而达到用户并发的控制。

主要包括了以下步骤:

  1. Entrance 注册并发资源
    Entrance 注册并发资源,需要创建资源实例,将并发作为资源的一部分,然后配合 @EnableResourceManager 和 @RegisterResource 注解进行资源注册。
1
2
3
4
5
6
7
8
9
10
11
12
13
// 定义资源
@Bean(Array("resources"))
def createResource(): ModuleInfo = {
// 创建并发资源实例,分为总资源和受保护的资源
val totalResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_MAX_JOB_INSTANCE.getValue)
val protectResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_PROTECTED_JOB_INSTANCE.getValue)
info(s"create resource for es engine totalResource is $totalResource, protectResource is $protectResource")
ModuleInfo(Sender.getThisServiceInstance, totalResource, protectResource, ResourceRequestPolicy.Instance)
}

// 注册资源
@RegisterResource
def registerResources(): ModuleInfo = resources
  1. 执行前请求锁定资源
    执行实例初始化前,先通过 ResourceManagerClient#requestResource 方法请求锁定并发实例资源。
1
2
3
4
5
6
7
8
9
10
11
rmClient.requestResource(requestEngine.user, requestEngine.creator, new InstanceResource(1)) match {
case NotEnoughResource(reason) =>
// 没有请求到资源,抛出异常
throw EsEngineException(LogUtils.generateWarn(reason))
case AvailableResource(ticketId) => {
// 请求到资源,创建执行实例,并保存 ticketId 用于释放资源
// ...
// 当资源被实例化后,返回实际占用的资源总量
rmClient.resourceInited(UserResultResource(ticketId, requestEngine.user), new InstanceResource(1))
}
}
  1. 执行完成释放资源
    执行完成后销毁执行实例,并通过 ResourceManagerClient#resourceReleased 方法释放锁定的资源。
1
2
// 使用 ticketId 释放对应的资源
rmClient.resourceReleased(UserResultResource(ticketId, requestEngine.user))

ElasticSearch 引擎的实现

下面是微众王和平大佬帮忙画的 ElasticSearch 引擎整体的架构图:

Linkis 新引擎的实现还是比较容易的,ElasticSearch 引擎的代码结构如下,整体的代码量也是比较少。主要包括了资源的配置、执行器的实例化和ElasticSearch请求与结果解析的相关代码。

  1. 资源注册
    ElasticSearch 引擎需要考虑到用户请求的并发和 Entrance 整体并发的控制。
    Entrance 启动时,需要对 Entrance 可用资源进行注册,主要包括了最大实例数和保护的阈值。在 EsSpringConfiguration 中生成资源的 bean 对象,并传入 EsEngineManager 进行注册,配置 @EnableResourceManager 和 @RegisterResource 就会自动进行注册。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// com.webank.wedatasphere.linkis.entrance.conf.EsSpringConfiguration
class EsSpringConfiguration extends Logging{

@Bean(Array("resources"))
def createResource(@Autowired rmClient: ResourceManagerClient): ModuleInfo = {
// Clean up resources before creating resources to prevent dirty data when exiting abnormally (创造资源之前进行资源清理,防止异常退出时产生了脏数据)
Utils.tryQuietly(rmClient.unregister())
Utils.addShutdownHook({
info("rmClient shutdown, unregister resource...")
rmClient.unregister
})
val totalResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_MAX_JOB_INSTANCE.getValue)
val protectResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_PROTECTED_JOB_INSTANCE.getValue)
info(s"create resource for es engine totalResource is $totalResource, protectResource is $protectResource")
ModuleInfo(Sender.getThisServiceInstance, totalResource, protectResource, ResourceRequestPolicy.Instance)
}

}

// com.webank.wedatasphere.linkis.entrance.execute.EsEngineManager
@EnableResourceManager
class EsEngineManager(resources: ModuleInfo) extends EngineManager with Logging {

@RegisterResource
def registerResources(): ModuleInfo = resources

}
  1. 请求执行器
    EsEngineRequester 启动一个执行器,用于任务的执行,通过 request 方法对传入的 job 生成一个执行的 EsEntranceEngine,请求时先向 ResourceManager 请求并锁定一个实例的资源,在 EsEntranceEngine 执行结束后会进行释放。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// com.webank.wedatasphere.linkis.entrance.execute.EsEngineRequester
class EsEngineRequester(groupFactory: GroupFactory, rmClient: ResourceManagerClient) extends EngineRequester {
override def request(job: Job): Option[EntranceEngine] = job match {
case entranceJob: EntranceJob => {
val requestEngine = createRequestEngine(job);
// request resource manager
rmClient.requestResource(requestEngine.user, requestEngine.creator, new InstanceResource(1)) match {
case NotEnoughResource(reason) =>
throw EsEngineException(LogUtils.generateWarn(reason))
case AvailableResource(ticketId) => {
val engine = new EsEntranceEngine(idGenerator.incrementAndGet(), new util.HashMap[String, String](requestEngine.properties)
, () => {rmClient.resourceReleased(UserResultResource(ticketId, requestEngine.user))})
engine.setGroup(groupFactory.getOrCreateGroup(getGroupName(requestEngine.creator, requestEngine.user)))
engine.setUser(requestEngine.user)
engine.setCreator(requestEngine.creator)
// engine.updateState(ExecutorState.Starting, ExecutorState.Idle, null, null)
engine.setJob(entranceJob)
engine.init()
executorListener.foreach(_.onExecutorCreated(engine))
rmClient.resourceInited(UserResultResource(ticketId, requestEngine.user), new InstanceResource(1))
Option(engine)
}
}
}
case _ => None
}
}
// com.webank.wedatasphere.linkis.entrance.execute.EsEntranceEngine
class EsEntranceEngine(id: Long, properties: JMap[String, String], resourceRelease: () => Unit) extends EntranceEngine(id) with SingleTaskOperateSupport with SingleTaskInfoSupport {
override def close(): Unit = {
try {
this.job.setResultSize(0)
this.engineExecutor.close
// 释放资源
resourceRelease()
// ......
}
  1. 任务执行
    EsEntranceEngine 是 com.webank.wedatasphere.linkis.entrance.execute.EntranceEngine 的实现,进行脚本的执行。在这里抽出一层 EsEngineExecutor 作为 Es 任务的具体执行。EsEntranceEngine 则负责 EsEngineExecutor 的初始化、脚本解析切分等实现。
1
2
3
4
5
6
7
class EsEntranceEngine(id: Long, properties: JMap[String, String], resourceRelease: () => Unit) extends EntranceEngine(id) with SingleTaskOperateSupport with SingleTaskInfoSupport {
private var engineExecutor: EsEngineExecutor = _
// ...
override def execute(executeRequest: ExecuteRequest): ExecuteResponse // ...
protected def executeLine(code: String): ExecuteResponse = this.engineExecutor.executeLine(code, storePath, s"_$codeLine")

}
  1. ElasticSearch 脚本执行
    entrance.executor 包中就是 ElasticSearch 客户端的封装、请求的封装和结果的解析等相关代码。
    ElasticSearch 客户端封装在 EsClient 中,通过 EsClientFactory 进行实例化,并将 datasourceName 作为唯一 Key 进行缓存。
    EsEngineExecutorImpl 是 EsEngineExecutor 的实现,用于任务的执行。
    ResponseHandlerImpl 用于结果的处理,会根据 ElasticSearch 的返回类型进行反序列化,并保存为 Linkis 的 ResultSet。

DataSource 路由

在与微众大佬的讨论交流中得知后面 Linkis 的架构将会引入 DataSource 的概念,DataSource 模块维护引擎的连接信息和集群等信息,可以减少一些数据源运行配置,方便数据源配置和权限管理,为数据平台提供元数据信息,并可根据 DataSource 进行路由实现多集群的路由。

在 Linkis-0.11.0版本中添加了 linkis-gateway-ujes-datasource-ruler 模块,作为一个 Gateway 插件的形式简单实现了,请求和 Entrance 的路由。

linkis-gateway-ujes-datasource-ruler 模块的实现

抽象出 EntranceGatewayRouterRuler 接口用于执行路由规则,在 Gateway 模块的 EntranceGatewayRouter 中注入 EntranceGatewayRouterRuler 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
class EntranceGatewayRouter extends AbstractGatewayRouter {

@Autowired(required = false)
private var rules: Array[EntranceGatewayRouterRuler] = _

override def route(gatewayContext: GatewayContext): ServiceInstance = {
gatewayContext.getGatewayRoute.getRequestURI match {
case EntranceGatewayRouter.ENTRANCE_REGEX(_) =>
// ...
serviceId.map(applicationName => {
rules match {
case array: Array[EntranceGatewayRouterRuler] => array.foreach(_.rule(applicationName, gatewayContext))
case _ =>
}
ServiceInstance(applicationName, gatewayContext.getGatewayRoute.getServiceInstance.getInstance)
}).orNull
case _ => null
}
}

}

linkis-gateway-ujes-datasource-ruler 模块,主要是做了一个 DataSource 和 Entrance Instance 的映射,并保存在 Mysql 中。DatasourceGatewayRouterRuler 实现了具体的路由策略,DatasourceMapService 接口维护 DataSource 映射。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 维护 DataSource 映射的接口
public interface DatasourceMapService {

String getInstanceByDatasource(String datasourceName);

long countByInstance(String instance);

String insertDatasourceMap(String datasourceName, String instance, String serviceId);

}

// EntranceGatewayRouterRuler 的实现类,执行具体的路由逻辑
class DatasourceGatewayRouterRuler extends EntranceGatewayRouterRuler with Logging {

// 路由的方法
override def rule(serviceId: String, gatewayContext: GatewayContext): Unit = if(StringUtils.isNotBlank(gatewayContext.getRequest.getRequestBody)) {
// 从请求中获取 datasourceName
val datasourceName = getDatasourceName(gatewayContext.getRequest.getRequestBody)
if (StringUtils.isBlank(datasourceName)) return
debug(s"datasourceName: $datasourceName")
// 通过 datasourceName 获取映射
datasourceMapService.getInstanceByDatasource(datasourceName) match {
case i: String if StringUtils.isNotBlank(i) =>
// 存在映射直接返回 Instance
gatewayContext.getGatewayRoute.getServiceInstance.setInstance(i)
case _ => {
// 不存在映射时,先获取 Instance 列表,并根据已经存在映射的数据按照从小到大排序,获取最少映射的 Instance,插入 DataSource 映射并返回
val newInstance = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId)
.map(item => (item, datasourceMapService.countByInstance(item.getInstance)))
.sortBy(_._2).map(_._1.getInstance).headOption match {
case Some(item) => datasourceMapService.insertDatasourceMap(datasourceName, item, serviceId)
case None => null
}
debug(s"newInstance: $newInstance")
if (StringUtils.isNotBlank(newInstance)) {
gatewayContext.getGatewayRoute.getServiceInstance.setInstance(newInstance)
}
}
}
}

}