一、研發背景
DataX官方開源的版本支持HDFS檔案的讀寫,并沒有支持基于JDBC的Hive資料讀寫,很多時候一些資料同步不太方便,比如在讀取Hive之前先執行一些sql、讀取一些Hive的視圖資料、或者在資料同步時執行一段固定的SQL,將SQL執行結果寫入下游等各種場景,實際上還是需要Hive插件來支持,而在實際作業中,我們也遇到了類似的一些情況需要二次開發DataX以支持此類場景,本插件已在生產環境穩定運行一年有余,現分享給大家,如有問題也可聯系我(qq:1821088755),
二、HiveReader插件介紹
hivereader插件比較簡單,共有三個類,兩個組態檔,其中:
- HiveReader:實作DataX框架核心方法,是具體邏輯,
- HiveReaderErrorCode:繼承了DataX框架的ErrorCode類,是用于統一例外處理DataXException類中呼叫,具體是新增了一個列舉值,
- HiveConnByKerberos:是在檢測到Hive具備Kerberos認證要求時,進行認證的工具類,
- plugin.json:DataX插件固定的組態檔,用于指定插件的入口類,
- plugin_job_template.json:二次開發插件,一般需要提供一下具體的使用方式,此json檔案即為HiveReader插件的配置方式說明,
2.1 HiveReader類
首先是HiveReader類,需要注意的是一些常量或列舉值,需要自行添加,其中DataBaseType列舉類中,需要新增Hive列舉項并添加Hive的驅動類全路徑,具體見注釋,另外就是Kerberos認證相關的幾個配置,一個是keytab的路徑,一個是krb5.conf的路徑,另外一個是principle的值,
package com.alibaba.datax.plugin.reader.hivereader;
import com.alibaba.datax.common.base.Key;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.rdbms.util.DataBaseType;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.security.authentication.util.KerberosName;
import java.lang.reflect.Field;
import java.util.List;
import static com.alibaba.datax.common.base.Constant.DEFAULT_FETCH_SIZE;//2048,可根據條件自己取值
import static com.alibaba.datax.common.base.Key.FETCH_SIZE; // 引數名:"fetchSize"
@Slf4j
public class HiveReader
extends Reader
{
//此處需現在com.sinosig.plumber.rdbms.util.DataBaseType列舉類中添加Hive型別,內容為:Hive("hive2", "org.apache.hive.jdbc.HiveDriver"),
private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive;
public static class Job
extends Reader.Job
{
private Configuration originalConfig = null;
private CommonRdbmsReader.Job commonRdbmsReaderJob;
@Override
public void init()
{
this.originalConfig = getPluginJobConf();
Boolean haveKerberos = this.originalConfig.getBool(Key.HAVE_KERBEROS, false);
if (haveKerberos) {
log.info("檢測到kerberos認證,正在進行認證");
org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
String kerberosKeytabFilePath = this.originalConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
String kerberosPrincipal = this.originalConfig.getString(Key.KERBEROS_PRINCIPAL);
String krb5Path = this.originalConfig.getString(Key.KRB5_CONF_FILE_PATH);
hadoopConf.set("hadoop.security.authentication", "kerberos");
hadoopConf.set("hive.security.authentication", "kerberos");
hadoopConf.set("hadoop.security.authorization", "true");
System.setProperty("java.security.krb5.conf",krb5Path);
refreshConfig();
HiveConnByKerberos.kerberosAuthentication(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf,krb5Path);
}
this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
this.originalConfig = commonRdbmsReaderJob.init(originalConfig);
}
@Override
public void preCheck()
{
this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE_TYPE);
}
@Override
public List<Configuration> split(int adviceNumber)
{
return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber);
}
@Override
public void post()
{
this.commonRdbmsReaderJob.post(originalConfig);
}
@Override
public void destroy()
{
this.commonRdbmsReaderJob.destroy(originalConfig);
}
}
public static class Task
extends Reader.Task
{
private Configuration readerSliceConfig;
private CommonRdbmsReader.Task commonRdbmsReaderTask;
@Override
public void init()
{
this.readerSliceConfig = getPluginJobConf();
this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId());
this.commonRdbmsReaderTask.init(this.readerSliceConfig);
}
@Override
public void startRead(RecordSender recordSender)
{
int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE);
this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize);
}
@Override
public void post()
{
this.commonRdbmsReaderTask.post(readerSliceConfig);
}
@Override
public void destroy()
{
this.commonRdbmsReaderTask.destroy(readerSliceConfig);
}
}
/** 重繪krb內容資訊 */
public static void refreshConfig() {
try {
sun.security.krb5.Config.refresh();
Field defaultRealmField = KerberosName.class.getDeclaredField("defaultRealm");
defaultRealmField.setAccessible(true);
defaultRealmField.set(
null,
org.apache.hadoop.security.authentication.util.KerberosUtil.getDefaultRealm());
// reload java.security.auth.login.config
javax.security.auth.login.Configuration.setConfiguration(null);
} catch (Exception e) {
log.warn(
"resetting default realm failed, current default realm will still be used.", e);
}
}
}
2.2 HiveConnByKerberos類
HiveConnByKerberos類比較簡單,是一個通用的Kerberos認證的介面,
package com.alibaba.datax.plugin.reader.hivereader; import com.alibaba.datax.common.exception.PlumberException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.security.UserGroupInformation; @Slf4j public class HiveConnByKerberos { public static void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf,String krb5conf) { System.setProperty("java.security.krb5.conf",krb5conf); if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { UserGroupInformation.setConfiguration(hadoopConf); try { UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); } catch (Exception e) { log.error("kerberos認證失敗"); String message = String.format("kerberos認證失敗,請檢查 " + "kerberosKeytabFilePath[%s] 和 kerberosPrincipal[%s]", kerberosKeytabFilePath, kerberosPrincipal); e.printStackTrace(); throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e); } } } }
2.3 HiveReaderErrorCode類
HiveReaderErrorCode類,主要就是集成ErrorCode類,并添加一個列舉項,這塊可直接在ErrorCode類添加,也可使用此類,為固定寫法,
package com.alibaba.datax.plugin.reader.hivereader; import com.alibaba.datax.common.spi.ErrorCode; public enum HiveReaderErrorCode implements ErrorCode { KERBEROS_LOGIN_ERROR("HiveReader-13", "KERBEROS認證失敗"); private final String code; private final String description; HiveReaderErrorCode(String code, String description) { this.code = code; this.description = description; } @Override public String getCode() { return this.code; } @Override public String getDescription() { return this.description; } @Override public String toString() { return String.format("Code:[%s], Description:[%s]. ", this.code, this.description); } }
2.4 plugin.json檔案
{ "name": "hivereader", "class": "com.alibaba.datax.plugin.reader.hivereader.HiveReader", "description": "Retrieve data from Hive via jdbc", "developer": "wxm" }
2.5 plugin_job_template.json檔案
這塊需要注意的一個問題是,如果Kerberos認證的Hive連接URL有兩種方式,如果是基于zookeeper的方式,則需保證運行DataX服務的節點與zookeeper節點網路是打通的,并且一定不要忘記寫上具體的Hive庫名,
{ "name": "hivereader", "parameter": { "column": [ "*" ], "username": "hive", "password": "",
"preSql":"show databases;", "connection": [ { "jdbcUrl": [ "jdbc:hive2://localhost:10000/default;principal=hive/[email protected]" ], "table": [ "hive_reader" ] } ], "where": "logdate='20211013'" , "haveKerberos": true, "kerberosKeytabFilePath": "/etc/security/keytabs/hive.headless.keytab", "kerberosPrincipal": "[email protected]" } }
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/543182.html
標籤:其他
