Google Cloud Platformに戻る

Dataflow

Dataflowとは

Dataflowは、Google Cloud Platform (GCP) が提供するフルマネージドのデータ処理サービスです。バッチ処理とストリーム処理の両方をサポートし、大規模なデータセットに対するETL(抽出・変換・ロード)処理、リアルタイム分析、機械学習パイプラインなどを構築できます。Dataflowは、Apache Beamプログラミングモデルに基づいており、サーバーレスで自動的にスケーリングするため、インフラストラクチャの管理なしにデータ処理パイプラインを実行できます。

Dataflowの主な特徴

Dataflowのアーキテクチャ

Dataflowは以下の主要コンポーネントで構成されています:

コンポーネント 説明
パイプライン データ処理のロジックを定義する一連の操作
PCollection パイプライン内のデータセットを表す抽象データ型
変換(Transform) データに適用される処理操作(マッピング、フィルタリング、集約など)
ソース(Source) データの入力元(Pub/Sub、BigQuery、Cloud Storage、データベースなど)
シンク(Sink) 処理結果の出力先(BigQuery、Cloud Storage、データベースなど)
ワーカー パイプラインの処理を実行する計算リソース
ウィンドウ ストリームデータを有限のセットに分割する時間ベースの概念
ウォーターマーク イベント時間の進行を追跡し、遅延データを処理するための仕組み

Dataflow と他のデータ処理サービスの違い

Google Cloudには他のデータ処理サービスもありますが、以下のような違いがあります:

Dataflowのプログラミングモデル

Dataflowは、Apache Beamプログラミングモデルを使用しています。このモデルの主要な概念は以下の通りです:

パイプラインの基本構造

  1. パイプラインの作成
  2. 入力データの読み込み(PCollectionの作成)
  3. データ変換の適用(Transform)
  4. 結果の出力
  5. パイプラインの実行

Javaでの基本的なパイプライン例

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;

public class WordCount {
  public static void main(String[] args) {
    // パイプラインオプションの設定
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    
    // 入力データの読み込み
    PCollection<String> lines = pipeline.apply(
        "ReadLines", TextIO.read().from("gs://my-bucket/input.txt"));
    
    // 単語に分割
    PCollection<String> words = lines.apply(
        "ExtractWords", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));
    
    // 空の単語をフィルタリング
    PCollection<String> filteredWords = words.apply(
        "FilterEmptyWords", Filter.by((String word) -> !word.isEmpty()));
    
    // 単語のカウント
    PCollection<KV<String, Long>> wordCounts = filteredWords.apply(
        "CountWords", Count.perElement());
    
    // 結果のフォーマット
    PCollection<String> formattedResults = wordCounts.apply(
        "FormatResults", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                wordCount.getKey() + ": " + wordCount.getValue()));
    
    // 結果の出力
    formattedResults.apply(
        "WriteResults", TextIO.write().to("gs://my-bucket/output"));
    
    // パイプラインの実行
    pipeline.run();
  }
}

Pythonでの基本的なパイプライン例

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# パイプラインオプションの設定
options = PipelineOptions()
pipeline = beam.Pipeline(options=options)

# パイプラインの定義
(pipeline
 | 'ReadLines' >> beam.io.ReadFromText('gs://my-bucket/input.txt')
 | 'ExtractWords' >> beam.FlatMap(lambda line: line.split())
 | 'FilterEmptyWords' >> beam.Filter(lambda word: word)
 | 'CountWords' >> beam.combiners.Count.PerElement()
 | 'FormatResults' >> beam.Map(lambda word_count: f'{word_count[0]}: {word_count[1]}')
 | 'WriteResults' >> beam.io.WriteToText('gs://my-bucket/output')
)

# パイプラインの実行
pipeline.run()

変換(Transform)の種類

Apache Beamでは、以下のような基本的な変換操作が提供されています:

ウィンドウとトリガー

ストリーム処理では、無限のデータストリームを有限のセットに分割するためにウィンドウを使用します:

Javaでのウィンドウ適用例

// 固定時間ウィンドウの適用
PCollection<String> windowedEvents = events.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));

// スライディングウィンドウの適用
PCollection<String> slidingWindowEvents = events.apply(
    Window.<String>into(SlidingWindows.of(Duration.standardMinutes(10))
        .every(Duration.standardMinutes(1))));

// セッションウィンドウの適用
PCollection<String> sessionWindowEvents = events.apply(
    Window.<String>into(Sessions.withGapDuration(Duration.standardMinutes(5))));

Pythonでのウィンドウ適用例

# 固定時間ウィンドウの適用
windowed_events = events | beam.WindowInto(beam.window.FixedWindows(5 * 60))

# スライディングウィンドウの適用
sliding_window_events = events | beam.WindowInto(
    beam.window.SlidingWindows(10 * 60, 60))

