AWS Big Data Blog

Implement Serverless Log Analytics Using Amazon Kinesis Analytics

Applications log a large amount of data that—when analyzed in real time—provides significant insight into your applications. Real-time log analysis can be used to ensure security compliance, troubleshoot operation events, identify application usage patterns, and much more.

Ingesting and analyzing this data in real time can be accomplished by using a variety of open source tools on Amazon EC2. Alternatively, you can use a set of simple, managed AWS services to perform serverless log analytics. The Amazon Kinesis platform includes the following managed services:

  • Amazon Kinesis Streams lets you collect, store, and process terabytes of streaming data per hour on AWS at low cost.
  • Amazon Kinesis Firehose loads streaming data into Amazon Kinesis Analytics, Amazon S3, Amazon Redshift, or Amazon OpenSearch Service.
  • Amazon Kinesis Analytics lets you analyze streaming data in near real time by writing SQL queries, without managing and monitoring your own streaming infrastructure. Analytics can also reference metadata stored in S3 from those SQL queries for real-time analysis.

In this post, I show you how to implement a solution that analyzes streaming Apache access log data from an EC2 instance, aggregated over 5-minute windows. The solution helps you understand where requests to your applications are coming from. If the source is an unknown application, or if a particular source application is flooding your application with requests, you can contact the application owner.

This solution addresses the following challenges:

  • You do not want to maintain (patch and upgrade) servers or a log analysis application just to do log analytics, and you want the analysis to scale on demand by default. For that reason, all components are managed services.
  • Apache logs record the host IP address or host name. However, that information isn’t useful in the cloud, where servers are fungible and hosts change constantly, either to scale or to heal automatically. So you maintain a flat file of servers in an S3 bucket that can be updated by Auto Scaling policies and mapped to the streaming log data.

Architecture

The following diagram shows how this solution works.

[Architecture diagram: implement_serverless_1]

  • Application nodes run Apache applications and write Apache logs locally to disk. The Amazon Kinesis agent on each EC2 instance ingests the log stream into the Amazon Kinesis stream.
  • The log input streams from the various application nodes are ingested into the Amazon Kinesis stream.
  • Metadata about each machine or application is stored in flat files in an S3 bucket. It maps host IP addresses to the application name and contact.
  • The Analytics application processes streaming logs over tumbling windows by adding referenced machine metadata from S3.
  • The output stream, which contains the aggregated responses from the Analytics application, is written to a second Amazon Kinesis stream.
  • The Lambda function consumes the aggregated response from the destination stream, processes it, and publishes it to Amazon CloudWatch. It is event driven: as soon as new records are pushed to the destination stream, they are processed in batches of 200 records.
  • The CloudWatch dashboard is used to view response trends.
  • Alarms on aggregated data are generated when specified thresholds are reached.

Implementation

You implement this solution in four parts:

  • Create the Amazon Kinesis stream to ingest streaming log data and aggregated results from Analytics. Create referenced metadata in S3 that stores the hostname, host IP address, application name, and contact (team responsible for that application).
  • Install the Amazon Kinesis agent on the EC2 instances running Apache applications. Configure it to monitor logs and ingest data into the Amazon Kinesis stream.
  • Set up an Analytics application that processes data by joining the incoming log stream with the referenced metadata over a tumbling window, and publishes the aggregated response to the output stream.
  • Deploy the Lambda function that processes the aggregated response data from the output stream and publishes it to CloudWatch.

Creating the Amazon Kinesis stream and metadata file

In this step, create the Amazon Kinesis streams and the referenced metadata file in the S3 bucket.

Create the log input stream with the following command:

aws kinesis create-stream --stream-name ApplicationLogInputStream --shard-count 10

Create the log output stream for aggregated results with the following command:

aws kinesis create-stream --stream-name AggregratedLogDataStream --shard-count 2
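
Stream creation is asynchronous. If you want to confirm that both streams are ACTIVE before sending data, one option is to use the built-in waiters and then check the stream status:

aws kinesis wait stream-exists --stream-name ApplicationLogInputStream
aws kinesis wait stream-exists --stream-name AggregratedLogDataStream
aws kinesis describe-stream --stream-name ApplicationLogInputStream --query 'StreamDescription.StreamStatus'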

Add the metadata file to the S3 bucket. For this use case, the metadata file is in CSV format and contains the “Host,ApplicationName,Contact” columns and goes to the bucket “referenced-data”.
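
As an illustration, the rows of a minimal HostApplicationMap.csv might look like the following (the hosts, application names, and contacts are hypothetical placeholders), followed by the upload command:

10.0.1.25,OrderService,orders-team@example.com
10.0.1.26,OrderService,orders-team@example.com
10.0.2.40,CatalogService,catalog-team@example.com

aws s3 cp HostApplicationMap.csv s3://referenced-data/HostApplicationMap.csv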

Installing the Amazon Kinesis agent

In this step, install the Amazon Kinesis agent on the EC2 instances running Apache web applications. Make this part of your instance start-up script so that new instances start streaming log data to the Amazon Kinesis stream as soon as they launch (a sketch of such a script follows the install command below). Amazon Kinesis Streams also provides a log4j appender that can be used to append logs to the input stream.

Install the Amazon Kinesis agent using the following command:

sudo yum install -y aws-kinesis-agent
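
If you want new instances to do this automatically at launch, a minimal EC2 user-data sketch might look like the following. It assumes the instance role can read a bucket of your own where you stage the agent configuration (the s3://your-config-bucket path is a placeholder):

#!/bin/bash
# Install the Kinesis agent, pull the prepared agent configuration, grant log access, and start streaming
yum install -y aws-kinesis-agent
aws s3 cp s3://your-config-bucket/agent.json /etc/aws-kinesis/agent.json
setfacl -m u:aws-kinesis-agent-user:rwx /etc/httpd/logs
service aws-kinesis-agent restart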

In the following example, my application logs are located at /etc/httpd/logs/ and the configuration file for the Amazon Kinesis agent is at /etc/aws-kinesis/agent.json. For this post, I am using combined Apache access logs. You could also use custom Apache, error, sys, CSV, or other logs. For more information, see Use the Agent to Pre-process Data.

{
  "awsAccessKeyId": "YOUR_ACCESS_KEY_ID",
  "awsSecretAccessKey": "YOUR_SECRET_KEY",
  "cloudwatch.emitMetrics": true,
  "cloudwatch.endpoint": "https://monitoring.us-east-1.amazonaws.com",
  "kinesis.endpoint": "https://kinesis.us-east-1.amazonaws.com",
  "flows": [
    {
      "filePattern": "/etc/httpd/logs/access_log*",
      "kinesisStream": "ApplicationLogInputStream",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
        {
          "optionName": "LOGTOJSON",
          "logFormat": "COMBINEDAPACHELOG",
          "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response", "bytes", "referrer", "agent"]
        }
      ]
    }
  ]
}

Add permission for the Amazon Kinesis agent user to access logs, using the following command:

sudo setfacl -m u:aws-kinesis-agent-user:rwx /etc/httpd/logs

Restart the Amazon Kinesis agent so that it starts streaming access logs to the Amazon Kinesis stream, using the following command:

sudo service aws-kinesis-agent restart

Logs for the Amazon Kinesis agent are available at /var/log/aws-kinesis-agent/aws-kinesis-agent.log.
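
To confirm that records are flowing, you can tail the agent log and, optionally, read a few records straight from the stream (shardId-000000000000 is the first shard of the stream):

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --stream-name ApplicationLogInputStream \
      --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON \
      --query 'ShardIterator' --output text)
aws kinesis get-records --shard-iterator "$SHARD_ITERATOR" --limit 5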

Creating the Amazon Kinesis Analytics application

In this step, create the Analytics application that processes incoming log streams and metadata in S3 over a tumbling window of 5 minutes, and sends them to the output stream.

Create the Analytics application, “LogAnalytics”, using the following command:

aws kinesisanalytics create-application \
--application-name "LogAnalytics" \
--application-description "Analyze Streaming Logs"

Verify that the application was created, and get the application version, using the describe-application command. For a newly created application, the version is 1. The version number increments with each update, so you can retrieve the current version the same way before later updates.

aws kinesisanalytics describe-application --application-name "LogAnalytics"

Add the input stream “ApplicationLogInputStream” as the source for the Analytics application, and define the input schema and parallelism. You also provide the role to be assumed by the Analytics application. In the following example, I used version 1, as returned by the describe-application command above:

aws kinesisanalytics add-application-input --application-name "LogAnalytics" \
       --current-application-version-id 1 \
       --input '{"KinesisStreamsInput":{"ResourceARN":"arn:aws:kinesis:us-east-1:[YOUR-ACCOUNT-ID]:stream/ApplicationLogInputStream","RoleARN":"arn:aws:iam::[YOUR-ACCOUNT-ID]:role/service-role/kinesis-analytics-AppLogAnalytics"},"NamePrefix":"SOURCE_SQL_STREAM","InputParallelism":{"Count":1},"InputSchema":{"RecordColumns":[{"SqlType":"VARCHAR(16)","Name":"host","Mapping":"$.host"},{"SqlType":"VARCHAR(32)","Name":"datetime","Mapping":"$.datetime"},{"SqlType":"VARCHAR(64)","Name":"request","Mapping":"$.request"},{"SqlType":"SMALLINT","Name":"response","Mapping":"$.response"},{"SqlType":"SMALLINT","Name":"bytes","Mapping":"$.bytes"},{"SqlType":"VARCHAR(128)","Name":"agent","Mapping":"$.agent"},{"SqlType":"VARCHAR(32)","Name":"referrer","Mapping":"$.referrer"}],"RecordFormat":{"MappingParameters":{"JSONMappingParameters":{"RecordRowPath":"$"}},"RecordFormatType":"JSON"},"RecordEncoding":"UTF-8"}}'

Earlier, you uploaded the metadata file to S3 for application host mapping. Now, register that CSV file as a reference table and define its schema for the Analytics application by adding a reference data source.

aws kinesisanalytics add-application-reference-data-source  \
       --endpoint https://kinesisanalytics.us-east-1.amazonaws.com/ \
       --region us-east-1 \
       --application-name "LogAnalytics" \
       --debug \
       --reference-data-source '{"TableName":"ApplicationHostMapping","S3ReferenceDataSource":{"BucketARN":"arn:aws:s3:::referenced-data","FileKey":"HostApplicationMap.csv","ReferenceRoleARN":"arn:aws:iam::[YOUR-ACCOUNT-ID]:role/ka-demo-role"},"ReferenceSchema":{"RecordFormat":{"RecordFormatType":"CSV","MappingParameters":{"CSVMappingParameters":{"RecordRowDelimiter":"\n","RecordColumnDelimiter":","}}},"RecordEncoding":"UTF-8","RecordColumns":[{"Name":"Host","SqlType":"VARCHAR(64)"},{"Name":"ApplicationName","SqlType":"VARCHAR(64)"},{"Name":"Contact","SqlType":"VARCHAR(64)"}]}}' \
       --current-application-version-id 2

Now, add the Analytics code that processes the incoming log stream and the referenced data over a tumbling window of 5 minutes by updating the application.

SQL code to process logs

-- Create the destination stream that stores the response count as per the source application. 
-- This will help you determine request count per source. 
-- It also helps you determine if data is coming from unknown sources. 
CREATE STREAM "DESTINATION_SQL_STREAM" (
    applicationName VARCHAR(64),
    contact VARCHAR(64),
    response SMALLINT,
    responseCount SMALLINT);
 
-- Aggregate responses over the log data joined with the host application mapping stored on S3. 
-- The join always uses the latest S3 file. 
CREATE OR REPLACE PUMP "DESTINATION_SQL_STREAM" AS 
INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM  metadata."ApplicationName" , metadata."Contact", logstream."response", COUNT(*) as responseCount 
                  FROM "SOURCE_SQL_STREAM_001" logstream LEFT JOIN "ApplicationHostMapping" metadata
                  ON logstream."host" = metadata."Host"
                  GROUP BY metadata."ApplicationName", metadata."Contact", logstream."response", FLOOR((logstream.ROWTIME - TIMESTAMP '1970-01-01 00:00:00') MINUTE / 5 TO MINUTE);

CLI code to process logs

aws kinesisanalytics update-application --application-name "LogAnalytics" \
      --current-application-version-id 3 \
      --application-update '{"ApplicationCodeUpdate":"-- Create the destination stream that stores the response count as per the source application. \n-- This helps you determine request count per source. \n-- It also helps you determine if data is coming from unknown sources. \nCREATE STREAM \"DESTINATION_SQL_STREAM\" (\napplicationName VARCHAR(64),\ncontact VARCHAR(64),\nresponse SMALLINT,\nresponseCount SMALLINT);\n \n-- Aggregate responses over the log data joined with the host application mapping stored on S3. \n-- The join always uses the latest S3 file. \nCREATE OR REPLACE PUMP \"DESTINATION_SQL_STREAM\" AS \nINSERT INTO \"DESTINATION_SQL_STREAM\"\n    SELECT STREAM metadata.\"ApplicationName\", metadata.\"Contact\", logstream.\"response\", COUNT(*) as responseCount \n                  FROM \"SOURCE_SQL_STREAM_001\" logstream LEFT JOIN \"ApplicationHostMapping\" metadata\n                  ON logstream.\"host\" = metadata.\"Host\"\n                  GROUP BY metadata.\"ApplicationName\", metadata.\"Contact\", logstream.\"response\", FLOOR((logstream.ROWTIME - TIMESTAMP '"'"'1970-01-01 00:00:00'"'"') MINUTE / 5 TO MINUTE);\n"}'

Add the output stream to the Analytics application; the Lambda function consumes the aggregated results from this stream.

aws kinesisanalytics add-application-output --application-name "LogAnalytics" \
      --current-application-version-id 4 \
      --application-output '{"Name":"DESTINATION_SQL_STREAM","DestinationSchema":{"RecordFormatType":"JSON"},"KinesisStreamsOutput":{"ResourceARN":"arn:aws:kinesis:us-east-1:[YOUR-ACCOUNT-ID]:stream/AggregratedLogDataStream","RoleARN":"arn:aws:iam::[YOUR-ACCOUNT-ID]:role/service-role/kinesis-analytics-AppLogAnalytics"}}'

Now that you’ve configured the Analytics application, start it when you are ready. It requires an input ID and input starting point. Get the input ID by running the describe-application command used earlier. You can choose from the following start options:

  • Start now for new records (“NOW”)
  • Start from the beginning of the input stream (“TRIM_HORIZON”)
  • Start from where the application stopped processing ("LAST_STOPPED_POINT"), if the Analytics application is being restarted.

aws kinesisanalytics start-application --application-name "LogAnalytics" \
      --input-configurations '{"Id":"2.1","InputStartingPositionConfiguration":{"InputStartingPosition":"TRIM_HORIZON"}}'
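
You can confirm that the application reaches the RUNNING state with the describe-application command used earlier, for example:

aws kinesisanalytics describe-application --application-name "LogAnalytics" \
      --query 'ApplicationDetail.ApplicationStatus'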

Deploying the Lambda function

In this step, add the Lambda function that is triggered on the aggregated results from the Analytics application and then publishes metrics to CloudWatch. CloudWatch doesn’t need to be configured. It accepts new metrics, which then show up in the dashboard. You can also set alarms in CloudWatch, such as “contact me when requests from unknown sources are greater than 100 in 5 minutes.”
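
As a rough sketch of such an alarm with the AWS CLI (the metric name and SNS topic below are placeholders; they must match the namespace and metric names that your Lambda function actually publishes):

aws cloudwatch put-metric-alarm --alarm-name "UnknownSourceRequests" \
      --namespace "MyApplicationMonitoringV1.1" --metric-name "UnknownSource" \
      --statistic Sum --period 300 --evaluation-periods 1 \
      --threshold 100 --comparison-operator GreaterThanThreshold \
      --alarm-actions "arn:aws:sns:us-east-1:[YOUR-ACCOUNT-ID]:log-analytics-alerts"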

The following Lambda code snippet written in Java can be modified as required. You can also write Lambda functions in Node.js or Python.

// Required imports for the handler (AWS SDK for Java v1 and the Lambda events library)
import java.io.IOException;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;

// Lambda handler for event triggers from the Amazon Kinesis stream
public void recordHandler(KinesisEvent event) throws IOException {

    // Configure CloudWatch using the credentials of the role assigned to Lambda
    AWSCredentialsProvider provider = new EnvironmentVariableCredentialsProvider();
    AmazonCloudWatchClient cloudWatchClient = new AmazonCloudWatchClient(provider);

    // Initialize the CloudWatch put request
    PutMetricDataRequest putMetricDataRequest = new PutMetricDataRequest();
    putMetricDataRequest.setNamespace("namespace");

    // Process each event record from the Amazon Kinesis stream
    for (KinesisEventRecord rec : event.getRecords()) {
        // Deserialize the JSON stream record into your own object,
        // then apply your business logic to it
        YourObject yourObject = YourObject.fromJsonAsBytes(rec.getKinesis().getData().array());

        // Set the CloudWatch metric
        MetricDatum metricDatum = new MetricDatum();
        metricDatum.setMetricName(yourObject.getApplicationName());
        metricDatum.setTimestamp(yourObject.getMetricTime());
        metricDatum.setUnit(StandardUnit.Count);
        metricDatum.setValue(yourObject.getValue());
        putMetricDataRequest.getMetricData().add(metricDatum);
    }

    // Commit all metrics to CloudWatch
    cloudWatchClient.putMetricData(putMetricDataRequest);
}

After you create the Java package for Lambda, upload it to AWS. Add the role to be assumed by the Lambda function. The CloudWatch client in the code above uses the same role, so make sure that the role has an appropriate policy (a sketch follows the command below). The Lambda function uses 192 MB of memory and times out in 10 seconds.

aws lambda create-function --function-name "PublishLogMetrics" --runtime "java8" \
      --handler "com.amazonaws.lambda.ProcessKinesisEvents::recordHandler" \
      --role "arn:aws:iam::[YOUR-ACCOUNT-ID]:role/MetricsProcessor" \
      --zip "fileb://log-metrics-cloudwatch-0.0.1-SNAPSHOT.jar" --publish  \
      --memory-size 192 --timeout 10
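
The MetricsProcessor role referenced above needs permission to read from the Kinesis stream and to publish custom metrics. One way to grant that, as a sketch (the inline policy name is arbitrary):

aws iam attach-role-policy --role-name MetricsProcessor \
      --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole

aws iam put-role-policy --role-name MetricsProcessor --policy-name AllowPutMetricData \
      --policy-document '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Action":"cloudwatch:PutMetricData","Resource":"*"}]}'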

Now that the function is configured, add the Amazon Kinesis stream (the Analytics output stream) as the event source. The Lambda function starts processing data from the beginning of the stream and in batches of 200 records.

aws lambda create-event-source-mapping --event-source-arn "arn:aws:kinesis:us-east-1:[YOUR-ACCOUNT-ID]:stream/AggregratedLogDataStream" \
      --function-name "PublishLogMetrics" \
       --enabled --batch-size 200 \
      --starting-position "TRIM_HORIZON"
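
You can verify that the mapping exists and is enabled:

aws lambda list-event-source-mappings --function-name "PublishLogMetrics"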

After this is set up, you have a graphical representation of the processed incoming log feed in near real time, using AWS services. In CloudWatch, choose Metrics, Custom Metrics, and look for MyApplicationMonitoringV1.1 (the custom namespace that your Lambda function publishes to). Select the appropriate graph type and period. In the following screenshot, I chose Stacked area for 1 hour of data points.

[Screenshot: implement_serverless_2]

[Screenshot: implement_serverless_3]

Summary

In this post, you implemented serverless log analytics that streams and aggregates log data in a scalable way, with custom business logic. Furthermore, you visualized this data with graphs using AWS services and simple CLI commands.

If you have any questions or suggestions, please comment below.


About the Author

Nehal Mehta is a Cloud Architect for AWS Professional Services. As part of the professional services team, he collaborates with sales, pre-sales, support, and product teams to help partners and customers benefit from the cloud, especially for big data analytics workloads. He likes to spend time with friends and family, and has interests in the technical singularity, politics, and, surprisingly, finance.


Related

Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics
