If all you need is plain data synchronization, the official website has simpler no-code examples.
Both the com.ververica and the org.apache.flink CDC (Change Data Capture) packages can be used to implement streaming data synchronization with Flink.
com.ververica (a short sketch of its builder follows below):
- Needs no Kafka or other middleware; it talks to the database directly, so deployment is simpler.
- Suited to pulling changes straight from the database and processing them as a Flink data stream.
org.apache.flink:
- Embeds the Debezium engine, which reads WAL from PostgreSQL's logical replication slot and parses it into change events.
- Better suited to environments that already have Kafka data streams.
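The rest of this post uses the org.apache.flink package. For comparison, the legacy com.ververica artifact (Flink CDC 2.x) exposes a SourceFunction-style builder. The following is only a rough sketch assuming the com.ververica:flink-connector-postgres-cdc 2.x dependency; class and method names can differ between versions:

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LegacyPostgresCdcSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Legacy (com.ververica) builder: a single-parallelism SourceFunction backed by Debezium
        DebeziumSourceFunction<String> source = PostgreSQLSource.<String>builder()
                .hostname("192.168.1.4")
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.biz_device_product")
                .username("postgres")
                .password("postgres")
                .decodingPluginName("pgoutput")
                .slotName("flink_cdc")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        env.addSource(source, "legacy cdc source").print();
        env.execute();
    }
}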
1. Run PostgreSQL and Kafka via docker-compose
version: '3.6'
services:
  postgresql:
    image: bitnami/postgresql:17.1.0
    container_name: pg
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
      - POSTGRESQL_WAL_LEVEL=logical
    ports:
      - "5432:5432"
    volumes:
      - /data/pg:/bitnami/postgresql
      - /data/pg/postgresql.conf:/opt/bitnami/postgresql/conf/postgresql.conf
  kafka:
    container_name: kafka
    image: 'bitnami/kafka:3.3.1'
    user: root
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.4:9092
      - KAFKA_BROKER_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=iZWRiSqjZAlYwlKEqHFQWI
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
    volumes:
      - /data/kafka/broker01:/bitnami/kafka:rw
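Note that KAFKA_CFG_ADVERTISED_LISTENERS must point at an address the Flink job can actually reach (192.168.1.4 above). A quick way to confirm the broker is reachable is an AdminClient round trip; this is just a sketch and assumes kafka-clients is on the classpath (it arrives transitively with flink-connector-kafka):

import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class KafkaConnectivityCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.4:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() fails fast if the advertised listener is not reachable
            Collection<Node> nodes = admin.describeCluster().nodes().get();
            System.out.println("Kafka brokers visible to the client: " + nodes);
        }
    }
}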
2. Configure PostgreSQL WAL logging to use pgoutput
Find postgresql.conf and set:
wal_level = logical
max_replication_slots = 16
max_wal_senders = 16
# Restart PostgreSQL for the new settings to take effect (e.g. docker restart pg for the container above, or systemctl restart postgresql on a host install)
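Before starting the Flink job you can confirm that the settings took effect. A minimal JDBC check, assuming the PostgreSQL JDBC driver is on the classpath (it comes in transitively with flink-connector-postgres-cdc; otherwise add org.postgresql:postgresql) and the same credentials as the job below:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class WalLevelCheck {
    public static void main(String[] args) throws Exception {
        // Connection parameters match the docker-compose service and the Flink job below
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://192.168.1.4:5432/postgres", "postgres", "postgres");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW wal_level")) {
            rs.next();
            // Must print "logical", otherwise logical decoding via pgoutput will not work
            System.out.println("wal_level = " + rs.getString(1));
        }
    }
}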
3. Write the local project code
package test;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PostgresCdcTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is required so the incremental source can commit consumed offsets back to the replication slot
        env.enableCheckpointing(3000);

        // Kafka sink: forwards each change event (a JSON string) to the flink-test topic
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("192.168.1.4:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("flink-test")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build()
                )
                .build();

        // Postgres CDC source: reads change events from the flink_cdc replication slot via pgoutput
        PostgresSourceBuilder.PostgresIncrementalSource<String> source =
                PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                        .hostname("192.168.1.4")
                        .port(5432)
                        .database("postgres")
                        .schemaList("public")
                        .tableList("public.biz_device_product")
                        .username("postgres")
                        .slotName("flink_cdc")
                        .password("postgres")
                        .decodingPluginName("pgoutput")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .startupOptions(StartupOptions.latest())
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "cdc source")
                //.print()  // print to the console instead of writing to Kafka
                .sinkTo(kafkaSink);

        env.execute();
    }
}
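JsonDebeziumDeserializationSchema emits every change as a Debezium JSON envelope (op, before, after, source). If you want to do more than forward the raw string, a map step between the source and the sink can pull out the interesting fields. A rough sketch using Jackson (jackson-databind is normally on the classpath via the connector's Debezium dependency; add it explicitly if your build excludes it):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

// Extracts the operation type and the row image after the change from a Debezium JSON envelope
public class ChangeEventFlattener implements MapFunction<String, String> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public String map(String value) throws Exception {
        JsonNode root = MAPPER.readTree(value);
        String op = root.path("op").asText();   // c = insert, u = update, d = delete, r = snapshot read
        JsonNode after = root.path("after");    // row state after the change (null for deletes)
        return op + ": " + after.toString();
    }
}

It would be wired in as env.fromSource(...).map(new ChangeEventFlattener()).sinkTo(kafkaSink).

The Maven dependencies of the project: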
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.20.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>1.20.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-postgres-cdc</artifactId>
        <version>3.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.20.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.2.0-1.19</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.4.2</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>test.PostgresCdcTest</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Debezium automatically creates the PostgreSQL replication slot and publication. Verify in PostgreSQL:
SHOW wal_level;
SELECT * FROM pg_replication_slots WHERE slot_name = 'flink_cdc';
SELECT * FROM pg_publication_tables;
-- They can also be created manually:
SELECT * FROM pg_create_logical_replication_slot('flink_cdc1', 'pgoutput');
CREATE PUBLICATION flink_pub FOR ALL TABLES;
-- pg_replication_slots.active only becomes true once a client is actually streaming from the slot
4. Run DML (or DDL) statements to verify
INSERT INTO public.biz_device_product (id, name) VALUES (1, 'Device A');
UPDATE public.biz_device_product SET name = 'Device B' WHERE id = 1;
You should see the change events arrive in Kafka (or printed in the console if you used .print()).
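To inspect the messages in Kafka you can use the kafka-console-consumer script shipped inside the broker container, or a small standalone consumer such as the sketch below (again assuming kafka-clients on the classpath):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FlinkTestTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.4:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-test-verify");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("flink-test"));
            // Poll until interrupted (Ctrl+C); each record is one Debezium change event in JSON form
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}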