Debezium ile Veritabanınızdaki Değişiklikleri Yakalayın
Yazılım geliştirme dünyasında veritabanları bizim için vazgeçilmez bir araçtır. Onlar sayesinde verilerimizi kayıt altında tutar, ihtiyaç duyduğumuzda kullanırız. Bu yazımda size veritabanlarında yapılan değişiklikleri real-time olarak nasıl dinleyebileceğimizi, kafka üzerinden bu eventleri nasıl yakalayabileceğimizi ve development ortamımızı docker üzerinden nasıl kurabileceğimizi anlatacağım.Debezium Nedir?Debezium, Java ile geliştirilmiş, veritabanı üzerindeki değişiklikleri dinleyen, bunları belirli kafka topiclerine gönderen bir Change Data Capture (CDC) aracıdır. Debezium sayesinde veritabanındaki değişiklikleri anlık olarak dinleyebilir, bu verileri başka bir uygulama üzerinden kullanabilirsiniz. Debeziumun burada bize sağladığı en önemli özelliklerden birisi gönderilen veri değişikliklerinin sırasının korunması ve her bir eventin sadece bir kere gönderilmesidir. Böylece güvenilebilir mimariler inşa edilmeye olanak sağlanmış oluyor.Debeziumu kullanabilmenin 3 farklı yolu var: kafka connect, debezium server ve debezium engine.Debezium’a Ne Zaman İhtiyaç Duyarız?Debeziuma ihtiyaç duyacağımız birden fazla durum vardır. Bunlar, veri replikasyonu, servisler arası veri paylaşımı, ETL besleme vs. olabilir. Benim durumumda bir DB’ye yazma işlemi yapan 2 uygulama arasında veri değişikliklerini gerçek zamanlı yakalayarak iki uygulama arasında senkronizasyonu kolaylaştırdı. Debezium sayesinde legacy appten DB ye yazılan verileri güncel app içerisine aktarabildim böylece legacy appi maintain etme yükümlülüğümü azaltıp bir bağımlılığı daha koparmış oldum.Debezium ServerDebezium server, kafkadan bağımsız bir şekilde çalışıyor. Bunun için döküman üzerinde belirtildiği gibi bir sunucunun içerisine kurulum yapmanız gerekiyor. Kafka Connect kullanmak istemeyenler için geliştirilmiş bir çözüm.Debezium EngineDebezium engine ise var olan java uygulamalarınızın içerisinde embedded olarak çalıştırmak için kullanılan versiyonu. Eğer bir java uygulamanız için CDC ihtiyacınız var ise kafka ekosistemini projenize dahil etmek istemiyorsanız debezium engine kullanabilirsiniz.Kafka ConnectDebeziumun bir diğer kurulumu ve bizim de bu yazımızda değineceğimiz kısmı ise kafka connect’tir. Öncelikle kafka connect’in ne olduğuna değinelim.Kafka connect, kafka ekosistemine diğer veri sistemlerinden stream oluşturmaya yaran bir framework’tür. Bu sayede kafkaya canlı veri stream edebiliyoruz ya da kafkadan başka bir veri sistemine veri gönderebiliyoruz.https://debezium.io/documentation/reference/3.2/architecture.htmlYukarıdaki görsel üzerinden mimariyi inceleyelim. Kafka connect üzerinden debezium CDC kurulumu yaptınız. Burada çalışan debezium, MySQL’in binlog dosyasını dinlemeye alıyor. MySQL veritabanında yapılan işlemleri log dosyası üzerinden yakalıyor, dosyanın en son neresini okuduğunu not alıyor ki uygulamanın çökmesi durumunda kaldığı yerden devam edebilsin. Sonrasında config üzerinden tanımladığınız kural setine göre eğer bir event göndermesi gerekiyor ise bu eventi kafka topiclerine gönderiyor. Daha sonrasında da gelen topiclere isterseniz consume edin, isterseniz bir sink connectörü kullanıp başka bir yere yönlendirin. Örneğin elastic sink üzerinden elesticsearch indexlerinizi üretebilirsiniz ya da mongo sink üzerinden mongodb’ye verinizi replika edebilirsiniz.Ben bu yazımda kafka client üzerinden topiclere nasıl subscribe olabileceğimizi anlatacağım.Kurulumdan Önce…Geliştirme ortamımızı docker üzerinden oluşturacağız. Debezium’un resmi dökümanları üzerindeki tutorial yazısında docker üzerinden kurulum gösteriliyor ama ben aşağıdaki sebeplerden ötürü oradaki kurulum adımlarınızı takip etmenizi önermiyorum:1 - Resmi tutorial içerisindeki image’lar Apple Silicon işlemciler ile uyumlu değil.2- Tutorial içerisindeki gösterilen container’lerin hepsi debezium tarafından yayımlanmış, bunlara kafka ve zookeeper dahil. Community desteği ve apple silicon sorunlarından dolayı debezium/kafka image’ını kullanmak mantıklı değil.3- Tutorialde gösterilen kafka kurulumunda zookeeper kullanılıyor. Kafka’nın 3.3 versiyonundan sonra kafkayı kraft modunda zookeeper bağımlılığı olmadan kullanabiliyoruz. Bu sayede bir container’ı daha sistemden çıkarabiliriz demek.4- Tutorialde docker compose yok, uygulamayı docker-compose üzerinden çalıştırma yükümlülüğü de kullanıcıya bırakılmış.KurulumKurulumda kafka image’ı için bitnami tarafından yayınlanmış bitnami/kafka kullanacağız. Community desteği ve dökümantasyon içeriği bakımından bu image’ı tercih ettim.services: kafka: image: bitnami/kafka:3.8 ports: - "9094:9094" environment: - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER volumes: - kafka-data:/bitnami/kafka healthcheck: test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"] interval: 10s timeout: 5s retries: 5 start_period: 30svolumes: kafka-data:Burada değinmek istediğim bir kaç nokta var. Kafkaya burada hem controller hem de broker rolü veriyoruz. Sonrasında daKAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093Bu config ile kafkayı kraft modunda başlatıyoruz. Böylece zookeeper için bir container ayağa kaldırmamız gerekmiyor. Ayrıca 9094 portundan da local makine üzerinden kafkaya bağlanabiliyoruz. Consumer appimizi henüz dockerise etmediysek network problemi yaşamamak için dışarıdan bağlanabiliyoruz.Kafka UIŞimdi sırada opsiyonel bir adım var. Development ortamında kafkayı debuglamak için provectuslabs/kafka-ui kullanmanızı öneririm.services: kafka: image: bitnami/kafka:3.8 ports: - "9094:9094" environment: - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER volumes: - kafka-data:/bitnami/kafka healthcheck: test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"] interval: 10s timeout: 5s retries: 5 start_period: 30s kafka-ui: image: provectuslabs/kafka-ui:latest ports: - "8080:8080" environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 - KAFKA_CLUSTERS_0_METRICS_PORT=9092 - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=connect - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=http://connect:8083 depends_on: kafka: condition: service_healthy connect: condition: service_healthy restart: unless-stoppedvolumes: kafka-data:Böylece kafka connect bağlantı durumunu, topiclerdeki mesajları, offsetleri ve daha bir çok şeyi http://localhost:8080/dashboard adresinden görüntüleyebilirsiniz.Kafka ConnectDebezimun aslında bir kafka connect ürünü olduğunu söylemiştim. Bu sebeple önce kafka connect kurmamız gerekiyor, sonrasında da bunun içerisine debeziumu yüklememiz gerekiyor.Kafka connect image’ı için bir kaç alternatif var, ben burada confluentinc/cp-kafka-connect kullanmayı tercih ettim. Bunun nedeni, zengin dökümantasyon desteği, confluent hub üzerinden diğer kafka connect ürünlerine erişim, RestAPI ile connector yönetimi, debezium desteği ve kolay kurulum adımları.Önce bir Dockerfile dosyası oluşturalım..├── docker│ └── kafka-connect│ └── Dockerfile└── docker-compose.ymlVe sonrasında da base image’ımızı alıp içerisine debezium yükleyelim.FROM confluentinc/cp-kafka-connect:7.9.2RUN confluent-hub install --no-prompt debezium/debezium-connector-mysql:3.1.2confluent-hub cli toolu üzerinden connectörleri rahatlıkla yükleyebiliyoruz.Sonrasında docker-compose stackimiz içerisine ekleyelim.services: kafka: image: bitnami/kafka:3.8 ports: - "9094:9094" environment: - KAFKA_CFG_NODE_ID=0 - KAFKA_CFG_PROCESS_ROLES=controller,broker - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER volumes: - kafka-data:/bitnami/kafka healthcheck: test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"] interval: 10s timeout: 5s retries: 5 start_period: 30s kafka-ui: image: provectuslabs/kafka-ui:latest ports: - "8080:8080" environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092 - KAFKA_CLUSTERS_0_METRICS_PORT=9092 - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME=connect - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS=http://connect:8083 depends_on: kafka: condition: service_healthy connect: condition: service_healthy restart: unless-stopped connect: build: context: . dockerfile: docker/kafka-connect/Dockerfile ports: - "8083:8083" environment: - CONNECT_BOOTSTRAP_SERVERS=kafka:9092 - CONNECT_GROUP_ID=connect-cluster - CONNECT_CONFIG_STORAGE_TOPIC=my_connect_configs - CONNECT_OFFSET_STORAGE_TOPIC=my_connect_offsets - CONNECT_STATUS_STORAGE_TOPIC=my_connect_statuses - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 - CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter - CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter - CONNECT_REST_PORT=8083 - CONNECT_REST_ADVERTISED_HOST_NAME=connect - CONNECT_PRODUCER_MAX_REQUEST_SIZE=15728640 - CONNECT_MAX_REQUEST_SIZE=15728640 depends_on: kafka: condition: service_healthy restart: unless-stopped healthcheck: test: "curl -f http://localhost:8083/ || exit 1" interval: 10s timeout: 5s retries: 3 start_period: 30svolumes: kafka-data:Şu haliyle debezium için gerekli minimum stackimiz hazır hale geldi. Kafka (Kraft mode), kafka connect ve debezium. Şimdi kafka connect üzerinden bir debezium task’i oluşturup MySQL veritabanındaki değişiklikleri dinleyebiliriz.MySQL HazırlıkMySQL veritabanının debezium tarafından dinlenebilmesi için yapmamız gereken bazı ayarlar var. Bunlar user oluşturma, permission, binlog dosyası vs. olarak gitmekte. İşlemler için buradaki debezium dökümanını inceleyebilirsiniz.Debezium Taski OluşturmaÖnce docker compose up -d komutu ile stackimizi ayağa kaldıralım. Sonrasında, kafka connect içerisinde RestAPI üzerinden debezium taski oluşturacağız. Buradaki task oluşturma stratejisi tamamen size kalmış. İsterseniz statik bir json dosyası üzerinden curl isteği yapabilirsiniz, isterseniz bir app üzerinden dinamik bir şekilde task oluşturabilirsiniz. Ben json üzerinden curl isteğiyle size göstereceğim.Önce register-mysql.json adında bir dosya oluşturalım..├── docker│ └── kafka-connect│ └── Dockerfile├── register-mysql.json└── docker-compose.ymlSonrasında dosya içeriğini oluşturalım:{ "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "host.docker.internal", "database.port": "3306", "database.user": "root", "database.password": "", "database.server.id": "184055", "topic.prefix": "my_topic_prefix", "database.include.list": "awesomeappDB", "table.include.list": "awesomeappDB.products,awesomeappDB.orders", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.awesomeappDB", "tombstones.on.delete": "false", "snapshot.mode": "schema_only", "max.batch.size": "2048", "max.queue.size": "8192" }}Mevcut config ayarlarına debezium MySQL dökümanı üzerinden ulaşabilirsiniz.Buradaki config ile beraber aslında bir debezium taski oluşturuyoruz. Debeziumun hangi veritabanını dinleyeceğini, hangi tabloları dinleyeceğini vs. configi buradan ayarlıyoruz. Daha sonrasındacurl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.jsoncurl üzerinden isteğimizi yapıp debezium taskimizi başlatıyoruz.İşlemlerimiz bu kadar. Artık consumer appimizden kafkaya bağlanıp topiclere subscribe olabiliriz. Debezium her tablo için bir topic oluşturuyor. Örneğin, bizim config setimize göre awesomeappDB veritabanında products ve orders tablolarındaki transactionlar debezium tarafından kafkaya gönderilecek. Her topic için de my_topic_prefix şeklinde bir prefix eklenecek. Yani ürünler tablosu için consume etmemiz gereken topic “my_topic_prefix.awesomeappDB.products” olacaktır.ÖzetBu yazımda size en basit haliyle development ortamında debezium stackini nasıl oluşturabileceğimizi anlattım. Aslına bakarsak debezium ve kafka connect tarafında henüz bahsetmediğim onlarca özellik ve ayar var. Bunları ihtiyaçlarınıza göre ayarlayıp kullanabilirsiniz, resmi dökümanlar ve google bu özelliklerin kullanımı konusunda yardımcı olacaktır.Umarım faydalı olmuştur, bir sonraki makalede görüşmek üzereDebezium ile Veritabanınızdaki Değişiklikleri Yakalayın was originally published in Moneo on Medium, where people are continuing the conversation by highlighting and responding to this story.
4 months ago
10 minutes read