kafka(官方)
建表语句
-- Create a versioned table (the name carries a _v${时间戳} suffix) backed by a Kafka topic.
-- All ${...} tokens are template placeholders to be substituted before execution.
CREATE TABLE `${库名}`.`KAFKA_${表名}_v${时间戳}` (
    `CUST_ID`          STRING,
    `OID_USERNO`       STRING,
    `USER_OR_MERCHANT` STRING,
    `ODS_ROOT_TIME`    STRING COMMENT 'ODS_ROOT处理时间',
    `BDRIVERKEY`       STRING COMMENT 'BDRIVERKEY',
    `BDRIVEROPTYPE`    STRING COMMENT 'BDRIVEROPTYPE',
    `BDRIVERTABLE`     STRING COMMENT 'BDRIVERTABLE',
    `EVENT_KAFKA_KEY`  STRING COMMENT 'EVENT_KAFKA_KEY',
    -- Computed column: its value comes from the PROCTIME() function, not from the message.
    `PROCTIME` AS `PROCTIME`()
)
WITH (
    'connector'                    = 'kafka',
    -- Broker address, topic and consumer group.
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'topic'                        = '${主题}',
    'properties.group.id'          = '${消费组}',
    -- Start reading from the offsets stored for the consumer group.
    'scan.startup.mode'            = 'group-offsets',
    'format'                       = 'json'
);
底层实现
- 模块: flink-connector-kafka
- connector:"kafka"
- factory: KafkaDynamicTableFactory
- source:KafkaDynamicSource
- sink: KafkaDynamicSink
jdbc(官方)
建表语句
-- Versioned table mapped onto a MySQL table through the JDBC connector.
-- ${库名1}/${表名1} name the Flink-side table; ${库名2}/${表名2} name the MySQL database and table.
CREATE TABLE `${库名1}`.`${表名1}_v${时间戳}` (
    `oid_chnl`              VARCHAR(128) COMMENT '支付产品',
    `transactiondevicetype` VARCHAR(128) COMMENT '设备名'
)
WITH (
    'connector'  = 'jdbc',
    'driver'     = 'com.mysql.jdbc.Driver',
    'url'        = 'jdbc:mysql://xx.xx.xx.xx:3306/${库名2}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true',
    'table-name' = '${表名2}',
    -- Credentials are template placeholders.
    'username'   = '${账号}',
    'password'   = '${密码}'
);
底层实现
- 模块: flink-connector-jdbc
- connector:"jdbc"
- factory: JdbcDynamicTableFactory
- source:JdbcDynamicTableSource
- sink: JdbcDynamicTableSink
hbase(官方)
hbase-1.4
建表语句
-- Table mapped onto an HBase 1.4 table.
-- ${库名1}.${表名1} is the Flink-side name; '${库名2}:${表名2}' is the HBase-side table name.
CREATE TABLE ${库名1}.${表名1} (
    rowkey STRING,
    -- Column `f` is declared as a ROW holding two BIGINT fields.
    f ROW < uv BIGINT, click_count BIGINT >,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'zookeeper.quorum' = '${ZK地址}',
    'zookeeper.znode.parent' = '/hbase',
    'table-name' = '${库名2}:${表名2}',
    -- Buffered rows are flushed once this many have accumulated.
    'sink.buffer-flush.max-rows' = '1000'
);
hbase-2.2
elasticsearch(官方)
建表语句
-- Elasticsearch 6 sink table.
-- Uses the 'connector' = '...' option style, which is what the Elasticsearch6DynamicSinkFactory
-- expects; the legacy 'connector.type' / 'connector.version' / 'format.type' / 'update-mode'
-- keys belong to the older descriptor-based factory.
-- No PRIMARY KEY is declared, so the sink works in append mode (formerly 'update-mode' = 'append').
CREATE TABLE ${库名}.${表名} (
    emp_no INT NOT NULL,
    first_name varchar(14),
    last_name varchar(16)
) WITH (
    'connector' = 'elasticsearch-6',
    'hosts' = 'http://${esHost}:${esPort}',
    'index' = '${索引名称}',
    'document-type' = 'employee',
    'sink.bulk-flush.max-actions' = '1', -- flush after every single record
    'format' = 'json'
);
底层实现
- 模块: flink-connector-elasticsearch-base
- connector:"elasticsearch-6" / "elasticsearch-7"
- factory: Elasticsearch6DynamicSinkFactory,Elasticsearch7DynamicSinkFactory
- sink: Elasticsearch6DynamicSink,Elasticsearch7DynamicSink
starrocks(第三方)
建表语句
-- Versioned table that writes to StarRocks.
-- ${j1..j3} are the hosts used in the JDBC URL (port 9030); ${l1..l3} are the hosts used in the load URL (port 8030).
CREATE TABLE `${库名1}`.`SR_${表名1}_v${时间戳}` (
    `cust_id`          STRING,
    `oid_userno`       STRING,
    `user_or_merchant` STRING
)
WITH (
    'connector'     = 'starrocks',

    -- Target cluster, database, table and credentials.
    'jdbc-url'      = 'jdbc:mysql://${j1}:9030,${j2}:9030,${j3}:9030',
    'load-url'      = '${l1}:8030;${l2}:8030;${l3}:8030',
    'database-name' = '${库名2}',
    'table-name'    = '${表名2}',
    'username'      = '${账号}',
    'password'      = '${密码}',

    -- Buffer flush thresholds: row count, byte size and interval in milliseconds.
    'sink.buffer-flush.max-rows'    = '1000000',
    'sink.buffer-flush.max-bytes'   = '300000000',
    'sink.buffer-flush.interval-ms' = '60000',
    'sink.max-retries'              = '3',

    -- Load payload is JSON, sent as an outer array that is stripped on load.
    'sink.properties.format'            = 'json',
    'sink.properties.strip_outer_array' = 'true'
);
底层实现
- 模块: flink-connector-starrocks
- connector:"starrocks"
- factory: StarRocksDynamicTableSinkFactory
- sink: StarRocksDynamicTableSink
jdbc-kafka(第三方)
建表语句
-- Versioned table for the 'jdbc-kafka' connector, configured with both a MySQL (JDBC) target
-- and a Kafka topic. ${库名2}/${表名2} name the MySQL database and table.
CREATE TABLE `${库名1}`.`MYSQL_KAFKA_${表名1}_v${时间戳}` (
    `REFUND_BILL_NO` BIGINT,
    `MERCHANT_ID` BIGINT,
    `MERCHANT_ORDER_NO` STRING,
    `PAY_BILL_NO` BIGINT,
    `REFUNDED_AMOUNT` BIGINT,
    `REFUNDED_COMMISSION` BIGINT,
    `REFUNDED_RESERVE` BIGINT,
    `CREATION_TIME` TIMESTAMP,
    `MODIFICATION_TIME` TIMESTAMP,
    PRIMARY KEY (`REFUND_BILL_NO`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc-kafka',
    -- JDBC side.
    'driver' = 'com.mysql.jdbc.Driver',
    'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/${库名2}?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = '${表名2}',
    'username' = '${账号}',
    'password' = '${密码}',
    -- Kafka side.
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'topic' = '${主题}',
    'properties.group.id' = '${消费组}',
    'key.format' = 'json',
    -- The value is a plain column name: backticks inside the string literal would become
    -- part of the name and no longer match the REFUND_BILL_NO column.
    'key.fields' = 'REFUND_BILL_NO',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json'
);
底层实现
- 模块: flink-connector-jdbc-kafka
- connector:"jdbc-kafka"
- factory: JdbcAndKafkaDynamicTableFactory
- source: 不支持
- sink: JdbcAndKafkaDynamicTableSink
oracle(自定义)
建表语句
-- Versioned table mapped onto an Oracle table through the 'oracle' connector.
-- ${实例} is the service/instance segment of the thin-driver URL; '${库名2}.${表名2}' is the Oracle-side table name.
CREATE TABLE `${库名1}`.`ORACLE_${表名1}_v${时间戳}` (
    `ID`            STRING,
    `CURRENCY_CODE` STRING,
    `EXCHANGE`      DECIMAL(10, 6),
    `BEGIN_TIME`    TIMESTAMP,
    `END_TIME`      TIMESTAMP,
    `STATUS`        DECIMAL(1, 0),
    `CREATE_TIME`   TIMESTAMP,
    `CREATOR`       STRING
)
WITH (
    'connector'  = 'oracle',
    'driver'     = 'oracle.jdbc.driver.OracleDriver',
    'url'        = 'jdbc:oracle:thin:@//xx.xx.xx.xx:1521/${实例}',
    'table-name' = '${库名2}.${表名2}',
    -- Credentials are template placeholders.
    'username'   = '${用户}',
    'password'   = '${密码}'
);
底层实现
- 模块: flink-connector-oracle-$
- connector:"oracle"
- factory: OracleDynamicTableFactory
- source:JdbcDynamicTableSource
- sink: JdbcDynamicTableSink
starrocks-kafka(自定义)
建表语句
-- Versioned table for the 'starrocks-kafka' connector, configured with both a StarRocks
-- target and a Kafka topic. Options are grouped by purpose below; the option set is unchanged.
CREATE TABLE `${库名1}`.`SR_${表名1}_v${时间戳}` (
    `process_time`  TIMESTAMP COMMENT '汇率时间',
    `rate_type`     STRING    COMMENT '1分钟:oneMin;5分钟:fiveMin;20分钟 :twentyMin;1小时:oneHour',
    `currency_pair` STRING    COMMENT '币种对',
    `create_time`   TIMESTAMP COMMENT '创建时间',
    `base_currency` STRING    COMMENT '基准币种',
    `quote_rate`    STRING    COMMENT '报价汇率',
    `rate_id`       STRING    COMMENT '汇率ID:根据银行编码、源币种、目的币种、汇率、生效时间(yyyyMMddHHmmss)MD5',
    `id`            BIGINT    COMMENT ''
)
WITH (
    'connector' = 'starrocks-kafka',

    -- StarRocks target: endpoints, database, table and credentials.
    'jdbc-url'      = 'jdbc:mysql://${j1}:9030,${j2}:9030,${j3}:9030',
    'load-url'      = '${l1}:8030;${l2}:8030;${l3}:8030',
    'database-name' = '${库名2}',
    'table-name'    = '${表名2}',
    'username'      = '${账号}',
    'password'      = '${密码}',

    -- StarRocks sink buffering, retries and load format.
    'sink.buffer-flush.max-rows'        = '1000000',
    'sink.buffer-flush.max-bytes'       = '300000000',
    'sink.buffer-flush.interval-ms'     = '120000',
    'sink.max-retries'                  = '3',
    'sink.properties.format'            = 'json',
    'sink.properties.strip_outer_array' = 'true',

    -- Kafka side: brokers, topic, consumer group, startup position and formats.
    'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
    'topic'                        = '${主题}',
    'properties.group.id'          = '${消费组}',
    'scan.startup.mode'            = 'group-offsets',
    'format'                       = 'json',
    'key.format'                   = 'json',
    'key.fields'                   = 'id'
);
底层实现
- 模块: flink-connector-starrocks-kafka_$
- connector:"starrocks-kafka"
- factory: StarRocksKafkaDynamicTableSinkFactory
- sink: StarRocksKafkaDynamicTableSink
mysql-x(数栈)
建表语句
-- Versioned table mapped onto a MySQL table through the 'mysql-x' connector.
-- ${库名2}/${表名2} name the MySQL database and table.
CREATE TABLE `${库名1}`.`MYSQL_X_${表名1}_v${时间戳}` (
    `ID`                INTEGER,
    `NAME`              STRING,
    `EMAIL`             STRING,
    `BD_STATUS`         STRING,
    `MDR_VERSION`       INTEGER,
    `MEMO`              STRING,
    `CREATION_TIME`     TIMESTAMP,
    `MODIFICATION_TIME` TIMESTAMP,
    PRIMARY KEY (`ID`) NOT ENFORCED
)
WITH (
    'connector'  = 'mysql-x',
    'url'        = 'jdbc:mysql://xx.xx.xx.xx:3306/${库名2}?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = '${表名2}',
    'username'   = '${用户}',
    'password'   = '${密码}',
    -- NOTE(review): the unit of this timeout is not shown here — confirm against the connector docs.
    'scan.query-timeout' = '10'
);
底层实现
- 模块: flinkx-connector-mysql
- connector:"mysql-x"
- factory: MysqlDynamicTableFactory
- source: JdbcDynamicTableSource
- sink: JdbcDynamicTableSink