Stream data from PostgreSQL databases to Materialize using Change Data Capture
Materialize can ingest data from PostgreSQL 11 and later using PostgreSQL’s native logical replication protocol for Change Data Capture (CDC). This lets you continuously stream INSERT, UPDATE, and DELETE operations from your PostgreSQL database into Materialize.
Create a publication for the tables you want to replicate:
-- In PostgreSQL: create a publication for specific tables
CREATE PUBLICATION mz_source FOR TABLE table1, table2, table3;

-- Or, for all tables in the database (requires superuser privileges)
CREATE PUBLICATION mz_source FOR ALL TABLES;
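Logical replication also depends on a server setting and on per-table settings that the publication step assumes are already in place. The following sketch runs in PostgreSQL. It checks that wal_level is logical, sets the replica identity to FULL on the example tables (Materialize expects this for replicated tables), and lists the tables the mz_source publication covers. The table names are the same placeholders used above.

```sql
-- In PostgreSQL: logical replication requires wal_level = logical.
-- Changing it requires a server restart; on managed services, set it
-- through the provider's parameter settings instead.
SHOW wal_level;

-- Include the full old row in the WAL for UPDATE and DELETE on each replicated table.
ALTER TABLE table1 REPLICA IDENTITY FULL;
ALTER TABLE table2 REPLICA IDENTITY FULL;
ALTER TABLE table3 REPLICA IDENTITY FULL;

-- Confirm which tables the publication includes.
SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = 'mz_source';
```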
Create a dedicated user with replication permissions:
-- In PostgreSQL: create a user with the REPLICATION attribute
CREATE USER materialize WITH REPLICATION PASSWORD '<password>';

-- Grant read access to the published tables and their schema
GRANT USAGE ON SCHEMA public TO materialize;
GRANT SELECT ON table1, table2, table3 TO materialize;

-- Or, grant access to all existing tables in the schema,
-- and to tables the current role creates there later
GRANT SELECT ON ALL TABLES IN SCHEMA public TO materialize;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO materialize;
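You can check that the role and grants took effect by querying PostgreSQL's catalog and privilege functions. This sketch assumes the materialize user, the public schema, and the example table1 from above.

```sql
-- In PostgreSQL: rolreplication should be true for the materialize role.
SELECT rolname, rolreplication
FROM pg_roles
WHERE rolname = 'materialize';

-- Both columns should be true once USAGE and SELECT have been granted.
SELECT has_schema_privilege('materialize', 'public', 'USAGE') AS schema_usage,
       has_table_privilege('materialize', 'public.table1', 'SELECT') AS table1_select;
```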
Choose a network security option based on your deployment:
AWS PrivateLink
SSH Tunnel
Static IP Allowlist
AWS PrivateLink provides secure, private connectivity between Materialize and your PostgreSQL database in AWS.
1. Create a target group for your PostgreSQL instance:
Target type: IP address
Protocol: TCP
Port: 5432 (or your custom port)
2. Create a Network Load Balancer in the same VPC as your PostgreSQL instance
3. Create a TCP listener that forwards to your target group
4. Create a VPC endpoint service and associate it with the NLB
5. In Materialize, create the PrivateLink connection:
-- Replace the service name and availability zone IDs with your endpoint service's values
CREATE CONNECTION privatelink_svc TO AWS PRIVATELINK (
    SERVICE NAME 'com.amazonaws.vpce.us-east-1.vpce-svc-0e123abc123198abc',
    AVAILABILITY ZONES ('use1-az1', 'use1-az4')
);
6. Retrieve the AWS principal:
SELECT principal
FROM mz_aws_privatelink_connections plc
JOIN mz_connections c ON plc.id = c.id
WHERE c.name = 'privatelink_svc';
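7. Add the AWS principal to your VPC endpoint service's allowed principals. If the endpoint service requires acceptance of connection requests, also approve the pending connection request from Materialize.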
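Once the endpoint service accepts connections, a PostgreSQL connection can use the PrivateLink connection to reach the database. The following is a minimal sketch. The secret name pgpass and the host and database placeholders are hypothetical; substitute your own values.

```sql
-- In Materialize: store the replication user's password as a secret.
CREATE SECRET pgpass AS '<password>';

-- Route the PostgreSQL connection through the PrivateLink connection.
CREATE CONNECTION pg_connection TO POSTGRES (
    HOST '<postgres-host>',
    PORT 5432,
    USER 'materialize',
    PASSWORD SECRET pgpass,
    DATABASE '<database>',
    AWS PRIVATELINK privatelink_svc
);

-- Check that Materialize can reach and authenticate to PostgreSQL.
VALIDATE CONNECTION pg_connection;
```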
Use an SSH tunnel to connect to PostgreSQL through a bastion host.
1. Get Materialize egress IPs:
SELECT * FROM mz_egress_ips;
2. Configure your bastion host to allow SSH connections from those IPs, and make sure the bastion host can reach your PostgreSQL instance
3. Create the SSH tunnel connection:
CREATE CONNECTION ssh_connection TO SSH TUNNEL (
    HOST 'bastion-host.example.com',
    PORT 22,
    USER 'materialize'
);
4. Get the public keys:
SELECT c.name, s.public_key_1, s.public_key_2
FROM mz_ssh_tunnel_connections s
JOIN mz_connections c ON s.id = c.id
WHERE c.name = 'ssh_connection';
5. Add both public keys to ~/.ssh/authorized_keys for the materialize user on your bastion host
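With the keys installed, you can validate the tunnel and then create a PostgreSQL connection that uses it. The following is a minimal sketch. The secret name pgpass and the host and database placeholders are hypothetical.

```sql
-- In Materialize: confirm the bastion host accepts the tunnel's keys.
VALIDATE CONNECTION ssh_connection;

CREATE SECRET pgpass AS '<password>';

-- Reach PostgreSQL through the bastion; HOST must be reachable from the bastion host.
CREATE CONNECTION pg_connection TO POSTGRES (
    HOST '<postgres-host>',
    PORT 5432,
    USER 'materialize',
    PASSWORD SECRET pgpass,
    DATABASE '<database>',
    SSH TUNNEL ssh_connection
);
```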
Allow connections from Materialize’s static egress IP addresses.
1. Get Materialize egress IPs:
SELECT * FROM mz_egress_ips;
2. Update your PostgreSQL firewall/security group to allow connections from those IP addresses on port 5432
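With the allowlist in place, Materialize connects to PostgreSQL directly. Because this traffic crosses a public network, this sketch requires TLS. The secret name pgpass and the host and database placeholders are hypothetical.

```sql
-- In Materialize: store the replication user's password as a secret.
CREATE SECRET pgpass AS '<password>';

-- Connect directly and require TLS for the connection.
CREATE CONNECTION pg_connection TO POSTGRES (
    HOST '<postgres-host>',
    PORT 5432,
    USER 'materialize',
    PASSWORD SECRET pgpass,
    SSL MODE 'require',
    DATABASE '<database>'
);
```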
If your tables contain columns with data types that Materialize doesn't support, use the TEXT COLUMNS option to ingest those columns as text:
CREATE SOURCE pg_source
  FROM POSTGRES CONNECTION pg_connection (
    PUBLICATION 'mz_source',
    TEXT COLUMNS (table_name.column_with_unsupported_type)
  )
  FOR ALL TABLES;
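Alternatively, if you don't need every table in the publication, you can limit the source to specific tables and run it on a dedicated cluster. This sketch assumes a cluster named postgres_ingest (the name used in the sizing example on this page) and the pg_connection connection. It reuses the name pg_source because it replaces the statement above rather than adding to it.

```sql
-- In Materialize: a dedicated cluster keeps ingestion isolated from query workloads.
CREATE CLUSTER postgres_ingest (SIZE = '100cc');

-- Ingest only two of the published tables.
CREATE SOURCE pg_source
  IN CLUSTER postgres_ingest
  FROM POSTGRES CONNECTION pg_connection (PUBLICATION 'mz_source')
  FOR TABLES (table1, table2);
```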
Materialize automatically creates a replication slot in PostgreSQL for each source. The replication slot name is prefixed with materialize_ for easy identification.
-- Find the replication slot name
SELECT id, replication_slot
FROM mz_internal.mz_postgres_sources;
 id |               replication_slot
----+----------------------------------------------
 u8 | materialize_7f8a72d0bf2a4b6e9ebc4e61ba769b71
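When several sources exist, joining with mz_sources finds the slot for one source by name. This sketch looks up pg_source, the source name used in the earlier example.

```sql
-- In Materialize: map a source name to its PostgreSQL replication slot.
SELECT s.name, ps.replication_slot
FROM mz_internal.mz_postgres_sources AS ps
JOIN mz_sources AS s ON ps.id = s.id
WHERE s.name = 'pg_source';
```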
Each source uses a single replication slot for all tables in the publication, minimizing the performance impact on PostgreSQL.
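To monitor replication lag, check how far each Materialize slot's confirmed flush position trails the current WAL position:
-- In PostgreSQL
SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name LIKE 'materialize_%';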
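Lag against confirmed_flush_lsn shows how far behind Materialize is. The amount of WAL PostgreSQL must keep on disk depends on restart_lsn instead, and an inactive slot keeps retaining WAL until it is consumed or dropped. This sketch, run in PostgreSQL, reports both the slot status and the retained WAL. The final statement applies only to PostgreSQL 13 and later.

```sql
-- In PostgreSQL: active = false means no consumer is currently attached to the slot.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_name LIKE 'materialize_%';

-- PostgreSQL 13+: upper bound on WAL a slot may retain (-1 means no limit).
SHOW max_slot_wal_keep_size;
```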
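If a table's schema changes in PostgreSQL in a way Materialize can't handle (for example, a column is dropped or its type changes), the corresponding subsource errors. To recover, drop the subsource and add it back:
-- List all subsources
SHOW SUBSOURCES ON pg_source;

-- Drop the errored subsource
DROP SOURCE table_name;

-- Add it back with the updated schema
ALTER SOURCE pg_source ADD SUBSOURCE table_name;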
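To identify which subsource has errored, you can query the source status relation. This sketch assumes that mz_internal.mz_source_statuses is available in your Materialize version; relations in mz_internal can change between releases.

```sql
-- In Materialize: list sources and subsources that are currently reporting an error.
SELECT name, type, status, error
FROM mz_internal.mz_source_statuses
WHERE error IS NOT NULL;
```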
For large initial snapshots, use a larger cluster temporarily:
-- Before creating the source
ALTER CLUSTER postgres_ingest SET (SIZE = '200cc');

-- After snapshotting completes
ALTER CLUSTER postgres_ingest SET (SIZE = '100cc');
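Before scaling the cluster back down, you can check whether the initial snapshot has been committed. This sketch assumes that your Materialize version exposes a snapshot_committed column in mz_internal.mz_source_statistics. It lists every source, because subsources appear in mz_sources alongside their parent.

```sql
-- In Materialize: snapshot_committed becomes true once the initial snapshot is committed.
SELECT s.name, st.snapshot_committed
FROM mz_internal.mz_source_statistics AS st
JOIN mz_sources AS s ON st.id = s.id
ORDER BY s.name;
```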
If you add tables to or remove tables from the PostgreSQL publication, Materialize does not detect these changes automatically. You must add or drop the corresponding subsources manually.
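The following sketch shows one plausible order of operations for both cases. It uses a hypothetical new table, table4, and removes the example table3. ALTER PUBLICATION ... ADD TABLE fails on a FOR ALL TABLES publication, which already covers new tables; in that case, only the Materialize step is needed.

```sql
-- In PostgreSQL: prepare the new table and add it to the publication.
ALTER TABLE table4 REPLICA IDENTITY FULL;
GRANT SELECT ON table4 TO materialize;
ALTER PUBLICATION mz_source ADD TABLE table4;

-- In Materialize: start ingesting the new table.
ALTER SOURCE pg_source ADD SUBSOURCE table4;

-- To remove a table, first drop its subsource in Materialize...
DROP SOURCE table3;

-- ...then remove the table from the publication in PostgreSQL.
ALTER PUBLICATION mz_source DROP TABLE table3;
```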