メインコンテンツへスキップ

CloudWatch Pipelinesを使ってみた

· loading · loading ·
kiitosu
著者
kiitosu
aws community builder. 画像処理やデバイスドライバ、データ基盤構築からWebバックエンドまで、多様な領域に携わってきました。地図解析や地図アプリケーションの仕組みにも経験があり、幅広い技術を活かした開発に取り組んでいます。休日は草野球とランニングを楽しんでいます。
目次

はじめに
#

re:Invent 2025で発表されたCloudWatch Pipelinesが気になっていたので、ログの変換・正規化がどこまでマネージドにできるのか実際に試してみました。

CloudWatch Pipelinesは、ログデータの収集・変換・ルーティングをフルマネージドで提供するサービスです。2025年12月のre:Inventで発表されました。

ログ処理でよくある「フォーマットがバラバラ問題」を、インフラ管理なし・パイプライン処理自体は追加料金なしで解決できるのがポイントです(ソース種別によって料金の計上タイミングは異なるので後述)。

従来のログ変換、どうしてた?
#

CloudWatch Logsに流れてくるログを加工したいとき、これまではこんな構成が多かったと思います。

パターン1: Lambda経由

flowchart LR
    A[CloudWatch Logs] --> B[Subscription Filter]
    B --> C[Lambda]
    C -- 加工 --> D[CloudWatch Logs<br/>別ロググループ]

パターン2: Firehose経由

flowchart LR
    A[CloudWatch Logs] --> B[Subscription Filter]
    B --> C[Kinesis Firehose]
    C --> D[Lambda Transform]
    D --> E[S3 / OpenSearch]

どちらも自前で管理するコンポーネントが多いのが課題でした。

  • Lambdaの実装・デプロイ・エラーハンドリング
  • Subscription Filterの上限(ロググループあたり2個まで)
  • Firehoseのバッファリング設定やリトライ管理
  • スケーリングの考慮

「JSONのフィールド名を統一したいだけなのに、そこまで必要?」というケースがわりとあります。

Pipelinesのアーキテクチャ
#

CloudWatch Pipelinesは Source → Processor → Sink の3層構成です。

flowchart LR
    subgraph Source
        direction TB
        S1[CloudWatch Logs]
        S2[S3<br/>SQS経由]
        S3[サードパーティ<br/>CrowdStrike等<br/>API or S3+SQS]
        S1 ~~~ S2 ~~~ S3
    end
    subgraph Processors["Processors (合計最大20)"]
        direction TB
        P1["Parser<br/>(任意1つ・必ず先頭)<br/>例: parse_json / ocsf / grok"]
        subgraph Others["以降は順不同・繰り返し可"]
            direction TB
            P2["Transformation系<br/>キー/値の追加・削除・リネーム<br/>例: add_entries / delete_entries / move_keys"]
            P3["String系<br/>文字列値の変換<br/>例: lowercase_string / uppercase_string"]
        end
        P1 --> Others
    end
    subgraph Sink
        D1[CloudWatch Logs]
    end
    Source --> Processors --> Sink
コンポーネント 必須 個数 説明
Source Yes 1 データソース。CloudWatch Logs、S3 (SQS経由)、サードパーティ(API直連携 または S3+SQS)
Processor No 0〜20 順序付きの変換処理。Parser(parse_json/ocsf/grok 等)は最大1つかつ先頭固定。それ以降は Transformation系 / String系 等を順不同・繰り返し可で並べられる
Sink Yes 1 出力先。現時点ではCloudWatch Logsのみ

従来手法との比較
#

項目 CloudWatch Pipelines Kinesis Firehose + Lambda Subscription Filter + Lambda
マネージド フルマネージド Firehoseはマネージド、Lambdaは自前 Lambdaは自前
追加コスト パイプライン処理自体は追加料金なし(CWLソースは処理前、S3/サードパーティは処理後のCustom logsとして課金) Firehose配信料 + Lambda実行料 Lambda実行料
ログ変換 組み込みプロセッサ Lambda内で自由に実装 Lambda内で自由に実装
OCSF変換 組み込みサポート 自前実装 自前実装
出力先 CloudWatch Logsのみ S3, Redshift, OpenSearch, Splunk等 任意(Lambda次第)
インフラ管理 不要 Firehose設定 + Lambda管理 Lambda管理
サードパーティ取り込み CrowdStrike、Okta、Palo Alto、Wiz、Zscaler 等を組み込みサポート なし なし

Pipelinesが向いているケース:

  • ログのJSON正規化、フィールド追加/削除、文字列変換などの定型処理
  • OCSF準拠が必要なセキュリティログ
  • CrowdStrike, Okta等のサードパーティログをCloudWatchに集約したい
  • Lambdaを書きたくない(管理したくない)

従来手法が向いているケース:

  • CloudWatch Logs以外への出力が必要(S3, OpenSearch等)
  • 複雑なビジネスロジックを含む変換
  • リアルタイムで外部に転送したい

要するに、**「CloudWatch Logs内で完結するログ変換ならPipelines一択」**という棲み分けです。

ハンズオン:アプリケーションログをJSON正規化する
#

実際にパイプラインを組んで、バラバラなフォーマットのアプリケーションログを正規化してみます。

やりたいこと
#

アプリケーションから流れてくるログのフォーマットがバラバラ:

// ログA(構造化済み)
{"timestamp": "2026-04-12T10:00:00Z", "level": "INFO", "message": "Order created", "userId": "u-123"}

// ログB(フィールド名が違う)
{"ts": "2026-04-12T10:01:00Z", "severity": "ERROR", "msg": "Payment failed", "user_id": "u-456"}

これを統一したい:

  • tstimestampseveritylevelmsgmessageuser_iduserId
  • level を小文字に統一
  • 環境情報(environment: production)を付与

CloudFormationテンプレート
#

AWSTemplateFormatVersion: '2010-09-09'
Description: CloudWatch Pipeline - Log Normalization Example

Resources:
  # 対象のロググループ
  AppLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /app/my-service
      RetentionInDays: 14

  # CloudWatch Logs ソース用 IAMロール
  #   source.cloudwatch_logs.aws.sts_role_arn に指定するロール。
  #   CloudWatch Logs サービス(logs.amazonaws.com)が assume して、
  #   対象ロググループからイベントを読み取るために使う。
  #   ※ パイプライン自体を作成する側の呼び出し者には
  #     別途 logs:PutPipelineRule / logs:DeletePipelineRule 等の権限が必要。
  PipelineSourceRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: logs.amazonaws.com
            Action: sts:AssumeRole

  # パイプライン本体
  LogNormalizationPipeline:
    Type: AWS::ObservabilityAdmin::TelemetryPipelines
    Properties:
      Name: app-log-normalizer
      Configuration:
        Body: !Sub |
          pipeline:
            source:
              cloudwatch_logs:
                log_event_metadata:
                  data_source_name: "my_app_logs"
                  data_source_type: "default"
                aws:
                  sts_role_arn: "${PipelineSourceRole.Arn}"
            processor:
              - parse_json: {}
              - move_keys:
                  entries:
                    - from_key: "ts"
                      to_key: "timestamp"
                      overwrite_if_to_key_exists: false
                    - from_key: "severity"
                      to_key: "level"
                      overwrite_if_to_key_exists: false
                    - from_key: "msg"
                      to_key: "message"
                      overwrite_if_to_key_exists: false
                    - from_key: "user_id"
                      to_key: "userId"
                      overwrite_if_to_key_exists: false
              - lowercase_string:
                  with_keys:
                    - "level"
              - add_entries:
                  entries:
                    - key: "environment"
                      value: "production"
                      overwrite_if_key_exists: false
            sink:
              - cloudwatch_logs:
                  log_group: "@original"
      Tags:
        - Key: Purpose
          Value: log-normalization

Outputs:
  PipelineArn:
    Value: !GetAtt LogNormalizationPipeline.Arn

ポイント:

  • parse_jsonmove_keyslowercase_stringadd_entries の順でプロセッサをチェーン
  • parse_json は parser processor なので先頭に1つだけ配置(ocsfgrok も同じ枠で、混在不可)
  • sinklog_group: "@original" は元のロググループにそのまま書き戻す設定
  • パイプライン処理自体には追加料金が発生しない(CloudWatch Logs ソースの場合は処理前に通常の取り込み料金として課金)

デプロイと動作確認
#

# デプロイ
aws cloudformation deploy \
  --template-file template.yaml \
  --stack-name cw-pipeline-demo \
  --capabilities CAPABILITY_IAM \
  --region ap-northeast-1

# ログストリームを事前に作成(put-log-events の前提)
aws logs create-log-stream \
  --log-group-name /app/my-service \
  --log-stream-name test-stream \
  --region ap-northeast-1

# テストログを送信
aws logs put-log-events \
  --log-group-name /app/my-service \
  --log-stream-name test-stream \
  --log-events \
    timestamp=$(date +%s000),message='{"ts":"2026-04-12T10:01:00Z","severity":"ERROR","msg":"Payment failed","user_id":"u-456"}' \
  --region ap-northeast-1

# ※ 2回目以降は describe-log-streams で uploadSequenceToken を取得し
#    --sequence-token で渡す必要があります

変換結果
#

パイプライン通過後のログ:

{
  "timestamp": "2026-04-12T10:01:00Z",
  "level": "error",
  "message": "Payment failed",
  "userId": "u-456",
  "environment": "production"
}
  • tstimestamp にリネーム
  • severitylevel にリネーム + 小文字化
  • msgmessage にリネーム
  • user_iduserId にリネーム
  • environment: production が付与