# セッションウィンドウの適用
session_window_events = events | beam.WindowInto(
    beam.window.Sessions(5 * 60))

Dataflowの使用方法

Dataflowは、Google Cloud Console、gcloud CLIツール、またはクライアントライブラリを使用して利用できます。

パイプラインの実行

Javaパイプラインの実行(Maven)

mvn compile exec:java \
  -Dexec.mainClass=com.example.WordCount \
  -Dexec.args="--project=my-project \
               --runner=DataflowRunner \
               --region=us-central1 \
               --tempLocation=gs://my-bucket/temp \
               --inputFile=gs://my-bucket/input.txt \
               --output=gs://my-bucket/output"

Pythonパイプラインの実行

python wordcount.py \
  --project=my-project \
  --runner=DataflowRunner \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp \
  --input=gs://my-bucket/input.txt \
  --output=gs://my-bucket/output

Dataflowテンプレートの使用

Dataflowテンプレートを使用すると、事前に構築されたパイプラインを簡単に実行できます:

事前定義テンプレートの実行

gcloud dataflow jobs run my-job \
  --gcs-location=gs://dataflow-templates/latest/GCS_Text_to_BigQuery \
  --region=us-central1 \
  --parameters \
inputFilePattern=gs://my-bucket/input/*.csv,\
outputTable=my-project:my_dataset.my_table,\
javascriptTextTransformFunctionName=transform,\
javascriptTextTransformGcsPath=gs://my-bucket/transform.js

カスタムテンプレートの作成(Java)

mvn compile exec:java \
  -Dexec.mainClass=com.example.WordCount \
  -Dexec.args="--project=my-project \
               --runner=DataflowRunner \
               --region=us-central1 \
               --templateLocation=gs://my-bucket/templates/wordcount \
               --stagingLocation=gs://my-bucket/staging"

Dataflowのストリーム処理

Dataflowは、リアルタイムデータストリームの処理に特に強みを持っています:

Pub/Subからのストリーム処理

Javaでのストリーム処理例

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// パイプラインの設定
Pipeline pipeline = Pipeline.create(
    PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Pub/Subからのメッセージ読み取り
pipeline
    .apply("ReadFromPubSub", PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-subscription"))
    // 5分間の固定ウィンドウを適用
    .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
    // 以降の変換処理...
    .apply("ProcessData", ...)
    // 結果の出力
    .apply("WriteToBigQuery", ...);

// パイプラインの実行
pipeline.run();

Pythonでのストリーム処理例

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import window

# パイプラインの設定
options = PipelineOptions()
pipeline = beam.Pipeline(options=options)

# パイプラインの定義
(pipeline
 | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
     subscription='projects/my-project/subscriptions/my-subscription')
 | 'DecodeData' >> beam.Map(lambda x: x.decode('utf-8'))
 # 5分間の固定ウィンドウを適用
 | 'Window' >> beam.WindowInto(window.FixedWindows(5 * 60))
 # 以降の変換処理...
 | 'ProcessData' >> beam.Map(...)
 # 結果の出力
 | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(...)
)

# パイプラインの実行
pipeline.run()

遅延データの処理

ストリーム処理では、イベント時間と処理時間のずれにより、遅延データが発生することがあります。Dataflowでは、ウォーターマークとトリガーを使用して遅延データを適切に処理できます:

Javaでの遅延データ処理例

import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;

// ウィンドウと複合トリガーの設定
events.apply(Window.<Event>into(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(
        // メインの出力はウォーターマークが通過した時点
        AfterWatermark.pastEndOfWindow()
            // 遅延データの処理(ウォーターマーク後に到着したデータ)
            .withLateFirings(
                // 1分ごとに遅延データを処理
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1)))
    )
    // 結果を累積ではなく個別に出力
    .discardingFiredPanes()
);

Pythonでの遅延データ処理例

from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime

# ウィンドウと複合トリガーの設定
events | beam.WindowInto(
    beam.window.FixedWindows(5 * 60),
    trigger=AfterWatermark(
        early=None,
        # 遅延データの処理(ウォーターマーク後に到着したデータ)
        late=AfterProcessingTime(60),
    ),
    accumulation_mode=beam.window.AccumulationMode.DISCARDING
)

Dataflowのセキュリティ

Dataflowでは、データ処理パイプラインのセキュリティを確保するための様々な機能が提供されています:

アクセス制御

IAMを使用して、Dataflowジョブへのアクセスを制御できます:

# Dataflow管理者ロールの付与
gcloud projects add-iam-policy-binding my-project \
    --member=user:user@example.com \
    --role=roles/dataflow.admin

# Dataflowワーカーロールの付与
gcloud projects add-iam-policy-binding my-project \
    --member=serviceAccount:my-service-account@my-project.iam.gserviceaccount.com \
    --role=roles/dataflow.worker

