概要
ロググループにおける サブスクリプションフィルター を使い、対象の検出をトリガーに後続の処理を Lambda が実行する仕組みを導入した際のサンプルコードを紹介します。
必要なリソースはすべて CloudFormation テンプレートで表現していますので、リソースごとに CLI を実行する必要はありません。
前提
以下がインストールされていること
- SAM CLI
- Node.js
- Docker
当記事では SAM 自体がどういったものかについては扱いません。
用意・把握しておくもの
- 検出元の CloudWatch Logs のロググループ名
- 通知先の Slack チャンネルの WebhookURL
ファイル構成
.
├── README.md
├── function
│ ├── AWSLogEvent.d.ts
│ ├── index.ts
│ ├── node_modules
│ ├── package-lock.json
│ ├── package.json
│ └── tsconfig.json
├── samconfig.toml
└── template.yaml
template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
cloudwatch-logs-slack-notifier-lambda
Sample SAM Template for cloudwatch-logs-slack-notifier-lambda
# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
Function:
Timeout: 3
Parameters:
LogGroupName:
Type: String
Default: dummyLogGroupName
FilterPattern:
Type: String
Default: dummyFilterPattern
WebhookURL:
Type: String
Default: dummyWebhookURL
Resources:
NotifierLambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Policies:
- PolicyName: NotifierLambdaPermissions
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- cloudwatch:Describe*
- cloudwatch:Get*
- cloudwatch:List*
- logs:*
Resource: "*"
NotifierLambda:
Type: AWS::Serverless::Function
Properties:
FunctionName: !Sub '${AWS::StackName}'
CodeUri: function/
Handler: index.handler
Runtime: nodejs18.x
MemorySize: 512
Timeout: 30
Role: !GetAtt NotifierLambdaRole.Arn
Environment:
Variables:
WebhookURL: !Ref WebhookURL
Metadata: # Manage esbuild properties
BuildMethod: esbuild
BuildProperties:
Minify: true
Target: "es2022"
Sourcemap: true
EntryPoints:
- index.ts
SubscriptionFilter:
Type: AWS::Logs::SubscriptionFilter
DependsOn: SubscriptionFilterInvokeFunctionPermission
Properties:
LogGroupName: !Ref LogGroupName
FilterPattern: !Ref FilterPattern
DestinationArn: !GetAtt NotifierLambda.Arn
SubscriptionFilterInvokeFunctionPermission:
Type: AWS::Lambda::Permission
Properties:
Action: lambda:InvokeFunction
FunctionName: !GetAtt NotifierLambda.Arn
Principal: logs.ap-northeast-1.amazonaws.com
function/index.ts
import { AWSLogEvent, AWSLogsData, AWSEvent } from './AWSLogEvent';
import * as zlib from 'zlib';
import * as https from 'https';
export const handler = async (event: AWSEvent) => {
if (!process.env.WebhookURL) {
throw new Error("WebhookURL is not defined in environment variables");
}
const webhookURL = new URL(process.env.WebhookURL);
console.log('Loading function');
if (!event.awslogs || !event.awslogs.data) {
console.error('Invalid event format:', JSON.stringify(event));
throw new Error('Invalid event format');
}
try {
const compressedData = Buffer.from(event.awslogs.data, 'base64');
const decodedData = zlib.gunzipSync(compressedData).toString('utf-8');
const payload: AWSLogsData = JSON.parse(decodedData);
const logGroup = payload.logGroup;
const logStream = payload.logStream;
const region = process.env.AWS_REGION;
const logStreamURL = `https://console.aws.amazon.com/cloudwatch/home?region=${region}#logsV2:log-groups/log-group/${encodeURIComponent(logGroup)}/log-events/${encodeURIComponent(logStream)}`;
const promises = payload.logEvents.map((logEvent: AWSLogEvent) => {
const timestamp = new Date(logEvent.timestamp);
timestamp.setHours(timestamp.getHours() + 9); // Adjust for time zone
const logEventMessage = logEvent.message.slice(0, 1500) + ' ...';
const webhookPayload = {
text:
`
${timestamp.toISOString()}
LogGroup: ${logGroup}
<${logStreamURL}|Message>:
\`\`\`${logEventMessage}\`\`\`
`
};
const httpRequestOptions = {
hostname: webhookURL.hostname,
path: webhookURL.pathname,
method: 'POST',
headers: {
'Content-Type': 'application/json'
}
};
return new Promise((resolve, reject) => {
const req = https.request(httpRequestOptions, (res) => {
if (res.statusCode === 200) {
console.log('Notification sent to webhook successfully');
resolve(`Successfully processed log event.`);
} else {
reject(new Error(`Failed to send notification with status code: ${res.statusCode}`));
}
});
req.on('error', (error) => {
console.error('Error sending notification to webhook: ', error);
reject(error);
});
req.write(JSON.stringify(webhookPayload));
req.end();
});
});
await Promise.all(promises);
return `Successfully processed ${payload.logEvents.length} records.`;
} catch (error) {
console.error('Error processing log event: ', error);
throw error;
}
};
function/AWSLogEvent.d.ts
export interface AWSLogEvent {
id: string;
timestamp: number;
message: string;
}
export interface AWSLogsData {
logGroup: string;
logStream: string;
logEvents: AWSLogEvent[];
}
export interface AWSEvent {
awslogs: {
data: string;
};
}
デプロイ
parameter-overrides で任意のパラメータを指定してデプロイします。
$ sam build
$ sam deploy --stack-name cloudwatch-logs-slack-notifier-lambda-{purpose} \
--parameter-overrides \
LogGroupName={CloudWatchLogGroupName} \
FilterPattern='{"Arrange\ FilterPattern"}' \
WebhookURL=https://hooks.slack.com/services/{YourWebhookURL} \
--profile {YourAccountProfile}
{purpose} には、Lambda の用途を区別するための任意の文言を指定してください。
FilterPattern については、例えば「hogehoge」と「fuga fuga」のように一部空白を含む文字列を OR 条件で設定したい場合、次のように書きます。
FilterPattern='?hogehoge\ ?\"fuga\ fuga\"'
詳細は https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/FilterAndPatternSyntax.html をご参照ください。
その他
サブスクリプションフィルターを活用すれば、ログデータを OpenSearch Service、Kinesis Data Streams または Firehose へストリーミングできるので、アイデア次第では他にも色々な用途がありそうです。 とはいえ、当記事の執筆時(2024/06)では、一つのロググループに設定できるサブスクリプションフィルターは 2 つまでであることを留意する必要があります。
まとめとして
カルテットのインフラチームには、IaC の考え方が根付いています。 属人性を排除し、再現性・変更容易性・持続可能性の高いアウトプットを担保するために、(手順書や記憶ではなく)可能な限り作業をコードで表現します。
この考え方・仕事の進め方は、個人的にはとても大切なことだと感じています。 当記事で紹介した仕組みは、一部 Web コンソール上で実施可能な作業もありますが、この文化のおかげもあり SAM をある程度扱えるようになりました。
もちろん、業務範囲や裁量がある程度広い、という環境も作用していますが、インフラチームのエンジニアでありながら AWS Lambdaや TypeScript に触れる機会も多くありがたく感じています。