Skip to content

Commit 73ff8f9

Browse files
authored
Merge pull request #2554 from keboola/hosan/PAT-1542
Add support for E2B access tokens in Kubernetes apps
2 parents d8d4b6a + 367ce2c commit 73ff8f9

6 files changed

Lines changed: 253 additions & 20 deletions

File tree

internal/pkg/service/appsproxy/dataapps/k8sapp/appinfo.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,20 @@ const (
1111
Group = "apps.keboola.com"
1212
Version = "v2"
1313
Resource = "apps"
14+
15+
BackendTypeE2BSandbox = "e2bSandbox"
1416
)
1517

1618
// AppGVR returns the GroupVersionResource for the App CRD.
1719
func AppGVR() schema.GroupVersionResource {
1820
return schema.GroupVersionResource{Group: Group, Version: Version, Resource: Resource}
1921
}
2022

23+
// SecretGVR returns the GroupVersionResource for core/v1 Secrets.
24+
func SecretGVR() schema.GroupVersionResource {
25+
return schema.GroupVersionResource{Group: "", Version: "v1", Resource: "secrets"}
26+
}
27+
2128
// AppActualState is the observed state of the app, read from .status.currentState.
2229
type AppActualState string
2330

@@ -35,8 +42,17 @@ type appObject struct {
3542
}
3643

3744
type appSpec struct {
38-
AppID string `json:"appId"`
39-
AutoRestartEnabled *bool `json:"autoRestartEnabled,omitempty"`
45+
AppID string `json:"appId"`
46+
AutoRestartEnabled *bool `json:"autoRestartEnabled,omitempty"`
47+
Runtime appRuntime `json:"runtime"`
48+
}
49+
50+
type appRuntime struct {
51+
Backend appBackend `json:"backend"`
52+
}
53+
54+
type appBackend struct {
55+
Type string `json:"type,omitempty"`
4056
}
4157

4258
// AppInfo is the cached state for an app, read from the K8s watcher.
@@ -46,11 +62,20 @@ type AppInfo struct {
4662
// UpstreamTarget is the pre-parsed URL from .status.appsProxy.upstreamUrl.
4763
// Nil when the field is absent or unparseable.
4864
UpstreamTarget *url.URL
65+
// E2BAccessToken is the access token loaded from the K8s Secret
66+
// referenced by .status.e2bSandbox.accessTokenSecretName.
67+
// Empty when the app is not an E2B sandbox or the secret is unavailable.
68+
E2BAccessToken string
4969
}
5070

5171
type appStatus struct {
5272
CurrentState AppActualState `json:"currentState"`
5373
AppsProxy appsProxy `json:"appsProxy"`
74+
E2BSandbox e2bSandbox `json:"e2bSandbox"`
75+
}
76+
77+
type e2bSandbox struct {
78+
AccessTokenSecretName string `json:"accessTokenSecretName,omitempty"`
5479
}
5580

5681
type appsProxy struct {

internal/pkg/service/appsproxy/dataapps/k8sapp/watcher.go

Lines changed: 74 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package k8sapp
22

33
import (
44
"context"
5+
"encoding/base64"
56
"encoding/json"
67
"net/url"
78
"sync"
89

10+
"golang.org/x/sync/singleflight"
911
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1012
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
1113
"k8s.io/apimachinery/pkg/runtime"
@@ -19,6 +21,7 @@ import (
1921
"github.com/keboola/keboola-as-code/internal/pkg/log"
2022
"github.com/keboola/keboola-as-code/internal/pkg/service/appsproxy/dataapps/api"
2123
"github.com/keboola/keboola-as-code/internal/pkg/service/common/servicectx"
24+
"github.com/keboola/keboola-as-code/internal/pkg/utils/errors"
2225
)
2326

2427
// entry stores the K8s object name and last observed state for an app.
@@ -27,16 +30,18 @@ type entry struct {
2730
state AppActualState
2831
autoRestartEnabled bool
2932
upstreamTarget *url.URL // pre-parsed; nil when appsProxy.upstreamUrl absent/invalid
33+
e2bAccessToken string // loaded from K8s Secret; empty for non-E2B apps
34+
e2bSecretName string // Secret name for lazy token loading; empty for non-E2B apps
3035
}
3136

3237
// StateWatcher watches App CRDs in Kubernetes and provides a local cache of app states.
3338
type StateWatcher struct {
34-
client dynamic.Interface
35-
namespace string
36-
logger log.Logger
37-
hasSynced cache.InformerSynced
38-
// apps: AppID → entry
39-
apps sync.Map
39+
client dynamic.Interface
40+
namespace string
41+
logger log.Logger
42+
hasSynced cache.InformerSynced
43+
apps sync.Map // AppID → entry
44+
tokenLoadGroup singleflight.Group // coalesces concurrent lazy-load K8s API calls per secret
4045
}
4146

4247
type dependencies interface {
@@ -69,10 +74,10 @@ func NewStateWatcher(d dependencies, client dynamic.Interface, namespace string)
6974
})
7075

7176
lw := &cache.ListWatch{
72-
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
77+
ListWithContextFunc: func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
7378
return client.Resource(AppGVR()).Namespace(namespace).List(ctx, opts)
7479
},
75-
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
80+
WatchFuncWithContext: func(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
7681
return client.Resource(AppGVR()).Namespace(namespace).Watch(ctx, opts)
7782
},
7883
}
@@ -114,16 +119,34 @@ func NewStateWatcher(d dependencies, client dynamic.Interface, namespace string)
114119
}
115120

