主頁 > 後端開發 > 10分鐘教你寫一個資料庫

10分鐘教你寫一個資料庫

2022-10-13 12:25:47 後端開發

今天教大家借助一款框架快速實作一個資料庫,這個框架就是Calcite,下面會帶大家通過兩個例子快速教會大家怎么實作,一個是可以通過 SQL 陳述句的方式可以直接查詢檔案內容,第二個是模擬 Mysql 查詢功能,以及最后告訴大家怎么實作 SQL 查詢 Kafka 資料,

Calcite

Calcite 是一個用于優化異構資料源的查詢處理的可插拔基礎框架(他是一個框架),可以將任意資料(Any data, Anywhere)DML 轉換成基于 SQL 的 DML 引擎,并且我們可以選擇性的使用它的部分功能,

Calcite能干什么

  1. 使用 SQL 訪問記憶體中某個資料

  2. 使用 SQL 訪問某個檔案的資料

  3. 跨資料源的資料訪問、聚合、排序等(例如 Mysql 和 Redis 資料源中的資料進行join)

當我們需要自建一個資料庫的時候,資料可以為任何格式的,比如text、word、xml、mysql、es、csv、第三方介面資料等等,我們只有資料,我們想讓這些資料支持 SQL 形式動態增刪改查,

另外,像Hive、Drill、Flink、Phoenix 和 Storm 等專案中,資料處理系統都是使用 Calcite 來做 SQL 決議和查詢優化,當然,還有部分用來構建自己的 JDBC driver,

名詞解釋

Token

就是將標準 SQL(可以理解為Mysql)關鍵詞以及關鍵詞之間的字串截取出來,每一個token,會被封裝為一個SqlNodeSqlNode會衍生很多子類,比如Select會被封裝為SqlSelect,當前 SqlNode 也能反決議為 SQL 文本,

RelDataTypeField

某個欄位的名稱和型別資訊

RelDataType

多個 RelDataTypeField 組成了 RelDataType,可以理解為資料行

Table

一個完整的表的資訊

Schema

所有元資料的組合,可以理解為一組 Table 或者庫的概念

開始使用

1. 引入包

<dependency>
    <groupId>org.apache.calcite</groupId>
    <artifactId>calcite-core</artifactId>
    <!-- 目前最新版本 2022-09-10日更新-->
    <version>1.32.0</version>
</dependency>

2. 創建model.json檔案和表結構csv

model.json 里面主要描述或者說告訴 Calcite 如何創建 Schema,也就是告訴框架怎么創建出庫,

{
"version": "1.0",//忽略
"defaultSchema": "CSV",//設定默認的schema
"schemas": [//可定義多個schema
        {
          "name": "CSV",//相當于namespace和上面的defaultSchema的值對應
          "type": "custom",//寫死
          "factory": "csv.CsvSchemaFactory",//factory的類名必須是你自己實作的factory的包的全路徑
          "operand": { //這里可以傳遞自定義引數,最侄訓以map的形式傳遞給factory的operand引數
          "directory": "csv"//directory代表calcite會在resources下面的csv目錄下面讀取所有的csv檔案,factory創建的Schema會吧這些檔案全部構建成Table,可以理解為讀取資料檔案的根目錄,當然key的名稱也不一定非得用directory,你可以隨意指定
                }
        }
      ]
}

接下來還需要定義一個 csv 檔案,用來定義表結構,

NAME:string,MONEY:string
aixiaoxian,10000萬
xiaobai,10000萬
adong,10000萬
maomao,10000萬
xixi,10000萬
zizi,10000萬
wuwu,10000萬
kuku,10000萬

整個專案的結構大概就是這樣:

3. 實作Schema的工廠類

在上述檔案中指定的包路徑下去撰寫 CsvSchemaFactory 類,實作 SchemaFactory 介面,并且實作里面唯一的方法 create 方法,創建Schema(庫),

