Skip to content

Commit b07e9d8

Browse files
authored
Merge pull request #2467 from keboola/fix-report-logs-when-user-error-occurs-on-source
fix: Report user error when wrong data were pushed on HTTP source
2 parents 52158b0 + 1896e34 commit b07e9d8

2 files changed

Lines changed: 162 additions & 16 deletions

File tree

internal/pkg/service/stream/sink/router/router.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package router
55
import (
66
"context"
77
"net/http"
8+
"strings"
89
"sync"
910
"time"
1011

@@ -15,6 +16,7 @@ import (
1516
"golang.org/x/exp/maps"
1617

1718
"github.com/keboola/keboola-as-code/internal/pkg/log"
19+
svcerrors "github.com/keboola/keboola-as-code/internal/pkg/service/common/errors"
1820
"github.com/keboola/keboola-as-code/internal/pkg/service/common/etcdop"
1921
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
2022
"github.com/keboola/keboola-as-code/internal/pkg/service/stream/definition"
@@ -263,6 +265,47 @@ func (r *Router) DispatchToSource(sourceKey key.SourceKey, c recordctx.Context)
263265
r.metrics.sourceDuration.Record(finalizationCtx, durationMs, metric.WithAttributes(attrs...))
264266
r.metrics.sourceBytes.Add(finalizationCtx, int64(c.BodyLength()), metric.WithAttributes(attrs...))
265267

268+
// Log errors when has_error is true.
269+
// This ensures that when the metric has_error:true is reported, there is a corresponding log entry in Datadog.
270+
// This is important because some errors (e.g., 400 Bad Request) are not logged at the sink level,
271+
// but they still set has_error:true in the metric.
272+
if result.FailedSinks > 0 {
273+
// Collect failed sink information for logging
274+
var failedSinkDetails []string
275+
for _, sinkResult := range result.Sinks {
276+
if sinkResult.error != nil {
277+
// Format error message for the failed sink
278+
var errorMsg string
279+
var withMsg svcerrors.WithUserMessage
280+
if errors.As(sinkResult.error, &withMsg) {
281+
errorMsg = withMsg.ErrorUserMessage()
282+
} else {
283+
errorMsg = errors.Format(sinkResult.error, errors.FormatAsSentences())
284+
}
285+
failedSinkDetails = append(
286+
failedSinkDetails,
287+
errors.Errorf("sink %s: %s", sinkResult.SinkID, errorMsg).Error(),
288+
)
289+
}
290+
}
291+
292+
// Create log message with failed sink details
293+
logMsg := errors.Errorf(
294+
"source record processing failed: %d/%d sinks failed. Failed sinks: %s",
295+
result.FailedSinks,
296+
result.AllSinks,
297+
strings.Join(failedSinkDetails, "; "),
298+
).Error()
299+
300+
// Log with appropriate level based on status code
301+
// Use Error level for server errors (5xx), Warn level for client errors (4xx)
302+
if result.StatusCode >= http.StatusInternalServerError {
303+
r.logger.Errorf(finalizationCtx, logMsg)
304+
} else {
305+
r.logger.Warnf(finalizationCtx, logMsg)
306+
}
307+
}
308+
266309
return result
267310
}
268311

internal/pkg/service/stream/source/type/httpsource/httpsource_test.go

Lines changed: 119 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77
"net/http"
8+
"reflect"
89
"strings"
910
"testing"
1011
"time"
@@ -15,6 +16,7 @@ import (
1516
"github.com/stretchr/testify/assert"
1617
"github.com/stretchr/testify/require"
1718

19+
"github.com/keboola/keboola-as-code/internal/pkg/encoding/json"
1820
"github.com/keboola/keboola-as-code/internal/pkg/log"
1921
commonDeps "github.com/keboola/keboola-as-code/internal/pkg/service/common/dependencies"
2022
"github.com/keboola/keboola-as-code/internal/pkg/service/stream"
@@ -43,7 +45,7 @@ type TestCase struct {
4345
ExpectedStatusCode int
4446
ExpectedHeaders map[string]string
4547
ExpectedBody string
46-
ExpectedLogs string
48+
ExpectedLogs []string
4749
}
4850

4951
type testState struct {
@@ -176,7 +178,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
176178
Path: "/foo",
177179
ExpectedStatusCode: http.StatusNotFound,
178180
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
179-
ExpectedLogs: `{"level":"info","message":"not found, please send data using POST /stream/<projectID>/<sourceID>/<secret>"}`,
181+
ExpectedLogs: []string{`{"level":"info","message":"not found, please send data using POST /stream/<projectID>/<sourceID>/<secret>"}`},
180182
ExpectedBody: `
181183
{
182184
"statusCode": 404,
@@ -226,7 +228,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
226228
"error": "stream.in.noSourceFound",
227229
"message": "The specified combination of projectID, sourceID and secret was not found."
228230
}`,
229-
ExpectedLogs: `{"level":"warn","message":"dispatch failed","nodeId":"test-node","project.id":"1111","source.id":"my-source","component":"http-source"}`,
231+
ExpectedLogs: []string{`{"level":"warn","message":"dispatch failed","nodeId":"test-node","project.id":"1111","source.id":"my-source","component":"http-source"}`},
230232
},
231233
{
232234
Name: "stream input - POST - not found - invalid secret",
@@ -241,7 +243,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
241243
"error": "stream.in.noSourceFound",
242244
"message": "The specified combination of projectID, sourceID and secret was not found."
243245
}`,
244-
ExpectedLogs: `{"level":"warn","message":"dispatch failed","nodeId":"test-node","project.id":"123","source.id":"my-source-1","component":"http-source"}`,
246+
ExpectedLogs: []string{`{"level":"warn","message":"dispatch failed","nodeId":"test-node","project.id":"123","source.id":"my-source-1","component":"http-source"}`},
245247
},
246248
{
247249
Name: "stream input - POST - not found - disabled source",
@@ -256,7 +258,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
256258
"error": "stream.in.disabledSource",
257259
"message": "The specified source is disabled in all branches."
258260
}`,
259-
ExpectedLogs: `{"level":"warn","message":"dispatch failed","nodeId":"test-node","project.id":"123","source.id":"my-source-2","component":"http-source"}`,
261+
ExpectedLogs: []string{`{"level":"warn","message":"dispatch failed","nodeId":"test-node","project.id":"123","source.id":"my-source-2","component":"http-source"}`},
260262
},
261263
{
262264
Name: "stream input - POST - open pipeline error",
@@ -274,10 +276,10 @@ func testCases(t *testing.T, ts *testState) []TestCase {
274276
"Content-Type": "application/json",
275277
"Server": httpsource.ServerHeader,
276278
},
277-
ExpectedLogs: `
278-
{"level":"error","message":"write record error: cannot open sink pipeline: some open error, next attempt after %s","component":"sink.router"}
279-
{"level":"error","message":"write record error: cannot open sink pipeline: some open error, next attempt after %s","component":"sink.router"}
280-
`,
279+
ExpectedLogs: []string{
280+
`{"level":"error","message":"write record error: cannot open sink pipeline: some open error, next attempt after %s","component":"sink.router"}`,
281+
`{"level":"error","message":"source record processing failed: 1/1 sinks failed. Failed sinks: sink my-sink-1: Cannot open sink pipeline: some open error, next attempt after %s.","component":"sink.router"}`,
282+
},
281283
ExpectedBody: `
282284
{
283285
"statusCode": 500,
@@ -336,10 +338,10 @@ func testCases(t *testing.T, ts *testState) []TestCase {
336338
"Content-Type": "application/json",
337339
"Server": httpsource.ServerHeader,
338340
},
339-
ExpectedLogs: `
340-
{"level":"error","message":"write record error: some write error","component":"sink.router"}
341-
{"level":"error","message":"write record error: some write error","component":"sink.router"}
342-
`,
341+
ExpectedLogs: []string{
342+
`{"level":"error","message":"write record error: some write error","component":"sink.router"}`,
343+
`{"level":"error","message":"source record processing failed: 1/1 sinks failed. Failed sinks: sink my-sink-1: Some write error.","component":"sink.router"}`,
344+
},
343345
ExpectedBody: `
344346
{
345347
"statusCode": 500,
@@ -524,7 +526,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
524526
Headers: map[string]string{"foo": strings.Repeat(".", ts.maxHeaderSize+1)},
525527
ExpectedStatusCode: http.StatusRequestEntityTooLarge,
526528
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
527-
ExpectedLogs: `{"level":"info","message":"request header size is over the maximum \"2000B\"","error.type":"%s/errors.HeaderTooLargeError"}`,
529+
ExpectedLogs: []string{`{"level":"info","message":"request header size is over the maximum \"2000B\"","error.type":"%s/errors.HeaderTooLargeError"}`},
528530
ExpectedBody: `
529531
{
530532
"statusCode": 413,
@@ -539,7 +541,7 @@ func testCases(t *testing.T, ts *testState) []TestCase {
539541
Body: strings.NewReader(strings.Repeat(".", ts.maxBodySize+1)),
540542
ExpectedStatusCode: http.StatusRequestEntityTooLarge,
541543
ExpectedHeaders: map[string]string{"Server": httpsource.ServerHeader},
542-
ExpectedLogs: `{"level":"info","message":"request body size is over the maximum \"8000B\"","error.type":"%s/errors.BodyTooLargeError"}`,
544+
ExpectedLogs: []string{`{"level":"info","message":"request body size is over the maximum \"8000B\"","error.type":"%s/errors.BodyTooLargeError"}`},
543545
ExpectedBody: `
544546
{
545547
"statusCode": 413,
@@ -600,6 +602,99 @@ func testCases(t *testing.T, ts *testState) []TestCase {
600602
}
601603
}
602604

605+
// assertLogsUnordered checks that each expected log message appears at least the required number of times
606+
// in the actual logs, regardless of order. This allows for non-deterministic log ordering in tests.
607+
func assertLogsUnordered(t assert.TestingT, logger log.DebugLogger, expectedLogs []string, minOccurrences int) {
608+
if len(expectedLogs) == 0 {
609+
return
610+
}
611+
612+
actualLogs := logger.AllMessages()
613+
actualLines := strings.Split(strings.Trim(actualLogs, "\n"), "\n")
614+
615+
// Count occurrences of each expected log in actual logs
616+
for _, expectedLog := range expectedLogs {
617+
expectedLog = strings.TrimSpace(expectedLog)
618+
if expectedLog == "" {
619+
continue
620+
}
621+
622+
occurrences := 0
623+
expectedData, err := decodeExpectedLog(expectedLog)
624+
if err != nil {
625+
assert.Fail(t, fmt.Sprintf("failed to decode expected log: %s: %v", expectedLog, err))
626+
continue
627+
}
628+
629+
for _, actualLine := range actualLines {
630+
actualLine = strings.TrimSpace(actualLine)
631+
if actualLine == "" {
632+
continue
633+
}
634+
635+
actualData, err := decodeActualLog(actualLine)
636+
if err != nil {
637+
continue // Skip invalid JSON lines
638+
}
639+
640+
// Check if this actual log matches the expected log
641+
if logMatches(expectedData, actualData) {
642+
occurrences++
643+
}
644+
}
645+
646+
assert.GreaterOrEqual(t, occurrences, minOccurrences,
647+
"expected log message should appear at least %d times, found %d times:\n%s",
648+
minOccurrences, occurrences, expectedLog)
649+
}
650+
}
651+
652+
// decodeExpectedLog decodes an expected log message JSON string into a map for comparison.
653+
func decodeExpectedLog(logStr string) (map[string]any, error) {
654+
var result map[string]any
655+
err := json.DecodeString(logStr, &result)
656+
if err != nil {
657+
return nil, errors.Wrapf(err, "expected log contains invalid json: %s", logStr)
658+
}
659+
return result, nil
660+
}
661+
662+
// decodeActualLog decodes an actual log message JSON string into a map for comparison.
663+
func decodeActualLog(logStr string) (map[string]any, error) {
664+
var result map[string]any
665+
err := json.DecodeString(logStr, &result)
666+
if err != nil {
667+
return nil, errors.Wrapf(err, "actual log contains invalid json: %s", logStr)
668+
}
669+
return result, nil
670+
}
671+
672+
// logMatches checks if an actual log message matches an expected log message.
673+
// It compares all fields from the expected log against the actual log using wildcard matching for strings.
674+
func logMatches(expected, actual map[string]any) bool {
675+
for key, expectedValue := range expected {
676+
actualValue, ok := actual[key]
677+
if !ok {
678+
return false
679+
}
680+
681+
// Use wildcard matching for string values
682+
if expectedStr, ok := expectedValue.(string); ok {
683+
if actualStr, ok := actualValue.(string); ok {
684+
err := wildcards.Compare(expectedStr, actualStr)
685+
if err != nil {
686+
return false
687+
}
688+
} else {
689+
return false
690+
}
691+
} else if !reflect.DeepEqual(expectedValue, actualValue) {
692+
return false
693+
}
694+
}
695+
return true
696+
}
697+
603698
func sendTestRequests(t *testing.T, f *testState) {
604699
t.Helper()
605700

@@ -632,7 +727,15 @@ func sendTestRequests(t *testing.T, f *testState) {
632727
// Error + logs
633728
resp, err := http.DefaultClient.Do(req)
634729
assert.EventuallyWithT(t, func(c *assert.CollectT) {
635-
logger.AssertJSONMessages(c, tc.ExpectedLogs)
730+
if len(tc.ExpectedLogs) > 0 {
731+
// Use unordered assertion: 1 occurrence for single log, 2 occurrences for multiple logs
732+
// Multiple expected logs typically indicate we expect duplicates (e.g., per branch)
733+
minOccurrences := 1
734+
if len(tc.ExpectedLogs) > 1 {
735+
minOccurrences = 2
736+
}
737+
assertLogsUnordered(c, logger, tc.ExpectedLogs, minOccurrences)
738+
}
636739
}, 5*time.Second, 10*time.Millisecond)
637740
if tc.ExpectedErr != "" {
638741
if assert.Error(t, err) {

0 commit comments

Comments
 (0)