116121
// GetState returns the cached AppInfo for the app. Returns (AppInfo{}, false) if not yet cached.
122+
// If the E2B access token is missing but a secret name is known, it attempts to load the token lazily.
117123
func (w *StateWatcher) GetState(appID api.AppID) (AppInfo, bool) {
118124
v, ok := w.apps.Load(appID)
119125
if !ok {
120126
return AppInfo{}, false
121127
}
122128
e := v.(entry)
129+
130+
// Lazy-load E2B token: the Secret may not have existed when the App CRD event was processed.
131+
// a singleflight coalesces concurrent requests for the same secret into a single K8s API call.
132+
if e.e2bAccessToken == "" && e.e2bSecretName != "" {
133+
token, err, _ := w.tokenLoadGroup.Do(e.e2bSecretName, func() (any, error) {
134+
return w.loadSecretToken(context.Background(), e.e2bSecretName)
135+
})
136+
if err != nil {
137+
w.logger.Warnf(context.Background(), "App %s: failed to lazy-load E2B access token from secret %q: %s", appID, e.e2bSecretName, err)
138+
} else if t, ok := token.(string); t != "" && ok {
139+
e.e2bAccessToken = t
140+
w.apps.Store(appID, e)
141+
w.logger.Infof(context.Background(), "App %s: lazy-loaded E2B access token from secret %q", appID, e.e2bSecretName)
142+
}
143+
}
144+
123145
return AppInfo{
124146
ActualState: e.state,
125147
AutoRestartEnabled: e.autoRestartEnabled,
126148
UpstreamTarget: e.upstreamTarget,
149+
E2BAccessToken: e.e2bAccessToken,
127150
}, true
128151
}
129152

@@ -194,12 +217,26 @@ func (w *StateWatcher) handleUpsert(ctx context.Context, obj any) {
194217
}
195218
}
196219

