Değişen Verileri Yakalama — Change Data Capture (CDC)

Mirac Öztürk
9 min readJan 28, 2024

Günümüz iş dünyasında; tüm bilgi kaynağı Veri parçacığı üzerine kurulmuş ve karar ekosistemi bunun üzerine inşaa edilmiş/edilmektedir.

Verinin gücü bir yana, doğruluğu/kesinliği ayrı bir önem arz etmektedir.

Bu doğrultuda bugünkü yazımda; dahil olmuş olduğum VBOVeri Bilimi Okulu eğitiminde hazırladığım,

Değişen Verileri YakalamaChange Data Capture (CDC)

konulu çalışmamı özet bir şekilde aktarmaya çalışacağım.

Şimdiden iyi okumalar.

Teknolojik Gereksinimler ve Rolleri​

Çalışmada kullanacağımız teknolojik çözümler ve rolleri özet olarak alt kısımda yer almaktadır.

  • PostgreSQL*: Ana veri kaynağı.
    *Özgür ve açık kaynak kodlu; SQL destekli, ilişkisel veritabanı yönetim sistemidir.
  • Docker*: Uygulama; derleme, test ve dağıtım.
    *Uygulamalar için gerekli bağımlılıkları, bağımsız ve izole bir ortamda çalıştıran sistemdir.
  • Kafka* + Kafka Connect**: Veri işleme ve dağıtma + Entegrasyon sağlama.
    *Akış verilerini, gerçek zamanlı olarak alıp; işlemek için optimize edilmiş, dağıtılmış veri deposudur.
    **Kafka’nın dış sistemler ile entegrasyonunu sağlayan bağlayıcı çözümüdür.
  • Debezium*: Veri değişikliği yakalama.
    *Değişiklik verilerinin yakalanması için açık kaynaklı, dağıtılmış bir platformdur.
  • Spark* + Spark Streaming**: Veri işleme.
    *Büyük ölçekli veri işleme için açık kaynaklı bir birleşik analitik motorudur.
    **Apache Spark motoru üzerinde çalışan ve gerçek zamanlı işlem (Real-Time Processing) yapmanıza olanak sağlayan bir araçtır.
  • MinIO*: Nesne depolama. (Log deposu.)
    *Yüksek performanslı nesne depolama aracıdır.

Akış Yapısı​

Buradaki genel amacımız; bir kaynak üzerindeki (PostgreSQL) veri yapısının değişmesi ile değişikliklerin herhangi bir dosya formatında (Parquet — Log), bir depolama sisteminde kayıt altına alınması. (MinIO)

Genel akış adımları ve görsel aktarımı;

  1. Gereksinimlerin kurulması. (Docker)
  2. Veri kaynağı için test verisi aktarımı. (Data Generator + Github)
  3. Sistemlerin çalıştırılması. (Docker)
  4. Veri manipülasyonu. (PostgreSQL)
  5. Log bilgilerinin yakalanması. (Debezium + Kafka)
  6. Log bilgilerinin düzenlenmesi. (Kafka + Spark)
  7. Log bilgilerinin kayıt altına alınması. (Spark + MinIO)

şeklindedir.

Geliştirme​

Docker ile Gereksinimlerin Kurulumu​

Proje dahilinde belirlenmiş gereksinimlerin kurulumu ve çalıştırılması için önceden hazırlanmış .yaml dosyası kullanılacaktır.

İlgili dosyaya alt kısımda yer alan bağlantı üzerinden erişebilirsiniz; .yaml dosyası için lütfen tıklayınız…

Bu dosyada yer alan konfigürasyonlar; yerel cihazdaki yapı ve gereksinimlere yönelik planlanmıştır.
Çalışmada bulunacağınız yapıya yönelik tekrar kurgulayabilirsiniz.

Docker ile gereksinimlerin kurulumuna başlayalım, bunun için;

docker compose -up --build -d

komutunu çalıştıralım.
*Bu komut ile, Docker Compose aracılığıyla; birden fazla Docker konteynerını, yapılandırma dosyasında tanımlanan hizmetleri oluşturmak ve başlatmak için kullanabiliriz.

İşlemde bulunduğumuzda;

gereksinimlerimiz kurulmuş ve kullanıma hazır gözükmektedir.

Python ile Veri Kaynağı Oluşturma​

