kafka(官方)

建表语句

-- Versioned Kafka table template: the table name carries a `_v${时间戳}` suffix.
-- Records are JSON; PROCTIME is a computed column backed by PROCTIME().
CREATE TABLE `${库名}`.`KAFKA_${表名}_v${时间戳}` (
  `CUST_ID`          STRING,
  `OID_USERNO`       STRING,
  `USER_OR_MERCHANT` STRING,
  `ODS_ROOT_TIME`    STRING COMMENT 'ODS_ROOT处理时间',
  `BDRIVERKEY`       STRING COMMENT 'BDRIVERKEY',
  `BDRIVEROPTYPE`    STRING COMMENT 'BDRIVEROPTYPE',
  `BDRIVERTABLE`     STRING COMMENT 'BDRIVERTABLE',
  `EVENT_KAFKA_KEY`  STRING COMMENT 'EVENT_KAFKA_KEY',
  `PROCTIME` AS `PROCTIME`()
)
WITH (
  -- connector and record format
  'connector' = 'kafka',
  'format' = 'json',
  -- broker list and topic
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'topic' = '${主题}',
  -- consumer group; startup mode is set to the group's offsets
  'properties.group.id' = '${消费组}',
  'scan.startup.mode' = 'group-offsets'
);

底层实现

  • 模块: flink-connector-kafka
  • connector:"kafka"
  • factory: KafkaDynamicTableFactory
  • source:KafkaDynamicSource
  • sink: KafkaDynamicSink

jdbc(官方)

建表语句

-- JDBC table template mapped onto the MySQL table `${表名2}` in database `${库名2}`.
CREATE TABLE `${库名1}`.`${表名1}_v${时间戳}` (
  `oid_chnl`              VARCHAR(128) COMMENT '支付产品',
  `transactiondevicetype` VARCHAR(128) COMMENT '设备名'
)
WITH (
  'connector' = 'jdbc',
  -- target database and table
  'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/${库名2}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true',
  'table-name' = '${表名2}',
  'driver' = 'com.mysql.jdbc.Driver',
  -- credentials
  'username' = '${账号}',
  'password' = '${密码}'
);

底层实现

  • 模块: flink-connector-jdbc
  • connector:"jdbc"
  • factory: JdbcDynamicTableFactory
  • source:JdbcDynamicTableSource
  • sink: JdbcDynamicTableSink

hbase(官方)

hbase-1.4

建表语句

-- HBase 1.4 table template.
-- `rowkey` is the declared (not enforced) primary key; `f` is a ROW column
-- holding the fields `uv` and `click_count`.
-- The statement is terminated with `;` and the table identifier is
-- backtick-quoted so it matches the other templates in this file.
CREATE TABLE `${库名1}`.`${表名1}` (
  rowkey STRING,
  f ROW<uv BIGINT, click_count BIGINT>,
  PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  -- ZooKeeper quorum and parent znode
  'zookeeper.quorum' = '${ZK地址}',
  'zookeeper.znode.parent' = '/hbase',
  -- HBase table given as namespace:table
  'table-name' = '${库名2}:${表名2}',
  -- row-count threshold for flushing buffered writes
  'sink.buffer-flush.max-rows' = '1000'
);

hbase-2.2

elasticsearch(官方)

建表语句

-- Elasticsearch 6 sink table template.
-- Uses the 'connector' = 'elasticsearch-6' option style, which is what selects
-- Elasticsearch6DynamicSinkFactory; the previous 'connector.type' /
-- 'connector.version' keys belong to the legacy descriptor-based factory.
-- No PRIMARY KEY is declared, so the sink works in append mode; this replaces
-- the legacy 'update-mode' = 'append' setting.
CREATE TABLE `${库名}`.`${表名}` (
  emp_no INT NOT NULL,
  first_name VARCHAR(14),
  last_name VARCHAR(16)
) WITH (
  'connector' = 'elasticsearch-6',
  'hosts' = 'http://${esHost}:${esPort}',
  'index' = '${索引名称}',
  'document-type' = 'employee',
  'sink.bulk-flush.max-actions' = '1', -- flush after every single record
  'format' = 'json'
);

