Skip to content

Commit ccd3230

Browse files
authored
Merge pull request #2585 from keboola/mvasko/PSGO-233/fix-orphaned-scheduler-manifest
fix(manifest): handle orphaned scheduler/orchestrator without panicking
2 parents 594a575 + 58d6425 commit ccd3230

96 files changed

Lines changed: 1006 additions & 21 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

internal/pkg/project/manifest/manifest.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,20 @@ const (
2424
BranchIDOverrideENV = "KBC_BRANCH_ID"
2525
)
2626

27+
// loadHintKey is a context key carrying an additional hint to log alongside the
28+
// "manifest loaded with warnings" warning when ignoreErrors=true. Set it with
29+
// WithLoadHint so that commands (push, diff) can provide context-appropriate
30+
// remediation advice without baking command-specific text into this package.
31+
type loadHintKey struct{}
32+
33+
// WithLoadHint returns a copy of ctx that carries an additional hint message.
34+
// The message is logged as a second Warn line when manifest.Load encounters
35+
// errors and ignoreErrors=true. Pull --force does NOT set a hint so the user
36+
// is not told to run pull --force while they are already doing so.
37+
func WithLoadHint(ctx context.Context, hint string) context.Context {
38+
return context.WithValue(ctx, loadHintKey{}, hint)
39+
}
40+
2741
type InvalidManifestError struct {
2842
error
2943
}
@@ -116,8 +130,27 @@ func Load(ctx context.Context, logger log.Logger, fs filesystem.Fs, envs env.Pro
116130
m.repositories = content.Templates.Repositories
117131

118132
// Set records
119-
if err := m.SetRecords(content.records()); err != nil && !ignoreErrors {
120-
return nil, InvalidManifestError{errors.PrefixError(err, "invalid manifest")}
133+
if err := m.SetRecords(content.records()); err != nil {
134+
if !ignoreErrors {
135+
return nil, InvalidManifestError{errors.PrefixError(err, "invalid manifest")}
136+
}
137+
// Log a warning so the user knows some records were skipped.
138+
// This happens when a config's parent config is missing from the manifest
139+
// (e.g. a scheduler config whose orchestrator was never pulled).
140+
// Orphaned records are deleted by SetRecords, so no record is left with
141+
// an unresolved parent path.
142+
logger.Warn(ctx, "Manifest loaded with warnings (some records were skipped):")
143+
var multi errors.MultiError
144+
if errors.As(err, &multi) {
145+
for _, e := range multi.WrappedErrors() {
146+
logger.Warnf(ctx, " - %s", e)
147+
}
148+
} else {
149+
logger.Warnf(ctx, " - %s", err)
150+
}
151+
if hint, _ := ctx.Value(loadHintKey{}).(string); hint != "" {
152+
logger.Warn(ctx, hint)
153+
}
121154
}
122155

123156
// Return

internal/pkg/project/manifest/manifest_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,12 @@ func TestManifestCyclicDependency(t *testing.T) {
181181
manifest, err := Load(ctx, log.NewNopLogger(), fs, env.Empty(), false)
182182
assert.Nil(t, manifest)
183183
require.Error(t, err)
184-
assert.Equal(t, "invalid manifest:\n- a cyclic relation was found when resolving path to config \"branch:123/component:keboola.variables/config:111\"", err.Error())
184+
// Config 111 is removed due to the cyclic dependency. Config 222 (whose parent
185+
// is config 111) is then also detected as having a missing parent and removed.
186+
assert.Equal(t, "invalid manifest:"+
187+
"\n- a cyclic relation was found when resolving path to config \"branch:123/component:keboola.variables/config:111\""+
188+
"\n- manifest record for config \"branch:123/component:keboola.variables/config:111\" not found, referenced from config \"branch:123/component:keboola.variables/config:222\"",
189+
err.Error())
185190
}
186191

187192
func TestManifest_AllowTargetENV(t *testing.T) {

internal/pkg/service/cli/cmd/sync/diff/cmd.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package diff
33
import (
44
"github.com/spf13/cobra"
55

6+
projectManifest "github.com/keboola/keboola-as-code/internal/pkg/project/manifest"
67
"github.com/keboola/keboola-as-code/internal/pkg/service/cli/dependencies"
78
"github.com/keboola/keboola-as-code/internal/pkg/service/cli/helpmsg"
89
"github.com/keboola/keboola-as-code/internal/pkg/service/common/configmap"
@@ -43,8 +44,14 @@ func Command(p dependencies.Provider) *cobra.Command {
4344
return err
4445
}
4546

46-
// Get local project
47-
prj, _, err := d.LocalProject(cmd.Context(), false)
47+
// Get local project.
48+
// Use ignoreErrors=true so that an inconsistent manifest (e.g. a scheduler
49+
// whose orchestrator parent was never pulled) does not block diff.
50+
// SetRecords() deletes any orphaned records, so no record is left with an
51+
// unresolved parent path. A warning is logged by manifest.Load() in this case.
52+
ctx := projectManifest.WithLoadHint(cmd.Context(),
53+
"Run `kbc push` to clean up the manifest, or `kbc pull --force` to reset local state.")
54+
prj, _, err := d.LocalProject(ctx, true)
4855
if err != nil {
4956
return err
5057
}

internal/pkg/service/cli/cmd/sync/push/cmd.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package push
33
import (
44
"github.com/spf13/cobra"
55

6+
projectManifest "github.com/keboola/keboola-as-code/internal/pkg/project/manifest"
67
"github.com/keboola/keboola-as-code/internal/pkg/service/cli/dependencies"
78
"github.com/keboola/keboola-as-code/internal/pkg/service/cli/helpmsg"
89
"github.com/keboola/keboola-as-code/internal/pkg/service/common/configmap"
10+
saveManifest "github.com/keboola/keboola-as-code/pkg/lib/operation/project/local/manifest/save"
911
"github.com/keboola/keboola-as-code/pkg/lib/operation/project/sync/push"
1012
loadState "github.com/keboola/keboola-as-code/pkg/lib/operation/state/load"
1113
)
@@ -45,18 +47,47 @@ func Command(p dependencies.Provider) *cobra.Command {
4547
return err
4648
}
4749

48-
// Get local project
49-
prj, _, err := d.LocalProject(cmd.Context(), false)
50+
// Get local project.
51+
// Use ignoreErrors=true so that an inconsistent manifest (e.g. a scheduler
52+
// whose orchestrator parent was never pulled) does not block push.
53+
// SetRecords() deletes any orphaned records, so no record is left with an
54+
// unresolved parent path. A warning is logged by manifest.Load() in this case.
55+
// NOTE: if the orphaned configs still exist in remote, running kbc push --force
56+
// will schedule them for remote deletion (they are absent from local state).
57+
hint := "Orphaned records are excluded from the push. Running `kbc push --force` may delete them from remote. " +
58+
"Run `kbc pull --force` to reset local state."
59+
if f.Force.Value {
60+
hint = "Orphaned records are excluded from the push. " +
61+
"With --force, these configs will be deleted from remote if they still exist there. " +
62+
"Run `kbc pull --force` to reset local state first."
63+
}
64+
ctx := projectManifest.WithLoadHint(cmd.Context(), hint)
65+
prj, _, err := d.LocalProject(ctx, true)
5066
if err != nil {
5167
return err
5268
}
5369

70+
// Snapshot IsChanged() before LoadState to capture only orphan-cleanup changes.
71+
// LoadState and push.Run do not modify the manifest (push is remote-only),
72+
// so any post-snapshot IsChanged() would only be from orphan cleanup.
73+
orphansCleaned := prj.ProjectManifest().IsChanged()
74+
5475
// Load project state
5576
projectState, err := prj.LoadState(cmd.Context(), loadState.PushOptions(), d)
5677
if err != nil {
5778
return err
5879
}
5980

81+
// If orphaned records were dropped during manifest load, the in-memory state
82+
// diverges from the manifest file on disk. Save the cleaned manifest now so
83+
// the warning does not recur on every subsequent push/diff invocation.
84+
// Skip when --dry-run: that flag promises no local side effects.
85+
if orphansCleaned && !f.DryRun.Value {
86+
if _, err = saveManifest.Run(cmd.Context(), projectState.ProjectManifest(), projectState.Fs(), d); err != nil {
87+
return err
88+
}
89+
}
90+
6091
// Change description - optional arg
6192
changeDescription := "Updated from #KeboolaCLI"
6293
if len(args) > 0 {

internal/pkg/state/manifest/records.go

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,35 +34,73 @@ func NewRecords(sortBy string) *Records {
3434

3535
func (r *Records) SetRecords(records []model.ObjectManifest) error {
3636
r.loaded = false
37+
orphaned := false
3738
defer func() {
38-
// Track if manifest was changed after load
39+
// Track if manifest was changed after load.
40+
// If records were dropped (orphaned), the in-memory state diverges from
41+
// the manifest file on disk — preserve changed=true so callers (e.g. push)
42+
// can detect this and save the cleaned manifest to silence future warnings.
3943
r.loaded = true
40-
r.changed = false
44+
if !orphaned {
45+
r.changed = false
46+
}
4147
}()
4248

4349
// Clear
4450
r.all = orderedmap.New()
4551

46-
// Track records
52+
// Track records (only adds to r.all; parent-path resolution happens below).
53+
// Do not return early on error: successfully tracked records must still have
54+
// their parent paths resolved, otherwise they are left with parentPathSet=false
55+
// which causes panics during subsequent state loading.
4756
errs := errors.NewMultiError()
4857
for _, record := range records {
4958
if err := r.trackRecord(record); err != nil {
5059
errs.Append(err)
60+
orphaned = true
5161
}
5262
}
53-
if errs.Len() > 0 {
54-
return errs
63+
64+
// Resolve parent paths — snapshot keys first: orderedmap.Keys() returns the live slice, mutated by DeleteByKey.
65+
// Child-before-parent ordering is safe: if a parent's PersistRecord fails, the cascade loop removes its children.
66+
for _, key := range append([]string{}, r.all.Keys()...) {
67+
v, found := r.all.Get(key)
68+
if !found {
69+
continue
70+
}
71+
record := v.(model.ObjectManifest)
72+
if err := r.PersistRecord(record); err != nil {
73+
errs.Append(err)
74+
r.DeleteByKey(record.Key())
75+
orphaned = true
76+
r.changed = true // explicit: DeleteByKey only propagates changed for persisted records
77+
}
5578
}
5679

57-
// Resolve parent paths, we can do it when all the records are loaded
58-
for _, key := range r.all.Keys() {
59-
record, _ := r.all.Get(key)
60-
if err := r.PersistRecord(record.(model.ObjectManifest)); err != nil {
61-
return err
80+
// Cascade: re-run until stable, detaching orphaned children whose parent was just removed.
81+
for {
82+
removed := false
83+
for _, key := range append([]string{}, r.all.Keys()...) {
84+
v, found := r.all.Get(key)
85+
if !found {
86+
continue
87+
}
88+
record := v.(model.ObjectManifest)
89+
if _, err := r.GetParent(record); err != nil {
90+
// Intentionally not appended to errs: this is a cascade effect of a
91+
// parent already removed above. The root cause is already in errs.
92+
r.naming.Detach(record.Key()) // must detach: PersistRecord already attached it
93+
r.DeleteByKey(record.Key()) // r.changed implicit: records are persisted, unlike the PersistRecord failure path above
94+
removed = true
95+
orphaned = true
96+
}
97+
}
98+
if !removed {
99+
break
62100
}
63101
}
64102

65-
return nil
103+
return errs.ErrorOrNil()
66104
}
67105

68106
func (r *Records) SortBy() string {

internal/pkg/state/manifest/records_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,81 @@ func TestManifestRecordGetParentNil(t *testing.T) {
4545
assert.Nil(t, parent)
4646
require.NoError(t, err)
4747
}
48+
49+
// TestSetRecords_OrphanedScheduler_SiblingGetsPathResolved is a regression test
50+
// for the panic in AddRelatedPath that occurred when a manifest contained both an
51+
// orphaned scheduler and sibling configs that sorted after it.
52+
//
53+
// The pre-fix code did an early return from SetRecords on the first PersistRecord
54+
// failure. Any record that had not yet been processed was left with
55+
// parentPathSet=false. When a subsequent pull tried to write those records' files,
56+
// Path() returned only the bare relative path (without the branch prefix), so the
57+
// filesystem.IsFrom check in AddRelatedPath panicked.
58+
//
59+
// The fix continues processing all records and deletes only the failing ones, so
60+
// every surviving record has its parent path resolved before SetRecords returns.
61+
func TestSetRecords_OrphanedScheduler_SiblingGetsPathResolved(t *testing.T) {
62+
t.Parallel()
63+
r := NewRecords(model.SortByID)
64+
65+
branchManifest := &model.BranchManifest{
66+
BranchKey: model.BranchKey{ID: 1},
67+
// Simulate JSON-deserialized state: only RelativePath is set,
68+
// parentPath and parentPathSet are zero values.
69+
Paths: model.Paths{AbsPath: model.AbsPath{RelativePath: "main"}},
70+
}
71+
// Scheduler whose target orchestrator is absent from the manifest.
72+
// Placed before the extractor so that the pre-fix early-return would leave
73+
// the extractor unprocessed (parentPathSet=false).
74+
schedulerManifest := &model.ConfigManifest{
75+
ConfigKey: model.ConfigKey{
76+
BranchID: 1,
77+
ComponentID: "keboola.scheduler",
78+
ID: "456",
79+
},
80+
Paths: model.Paths{AbsPath: model.AbsPath{RelativePath: "schedules/scheduler"}},
81+
Relations: model.Relations{
82+
&model.SchedulerForRelation{
83+
ComponentID: "keboola.orchestrator",
84+
ConfigID: "999", // deliberately absent from the manifest
85+
},
86+
},
87+
}
88+
// Sibling extractor with no special relations — its parent is the branch.
89+
// After SetRecords it must have IsParentPathSet()==true and the full path
90+
// "main/extractor/ex-generic-v2/empty" (branch prefix included).
91+
// With the pre-fix code this record would have parentPathSet=false, causing
92+
// a panic in AddRelatedPath when the pull wrote meta.json under the config.
93+
extractorManifest := &model.ConfigManifest{
94+
ConfigKey: model.ConfigKey{
95+
BranchID: 1,
96+
ComponentID: "ex-generic-v2",
97+
ID: "789",
98+
},
99+
Paths: model.Paths{AbsPath: model.AbsPath{RelativePath: "extractor/ex-generic-v2/empty"}},
100+
}
101+
102+
err := r.SetRecords([]model.ObjectManifest{branchManifest, schedulerManifest, extractorManifest})
103+
104+
// The orphaned scheduler is reported but does not hard-fail the whole load.
105+
require.Error(t, err)
106+
assert.Contains(t, err.Error(), `manifest record for config "branch:1/component:keboola.orchestrator/config:999" not found`)
107+
108+
// Orphaned scheduler must be deleted.
109+
_, found := r.GetRecord(schedulerManifest.Key())
110+
assert.False(t, found, "orphaned scheduler must be removed from records")
111+
112+
// Branch must survive and be resolved.
113+
branchRecord, found := r.GetRecord(branchManifest.Key())
114+
require.True(t, found, "branch record must remain")
115+
assert.True(t, branchRecord.IsParentPathSet())
116+
117+
// Sibling extractor must survive with its parent path correctly resolved so
118+
// that Path() returns "main/extractor/ex-generic-v2/empty" and not the bare
119+
// relative path "extractor/ex-generic-v2/empty" (which would panic later).
120+
extRecord, found := r.GetRecord(extractorManifest.Key())
121+
require.True(t, found, "sibling extractor must not be removed")
122+
assert.True(t, extRecord.IsParentPathSet(),
123+
"sibling extractor must have parent path resolved even after the orphaned scheduler fails")
124+
assert.Equal(t, "main/extractor/ex-generic-v2/empty", extRecord.Path())
125+
}
Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,20 @@
11
{
22
"parameters": {
3-
"action": ""
3+
"db": {
4+
"#privateKey": "",
5+
"account": "",
6+
"database": "",
7+
"driver": "",
8+
"host": "",
9+
"loginType": "",
10+
"port": 443,
11+
"region": "",
12+
"role": "",
13+
"schema": "",
14+
"ssoLoginAvailable": false,
15+
"user": "",
16+
"warehouse": "",
17+
"workspaceId": 0
18+
}
419
}
520
}

test/cli/create/config/out/main/extractor/keboola.ex-db-mysql/new-config/config.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
"networkCompression": false,
1111
"port": 0,
1212
"ssh": {
13-
"#privateKey": "",
1413
"enabled": false,
1514
"keys": {
1615
"#private": "",
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
diff --storage-api-token %%TEST_KBC_STORAGE_API_TOKEN%%
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
0

0 commit comments

Comments
 (0)