Hello, I am Jungmin Choi from WhaTap Labs DevOps team. In this article, I will introduce why we developed WhaTap RDS Log, which allows you to view Log and Event from AWS RDS with WhaTap Log Monitoring, its motion structure, and installation script.
References: WhaTap Log Monitoring
When monitoring AWS RDS with only the existing WhaTap DB Monitoring, you had to install WhaTap DB Monitoring on AWS RDS to check the metric of DB Monitoring, and then check it in the Log and Event AWS Console as shown in the figure below.
I felt uncomfortable about this and thought that WhaTap DB Monitoring provides complete monitoring of AWS RDS if you can check the Log and Event of AWS RDS in the WhaTap DB Monitoring environment, so I started developing WhaTap AWS RDS LOG.
The overall structure of WhaTap AWS RDS LOG is shown in the figure below.
LOG and EVENT generated by AWS RDS are delivered to WhaTap RDS Log through AWS CloudWatch Log Subscription Filter and AWS EventBridge (Event Rule), respectively.
WhaTap RDS Log is executed whenever Logs and Events come into AWS Lambda, parses them, and sends them to WhaTap.
Today, we will see how to process incoming Logs and Events from WhaTap RDS Log with Golang code.
WhaTap RDS Log, when run with an incoming Event, first distinguishes whether the Event is an AWS RDS Log or an AWS RDS Event.
AWS RDS LOG and EVENT will be passed to WhaTap RDS Log in the format shown in the example below.
- AWS RDS LOG DATA EXAMPLE
{
'awslogs': {
'data': 'H4sIAAAAAAAAK15.....'
}
}
- AWS RDS EVENT DATA EXAMPLE
{
"version": "0",
"id": "12a345b6-78c9-01d2-34e5-123f4ghi5j6k",
"detail-type": "RDS DB Instance Event",
"source": "aws.rds",
"account": "111111111111",
"time": "2021-03-19T19:34:09Z",
"region": "us-east-1",
"resources": ["arn:aws:rds:us-east-1:111111111111:db:testdb"],
"detail": {
"EventCategories": ["notification"],
"SourceType": "DB_INSTANCE",
"SourceArn": "arn:aws:rds:us-east-1:111111111111:db:testdb",
"Date": "2021-03-19T19:34:09.293Z",
"Message": "DB instance stopped",
"SourceIdentifier": "testdb",
"EventID": "RDS-EVENT-0087"
}
}
To differentiate between them, we defined the EVENT type as shown below and applied it to the Lambda Event Handler.
type AwsRdsEventData struct {
Version string `json:"version"`
Id string `json:"id"`
Type string `json:"detail-type"`
Source string `json:"source"`
Account string `json:"account"`
Time string `json:"time"`
Region string `json:"region"`
Resources []string `json:"resources"`
Detail AwsRdsEventDetail `json:"detail"`
Awslog CloudwatchLogsRawData `json:"awslogs"`
}
package main
import (
"log"
"rds-forwarder/model"
"rds-forwarder/whatap"
"github.com/aws/aws-lambda-go/lambda"
)
func HandleLambdaEvent(event model.AwsRdsEventData) (model.Response, error) {
log.SetFlags(log.LstdFlags | log.Lshortfile)
//AWS RDS EVENT
if event.Awslog.Data == "" {
whatap.SendEvent(event)
result := model.Response{Result: "Success to Send Whatap"}
return result, nil
} else { //AWS RDS LOG
logdata, err := model.ParserEvent(event)
if err != nil {
log.Panic(logdata)
}
result := model.Response{Result: "Success to Send Log"}
whatap.SendLog(logdata)
return result, nil
}
}
func main() {
lambda.Start(HandleLambdaEvent)
}
If the incoming Event is an AWS RDS Log, only the Awslog field has a value and the rest is empty; if it is an AWS RDS EVENT, the Awslog field is empty and the rest has a value. We use this to categorize the data.
For AWS RDS EVENTs, you can send them in the WhaTap log data format without separate action. However, AWS RDS Logs are base64 encoded and compressed, so you need to decode and decompress them.
- AWS RDS LOG json
{
'awslogs': {
'data': 'H4sIAAAAAAAAK15.....'
}
}
- Result of decrypting and decompressing the value of the data field
{
"owner": "123456789012",
"logGroup": "CloudTrail",
"logStream": "123456789012_CloudTrail_us-east-1",
"subscriptionFilters": ["Destination"],
"messageType": "DATA_MESSAGE",
"logEvents": [
{
"id": "31953106606966983378809025079804211143289615424298221568",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221569",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
},
{
"id": "31953106606966983378809025079804211143289615424298221570",
"timestamp": 1432826855000,
"message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
}
]
}
For decryption in WhaTap AWS RDS LOG, we defined the model as follows.
type CloudwatchLogsRawData struct {
Data string `json:"data"`
}
type CloudwatchLogsData struct {
Owner string `json:"owner"`
LogGroup string `json:"logGroup"`
LogStream string `json:"logStream"`
SubscriptionFilters []string `json:"subscriptionFilters"`
MessageType string `json:"messageType"`
LogEvents []CloudwatchLogEvent `json:"logEvents"`
}
type CloudwatchLogEvent struct {
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
Message string `json:"message"`
}
Then, we proceeded with the decryption as follows.
func ParserEvent(event AwsRdsEventData) (CloudwatchLogsData, error) {
//Define empty structure
logdata := CloudwatchLogsData{}
//Decrypt and save as binary data
rawDecodedText, err := base64.StdEncoding.DecodeString(event.Awslog.Data)
if err != nil {
return logdata, err
}
// Decompress the decoded binary data using gzip.NewReader
zipReader, err := gzip.NewReader(bytes.NewBuffer(rawDecodedText))
if err != nil {
return logdata, err
}
defer zipReader.Close()
// Convert uncompressed data to Go data structures with a JSON decoder
dec := json.NewDecoder(zipReader)
err = dec.Decode(&logdata)
return logdata, err
}
The installation and setup scripts for WhaTap RDS Log are written in AWS CloudFormaition to make the process quick and simple for users.
Parameters:
ProjectAccessKey:
Description: "Enter your Project Access Key (Management > Project management > Project access key)"
Type: String
AllowedPattern : ".+"
Pcode:
Description: "Enter your pcode (Management > Project management > pcode)"
Type: String
AllowedPattern : ".+"
Host:
Description: "Enter Whatap Server IP (Management > Agent Installation > Whatap Server)"
Type: String
Default: "13.124.11.223/13.209.172.35"
AllowedPattern : ".+"
Port:
Type: String
Default: 6600
TimeOut:
Description: "Lambda runs your code for a set amount of time before timing out. Timeout is the maximum amount of time in seconds that a Lambda function can run. The default value for this setting is 3 seconds, but you can adjust this in increments of 1 second up to a maximum value of 15 minutes."
Type: Number
Default: 150
MemorySize:
Description: "Lambda allocates CPU power in proportion to the amount of memory configured. Memory is the amount of memory available to your Lambda function at runtime. You can increase or decrease the memory and CPU power allocated to your function using the Memory (MB) setting. To configure the memory for your function, set a value between 128 MB and 10,240 MB in 1-MB increments. At 1,769 MB, a function has the equivalent of one vCPU (one vCPU-second of credits per second)."
Type: Number
Default: 1024
MinValue: 128
MaxValue: 3000
UseReservedConcurrency:
Description: "Reserve concurrency for a function to set the maximum number of simultaneous executions for a function. Provision concurrency to ensure that a function can scale without fluctuations in latency. Reserved concurrency applies to the entire function, including all versions and aliases."
AllowedValues:
- true
- false
Type: String
Default: false
ReservedConcurrency:
Type: Number
Description: "Set Reserved Concurrency"
Default: 10
Conditions:
useConcurrency: !Equals [ !Ref UseReservedConcurrency, true]
noConcurrency: !Equals [ !Ref UseReservedConcurrency, false]
Resources:
LambdaZipsBucket:
Type: AWS::S3::Bucket
CopyZips:
Type: Custom::CopyZips
Properties:
ServiceToken: !GetAtt "CopyZipsFunction.Arn"
DestBucket: !Ref "LambdaZipsBucket"
SourceBucket: !Sub "whatapforwarder"
Objects:
- !Sub "WhaTapRDSLog.zip"
CopyZipsFunctionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: lambda.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Path: /
Policies:
- PolicyName: lambda-copier
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:GetObject
Resource:
- !Sub "arn:aws:s3:::whatapforwarder/*"
- Effect: Allow
Action:
- s3:PutObject
- s3:DeleteObject
Resource:
- !Sub "arn:aws:s3:::${LambdaZipsBucket}/*"
CopyZipsFunction:
Type: AWS::Lambda::Function
Properties:
Description: Copies objects from a source S3 bucket to a destination
Handler: index.handler
Runtime: python3.9
Role: !GetAtt 'CopyZipsFunctionRole.Arn'
Timeout: 240
Code:
ZipFile: |
import json
import logging
import threading
import boto3
import cfnresponse
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client('s3')
def copy_objects(source_bucket, dest_bucket, objects):
"""
Copy specified objects from source to destination bucket
:param source_bucket: source bucket name
:param dest_bucket: destination bucket name
:param prefix: source bucket prefix
:param objects: list of objects to copy
:return: None
"""
for item in objects:
key = item
copy_source = {
'CopySource': '/{}/{}'.format(source_bucket, key),
'Bucket': source_bucket,
'Key': key
}
logger.info('copy_source: [{}]'.format(copy_source))
logger.info('dest_bucket: [{}]'.format(dest_bucket))
logger.info('key: [{}]'.format(key))
s3.copy_object(CopySource=copy_source, Bucket=dest_bucket, Key=key)
def delete_objects(bucket, objects):
"""
Delete specified s3 objects
:param bucket: bucket name
:param prefix: bucket prefix
:param objects: list of object names
:return None
"""
objects = {'Objects': [{'Key': item} for item in objects]}
s3.delete_objects(Bucket=bucket, Delete=objects)
def timeout_handler(event, context):
"""
Timeout handling
:param event: lambda function event
:param context: lambda function context
:return None
"""
logger.error('Execution is about to time out, sending failure response to CloudFormation')
cfnresponse.send(event, context, cfnresponse.FAILED, {}, None)
def handler(event, context):
"""
Lambda function handler
:param event: lambda function event
:param context: lambda function context
:return None
"""
# make sure we send a failure to CloudFormation if the function
# is going to timeout
timer = threading.Timer((context.get_remaining_time_in_millis()
/ 1000.00) - 0.5, timeout_handler, args=[event, context])
timer.start()
logger.info('Event: [{}]'.format(event))
status = cfnresponse.SUCCESS
try:
source_bucket = event['ResourceProperties']['SourceBucket']
dest_bucket = event['ResourceProperties']['DestBucket']
objects = event['ResourceProperties']['Objects']
logging.info('SourceBucket=[{}], DestinationBucket=[{}], \
Objects=[{}]'.format(
source_bucket, dest_bucket, objects))
if event['RequestType'] == 'Delete':
delete_objects(dest_bucket, objects)
else:
copy_objects(source_bucket, dest_bucket, objects)
except Exception as e:
logger.error('Exception: %s' % e, exc_info=True)
status = cfnresponse.FAILED
finally:
timer.cancel()
cfnresponse.send(event, context, status, {}, None)
LambdaExecutionRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- "sts:AssumeRole"
Policies:
- PolicyDocument:
Version: "2012-10-17"
Statement:
#Lambda Basic Excution Role
- Effect: Allow
Action:
- "logs:CreateLogGroup"
- "logs:CreateLogStream"
- "logs:PutLogEvents"
Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/*"
PolicyName: forwarderpolicy
WhaTapRDSLogReserved:
DependsOn: CopyZips
Type: AWS::Lambda::Function
Condition: useConcurrency
Properties:
Handler: "main"
Code:
S3Bucket:
Ref: 'LambdaZipsBucket'
S3Key: "WhaTapRDSLog.zip"
Runtime: "go1.x"
MemorySize:
Ref : MemorySize
Timeout:
Ref : TimeOut
ReservedConcurrentExecutions:
Ref : ReservedConcurrency
Environment:
Variables:
WHATAP_HOST:
Ref : Host
WHATAP_LICENSE:
Ref : ProjectAccessKey
WHATAP_PCODE:
Ref : Pcode
WHATAP_PORT:
Ref : Port
Role: !GetAtt LambdaExecutionRole.Arn
CWPermissionReserved:
Type: AWS::Lambda::Permission
Condition: useConcurrency
Properties:
Action: lambda:InvokeFunction
FunctionName: !Ref WhaTapRDSLogReserved
Principal: !Sub "logs.${AWS::Region}.amazonaws.com"
SourceAccount: !Ref "AWS::AccountId"
EventBridgePermissionReserved:
Type: AWS::Lambda::Permission
Condition: useConcurrency
Properties:
Action: lambda:InvokeFunction
FunctionName: !Ref WhaTapRDSLogReserved
Principal: events.amazonaws.com
SourceAccount: !Ref "AWS::AccountId"
WhaTapRDSLog:
DependsOn: CopyZips
Type: AWS::Lambda::Function
Condition: noConcurrency
Properties:
Handler: "main"
Code:
S3Bucket:
Ref: 'LambdaZipsBucket'
S3Key: "WhaTapRDSLog.zip"
Runtime: "go1.x"
MemorySize:
Ref : MemorySize
Timeout:
Ref : TimeOut
Environment:
Variables:
WHATAP_HOST:
Ref : Host
WHATAP_LICENSE:
Ref : ProjectAccessKey
WHATAP_PCODE:
Ref : Pcode
WHATAP_PORT:
Ref : Port
Role: !GetAtt LambdaExecutionRole.Arn
CWPermission:
Type: AWS::Lambda::Permission
Condition: noConcurrency
Properties:
Action: lambda:InvokeFunction
FunctionName: !Ref WhaTapRDSLog
Principal: !Sub "logs.${AWS::Region}.amazonaws.com"
SourceAccount: !Ref "AWS::AccountId"
EventBridgePermission:
Type: AWS::Lambda::Permission
Condition: noConcurrency
Properties:
Action: lambda:InvokeFunction
FunctionName: !Ref WhaTapRDSLog
Principal: events.amazonaws.com
SourceAccount: !Ref "AWS::AccountId"
Outputs:
WhaTapRDSLogArn:
Condition: noConcurrency
Description: "WhaTapRDSLog's Arn"
Value: !GetAtt WhaTapRDSLog.Arn
WhaTapRDSLogArnReserved:
Condition: useConcurrency
Description: "WhaTapRDSLog's Arn"
Value: !GetAtt WhaTapRDSLogReserved.Arn
Setup
Description: "CloudFormation template for Setting WhaTapRDSLog"
Parameters:
WhaTapRDSLogArn:
Description: "Enter your WhatapRdsForwarder ARN (if WhatapRdsForwarder isn't installed, you have to install it first)"
Type: String
EventRuleName:
Description: "Enter AWS EventBridge Rule Name"
Type: String
Default: "WhaTapRDSEventRule"
AllowedPattern: ".+"
AwsRdsNames:
Description: "Enter AWS RDS Name (ex database-1, database-2)"
Type: List
Default: "none, none"
AllowedPattern: ".+"
RdsLogGroupName1:
Description: "Enter your Cloudwatch LogGroup Name that you want to see (ex /aws/rds/database-1/error)"
Default: "none"
Type: String
RdsLogGroupName2:
Description: "Enter your Cloudwatch LogGroup Name that you want to see"
Type: String
Default: "none"
RdsLogGroupName3:
Description: "Enter your Cloudwatch LogGroup Name that you want to see"
Type: String
Default: "none"
RdsLogGroupName4:
Description: "Enter your Cloudwatch LogGroup Name that you want to see"
Type: String
Default: "none"
RdsLogGroupName5:
Description: "Enter your Cloudwatch LogGroup Name that you want to see"
Type: String
Default: "none"
RdsLogGroupName6:
Description: "Enter your Cloudwatch LogGroup Name that you want to see"
Type: String
Default: "none"
RdsLogGroupName7:
Description: "Enter your Cloudwatch LogGroup Name that you want to see"
Type: String
Default: "none"
RdsLogGroupName8:
Description: "Enter your Cloudwatch LogGroup Name that you want to see"
Type: String
Default: "none"
RdsLogGroupName9:
Description: "Enter your Cloudwatch LogGroup Name that you want to see"
Type: String
Default: "none"
Conditions:
Rule : !Not [ !Equals [ !Select [0, !Ref AwsRdsNames], "none" ] ]
RDS1 : !Not [ !Equals [ !Ref RdsLogGroupName1, "none" ] ]
RDS2 : !Not [ !Equals [ !Ref RdsLogGroupName2, "none" ] ]
RDS3 : !Not [ !Equals [ !Ref RdsLogGroupName3, "none" ] ]
RDS4 : !Not [ !Equals [ !Ref RdsLogGroupName4, "none" ] ]
RDS5 : !Not [ !Equals [ !Ref RdsLogGroupName5, "none" ] ]
RDS6 : !Not [ !Equals [ !Ref RdsLogGroupName6, "none" ] ]
RDS7 : !Not [ !Equals [ !Ref RdsLogGroupName7, "none" ] ]
RDS8 : !Not [ !Equals [ !Ref RdsLogGroupName8, "none" ] ]
RDS9 : !Not [ !Equals [ !Ref RdsLogGroupName9, "none" ] ]
Resources:
WhatapEventBridgeRule:
Type: AWS::Events::Rule
Condition: Rule
Properties:
Description: "Event Rule For Monitoring RDS event"
EventBusName: default
EventPattern:
source:
- aws.rds
detail:
SourceIdentifier: !Ref AwsRdsNames
State: ENABLED
Name: !Ref EventRuleName
Targets:
- Id : "EventID"
Arn : !Ref WhaTapRDSLogArn
ConnectRDSLogwithWhatap1:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS1
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName1"
ConnectRDSLogwithWhatap2:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS2
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName2"
ConnectRDSLogwithWhatap3:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS3
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName3"
ConnectRDSLogwithWhatap4:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS4
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName4"
ConnectRDSLogwithWhatap5:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS5
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName5"
ConnectRDSLogwithWhatap6:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS6
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName6"
ConnectRDSLogwithWhatap7:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS7
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName7"
ConnectRDSLogwithWhatap8:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS8
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName8"
ConnectRDSLogwithWhatap9:
Type: AWS::Logs::SubscriptionFilter
Condition: RDS9
Properties:
DestinationArn: !Ref "WhaTapRDSLogArn"
FilterPattern: " "
LogGroupName: !Ref "RdsLogGroupName9"
As a DevOps engineer, I have mostly focused on using pre-made monitoring, but it was a great experience to actually develop monitoring. I will continue to observe and complement the insufficiencies from the perspective of clients who operate/manage services with WhaTap monitoring.