底层实现

  • 模块: flink-connector-elasticsearch-base
  • connector:"elasticsearch-6" / "elasticsearch-7"
  • factory: Elasticsearch6DynamicSinkFactory,Elasticsearch7DynamicSinkFactory
  • sink: Elasticsearch6DynamicSink,Elasticsearch7DynamicSink

starrocks(第三方)

建表语句

-- StarRocks sink table template, writing to `${库名2}`.`${表名2}`.
CREATE TABLE `${库名1}`.`SR_${表名1}_v${时间戳}` (
  `cust_id`          STRING,
  `oid_userno`       STRING,
  `user_or_merchant` STRING
)
WITH (
  'connector' = 'starrocks',
  -- endpoints: three hosts each for the JDBC URL and the load URL
  'jdbc-url' = 'jdbc:mysql://${j1}:9030,${j2}:9030,${j3}:9030',
  'load-url' = '${l1}:8030;${l2}:8030;${l3}:8030',
  -- target table and credentials
  'database-name' = '${库名2}',
  'table-name' = '${表名2}',
  'username' = '${账号}',
  'password' = '${密码}',
  -- buffer flush thresholds: rows, bytes, interval in milliseconds
  'sink.buffer-flush.max-rows' = '1000000',
  'sink.buffer-flush.max-bytes' = '300000000',
  'sink.buffer-flush.interval-ms' = '60000',
  'sink.max-retries' = '3',
  -- load payload: JSON with the outer array stripped
  'sink.properties.format' = 'json',
  'sink.properties.strip_outer_array' = 'true'
);

底层实现

  • 模块: flink-connector-starrocks
  • connector:"starrocks"
  • factory: StarRocksDynamicTableSinkFactory
  • sink: StarRocksDynamicTableSink

jdbc-kafka(第三方)

建表语句