VPCサービスコントロール

VPCサービスコントロールを使用して、Dataflowへのアクセスをセキュリティ境界内に制限できます。

暗号化

Dataflowのデータは、保存時と転送時に自動的に暗号化されます。さらに、カスタマー管理の暗号化キー(CMEK)を使用することもできます:

--dataflowKmsKey=projects/my-project/locations/global/keyRings/my-keyring/cryptoKeys/my-key

プライベートIPの使用

Dataflowワーカーをプライベートネットワーク内で実行できます:

--network=my-vpc-network
--subnetwork=regions/us-central1/subnetworks/my-subnet
--no-use-public-ips

Dataflowのモニタリングと可観測性

DataflowはGoogle Cloud Monitoringと統合されており、パイプラインの状態を監視できます:

主なモニタリング機能

カスタムメトリクスの収集

パイプライン内でカスタムメトリクスを収集するには、Metrics APIを使用できます:

Javaでのカスタムメトリクス例

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;

class MyDoFn extends DoFn<String, String> {
  private final Counter processedElements = Metrics.counter(MyDoFn.class, "processedElements");
  
  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    // 処理ロジック
    processedElements.inc();
    out.output(element.toUpperCase());
  }
}

Pythonでのカスタムメトリクス例

from apache_beam.metrics import Metrics

class MyDoFn(beam.DoFn):
    def __init__(self):
        self.processed_elements = Metrics.counter(self.__class__, 'processedElements')
    
    def process(self, element):
        # 処理ロジック
        self.processed_elements.inc()
        yield element.upper()

Dataflowのパフォーマンス最適化

Dataflowパイプラインのパフォーマンスを最適化するためのベストプラクティス:

パイプライン設計の最適化

リソース設定の最適化

パフォーマンス最適化オプションの例

--maxNumWorkers=50
--workerMachineType=n1-standard-4
--diskSizeGb=500
--experiments=shuffle_mode=service
--enableStreamingEngine

Dataflowのユースケース

Dataflowは以下のようなユースケースに適しています:

ユースケース例:リアルタイムログ分析

リアルタイムログ分析パイプラインの概要

  1. Pub/Subからログメッセージを受信
  2. ログメッセージをパースして構造化
  3. エラーログをフィルタリング
  4. 時間ウィンドウごとにエラー数を集計
  5. しきい値を超えるとアラートを生成
  6. 集計結果をBigQueryに保存
  7. ダッシュボードでリアルタイム可視化

Dataflowとの統合サービス

Dataflowは他のGoogle Cloudサービスと統合して、より強力なソリューションを構築できます:

サービス 統合の利点
Pub/Sub リアルタイムデータストリームの取り込みと処理
BigQuery 処理結果のデータウェアハウスへの保存と分析
Cloud Storage 大規模データの入力と出力のストレージ
Bigtable 高スループットの読み書きが必要な大量データの保存
Spanner グローバルに分散したトランザクションデータの処理
AI Platform 機械学習モデルのトレーニングと予測
Cloud DLP 機密データの検出と保護

Pub/Subとの統合例

Pub/SubからBigQueryへのストリーミングパイプライン

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import json

class ParseJsonFn(beam.DoFn):
    def process(self, element):
        try:
            # Pub/Subメッセージをパース
            record = json.loads(element.decode('utf-8'))
            yield record
        except Exception as e:
            # エラーログ
            print(f"Error parsing JSON: {e}")

# パイプラインの設定
options = PipelineOptions()
pipeline = beam.Pipeline(options=options)

# パイプラインの定義
(pipeline
 | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
     subscription='projects/my-project/subscriptions/my-subscription')
 | 'ParseJson' >> beam.ParDo(ParseJsonFn())
 | 'WriteToBigQuery' >> WriteToBigQuery(
     table='my-project:my_dataset.my_table',
     schema='timestamp:TIMESTAMP,user_id:STRING,event:STRING,data:STRING',
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)

# パイプラインの実行
pipeline.run()

まとめ

Dataflow は、Google Cloud Platform上でのバッチ処理とストリーム処理を統合したフルマネージドのデータ処理サービスです。Apache Beamプログラミングモデルに基づき、サーバーレスで自動的にスケーリングするため、インフラストラクチャの管理なしにデータ処理パイプラインを実行できます。

Dataflowの主な利点は以下の通りです:

Dataflowを効果的に活用するには、Apache Beamプログラミングモデルの理解、適切なパイプライン設計、パフォーマンス最適化のベストプラクティスの適用が重要です。また、ユースケースに応じて、他のデータ処理サービス(Dataproc、BigQuery、Cloud Data Fusion など)との適切な使い分けも検討すべきです。