変更データキャプチャ

Version 25.3.9414


変更データキャプチャ


一部のデータソースでは変更データキャプチャ(CDC)をサポートしており、データソースはログファイルを使用して、データベースに変更を加えるイベント(挿入更新、または削除)をログに記録します。CData Sync は、データソーステーブルに変更をクエリするのではなくログファイルを読み込んで変更イベントを確認します。次に、アプリケーションはレプリケーションのためのそれらの変更をほぼリアルタイムで抽出し、次回のレプリケーション用に現在のログを保存します。

以下のデータソースは、CDC の機能をサポートします:

  • Informix (Native) - 拡張型CDC を使用します。

  • MariaDB - バイナリログを使用します。

  • Microsoft Dynamics 365 - 変更の追跡を使用します。

  • MySQL - バイナリログを使用します。

  • Oracle - Oracle LogMiner またはOracle Flashback のいずれかを使用します。両方のメソッドがテーブルで有効になっている場合、Sync はOracle LogMiner を使用します。

  • PostgreSQL - 論理レプリケーションを使用します。

  • SQL Server - CDC または変更の追跡のいずれかを使用します。両方のメソッドがテーブルで有効になっている場合、Sync はCDC を使用します。

このドキュメントでは、以下のコンセプトとタスクについて説明します:

  • CDC パイプラインの定義

  • CDC を使用する場合かどうかの判断

  • ソースデータベースのCDC を有効化

  • Sync でCDC ジョブを作成

  • CDC ジョブにタスクを追加

  • Post-Job 変換を追加

CDC パイプラインとは?

変更データキャプチャ(CDC)は、データソースからサポートされている同期先にデータ変更をストリーミングできるパイプラインを作成し、そのデータに対して変換を実行できるようにします。サポートされている同期先は、以下のいずれかの構造を含めることができます:

  • 別のリレーショナルデータベース

  • クラウドストレージシステムまたはデータレイク

  • データウェアハウス(例えばSnowflake、Amazon Redshift、Google BigQuery など)

  • ファイルストレージシステム

  • メッセージキュー(Kafka またはKinesis など)

CDC を使用するかどうかの判断

CDC のアプローチは、ソースデータベースに加えられた変更を識別、キャプチャ、配信するプロセスを通じてデータを統合します。それらのデータ変更はトランザクションログに保存されます。

このアプローチは、次のような状況で有効です:

  • CDC をサポートするデータソースの1つを使用している場合。前述のとおり、Sync はMySQL、Oracle、PostgreSQL、およびSQL Server のネイティブのログベースの変更データキャプチャをサポートしています。

  • リアルタイムに近いデータを必要としている場合。CDC プロセスは、ETL やELT 処理においてほぼリアルタイムでのデータ転送を提供します。

  • リソースの使用を制限または保持したい場合。CDC データ統合は、アプリケーションレベルで変更を加えたり、トランザクションテーブルをスキャンしたりしないので、システムへ与える影響は小さくなります。

ソースデータベースのCDC を有効化

CDC を有効化する方法は、CDC をサポートするデータベースソースごとに異なります。使用するデータソースのCDC を有効にする方法については、以下の該当するリンクをクリックしてください。

CData Sync でCDC ジョブを作成

ジョブを作成するには、事前設定されたデータソースと同期先の接続が必要です。データソースおよび同期先接続の作成に関する詳細は、接続を参照してください。

データソースへの接続と、同期先データベースを定義後、次の手順に従って新しいジョブを作成します。

  1. Sync のジョブタブで、ジョブを追加 > 新しいジョブを追加を選択します。ジョブを追加ダイアログボックスが開きます。

  2. ダイアログボックスで、ジョブ名を入力し、サポートされているCDC データソースの1つを選択し、さらに同期先も選択します。

  3. レプリケーションタイプとして変更データキャプチャを選択します。

  4. 論理レプリケーションスロットテキストボックスに、レプリケーションスロットの名前を入力します。

  5. ジョブを追加をクリックしてジョブを作成します。

  6. ジョブページに戻り、ジョブにアクセスします。

CDC ジョブにタスクを追加

