Skip to content

Commit 5672c5f

Browse files
authored
Merge pull request #2565 from keboola/hosan/PAT-1630
Refactor AppInfo for context support and data retrieval simplification
2 parents 7c295f4 + ad5dfa5 commit 5672c5f

6 files changed

Lines changed: 60 additions & 57 deletions

File tree

internal/pkg/service/appsproxy/dataapps/k8sapp/watcher.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func NewStateWatcher(d dependencies, client dynamic.Interface, namespace string)
120120

121121
// GetState returns the cached AppInfo for the app. Returns (AppInfo{}, false) if not yet cached.
122122
// If the E2B access token is missing but a secret name is known, it attempts to load the token lazily.
123-
func (w *StateWatcher) GetState(appID api.AppID) (AppInfo, bool) {
123+
func (w *StateWatcher) GetState(ctx context.Context, appID api.AppID) (AppInfo, bool) {
124124
v, ok := w.apps.Load(appID)
125125
if !ok {
126126
return AppInfo{}, false
@@ -130,15 +130,16 @@ func (w *StateWatcher) GetState(appID api.AppID) (AppInfo, bool) {
130130
// Lazy-load E2B token: the Secret may not have existed when the App CRD event was processed.
131131
// a singleflight coalesces concurrent requests for the same secret into a single K8s API call.
132132
if e.e2bAccessToken == "" && e.e2bSecretName != "" {
133+
fetchCtx := context.WithoutCancel(ctx)
133134
token, err, _ := w.tokenLoadGroup.Do(e.e2bSecretName, func() (any, error) {
134-
return w.loadSecretToken(context.Background(), e.e2bSecretName)
135+
return w.loadSecretToken(fetchCtx, e.e2bSecretName)
135136
})
136137
if err != nil {
137-
w.logger.Warnf(context.Background(), "App %s: failed to lazy-load E2B access token from secret %q: %s", appID, e.e2bSecretName, err)
138+
w.logger.Warnf(ctx, "App %s: failed to lazy-load E2B access token from secret %q: %s", appID, e.e2bSecretName, err)
138139
} else if t, ok := token.(string); t != "" && ok {
139140
e.e2bAccessToken = t
140141
w.apps.Store(appID, e)
141-
w.logger.Infof(context.Background(), "App %s: lazy-loaded E2B access token from secret %q", appID, e.e2bSecretName)
142+
w.logger.Infof(ctx, "App %s: lazy-loaded E2B access token from secret %q", appID, e.e2bSecretName)
142143
}
143144
}
144145

internal/pkg/service/appsproxy/dataapps/k8sapp/watcher_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestStateWatcher_GetState_UnknownWhenEmpty(t *testing.T) {
8686
fakeClient := newFakeClient()
8787
watcher := k8sapp.NewStateWatcher(newTestDeps(t), fakeClient, testNamespace)
8888

89-
info, ok := watcher.GetState(api.AppID("app-123"))
89+
info, ok := watcher.GetState(t.Context(), api.AppID("app-123"))
9090
assert.False(t, ok)
9191
assert.Empty(t, info.ActualState)
9292
}
@@ -107,7 +107,7 @@ func TestStateWatcher_GetState_AfterCacheSync(t *testing.T) {
107107
watcher := k8sapp.NewStateWatcher(d, fakeClient, testNamespace)
108108

109109
assert.Eventually(t, func() bool {
110-
info, ok := watcher.GetState(api.AppID("app-123"))
110+
info, ok := watcher.GetState(t.Context(), api.AppID("app-123"))
111111
return ok && info.ActualState == k8sapp.AppActualStateStopped
112112
}, 5*time.Second, 50*time.Millisecond)
113113
}
@@ -128,7 +128,7 @@ func TestStateWatcher_WakeupApp(t *testing.T) {
128128

129129
// Wait for the informer to cache the object.
130130
require.Eventually(t, func() bool {
131-
_, ok := watcher.GetState(api.AppID("app-123"))
131+
_, ok := watcher.GetState(t.Context(), api.AppID("app-123"))
132132
return ok
133133
}, 5*time.Second, 50*time.Millisecond)
134134

@@ -180,7 +180,7 @@ func TestStateWatcher_GetState_UpstreamTarget(t *testing.T) {
180180
var info k8sapp.AppInfo
181181
assert.Eventually(t, func() bool {
182182
var ok bool
183-
info, ok = watcher.GetState(api.AppID("app-123"))
183+
info, ok = watcher.GetState(t.Context(), api.AppID("app-123"))
184184
return ok && info.UpstreamTarget != nil
185185
}, 5*time.Second, 50*time.Millisecond)
186186

@@ -205,11 +205,11 @@ func TestStateWatcher_GetState_UpstreamTarget_AbsentWhenMissing(t *testing.T) {
205205
watcher := k8sapp.NewStateWatcher(d, fakeClient, testNamespace)
206206

207207
assert.Eventually(t, func() bool {
208-
_, ok := watcher.GetState(api.AppID("app-123"))
208+
_, ok := watcher.GetState(t.Context(), api.AppID("app-123"))
209209
return ok
210210
}, 5*time.Second, 50*time.Millisecond)
211211

212-
info, ok := watcher.GetState(api.AppID("app-123"))
212+
info, ok := watcher.GetState(t.Context(), api.AppID("app-123"))
213213
require.True(t, ok)
214214
assert.Nil(t, info.UpstreamTarget)
215215
}
@@ -271,7 +271,7 @@ func TestStateWatcher_GetState_E2BAccessToken(t *testing.T) {
271271
var info k8sapp.AppInfo
272272
assert.Eventually(t, func() bool {
273273
var ok bool
274-
info, ok = watcher.GetState(api.AppID("app-e2b"))
274+
info, ok = watcher.GetState(t.Context(), api.AppID("app-e2b"))
275275
return ok && info.E2BAccessToken != ""
276276
}, 5*time.Second, 50*time.Millisecond)
277277

@@ -294,11 +294,11 @@ func TestStateWatcher_GetState_E2BAccessToken_MissingSecret(t *testing.T) {
294294
watcher := k8sapp.NewStateWatcher(d, fakeClient, testNamespace)
295295

296296
assert.Eventually(t, func() bool {
297-
_, ok := watcher.GetState(api.AppID("app-e2b"))
297+
_, ok := watcher.GetState(t.Context(), api.AppID("app-e2b"))
298298
return ok
299299
}, 5*time.Second, 50*time.Millisecond)
300300

301-
info, ok := watcher.GetState(api.AppID("app-e2b"))
301+
info, ok := watcher.GetState(t.Context(), api.AppID("app-e2b"))
302302
require.True(t, ok)
303303
assert.Empty(t, info.E2BAccessToken)
304304
}
@@ -319,11 +319,11 @@ func TestStateWatcher_GetState_NonE2BApp_NoToken(t *testing.T) {
319319
watcher := k8sapp.NewStateWatcher(d, fakeClient, testNamespace)
320320

321321
assert.Eventually(t, func() bool {
322-
_, ok := watcher.GetState(api.AppID("app-regular"))
322+
_, ok := watcher.GetState(t.Context(), api.AppID("app-regular"))
323323
return ok
324324
}, 5*time.Second, 50*time.Millisecond)
325325

326-
info, ok := watcher.GetState(api.AppID("app-regular"))
326+
info, ok := watcher.GetState(t.Context(), api.AppID("app-regular"))
327327
require.True(t, ok)
328328
assert.Empty(t, info.E2BAccessToken)
329329
}

internal/pkg/service/appsproxy/dataapps/wakeup/wakeup_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func TestManager_Wakeup(t *testing.T) {
7474

7575
// Wait for watcher cache to sync.
7676
require.Eventually(t, func() bool {
77-
_, ok := watcher.GetState(appID)
77+
_, ok := watcher.GetState(ctx, appID)
7878
return ok
7979
}, 5*time.Second, 50*time.Millisecond)
8080

@@ -122,7 +122,7 @@ func TestManager_Wakeup_Race(t *testing.T) {
122122

123123
// Wait for watcher cache to sync.
124124
require.Eventually(t, func() bool {
125-
_, ok := watcher.GetState(appID)
125+
_, ok := watcher.GetState(ctx, appID)
126126
return ok
127127
}, 5*time.Second, 50*time.Millisecond)
128128

internal/pkg/service/appsproxy/proxy/apphandler/manager.go

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ package apphandler
22

33
import (
44
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
57
"net/http"
68
"sync"
79

810
"github.com/keboola/keboola-as-code/internal/pkg/service/appsproxy/config"
911
"github.com/keboola/keboola-as-code/internal/pkg/service/appsproxy/dataapps/api"
1012
"github.com/keboola/keboola-as-code/internal/pkg/service/appsproxy/dataapps/appconfig"
13+
"github.com/keboola/keboola-as-code/internal/pkg/service/appsproxy/dataapps/k8sapp"
1114
"github.com/keboola/keboola-as-code/internal/pkg/service/appsproxy/proxy/apphandler/authproxy"
1215
"github.com/keboola/keboola-as-code/internal/pkg/service/appsproxy/proxy/apphandler/upstream"
1316
"github.com/keboola/keboola-as-code/internal/pkg/service/appsproxy/proxy/pagewriter"
@@ -29,11 +32,10 @@ type Manager struct {
2932
}
3033

3134
type appHandlerWrapper struct {
32-
lock *sync.Mutex
33-
handler http.Handler
34-
cancel context.CancelCauseFunc
35-
upstreamURL string // appsProxy.upstreamUrl in effect when this handler was created
36-
e2bAccessToken string // E2B access token in effect when this handler was created
35+
lock *sync.Mutex
36+
handler http.Handler
37+
cancel context.CancelCauseFunc
38+
handlerHash string // hash of UpstreamTarget + E2BAccessToken; handler is recreated when it changes
3739
}
3840

3941
type dependencies interface {
@@ -72,16 +74,15 @@ func (m *Manager) HandlerFor(ctx context.Context, result appconfig.AppConfigResu
7274
return m.newErrorHandler(ctx, api.AppConfig{ID: result.AppID}, result.Err)
7375
}
7476

75-
// Create a new handler, if needed (config changed, upstreamUrl changed, or E2B token changed)
76-
currentURL := m.upstreamManager.UpstreamURL(result.AppID)
77-
currentToken := m.upstreamManager.E2BAccessToken(result.AppID)
78-
if wrapper.handler == nil || result.Modified || wrapper.upstreamURL != currentURL || wrapper.e2bAccessToken != currentToken {
77+
// Create a new handler when config changed, upstream URL changed, or E2B token changed.
78+
// Only a hash is stored so raw secrets don't linger in the wrapper.
79+
currentHash := handlerHash(m.upstreamManager.AppInfo(ctx, result.AppID))
80+
if wrapper.handler == nil || result.Modified || wrapper.handlerHash != currentHash {
7981
if wrapper.cancel != nil {
8082
wrapper.cancel(errors.New("configuration changed"))
8183
}
8284
wrapper.handler, wrapper.cancel = m.newHandler(ctx, result.AppConfig)
83-
wrapper.upstreamURL = currentURL
84-
wrapper.e2bAccessToken = currentToken
85+
wrapper.handlerHash = currentHash
8586
}
8687

8788
return wrapper.handler
@@ -110,6 +111,21 @@ func (m *Manager) newHandler(ctx context.Context, app api.AppConfig) (http.Handl
110111
return handler, appUpstream.Cancel
111112
}
112113

114+
// handlerHash computes a hash from the fields that require handler recreation when they change.
115+
// Only a hash is stored so raw secrets don't linger in the wrapper.
116+
func handlerHash(info k8sapp.AppInfo, ok bool) string {
117+
if !ok {
118+
return ""
119+
}
120+
h := sha256.New()
121+
if info.UpstreamTarget != nil {
122+
h.Write([]byte(info.UpstreamTarget.String()))
123+
}
124+
h.Write([]byte{0})
125+
h.Write([]byte(info.E2BAccessToken))
126+
return hex.EncodeToString(h.Sum(nil))
127+
}
128+
113129
func (m *Manager) newErrorHandler(ctx context.Context, app api.AppConfig, err error) http.Handler {
114130
err = svcErrors.WrapWithExceptionID(middleware.RequestIDFromContext(ctx), err)
115131
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {

internal/pkg/service/appsproxy/proxy/apphandler/upstream/upstream.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -98,24 +98,10 @@ func (m *Manager) Shutdown(ctx context.Context) {
9898
m.wg.Wait()
9999
}
100100

101-
// UpstreamURL returns the appsProxy.upstreamUrl string for appID from the K8s cache.
102-
// Returns "" when the app is not cached or the field is absent/invalid.
103-
func (m *Manager) UpstreamURL(appID api.AppID) string {
104-
info, ok := m.stateWatcher.GetState(appID)
105-
if !ok || info.UpstreamTarget == nil {
106-
return ""
107-
}
108-
return info.UpstreamTarget.String()
109-
}
110-
111-
// E2BAccessToken returns the E2B access token for appID from the K8s cache.
112-
// Returns "" when the app is not cached or is not an E2B sandbox.
113-
func (m *Manager) E2BAccessToken(appID api.AppID) string {
114-
info, ok := m.stateWatcher.GetState(appID)
115-
if !ok {
116-
return ""
117-
}
118-
return info.E2BAccessToken
101+
// AppInfo returns the cached AppInfo for appID from the K8s cache in a single call.
102+
// Returns (AppInfo{}, false) when the app is not cached.
103+
func (m *Manager) AppInfo(ctx context.Context, appID api.AppID) (k8sapp.AppInfo, bool) {
104+
return m.stateWatcher.GetState(ctx, appID)
119105
}
120106

121107
func (m *Manager) NewUpstream(ctx context.Context, app api.AppConfig) (upstream *AppUpstream, err error) {
@@ -124,7 +110,7 @@ func (m *Manager) NewUpstream(ctx context.Context, app api.AppConfig) (upstream
124110

125111
// Resolve target URL at creation time; immutable after this point.
126112
var target *url.URL
127-
if info, ok := m.stateWatcher.GetState(app.ID); ok {
113+
if info, ok := m.stateWatcher.GetState(ctx, app.ID); ok {
128114
target = info.UpstreamTarget // pre-parsed by watcher on CRD event; may be nil
129115
}
130116

@@ -160,7 +146,7 @@ func (u *AppUpstream) ServeHTTPOrError(rw http.ResponseWriter, req *http.Request
160146

161147
// K8s state pre-check: if we know the app is not running, handle it synchronously
162148
// without attempting DNS/upstream. Falls through if state is unknown or Running.
163-
if appInfo, ok := u.manager.stateWatcher.GetState(u.app.ID); ok && appInfo.ActualState != k8sapp.AppActualStateRunning {
149+
if appInfo, ok := u.manager.stateWatcher.GetState(ctx, u.app.ID); ok && appInfo.ActualState != k8sapp.AppActualStateRunning {
164150
switch {
165151
case appInfo.ActualState == k8sapp.AppActualStateStarting:
166152
u.manager.pageWriter.WriteSpinnerPage(rw, req, u.app)
@@ -219,8 +205,8 @@ func (u *AppUpstream) newReverseProxy() *httputil.ReverseProxy {
219205
// Always fetch the latest token from the state watcher to handle
220206
// secret recreation (updates propagate asynchronously).
221207
r.Out.Header.Del("e2b-traffic-access-token")
222-
if token := u.manager.E2BAccessToken(u.app.ID); token != "" {
223-
r.Out.Header.Set("e2b-traffic-access-token", token)
208+
if info, ok := u.manager.AppInfo(r.Out.Context(), u.app.ID); ok && info.E2BAccessToken != "" {
209+
r.Out.Header.Set("e2b-traffic-access-token", info.E2BAccessToken)
224210
}
225211
},
226212
Transport: u.manager.transport,

internal/pkg/service/appsproxy/proxy/proxy_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1862,7 +1862,7 @@ func TestAppProxyRouter(t *testing.T) {
18621862
)
18631863
require.NoError(t, err)
18641864
require.Eventually(t, func() bool {
1865-
info, ok := watcher.GetState(api.AppID("123"))
1865+
info, ok := watcher.GetState(t.Context(), api.AppID("123"))
18661866
return ok && info.ActualState == k8sapp.AppActualStateStopped
18671867
}, 5*time.Second, 50*time.Millisecond)
18681868
},
@@ -1881,7 +1881,7 @@ func TestAppProxyRouter(t *testing.T) {
18811881
)
18821882
require.NoError(t, err)
18831883
require.Eventually(t, func() bool {
1884-
info, ok := watcher.GetState(api.AppID("123"))
1884+
info, ok := watcher.GetState(t.Context(), api.AppID("123"))
18851885
return ok && info.ActualState == k8sapp.AppActualStateRunning
18861886
}, 5*time.Second, 50*time.Millisecond)
18871887

@@ -1908,7 +1908,7 @@ func TestAppProxyRouter(t *testing.T) {
19081908
)
19091909
require.NoError(t, err)
19101910
require.Eventually(t, func() bool {
1911-
info, ok := watcher.GetState(api.AppID("123"))
1911+
info, ok := watcher.GetState(t.Context(), api.AppID("123"))
19121912
return ok && info.ActualState == k8sapp.AppActualStateStopped
19131913
}, 5*time.Second, 50*time.Millisecond)
19141914
},
@@ -1952,7 +1952,7 @@ func TestAppProxyRouter(t *testing.T) {
19521952
)
19531953
require.NoError(t, err)
19541954
require.Eventually(t, func() bool {
1955-
info, ok := watcher.GetState(api.AppID("oidc"))
1955+
info, ok := watcher.GetState(t.Context(), api.AppID("oidc"))
19561956
return ok && info.ActualState == k8sapp.AppActualStateStopped
19571957
}, 5*time.Second, 50*time.Millisecond)
19581958
},
@@ -2017,7 +2017,7 @@ func TestAppProxyRouter(t *testing.T) {
20172017
)
20182018
require.NoError(t, err)
20192019
require.Eventually(t, func() bool {
2020-
info, ok := watcher.GetState(api.AppID("oidc"))
2020+
info, ok := watcher.GetState(t.Context(), api.AppID("oidc"))
20212021
return ok && info.ActualState == k8sapp.AppActualStateRunning
20222022
}, 5*time.Second, 50*time.Millisecond)
20232023

@@ -2041,7 +2041,7 @@ func TestAppProxyRouter(t *testing.T) {
20412041
)
20422042
require.NoError(t, err)
20432043
require.Eventually(t, func() bool {
2044-
info, ok := watcher.GetState(api.AppID("oidc"))
2044+
info, ok := watcher.GetState(t.Context(), api.AppID("oidc"))
20452045
return ok && info.ActualState == k8sapp.AppActualStateStopped
20462046
}, 5*time.Second, 50*time.Millisecond)
20472047
},
@@ -2380,7 +2380,7 @@ func TestAppProxyRouter(t *testing.T) {
23802380
require.NoError(t, err)
23812381

23822382
require.Eventually(t, func() bool {
2383-
info, ok := watcher.GetState(api.AppID("123"))
2383+
info, ok := watcher.GetState(t.Context(), api.AppID("123"))
23842384
return ok && !info.AutoRestartEnabled
23852385
}, 5*time.Second, 50*time.Millisecond)
23862386
},

0 commit comments

Comments
 (0)