220+
var e2bAccessToken string
221+
var e2bSecretName string
222+
if appObj.Spec.Runtime.Backend.Type == BackendTypeE2BSandbox {
223+
e2bSecretName = appObj.Status.E2BSandbox.AccessTokenSecretName
224+
if e2bSecretName != "" {
225+
token, err := w.loadSecretToken(ctx, e2bSecretName)
226+
if err == nil {
227+
e2bAccessToken = token
228+
}
229+
}
230+
}
231+
197232
appID := api.AppID(appObj.Spec.AppID)
198233
w.apps.Store(appID, entry{
199234
k8sName: k8sName,
200235
state: appObj.Status.CurrentState,
201236
autoRestartEnabled: autoRestartEnabled,
202237
upstreamTarget: upstreamTarget,
238+
e2bAccessToken: e2bAccessToken,
239+
e2bSecretName: e2bSecretName,
203240
})
204241
w.logger.Debugf(ctx, "App CRD %q (appID=%s) state updated: actualState=%q autoRestartEnabled=%v upstreamTarget=%v", k8sName, appID, appObj.Status.CurrentState, autoRestartEnabled, upstreamTarget != nil)
205242
}
@@ -229,3 +266,32 @@ func (w *StateWatcher) handleDelete(ctx context.Context, obj any) {
229266
return true
230267
})
231268
}
269+
270+
// loadSecretToken fetches a K8s Secret by name and returns the value of the "token" key.
271+
// The dynamic client returns Secret data values as base64-encoded strings.
272+
func (w *StateWatcher) loadSecretToken(ctx context.Context, secretName string) (string, error) {
273+
obj, err := w.client.Resource(SecretGVR()).Namespace(w.namespace).Get(ctx, secretName, metav1.GetOptions{})
274+
if err != nil {
275+
return "", err
276+
}
277+
278+
data, found, err := unstructured.NestedMap(obj.Object, "data")
279+
if err != nil {
280+
return "", errors.Errorf("secret %q: failed to read data field: %s", secretName, err)
281+
}
282+
if !found {
283+
return "", errors.Errorf("secret %q has no data field", secretName)
284+
}
285+
286+
token, ok := data["token"].(string)
287+
if !ok || token == "" {
288+
return "", errors.Errorf("secret %q has no \"token\" key in data", secretName)
289+
}
290+
291+
tokenBytes, err := base64.StdEncoding.DecodeString(token)
292+
if err != nil {
293+
return "", errors.Errorf("secret %q: failed to base64-decode token: %s", secretName, err)
294+
}
295+
296+
return string(tokenBytes), nil
297+
}

internal/pkg/service/appsproxy/dataapps/k8sapp/watcher_test.go

Lines changed: 118 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package k8sapp_test
22

