{ "Records": [ { "awsRegion": "us-east-1", "eventID": "<event-id>", "eventName": "aws:kinesis:record", "eventSource": "aws:kinesis", "eventSourceARN": "arn:aws:kinesis:us-east-1:<xxxxxxxx>:stream/<stream>", "eventVersion": "1.0", "invokeIdentityArn": "arn:aws:iam::<xxxxxx>:role/<role>", "kinesis": { "approximateArrivalTimestamp": <timestamp>, "data": <data>, "partitionKey": "<partitionKey>", "sequenceNumber": "<sequenceNumber>", "kinesisSchemaVersion": "1.0" } } ] }
package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { for _, record := range kinesisEvent.Records { kinesisRecord := record.Kinesis dataBytes := kinesisRecord.Data dataText := string(dataBytes) fmt.Printf("%s Data = %s \n", record.EventName, dataText) } return nil } func main() { lambda.Start(handler) }
// KinesisEventData is the JSON message decoded from a Kinesis record payload.
// FilePath maps to the "filePath" key and Id maps to the "id" key.
type KinesisEventData struct {
	FilePath string `json:"filePath"`
	Id       int    `json:"id"`
}
// CloudSearchDocument holds the fields of one search document. It serializes
// to JSON with the keys "dir", "name" and "ext".
type CloudSearchDocument struct {
	Directory     string `json:"dir"`
	FileName      string `json:"name"`
	FileExtension string `json:"ext"`
}
[ {"type": "add", "id": "12345", "fields": { "dir": ":", "name": "file.txt", "ext": "txt" } } ]
var amasonDocumentsBatch []AmazonDocumentUploadRequest //Preparing data for _, record := range kinesisEvent.Records { kinesisRecord := record.Kinesis dataBytes := kinesisRecord.Data fmt.Printf("%s Data = %s \n", record.EventName, string(dataBytes)) //Deserialize data from kinesis to KinesisEventData var eventData KinesisEventData err := json.Unmarshal(dataBytes, &eventData) if err != nil { return failed(), err } //Convert data to CloudSearch format document := ConvertToCloudSearchDocument(eventData) request := CreateAmazonDocumentUploadRequest(eventData.Id, document) amasonDocumentsBatch = append(amasonDocumentsBatch, request) }
// Upload the prepared batch; nothing is sent when no documents were built.
if len(amasonDocumentsBatch) > 0 {
	fmt.Print("Connecting to cloudsearch...\n")
	// session.NewSession is used instead of the deprecated session.New so that
	// a session-creation failure is reported here instead of surfacing later
	// on the first request.
	sess, err := session.NewSession(&aws.Config{
		Region:     aws.String(os.Getenv("SearchRegion")),
		Endpoint:   aws.String(os.Getenv("SearchEndpoint")),
		MaxRetries: aws.Int(6),
	})
	if err != nil {
		return failed(), err
	}
	svc := cloudsearchdomain.New(sess)

	fmt.Print("Creating request...\n")
	batch, err := json.Marshal(amasonDocumentsBatch)
	if err != nil {
		return failed(), err
	}
	fmt.Printf("Search document = %s \n", batch)

	params := &cloudsearchdomain.UploadDocumentsInput{
		ContentType: aws.String("application/json"),
		Documents:   strings.NewReader(string(batch)),
	}

	fmt.Print("Starting to upload...\n")
	req, resp := svc.UploadDocumentsRequest(params)

	fmt.Print("Send request...\n")
	// resp is only populated once Send has completed successfully.
	err = req.Send()
	if err != nil {
		return failed(), err
	}
	fmt.Println(resp)
}
# Fetch (and update) the build-lambda-zip command from aws-lambda-go.
go get -u github.com/aws/aws-lambda-go/cmd/build-lambda-zip
# Download the AWS SDK for Go sources only (-d skips the install step).
go get -d github.com/aws/aws-sdk-go/
os.Getenv("SearchRegion") os.Getenv("SearchEndpoint")
{ "Records": [ { "awsRegion": "us-east-1", "eventID": "shardId-000000000001:1", "eventName": "aws:kinesis:record", "eventSource": "aws:kinesis", "eventSourceARN": "arn:aws:kinesis:us-east-1:xxx", "eventVersion": "1.0", "invokeIdentityArn": "arn:aws:iam::xxx", "kinesis": { "approximateArrivalTimestamp": 1522222222.06, "data": "eyJpZCI6IDEyMzQ1LCJmaWxlUGF0aCI6ICJDOlxcZmlsZS50eHQifQ==", "partitionKey": "key", "sequenceNumber": "1", "kinesisSchemaVersion": "1.0" } }, { "awsRegion": "us-east-1", "eventID": "shardId-000000000001:1", "eventName": "aws:kinesis:record", "eventSource": "aws:kinesis", "eventSourceARN": "arn:aws:kinesis:us-east-1:xxx", "eventVersion": "1.0", "invokeIdentityArn": "arn:aws:iam::xxx", "kinesis": { "approximateArrivalTimestamp": 1522222222.06, "data": "eyJpZCI6IDEyMzQ2LCJmaWxlUGF0aCI6ICJDOlxcZm9sZGVyXFxmaWxlLnR4dCJ9", "partitionKey": "key", "sequenceNumber": "2", "kinesisSchemaVersion": "1.0" } } ] }
Source: https://habr.com/ru/post/351876/
All Articles