public class CsvSchemaFactory implements SchemaFactory {
    /**
     * parentSchema 父節點,一般為root
     * name 為model.json中定義的名字
     * operand 為model.json中定于的資料,這里可以傳遞自定義引數
     *
     * @param parentSchema Parent schema
     * @param name         Name of this schema
     * @param operand      The "operand" JSON property
     * @return
     */
    @Override
    public Schema create(SchemaPlus parentSchema, String name,
                         Map<String, Object> operand) {
        final String directory = (String) operand.get("directory");
        File directoryFile = new File(directory);
        return new CsvSchema(directoryFile, "scannable");
    }
}

4. 自定義Schma類

有了 SchemaFactory,接下來需要自定義 Schema 類,

自定義的 Schema 需要實作 Schema 介面,但是直接實作要實作的方法太多,我們去實作官方的 AbstractSchema 類,這樣就只需要實作一個方法就行(如果有其他定制化需求可以實作原生介面),

核心的邏輯就是createTableMap方法,用于創建出 Table 表,

他會掃描指定的Resource下面的所有 csv 檔案,將每個檔案映射成Table物件,最終以map形式回傳,Schema介面的其他幾個方法會用到這個物件,

		//實作這一個方法就行了
    @Override
    protected Map<String, Table> getTableMap() {
        if (tableMap == null) {
            tableMap = createTableMap();
        }
        return tableMap;
    }
		private Map<String, Table> createTableMap() {
        // Look for files in the directory ending in ".csv"
        final Source baseSource = Sources.of(directoryFile);
        //會自動過濾掉非指定檔案后綴的檔案,我這里寫的csv
        File[] files = directoryFile.listFiles((dir, name) -> {
            final String nameSansGz = trim(name, ".gz");
            return nameSansGz.endsWith(".csv");
        });
        if (files == null) {
            System.out.println("directory " + directoryFile + " not found");
            files = new File[0];
        }
        // Build a map from table name to table; each file becomes a table.
        final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
        for (File file : files) {
            Source source = Sources.of(file);
            final Source sourceSansCsv = source.trimOrNull(".csv");
            if (sourceSansCsv != null) {
                final Table table = createTable(source);
                builder.put(sourceSansCsv.relative(baseSource).path(), table);
            }
        }
        return builder.build();
    }

5. 自定義 Table

Schema 有了,并且資料檔案 csv 也映射成 Table 了,一個 csv 檔案對應一個 Table

接下來我們去自定義 Table,自定義 Table 的核心是我們要定義欄位的型別和名稱,以及如何讀取 csv檔案,

  1. 先獲取資料型別和名稱,即單表結構,從csv檔案頭中獲取(當前檔案頭需要我們自己定義,包括規則我們也可以定制化),
/**
 * Base class for table that reads CSV files.
 */
public abstract class CsvTable extends AbstractTable {
    protected final Source source;
    protected final @Nullable RelProtoDataType protoRowType;
    private @Nullable RelDataType rowType;
    private @Nullable List<RelDataType> fieldTypes;
?
    /**
     * Creates a CsvTable.
     */
    CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
        this.source = source;
        this.protoRowType = protoRowType;
    }
		/**
		* 創建一個CsvTable,繼承AbstractTable,需要實作里面的getRowType方法,此方法就是獲取當前的表結構,
			Table的型別有很多種,比如還有視圖型別,AbstractTable類中幫我們默認實作了Table介面的一些方法,比如getJdbcTableType			方法,默認為Table型別,如果有其他定制化需求可直接實作Table介面,
			和AbstractSchema很像
		*/
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (protoRowType != null) {
            return protoRowType.apply(typeFactory);
        }
        if (rowType == null) {
            rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    null);
        }
        return rowType;
    }
?
    /**
     * Returns the field types of this CSV table.
     */
    public List<RelDataType> getFieldTypes(RelDataTypeFactory typeFactory) {
        if (fieldTypes == null) {
            fieldTypes = new ArrayList<>();
            CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    fieldTypes);
        }
        return fieldTypes;
    }
  
   public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
                                            Source source, @Nullable List<RelDataType> fieldTypes) {
        final List<RelDataType> types = new ArrayList<>();
        final List<String> names = new ArrayList<>();
        try (CSVReader reader = openCsv(source)) {
            String[] strings = reader.readNext();
            if (strings == null) {
                strings = new String[]{"EmptyFileHasNoColumns:boolean"};
            }
            for (String string : strings) {
                final String name;
                final RelDataType fieldType;
                //就是簡單的讀取字串冒號前面是名稱,冒號后面是型別
                final int colon = string.indexOf(':');
                if (colon >= 0) {
                    name = string.substring(0, colon);
                    String typeString = string.substring(colon + 1);
                    Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
                    if (decimalMatcher.matches()) {
                        int precision = Integer.parseInt(decimalMatcher.group(1));
                        int scale = Integer.parseInt(decimalMatcher.group(2));
                        fieldType = parseDecimalSqlType(typeFactory, precision, scale);
                    } else {
                        switch (typeString) {
                            case "string":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                            case "boolean":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
                                break;
                            case "byte":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
                                break;
                            case "char":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
                                break;
                            case "short":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
                                break;
                            case "int":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
                                break;
                            case "long":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
                                break;
                            case "float":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
                                break;
                            case "double":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
                                break;
                            case "date":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
                                break;
                            case "timestamp":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
                                break;
                            case "time":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
                                break;
                            default:
                                LOGGER.warn(
                                        "Found unknown type: {} in file: {} for column: {}. Will assume the type of "
                                                + "column is string.",
                                        typeString, source.path(), name);
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                        }
                    }
                } else {
                    //  如果沒定義,默認都是String型別,欄位名稱也是string
                    name = string;
                    fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                }
                names.add(name);
                types.add(fieldType);
                if (fieldTypes != null) {
                    fieldTypes.add(fieldType);
                }
            }
        } catch (IOException e) {
            // ignore
        }
        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}
  1. 獲取檔案中的資料,上面把Table的表結構欄位名稱和型別都獲取到了以后,就剩最后一步了,獲取檔案中的資料,我們需要自定義一個類,實作 ScannableTable 介面,并且實作里面唯一的方法 scan 方法,其實本質上就是讀檔案,然后把檔案的每一行的資料和上述獲取的 fileType 進行匹配,
@Override
    public Enumerable<Object[]> scan(DataContext root) {
        JavaTypeFactory typeFactory = root.getTypeFactory();
        final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
        final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
        final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable<@Nullable Object[]>() {
            @Override
            public Enumerator<@Nullable Object[]> enumerator() {
                //回傳我們自定義的讀取資料的類
                return new CsvEnumerator<>(source, cancelFlag, false, null,
                        CsvEnumerator.arrayConverter(fieldTypes, fields, false));
            }
        };
    }
 
 
 public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
                         @Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) {
        this.cancelFlag = cancelFlag;
        this.rowConverter = rowConverter;
        this.filterValues = filterValues == null ? null
                : ImmutableNullableList.copyOf(filterValues);
        try {
 
            this.reader = openCsv(source);
            //跳過第一行,因為第一行是定義型別和名稱的
            this.reader.readNext(); // skip header row
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
//CsvEnumerator必須實作calcit自己的迭代器,里面有current、moveNext方法,current是回傳當前游標所在的資料記錄,moveNext是將游標指向下一個記錄,官網中自己定義了一個型別轉換器,是將csv檔案中的資料轉換成檔案頭指定的型別,這個需要我們自己來實作
     @Override
    public E current() {
        return castNonNull(current);
    }
 
    @Override
    public boolean moveNext() {
        try {
            outer:
            for (; ; ) {
                if (cancelFlag.get()) {
                    return false;
                }
                final String[] strings = reader.readNext();
                if (strings == null) {
                    current = null;
                    reader.close();
                    return false;
                }
                if (filterValues != null) {
                    for (int i = 0; i < strings.length; i++) {
                        String filterValue = https://www.cnblogs.com/ilovejaney/archive/2022/10/13/filterValues.get(i);
                        if (filterValue != null) {
                            if (!filterValue.equals(strings[i])) {
                                continue outer;
                            }
                        }
                    }
                }
                current = rowConverter.convertRow(strings);
                return true;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
 
        protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
            if (fieldType == null || string == null) {
                return string;
            }
            switch (fieldType.getSqlTypeName()) {
                case BOOLEAN:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Boolean.parseBoolean(string);
                case TINYINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Byte.parseByte(string);
                case SMALLINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Short.parseShort(string);
                case INTEGER:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Integer.parseInt(string);
                case BIGINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Long.parseLong(string);
                case FLOAT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Float.parseFloat(string);
                case DOUBLE:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Double.parseDouble(string);
                case DECIMAL:
                    if (string.length() == 0) {
                        return null;
                    }
                    return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
                case DATE:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_DATE.parse(string);
                        return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
                    } catch (ParseException e) {
                        return null;
                    }
                case TIME:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_TIME.parse(string);
                        return (int) date.getTime();
                    } catch (ParseException e) {
                        return null;
                    }
                case TIMESTAMP:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_TIMESTAMP.parse(string);
                        return date.getTime();
                    } catch (ParseException e) {
                        return null;
                    }
                case VARCHAR:
                default:
                    return string;
            }
        }

6. 最后

至此我們需要準備的東西:庫、表名稱、欄位名稱、欄位型別都有了,接下來我們去寫我們的 SQL 陳述句查詢我們的資料檔案,

創建好幾個測驗的資料檔案,例如上面專案結構中我創建 2 個 csv 檔案USERINFO.csvASSET.csv,然后創建測驗類,

這樣跑起來,就可以通過 SQL 陳述句的方式直接查詢資料了,

public class Test {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo "));
 
            print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));
 
            print(statement.executeQuery(" select * from userinfo where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo where name like 'a%' "));
        } finally {
            connection.close();
        }
    }
 
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = https://www.cnblogs.com/ilovejaney/archive/2022/10/13/resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}

