Java 命令行参数解析方式探索(四):Spark & Flink
1. Spark
./bin/spark-submit \
--name "Hello Spark" \
--master yarn \
--deploy-mode cluster \
--driver-memory 8g \
--num-executors 4 \
--executor-memory 16g \
--executor-cores 2 \
......
复制代码
我们在提交 Spark 任务时通常会设置任务名称、master 地址、driver 资源和 executor 资源等参数,除此之外还有很多,那么 Spark 是如何一步步解析这些命令行参数呢?让我们一起探索。
2. Spark:解析参数主要流程
- 提交 Spark 任务会执行 spark-submit shell 脚本,脚本中设定了启动类 org.apache.spark.deploy.SparkSubmit,委托 spark-class shell 脚本执行;
- spark-class shell 脚本首先会加载 Spark 环境变量,接着构建运行时依赖的 jar 包,构建完毕执行 Java 命令,调用 org.apache.spark.deploy.SparkSubmit 的 main 方法;
- org.apache.spark.deploy.SparkSubmit 虽然是一个单例对象,执行 main 方法时却又创建了一个新的自身类型实例,调用其 doSubmit 执行操作,doSubmit 方法内委托 SparkSubmitArguments 类来处理命令行参数;
- 在 SparkSubmitArguments 的构造函数中会调用父类 SparkSubmitOptionParser 的 parse 方法来解析实际的命令行参数。
最终经过上面的处理流程完成命令行参数的解析工作,接下来会介绍如果通过 SparkSubmitOptionParser 实现真正的解析工作。
3. Spark:解析实现
从解析的代码实现可以看出,解析逻辑还是十分严谨的:
- 限定参数名的解析范围,定义参数的类型:有值和无值;
- 有一定的灵活性:参数名和参数值之间可以使用 = 号连接;
- 有一定的扩展性:对于未识别的参数名,交由子类处理。
4. Spark:开源代码如何借鉴的更有价值
Spark 解析参数的实现比咱们最开始 Java 命令行参数解析方式探索(一):原始实现 文章中的实现严谨多了,对于好的代码咱们如何学习借鉴呢,最好的途径就是应用在实际的项目里,下面是一个思路,参考注释的步骤,借鉴代码之前添加下 apache 的 license2。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.ice.impl;
import com.ice.Parameter;
import com.ice.Starter;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SparkStarter extends Starter {
private static final String HELP = "help";
private static final String THREAD = "thread";
private static final String COUNT = "count";
private static final String SECOND = "second";
private static final String PROPERTY = "property";
private static final String OUTPUT = "output";
private static final Map<String, String> SHORT_TO_LONG;
private static final Set<String> OPTS;
static {
// 1. 缩写 -> 全写,方便后续统一处理
SHORT_TO_LONG = new HashMap<>();
SHORT_TO_LONG.put("t", THREAD);
SHORT_TO_LONG.put("c", COUNT);
SHORT_TO_LONG.put("s", SECOND);
SHORT_TO_LONG.put("p", PROPERTY);
SHORT_TO_LONG.put("o", OUTPUT);
// 2. 限定我们自己的参数
OPTS = new HashSet<>();
OPTS.addAll(SHORT_TO_LONG.values());
}
private boolean isHelp;
private int thread;
private int count;
private int second;
private Map<String, String> property;
private String output;
public SparkStarter(String[] args) {
super(args);
}
private void parse(String[] args) {
/**
* 3. 咱们也使用正则
* group1:忽略前面的前缀 - 或 --,直接获取参数名
* group2:获取实际的参数值
*/
Pattern eqSeparatedOpt = Pattern.compile("-{1,2}([^=]+)=(.+)");
for (int idx = 0; idx < args.length; idx++) {
String arg = args[idx];
// 4. 遇到 help 可以直接退出即可
if(arg.equals("h") || arg.equals(HELP)){
isHelp = true;
return;
}
String value = null;
Matcher m = eqSeparatedOpt.matcher(arg);
if (m.matches()) {
arg = m.group(1);
value = m.group(2);
}
// 5. 校验参数名是否有效
String name = findCliOption(arg);
if (name != null) {
if (value == null) {
if (idx == args.length - 1) {
throw new IllegalArgumentException(
String.format("Missing argument for option '%s'.", arg));
}
idx++;
value = args[idx];
}
// 6. 对于参数值的处理
if (!handle(name, value)) {
break;
}
}
// 这里可以进行容错操作
// if (!handleUnknown(arg)) {
// break;
// }
}
}
@Override
protected Parameter parse() {
// 7. 开始解析
parse(args);
// 8. 内部已经使用了成员变量存储,直接返回即可
return new Parameter() {
@Override
public int getThread() {
return thread;
}
@Override
public int getCount() {
return count;
}
@Override
public int getSecond() {
return second;
}
@Override
public Map<String, String> getProperty() {
return property;
}
@Override
public String getOutput() {
return output;
}
@Override
public boolean isHelp() {
return isHelp;
}
};
}
private String findCliOption(String name) {
// 不必校验直接获取,可以少一次方法调用
name = SHORT_TO_LONG.getOrDefault(name, name);
return OPTS.contains(name) ? name : null;
}
private boolean handle(String opt, String value) {
switch (opt) {
case THREAD:
thread = Integer.parseInt(value);
return true;
case COUNT:
count = Integer.parseInt(value);
return true;
case SECOND:
second = Integer.parseInt(value);
return true;
case OUTPUT:
output = value;
return true;
case PROPERTY:
property = new HashMap<>();
int index = value.indexOf("=");
if (index > 0) {
property.put(value.substring(0, index), value.substring(index + 1));
}
return true;
default:
return false;
}
}
}
复制代码
5. Flink
如果只关注命令行参数的传递、解析和后续的参数设置,Flink 和 Spark 思路上大体一致,不同的是 Flink 基于开源工具 Apache Commons CLI 进行解析,解析流程如下:
- 执行 flink shell 脚本:调用 config.sh 脚本构建任务运行的环境,例如加载环境变量和构建依赖 jar 包等;
- Java 执行启动类 org.apache.flink.client.cli.CliFrontend 的 main 方法;
- 解析实现在 org.apache.flink.client.cli.CliFrontend 的 parseAndRun 方法中完成,第一个命令行参数为执行的动作,根据不同的动作执行相应的逻辑,如果运行的是 run 命令,在 run 方法内部通过 CommandLine 的实例完成其他命令行参数的解析。