Gerekli kurulumlarımız tamamlandıktan sonra, veri kümesi oluşturmamız gerekmektedir.

Öncelikle buna uygun bir depolama alanı kuralım.
PostgreSQL üzerine erişerek, verileri depolayacağımız bir tablo kurgulayalım;

(base) [train@localhost project]$ docker exec -it postgresql bash

root@e06a7bfc3b52:/# psql -U postgres
psql (15.5 (Debian 15.5-1.pgdg120+1))
Type "help" for help.

postgres=# create database sales;
CREATE DATABASE

postgres=# GRANT ALL PRIVILEGES ON DATABASE sales TO postgres;
GRANT

postgres=# CREATE TABLE customers ("customerId" SERIAL PRIMARY KEY,"customerFName" VARCHAR(255),"customerLName" VARCHAR(255), ... ,"customerZipcode" VARCHAR(10));
CREATE TABLE

postgres=# select * from public.customers limit 5;
customerId | customerFName | customerLName | ... | customerZipcode
+ - - - - - + - - - - - - - + - - - - - - - + ... + - - - - - - - +

Kaynağımızı yapılandırdıktan sonra; hazır bir verikaynağını kullanarak, PostgreSQL üzerine veri aktarımında bulunalım;
*Bunun için yerel çalışma alanımızda bir Python Data Generator dosyası çalıştırıyoruz.

(base) [train@localhost ~]$ cd project/data-generator/

(base) [train@localhost data-generator]$ python -m venv project_datagenerator

(base) [train@localhost data-generator]$ source project_datagenerator/bin/activate

(project_datagenerator) (base) [train@localhost data-generator]$ python dataframe_to_postgresql.py
input: input/customers.csv
host: localhost
port: 5432
user: postgres
password: 1881
database: sales
table: customers
sep: ,
row_sleep_time: 0.5
repeat: 1
shuffle: False
row_size 0
batch_size 10
primary_key False
postgresql+psycopg2://postgres:1881@localhost:5432/sales
0-10
1/10000 processed, % 99.99 will be completed in 103.62 mins.
10-20
2/10000 processed, % 99.98 will be completed in 103.61 mins. 20-30
...

NOT: İhtiyacınıza yönelik kendi veri kaynağınızı planlayınız.

Veri aktarımımız tamamlandıktan sonra, kısa bir ön kontrol gerçekleştirelim;

root@e06a7bfc3b52:/# psql -U postgres -d sales
psql (15.5 (Debian 15.5-1.pgdg120+1))
Type "help" for help.

sales=# select * from public.customers limit 5;

Sistemlerin Çalıştırılması​

Bu adımda ise projeye yönelik birden çok sistem ve konfigürasyon ayarlaması işleminde bulunmamız gerekmektedir.
Bunlar;

  • 1-Kafka üzerinde producer-consumer yapısının kurulması;

Bunun için; dbserver.public.customers consumer oluşturuldu. [Consumer=Tüketici]
Producer; PostgreSql üzerindeki veri işlem sistemi. [Producer=Üretici]

NOT: Kendi ihtiyacınıza yönelik bunu mimari olarak planlayınız.

  • 2-Debezium’un yapılandırılması;

Debezium-PostgreSql konnektör ayarları oluşturuldu;
debezium_postgres.json

{
"name": "debezium-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgresql",
"database.port": "5432",
"database.user": "postgres",
"database.password": "1881",
"topic.prefix": "dbserver",
"database.server.name": "dbserver",
"database.dbname": "sales",
"plugin.name": "pgoutput",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "public.customers"
}
}
  • 3-MinIO bucket oluşturulması;

Veri değişikliklerinin dosya formatında kayıt altında alınacağı MinIO üzerinde bir depolama alanı oluşturuldu;
*datasets bucket

Buradaki depolama alanını çeşitli senaryolara göre ayırabilirsiniz.

Yani;

  • Delete — Silme,
  • Update — Güncelleme,
  • Insert — Yeni Veri Kaydı

vb.
Bu; sistem üzerindeki kontrol edilebilirliliği ve denetimi kolaylaştıracaktır.

  • 4-Kafka’nın ayağa kaldırılması;

Veri manipülasyonu işlemleri öncesi Kafka üzerindeki consumer’ın ayağa kaldırılması ve ortam dinlemesinin sağlanması gerekmektedir.

Kafka ayağa kaldırıldı;

