Java 命令行参数解析方式探索(四):Spark & Flink

createh53个月前 (01-08)技术教程39

1. Spark

./bin/spark-submit \
--name "Hello Spark" \
--master yarn \
--deploy-mode cluster \
--driver-memory 8g \
--num-executors 4 \
--executor-memory 16g \
--executor-cores 2  \
......

复制代码

我们在提交 Spark 任务时通常会设置任务名称、master 地址、driver 资源和 executor 资源等参数,除此之外还有很多,那么 Spark 是如何一步步解析这些命令行参数呢?让我们一起探索。

2. Spark:解析参数主要流程


  1. 提交 Spark 任务会执行 spark-submit shell 脚本,脚本中设定了启动类 org.apache.spark.deploy.SparkSubmit,委托 spark-class shell 脚本执行;
  2. spark-class shell 脚本首先会加载 Spark 环境变量,接着构建运行时依赖的 jar 包,构建完毕执行 Java 命令,调用 org.apache.spark.deploy.SparkSubmit 的 main 方法;
  3. org.apache.spark.deploy.SparkSubmit 虽然是一个单例对象,执行 main 方法时却又创建了一个新的自身类型实例,调用其 doSubmit 执行操作,doSubmit 方法内委托 SparkSubmitArguments 类来处理命令行参数;
  4. 在 SparkSubmitArguments 的构造函数中会调用父类 SparkSubmitOptionParser 的 parse 方法来解析实际的命令行参数。


最终经过上面的处理流程完成命令行参数的解析工作,接下来会介绍如果通过 SparkSubmitOptionParser 实现真正的解析工作。

3. Spark:解析实现

从解析的代码实现可以看出,解析逻辑还是十分严谨的:

  1. 限定参数名的解析范围,定义参数的类型:有值和无值;
  2. 有一定的灵活性:参数名和参数值之间可以使用 = 号连接;
  3. 有一定的扩展性:对于未识别的参数名,交由子类处理。

4. Spark:开源代码如何借鉴的更有价值

Spark 解析参数的实现比咱们最开始 Java 命令行参数解析方式探索(一):原始实现 文章中的实现严谨多了,对于好的代码咱们如何学习借鉴呢,最好的途径就是应用在实际的项目里,下面是一个思路,参考注释的步骤,借鉴代码之前添加下 apache 的 license2。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


package com.ice.impl;


import com.ice.Parameter;
import com.ice.Starter;


import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


public class SparkStarter extends Starter {
    private static final String HELP = "help";
    private static final String THREAD = "thread";
    private static final String COUNT = "count";
    private static final String SECOND = "second";
    private static final String PROPERTY = "property";
    private static final String OUTPUT = "output";
    private static final Map<String, String> SHORT_TO_LONG;
    private static final Set<String> OPTS;


    static {
        // 1. 缩写 -> 全写,方便后续统一处理
        SHORT_TO_LONG = new HashMap<>();
        SHORT_TO_LONG.put("t", THREAD);
        SHORT_TO_LONG.put("c", COUNT);
        SHORT_TO_LONG.put("s", SECOND);
        SHORT_TO_LONG.put("p", PROPERTY);
        SHORT_TO_LONG.put("o", OUTPUT);


        // 2. 限定我们自己的参数
        OPTS = new HashSet<>();
        OPTS.addAll(SHORT_TO_LONG.values());
    }


    private boolean isHelp;
    private int thread;
    private int count;
    private int second;
    private Map<String, String> property;
    private String output;


    public SparkStarter(String[] args) {
        super(args);
    }


    private void parse(String[] args) {
        /**
         * 3. 咱们也使用正则
         * group1:忽略前面的前缀 - 或 --,直接获取参数名
         * group2:获取实际的参数值
         */
        Pattern eqSeparatedOpt = Pattern.compile("-{1,2}([^=]+)=(.+)");


        for (int idx = 0; idx < args.length; idx++) {
            String arg = args[idx];


            // 4. 遇到 help 可以直接退出即可
            if(arg.equals("h") || arg.equals(HELP)){
                isHelp = true;
                return;
            }


            String value = null;
            Matcher m = eqSeparatedOpt.matcher(arg);
            if (m.matches()) {
                arg = m.group(1);
                value = m.group(2);
            }


            // 5. 校验参数名是否有效
            String name = findCliOption(arg);
            if (name != null) {
                if (value == null) {
                    if (idx == args.length - 1) {
                        throw new IllegalArgumentException(
                                String.format("Missing argument for option '%s'.", arg));
                    }
                    idx++;
                    value = args[idx];
                }


                // 6. 对于参数值的处理
                if (!handle(name, value)) {
                    break;
                }
            }


            // 这里可以进行容错操作
//            if (!handleUnknown(arg)) {
//                break;
//            }
        }
    }


