Flink CDC: Subscribe to PostgreSQL for Change Data Capture and Push to Kafka

Published 2024-11-21


If you only need plain data synchronization, the official documentation has simpler no-code examples.

Both the com.ververica and org.apache.flink CDC (Change Data Capture) packages can be used with Flink to implement streaming data synchronization.

com.ververica

  • No Kafka or other middleware is required; it talks to the database directly, so deployment is simpler.
  • Suitable for pulling changes straight from the database and handling them as a Flink data stream (a minimal sketch follows this list).

org.apache.flink

  • Internally embeds the Debezium engine, which reads the WAL from a PostgreSQL logical replication slot and parses it into change events.
  • Better suited for environments that already have Kafka data streams.
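
For contrast with the org.apache.flink 3.x builder used in step 3 below, here is a minimal sketch of the older com.ververica style. This assumes the 2.x flink-connector-postgres-cdc artifact under the com.ververica groupId; treat the exact builder methods as an assumption for your version, and the connection settings are the same placeholders used later in this post.

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VervericaPostgresCdcSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // SourceFunction-style source from the legacy com.ververica connector
        DebeziumSourceFunction<String> source = PostgreSQLSource.<String>builder()
                .hostname("192.168.1.4")                 // placeholder connection settings
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.biz_device_product")
                .username("postgres")
                .password("postgres")
                .decodingPluginName("pgoutput")          // use PostgreSQL's built-in pgoutput plugin
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // No Kafka involved: print the JSON change events directly
        env.addSource(source, "legacy cdc source").print();
        env.execute();
    }
}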

1. Run PostgreSQL and Kafka

version: '3.6'
services:
  postgresql:
    image: bitnami/postgresql:17.1.0
    container_name: pg
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - POSTGRESQL_WAL_LEVEL=logical
    ports:
      - "5432:5432"
    volumes:
      - /data/pg:/bitnami/postgresql
      - /data/pg/postgresql.conf:/opt/bitnami/postgresql/conf/postgresql.conf

  kafka:
    container_name: kafka
    image: 'bitnami/kafka:3.3.1'
    user: root
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.4:9092
      - KAFKA_BROKER_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
    volumes:
      - /data/kafka/broker01:/bitnami/kafka:rw

2. Configure PostgreSQL WAL logging, using pgoutput

Locate postgresql.conf and set:

wal_level = logical
max_replication_slots = 16
max_wal_senders = 16

# Restart PostgreSQL to apply the changes (e.g. docker restart pg for the container above, or systemctl restart postgresql on a host install)

3. Write the local project code

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PostgresCdcTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka sink that writes the raw JSON change events to the "flink-test" topic
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.1.4:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("flink-test")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                .build();

        // Incremental CDC source that reads PostgreSQL changes through the pgoutput logical decoding plugin
        PostgresSourceBuilder.PostgresIncrementalSource<String> source = PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname("192.168.1.4")
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.biz_device_product")
                .username("postgres")
                .slotName("flink_cdc")
                .password("postgres")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.latest()) // start from the latest WAL position; no initial snapshot
                .build();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "cdc source")
                //.print(); // print directly to the console instead of sinking to Kafka
                .sinkTo(kafkaSink);

        env.execute();
    }
}
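
The pom.xml dependencies and the assembly plugin used to package a runnable jar:
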
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.20.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.20.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>3.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.20.0</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.2.0-1.19</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.4.2</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>test.PostgresCdcTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Debezium automatically creates the PostgreSQL replication slot and publication. Verify in PostgreSQL:

SHOW wal_level;
SELECT * FROM pg_replication_slots WHERE slot_name = 'flink_cdc';
SELECT * FROM pg_publication_tables;

-- The slot and publication can also be created manually:
SELECT * FROM pg_create_logical_replication_slot('flink_cdc1', 'pgoutput');
CREATE PUBLICATION flink_pub FOR ALL TABLES;

-- pg_replication_slots.active becomes true only after a client has subscribed to the stream

4. Test with DML or DDL to verify

INSERT INTO public.biz_device_product (id, name) VALUES (1, 'Device A');
UPDATE public.biz_device_product SET name = 'Device B' WHERE id = 1;

The pushed change events can now be seen in Kafka (or in the console if .print() was used instead of the Kafka sink). Each record is a Debezium-style JSON change event with before, after, source, and op fields.
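
To double-check from outside Flink, a plain Kafka consumer can read the topic. Below is a minimal sketch using the standard kafka-clients API; it assumes the org.apache.kafka:kafka-clients dependency (not in the pom above), and the group id is arbitrary. The kafka-console-consumer.sh script shipped with Kafka works just as well.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FlinkTestTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.4:9092"); // broker from the compose file above
        props.put("group.id", "cdc-verify");                 // arbitrary consumer group
        props.put("auto.offset.reset", "earliest");          // read the topic from the beginning
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink-test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // each value is one JSON change event produced by JsonDebeziumDeserializationSchema
                    System.out.println(record.value());
                }
            }
        }
    }
}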