(base) [train@localhost ~]$ docker exec -it kafka bash

root@e322e45fdc21:/# cd kafka/bin

root@e322e45fdc21:/kafka/bin# sh kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic dbserver.public.customers
  • 5-Debezium’un ayağa kaldırılması;

Veri manipülasyonu işlemleri öncesi; veri değişimlerinin yakalanması için Debezium’un kaldırılması ve ortam dinlemesinin sağlanması gerekmektedir.

Debezium ayağa kaldırıldı;

(base) [train@localhost project]$ curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @debezium_postgres.json
HTTP/1.1 409 Conflict
Date: Sun, 28 Jan 2024 08:49:29 GMT
Content-Type: application/json
Content-Length: 75
Server: Jetty(9.4.48.v20220622)

PostgreSQL ile Veri Manipülasyonu

Veri değişimi yakalayabilmek için; PostgreSql üzerinde yer alan sales veritabanındaki customers tablosunda çeşitli işlemlerde bulunmamız gerekmektedir.
*Create-Insert, Update, Delete

NOT: Sadece Update işleminde bulunacağız, diğer işlemleri de uygulayabilirsiniz.

customers tablosunda yer alan customerId=1 kaydındaki customerFName bilgisini güncelleyelim;

update customers set "customerFName" = 'Miraç' where "customerId" = 1;

Güncelenen kaydı hızlıca kontrol edelim;

select * from customers where "customerId"=1;

Buradaki veri manipülasyon sistemimiz çalışmaktadır.

NOT: Temel seviyedeki bir işlemde bulunduk.Burada karmaşılık olması durumunda kurulacak mimari yapı için süre ve performans göz önünde bulundurulmalı.

Kafka ve Debezium ile Log (Değişim) Bilgilerinin Yakalanması

Güncelleme işleminden sonra; değiştirilen kayıt bilgilerinin Kafka üzerinde topic olarak görüntülenmesi ve Debezium ile değişimin yakalanması gerekmektedir.

Bunun için öncelikli olarak Kafka’ya dönerek güncelleme sonrası topic bilgisini kontrol edelim;

Güncelleme işlemi sonrası; ilgili kayda yönelik değişim bilgileri, Kafka topic üzerinde detaylı olarak görüntülenmektedir

NOT: Kurulacak mimari için birden çok kaynak takibi ve karmaşık Kafka mimarisi kurgulanabilir.İhtiyaca yönelik planlayınız.

Spark Streaming ve Python ile Log Bilgilerinin Düzenlenmesi

Kafka topic üzerinde yer alan bilgilerin; Spark yardımı ile çeşitli formatlarda düzenlenip, kayıt altına alınması gerekmektedir.

Bu süreçlerin otomatik gerçekleştirilmesi adına; özelleşmiş bir Python dosyasına ihtiyaç duyulmaktadır.

Örnek dosyaya erişmek için alt kısımdaki bağlantıyı kullanabilirsiniz;

https://github.com/miracozturk17/change-data-capture

Yanı sıra detaylar için;

# Spark master local.
# spark-submit --master local --packages org.apache.hadoop:hadoop-aws:3.2.0,io.delta:delta-core_2.12:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 kafka_to_spark.py

# Ilgili kutuphaneler eklendi.
from pyspark.sql.types import StructType, StringType, StructField
from pyspark.sql.functions import from_json, col
from pyspark.sql.functions import get_json_object
from pyspark.sql.types import StringType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, ArrayType, BooleanType

# Spark dizini bulundu.
import findspark
findspark.init("/opt/spark")

# MinIO bilgileri belirlendi.
# NOT: Erisim bilgileri bu sekilde kullanilmamali.
accessKeyId='istanbul'
secretAccessKey='istanbul'

# SparkSession olusturuldu.
spark = SparkSession.builder \
.appName("Spark Example MinIO") \
.master("local[2]") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,io.delta:delta-core_2.12:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1") \
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
.config("spark.hadoop.fs.s3a.access.key", accessKeyId) \
.config("spark.hadoop.fs.s3a.secret.key", secretAccessKey) \
.config("spark.hadoop.fs.s3a.path.style.access", True) \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
.getOrCreate()

# Kafka Topic dinlemesinde bulunuldu.
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", "dbserver1.public.customers")
.load())

