Java 命令行参数解析方式探索(四):Spark & Flink

1. Spark

./bin/spark-submit \
--name "Hello Spark" \
--master yarn \
--deploy-mode cluster \
--driver-memory 8g \
--num-executors 4 \
--executor-memory 16g \
--executor-cores 2  \
......

复制代码

我们在提交 Spark 任务时通常会设置任务名称、master 地址、driver 资源和 executor 资源等参数,除此之外还有很多,那么 Spark 是如何一步步解析这些命令行参数呢?让我们一起探索。

2. Spark:解析参数主要流程


  1. 提交 Spark 任务会执行 spark-submit shell 脚本,脚本中设定了启动类 org.apache.spark.deploy.SparkSubmit,委托 spark-class shell 脚本执行;
  2. spark-class shell 脚本首先会加载 Spark 环境变量,接着构建运行时依赖的 jar 包,构建完毕执行 Java 命令,调用 org.apache.spark.deploy.SparkSubmit 的 main 方法;
  3. org.apache.spark.deploy.SparkSubmit 虽然是一个单例对象,执行 main 方法时却又创建了一个新的自身类型实例,调用其 doSubmit 执行操作,doSubmit 方法内委托 SparkSubmitArguments 类来处理命令行参数;
  4. 在 SparkSubmitArguments 的构造函数中会调用父类 SparkSubmitOptionParser 的 parse 方法来解析实际的命令行参数。


最终经过上面的处理流程完成命令行参数的解析工作,接下来会介绍如果通过 SparkSubmitOptionParser 实现真正的解析工作。

3. Spark:解析实现

从解析的代码实现可以看出,解析逻辑还是十分严谨的:

  1. 限定参数名的解析范围,定义参数的类型:有值和无值;
  2. 有一定的灵活性:参数名和参数值之间可以使用 = 号连接;
  3. 有一定的扩展性:对于未识别的参数名,交由子类处理。

4. Spark:开源代码如何借鉴的更有价值

Spark 解析参数的实现比咱们最开始 Java 命令行参数解析方式探索(一):原始实现 文章中的实现严谨多了,对于好的代码咱们如何学习借鉴呢,最好的途径就是应用在实际的项目里,下面是一个思路,参考注释的步骤,借鉴代码之前添加下 apache 的 license2。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package com.ice.impl;


import com.ice.Parameter;
import com.ice.Starter;


import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


public class SparkStarter extends Starter {
    private static final String HELP = "help";
    private static final String THREAD = "thread";
    private static final String COUNT = "count";
    private static final String SECOND = "second";
    private static final String PROPERTY = "property";
    private static final String OUTPUT = "output";
    private static final Map<String, String> SHORT_TO_LONG;
    private static final Set<String> OPTS;


    static {
        // 1. 缩写 -> 全写,方便后续统一处理
        SHORT_TO_LONG = new HashMap<>();
        SHORT_TO_LONG.put("t", THREAD);
        SHORT_TO_LONG.put("c", COUNT);
        SHORT_TO_LONG.put("s", SECOND);
        SHORT_TO_LONG.put("p", PROPERTY);
        SHORT_TO_LONG.put("o", OUTPUT);


        // 2. 限定我们自己的参数
        OPTS = new HashSet<>();
        OPTS.addAll(SHORT_TO_LONG.values());
    }


    private boolean isHelp;
    private int thread;
    private int count;
    private int second;
    private Map<String, String> property;
    private String output;


    public SparkStarter(String[] args) {
        super(args);
    }


    private void parse(String[] args) {
        /**
         * 3. 咱们也使用正则
         * group1:忽略前面的前缀 - 或 --,直接获取参数名
         * group2:获取实际的参数值
         */
        Pattern eqSeparatedOpt = Pattern.compile("-{1,2}([^=]+)=(.+)");


        for (int idx = 0; idx < args.length; idx++) {
            String arg = args[idx];


            // 4. 遇到 help 可以直接退出即可
            if(arg.equals("h") || arg.equals(HELP)){
                isHelp = true;
                return;
            }


            String value = null;
            Matcher m = eqSeparatedOpt.matcher(arg);
            if (m.matches()) {
                arg = m.group(1);
                value = m.group(2);
            }


            // 5. 校验参数名是否有效
            String name = findCliOption(arg);
            if (name != null) {
                if (value == null) {
                    if (idx == args.length - 1) {
                        throw new IllegalArgumentException(
                                String.format("Missing argument for option '%s'.", arg));
                    }
                    idx++;
                    value = args[idx];
                }


                // 6. 对于参数值的处理
                if (!handle(name, value)) {
                    break;
                }
            }


            // 这里可以进行容错操作
//            if (!handleUnknown(arg)) {
//                break;
//            }
        }
    }


