 
            flinkcdc同步mysql
Flink CDC 实现 MySQL 到 MySQL 数据同步项目解析
1. 项目概述
本项目使用 Apache Flink 和 Debezium 实现从 MySQL 源数据库到目标 MySQL 数据库的数据同步。主要功能包括:
- 增量和全量数据同步。
- 支持 DDL(数据定义语言)事件处理。
- 使用 HikariCP 进行数据库连接池管理。
- 处理 MySQL 时间类型转换。
- 项目地址:https://gitee.com/Blainw/flinkcdc-mysql-to-mysql.git
2. 文件结构与功能说明
2.1 MySqlSourceProperties.java
该文件定义了源 MySQL 数据库的配置属性,通过 Spring Boot 的 @ConfigurationProperties 注解读取配置文件中的参数,并提供默认值。
@Component
@ConfigurationProperties(prefix = "source")
@Data
public class MySqlSourceProperties {
    private String name;
    private String host;
    private int port;
    private String username;
    private String password;
    private String databaseList;
    private String tableList;
    private String startDate;
    private String startUpMode="latest";
}
2.2 MySqlDBUtils.java
负责初始化和关闭 HikariCP 数据库连接池,执行 SQL 语句。
public class MySqlDBUtils {
    private static HikariDataSource dataSource;
    public static synchronized void initConnectionPool() {
        // 初始化连接池逻辑
    }
    public static synchronized void closeConnectionPool() {
        // 关闭连接池逻辑
    }
    public static int executeSql(String sql, List<Object> insertParams, List<Object> updateParams) throws SQLException {
        // 执行 SQL 语句逻辑
    }
}
2.3 RefCountedReentrantLock.java
实现了一个带有引用计数的可重入锁,用于确保在 DDL 操作期间对表进行锁定,防止并发冲突。
public class RefCountedReentrantLock {
    private final ReentrantLock lock;
    private final AtomicInteger refCount;
    public synchronized void lock() {
        lock.lock();
        refCount.incrementAndGet();
    }
    public synchronized void unlock() {
        if (refCount.decrementAndGet() == 0) {
            lock.unlock();
        }
    }
    public boolean isLocked() {
        return refCount.get() > 0;
    }
}
2.4 DateTimeConverter.java
实现了 Debezium 的 CustomConverter 接口,用于将 MySQL 的时间类型转换为标准格式。
public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();
    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // 转换逻辑
    }
    private String convertDate(Object input) {
        // 日期转换逻辑
    }
    private String convertTime(Object input) {
        // 时间转换逻辑
    }
    private String convertDateTime(Object input) {
        // 日期时间转换逻辑
    }
    private String convertTimestamp(Object input) {
        // 时间戳转换逻辑
    }
}
2.5 MysqlSink.java
实现了 Flink 的 RichSinkFunction,负责将捕获到的数据写入目标 MySQL 数据库,并处理 DML 和 DDL 事件。
public class MysqlSink extends RichSinkFunction<String> {
    private static ConcurrentHashMap<String, RefCountedReentrantLock> tableLocks = new ConcurrentHashMap<>();
    private static ExecutorService executorService = Executors.newFixedThreadPool(10);
    private static ExecutorService taskService = Executors.newFixedThreadPool(5);
    private static ConcurrentHashMap<String, BlockingQueue<Runnable>> waitingQueues = new ConcurrentHashMap<>();
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MySqlDBUtils.initConnectionPool();
    }
    @Override
    public void invoke(String value, Context context) throws Exception {
        // 处理不同类型的操作
    }
    private void handleDmlEvent(JSONObject obj) throws Exception {
        // 处理插入、更新操作
    }
    private void executeDmlEvent(JSONObject afterObj, JSONObject source) throws Exception {
        // 构建并执行 SQL 语句
    }
    private void handleDeleteEvent(JSONObject obj) throws Exception {
        // 处理删除操作
    }
    private void executeDeleteEvent(JSONObject beforeObj, JSONObject source) throws Exception {
        // 构建并执行删除 SQL 语句
    }
    private void handleDdlEvent(String tableName, JSONObject obj) throws Exception {
        // 处理 DDL 事件
    }
    private void addTaskToWaitingQueue(String tableName, Runnable task) {
        // 将任务加入等待队列
    }
    private void processWaitingQueue(String tableName) {
        // 处理等待队列中的任务
    }
    @Override
    public void close() throws Exception {
        super.close();
        MySqlDBUtils.closeConnectionPool();
        executorService.shutdown();
    }
}
2.6 MySqlListenerStart.java
负责启动 Flink 环境并配置 MySQL CDC 源,监听 MySQL 的 binlog 事件,将其发送到 MysqlSink 进行处理。
@Component
public class MySqlListenerStart {
    @Autowired
    private MySqlSourceProperties config;
    @EventListener
    public void start(ContextRefreshedEvent event) throws IOException {
        // 配置 Flink 环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config1);
        // 构建 MySQL Source
        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(config.getHost())
                .port(config.getPort())
                .username(config.getUsername())
                .password(config.getPassword())
                .serverTimeZone("Asia/Shanghai")
                .jdbcProperties(prop)
                .debeziumProperties(debeziumProperties)
                .includeSchemaChanges(true)
                .databaseList(config.getDatabaseList())
                .tableList(config.getTableList())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .splitSize(10000)
                .fetchSize(10000);
        // 设置同步模式
        StartupOptions startupOptions;
        if (StringUtils.hasText(config.getStartDate())) {
            startupOptions = StartupOptions.timestamp(DateUtil.parseDate(config.getStartDate()).getTime());
        } else if ("initial".equals(config.getStartUpMode())) {
            startupOptions = StartupOptions.initial();
        } else {
            startupOptions = StartupOptions.latest();
        }
        builder.startupOptions(startupOptions);
        DataStreamSource<String> mySqlDS = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), config.getName());
        mySqlDS.addSink(new MysqlSink());
        try {
            env.execute("mysqlSynchronization");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
3. 注意事项
3.1 数据库连接池配置
- HikariCP 是一个高性能的 JDBC 连接池,需要根据实际需求调整最大连接数、最小空闲连接数等参数。
- 确保 application.yml中的数据库配置正确无误,避免因配置错误导致连接失败。
3.2 锁机制
- RefCountedReentrantLock用于防止 DDL 操作时的并发冲突,确保同一张表不会同时被多个线程修改。
- 在 DDL 操作完成后,必须解锁并处理等待队列中的任务,以保证后续操作正常进行。
3.3 数据同步模式
- startUpMode参数决定了同步模式:- latest表示增量同步,- initial表示全量同步。
- 如果指定了 startDate,则从指定时间点开始同步。
3.4 异常处理
- 在 invoke方法中,对于不同类型的事件(如 DML、DDL),应确保异常被捕获并处理,避免程序崩溃。
- 使用 CompletableFuture异步处理任务时,需注意异常传播问题。
3.5 性能优化
- 根据实际情况调整线程池大小和 SQL 执行批量大小,以提高性能。
- 合理设置 Flink 的 Checkpoint 和 Savepoint 配置,确保数据一致性。
4. 总结
本项目通过 Flink CDC 实现了 MySQL 到 MySQL 的数据同步,涵盖了增量和全量同步、DDL 事件处理、时间类型转换等功能。通过合理的锁机制和异步任务处理,确保了数据同步的稳定性和高效性。希望这篇博客能帮助你更好地理解和实现类似项目。
            本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 王德明
        
     评论
            
                匿名评论
                隐私政策
            
            
                你无需删除空行,直接评论以获取最佳展示效果
            
         
            
         
        
    