# Her batch islemi icin bu foreach icerisinde calisacak fonksiyon yapilandirildi.
# JSON semasi tanimlandi. (Kafka dan gelen bilgiler.)
def write_json_to_parquet(batch_df, batch_id):
json_schema = StructType([
StructField("schema", StructType([
StructField("type", StringType(), True),
StructField("fields", ArrayType(StructType([
StructField("type", StringType(), True),
StructField("optional", BooleanType(), True),
StructField("default", StringType(), True),
StructField("field", StringType(), True),
])), True),
StructField("optional", BooleanType(), True),
StructField("name", StringType(), True),
StructField("field", StringType(), True),
]), True),

StructField("payload", StructType([
StructField("before", StructType([
StructField("customerId", IntegerType(), False),
StructField("customerFName", StringType(), True),
StructField("customerLName", StringType(), True),
StructField("customerEmail", StringType(), True),
StructField("customerPassword", StringType(), True),
StructField("customerStreet", StringType(), True),
StructField("customerCity", StringType(), True),
StructField("customerState", StringType(), True),
StructField("customerZipcode", StringType(), True),
]), True),

StructField("after", StructType([
StructField("customerId", IntegerType(), False),
StructField("customerFName", StringType(), True),
StructField("customerLName", StringType(), True),
StructField("customerEmail", StringType(), True),
StructField("customerPassword", StringType(), True),
StructField("customerStreet", StringType(), True),
StructField("customerCity", StringType(), True),
StructField("customerState", StringType(), True),
StructField("customerZipcode", StringType(), True),
]), True),

StructField("source", StructType([
StructField("version", StringType(), False),
StructField("connector", StringType(), False),
StructField("name", StringType(), False),
StructField("ts_ms", LongType(), False),
StructField("snapshot", StringType(), True),
StructField("db", StringType(), False),
StructField("sequence", StringType(), True),
StructField("schema", StringType(), False),
StructField("table", StringType(), False),
StructField("txId", LongType(), True),
StructField("lsn", LongType(), True),
StructField("xmin", LongType(), True),
]), False),

StructField("op", StringType(), False),
StructField("ts_ms", LongType(), True),

StructField("transaction", StructType([
StructField("id", StringType(), False),
StructField("total_order", LongType(), False),
StructField("data_collection_order", LongType(), False),
]), True),
]), False),
])

# Veriler semaya uygun hale getirildi.
batch_df = batch_df.selectExpr("cast(value as string) as json") \
.select(F.from_json("json", schema=json_schema).alias("data")) \
.select(
F.col("data.payload.after.customerId").alias("after_customerId"),
F.col("data.payload.after.customerFName").alias("after_customerFName"),
F.col("data.payload.after.customerLName").alias("after_customerLName"),
F.col("data.payload.after.customerEmail").alias("after_customerEmail"),
F.col("data.payload.after.customerPassword").alias("after_customerPassword"),
F.col("data.payload.after.customerStreet").alias("after_customerStreet"),
F.col("data.payload.after.customerCity").alias("after_customerCity"),
F.col("data.payload.after.customerState").alias("after_customerState"),
F.col("data.payload.after.customerZipcode").alias("after_customerZipcode"),
F.col("data.payload.before.customerId").alias("before_customerId"),
F.col("data.payload.before.customerFName").alias("before_customerFName"),
F.col("data.payload.before.customerLName").alias("before_customerLName"),
F.col("data.payload.before.customerEmail").alias("before_customerEmail"),
F.col("data.payload.before.customerPassword").alias("before_customerPassword"),
F.col("data.payload.before.customerStreet").alias("before_customerStreet"),
F.col("data.payload.before.customerCity").alias("before_customerCity"),
F.col("data.payload.before.customerState").alias("before_customerState"),
F.col("data.payload.before.customerZipcode").alias("before_customerZipcode"),
F.col("data.payload.op").alias("op"),
F.col("data.payload.ts_ms").alias("ts_ms")
)

# Gerekli sutunlar/sema terminalde gosterildi. (Kontrol amacli.)
batch_df.printSchema()

# MinIO uzerindeki belirlenen konumuna ilgili dosya yazildi.
output_path = f"s3a://datasets/iris_parquet"
batch_df.write.format("parquet").mode("append").save(output_path)
print(f"Parquet dosyasi basariyla yazildi: {output_path}")