查詢結果:

這里在測驗的時候踩到2個坑,大家如果自己實驗的時候可以避免下,

  1. Calcite 默認會把你的 SQL 陳述句中的表名和類名全部轉換為大寫,因為默認的 csv(其他檔案也一樣)檔案的名稱就是表名,除非你自定義規則,所以你的檔案名要寫成大寫,

  2. Calcite 有一些默認的關鍵字不能用作表名,不然會查詢失敗,比如我剛開始定的user.csv就一直查不出來,改成USERINFO就可以了,這點和Mysql 的內置關鍵字差不多,也可以通過個性化配置去改,

演示Mysql

  1. 首先,還是先準備 Calcite 需要的東西:庫、表名稱、欄位名稱、欄位型別,

如果資料源使用Mysql的話,這些都不用我們去 JAVA 服務中去定義,直接在 Mysql 客戶端創建好,這里直接創建兩張表用于測驗,就和我們的csv檔案一樣,

CREATE TABLE `USERINFO1` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `AGE` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

CREATE TABLE `ASSET` (
  `NAME` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `MONEY` varchar(255) CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
  1. 上述 csv 案例中的 SchemaFactory 以及 Schema 這些都不需要創建,因為 Calcite 默認提供了 Mysql 的 Adapter配接器,

  2. 其實,上述兩步都不需要做,我們真正要做的是,告訴 Calcite 你的 JDBC 的連接資訊就行了,也是在 model.json 檔案中定義,

{
  "version": "1.0",
  "defaultSchema": "Demo",
  "schemas": [
    {
      "name": "Demo",
      "type": "custom",
    //  這里是calcite默認的SchemaFactory,里面的流程和我們上述自己定義的相同,下面會簡單看看原始碼,
      "factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        //  我用的是mysql8以上版本,所以這里注意包的名稱
        "jdbcDriver": "com.mysql.cj.jdbc.Driver",
        "jdbcUrl": "jdbc:mysql://localhost:3306/irving",
        "jdbcUser": "root",
        "jdbcPassword": "123456"
      }
    }
  ]
}
  1. 在專案中引入 Mysql 的驅動包
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>8.0.30</version>
</dependency>
  1. 寫好測驗類,這樣直接就相當于完成了所有的功能了,
public class TestMysql {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            statement.executeUpdate(" insert into  userinfo1 values ('xxx',12) ");
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo1 "));
 