タスクはデータソースから同期先へのデータフローを制御します。通常のレプリケーションジョブでは、すべてのデータソーステーブルをレプリケーションタスクとしてジョブに追加できます。

テーブルをレプリケーションタスクとして追加するには:

  1. Sync のジョブタブで、ジョブをクリックします。

  2. ジョブ / ジョブ名ページのジョブ設定セクションでタスクを追加をクリックします。このアクションによりスキーマを選択ダイアログボックスが開きます。

  3. スキーマリストからスキーマ(例:public)を選択します。

  4. ダイアログボックスで特定のテーブルを選択するか、データソース名の横のチェックボックスを選択して、すべてのテーブルを選択します。

  5. タスクを追加をクリックして新しいタスクを追加します。

  6. ジョブページに戻り、ジョブにアクセスして実行します。

タスクの作成について、詳しくはタスクを参照してください。

主キーを持たないテーブルに対するCDC のサポート

Sync は、同期先テーブルに新しいカラム(_cdatasync_id)を追加することで、主キーのないソーステーブルのCDC をサポートします。このカラムには、行全体のコンテンツから生成されたハッシュ値が格納されます。このハッシュ値は、内容に基づいて行の一意の識別子として機能します。行のいずれかの値が変更されるとハッシュ値も変更されるため、Sync は主キーがなくても行が更新されたことを認識できます。

Sync が_cdatasync_id カラムを追加すると、アプリケーションはINSERT、UPDATE、およびDELETE 操作を次のように処理します:

  • INSERT:データソースで新しい行が検出されると、Sync は_cdatasync_id 値を生成し、その行を同期先に挿入します。

  • UPDATE:Sync は、_cdatasync_deleted=true を設定することで、既存の行の論理削除を実行します。その後、Sync は新しい行を挿入します。

  • DELETE:Sync は、_cdatasync_deleted=true を設定することで、行の論理削除を実行します。

このアプローチ(新しいカラムの追加)により、主キーを持たないソーステーブルに対して正確なCDC を実現できます。

Post-Job 変換の追加

Sync は、ジョブ完了後のデータ変換プロセスをサポートします。高度なSQL クエリを使用するか、既存のdbt Core およびdbt Cloud プロジェクトを活用することで、単一のプラットフォームですべてのデータニーズに対応できます。

SQL 変換についての詳細は、SQL Transformation を参照してください。DBT 変換についての詳細は、DBT Transformation を参照してください。

CData Sync の拡張型CDC ジョブ

Sync のジョブの拡張型変更データキャプチャ(CDC)機能を使用すると、変更をリアルタイムでキャプチャできます。変更をキャプチャするメカニズムであるCDC エンジンについて、次のセクションで説明します。

CDC エンジン

CDC エンジンは、Sync アプリケーションのコンポーネントで、データベースのリアルタイムのデータ変更を追跡し、ストリーミングする役割を担っています。CDC エンジンは、トランザクションログ(WAL ログやREDO ログなど)を監視することでデータの更新を識別し、これらの変更イベントをステージングエリアにファイルとして一時的に保存して、さらに処理できるようにします。このステージングエリアは、アプリケーションディレクトリのjobs フォルダにあります(C:\ProgramData\CData\sync\jobs)。

各拡張型CDC ジョブには専用のCDC エンジンがあり、ステージングエリア内に当該ジョブ用の一意のサブフォルダを作成します。さらに、ジョブ内のすべてのテーブルには、イベントファイルを含む対応するサブフォルダがあります。

ステージファイルの管理

変更イベントの保存を最適化するには、1つのステージファイルに書き込むことができる最大行数を指定するstage.file.max.rows プロパティを設定します。デフォルトでは、この値は100,000行に設定されています。多数のカラムを持つテーブルや大きなオブジェクト(バイナリデータなど)を含むジョブの場合、CData ではこのプロパティの値を減少させることをお勧めします。

次の例に示すように、ジョブの高度な設定タブでテーブルスキーマに応じてこのプロパティを設定できます:

ステージリミットの管理

