Skip to content

Commit e3eeffb

Browse files
authored
Merge pull request #30 from SumoLogic/elevatorAWSreinvent
Changes for making extension work with lambda managed instance environment
2 parents d25eefc + 5f8061e commit e3eeffb

13 files changed

Lines changed: 598 additions & 35 deletions

CONTRIBUTING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ First of all, thanks for contributing!. Before contributing please read the [COD
5858
1. Change the layer_name variable in zip.sh to avoid replacing the prod.
5959
1. Run the following command to publish the layer:
6060
`sh zip.sh`
61+
1. The zip file generated by previous step should have `extensions` folder in it, which should consist of the binary for the extension.
6162
1. Run the following command to verify that the layer version is published across regions:
6263
`sh verify_layer_versions.sh`
6364

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ All the logs that are not sent to Sumo Logic during the Execution phase of the A
2727

2828
If you would like to always send logs during the execution phase however, you can add extra execution time via a sleep function at the end of lambda code, which will give your extension time to run and send logs to Sumo Logic. We recommend setting this to two seconds.
2929

30+
# Managed Instance Runtime Support
31+
This Lambda extension from version v1.4.0 also supports [managed instance](https://docs.aws.amazon.com/lambda/latest/dg/lambda-managed-instances.html) runtime.
32+
3033
# Using Lambda extension in custom container images
3134

3235
Follow the instruction in [docs](https://help.sumologic.com/03Send-Data/Collect-from-Other-Data-Sources/Collect_AWS_Lambda_Logs_using_an_Extension#For_AWS_Lambda_Functions_Created_Using_Container_Images:)
Binary file not shown.
Binary file not shown.

lambda-extensions/lambdaapi/extensionapiclient.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,20 @@ const (
4646
)
4747

4848
var (
49-
lambdaEvents = []EventType{"INVOKE", "SHUTDOWN"}
49+
lambdaEvents = []EventType{"INVOKE", "SHUTDOWN"}
50+
managedInstanceLambdaEvents = []EventType{"SHUTDOWN"}
5051
)
5152

5253
// RegisterExtension is to register extension to Run Time API client. Call the following method on initialization as early as possible,
5354
// otherwise you may get a timeout error. Runtime initialization will start after all extensions are registered.
54-
func (client *Client) RegisterExtension(ctx context.Context) (*RegisterResponse, error) {
55+
func (client *Client) RegisterExtension(ctx context.Context, isManagedInstance bool) (*RegisterResponse, error) {
5556
URL := client.baseURL + extensionURL + "register"
57+
events := lambdaEvents
58+
if isManagedInstance {
59+
events = managedInstanceLambdaEvents
60+
}
5661
reqBody, err := json.Marshal(map[string]interface{}{
57-
"events": lambdaEvents,
62+
"events": events,
5863
})
5964
if err != nil {
6065
return nil, err

lambda-extensions/lambdaapi/extensionapiclient_test.go

Lines changed: 145 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@ func TestRegisterExtension(t *testing.T) {
3434
client := NewClient(srv.URL[7:], extensionName)
3535

3636
// Without Context
37-
response, err := client.RegisterExtension(context.TODO())
37+
response, err := client.RegisterExtension(context.TODO(), false)
3838
commonAsserts(t, client, response, err)
3939

4040
// With Context
41-
response, err = client.RegisterExtension(context.Background())
41+
response, err = client.RegisterExtension(context.Background(), false)
4242
commonAsserts(t, client, response, err)
4343
}
4444

@@ -147,3 +147,146 @@ func TestExitError(t *testing.T) {
147147
response, err = client.ExitError(context.Background(), "EXIT ERROR")
148148
commonAsserts(t, client, response, err)
149149
}
150+
151+
// TestRegisterExtension_ManagedInstanceMode tests extension registration in managed instance mode
152+
// In ManagedInstance mode, only SHUTDOWN events are registered (not INVOKE)
153+
func TestRegisterExtension_ManagedInstanceMode(t *testing.T) {
154+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
155+
assertEqual(t, r.Method, http.MethodPost, "Method is not POST")
156+
assertNotEmpty(t, r.Header.Get(extensionNameHeader), "Extension Name Header not present")
157+
158+
reqBytes, err := ioutil.ReadAll(r.Body)
159+
assertNoError(t, err, "Received error while reading request")
160+
defer func() {
161+
if err := r.Body.Close(); err != nil {
162+
log.Printf("failed to close body: %v", err)
163+
}
164+
}()
165+
assertNotEmpty(t, reqBytes, "Received error in request")
166+
167+
// Verify the request body contains only SHUTDOWN event for managed instance mode
168+
var reqBody map[string]interface{}
169+
err = json.Unmarshal(reqBytes, &reqBody)
170+
assertNoError(t, err, "Failed to unmarshal request body")
171+
172+
events, ok := reqBody["events"].([]interface{})
173+
if !ok {
174+
t.Error("Events field not found or not an array")
175+
}
176+
assertEqual(t, len(events), 1, "Expected 1 event for managed instance mode")
177+
assertEqual(t, events[0], "SHUTDOWN", "Expected only SHUTDOWN event for managed instance mode")
178+
179+
w.Header().Add(extensionIdentiferHeader, "test-sumo-id")
180+
w.WriteHeader(200)
181+
respBytes, _ := json.Marshal(RegisterResponse{
182+
FunctionName: "test-function",
183+
FunctionVersion: "$LATEST",
184+
Handler: "index.handler",
185+
})
186+
_, _ = w.Write(respBytes)
187+
}))
188+
189+
defer srv.Close()
190+
client := NewClient(srv.URL[7:], extensionName)
191+
192+
// Test with isManagedInstance = true
193+
response, err := client.RegisterExtension(context.Background(), true)
194+
commonAsserts(t, client, response, err)
195+
196+
// Verify the response is properly unmarshaled
197+
if response.FunctionName != "test-function" {
198+
t.Errorf("Expected function name 'test-function', got '%s'", response.FunctionName)
199+
}
200+
}
201+
202+
// TestRegisterExtension_ManagedInstanceModeWithoutContext tests managed instance mode without context
203+
func TestRegisterExtension_ManagedInstanceModeWithoutContext(t *testing.T) {
204+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
205+
reqBytes, err := ioutil.ReadAll(r.Body)
206+
assertNoError(t, err, "Received error while reading request")
207+
defer func() {
208+
if err := r.Body.Close(); err != nil {
209+
log.Printf("failed to close body: %v", err)
210+
}
211+
}()
212+
213+
var reqBody map[string]interface{}
214+
err = json.Unmarshal(reqBytes, &reqBody)
215+
assertNoError(t, err, "Failed to unmarshal request body")
216+
217+
events, ok := reqBody["events"].([]interface{})
218+
if !ok {
219+
t.Error("Events field not found or not an array")
220+
}
221+
assertEqual(t, len(events), 1, "Expected 1 event for managed instance mode")
222+
223+
w.Header().Add(extensionIdentiferHeader, "test-sumo-id")
224+
w.WriteHeader(200)
225+
respBytes, _ := json.Marshal(RegisterResponse{})
226+
_, _ = w.Write(respBytes)
227+
}))
228+
229+
defer srv.Close()
230+
client := NewClient(srv.URL[7:], extensionName)
231+
232+
// Test with isManagedInstance = true and nil context
233+
response, err := client.RegisterExtension(context.TODO(), true)
234+
commonAsserts(t, client, response, err)
235+
}
236+
237+
// TestRegisterExtension_ManagedInstanceModeEventValidation tests that managed instance mode registers correct events
238+
func TestRegisterExtension_ManagedInstanceModeEventValidation(t *testing.T) {
239+
receivedEvents := make([]string, 0)
240+
241+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
242+
reqBytes, err := ioutil.ReadAll(r.Body)
243+
assertNoError(t, err, "Received error while reading request")
244+
defer func() {
245+
if err := r.Body.Close(); err != nil {
246+
log.Printf("failed to close body: %v", err)
247+
}
248+
}()
249+
250+
var reqBody map[string]interface{}
251+
err = json.Unmarshal(reqBytes, &reqBody)
252+
assertNoError(t, err, "Failed to unmarshal request body")
253+
254+
events, ok := reqBody["events"].([]interface{})
255+
if !ok {
256+
t.Error("Events field not found or not an array")
257+
}
258+
259+
for _, e := range events {
260+
receivedEvents = append(receivedEvents, e.(string))
261+
}
262+
263+
w.Header().Add(extensionIdentiferHeader, "test-sumo-id")
264+
w.WriteHeader(200)
265+
respBytes, _ := json.Marshal(RegisterResponse{})
266+
_, _ = w.Write(respBytes)
267+
}))
268+
269+
defer srv.Close()
270+
client := NewClient(srv.URL[7:], extensionName)
271+
272+
_, err := client.RegisterExtension(context.Background(), true)
273+
assertNoError(t, err, "Failed to register extension in ManagedInstance mode")
274+
275+
// Validate that INVOKE event is NOT present in managed instance mode
276+
for _, event := range receivedEvents {
277+
if event == "INVOKE" {
278+
t.Error("INVOKE event should not be registered in managed instance mode")
279+
}
280+
}
281+
282+
// Validate that SHUTDOWN event IS present
283+
foundShutdown := false
284+
for _, event := range receivedEvents {
285+
if event == "SHUTDOWN" {
286+
foundShutdown = true
287+
}
288+
}
289+
if !foundShutdown {
290+
t.Error("SHUTDOWN event should be registered in managed instance mode")
291+
}
292+
}