            print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));
 
            print(statement.executeQuery(" select * from userinfo1 where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));
        } finally {
            connection.close();
        }
 
    }
 
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = https://www.cnblogs.com/ilovejaney/archive/2022/10/13/resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i < columnCount) {
                    System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}

查詢結果:

Mysql實作原理

上述我們在 model.json 檔案中指定了org.apache.calcite.adapter.jdbc.JdbcSchema$Factory類,可以看下這個類的代碼,

這個類是把 FactorySchema 寫在了一起,其實就是呼叫schemafactory類的create方法創建一個 schema 出來,和我們上面自定義的流程是一樣的,

其中JdbcSchema類也是 Schema 的子類,所以也會實作getTable方法(這個我們上述也實作了,我們當時是獲取表結構和表的欄位型別以及名稱,是從csv檔案頭中讀檔案的),JdbcSchema的實作是通過連接 Mysql 服務端查詢元資料資訊,再將這些資訊封裝成 Calcite需要的物件格式,

這里同樣要注意 csv方式的2個注意點,大小寫和關鍵字問題,

public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map<String, Object> operand) {
    DataSource dataSource;
    try {
      final String dataSourceName = (String) operand.get("dataSource");
      if (dataSourceName != null) {
        dataSource =
            AvaticaUtils.instantiatePlugin(DataSource.class, dataSourceName);
      } else {
        //會走在這里來,這里就是我們在model.json中指定的jdbc的連接資訊,最侄訓創建一個datasource
        final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
        final String jdbcDriver = (String) operand.get("jdbcDriver");
        final String jdbcUser = (String) operand.get("jdbcUser");
        final String jdbcPassword = (String) operand.get("jdbcPassword");
        dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while reading dataSource", e);
    }
    String jdbcCatalog = (String) operand.get("jdbcCatalog");
    String jdbcSchema = (String) operand.get("jdbcSchema");
    String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
 
    if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
      return JdbcSchema.create(
          parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else {
      SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
          SqlDialectFactory.class, sqlDialectFactory);
      return JdbcSchema.create(
          parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
    }
  }
 
  @Override public @Nullable Table getTable(String name) {
    return getTableMap(false).get(name);
  }
 
  private synchronized ImmutableMap<String, JdbcTable> getTableMap(
      boolean force) {
    if (force || tableMap == null) {
      tableMap = computeTables();
    }
    return tableMap;
  }
 
  private ImmutableMap<String, JdbcTable> computeTables() {
    Connection connection = null;
    ResultSet resultSet = null;
    try {
      connection = dataSource.getConnection();
      final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
      final String catalog = catalogSchema.left;
      final String schema = catalogSchema.right;
      final Iterable<MetaImpl.MetaTable> tableDefs;
      Foo threadMetadata = https://www.cnblogs.com/ilovejaney/archive/2022/10/13/THREAD_METADATA.get();
      if (threadMetadata != null) {
        tableDefs = threadMetadata.apply(catalog, schema);
      } else {
        final List<metaImpl.MetaTable> tableDefList = new ArrayList<>();
        //  獲取元資料
        final DatabaseMetaData metaData = connection.getMetaData();
        resultSet = metaData.getTables(catalog, schema, null, null);
        while (resultSet.next()) {
        //獲取庫名,表明等資訊
          final String catalogName = resultSet.getString(1);
          final String schemaName = resultSet.getString(2);
          final String tableName = resultSet.getString(3);
          final String tableTypeName = resultSet.getString(4);
          tableDefList.add(
              new MetaImpl.MetaTable(catalogName, schemaName, tableName,
                  tableTypeName));
        }
        tableDefs = tableDefList;
      }
 
      final ImmutableMap.Builder builder =
          ImmutableMap.builder();
      for (MetaImpl.MetaTable tableDef : tableDefs) {
        final String tableTypeName2 =
            tableDef.tableType == null
            ? null
            : tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ', '_');
        final TableType tableType =
            Util.enumVal(TableType.OTHER, tableTypeName2);
        if (tableType == TableType.OTHER  && tableTypeName2 != null) {
          System.out.println("Unknown table type: " + tableTypeName2);
        }
        //  最終封裝成JdbcTable物件
        final JdbcTable table =
            new JdbcTable(this, tableDef.tableCat, tableDef.tableSchem,
                tableDef.tableName, tableType);
        builder.put(tableDef.tableName, table);
      }
      return builder.build();
    } catch (SQLException e) {
      throw new RuntimeException(
          "Exception while reading tables", e);
    } finally {
      close(connection, null, resultSet);
    }
  }

SQL執行流程

OK,到這里基本上兩個簡單的案例已經演示好了,最后補充一下整個Calcite架構和整個 SQL 的執行流程,

整個流程如下:SQL決議(Parser)=> SQL校驗(Validator)=> SQL查詢優化(optimizer)=> SQL生成 => SQL執行

SQL Parser

所有的 SQL 陳述句在執行前都需要經歷 SQL 決議器決議,決議器的作業內容就是將 SQL 中的 Token 決議成抽象語法樹,每個樹的節點都是一個 SqlNode,這個程序其實就是 Sql Text => SqlNode 的程序,

我們前面的 Demo 沒有自定義 Parser,是因為 Calcite 采用了自己默認的 Parser(SqlParserImpl),

SqlNode

SqlNode是整個決議中的核心,比如圖中你可以發現,對于每個比如selectfromwhere關鍵字之后的內容其實都是一個SqlNode

parserConfig方法主要是設定 SqlParserFactory 的引數,比如我們上面所說的我本地測驗的時候踩的大小寫的坑,就可以在這里設定,

直接呼叫setCaseSensitive=false即不會將 SQL 陳述句中的表名列名轉為大寫,下面是默認的,其他的引數可以按需配置,

SQL Validator

SQL 陳述句先經過 Parser,然后經過語法驗證器,注意 Parser 并不會驗證語法的正確性,

其實 Parser 只會驗證 SQL 關鍵詞的位置是否正確,我們上述2個 Parser 的例子中都沒有創建 schematable 這些,但是如果這樣寫,那就會報錯,這個錯誤就是 parser 檢測后拋出來的(ParseLocationErrorTest),

真正的校驗在 validator 中,會去驗證查詢的表名是否存在,查詢的欄位是否存在,型別是否匹配,這個程序比較復雜,默認的 validatorSqlValidatorImpl

查詢優化

比如關系代數,比如什么投影、笛卡爾積這些,Calcite提供了很多內部的優化器,也可以實作自己的優化器,

配接器

Calcite 是不包含存盤層的,所以提供一種配接器的機制來訪問外部的資料存盤或者存盤引擎,

最后,進階

官網里面寫了未來會支持Kafka配接器到公共Api中,到時候使用起來就和上述集成Mysql一樣方便,但是現在還沒有支持,我這里給大家提供個自己實作的方式,這樣就可以通過 SQL 的方式直接查詢 Kafka 中的 Topic 資料等資訊,

這里我們內部集成實作了KSQL的能力,查詢結果是OK的,

還是像上述步驟一樣,我們需要準備庫、表名稱、欄位名稱、欄位型別、資料源(多出來的地方),

  1. 自定義Sql決議,之前我們都沒有自定義決議,這里需要自定義決議,是因為我需要動態決議sqlwhere條件里面的partation
  • 配置決議器,就是之前案例中提到的配置大小寫之類的
  • 創建決議器,使用的默認SqlParseImpl
  • 開始決議,生成AST,我們可以基于生成的SqlNode做一些業務相關的校驗和引數決議
  1. 配接器獲取資料源
   public class KafkaConsumerAdapter {
       public static List<KafkaResult> executor(KafkaSqlInfo kafkaSql) {
           Properties props = new Properties();
           props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, kafkaSql.getSeeds());
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
           List<TopicPartition> topics = new ArrayList<>();
           for (Integer partition : kafkaSql.getPartition()) {
               TopicPartition tp = new TopicPartition(kafkaSql.getTableName(), partition);
               topics.add(tp);
           }
           consumer.assign(topics);
           for (TopicPartition tp : topics) {
               Map<TopicPartition, Long> offsets = consumer.endOffsets(Collections.singleton(tp));
               long position = 500;
               if (offsets.get(tp).longValue() > position) {
                   consumer.seek(tp, offsets.get(tp).longValue() - 500);
               } else {
                   consumer.seek(tp, 0);
               }
           }
           List<KafkaResult> results = new ArrayList<>();
           boolean flag = true;
           while (flag) {
               ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
               for (ConsumerRecord<String, String> record : records) {
                   //轉成我定義的物件集合
                   KafkaResult result = new KafkaResult();
                   result.setPartition(record.partition());
                   result.setOffset(record.offset());
                   result.setMsg(record.value());
                   result.setKey(record.key());
                   results.add(result);
               }
               if (!records.isEmpty()) {
                   flag = false;
               }
           }
           consumer.close();
           return results;
       }
   
   }
  1. 執行查詢,就可以得到我們想要的效果了,
   public class TestKafka {
       public static void main(String[] args) throws Exception {
           KafkaService kafkaService = new KafkaService();
           //把決議到的引數放在我自己定義的kafkaSqlInfo物件中
           KafkaSqlInfo sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
           //配接器獲取資料源,主要是從上述的sqlInfo物件中去poll資料
           List<KafkaResult> results = KafkaConsumerAdapter.executor(sqlInfo);
           //執行查詢
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   
           sqlInfo = kafkaService.parseSql("select * from `cmdb-calltopo` where `partition` in (0,1,2) AND msg like '%account%'  limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
   
   
           sqlInfo = kafkaService.parseSql("select count(*) AS addad  from `cmdb-calltopo` where `partition` in (0,1,2) limit 1000 ");
           results = KafkaConsumerAdapter.executor(sqlInfo);
           query(sqlInfo.getTableName(), results, sqlInfo.getSql());
       }
   
       private static void query(String tableName, List<KafkaResult> results,
                                 String sql) throws Exception {
           //創建model.json,設定我的SchemaFactory,設定庫名
           String model = createTempJson();
           //設定我的表結構,表名稱和表欄位名以及型別
           KafkaTableSchema.generateSchema(tableName, results);
           Properties info = new Properties();
           info.setProperty("lex", Lex.JAVA.toString());
           Connection connection = DriverManager.getConnection(Driver.CONNECT_STRING_PREFIX + "model=inline:" + model, info);
           Statement st = connection.createStatement();
           //執行
           ResultSet result = st.executeQuery(sql);
           ResultSetMetaData rsmd = result.getMetaData();
           List<Map<String, Object>> ret = new ArrayList<>();
           while (result.next()) {
               Map<String, Object> map = new LinkedHashMap<>();
               for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                   map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
               }
               ret.add(map);
           }
           result.close();
           st.close();
           connection.close();
       }
   
       private static void print(ResultSet resultSet) throws SQLException {
           final ResultSetMetaData metaData = https://www.cnblogs.com/ilovejaney/archive/2022/10/13/resultSet.getMetaData();
           final int columnCount = metaData.getColumnCount();
           while (resultSet.next()) {
               for (int i = 1; ; i++) {
                   System.out.print(resultSet.getString(i));
                   if (i < columnCount) {
                       System.out.print(", ");
                   } else {
                       System.out.println();
                       break;
                   }
               }
           }
       }
   
       private static String createTempJson() throws IOException {
           JSONObject object = new JSONObject();
           object.put("version", "1.0");
           object.put("defaultSchema", "QAKAFKA");
           JSONArray array = new JSONArray();
           JSONObject tmp = new JSONObject();
           tmp.put("name", "QAKAFKA");
           tmp.put("type", "custom");
           tmp.put("factory", "kafka.KafkaSchemaFactory");
           array.add(tmp);
           object.put("schemas", array);
           return object.toJSONString();
       }
   }
  • 生成臨時的model.json,之前是基于檔案,現在基于text字串,mode=inline模式
  • 設定我的表結構、表名稱、欄位名、欄位型別等,并放置在記憶體中,同時將配接器查詢出來的資料也放進去table里面
  • 獲取連接,執行查詢,完美!

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/514145.html

標籤:其他

上一篇:我說HashMap初始容量是16,面試官讓我回去等通知

下一篇:NoSuchMethodError

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more