-- Table template for the custom `jdbc-kafka` connector. It carries both a
-- JDBC (MySQL) target and a Kafka topic configuration in one WITH clause.
-- `REFUND_BILL_NO` is the declared (not enforced) primary key and is also
-- used as the Kafka message key.
CREATE TABLE `${库名1}`.`MYSQL_KAFKA_${表名1}_v${时间戳}` (
  `REFUND_BILL_NO` BIGINT,
  `MERCHANT_ID` BIGINT,
  `MERCHANT_ORDER_NO` STRING,
  `PAY_BILL_NO` BIGINT,
  `REFUNDED_AMOUNT` BIGINT,
  `REFUNDED_COMMISSION` BIGINT,
  `REFUNDED_RESERVE` BIGINT,
  `CREATION_TIME` TIMESTAMP,
  `MODIFICATION_TIME` TIMESTAMP,
  PRIMARY KEY (`REFUND_BILL_NO`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc-kafka',
  -- JDBC side
  'driver' = 'com.mysql.jdbc.Driver',
  'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/${库名2}?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = '${表名2}',
  'username' = '${账号}',
  'password' = '${密码}',
  -- Kafka side
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'topic' = '${主题}',
  'properties.group.id' = '${消费组}',
  'key.format' = 'json',
  -- 'key.fields' is a plain list of column names inside a string literal;
  -- backticks here would become part of the name and not match the column.
  'key.fields' = 'REFUND_BILL_NO',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'json'
);

底层实现

  • 模块: flink-connector-jdbc-kafka
  • connector:"jdbc-kafka"
  • factory: JdbcAndKafkaDynamicTableFactory
  • source: 不支持
  • sink: JdbcAndKafkaDynamicTableSink

oracle(自定义)

建表语句

-- Table template for the custom `oracle` connector, mapped onto
-- `${库名2}.${表名2}` over the Oracle thin JDBC URL below.
CREATE TABLE `${库名1}`.`ORACLE_${表名1}_v${时间戳}` (
  `ID`            STRING,
  `CURRENCY_CODE` STRING,
  `EXCHANGE`      DECIMAL(10, 6),
  `BEGIN_TIME`    TIMESTAMP,
  `END_TIME`      TIMESTAMP,
  `STATUS`        DECIMAL(1, 0),
  `CREATE_TIME`   TIMESTAMP,
  `CREATOR`       STRING
)
WITH (
  'connector' = 'oracle',
  -- connection target
  'url' = 'jdbc:oracle:thin:@//xx.xx.xx.xx:1521/${实例}',
  'driver' = 'oracle.jdbc.driver.OracleDriver',
  'table-name' = '${库名2}.${表名2}',
  -- credentials
  'username' = '${用户}',
  'password' = '${密码}'
);

底层实现

  • 模块: flink-connector-oracle-$
  • connector:"oracle"
  • factory: OracleDynamicTableFactory
  • source:JdbcDynamicTableSource
  • sink: JdbcDynamicTableSink

starrocks-kafka(自定义)

建表语句

-- Table template for the custom `starrocks-kafka` connector. It carries both
-- StarRocks settings and Kafka settings in one WITH clause; `id` is the Kafka
-- key field.
CREATE TABLE `${库名1}`.`SR_${表名1}_v${时间戳}` (
  `process_time`  TIMESTAMP COMMENT '汇率时间',
  `rate_type`     STRING COMMENT '1分钟:oneMin;5分钟:fiveMin;20分钟 :twentyMin;1小时:oneHour',
  `currency_pair` STRING COMMENT '币种对',
  `create_time`   TIMESTAMP COMMENT '创建时间',
  `base_currency` STRING COMMENT '基准币种',
  `quote_rate`    STRING COMMENT '报价汇率',
  `rate_id`       STRING COMMENT '汇率ID:根据银行编码、源币种、目的币种、汇率、生效时间(yyyyMMddHHmmss)MD5',
  `id`            BIGINT COMMENT ''
)
WITH (
  'connector' = 'starrocks-kafka',
  -- StarRocks endpoints, target table and credentials
  'jdbc-url' = 'jdbc:mysql://${j1}:9030,${j2}:9030,${j3}:9030',
  'load-url' = '${l1}:8030;${l2}:8030;${l3}:8030',
  'database-name' = '${库名2}',
  'table-name' = '${表名2}',
  'username' = '${账号}',
  'password' = '${密码}',
  -- StarRocks buffer flush thresholds: rows, bytes, interval in milliseconds
  'sink.buffer-flush.max-rows' = '1000000',
  'sink.buffer-flush.max-bytes' = '300000000',
  'sink.buffer-flush.interval-ms' = '120000',
  'sink.max-retries' = '3',
  -- StarRocks load payload: JSON with the outer array stripped
  'sink.properties.format' = 'json',
  'sink.properties.strip_outer_array' = 'true',
  -- Kafka brokers, topic and consumer group
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'topic' = '${主题}',
  'properties.group.id' = '${消费组}',
  'scan.startup.mode' = 'group-offsets',
  -- Kafka record formats; the message key is built from `id`
  'format' = 'json',
  'key.format' = 'json',
  'key.fields' = 'id'
);

底层实现

  • 模块: flink-connector-starrocks-kafka_$
  • connector:"starrocks-kafka"
  • factory: StarRocksKafkaDynamicTableSinkFactory
  • sink: StarRocksKafkaDynamicTableSink

mysql-x(数栈)

建表语句

-- Table template for the `mysql-x` connector, mapped onto the MySQL table
-- `${表名2}` in database `${库名2}`. `ID` is the declared (not enforced) primary key.
CREATE TABLE `${库名1}`.`MYSQL_X_${表名1}_v${时间戳}` (
  `ID`                INTEGER,
  `NAME`              STRING,
  `EMAIL`             STRING,
  `BD_STATUS`         STRING,
  `MDR_VERSION`       INTEGER,
  `MEMO`              STRING,
  `CREATION_TIME`     TIMESTAMP,
  `MODIFICATION_TIME` TIMESTAMP,
  PRIMARY KEY (`ID`) NOT ENFORCED
)
WITH (
  'connector' = 'mysql-x',
  -- connection target
  'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/${库名2}?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = '${表名2}',
  -- credentials
  'username' = '${用户}',
  'password' = '${密码}',
  -- query timeout for scans (unit not stated here; confirm against the connector docs)
  'scan.query-timeout' = '10'
);

底层实现

  • 模块: flinkx-connector-mysql
  • connector:"mysql-x"
  • factory: MysqlDynamicTableFactory
  • source: JdbcDynamicTableSource
  • sink: JdbcDynamicTableSink