LOXODATA

PostgreSQL et Debezium

2025-06-03   1448 mots, 7 minutes de lecture   Philippe Viegas

Dans cet article, nous allons regarder comment connecter PostgreSQL vers d’autres applications avec Debezium, dans le cadre d’un cluster Patroni assurant la haute disponibilité. Debezium repose sur un ensemble de services distribués conçus pour capturer les changements survenants dans vos bases de données afin que d’autres applications puissent consulter ces changements et les consommer, communément appelé CDC pour change data capture.

Debezium

Debezium propose de capturer les flux d’évènements depuis vos bases de données permettant aux applications de répondre à des changements survenant au niveau des lignes de tables sur vos bases de données.

Debezium repose sur le broker de messages Apache Kafka et fournit un ensemble de connecteurs sources pour Kafka Connect par type de sources, par exemple pour PostgreSQL. Le connecteur enregistre l’historique des changements sur une base de données en acquittant les changements au fil de l’eau et en sauvegardant chaque évènement dans un topic Kakfa (file de messages), et la position dans les journaux de transactions (WAL).

Le connecteur Debezium pour PostgreSQL se base sur le décodage logique pour récupérer les changements commités dans les journaux de transactions et un plugin de sortie pour mettre en forme les données.

PostgreSQL ne conserve pas tout l’historique des journaux de transactions, car les fichiers WAL sont recyclés quand nécessaire par PostgreSQL. Debezium doit donc gèrer lui même l’historique des lignes, et donc l’initialiser. Ceci se fait à la première connexion par une sauvegarde de l’état de la base de données (snapshot). Pour ce faire, le connecteur suit les étapes suivantes :

  • Création d’une transaction avec un niveau d’isolation à SERIALIZABLE pour assurer la cohérence des données pour les lectures pendant cette transaction;
  • Lecture de la position actuelle (LSN) du journal de transaction;
  • Lecture des tables et schémas de la base de données et génération d’un évènement READ pour chaque ligne, chaque évènement est ensuite écrit dans un topic Kakfa correspondant à la table;
  • Commit de la transaction;
  • Enregistrement de l’état de la sauvegarde et la dernière position (LSN) lue;

Ensuite, le connecteur entre dans sa boucle d’envoi d’évènements consommés depuis PostgreSQL. Le connecteur convertit chaque changement en évènement create, update ou delete définit par Debezium, avec son LSN et les enregistre dans Kakfa Connect, qui à son tour les transmets à un topic Kakfa. Kakfa Connect stocke en parallèle le LSN actuel afin de repartir de ce point lors de l’arrêt de Kakfa Connect.

Architecture

À l’image de cette première architecture basée sur Kakfa Connect et Apache Kakfa, il existe deux autres types d’architecture disponible :

  • Un serveur Debezium, une application pour diffuser les changements de bases de données sources vers des infrastructures de messagerie de type Redis par exemple;
  • Une librairie Debezium Engine intégrée directement dans votre application Java;

Mise en place avec PostgreSQL

Le connecteur Debezium pour PostgreSQL prend en charge deux plugins de sortie : pgoutput qui est le plugin par défaut utilisé avec la réplication logique ou decoderbufs basé sur le format Protobuf et maintenu par la communauté Debezium.

Pour cet exemple, nous utilisons le plugin decoderbufs. Nous ne détaillons pas la mise en place de Debezium et ses services associés, mais uniquement la mise en place côté PostgreSQL. Pour Debezium, il convient de se référer à la documentation.

Il faut tout d’abord installer le plugin decoderbufs sur votre serveur via le système de paquet approprié.

Sous RedHat, on peut par exemple utiliser la commande dnf pour installer le paquet pour la version 14 de Postgresql en utilisant le dépôt du PGDG :

dnf install postgres-decoderbufs_14.x86_64

On vérifie l’installation du paquet avec :

dnf list installed postgres-decoderbufs_14.x86_64

Installed Packages
postgres-decoderbufs_14.x86_64   3.0.2-1PGDG.rhel9

Puis on modifie la configuration de PostgreSQL via la commande patronictl edit-config pour charger le plugin et autoriser le décodage logique via le paramètre wal_level à logical.

cat | patronictl -c /etc/patroni/patroni.yml edit-config --apply - <<_EOF_
postgresql:
  parameters:
   shared_preload_libraries: decoderbufs
   wal_level: logical
_EOF_

Et on redémarre les instances pour prise en compte des paramètres avec la commande patronictl restart car Les paramètres wal_level et shared_preload_libraries exige un redémarrage de l’instance PostgreSQL.

patronictl -c /etc/patroni/config.yml restart --force loxodemo

Il peut être nécessaire d’affiner les paramètres suivants selon votre cas d’usage :

  • max_wal_senders (défaut à 10) : doit être équivalent ou plus au nombre de slots de réplication, plus le nombre de réplicas présents;
  • max_replication_slots (défaut à 10) : indiquant le nombre de slots de réplication maximum (réplication logique et physique). Doit être au moins égal au nombre de réplica et de souscription (pour chaque souscription, un apply worker et plusieurs tablesync worker);
  • max_slot_wal_keep_size (défaut à -1) : permet d’indiquer la taille maximale de fichiers WAL pouvant être conservés par un slot de réplication;

Nous créons maintenant le rôle de réplication dédié au connecteur Debezium et ajoutons les permissions nécessaires.

