1. job.waitForCompletion(true)
/**
 * Submits the job to the cluster (unless it has already been submitted) and
 * blocks until it finishes.
 *
 * @param verbose whether to print the job's progress to the user while waiting
 * @return {@code true} if the job completed successfully
 * @throws IOException propagated from submission, monitoring or status polling
 * @throws InterruptedException if the calling thread is interrupted while
 *     waiting for the job to complete
 * @throws ClassNotFoundException propagated from {@code submit()}
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  // Only submit while the job is still in the DEFINE state, so it is never
  // submitted twice (JobState is an enum with DEFINE and RUNNING).
  if (state == JobState.DEFINE) {
    submit(); // submit the job to the cluster --> 1.1 submit()
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      // Let an interrupt propagate (this method already declares
      // InterruptedException) instead of swallowing it. An empty catch here
      // would lose the cancellation request and keep the thread polling.
      Thread.sleep(completionPollIntervalMillis);
    }
  }
  return isSuccessful();
}
1.1 submit()
/**
 * Submits this job without waiting for it to finish, then marks it RUNNING
 * and logs its tracking URL.
 */
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  // Compatibility step: chooses between the new and the old MapReduce API so
  // jobs written against an older version of the framework keep working.
  setUseNewAPI();
  // Creates the important Cluster object, which decides whether we connect
  // locally or to a cluster --> 1.1.1 connect()
  connect();
  final JobSubmitter jobSubmitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  // The actual submission is executed through ugi.doAs(...).
  PrivilegedExceptionAction<JobStatus> submitAction =
      new PrivilegedExceptionAction<JobStatus>() {
        @Override
        public JobStatus run() throws IOException, InterruptedException,
            ClassNotFoundException {
          // All preparation is done; the real submission flow starts here
          // --> JobStatus submitJobInternal(Job job, Cluster cluster)
          return jobSubmitter.submitJobInternal(Job.this, cluster);
        }
      };
  status = ugi.doAs(submitAction);
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
1.1.1 connect()
/**
 * Lazily creates the {@link Cluster} this job uses. Later calls are no-ops
 * once the cluster has been created.
 */
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  // cluster is null on the first connection; afterwards there is nothing to do.
  if (cluster != null) {
    return;
  }
  cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
    @Override
    public Cluster run()
        throws IOException, InterruptedException, ClassNotFoundException {
      // Build the client-side proxy used to submit the job
      // --> Cluster(Configuration conf)
      return new Cluster(getConfiguration());
    }
  });
}
/**
 * public Cluster(Configuration conf)
 * Creates a Cluster from the job configuration alone (the Configuration set
 * up in the Driver). It delegates to the two-argument constructor with no
 * job tracker address, so the client protocol is chosen from {@code conf}.
 *
 * @param conf the job configuration
 * @throws IOException propagated from the delegated constructor
 */
public Cluster(Configuration conf) throws IOException {
  this((InetSocketAddress) null, conf);
}
/**
 * Creates a Cluster for the current user and initializes its client protocol
 * through initialize(jobTrackAddr, conf).
 *
 * @param jobTrackAddr job tracker address; {@code null} when called from
 *     {@code Cluster(Configuration)}
 * @param conf the job configuration
 * @throws IOException if the current user cannot be determined or
 *     initialization fails
 */
public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
  this.conf = conf;
  this.ugi = currentUser;
  initialize(jobTrackAddr, conf); // --> initialize()
}
/**
 * Chooses the ClientProtocol implementation (for example YARNRunner or
 * LocalJobRunner) that this Cluster uses to talk to the execution framework.
 * The "..." lines stand for validation/setup code omitted by the article.
 *
 * @param jobTrackAddr job tracker address; null when reached via Cluster(Configuration)
 * @param conf the job configuration
 * @throws IOException NOTE(review): presumably when no provider yields a
 *     client protocol; the throwing code is elided, so confirm against the full source
 */
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)throws IOException {
...
/*
 Ask the current provider for a client protocol based on the Driver's configuration:
 the YARN provider returns a YARNRunner when mapreduce.framework.name is "yarn",
 and the local provider returns a LocalJobRunner when it is unset or "local".
 NOTE(review): `provider` is defined in the elided code; presumably this runs
 inside a loop over the available providers until one returns non-null. Confirm.
*/
if (jobTrackAddr == null) {
/*
 First provider tried  --> YARN provider:  public ClientProtocol create(Configuration conf)
 Second provider tried --> local provider: public ClientProtocol create(Configuration conf)
*/
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
...
}
/**
 * YARN provider: returns a YARNRunner when the Driver set
 * conf.set("mapreduce.framework.name", "yarn"). Otherwise it returns null,
 * so the next provider (the local one) gets its turn.
 */
public ClientProtocol create(Configuration conf) throws IOException {
  String frameworkName = conf.get("mapreduce.framework.name");
  if (!"yarn".equals(frameworkName)) {
    return null;
  }
  return new YARNRunner(conf);
}
/**
 * Local provider, tried when the previous provider returned null: returns a
 * LocalJobRunner when mapreduce.framework.name is "local" or unset, and null
 * otherwise. When the local runner is chosen, it also sets
 * JobContext.NUM_MAPS to 1 in {@code conf}.
 */
public ClientProtocol create(Configuration conf) throws IOException {
  // MRConfig.FRAMEWORK_NAME is "mapreduce.framework.name" and
  // MRConfig.LOCAL_FRAMEWORK_NAME is "local"; fall back to "local" when unset.
  String frameworkName =
      conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
  if (MRConfig.LOCAL_FRAMEWORK_NAME.equals(frameworkName)) {
    conf.setInt(JobContext.NUM_MAPS, 1);
    return new LocalJobRunner(conf);
  }
  return null;
} // back to submit() --> return submitter.submitJobInternal(Job.this, cluster);
總結:connect 方法最重要的地方,就是為我們創建了一個關鍵的物件:本地模式下是 LocalJobRunner(mapreduce.framework.name 配置為 yarn 時則是 YARNRunner)。之后提交作業都要用到這個物件,所以非常重要。
1.1.2 提交JOB
JobStatus submitJobInternal(Job job, Cluster cluster)throws ClassNotFoundException, InterruptedException, IOException {
/*
校驗檔案輸出路徑是否在Driver中配置,如果沒有配置拋出InvalidJobConfException,如果檔案路徑存在拋出 FileAlreadyException
*/
checkSpecs(job);
// 獲取conf配置
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
// 創建路徑,往路勁里生成資訊,提供給APPMaster使用 在集群中也就是tem路勁
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 配置校驗資訊我就用...代替了,太多容易把眼睛看花,感興趣的朋友可以用DeBug邊跳邊看里面的具體資訊,
...
// 創建jobId 也就是8088埠中 任務的id
JobID jobId = submitClient.getNewJobID();
// 獲得jobId設定到job里
job.setJobID(jobId);
// 將jobStagingArea和jobId拼在一起,拼成一個提交資訊(配置資訊,切片資訊,jar包)的路徑
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
...
// 拷貝jar包到集群(本地模式看不到,在向集群提交的時候才能看到jar包)
copyAndConfigureFiles(job, submitJobDir);
// 會在submitJobDir目錄下創建一個job.xml檔案
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
/* 切片,具體怎么切,再看切片原始碼的時候會提到,在執行完該方法后submitJobDir路徑中會多出split和crc 檔案
*/
int maps = writeSplits(job, submitJobDir);
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/254518.html
標籤:其他
