點擊查看InputSplit
// 切片類,表示 一份被Mapper處理的資料
public abstract class InputSplit {
// 獲取切片物件的 長度(單位Bytes)
public abstract long getLength() throws IOException, InterruptedException;
// 獲取當前切片物件的 存盤資訊
public abstract
String[] getLocations() throws IOException, InterruptedException;
// 獲取所有切片物件的 存盤資訊
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}
FileSplit對應的是一個輸入檔案,也就是說,如果用FileSplit對應的FileInputFormat作為輸入格式,
那么即使檔案特別小,也是作為一個單獨的InputSplit來處理,而每一個InputSplit將會由一個獨立的Mapper Task來處理,
在輸入資料是由大量小檔案組成的情形下,就會有同樣大量的InputSplit,
從而需要同樣大量的Mapper來處理,大量的Mapper Task創建銷毀開銷將是巨大的,甚至對集群來說,是災難性的!
點擊查看FileSplit
// 切片類,表示 一份被Mapper處理的資料
// 作為 InputFormat的getSplits方法的回傳值
// 作為 InputFormat的createRecordReader方法的輸入
// 每個切片 包含檔案的一部分 或者整個檔案(不可切分或者 檔案大小小于切片*1.1時)
public class FileSplit extends InputSplit implements Writable {
private Path file; // 切片 所屬的檔案名稱
private long start; // 切片對應 在檔案中的 啟示位置
private long length; // 切片長度(位元組數)
private String[] hosts; // 切片 所屬 block的存盤host資訊
private SplitLocationInfo[] hostInfos;
// 構造器
public FileSplit() {}
// 構造器
public FileSplit(Path file, long start, long length, String[] hosts) {
this.file = file;
this.start = start;
this.length = length;
this.hosts = hosts;
}
// 構造器
public FileSplit(Path file, long start, long length, String[] hosts,
String[] inMemoryHosts) {
this(file, start, length, hosts);
hostInfos = new SplitLocationInfo[hosts.length];
for (int i = 0; i < hosts.length; i++) {
// because N will be tiny, scanning is probably faster than a HashSet
boolean inMemory = false;
for (String inMemoryHost : inMemoryHosts) {
if (inMemoryHost.equals(hosts[i])) {
inMemory = true;
break;
}
}
hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory);
}
}
/** The file containing this split's data. */
public Path getPath() { return file; }
/** The position of the first byte in the file to process. */
public long getStart() { return start; }
/** The number of bytes in the file to process. */
@Override
public long getLength() { return length; }
@Override
public String toString() { return file + ":" + start + "+" + length; }
////////////////////////////////////////////
// Writable methods 序列化方法
////////////////////////////////////////////
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, file.toString());
out.writeLong(start);
out.writeLong(length);
}
@Override
public void readFields(DataInput in) throws IOException {
file = new Path(Text.readString(in));
start = in.readLong();
length = in.readLong();
hosts = null;
}
@Override
public String[] getLocations() throws IOException {
if (this.hosts == null) {
return new String[]{};
} else {
return this.hosts;
}
}
@Override
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return hostInfos;
}
}
CombineFileSplit是針對小檔案的分片,它將一系列小檔案封裝在一個InputSplit內,這樣一個Mapper就可以處理多個小檔案,
可以有效的降低行程開銷,與FileSplit類似,CombineFileSplit同樣包含檔案路徑,分片起始位置,
分片大小和分片資料所在的host串列四個屬性,只不過這些屬性不再是一個值,而是一個串列,
需要注意的一點是,CombineFileSplit的getLength()方法,回傳的是這一系列資料的資料的總長度,
點擊查看CombineFileSplit
// 切片類,表示 一份被Mapper處理的資料
// 一個切片物件,可以包含多個檔案
public class CombineFileSplit extends InputSplit implements Writable {
private Path[] paths;
private long[] startoffset;
private long[] lengths;
private String[] locations;
private long totLength;
/**
* default constructor
*/
public CombineFileSplit() {}
public CombineFileSplit(Path[] files, long[] start,
long[] lengths, String[] locations) {
initSplit(files, start, lengths, locations);
}
public CombineFileSplit(Path[] files, long[] lengths) {
long[] startoffset = new long[files.length];
for (int i = 0; i < startoffset.length; i++) {
startoffset[i] = 0;
}
String[] locations = new String[files.length];
for (int i = 0; i < locations.length; i++) {
locations[i] = "";
}
initSplit(files, startoffset, lengths, locations);
}
private void initSplit(Path[] files, long[] start,
long[] lengths, String[] locations) {
this.startoffset = start;
this.lengths = lengths;
this.paths = files;
this.totLength = 0;
this.locations = locations;
for(long length : lengths) {
totLength += length;
}
}
/**
* Copy constructor
*/
public CombineFileSplit(CombineFileSplit old) throws IOException {
this(old.getPaths(), old.getStartOffsets(),
old.getLengths(), old.getLocations());
}
public long getLength() {
return totLength;
}
/** Returns an array containing the start offsets of the files in the split*/
public long[] getStartOffsets() {
return startoffset;
}
/** Returns an array containing the lengths of the files in the split*/
public long[] getLengths() {
return lengths;
}
/** Returns the start offset of the i<sup>th</sup> Path */
public long getOffset(int i) {
return startoffset[i];
}
/** Returns the length of the i<sup>th</sup> Path */
public long getLength(int i) {
return lengths[i];
}
/** Returns the number of Paths in the split */
public int getNumPaths() {
return paths.length;
}
/** Returns the i<sup>th</sup> Path */
public Path getPath(int i) {
return paths[i];
}
/** Returns all the Paths in the split */
public Path[] getPaths() {
return paths;
}
/** Returns all the Paths where this input-split resides */
public String[] getLocations() throws IOException {
return locations;
}
public void readFields(DataInput in) throws IOException {
totLength = in.readLong();
int arrLength = in.readInt();
lengths = new long[arrLength];
for(int i=0; i<arrLength;i++) {
lengths[i] = in.readLong();
}
int filesLength = in.readInt();
paths = new Path[filesLength];
for(int i=0; i<filesLength;i++) {
paths[i] = new Path(Text.readString(in));
}
arrLength = in.readInt();
startoffset = new long[arrLength];
for(int i=0; i<arrLength;i++) {
startoffset[i] = in.readLong();
}
}
public void write(DataOutput out) throws IOException {
out.writeLong(totLength);
out.writeInt(lengths.length);
for(long length : lengths) {
out.writeLong(length);
}
out.writeInt(paths.length);
for(Path p : paths) {
Text.writeString(out, p.toString());
}
out.writeInt(startoffset.length);
for(long length : startoffset) {
out.writeLong(length);
}
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < paths.length; i++) {
if (i == 0 ) {
sb.append("Paths:");
}
sb.append(paths[i].toUri().getPath() + ":" + startoffset[i] +
"+" + lengths[i]);
if (i < paths.length -1) {
sb.append(",");
}
}
if (locations != null) {
String locs = "";
StringBuffer locsb = new StringBuffer();
for (int i = 0; i < locations.length; i++) {
locsb.append(locations[i] + ":");
}
locs = locsb.toString();
sb.append(" Locations:" + locs + "; ");
}
return sb.toString();
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/390329.html
標籤:其他
下一篇:Mysql客戶端的安裝