lambda-extensions/lambdaapi/telemetryapiclient.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,18 @@ const (
1414
//telemetry_receiverPort = 4243
1515
)
1616

17-
// SubscribeToLogsAPI is - Subscribe to Logs API to receive the Lambda Logs.
18-
func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string, telemetryTimeoutMs int, telemetryMaxBytes int64, telemetryMaxItems int) ([]byte, error) {
17+
// SubscribeToTelemetryAPI is - Subscribe to Telemetry API to receive the Lambda Telemetry.
18+
func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string, telemetryTimeoutMs int, telemetryMaxBytes int64, telemetryMaxItems int, isManagedInstance bool) ([]byte, error) {
1919
URL := client.baseURL + telemetryURL
20-
20+
schemaVersion := "2022-07-01"
21+
if isManagedInstance {
22+
schemaVersion = "2025-01-29"
23+
}
2124
reqBody, err := json.Marshal(map[string]interface{}{
2225
"destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)},
2326
"types": logEvents,
2427
"buffering": map[string]interface{}{"timeoutMs": telemetryTimeoutMs, "maxBytes": telemetryMaxBytes, "maxItems": telemetryMaxItems},
25-
"schemaVersion": "2022-07-01",
28+
"schemaVersion": schemaVersion,
2629
})
2730
if err != nil {
2831
return nil, err

lambda-extensions/lambdaapi/telemetryapiclient_test.go

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package lambdaapi
22

33
import (
44
"context"
5+
"encoding/json"
56
ioutil "io"
67
"log"
78
"net/http"
@@ -31,10 +32,69 @@ func TestSubscribeToTelemetryAPI(t *testing.T) {
3132
client := NewClient(srv.URL[7:], extensionName)
3233

3334
// Without Context
34-
response, err := client.SubscribeToTelemetryAPI(context.TODO(), []string{"platform", "function", "extension"}, 1000, 262144, 10000)
35+
response, err := client.SubscribeToTelemetryAPI(context.TODO(), []string{"platform", "function", "extension"}, 1000, 262144, 10000, false)
3536
commonAsserts(t, client, response, err)
3637

3738
// With Context
38-
response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"}, 1000, 262144, 10000)
39+
response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"}, 1000, 262144, 10000, false)
40+
commonAsserts(t, client, response, err)
41+
}
42+
43+
// TestSubscribeToTelemetryAPI_ManagedInstanceMode tests telemetry API subscription in managed instance mode
44+
// In managed instance mode, schema version should be "2025-01-29" instead of "2022-07-01"
45+
func TestSubscribeToTelemetryAPI_ManagedInstanceMode(t *testing.T) {
46+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
47+
assertEqual(t, r.Method, http.MethodPut, "Method is not PUT")
48+
assertNotEmpty(t, r.Header.Get(extensionNameHeader), "Extension Name Header not present")
49+
50+
reqBytes, err := ioutil.ReadAll(r.Body)
51+
assertNoError(t, err, "Received error")
52+
defer func() {
53+
if err := r.Body.Close(); err != nil {
54+
log.Printf("failed to close body: %v", err)
55+
}
56+
}()
57+
assertNotEmpty(t, reqBytes, "Received error in request")
58+
59+
// Verify the request body contains managed instance mode schema version
60+
var reqBody map[string]interface{}
61+
err = json.Unmarshal(reqBytes, &reqBody)
62+
assertNoError(t, err, "Failed to unmarshal request body")
63+
64+
schemaVersion, ok := reqBody["schemaVersion"].(string)
65+
if !ok {
66+
t.Error("schemaVersion field not found or not a string")
67+
}
68+
assertEqual(t, schemaVersion, "2025-01-29", "Expected managed instance mode schema version '2025-01-29'")
69+
70+
// Verify other required fields are present
71+
_, destinationExists := reqBody["destination"]
72+
if !destinationExists {
73+
t.Error("destination field not found")
74+
}
75+
76+
_, typesExists := reqBody["types"]
77+
if !typesExists {
78+
t.Error("types field not found")
79+
}
80+
81+
_, bufferingExists := reqBody["buffering"]
82+
if !bufferingExists {
83+
t.Error("buffering field not found")
84+
}
85+
86+
w.Header().Add(extensionIdentiferHeader, "test-sumo-id")
87+
w.WriteHeader(200)
88+
}))
89+
90+
defer srv.Close()
91+
client := NewClient(srv.URL[7:], extensionName)
92+
93+
// Test with isManagedInstance = true (context)
94+
response, err := client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"}, 1000, 262144, 10000, true)
95+
commonAsserts(t, client, response, err)
96+
97+
// Test with isManagedInstance = true (without context)
98+
response, err = client.SubscribeToTelemetryAPI(context.TODO(), []string{"platform", "function", "extension"}, 1000, 262144, 10000, true)
3999
commonAsserts(t, client, response, err)
40100
}

0 commit comments

Comments
 (0)