    @Override
    protected Parameter parse() {
        // 7. 开始解析
        parse(args);
        
        // 8. 内部已经使用了成员变量存储,直接返回即可
        return new Parameter() {
            @Override
            public int getThread() {
                return thread;
            }


            @Override
            public int getCount() {
                return count;
            }


            @Override
            public int getSecond() {
                return second;
            }


            @Override
            public Map<String, String> getProperty() {
                return property;
            }


            @Override
            public String getOutput() {
                return output;
            }


            @Override
            public boolean isHelp() {
                return isHelp;
            }
        };
    }


    private String findCliOption(String name) {
        // 不必校验直接获取,可以少一次方法调用
        name = SHORT_TO_LONG.getOrDefault(name, name);
        return OPTS.contains(name) ? name : null;
    }


    private boolean handle(String opt, String value) {
        switch (opt) {
            case THREAD:
                thread = Integer.parseInt(value);
                return true;
            case COUNT:
                count = Integer.parseInt(value);
                return true;
            case SECOND:
                second = Integer.parseInt(value);
                return true;
            case OUTPUT:
                output = value;
                return true;
            case PROPERTY:
                property = new HashMap<>();
                int index = value.indexOf("=");
                if (index > 0) {
                    property.put(value.substring(0, index), value.substring(index + 1));
                }
                return true;
            default:
                return false;
        }
    }
}

复制代码

5. Flink

如果只关注命令行参数的传递、解析和后续的参数设置,Flink 和 Spark 思路上大体一致,不同的是 Flink 基于开源工具 Apache Commons CLI 进行解析,解析流程如下:

  1. 执行 flink shell 脚本:调用 config.sh 脚本构建任务运行的环境,例如加载环境变量和构建依赖 jar 包等;
  2. Java 执行启动类 org.apache.flink.client.cli.CliFrontend 的 main 方法;
  3. 解析实现在 org.apache.flink.client.cli.CliFrontend 的 parseAndRun 方法中完成,第一个命令行参数为执行的动作,根据不同的动作执行相应的逻辑,如果运行的是 run 命令,在 run 方法内部通过 CommandLine 的实例完成其他命令行参数的解析。

相关文章

网络常见的 9 大命令,非常实用

1.ping 命令ping 是个使用频率极高的实用程序,主要用于确定网络的连通性。这对确定网络是否正确连接,以及网络连接的状况十分有用。简单的说,ping 就是一个测试程序,如果 ping 运行正确,...

命令行参数

用过Unix命令行,都会感受它的强大。Unix like包括Linux发行版、macOS等命令行在大部分情况下比图形化高效太多,很多高手都在用命令行。CUI和GUI尽管专业的程序员通常用CUI, 但G...

如何用java执行cmd命令

什么场景下用到cmd命令windows的cmd命令对于编程的小伙伴们来说应该是再熟悉不过了,比如说安装完node之后我们要查看一下node是否安装成功那么我们可以通过cmd命令行输入node -v来查...

Java中命令行调用大坑

Java中命令行调用大坑背景我司有一个查询服务接口机,QPS大概40~50,调用方式是Java调用Shell命令行的方式,核心代码如下:Process ps = Runtime.getRuntime(...

快速命令行界面

什么是快速 CLI?快速 CLI 是在 Java 中创建命令行程序的最快方法。它是一个基于开源注释的框架,用于创建shell程序。它使应用程序开发人员能够构建命令行工具,而无需编写样板代码。它非常简单...

你了解JDK常用的7种命令行工具吗?

1 jps1.1 介绍jps是JDK提供的一个可以列出正在运行的Java虚拟机的进程信息的命令行工具,用来显示当前系统的java进程情况及进程id。1.2 命令格式jps [options] [hos...