# output_dir = "file:///tmp/streaming/data_output"
# checkpoint bilgilerinin kaydedilecegi dizin belirlendi.
checkpoint_dir = "file:///tmp/streaming/read_from_kafka_test22"

# Ilgili bilgiler konsola yazildi. (Kontrol amacli.)
console_query = (df
.writeStream
.format("console")
.outputMode("append")
.trigger(processingTime="2 second")
.option("checkpointLocation", checkpoint_dir)
.option("numRows", 20)
.option("truncate", False)
.start())

# Ilgili bilgilerin dosyasi MinIo ya yazildi.
minio_query = (df
.writeStream
.foreachBatch(write_json_to_parquet)
.option("checkpointLocation", checkpoint_dir)
.start())

# Veri akislari baslatildi.
minio_query.awaitTermination()
console_query.awaitTermination()

kod içeriğinden incelemede bulunabilirsiniz.

Buraya kadar Spark üzerinde veri düzenlemesi ve değişimin dosya formatında kayıt alınacağı sürecin otomasyon yapısı aktarıldı.

Peki, Spark’ı ayağa kaldıralım;

(base) [train@localhost project]$ docker exec -it spark bash

root@77d4d039664a:/# cd opt/spark_example/

root@77d4d039664a:/opt/spark_example# spark-submit --master local --packages org.apache.hadoop:hadoop-aws:3.2.0,io.delta:delta-core_2.12:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1 kafka_to_spark.py

Spark aktif beklemede ve veri değişimi için izlenen şema ise;

message spark_schema {
optional int32 after_customerId;
optional binary after_customerFName (STRING);
optional binary after_customerLName (STRING);
optional binary after_customerEmail (STRING);
optional binary after_customerPassword (STRING);
optional binary after_customerStreet (STRING);
optional binary after_customerCity (STRING);
optional binary after_customerState (STRING);
optional binary after_customerZipcode (STRING);
optional int32 before_customerId;
optional binary before_customerFName (STRING);
optional binary before_customerLName (STRING);
optional binary before_customerEmail (STRING);
optional binary before_customerPassword (STRING);
optional binary before_customerStreet (STRING);
optional binary before_customerCity (STRING);
optional binary before_customerState (STRING);
optional binary before_customerZipcode (STRING);
optional binary op (STRING);
optional int64 ts_ms;
}

şeklindedir.

MinIO ile Log Bilgilerinin Kayıt Altına Alınması

Son olarak ise değişimde bulunduğumuz verilere yönelik dosya formatında işlem bilgilerinin (Log) kayıt altına alıması gerekmektedir.

Bunun için MinIO üzerindeki bucket alanımızı tekrar kontrol edelim;

Ve şimdi tüm sistemlerimiz aktif izlemedeyken bir veri güncelleme işleminde bulunalım;

Hemen Kafka topic üzerindeki bilgileri kontrol edelim;

Bu bilgileri daha anlamlı bir halde görebilmek için JsonCrack üzerinde görüntüleme işleminde bulunalım;

Evet; kontrol ettiğimizde bir veri değişimi (Güncellemesi) yakalanmış.

Peki MinIo üzerinde bir kayıt var mı?

Önceden planladığımız gibi bir .parquet formatında dosya kayıt altına alınmış.

İlgili dosya bağlantısı;
https://github.com/miracozturk17/change-data-capture

Anlık olarak sistemimiz bir veri güncelleme işlemini yakalamış bulunmaktadır.

Bu veri takip sistemini farklı veri değişimleri için; oluşturma, güncelleme ve silme gibi akışlar adına da kullanabilirsiniz.

Sonuç

Günümüzde, iş dünyası için en önemli değer kaynağı; bilgi, dolayısıyla veridir.

Bu noktada doğruluğu ve korunumu adına veri takip sistemleri vazgeçilmez olarak yerini koruyacaktır.

Açıklamaya ve aktarmaya çalıştığım çözüm ise yüzlercesinden (Belkide binlercesi…) sadece biri olabilir.

Bunu ihtiyacınıza yönelik kurgulayıp geliştirebilirsiniz.

Umarım faydalı olur.

İyi çalışmalar.

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

Mirac Öztürk
Mirac Öztürk

Written by Mirac Öztürk

Data Scientist + Mathematician / Coder — Gamer miracozturk.com

No responses yet

Write a response

Recommended from Medium

Lists

See more recommendations