PgOutput Replication Setup
DLH.io documentation for PgOutput Replication Setup
Setup Logical Replication :: pgoutput
PgOutput logical replication can capture deletes, which enables the soft-delete function in DLH.io.
A main difference with pgoutput is that you MUST create a PUBLICATION object in PostgreSQL, and the user querying the replication slot must also have the REPLICATION permission.
Check to see if any logical replication slots already exist by running the command:
select * from pg_catalog.pg_replication_slots;
There is often a default slot named pghoard_local with a slot_type of physical. If this is the only slot listed, you have NO logical replication slots yet.
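If you script this check, the rows returned by a query such as SELECT slot_name, slot_type FROM pg_catalog.pg_replication_slots; can be filtered in a few lines. The helpers below are a hypothetical sketch that assumes the rows have already been fetched with a PostgreSQL driver (not shown). They ignore physical slots such as pghoard_local, and they also report whether the slot name suggested later in this guide, datalakehouseio_replication_slot, is already taken.

```python
from typing import Iterable, List, Tuple

# Slot name suggested later in this guide.
PREFERRED_SLOT = "datalakehouseio_replication_slot"


def logical_slots(rows: Iterable[Tuple[str, str]]) -> List[str]:
    """Names of logical slots; physical slots such as pghoard_local are ignored."""
    return [name for name, slot_type in rows if slot_type == "logical"]


def slot_name_taken(rows: Iterable[Tuple[str, str]], name: str = PREFERRED_SLOT) -> bool:
    """True if any slot (logical or physical) already uses `name`."""
    return any(slot_name == name for slot_name, _slot_type in rows)


if __name__ == "__main__":
    # Example rows as (slot_name, slot_type), as on an instance with no logical slots yet.
    rows = [("pghoard_local", "physical")]
    print(logical_slots(rows))    # []
    print(slot_name_taken(rows))  # False
```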
Execute and Record the Following Settings
Review Current Settings
select setting from pg_settings where name ='wal_level';
select setting from pg_settings where name ='max_replication_slots';
select setting from pg_settings where name ='wal_sender_timeout';
select setting from pg_settings where name ='max_wal_senders';
Ensure that:
- WAL_LEVEL = 'logical'
- WAL_SENDER_TIMEOUT = 0 or 60000 (in milliseconds; 0 disables the timeout)
- MAX_REPLICATION_SLOTS >= 8
- MAX_WAL_SENDERS >= 16
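These checks can also be scripted. The sketch below is a hypothetical Python helper. It assumes you have already fetched the four settings into a dictionary, for example with SELECT name, setting FROM pg_settings WHERE name IN ('wal_level', 'max_replication_slots', 'wal_sender_timeout', 'max_wal_senders');. The thresholds are the ones listed above.

```python
from typing import Dict, List


def check_replication_settings(settings: Dict[str, str]) -> List[str]:
    """Compare pg_settings values (returned as text) with the requirements above.

    Returns a list of problems; an empty list means every check passed.
    wal_sender_timeout is reported by pg_settings in milliseconds.
    """
    problems: List[str] = []
    if settings.get("wal_level") != "logical":
        problems.append(f"wal_level is {settings.get('wal_level')!r}, expected 'logical'")
    timeout = int(settings.get("wal_sender_timeout", "-1"))
    if timeout not in (0, 60000):
        problems.append(f"wal_sender_timeout is {timeout}, expected 0 or 60000")
    if int(settings.get("max_replication_slots", "0")) < 8:
        problems.append("max_replication_slots should be >= 8")
    if int(settings.get("max_wal_senders", "0")) < 16:
        problems.append("max_wal_senders should be >= 16")
    return problems


if __name__ == "__main__":
    recorded = {
        "wal_level": "logical",
        "wal_sender_timeout": "60000",
        "max_replication_slots": "10",
        "max_wal_senders": "20",
    }
    print(check_replication_settings(recorded))  # []
```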
Create a User for DLH.io
Replace the placeholders (username, password, schema name, etc.) below with your actual values, and record them for later reference. You will need them when you enter the DLH.io connection info for the database this user will access. For the <username>, we usually recommend 'datalakehouse_sync_svc':
Create a User
CREATE USER <username> PASSWORD 'some-password';
GRANT USAGE ON SCHEMA "public" TO <username>;
GRANT SELECT ON ALL TABLES IN SCHEMA "public" TO <username>;
------- #### or for specifically cherry-picked selected tables, #### -------
-- ALTER DEFAULT PRIVILEGES IN SCHEMA "some_schema" REVOKE SELECT ON TABLES FROM <username>;
-- revoke the blanket table grants using this approach
-- REVOKE SELECT ON ALL TABLES IN SCHEMA "some_schema" FROM <username>;
ALTER DEFAULT PRIVILEGES IN SCHEMA "public" GRANT SELECT ON TABLES TO <username>;
-- ...then, for continued individual table access:
-- GRANT SELECT ON "some_schema"."some_table" TO <username>;
-- ALTER DEFAULT PRIVILEGES IN SCHEMA "some_schema" GRANT SELECT ON TABLES TO <username>;
As an example, to create a user with access to all tables:
CREATE USER datalakehouse_sync_svc PASSWORD 'P@ssword1';
GRANT USAGE ON SCHEMA "public" TO datalakehouse_sync_svc;
GRANT SELECT ON ALL TABLES IN SCHEMA "public" TO datalakehouse_sync_svc;
Create a Publication for PgOutput
Create a publication object on the PostgreSQL database. As an example, the following code creates a publication named dlh_pub_selected_tables:
CREATE PUBLICATION dlh_pub_selected_tables FOR TABLE public.actor, public.category, public.staff, public.film;
-- alternatively select all tables for the publication
-- CREATE PUBLICATION dlh_pub_all_tables FOR ALL TABLES;
Then confirm that the publication was created:
SELECT * FROM pg_publication_tables WHERE pubname = 'dlh_pub_selected_tables';
Once the publication is created, you can move on to creating the logical replication slot with the pgoutput option.
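If the list of tables is managed in a script, the per-table GRANT statements and the CREATE PUBLICATION statement can be generated from one list so they stay in sync. Below is a minimal Python sketch. publication_sql, quote_identifier, and VALID_PUBLISH are hypothetical names and are not part of DLH.io or PostgreSQL. The helper always double-quotes identifiers, so names keep their exact case. It can also add the publish option described under Alternative Publication Tactics. Leaving out delete means deletes are not published, so DLH.io soft deletes cannot be captured. The truncate value requires PostgreSQL 11 or later.

```python
from typing import List, Optional, Sequence, Tuple

# Operations accepted by the publish option ('truncate' needs PostgreSQL 11+).
VALID_PUBLISH = ("insert", "update", "delete", "truncate")


def quote_identifier(name: str) -> str:
    """Always double-quote an identifier (case is preserved), doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def publication_sql(
    pub: str,
    user: str,
    tables: Sequence[Tuple[str, str]],
    publish: Optional[Sequence[str]] = None,
) -> List[str]:
    """Build per-table GRANT SELECT statements plus one CREATE PUBLICATION statement."""
    if not tables:
        raise ValueError("at least one (schema, table) pair is required")
    qualified = [f"{quote_identifier(s)}.{quote_identifier(t)}" for s, t in tables]
    statements = [f"GRANT SELECT ON {q} TO {quote_identifier(user)};" for q in qualified]
    create = f"CREATE PUBLICATION {quote_identifier(pub)} FOR TABLE {', '.join(qualified)}"
    if publish is not None:
        ops = [op.strip().lower() for op in publish]
        unknown = [op for op in ops if op not in VALID_PUBLISH]
        if unknown:
            raise ValueError(f"unsupported publish operations: {unknown}")
        create += f" WITH (publish = '{', '.join(ops)}')"
    statements.append(create + ";")
    return statements


if __name__ == "__main__":
    for stmt in publication_sql(
        "dlh_pub_selected_tables",
        "datalakehouse_sync_svc",
        [("public", "actor"), ("public", "film")],
    ):
        print(stmt)
```

Review the generated statements before running them. Quoting keeps the names in the example unchanged because they are already lower case.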
Create the Logical Replication for PgOutput
Creating the logical replication slot requires some basic configuration on the PostgreSQL server.
Adjust the session configuration:
-- Set statement timeout to 30 minutes (a bare number would be read as milliseconds)
SET statement_timeout = '30min';
Now create the actual logical replication slot. Use the name 'datalakehouseio_replication_slot' (or another unique name if that one is already in use), with 'pgoutput' as the second argument:
SELECT pg_create_logical_replication_slot('datalakehouseio_replication_slot', 'pgoutput');
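Replication slot names may contain only lower-case letters, numbers, and the underscore character. With PostgreSQL's default build settings (NAMEDATALEN of 64), they can be at most 63 characters long. If you pick an alternative name, a quick check like this hypothetical Python helper catches an invalid name before pg_create_logical_replication_slot rejects it:

```python
import re

# With the default NAMEDATALEN of 64, names may be at most 63 characters.
MAX_SLOT_NAME_LEN = 63
# Only ASCII lower-case letters, digits, and underscores are allowed.
_SLOT_NAME_RE = re.compile(r"[a-z0-9_]+")


def is_valid_slot_name(name: str) -> bool:
    """True if `name` is non-empty, short enough, and uses only allowed characters."""
    return 0 < len(name) <= MAX_SLOT_NAME_LEN and _SLOT_NAME_RE.fullmatch(name) is not None


if __name__ == "__main__":
    print(is_valid_slot_name("datalakehouseio_replication_slot"))  # True
    print(is_valid_slot_name("DLH-slot"))                          # False
```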
Create a role with the REPLICATION attribute and grant it to the user created previously:
CREATE ROLE datalakehouse_sync_role;
ALTER ROLE datalakehouse_sync_role WITH REPLICATION;
GRANT datalakehouse_sync_role TO datalakehouse_sync_svc;
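Alternatively, you can alter the user directly with ALTER USER <read-only-username> WITH REPLICATION. Creating a role and assigning it to the user is generally preferred over altering a user directly. Be aware, though, that in standard PostgreSQL the REPLICATION attribute, like LOGIN and SUPERUSER, is not inherited through role membership. If the user still gets a permission error when reading the slot, altering the user directly may be required.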
Alternative Publication Tactics (Not Recommended)
Alternatively, if your team wants the CDC logical replication to track only certain types of events (insert, update, or delete), use the following syntax when creating the publication for the pgoutput plugin. Adjust it to match the choices you made in the steps above, for example:
CREATE PUBLICATION <publication_name> FOR TABLE <table_name> WITH (publish = 'insert, update');
Analyzing Logical Replication Slots
If you are testing, or are concerned that replication is not working, you can run a SELECT statement against the replication slot. This shows whether the changes (inserts, updates, deletes) you expect from your system are being captured. Peeking at the logical changes is the easiest way to check that logical replication is working and that tracked changes are flowing through. Unlike the ...get_changes functions, the ...peek_changes functions do not advance the slot's position, so they show you the changes without affecting the CDC itself. Because pgoutput produces binary output, use the binary variant with the proto_version and publication_names options:
SELECT * FROM pg_logical_slot_peek_binary_changes('datalakehouseio_replication_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'dlh_pub_selected_tables');
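The data column of each returned row is binary, so it can help to tally the rows by message type. In the pgoutput protocol, the first byte of each message identifies it: B for begin, R for relation, I/U/D for insert/update/delete, C for commit, and so on. The sketch below is a hypothetical helper that assumes the (lsn, xid, data) rows have already been fetched with a PostgreSQL driver. After a single INSERT on a published table, you would typically expect begin, relation, insert, and commit entries.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

# First byte of each pgoutput message, mapped to a readable name.
PGOUTPUT_TYPES = {
    "B": "begin", "C": "commit", "O": "origin", "R": "relation", "Y": "type",
    "I": "insert", "U": "update", "D": "delete", "T": "truncate", "M": "message",
}


def tally_pgoutput(rows: Iterable[Tuple[str, int, bytes]]) -> Dict[str, int]:
    """Count message kinds in (lsn, xid, data) rows from pg_logical_slot_peek_binary_changes."""
    counts = Counter()
    for _lsn, _xid, data in rows:
        # Drivers may return bytea as bytes or memoryview; bytes() handles both.
        kind = chr(bytes(data)[0]) if data else "?"
        counts[PGOUTPUT_TYPES.get(kind, f"unknown({kind})")] += 1
    return dict(counts)


if __name__ == "__main__":
    # Synthetic rows: only the first byte of each payload matters here.
    sample = [("63/1", 500, b"B"), ("63/2", 500, b"R"), ("63/3", 500, b"I"), ("63/4", 500, b"C")]
    print(tally_pgoutput(sample))  # {'begin': 1, 'relation': 1, 'insert': 1, 'commit': 1}
```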
[NB] The pg_current_wal_lsn and lsn_distance values keep changing because they are based on the Log Sequence Number (LSN). An LSN is a 64-bit integer that marks a position in the Write-Ahead Log (WAL) stream, which preserves data integrity, and it acts as a pointer into the XLOG. An LSN is printed as two hexadecimal numbers of up to 8 digits each, separated by a slash (/), for example 63/B00008F0. Two LSNs can be compared with operators such as =, >, and <, and subtracting one from the other (-) returns the number of bytes between the two WAL positions.
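The LSN arithmetic can be reproduced outside the database, which is handy when comparing positions copied from logs. The two hexadecimal halves are the high and low 32 bits of the 64-bit position. This is a minimal Python sketch with hypothetical helper names:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert an LSN printed as 'X/Y' (hex high and low 32-bit halves) to an int."""
    high, low = lsn.split("/")
    high_val, low_val = int(high, 16), int(low, 16)
    if high_val > 0xFFFFFFFF or low_val > 0xFFFFFFFF:
        raise ValueError(f"each half of an LSN has at most 8 hex digits: {lsn!r}")
    return (high_val << 32) | low_val


def lsn_distance(newer: str, older: str) -> int:
    """Bytes between two WAL positions, like pg_current_wal_lsn() - confirmed_flush_lsn."""
    return lsn_to_int(newer) - lsn_to_int(older)


if __name__ == "__main__":
    # Values from the example output in the lag/pointer section of this guide.
    print(lsn_distance("63/B00008F0", "63/390006B0"))  # 1996489280
```

The result matches the lsn_distance that PostgreSQL reports for the same pair of positions in the example output.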
Test Your Logical Replication
To confirm that your logical replication is working, you can artificially create, or wait for, a DML operation (INSERT, UPDATE, or DELETE).
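Then run the peek command from above again. Because pgoutput produces binary output, the text-based pg_logical_slot_peek_changes function returns an error for a pgoutput slot, so use the binary variant with the proto_version and publication_names options:
SELECT * FROM pg_logical_slot_peek_binary_changes('datalakehouseio_replication_slot', NULL, NULL, 'proto_version', '1', 'publication_names', 'dlh_pub_selected_tables');
Get the last received WAL Position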
This shows the last received and last replayed WAL positions. If there is a difference (in bytes) between pg_last_wal_receive_lsn and pg_last_wal_replay_lsn, there is a lag, which typically means new data is available.
select pg_last_wal_receive_lsn(), pg_last_wal_replay_lsn(), pg_last_xact_replay_timestamp();
Confirm the Logical Replication Lag/Pointer
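To quickly show the flush position confirmed for each slot (which advances when changes are consumed with a ...get_changes call) and its distance from the current WAL position: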
SELECT
slot_name,
confirmed_flush_lsn,
pg_current_wal_lsn(),
(pg_current_wal_lsn() - confirmed_flush_lsn) AS lsn_distance
FROM pg_replication_slots
-- WHERE slot_name IN ('datalakehouseio_replication_slot')
;
The output will look similar to the following:
| slot_name | confirmed_flush_lsn | pg_current_wal_lsn | lsn_distance |
|---|---|---|---|
| pghoard_local | | 63/B00008F0 | |
| datalakehouseio_replication_slot | 63/390006B0 | 63/B00008F0 | 1996489280 |

(2 rows)
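A logical slot keeps WAL on disk until its changes are confirmed, so a growing lsn_distance usually also means growing disk usage. Below is a minimal monitoring sketch. It assumes the (slot_name, lsn_distance) pairs from the query above have already been fetched with a driver, where a NULL distance arrives as None. The helper names and the warning threshold are hypothetical choices. pretty_bytes uses binary units and is not guaranteed to match pg_size_pretty output.

```python
from typing import Iterable, List, Optional, Tuple


def pretty_bytes(n: int) -> str:
    """Format a byte count with binary units (bytes, KiB, MiB, GiB, TiB)."""
    size = float(n)
    for unit in ("bytes", "KiB", "MiB", "GiB"):
        if size < 1024:
            return f"{size:.0f} {unit}" if unit == "bytes" else f"{size:.2f} {unit}"
        size /= 1024
    return f"{size:.2f} TiB"


def slots_over_threshold(rows: Iterable[Tuple[str, Optional[int]]], warn_bytes: int) -> List[str]:
    """Report slots whose lsn_distance exceeds warn_bytes; NULL distances are skipped."""
    alerts: List[str] = []
    for name, distance in rows:
        if distance is None:  # e.g. physical slots such as pghoard_local
            continue
        if distance > warn_bytes:
            alerts.append(f"{name}: {pretty_bytes(int(distance))} behind")
    return alerts


if __name__ == "__main__":
    rows = [("pghoard_local", None), ("datalakehouseio_replication_slot", 1996489280)]
    print(slots_over_threshold(rows, warn_bytes=1024 ** 3))
```

With the example rows and a 1 GiB threshold, only datalakehouseio_replication_slot is reported, as 1.86 GiB behind.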
Overall Big Picture Replication View
select * from pg_stat_replication;
Get the Time Lag In Human Readable Format
SELECT now() - pg_last_xact_replay_timestamp() AS time_lag;
General Logical Replication Slot Details
This view provides basic information about the logical replication slots you have created. When DLH.io is reading from the slot, the active column will typically be "t" instead of "f" (true and false, respectively).
SELECT * FROM pg_replication_slots;
Peeking at Logical Slot Changes
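The pg_logical_slot_peek_changes function accepts several parameters for testing the flow of WAL changes and identifying the LSNs, as further described at https://pgpedia.info/p/pg_logical_slot_peek_changes.html. The same parameters apply to pg_logical_slot_peek_binary_changes, which is the variant a pgoutput slot requires. Options such as include-xids belong to the test_decoding plugin and are not accepted by pgoutput. For example, to limit the number of changes returned (the third argument, upto_nchanges):
SELECT * FROM pg_logical_slot_peek_binary_changes('datalakehouseio_replication_slot', NULL, 10, 'proto_version', '1', 'publication_names', 'dlh_pub_selected_tables');
Changes to the pg_last_wal_replay_lsn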
This is not used by DLH.io. pg_last_wal_replay_lsn relates to a streaming-replication standby instance, whereas DLH.io syncs on a scheduled, query-based frequency.
Disk Space Increase is Normal
Aiven.io has a good article on the normal increase in disk space caused by WAL: https://developer.aiven.io/docs/products/postgresql/concepts/pg-disk-usage.html
Run any of the main PostgreSQL commands to check disk space on the instance:
\l or \l+ (psql meta-commands)
SELECT pg_size_pretty(pg_database_size('yourdbname'));
Logical Replication Failures
In a standard process where the DDL of your PostgreSQL tables does not change much, there is very little cause for failure. The main risk is a sync frequency that is not aligned with the volume of data your tables produce relative to the disk space of the database server. Because logical replication does not track DDL changes, an error can occur if a DDL change is made without considering its downstream impact. In the case of DLH.io, a manual change may be required on your target system to reflect a DDL change on your source system.
If any errors occur, please report them immediately by opening a support ticket. Methods used on your source database side to clear issues may include:
- pg_replication_origin_advance, to skip the transaction that is failing