All Collections
Send your logs
AWS Kinesis with Lambda function
AWS Kinesis with Lambda function

Collect your AWS Kinesis logs using our Lambda function

Eldar Aliiev avatar
Written by Eldar Aliiev
Updated over a week ago

Coralogix provides a predefined Lambda function to forward your Kinesis stream straight to Coralogix.

The preferred and easiest integration method will be to use our aws Serverless Application Repository. Search for 'coralogix'. Don't forget to check the 'Show apps that create custom IAM roles or resource policies' box located just under the search field to see all available applications. Select your application of choice and click on it. You will see detailed instructions in the readme section on the left.

The rest of this document describes a manual configuration of this integration and the Lambda associated with it and should be used if there is a need for special customization.

Setup

1. Create an “author from scratch” Node.js 8.10 runtime lambda with the following permissions(Create execution role tutorial):

          kinesis:DescribeStream
​          kinesis:DescribeStreamSummary
​          kinesis:GetRecords
​          kinesis:GetShardIterator
​          kinesis:ListShards
​          kinesis:ListStreams
​          kinesis:SubscribeToShard

2. At “Code entry type” choose “Edit code inline” and paste the following function:

'use strict';

const AWS = require('aws-sdk');
const https = require('https');
const assert = require('assert');

assert(process.env.private_key, 'No private key')
const appName = process.env.app_name ? process.env.app_name : 'NO_APPLICATION';
const subName = process.env.sub_name ? process.env.sub_name : 'NO_SUBSYSTEM';

exports.handler = (event, context, callback) => {

    function parseEvent(streamEventRecord) {
        let streamEventData = new Buffer(streamEventRecord.kinesis.data, 'base64').toString('ascii');
        return {
            "timestamp": streamEventRecord.kinesis.approximateArrivalTimestamp * 1000,
            "severity": getSeverityLevel(streamEventData),
            "text": streamEventData
        };
    }

    function postEventsToCoralogix(parsedEvents) {
        try {
            var options = {
                hostname: 'api.coralogix.com',
                port: 443,
                path: '/api/v1/logs',
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                }
            };

            let retries = 3;
            let timeoutMs = 10000;
            let retryNum = 0;
            let sendRequest = function sendRequest() {
                let req = https.request(options, function (res) {
                    console.log('Status: ' + res.statusCode);
                    console.log('Headers: ' + JSON.stringify(res.headers));
                    res.setEncoding('utf8');
                    res.on('data', function (body) {
                        console.log('Body: ' + body);
                    });
                });

                req.setTimeout(timeoutMs, () => {
                    req.abort();
                    if (retryNum++ < retries) {
                        console.log('problem with request: timeout reached. retrying ' + retryNum + '/' + retries);
                        sendRequest();
                    } else {
                        console.log('problem with request: timeout reached. failed all retries.');
                    }
                });

                req.on('error', function (e) {
                    console.log('problem with request: ' + e.message);
                });

                req.write(JSON.stringify(parsedEvents));
                req.end();
            }

            sendRequest();
        } catch (ex) {
            console.log(ex.message);
            callback(ex.message);
        }
    }

    function getSeverityLevel(message) {
        var severity = 3;

        if(message.includes('debug'))
            severity = 1
        if(message.includes('verbose'))
            severity = 2
        if(message.includes('info'))
            severity = 3
        if(message.includes('warn') || message.includes('warning'))
            severity = 4
        if(message.includes('error'))
            severity = 5
        if(message.includes('critical') || message.includes('panic'))
            severity = 6

        return severity;
    }

    postEventsToCoralogix({
        "privateKey": process.env.private_key,
        "applicationName": appName,
        "subsystemName": subName,
        "logEntries": event.Records.map((eventRecord) => parseEvent(eventRecord))
    });
};

3. Add the mandatory environment variables:
      1. Private Key – A unique ID which represents your company, this Id will be sent to your mail once you register to Coralogix.
      2. Application Name – Used to separate your environment, e.g. SuperApp-test/SuperApp-prod.
      3. SubSystem Name – Your application probably has multiple subsystems, for example, Backend servers, Middleware, Frontend servers etc.

If your Coralogix account top level domain is not ‘.com’, add the following environment variable:

CORALOGIX_URL=https://api.Cluster URL/api/v1/logs

Account Url Ending with .us

Account Url Ending with .com

Account Url Ending with .in

coralogix.us

coralogix.com

app.coralogix.in

4. Go to Add triggers and add Kinesis:

5. Configure the trigger, select the desired “Kinesis stream” and “Consumer”, change “Batch size” equals to 10:

6. Increase Memory to 1024MB and Timeout to 1 min.

7. Click “Save”.

Did this answer your question?