Lambdaのコードを1行も書かずに、ログの正規化ができました。

OCSF変換も試してみる
#

セキュリティログの標準フォーマットであるOCSF(Open Cybersecurity Schema Framework)への変換も組み込みサポートされています。CloudTrail、VPC Flow Logs、Route 53 Resolverログ、AWS WAFなど主要なAWSサービスに加え、CrowdStrike、Okta、Palo Alto等のサードパーティログもOCSFに変換できます。

# OCSF変換パイプラインの例
#   (自前のロググループにCloudTrail形式JSONを流している想定 = custom logs)
pipeline:
  source:
    cloudwatch_logs:
      log_event_metadata:
        data_source_name: "cloudtrail_logs"   # 任意の識別名
        data_source_type: "default"           # customログは default
      aws:
        sts_role_arn: "arn:aws:iam::123456789012:role/PipelineSourceRole"
  processor:
    - ocsf:
        version: "1.5"
        schema:
          cloud_trail:
  sink:
    - cloudwatch_logs:
        log_group: "@original"

AWS vended の CloudTrail ログ(data_source_name: aws_cloudtrail)を直接扱う場合は、使えるプロセッサが ocsf のみ、または空([] に制限されます。move_keys 等との組み合わせができないので、上記のように「自前ロググループ + default ソース」として扱う方が柔軟性が高い場面もあります。

これだけで、CloudTrailのJSON形式ログがOCSF v1.5準拠に変換されます。Security Lakeやサードパーティ SIEMとの連携で「OCSF準拠にしてくれ」と言われたときに、Lambda不要で対応できるのは嬉しい。

CloudWatch Logs ソース側で対応している schema は、現時点で cloud_trail / route53_resolver / vpc_flow / eks_audit / aws_waf 等です。最新の一覧はOCSF processor のドキュメントを参照してください。

知っておくべき制限事項
#

制限 詳細
Sinkの出力先 CloudWatch Logsのみ(S3やOpenSearchには直接出力できない)
パイプライン数 アカウントあたり最大330(CW Logsソース300 + その他30)
プロセッサ数 パイプラインあたり最大20
パーサープロセッサ パイプラインあたり1つまで、かつ先頭に配置
イベントサイズ 変換後256KB以下
include_original 有効にすると元ログと変換後ログ両方保存(CWLソース限定、processor1つ以上必須、{} で指定)。ストレージコストは変換後サイズ・保持期間次第だが単純には増加
CW Logsソースのsink @original(元のロググループ)固定。別ロググループへの転送は不可

一番大きい制限は出力先がCloudWatch Logsのみという点。変換後のログをS3に出したい場合は、Pipeline通過後にSubscription Filter → Firehose → S3という従来の構成を追加する必要があります。

モニタリング
#

パイプラインのメトリクスは AWS/Observability Admin 名前空間で確認できます。

  • PipelineErrors — エラー発生数。アラーム設定推奨
  • PipelineErrorsByErrorType — エラー種別ディメンション付き。ACCESS_DENIED / PARSE_FAILURE / PROCESSOR_ERRORS / PAYLOAD_SIZE_EXCEEDED / RESOURCE_NOT_FOUND / SOURCE_READ_FAILURE / ALL などが用意されている(最新は公式ドキュメント参照)
  • PipelineWarningsTHROTTLED(レート超過)等
  • PipelineBytesIn / PipelineBytesOut — スループット
  • PipelineRecordsIn / PipelineRecordsOut — レコード数

特に PARSE_FAILURE はフォーマットが想定と異なるログが流れてきたときに発生するので、初期は注意して見ておくのが良いです。

最後に
#

  • 「CloudWatch Logs内で完結するログ変換」に最適 — パイプライン処理自体は追加料金なし、インフラ管理なし(ソース種別で課金タイミングが異なる点は注意)
  • OCSF変換がビルトイン — セキュリティログの標準化がLambdaなしで可能
  • サードパーティ連携が充実 — CrowdStrike、Okta、Palo Alto、Wiz、Zscaler 等のログを API 直連携または S3+SQS で取り込める
  • 制限はSinkがCloudWatch Logsのみ — S3やOpenSearchに出したい場合は従来手法と組み合わせが必要

Log Transformers(2024年11月発表)の進化版として、ログ変換パイプラインの選択肢がかなり広がりました。「ログのフォーマット統一のためだけにLambda書くのつらい」と思っていた人は、まず試してみる価値があると思います。

参考リンク
#

Reply by Email

関連記事

いまさら AWS AppConfig に触ってみた
· loading · loading
Lambda Durable Functionsを使ってみる — Step Functionsとの比較からハンズオンまで
· loading · loading
Obsidian CLIとClaude Codeでタスク管理を改善した話