如何使用Flink,通过CTAS语句同步MySQL数据至StarRocks

发布时间:2025-04-30 19:05

背景信息

您可以通过CTAS或CDAS语句将MySQL数据同步至EMR Serverless StarRocks,CTAS可以实现单表的结构和数据同步,CDAS可以实现整库同步或者同一库中的多表结构和数据同步。本文使用CTAS语句,CDAS语句的使用方法与CTAS类似,具体请参见CDAS介绍。

通过CTAS(CREATE TABLE AS)语句,您可以在StarRocks中自动创建和MySQL中表结构一致的表,并进行数据同步。同时还能实时同步上游表结构(Schema)的变更到下游表,提高您在目标存储中创建表和维护源表结构变更的效率。

当执行CTAS语句时,Flink会按照以下流程执行:

检查目标存储中是否存在该目标表。

如果不存在,则通过目标端Catalog在目标存储中创建相应的目标表,该目标表具有和数据源相同的Schema。

如果存在,则跳过建表。如果已存在的目标表与源表Schema不一致,则会报错提示。

提交和启动相应的数据同步作业。同步数据源的数据以及Schema的变更到目标表中。

表结构变更同步策略通过CTAS语句,在实时同步数据的同时,还能同步源表Schema的变更到目标表中。

Schema变更包括初始表的创建以及未来表的变更。

当前支持同步的Schema变更:

添加可空列:会自动在目标表Schema末尾添加对应的列,并自动同步新增列的数据。

删除可空列:不会直接在目标表中删除该列,而是将该列的数据自动填充为NULL值。

重命名列:直接在目标表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。

例如,如果col_a重命名为col_b,则会在目标表末尾添加col_b,并自动将col_a的数据填充为NULL值。

暂不支持同步的Schema变更:

数据类型的变更。

例如,由VARCHAR变为BIGINT,由NOT NULL变为NULLABLE属性。

主键或索引等约束的变更。

非空列的增加或删除的变更。

说明

如果遇到不支持的Schema变更,则需要您手动删除下游目标表,重新启动CTAS作业,即重新创建目标表并重新同步历史数据。CTAS不会识别具体的DDL类型,而是对比前后两条数据的Schema差异。因此,如果您先删除了某列后,又加回了该列,且这两个DDL之间无数据变化,则CTAS会认为没有发生结构变更。同理,如果您添加了一列,直到该表有数据变化,CTAS才会感知到结构变更,才会同步结构变更到目标表。通过CTAS建表支持的字段类型信息,请参见Flink与StarRocks的数据类型映射关系。

前提条件

已在新版控制台创建DataFlow集群,并选中Flink服务,详情请参见创建集群。

已创建EMR Serverless StarRocks实例,详情请参见创建实例。

已创建RDS MySQL实例,详情请参见快速创建RDS MySQL实例。

说明

本文以5.7版本的MySQL、EMR-3.42.0版本的DataFlow集群为例介绍。

使用限制

DataFlow集群、EMR Serverless StarRocks实例和RDS MySQL实例需要在同一个VPC下。

DataFlow集群和EMR Serverless StarRocks实例均须开启公网访问。

RDS MySQL须为5.7及以上版本。

DataFlow集群须为EMR-3.42.0及后续版本或EMR-5.8.0及后续版本。

步骤一:准备测试数据

创建测试的数据库和账号,详情请参见创建数据库和账号。

创建完数据库和账号后,需要授权测试账号的读写权限。

说明

本文创建的数据库名称为test_cdc,账号为emr_test。

使用创建的测试账号连接MySQL实例,详情请参见通过DMS登录RDS MySQL。

执行以下命令,创建数据表。