すべての拡張型CDC ジョブは同じステージングエリアを共有するため、ステージングエリアのサイズを慎重に管理することが重要です。デフォルトでは、ディスクのオーバーロードを防ぐためにステージは10GB に制限されています。この制限はstagemaxsize プロパティで変更できます。

ステージングエリアがサイズ制限に達すると、すべてのCDC エンジンが自動的に停止します。このような場合、手動でジョブを実行するか、スケジュールされたジョブの実行を待って、蓄積された変更イベントを同期先に複製してステージングエリアを解放する必要があります。最大ステージサイズ(stagemaxsize)の3分の1が解放されると、すべてのCDC エンジンが自動的に起動し、リアルタイムデータストリーミング処理を再開します。

オプションとして、概要タブの開始ボタンをクリックしてCDC エンジンを再起動することもできます。

Note:CDC エンジンは継続的に動作し、変更イベントをリアルタイムでストリームするため、ベストプラクティスとして、ジョブを頻繁に実行するようにスケジュールする必要があります。この方法により、ステージングエリアがサイズ制限に達するのを防ぎながら、ニアリアルタイムのレプリケーションが保証されます。

ソースデータベースの拡張型CDC の有効化

拡張型CDC を有効にする方法は、変更データの取得にこの方法を使用できるデータベースソースごとに異なります。ソースで拡張型CDC を有効にする方法については、以下から該当するリンクをクリックしてください。

CData Sync での拡張型CDC ジョブの作成

ジョブを作成するには、事前設定されたデータソースと同期先の接続が必要です。データソースおよび同期先接続の追加に関する詳細は、接続を参照してください。

データソースへの接続と、同期先データベースを定義後、次の手順に従って新しいジョブを作成します。

  1. Sync のジョブタブで、ジョブを追加 > 新しいジョブを追加をクリックします。ジョブを追加ダイアログボックスが開きます。

  2. ダイアログボックスで、ジョブ名を入力し、サポートされている拡張型CDC データソースの1つを選択し、さらに同期先も選択します。ソースを選択すると、レプリケーションの種類(拡張型変更データキャプチャ)が自動的に表示されます。

  3. ソースに応じて、以下のように適切なプロパティを入力します:

    PostgreSQL Source(Postgres-CDC-Enhanced)の場合:

    • パブリケーション名:変更のストリーミング用に作成したPostgreSQL パブリケーションの名前を入力します。パブリケーションは、まだ存在しない場合は起動時に作成されます。

      Note:パブリケーションはジョブに対して一意である必要があります。ジョブはパブリケーションを共有できません。

    • 論理レプリケーションスロット:レプリケーションスロットの名前を入力します。

      Notes:

      • レプリケーションスロットはジョブに対して一意である必要があります。ジョブはレプリケーションスロットを共有できません。

      • PostgreSQL ユーザーは、十分な権限があれば、起動時に論理レプリケーションスロットを作成することもできます。

    Oracle source(OracleDeb)の場合:

    接続したいOracle プラガブルデータベースの名前をプラガブルデータベーステキストボックスに入力します。このプロパティは、コンテナデータベース(CDB)インストールでのみ使用します。

  4. ジョブを追加をクリックします。

  5. ジョブページに戻り、ジョブにアクセスします。

拡張型CDC ジョブへのタスクの追加

タスクはデータソースから同期先へのデータフローを制御します。通常のレプリケーションジョブでは、すべてのデータソーステーブルをレプリケーションタスクとしてジョブに追加できます。

テーブルをレプリケーションタスクとして追加するには:

  1. ジョブタブで、ジョブをクリックします。

  2. ジョブ / ジョブ名ページのジョブ設定セクションでタスクを追加をクリックします。このアクションによりスキーマを選択ダイアログボックスが開きます。

  3. スキーマリストからスキーマ(例:public)を選択します。

  4. ダイアログボックスで特定のテーブルを選択するか、データソース名の横のチェックボックスを選択して、すべてのテーブルを選択します。

  5. タスクを追加をクリックして新しいタスクを追加します。

  6. ジョブページに戻り、ジョブにアクセスします。

タスクの作成について、詳しくはタスクを参照してください。