demo=# CREATE ROLE debezium_replication LOGIN REPLICATION WITH PASSWORD '<redacted>'
CREATE ROLE
demo=# GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium_replication;
GRANT
demo=# ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium_replication;
ALTER DEFAULT PRIVILEGES

On ajoute l’utilisateur créé dans le fichier .pgpass si ce dernier est utilisé.

Il faut ensuite adapter le fichier pg_hba.conf pour autoriser le connecteur Debezium à se connecter à l’instance PostgreSQL.

host         demo         debezium_replication         <ip du connecteur ou plage autorisée>         scram-sha-256

Nous créons à présent la publication portant sur tous les schémas de la base de données, et créons manuellement le slot de réplication. Il est possible de limiter les tables publiées dans la commande CREATE PUBLICATION ou de définir un schéma en particulier.

demo=# CREATE PUBLICATION pub_con_to_debezium FOR ALL TABLES;
CREATE PUBLICATION
demo=# SELECT * FROM pg_create_logical_replication_slot('logical_pub_con_to_debezium_slot', 'decoderbufs');
INFO:  Exiting startup callback
            slot_name             |    lsn
----------------------------------+------------
 logical_pub_con_to_debezium_slot | D/7901ACD8
(1 row)

Nous mettons à jour la configuration de PostgreSQL via Patroni pour déclarer l’utilisation du slot de réplication logique et rechargeons la configuration.

cat | patronictl -c /etc/patroni/patroni.yml edit-config --apply - <<_EOF_
postgresql:
  use_slots: true
slots:
  logical_pub_con_to_debezium_slot:
    database: demo
    plugin: decoderbufs
    type: logical  
_EOF_

Supervision

Une fois la configuration en place, il est nécessaire de surveiller la consommation des WAL et l’état des slots de réplication. En effet, si le connecteur venait à s’arrêter de consommer les données du slot de réplication, le paramètre confirmed_flush_lsn n’étant pas mis à jour, alors PostgreSQL va accumuler les fichiers WAL nécessaires au slot de réplication en attendant une reprise éventuelle. Ceci peut conduire à la saturation du système de fichiers et entraîner dans le pire des cas l’arrêt de PostgreSQL.

On peut utiliser des gardes fous comme le paramètre max_slot_wal_keep_size pour limiter la quantité de fichiers WAL conservée par un slot de réplication.

Pour surveiller l’activité des slots de réplication, nous utilisons la vue pg_replication_slots (ici en version 14 de PostgreSQL).

demo=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+---------------------------------
slot_name           | logical_pub_con_to_debezium_slot
plugin              | decoderbufs
slot_type           | logical
datoid              | 16437
database            | demo
temporary           | f
active              | t
active_pid          | 2132668
xmin                | 
catalog_xmin        | 134506175
restart_lsn         | 55/99983378
confirmed_flush_lsn | 55/999873C0
wal_status          | reserved
safe_wal_size       | 
two_phase           | f
  • active vaut true si le slot émet des données;
  • catalog_xmin correspondant à la transaction la plus ancienne affectant le catalogue système et requis par le slot;
  • restart_lsn la position du plus ancien WAL requis par le slot;
  • confirmed_flush_lsn la dernière position reçue et rejouée côté consommateur;
  • wal_status indique la disponibilité des fichiers WAL requis par le slot

On vérifie que les paramètres confirmed_flush_lsn et restart_lsn soient bien incrémentés.

On peut calculer en utilisant cette même vue la quantité de WAL retenue et le lag vers le connecteur.

demo=# SELECT slot_name,
       active,
       confirmed_flush_lsn,
       pg_current_wal_lsn(),
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_walsize,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS subscriber_lag
FROM pg_replication_slots;
            slot_name             | active | confirmed_flush_lsn | pg_current_wal_lsn | retained_walsize | subscriber_lag
----------------------------------+--------+---------------------+--------------------+------------------+----------------
 logical_pub_con_to_debezium_slot | t      | 20D4/EEAB53A0       | 20D4/FB3A6000      | 239 MB           | 201 MB
(1 rows)
  • retained_walsize la quantité de WAL (en octet) retenue par le slot côté publication
  • subscriber_lag le retard de réplication logique (en octet) entre la publication et la souscription.

Configuration du connecteur Debezium

Afin de configurer le connecteur Debezium source pour PostgreSQL, il est nécessaire de fournir au minimum les éléments suivants dans un fichier json que l’on peut nommer pg-example-connector.json :

{
   "name":"pg-example-connector",
   "config":{
      "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname":"pgdeb01",
      "database.port":"5437",
      "database.user":"debezium_replication",
      "database.password":"<redacted>",
      "database.dbname":"demo",
      "plugin.name":"decoderbufs",
      "slot.name":"logical_pub_con_to_debezium_slot",
      "publication.name":"pub_con_to_debezium",
   }
}

La liste complète des paramètres de configuration du connecteur Debezium pour PostgreSQL est disponible dans la documentation de Debezium.

L'API REST de Kafka Connect permet d’enregistrer de nouveaux connecteurs et de vérifier le status de ces derniers. Une fois enregistré, le connecteur est actif. Côté Kakfa, il est possible d’utiliser l’utilitaire bin/kafka-console-consumer.sh pour s’abonner à un topic donné et visualiser les messages reçus depuis le connecteur PostgreSQL.

Pour le reste des opérations de Apache Kafka, je vous renvoie à la documentation complète de ce dernier.

Crédits photo : Callum Blacoe