概要

ロググループにおける サブスクリプションフィルター を使い、対象の検出をトリガーに後続の処理を Lambda が実行する仕組みを導入した際のサンプルコードを紹介します。

必要なリソースはすべて CloudFormation テンプレートで表現していますので、リソースごとに CLI を実行する必要はありません。

前提

以下がインストールされていること

  • SAM CLI
  • Node.js
  • Docker

当記事では SAM 自体がどういったものかについては扱いません。

用意・把握しておくもの

  • 検出元の CloudWatch Logs のロググループ名
  • 通知先の Slack チャンネルの WebhookURL

ファイル構成

.
├── README.md
├── function
│   ├── AWSLogEvent.d.ts
│   ├── index.ts
│   ├── node_modules
│   ├── package-lock.json
│   ├── package.json
│   └── tsconfig.json
├── samconfig.toml
└── template.yaml

template.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  cloudwatch-logs-slack-notifier-lambda

  Sample SAM Template for cloudwatch-logs-slack-notifier-lambda
  
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 3

Parameters:
  LogGroupName:
    Type: String
    Default: dummyLogGroupName
  FilterPattern:
    Type: String
    Default: dummyFilterPattern
  WebhookURL:
    Type: String
    Default: dummyWebhookURL

Resources:
  NotifierLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: NotifierLambdaPermissions
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - cloudwatch:Describe*
                  - cloudwatch:Get*
                  - cloudwatch:List*
                  - logs:*
                Resource: "*"

  NotifierLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub '${AWS::StackName}'
      CodeUri: function/
      Handler: index.handler
      Runtime: nodejs18.x
      MemorySize: 512
      Timeout: 30
      Role: !GetAtt NotifierLambdaRole.Arn
      Environment:
        Variables:
          WebhookURL: !Ref WebhookURL
    Metadata: # Manage esbuild properties
      BuildMethod: esbuild
      BuildProperties:
        Minify: true
        Target: "es2022"
        Sourcemap: true
        EntryPoints:
          - index.ts

  SubscriptionFilter:
    Type: AWS::Logs::SubscriptionFilter
    DependsOn: SubscriptionFilterInvokeFunctionPermission
    Properties:
      LogGroupName: !Ref LogGroupName
      FilterPattern: !Ref FilterPattern
      DestinationArn: !GetAtt NotifierLambda.Arn

  SubscriptionFilterInvokeFunctionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt NotifierLambda.Arn
      Principal: logs.ap-northeast-1.amazonaws.com

function/index.ts

import { AWSLogEvent, AWSLogsData, AWSEvent } from './AWSLogEvent';
import * as zlib from 'zlib';
import * as https from 'https';

export const handler = async (event: AWSEvent) => {

  if (!process.env.WebhookURL) {
    throw new Error("WebhookURL is not defined in environment variables");
  }
  const webhookURL = new URL(process.env.WebhookURL);

  console.log('Loading function');
  if (!event.awslogs || !event.awslogs.data) {
    console.error('Invalid event format:', JSON.stringify(event));
    throw new Error('Invalid event format');
  }

  try {
    const compressedData = Buffer.from(event.awslogs.data, 'base64');
    const decodedData = zlib.gunzipSync(compressedData).toString('utf-8');
    const payload: AWSLogsData = JSON.parse(decodedData);
    const logGroup = payload.logGroup;
    const logStream = payload.logStream;
    const region = process.env.AWS_REGION;
    const logStreamURL = `https://console.aws.amazon.com/cloudwatch/home?region=${region}#logsV2:log-groups/log-group/${encodeURIComponent(logGroup)}/log-events/${encodeURIComponent(logStream)}`;

    const promises = payload.logEvents.map((logEvent: AWSLogEvent) => {
      const timestamp = new Date(logEvent.timestamp);
      timestamp.setHours(timestamp.getHours() + 9); // Adjust for time zone
      const logEventMessage = logEvent.message.slice(0, 1500) + ' ...';

      const webhookPayload = {
        text:
          `
${timestamp.toISOString()}
LogGroup: ${logGroup}
<${logStreamURL}|Message>:
\`\`\`${logEventMessage}\`\`\`
          `
      };

      const httpRequestOptions = {
        hostname: webhookURL.hostname,
        path: webhookURL.pathname,
        method: 'POST',
        headers: {
          'Content-Type': 'application/json'
        }
      };

      return new Promise((resolve, reject) => {
        const req = https.request(httpRequestOptions, (res) => {
          if (res.statusCode === 200) {
            console.log('Notification sent to webhook successfully');
            resolve(`Successfully processed log event.`);
          } else {
            reject(new Error(`Failed to send notification with status code: ${res.statusCode}`));
          }
        });

        req.on('error', (error) => {
          console.error('Error sending notification to webhook: ', error);
          reject(error);
        });

        req.write(JSON.stringify(webhookPayload));
        req.end();
      });
    });

    await Promise.all(promises);

    return `Successfully processed ${payload.logEvents.length} records.`;
  } catch (error) {
    console.error('Error processing log event: ', error);
    throw error;
  }
};

function/AWSLogEvent.d.ts

export interface AWSLogEvent {
  id: string;
  timestamp: number;
  message: string;
}

export interface AWSLogsData {
  logGroup: string;
  logStream: string;
  logEvents: AWSLogEvent[];
}

export interface AWSEvent {
  awslogs: {
    data: string;
  };
}

デプロイ

parameter-overrides で任意のパラメータを指定してデプロイします。

$ sam build
$ sam deploy --stack-name cloudwatch-logs-slack-notifier-lambda-{purpose} \
    --parameter-overrides \
        LogGroupName={CloudWatchLogGroupName} \
        FilterPattern='{"Arrange\ FilterPattern"}' \
        WebhookURL=https://hooks.slack.com/services/{YourWebhookURL} \
    --profile {YourAccountProfile}

{purpose} には、Lambda の用途を区別するための任意の文言を指定してください。

FilterPattern については、例えば「hogehoge」と「fuga fuga」のように一部空白を含む文字列を OR 条件で設定したい場合、次のように書きます。

        FilterPattern='?hogehoge\ ?\"fuga\ fuga\"'

詳細は https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html をご参照ください。

その他

サブスクリプションフィルターを活用すれば、ログデータを OpenSearch Service、Kinesis Data Streams または Firehose へストリーミングできるので、アイデア次第では他にも色々な用途がありそうです。 とはいえ、当記事の執筆時(2024/06)では、一つのロググループに設定できるサブスクリプションフィルターは 2 つまでであることを留意する必要があります。

まとめとして

カルテットのインフラチームには、IaC の考え方が根付いています。 属人性を排除し、再現性・変更容易性・持続可能性の高いアウトプットを担保するために、(手順書や記憶ではなく)可能な限り作業をコードで表現します。

この考え方・仕事の進め方は、個人的にはとても大切なことだと感じています。 当記事で紹介した仕組みは、一部 Web コンソール上で実施可能な作業もありますが、この文化のおかげもあり SAM をある程度扱えるようになりました。

もちろん、業務範囲や裁量がある程度広い、という環境も作用していますが、インフラチームのエンジニアでありながら AWS Lambdaや TypeScript に触れる機会も多くありがたく感じています。