33
import (
44
"context"
5+
"encoding/base64"
56
"testing"
67
"time"
78

@@ -43,11 +44,12 @@ func newTestDeps(t *testing.T) *watcherDeps {
4344
func (d *watcherDeps) Logger() log.Logger { return d.logger }
4445
func (d *watcherDeps) Process() *servicectx.Process { return d.proc }
4546

46-
// newFakeClient creates a fake dynamic client with the App list kind registered.
47+
// newFakeClient creates a fake dynamic client with the App and Secret list kinds registered.
4748
func newFakeClient() *k8sfake.FakeDynamicClient {
4849
scheme := runtime.NewScheme()
4950
return k8sfake.NewSimpleDynamicClientWithCustomListKinds(scheme, map[schema.GroupVersionResource]string{
50-
k8sapp.AppGVR(): "AppList",
51+
k8sapp.AppGVR(): "AppList",
52+
k8sapp.SecretGVR(): "SecretList",
5153
})
5254
}
5355

@@ -211,3 +213,117 @@ func TestStateWatcher_GetState_UpstreamTarget_AbsentWhenMissing(t *testing.T) {
211213
require.True(t, ok)
212214
assert.Nil(t, info.UpstreamTarget)
213215
}
216+
217+
// newE2BAppObject creates an App CRD with spec.runtime.backend.type = "e2bSandbox"
218+
// and status.e2bSandbox.accessTokenSecretName set.
219+
func newE2BAppObject(k8sName, appID string, state k8sapp.AppActualState, secretName string) *unstructured.Unstructured {
220+
obj := newAppObject(k8sName, appID, state)
221+
obj.Object["spec"].(map[string]any)["runtime"] = map[string]any{
222+
"backend": map[string]any{
223+
"type": k8sapp.BackendTypeE2BSandbox,
224+
},
225+
}
226+
obj.Object["status"].(map[string]any)["e2bSandbox"] = map[string]any{
227+
"accessTokenSecretName": secretName,
228+
}
229+
return obj
230+
}
231+
232+
// newSecretObject creates an unstructured K8s Secret with a base64-encoded "token" key.
233+
func newSecretObject(name, namespace, tokenValue string) *unstructured.Unstructured {
234+
return &unstructured.Unstructured{
235+
Object: map[string]any{
236+
"apiVersion": "v1",
237+
"kind": "Secret",
238+
"metadata": map[string]any{
239+
"name": name,
240+
"namespace": namespace,
241+
},
242+
"data": map[string]any{
243+
"token": base64.StdEncoding.EncodeToString([]byte(tokenValue)),
244+
},
245+
},
246+
}
247+
}
248+
249+
func TestStateWatcher_GetState_E2BAccessToken(t *testing.T) {
250+
t.Parallel()
251+
252+
fakeClient := newFakeClient()
253+
d := newTestDeps(t)
254+
255+
// Create the secret first.
256+
secret := newSecretObject("my-e2b-secret", testNamespace, "my-e2b-token")
257+
_, err := fakeClient.Resource(k8sapp.SecretGVR()).Namespace(testNamespace).Create(
258+
t.Context(), secret, metav1.CreateOptions{},
259+
)
260+
require.NoError(t, err)
261+
262+
// Create E2B app referencing the secret.
263+
appObj := newE2BAppObject("my-e2b-app-k8s", "app-e2b", k8sapp.AppActualStateRunning, "my-e2b-secret")
264+
_, err = fakeClient.Resource(k8sapp.AppGVR()).Namespace(testNamespace).Create(
265+
t.Context(), appObj, metav1.CreateOptions{},
266+
)
267+
require.NoError(t, err)
268+
269+
watcher := k8sapp.NewStateWatcher(d, fakeClient, testNamespace)
270+
271+
var info k8sapp.AppInfo
272+
assert.Eventually(t, func() bool {
273+
var ok bool
274+
info, ok = watcher.GetState(api.AppID("app-e2b"))
275+
return ok && info.E2BAccessToken != ""
276+
}, 5*time.Second, 50*time.Millisecond)
277+
278+
assert.Equal(t, "my-e2b-token", info.E2BAccessToken)
279+
}
280+
281+
func TestStateWatcher_GetState_E2BAccessToken_MissingSecret(t *testing.T) {
282+
t.Parallel()
283+
284+
fakeClient := newFakeClient()
285+
d := newTestDeps(t)
286+
287+
// Create E2B app referencing a non-existent secret.
288+
appObj := newE2BAppObject("my-e2b-app-k8s", "app-e2b", k8sapp.AppActualStateRunning, "missing-secret")
289+
_, err := fakeClient.Resource(k8sapp.AppGVR()).Namespace(testNamespace).Create(
290+
t.Context(), appObj, metav1.CreateOptions{},
291+
)
292+
require.NoError(t, err)
293+
294+
watcher := k8sapp.NewStateWatcher(d, fakeClient, testNamespace)
295+
296+
assert.Eventually(t, func() bool {
297+
_, ok := watcher.GetState(api.AppID("app-e2b"))
298+
return ok
299+
}, 5*time.Second, 50*time.Millisecond)
300+
301+
info, ok := watcher.GetState(api.AppID("app-e2b"))
302+
require.True(t, ok)
303+
assert.Empty(t, info.E2BAccessToken)
304+
}
305+
306+
func TestStateWatcher_GetState_NonE2BApp_NoToken(t *testing.T) {
307+
t.Parallel()
308+
309+
fakeClient := newFakeClient()
310+
d := newTestDeps(t)
311+
312+
// Create a regular (non-E2B) app.
313+
appObj := newAppObject("my-app-k8s", "app-regular", k8sapp.AppActualStateRunning)
314+
_, err := fakeClient.Resource(k8sapp.AppGVR()).Namespace(testNamespace).Create(
315+
t.Context(), appObj, metav1.CreateOptions{},
316+
)
317+
require.NoError(t, err)
318+
319+
watcher := k8sapp.NewStateWatcher(d, fakeClient, testNamespace)
320+
321+
assert.Eventually(t, func() bool {
322+
_, ok := watcher.GetState(api.AppID("app-regular"))
323+
return ok
324+
}, 5*time.Second, 50*time.Millisecond)
325+
326+
info, ok := watcher.GetState(api.AppID("app-regular"))
327+
require.True(t, ok)
328+
assert.Empty(t, info.E2BAccessToken)
329+
}

internal/pkg/service/appsproxy/dependencies/mocked.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,8 @@ func newMockedServiceScope(tb testing.TB, ctx context.Context, cfg config.Config
8686
// avoiding a race with the watch channel setup.
8787
scheme := runtime.NewScheme()
8888
fakeClient := k8sfake.NewSimpleDynamicClientWithCustomListKinds(scheme, map[schema.GroupVersionResource]string{
89-
k8sapp.AppGVR(): "AppList",
89+
k8sapp.AppGVR(): "AppList",
90+
k8sapp.SecretGVR(): "SecretList",
9091
}, initialK8sObjects...)
9192

9293
// Validate config

0 commit comments

Comments
 (0)