    @Override
    protected Parameter parse() {
        // 7. 开始解析
        parse(args);
        
        // 8. 内部已经使用了成员变量存储,直接返回即可
        return new Parameter() {
            @Override
            public int getThread() {
                return thread;
            }


            @Override
            public int getCount() {
                return count;
            }


            @Override
            public int getSecond() {
                return second;
            }


            @Override
            public Map<String, String> getProperty() {
                return property;
            }


            @Override
            public String getOutput() {
                return output;
            }


            @Override
            public boolean isHelp() {
                return isHelp;
            }
        };
    }


    private String findCliOption(String name) {
        // 不必校验直接获取,可以少一次方法调用
        name = SHORT_TO_LONG.getOrDefault(name, name);
        return OPTS.contains(name) ? name : null;
    }


    private boolean handle(String opt, String value) {
        switch (opt) {
            case THREAD:
                thread = Integer.parseInt(value);
                return true;
            case COUNT:
                count = Integer.parseInt(value);
                return true;
            case SECOND:
                second = Integer.parseInt(value);
                return true;
            case OUTPUT:
                output = value;
                return true;
            case PROPERTY:
                property = new HashMap<>();
                int index = value.indexOf("=");
                if (index > 0) {
                    property.put(value.substring(0, index), value.substring(index + 1));
                }
                return true;
            default:
                return false;
        }
    }
}

复制代码

5. Flink

如果只关注命令行参数的传递、解析和后续的参数设置,Flink 和 Spark 思路上大体一致,不同的是 Flink 基于开源工具 Apache Commons CLI 进行解析,解析流程如下:

  1. 执行 flink shell 脚本:调用 config.sh 脚本构建任务运行的环境,例如加载环境变量和构建依赖 jar 包等;
  2. Java 执行启动类 org.apache.flink.client.cli.CliFrontend 的 main 方法;
  3. 解析实现在 org.apache.flink.client.cli.CliFrontend 的 parseAndRun 方法中完成,第一个命令行参数为执行的动作,根据不同的动作执行相应的逻辑,如果运行的是 run 命令,在 run 方法内部通过 CommandLine 的实例完成其他命令行参数的解析。

相关文章

你了解JDK常用的7种命令行工具吗?

1 jps1.1 介绍jps是JDK提供的一个可以列出正在运行的Java虚拟机的进程信息的命令行工具,用来显示当前系统的java进程情况及进程id。1.2 命令格式jps [options] [hos...

有望取代 java?GO 语言项目了解一下

GO 语言在编程界一直让人又爱又恨,有人说“ GO 将统治下一个十年”,“几乎所有新的、有趣的东西都是用 Go 写的”;也有人说它过于死板,使用感太差。国外有 Google、AWS、Cloudflar...

Java设计模式——命令模式

文章目录命令模式命令模式命令模式很好理解,举个例子,司令员下令让士兵去干件事情,从整个事情的角度来考虑,司令员的作用是,发出口令,口令经过传递,传到了士兵耳朵里,士兵去执行。这个过程好在,三者相互解耦...

Linux上,最常用的一批命令解析(十年精选,超详细)

Linux最常用命令合辑十年精选万字熬夜肝文建议收藏 | 分享一、系统目录结构1. / :根每一个文件和目录从根目录开始。只有root用户具有该目录下的写权限。请注意,/root是root用户的主目录...

11款常用Java编程软件推荐(建议收藏)

大家好,我是mikechen。优秀的Java编程软件可以极大的提升我们的开发效率,所以今天给大家推荐一波必备Java编程软件@mikechen本篇已经收纳于mikechen创作的《阿里架构师进阶专题合...