use test_cdc; CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` int DEFAULT NULL, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)

登录并连接EMR Serverless StarRocks实例,详情请参见通过客户端方式连接StarRocks实例。

执行以下命令,创建数据库test_cdc、创建超级管理员用户test(示例密码为1qaz!QAZ)或者创建普通用户test并给普通用户授予该数据库权限,详情请参见管理用户。

CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED by '1qaz!QAZ'; GRANT ALL on test_cdc to test;

步骤二:上传自定义Connector

上传自定义的Connector用于Flink、StarRocks和RDS MySQL连接。

使用SSH方式登录DataFlow集群,详情请参见登录集群。

下载flink-connector-starrocks-1.2.2_flink-1.13_2.11.jar和ververica-connector-mysql-1.13-vvr-4.0.12-1-20220330.065158-3-jar-with-dependencies.jar,并上传到DataFlow集群的/opt/apps/FLINK/flink-current/lib目录下。

通过Session模式提交作业。

使用SSH方式登录DataFlow集群,详情请参见登录集群。

执行以下命令,进入/opt/apps/FLINK/flink-current目录。

cd /opt/apps/FLINK/flink-current

执行以下命令,启动YARN Session。

./bin/yarn-session.sh --detached

执行成功后,返回信息中的application_XXXX_YY,即为登录SQL客户端需要用到的sessionId。

执行以下命令,打开SQL客户端。

./bin/sql-client.sh -s <application_XXXX_YY>

说明

请修改<application_XXXX_YY>为您前一步获取到的sessionId。

创建MySQL和StarRocks的Catalog。

CREATE CATALOG sr WITH ( 'type' = 'starrocks', 'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'username' = 'test', 'password' = '1qaz!QAZ', 'dbname' = 'test_cdc' ); CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = '123456', 'default-database' = 'test_cdc' );

请根据实际信息修改各参数值,各参数描述如下表所示。

表 1. StarRocks Catalog参数

参数

描述

type

类型,固定值为starrocks。

endpoint

指定FE节点的内网地址和查询端口,格式为EMR Serverless StarRocks实例FE节点的内网地址:9030。例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。

说明

关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情。

username

StarRocks的用户名。

填写步骤一:准备测试数据中创建的用户名。本示例为test。

password

StarRocks数据库服务的密码。

填写步骤一:准备测试数据中账号设置的密码。本示例为1qaz!QAZ。

dbname

StarRocks数据库名称。

填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

表 2. MySQL Catalog参数

参数

描述

type

类型,固定值为mysql。

hostname

RDS的内网地址。

您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

port

MySQL数据库服务的端口号,默认值为3306。

username

MySQL数据库服务的用户名。

填写步骤一:准备测试数据中账号的用户名。本示例为emr_test。

password

MySQL数据库服务的密码。

填写步骤一:准备测试数据中账号的密码。本示例为123456。

default-database

默认的MySQL数据库名称。

填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

在StarRocks的Catalog下,发送CTAS语句。

您可以使用以下三种示例发送CTAS语句。

At Least Once语义:通过sink.buffer-flush.interval-ms配置项,配置每次写入StarRocks的时间间隔,优点是写入间隔时间短,占用内存较少。

use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;

Exactly once语义:需要定义checkpoint间隔,优点是在各种异常情况下保障数据不丢失不重复,缺点是数据可见时间取决于checkpoint间隔。更多信息,请参见Checkpointing。

set 'execution.checkpointing.interval' = '1 min'; set 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.timeout' = '10 min'; use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.semantic' = 'exactly-once', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;

Simple模式:优点是创建表时不需要关注原表有哪些字段,会按照MySQL的表格式照搬过来,开发者使用比较方便。缺点是不能创建分区,对于需要分区的表,仍需要通过normal模式创建。

use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'='buckets 8', 'starrocks.create.table.mode'='simple', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;

表 3. WITH参数

参数

是否必选

描述

starrocks.create.table.properties

StarRocks建表语句中除了字段定义以外的其他后缀定义,例如示例中的engine、key和buckets等。

database-name

StarRocks数据库名称。

本示例为test_cdc。

jdbc-url

用于在StarRocks中执行查询操作。

例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com为EMR Serverless StarRocks实例FE节点的内网地址。

说明

关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情。

load-url

指定FE节点的内网地址和查询端口,格式为EMR Serverless StarRocks实例FE节点的内网地址:8030。

例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。

说明

关于如何获取EMR Serverless StarRocks实例FE节点的内网地址,请参见查看实例列表与详情。

sink.semantic

填写exactly-once可以保障数据一致性语义,默认为at-least-once。

starrocks.create.table.mode

支持以下参数值:

normal模式(默认值):必须像示例一样在starrocks.create.table.properties配置中填写engine、key和buckets等完整的配置。

simple模式:默认选择engine为olap,选择key类型为primary key,且主键与MySQL的主键保持完全一致,默认distributed by hash(所有的主键),默认无分区。需要在starrocks.create.table.properties配置中填写的必填内容为buckets ,选填内容为properties等配置。

sink.properties.row_delimiter

自定义行分隔符。

sink.properties.column_separator

自定义列分隔符。

说明

因为vvr-6.0.5-flink-1.15及以上版本移除了sink.use.new-api,所以使用vvr-6.0.5-flink-1.15之前的版本时,请在with参数中添加'sink.use.new-api'='false',。

其他配置请参见从Apache Flink持续导入。

表 4. OPTIONS参数

参数

描述

connector

类型,固定值为mysql-cdc。

hostname

RDS的内网地址。

您可以在RDS的数据库连接页面,单击内网地址进行复制。例如,rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

port

MySQL数据库服务的端口号,默认值为3306。

username

MySQL数据库服务的用户名。

填写步骤一:准备测试数据中账号的用户名。本示例为emr_test。

password

MySQL数据库服务的密码。

填写步骤一:准备测试数据中账号的密码。

table-name

StarRocks中的表名称。

填写步骤一:准备测试数据中创建的表名。本示例为runoob_tbl。

database-name

默认的MySQL数据库名称。

填写步骤一:准备测试数据中创建的数据库名。本示例为test_cdc。

步骤四:验证数据同步结果

说明

如果开启了checkpoint,则最长等待时间大约是checkpoint的时间间隔。

查询数据

登录并连接EMR Serverless StarRocks实例,详情请参见通过客户端方式连接StarRocks实例。

在StarRocks连接窗口执行以下命令,查看表数据。

use test_cdc; select * from runoob_tbl1;

返回信息如下,表示MySQL上的数据已同步至StarRocks。

+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+

查询插入后的数据

在RDS数据库窗口执行以下命令,插入数据。

INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1);

在StarRocks连接窗口执行以下命令,查看表数据。

select * from runoob_tbl1;

返回信息如下,表示数据已成功插入。

+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+

同步数据更新

在RDS数据库窗口执行以下命令,更新指定数据。

update runoob_tbl set runoob_title= 'new' where runoob_id = 18;

在StarRocks连接窗口执行以下命令,查看表数据。

select * from runoob_tbl1;

返回信息如下,表示数据已同步更新。

+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+

同步数据删除

在RDS数据库窗口执行以下命令,删除指定数据。

DELETE FROM runoob_tbl WHERE runoob_id = 1;

在StarRocks连接窗口执行以下命令,查看表数据。

select * from runoob_tbl1;

返回信息如下,表示数据已同步删除。

+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+

增加可空列

在RDS数据库窗口执行以下命令,增加可空列。

alter table `runoob_tbl` add COLUMN `add_col2` INT;

执行以下命令 ,插入数据。

INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2)

在StarRocks连接窗口执行以下命令,查看表数据。

select * from runoob_tbl1;

返回信息如下,表示Schema已经成功变更。

+-----------+--------------+---------------+-----------------+---------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_co2 | +-----------+--------------+---------------+-----------------+---------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | 2 | | 18 | new | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+---------+

CDAS介绍

CDAS是CTAS的一个语法糖。通过CDAS语句,可以实现MySQL中的整库同步,即生成一个Flink Job,源表是MySQL中的Database,目标表是StarRocks中对应的多张表,同时可以使用including table语法,只选择一个Database中的部分表进行CDAS操作。

与CTAS的执行相同,需要在创建MySQL和StarRocks相应的Catalog后,执行CDAS语句。创建语法示例如下。

CREATE DATABASE IF NOT EXISTS sr_db with ( 'starrocks.create.table.properties'=' buckets 8', 'starrocks.create.table.mode'='simple', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as DATABASEmysql.test_cdc including table 'tabl1','tbl2','tbl3' /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc' )*/;

网址:如何使用Flink,通过CTAS语句同步MySQL数据至StarRocks http://c.mxgxt.com/news/view/905246

相关内容

flink读取starRocks数据,只能读取历史的,不能读实时数据,怎么破?
这是flink + smt + starrocks 的链接,请问sr 指的是什么组件?
如何配置StarRocks输出组件用于向数据源写入数据
使用docker部署单机测试版starrocks
快速上手StarRocks
StarRocks数据质量管理
StarRocks 2.1 新版本特性介绍
让数据分析极速统一!StarRocks和阿里云一起干了件大事
数据库
mysql数据库表关系图怎么生成

随便看看