`
乡里伢崽
  • 浏览: 109020 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

hive执行流程入口 源码入口

    博客分类:
  • hive
 
阅读更多
图片地址 :http://hi.csdn.net/attachment/201107/29/0_1311922740tXqK.gif
CliDriver可以说是hive的入口,对应上图中的UI部分。大家看它的结构就可以明白了,main()函数!对!你猜的没错就是从main()开始。
下图是类结构,总共有五个关键的函数。

这个类可以说是用户和hive交互的平台,你可以把它认为是hive客户端。总共有4个key函数:
下图是这个CliDriver类在整个Hive执行过程中的作用的地位。

如图,hive执行流程_按正常步骤走:
1.—CliDriver.classz中main()开始,初始化Hive环境变量,获取客户端提供的string或者file。
2 —将其代码送入processLine(cmd),这步主要是读入cmd:‘;’之前的所有字符串都读入(不做任何检查),之后的会忽略。读完后,传入processCmd()处理
3 —调用processCmd(cmd),分情况处理
//– 读入cmd,并分情况处理,总共分为以下五种情况,根据命令的开头字符串来确定用什么方法处理。
// 1.set.. 设置operator参数,hive环境参数
// 2.quit or exit — 退出Hive环境
// 3.! 开头
// 4.dfs 开头 交给FsShell处理
// 5.hivesql 正常hivesql执行语句,我们最关心的是这里。语句交给了、、Hive真正的核心引擎 Driver。返回ret = Driver.run(cmd);
4.—不同情况不同处理方法。我们关心的第五种情况:正常的HiveSQL如何处理?其实是进入driver.class里面run(),
//读入hivesql ,词法分析,语法分析,直到执行结束
//1.ParseDriver 返回 词法树 CommonTree
//2.BaseSemanticAnalyzer sem.analyze(tree, ctx);//语义解释,生成执行计划
5.—。。。etc
今天的主题是hive的入口,我们只聊前三步。
现在我们细化主要函数,看hive实际是怎么处理的。(如果你只想了解hive工作流程或原理,不想拘泥于细节,可以跳过下面的细节,如果你想修改源码,做优化,可以继续往下看)

下面是hive入口 涉及的一些关键类和关键函数。
——————————-类CliDriver —
由于这个类,可以说贯彻Hive的整个流程架构,所以我聊的比较细。
————————————————main()
主函数是CliDriver类的main函数,然后走run函数,再做了一些初始化和检测后,再调用processLine,再调用processCmd。processLocalCmd则调用了Driver类的run函数和runExcute函数。


直到:

while ((line = reader.readLine(curPrompt + "> ")) != null) {
表示重复请求读入 SQL>

1,cli/src/java   CliDriver.main是主函数。

[java] view plaincopy
public static void main(String[] args) throws Exception { 
   int ret = run(args); 
   System.exit(ret); 


2,进入run函数
public static int run(String[] args) throws Exception { 
     
    OptionsProcessor oproc = new OptionsProcessor(); 
//(1) 解析(Parse)args,放入cmdLine,处理 –hiveconf var=val  用于增加或者覆盖hive/hadoop配置,设置到System.Properties中。  
if (!oproc.process_stage1(args)) { 
  return 1; 

//(2) 配置log4j,加载hive-log4j.properties里的配置信息,日志的初始化在其他任何核心类初始化前。  
boolean logInitFailed = false; 
String logInitDetailMessage; 
try { 
  logInitDetailMessage = LogUtils.initHiveLog4j(); 
} catch (LogInitializationException e) { 
  logInitFailed = true; 
  logInitDetailMessage = e.getMessage(); 

//(3) 创建一个CliSessionState(SessionState)  
CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class)); 
ss.in = System.in; 
try { 
  ss.out = new PrintStream(System.out, true, "UTF-8"); 
  ss.err = new PrintStream(System.err, true, "UTF-8"); 
} catch (UnsupportedEncodingException e) { 
  return 3; 

//(4) 处理-S, -e, -f, -h,-i等信息,保存在SessionState中。如果是-h,打印提示信息,并退出。 
if (!oproc.process_stage2(ss)) { 
  return 2; 

//(5) 如果不是-S,就是说不是静默状态,就输出一些提示信息,表示初始化好了。 
if (!ss.getIsSilent()) { 
  if (logInitFailed) { 
    System.err.println(logInitDetailMessage); 
  } else { 
//(5)输出一些信息:12/07/05 16:52:34 INFO SessionState:  
    SessionState.getConsole().printInfo(logInitDetailMessage); 
  } 

//(6)创建一个HiveConf,通过命令行设置的参数配置所有属性。  
HiveConf conf = ss.getConf(); 
for (Map.Entry<Object, Object> item : ss.cmdProperties.entrySet()) { 
  conf.set((String) item.getKey(), (String) item.getValue()); 

//(7)启动CliSessionState ss。  
SessionState.start(ss); 
 
// (8)连接到 Hive Server 
if (ss.getHost() != null) { 
  ss.connect(); 
  if (ss.isRemoteMode()) { 
    prompt = "[" + ss.host + ':' + ss.port + "] " + prompt; 
    char[] spaces = new char[prompt.length()]; 
    Arrays.fill(spaces, ' '); 
    prompt2 = new String(spaces); 
  } 

//(9) ShimLoader,load  HadoopShims  
// CLI remote mode is a thin client: only load auxJars in local mode 
if (!ss.isRemoteMode() && !ShimLoader.getHadoopShims().usesJobShell()) { 
  // hadoop-20 and above - we need to augment classpath using hiveconf 
  // components 
  // see also: code in ExecDriver.java 
  ClassLoader loader = conf.getClassLoader(); 
  //(9)设置hiveJar= hive-exec-0.6.0.jar ,初始化加载hive-default.xml、 hive-site.xml。 
  String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS); 
  if (StringUtils.isNotBlank(auxJars)) { 
    loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ",")); 
  } 
  conf.setClassLoader(loader); 
  Thread.currentThread().setContextClassLoader(loader); 

//(10) 创建CliDriver.  
CliDriver cli = new CliDrive(); 
cli.setHiveVariables(oproc.getHiveVariables()); 
//(10)在接受hivesql命令前,执行一些初始化命令,这些命令存在文件中,文件可以通过-i选项设置,如果没有设置就去查找是否有$HIVE_HOME/bin/.hiverc和System.getProperty("user.home")/.hiverc两个文件,如果有就执行这两个文件中的命令。  
// Execute -i init files (always in silent mode) 
cli.processInitFiles(ss); 
//(10) 如果是–e,执行命令并退出,如果是-f,执行文件中的命令并退出。  
if (ss.execString != null) { 
  return cli.processLine(ss.execString); 

 
try { 
  if (ss.fileName != null) { 
    return cli.processFile(ss.fileName); 
  } 
} catch (FileNotFoundException e) { 
  System.err.println("Could not open input file for reading. (" + e.getMessage() + ")"); 
  return 3; 

//(11)创建ConsoleReader,读取用户输入,遇到“;”为一个完整的命令,执行该命令(CliDriver.processLine ),接着读取处理用户的输入。用户输入的命令记录在user.home/.hivehistory文件中。  
ConsoleReader reader = new ConsoleReader(); 
reader.setBellEnabled(false); 
// reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true))); 
reader.addCompletor(getCommandCompletor()); 
 
String line; 
final String HISTORYFILE = ".hivehistory"; 
String historyFile = System.getProperty("user.home") + File.separator + HISTORYFILE; 
reader.setHistory(new History(new File(historyFile))); 
int ret = 0; 
 
String prefix = ""; 
String curDB = getFormattedDb(conf, ss); 
String curPrompt = prompt + curDB; 
String dbSpaces = spacesForString(curDB); 
 
while ((line = reader.readLine(curPrompt + "> ")) != null) { 
  if (!prefix.equals("")) { 
    prefix += '\n'; 
  } 
  if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) { 
    line = prefix + line; 
    ret = cli.processLine(line, true); 
    prefix = ""; 
    curDB = getFormattedDb(conf, ss); 
    curPrompt = prompt + curDB; 
    dbSpaces = dbSpaces.length() == curDB.length() ? dbSpaces : spacesForString(curDB); 
  } else { 
    prefix = prefix + line; 
    curPrompt = prompt2 + dbSpaces; 
    continue; 
  } 

 
ss.close(); 
 
return ret; 

3,主要是调用了 processLine。

     ProcessLine又调用了 processCmd。

    CliDriver.processLine   去掉命令末尾的;,

public int processLine(String line, boolean allowInterupting) { 
   SignalHandler oldSignal = null; 
   Signal interupSignal = null; 
   //(1)整理允许中断 ctrl+C 
   if (allowInterupting) { 
     // Remember all threads that were running at the time we started line processing. 
     // Hook up the custom Ctrl+C handler while processing this line 
     interupSignal = new Signal("INT"); 
     oldSignal = Signal.handle(interupSignal, new SignalHandler() { 
       private final Thread cliThread = Thread.currentThread(); 
       private boolean interruptRequested; 
 
       @Override 
       public void handle(Signal signal) { 
         boolean initialRequest = !interruptRequested; 
         interruptRequested = true; 
 
         // Kill the VM on second ctrl+c 
         if (!initialRequest) { 
           console.printInfo("Exiting the JVM"); 
           System.exit(127); 
         } 
 
         // Interrupt the CLI thread to stop the current statement and return 
         // to prompt 
         console.printInfo("Interrupting... Be patient, this might take some time."); 
         console.printInfo("Press Ctrl+C again to kill JVM"); 
 
         // First, kill any running MR jobs 
         HadoopJobExecHelper.killRunningJobs(); 
         HiveInterruptUtils.interrupt(); 
         this.cliThread.interrupt(); 
       } 
     }); 
   } 
 
   try { 
     int lastRet = 0, ret = 0; 
 
     String command = ""; 
    //(2)循环处理每一个以分号结尾的语句。 
    for (String oneCmd : line.split(";")) { 
 
      if (StringUtils.endsWith(oneCmd, "\\")) { 
        command += StringUtils.chop(oneCmd) + ";"; 
        continue; 
      } else { 
        command += oneCmd; 
      } 
      if (StringUtils.isBlank(command)) { 
        continue; 
      } 
      //(3)执行处理命令 
      ret = processCmd(command); 
      //(4)清除query State的状态。wipe cli query state 
      SessionState ss = SessionState.get(); 
      ss.setCommandType(null); 
      command = ""; 
      lastRet = ret; 
      boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS); 
      if (ret != 0 && !ignoreErrors) { 
        CommandProcessorFactory.clean((HiveConf) conf); 
        return ret; 
      } 
    } 
    CommandProcessorFactory.clean((HiveConf) conf); 
    return lastRet; 
  } finally { 
    // Once we are done processing the line, restore the old handler 
    if (oldSignal != null && interupSignal != null) { 
      Signal.handle(interupSignal, oldSignal); 
    } 
  } 


4,processCmd

CliDriver.processCmd


Split命令,分析第一个单词:
(1)如果是quit或者exit,退出。
(2)source,执行文件中的HiveQL
(3)!,执行命令,如!ls,列出当前目录的文件信息。
(4)list,列出jar/file/archive。
(5)如果是其他,则生成调用相应的CommandProcessor处理。
[java] view plaincopy
public int processCmd(String cmd) { 
    CliSessionState ss = (CliSessionState) SessionState.get(); 
    String cmd_trimmed = cmd.trim(); 
    String[] tokens = tokenizeCmd(cmd_trimmed); 
    int ret = 0; 
    //(1)如果是quit或者exit,退出。  
    if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) { 
 
      // if we have come this far - either the previous commands 
      // are all successful or this is command line. in either case 
      // this counts as a successful run 
      ss.close(); 
      System.exit(0); 
     //(2)source,执行文件中的HiveQL  
    } else if (tokens[0].equalsIgnoreCase("source")) { 
      String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length()); 
 
      File sourceFile = new File(cmd_1); 
      if (! sourceFile.isFile()){ 
        console.printError("File: "+ cmd_1 + " is not a file."); 
        ret = 1; 
      } else { 
        try { 
          this.processFile(cmd_1); 
        } catch (IOException e) { 
          console.printError("Failed processing file "+ cmd_1 +" "+ e.getLocalizedMessage(), 
            org.apache.hadoop.util.StringUtils.stringifyException(e)); 
          ret = 1; 
        } 
      }//(3)!,执行命令,如!ls,列出当前目录的文件信息。  
    } else if (cmd_trimmed.startsWith("!")) { 
 
      String shell_cmd = cmd_trimmed.substring(1); 
      shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd); 
 
      // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'"; 
      try { 
        Process executor = Runtime.getRuntime().exec(shell_cmd); 
        StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out); 
        StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err); 
 
        outPrinter.start(); 
        errPrinter.start(); 
 
        ret = executor.waitFor(); 
        if (ret != 0) { 
          console.printError("Command failed with exit code = " + ret); 
        } 
      } catch (Exception e) { 
        console.printError("Exception raised from Shell command " + e.getLocalizedMessage(), 
            org.apache.hadoop.util.StringUtils.stringifyException(e)); 
        ret = 1; 
      } 
     //(4)list,列出jar/file/archive。  
    } else if (tokens[0].toLowerCase().equals("list")) { 
 
      SessionState.ResourceType t; 
      if (tokens.length < 2 || (t = SessionState.find_resource_type(tokens[1])) == null) { 
        console.printError("Usage: list [" 
            + StringUtils.join(SessionState.ResourceType.values(), "|") + "] [<value> [<value>]*]"); 
        ret = 1; 
      } else { 
        List<String> filter = null; 
        if (tokens.length >= 3) { 
          System.arraycopy(tokens, 2, tokens, 0, tokens.length - 2); 
          filter = Arrays.asList(tokens); 
        } 
        Set<String> s = ss.list_resource(t, filter); 
        if (s != null && !s.isEmpty()) { 
          ss.out.println(StringUtils.join(s, "\n")); 
        } 
      }//(5)如果是其他,则生成调用相应的CommandProcessor处理。//如果是远端 
    } else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server 
      HiveClient client = ss.getClient(); 
      PrintStream out = ss.out; 
      PrintStream err = ss.err; 
 
      try { 
        client.execute(cmd_trimmed); 
        List<String> results; 
        do { 
          results = client.fetchN(LINES_TO_FETCH); 
          for (String line : results) { 
            out.println(line); 
          } 
        } while (results.size() == LINES_TO_FETCH); 
      } catch (HiveServerException e) { 
        ret = e.getErrorCode(); 
        if (ret != 0) { // OK if ret == 0 -- reached the EOF 
          String errMsg = e.getMessage(); 
          if (errMsg == null) { 
            errMsg = e.toString(); 
          } 
          ret = e.getErrorCode(); 
          err.println("[Hive Error]: " + errMsg); 
        } 
      } catch (TException e) { 
        String errMsg = e.getMessage(); 
        if (errMsg == null) { 
          errMsg = e.toString(); 
        } 
        ret = -10002; 
        err.println("[Thrift Error]: " + errMsg); 
      } finally { 
        try { 
          client.clean(); 
        } catch (TException e) { 
          String errMsg = e.getMessage(); 
          if (errMsg == null) { 
            errMsg = e.toString(); 
          } 
          err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: " 
              + errMsg); 
        } 
      }//如果是本地 
    } else { // local mode 
      CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf) conf); 
      ret = processLocalCmd(cmd, proc, ss); 
    } 
 
    return ret; 
  } 

5,processLoacalCmd

int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) { 
  int tryCount = 0; 
  boolean needRetry; 
  int ret = 0; 
 
  do { 
    try { 
      needRetry = false; 
      if (proc != null) { 
        if (proc instanceof Driver) { 
          Driver qp = (Driver) proc; 
          PrintStream out = ss.out; 
          long start = System.currentTimeMillis(); 
          if (ss.getIsVerbose()) { 
            out.println(cmd); 
          } 
 
          qp.setTryCount(tryCount); 
          ret = qp.run(cmd).getResponseCode(); 
          if (ret != 0) { 
            qp.close(); 
            return ret; 
          } 
 
          ArrayList<String> res = new ArrayList<String>(); 
 
          printHeader(qp, out); 
 
          try { 
            while (qp.getResults(res)) { 
              for (String r : res) { 
                out.println(r); 
              } 
              res.clear(); 
              if (out.checkError()) { 
                break; 
              } 
            } 
          } catch (IOException e) { 
            console.printError("Failed with exception " + e.getClass().getName() + ":" 
                + e.getMessage(), "\n" 
                + org.apache.hadoop.util.StringUtils.stringifyException(e)); 
            ret = 1; 
          } 
 
          int cret = qp.close(); 
          if (ret == 0) { 
            ret = cret; 
          } 
 
          long end = System.currentTimeMillis(); 
          if (end > start) { 
            double timeTaken = (end - start) / 1000.0; 
            console.printInfo("Time taken: " + timeTaken + " seconds", null); 
          } 
 
        } else { 
          String firstToken = tokenizeCmd(cmd.trim())[0]; 
          String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length()); 
 
          if (ss.getIsVerbose()) { 
            ss.out.println(firstToken + " " + cmd_1); 
          } 
          ret = proc.run(cmd_1).getResponseCode(); 
        } 
      } 
    } catch (CommandNeedRetryException e) { 
      console.printInfo("Retry query with a different approach..."); 
      tryCount++; 
      needRetry = true; 
    } 
  } while (needRetry); 
 
  return ret; 



6,Driver 类 的run 方法。


Driver
Driver.run(String command) // 处理一条命令
{
  int ret =compile(command);  // 分析命令,生成Task。
  ret = execute();  // 运行Task。
}


Driver.compile

Driver.compile(String command) // 处理一条命令
{
(1) Parser(antlr):HiveQL->AbstractSyntaxTree(AST)
      ParseDriver pd = new ParseDriver();
      ASTNode tree = pd.parse(command, ctx);
(2) SemanticAnalyzer
      BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
      // Do semantic analysis and plan generation
      sem.analyze(tree, ctx);
}



7,plan生成位置

可以通过跟踪到Driver.java文件的Line 663可知其路径为:

/tmp/hive-gexing111/hive_2012-07-09_10-37-27_511_5073252372102100766/test2.py
如果系统自己的plan:

/tmp/hive-gexing111/hive_2012-07-09_12-45-55_479_6444298560478274273/-local-10002/plan.xml

8,Debug  show tables "ge*";

在hive/   metastore/src/java   /com.aliyun.apsara.odps.metastore.ots   /OTSObjectStore.java  

中的Gettables;


9,配置文件位置

hive/build/dist/conf/hive-site.xml


设置为改写版本:

<property>
  <name>com.aliyun.odps.mode</name>
  <value>true</value>
</property>

<property>
<name>native.hive.mode</name>
<value>false</value>
</property>






http://blog.csdn.net/wf1982/article/details/6644330
http://blog.csdn.net/gexiaobaohelloworld/article/details/7719163
  • 大小: 95.1 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics