|
| 1 | +package datagateway |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "encoding/json" |
| 6 | + "fmt" |
| 7 | + "math" |
| 8 | + "os" |
| 9 | + "reflect" |
| 10 | + "strconv" |
| 11 | + "strings" |
| 12 | + |
| 13 | + "github.com/keboola/keboola-sdk-go/v2/pkg/keboola" |
| 14 | + |
| 15 | + "github.com/keboola/keboola-as-code/internal/pkg/filesystem" |
| 16 | + "github.com/keboola/keboola-as-code/internal/pkg/filesystem/aferofs" |
| 17 | + "github.com/keboola/keboola-as-code/internal/pkg/log" |
| 18 | + "github.com/keboola/keboola-as-code/internal/pkg/model" |
| 19 | + projectManifest "github.com/keboola/keboola-as-code/internal/pkg/project/manifest" |
| 20 | + "github.com/keboola/keboola-as-code/internal/pkg/state" |
| 21 | + "github.com/keboola/keboola-as-code/internal/pkg/utils/crypto" |
| 22 | + "github.com/keboola/keboola-as-code/internal/pkg/utils/errors" |
| 23 | +) |
| 24 | + |
| 25 | +const dataGatewayComponentID = keboola.ComponentID("keboola.app-data-gateway") |
| 26 | + |
| 27 | +type dependencies interface { |
| 28 | + KeboolaProjectAPI() *keboola.AuthorizedAPI |
| 29 | +} |
| 30 | + |
| 31 | +type dataGatewayMapper struct { |
| 32 | + dependencies |
| 33 | + state *state.State |
| 34 | + logger log.Logger |
| 35 | + projectID keboola.ProjectID // Cached project ID to avoid repeated API calls |
| 36 | +} |
| 37 | + |
| 38 | +func NewMapper(s *state.State, d dependencies) *dataGatewayMapper { |
| 39 | + return &dataGatewayMapper{ |
| 40 | + dependencies: d, |
| 41 | + state: s, |
| 42 | + logger: s.Logger(), |
| 43 | + } |
| 44 | +} |
| 45 | + |
| 46 | +func (m *dataGatewayMapper) isDataGatewayConfigKey(key model.Key) bool { |
| 47 | + configKey, ok := key.(model.ConfigKey) |
| 48 | + if !ok { |
| 49 | + return false |
| 50 | + } |
| 51 | + return configKey.ComponentID == dataGatewayComponentID |
| 52 | +} |
| 53 | + |
| 54 | +// backfillWorkspaceDetails updates the configuration content with workspace details from the API response. |
| 55 | +// It returns true when at least one field has been updated. |
| 56 | +func backfillWorkspaceDetails(config *model.Config, workspace *keboola.StorageWorkspace) bool { |
| 57 | + if config == nil || workspace == nil { |
| 58 | + return false |
| 59 | + } |
| 60 | + |
| 61 | + changed := setWorkspaceID(config, workspace.ID) |
| 62 | + |
| 63 | + details := workspace.StorageWorkspaceDetails |
| 64 | + changed = setStringIfPresent(config, "parameters.db.host", details.Host) || changed |
| 65 | + changed = setStringIfPresent(config, "parameters.db.user", details.User) || changed |
| 66 | + changed = setStringIfPresent(config, "parameters.db.database", details.Database) || changed |
| 67 | + changed = setStringIfPresent(config, "parameters.db.schema", details.Schema) || changed |
| 68 | + changed = setStringIfPresent(config, "parameters.db.warehouse", details.Warehouse) || changed |
| 69 | + changed = setStringIfPresent(config, "parameters.db.role", details.Role) || changed |
| 70 | + changed = setStringIfPresent(config, "parameters.db.account", details.Account) || changed |
| 71 | + changed = setStringIfPresent(config, "parameters.db.region", details.Region) || changed |
| 72 | + |
| 73 | + changed = setNestedIfDifferent(config, "parameters.db.loginType", "snowflake-service-keypair") || changed |
| 74 | + return changed |
| 75 | +} |
| 76 | + |
| 77 | +// ensureWorkspaceForConfig ensures a workspace exists for the given config, creating one if necessary. |
| 78 | +func (m *dataGatewayMapper) ensureWorkspaceForConfig(ctx context.Context, config *model.Config) error { |
| 79 | + api := m.KeboolaProjectAPI() |
| 80 | + |
| 81 | + // Check if config has an ID (required for creating config workspace) |
| 82 | + if config.ID == "" { |
| 83 | + m.logger.Debugf(ctx, `Skipping data-gateway config "%s" without ID - cannot create workspace for local-only configs`, config.Name) |
| 84 | + return nil |
| 85 | + } |
| 86 | + |
| 87 | + // Cache project ID from manifest if not already cached |
| 88 | + // The project ID is available in the manifest, so we can get it from there |
| 89 | + if m.projectID == 0 { |
| 90 | + manifest := m.state.Manifest() |
| 91 | + if projectManifest, ok := manifest.(*projectManifest.Manifest); ok { |
| 92 | + m.projectID = projectManifest.ProjectID() |
| 93 | + } |
| 94 | + } |
| 95 | + |
| 96 | + // List existing workspaces for this config |
| 97 | + workspaces, err := api.ListConfigWorkspacesRequest(config.BranchID, config.ComponentID, config.ID).Send(ctx) |
| 98 | + if err != nil { |
| 99 | + // If configuration doesn't exist yet (not pushed to remote), skip workspace creation. |
| 100 | + // The workspace will be created after the configuration is saved to remote. |
| 101 | + var apiErr *keboola.StorageError |
| 102 | + if errors.As(err, &apiErr) && apiErr.ErrCode == "storage.configuration.notFound" { |
| 103 | + m.logger.Debugf(ctx, `Config "%s" does not exist in remote yet, workspace will be created after config is saved`, config.Name) |
| 104 | + return nil |
| 105 | + } |
| 106 | + return errors.Errorf(`cannot list workspaces for config "%s": %w`, config.Name, err) |
| 107 | + } |
| 108 | + |
| 109 | + // Cache project ID from workspace response if available and not yet cached |
| 110 | + // Note: Workspaces don't directly expose project ID, so we rely on the manifest |
| 111 | + // which is checked earlier in this function. |
| 112 | + |
| 113 | + // If workspace already exists, use it |
| 114 | + if len(*workspaces) > 0 { |
| 115 | + workspace := (*workspaces)[0] |
| 116 | + m.logger.Debugf(ctx, `Using existing workspace %d for config "%s"`, workspace.ID, config.Name) |
| 117 | + backfillWorkspaceDetails(config, workspace) |
| 118 | + return nil |
| 119 | + } |
| 120 | + |
| 121 | + // No workspace exists, create one |
| 122 | + m.logger.Infof(ctx, `Creating workspace for data-gateway config "%s"...`, config.Name) |
| 123 | + |
| 124 | + // Generate keypair |
| 125 | + privateKeyPEM, publicKeyPEM, err := crypto.GenerateRSAKeyPairPEM() |
| 126 | + if err != nil { |
| 127 | + return errors.Errorf(`cannot generate keypair for config "%s": %w`, config.Name, err) |
| 128 | + } |
| 129 | + |
| 130 | + // Store private key to tempDir/<project_id>/<configuration-id> |
| 131 | + // Private key storage is critical for workspace functionality. |
| 132 | + // If project ID is not available, fail immediately to prevent creating an unusable workspace. |
| 133 | + if m.projectID == 0 { |
| 134 | + return errors.Errorf(`project ID not available for config "%s", cannot store private key - workspace would be unusable`, config.Name) |
| 135 | + } |
| 136 | + |
| 137 | + // If storage fails, the workspace will be created but won't be usable. |
| 138 | + // Fail the operation to prevent creating an unusable workspace. |
| 139 | + if err := m.storePrivateKey(ctx, config, privateKeyPEM); err != nil { |
| 140 | + return errors.Errorf(`cannot store private key for config "%s": %w`, config.Name, err) |
| 141 | + } |
| 142 | + |
| 143 | + // Create configuration workspace |
| 144 | + // Data gateway workspaces require useCase to be set to "reader" for read-only access. |
| 145 | + payload := &keboola.StorageConfigWorkspacePayload{ |
| 146 | + StorageWorkspacePayload: keboola.StorageWorkspacePayload{ |
| 147 | + Backend: keboola.StorageWorkspaceBackendSnowflake, |
| 148 | + ReadOnlyStorageAccess: true, |
| 149 | + LoginType: keboola.StorageWorkspaceLoginTypeSnowflakeServiceKeypair, |
| 150 | + PublicKey: &publicKeyPEM, |
| 151 | + }, |
| 152 | + UseCase: keboola.StorageWorkspaceUseCaseReader, |
| 153 | + } |
| 154 | + |
| 155 | + workspace, err := api.CreateConfigWorkspaceRequest(config.BranchID, config.ComponentID, config.ID, payload).Send(ctx) |
| 156 | + if err != nil { |
| 157 | + return errors.Errorf(`cannot create workspace for config "%s": %w`, config.Name, err) |
| 158 | + } |
| 159 | + |
| 160 | + // Project ID should already be cached from manifest at the start of this function |
| 161 | + // If it's still not set, try to get it from the manifest again as a fallback |
| 162 | + if m.projectID == 0 { |
| 163 | + manifest := m.state.Manifest() |
| 164 | + if projectManifest, ok := manifest.(*projectManifest.Manifest); ok { |
| 165 | + m.projectID = projectManifest.ProjectID() |
| 166 | + } |
| 167 | + } |
| 168 | + |
| 169 | + m.logger.Infof(ctx, `Created workspace %d for config "%s"`, workspace.ID, config.Name) |
| 170 | + |
| 171 | + // Backfill configuration with workspace details |
| 172 | + backfillWorkspaceDetails(config, workspace) |
| 173 | + |
| 174 | + return nil |
| 175 | +} |
| 176 | + |
| 177 | +// setStringIfPresent writes non-empty pointer values to the requested path. |
| 178 | +func setStringIfPresent(config *model.Config, path string, value *string) bool { |
| 179 | + if value == nil || *value == "" { |
| 180 | + return false |
| 181 | + } |
| 182 | + return setNestedIfDifferent(config, path, *value) |
| 183 | +} |
| 184 | + |
| 185 | +// setNestedIfDifferent writes the value only if it differs from the existing state. |
| 186 | +// Returns true if the value was set successfully, false if unchanged or if setting failed. |
| 187 | +// If SetNested fails, returns false to prevent incorrectly reporting success. |
| 188 | +// Note: The error from SetNested is not surfaced to callers due to function signature limitations. |
| 189 | +func setNestedIfDifferent(config *model.Config, path string, value any) bool { |
| 190 | + current, found, _ := config.Content.GetNested(path) |
| 191 | + if found && reflect.DeepEqual(current, value) { |
| 192 | + return false |
| 193 | + } |
| 194 | + // Handle SetNested error to prevent silent failures. |
| 195 | + // If the set operation fails, return false to indicate no change was made. |
| 196 | + if err := config.Content.SetNested(path, value); err != nil { |
| 197 | + return false |
| 198 | + } |
| 199 | + return true |
| 200 | +} |
| 201 | + |
| 202 | +// needsWorkspaceDetails returns true when workspace metadata in configuration is missing/incomplete. |
| 203 | +func needsWorkspaceDetails(config *model.Config) bool { |
| 204 | + requiredStringPaths := []string{ |
| 205 | + "parameters.db.host", |
| 206 | + "parameters.db.user", |
| 207 | + "parameters.db.database", |
| 208 | + "parameters.db.schema", |
| 209 | + "parameters.db.warehouse", |
| 210 | + "parameters.db.role", |
| 211 | + "parameters.db.account", |
| 212 | + "parameters.db.region", |
| 213 | + } |
| 214 | + |
| 215 | + loginTypeValue, found, _ := config.Content.GetNested("parameters.db.loginType") |
| 216 | + if !found || loginTypeValue == nil { |
| 217 | + return true |
| 218 | + } |
| 219 | + if str, ok := loginTypeValue.(string); !ok || str != "snowflake-service-keypair" { |
| 220 | + return true |
| 221 | + } |
| 222 | + |
| 223 | + for _, path := range requiredStringPaths { |
| 224 | + value, found, _ := config.Content.GetNested(path) |
| 225 | + if !found || value == nil { |
| 226 | + return true |
| 227 | + } |
| 228 | + str, ok := value.(string) |
| 229 | + if !ok || str == "" { |
| 230 | + return true |
| 231 | + } |
| 232 | + } |
| 233 | + |
| 234 | + return false |
| 235 | +} |
| 236 | + |
| 237 | +// setWorkspaceID stores workspace ID as json.Number so it matches how local JSON is parsed. |
| 238 | +func setWorkspaceID(config *model.Config, workspaceID uint64) bool { |
| 239 | + number := json.Number(strconv.FormatUint(workspaceID, 10)) |
| 240 | + return setNestedIfDifferent(config, "parameters.db.workspaceId", number) |
| 241 | +} |
| 242 | + |
| 243 | +// storePrivateKey saves the private key to tempDir/<project_id>/<configuration-id>/private_key.pem. |
| 244 | +// The directory structure is created if it doesn't exist. |
| 245 | +func (m *dataGatewayMapper) storePrivateKey(ctx context.Context, config *model.Config, privateKeyPEM string) error { |
| 246 | + // Get cached project ID (must be set before calling this method) |
| 247 | + if m.projectID == 0 { |
| 248 | + return errors.New("project ID not set, ensure ensureWorkspaceForConfig sets it first") |
| 249 | + } |
| 250 | + projectID := m.projectID |
| 251 | + |
| 252 | + // Get configuration ID |
| 253 | + configID := config.ID |
| 254 | + if configID == "" { |
| 255 | + return errors.New("config ID is empty") |
| 256 | + } |
| 257 | + |
| 258 | + // Create OS filesystem instance for system temporary directory. |
| 259 | + // os.TempDir returns an OS-agnostic path, so the mapper works on Linux, macOS and Windows. |
| 260 | + // Use aferofs.NewLocalFs to get a proper filesystem.Fs interface |
| 261 | + tmpDir := os.TempDir() |
| 262 | + tmpFs, err := aferofs.NewLocalFs(tmpDir) |
| 263 | + if err != nil { |
| 264 | + return errors.Errorf("cannot create filesystem for temp dir %s: %w", tmpDir, err) |
| 265 | + } |
| 266 | + |
| 267 | + // Build relative directory path: <project_id>/<configuration-id> |
| 268 | + // Use filesystem.Join for filesystem operations (uses forward slashes) |
| 269 | + projectDirPath := filesystem.Join(fmt.Sprintf("%d", projectID)) |
| 270 | + relativeDirPath := filesystem.Join(projectDirPath, string(configID)) |
| 271 | + |
| 272 | + // Create project directory if it doesn't exist |
| 273 | + if !tmpFs.Exists(ctx, projectDirPath) { |
| 274 | + if err := tmpFs.Mkdir(ctx, projectDirPath); err != nil { |
| 275 | + absProjectPath := filesystem.Join(tmpDir, fmt.Sprintf("%d", projectID)) |
| 276 | + return errors.Errorf("cannot create project directory %s: %w", absProjectPath, err) |
| 277 | + } |
| 278 | + } |
| 279 | + |
| 280 | + // Create configuration directory if it doesn't exist |
| 281 | + if !tmpFs.Exists(ctx, relativeDirPath) { |
| 282 | + if err := tmpFs.Mkdir(ctx, relativeDirPath); err != nil { |
| 283 | + absDirPath := filesystem.Join(tmpDir, fmt.Sprintf("%d", projectID), string(configID)) |
| 284 | + return errors.Errorf("cannot create directory %s: %w", absDirPath, err) |
| 285 | + } |
| 286 | + absDirPath := filesystem.Join(tmpDir, fmt.Sprintf("%d", projectID), string(configID)) |
| 287 | + m.logger.Debugf(ctx, `Created directory "%s" for private key storage`, absDirPath) |
| 288 | + } |
| 289 | + |
| 290 | + // Write private key file with restrictive permissions (0600 - owner read/write only) |
| 291 | + // This ensures the private key is not readable by other users on the system. |
| 292 | + privateKeyPath := filesystem.Join(relativeDirPath, "private_key.pem") |
| 293 | + // Use OpenFile directly to set restrictive permissions (0600) instead of default 0644 |
| 294 | + fd, err := tmpFs.OpenFile(ctx, privateKeyPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) |
| 295 | + if err != nil { |
| 296 | + absKeyPath := filesystem.Join(tmpDir, fmt.Sprintf("%d", projectID), string(configID), "private_key.pem") |
| 297 | + return errors.Errorf("cannot open private key file %s: %w", absKeyPath, err) |
| 298 | + } |
| 299 | + if _, err := fd.WriteString(privateKeyPEM); err != nil { |
| 300 | + absKeyPath := filesystem.Join(tmpDir, fmt.Sprintf("%d", projectID), string(configID), "private_key.pem") |
| 301 | + _ = fd.Close() |
| 302 | + return errors.Errorf("cannot write private key to %s: %w", absKeyPath, err) |
| 303 | + } |
| 304 | + if err := fd.Close(); err != nil { |
| 305 | + absKeyPath := filesystem.Join(tmpDir, fmt.Sprintf("%d", projectID), string(configID), "private_key.pem") |
| 306 | + return errors.Errorf("cannot close private key file %s: %w", absKeyPath, err) |
| 307 | + } |
| 308 | + |
| 309 | + absKeyPath := filesystem.Join(tmpDir, fmt.Sprintf("%d", projectID), string(configID), "private_key.pem") |
| 310 | + m.logger.Infof(ctx, `Stored private key for config "%s" to "%s"`, config.Name, absKeyPath) |
| 311 | + return nil |
| 312 | +} |
| 313 | + |
| 314 | +// normalizeWorkspaceID rewrites workspaceId to json.Number when possible. |
| 315 | +// This ensures consistent type representation across different JSON parsing scenarios. |
| 316 | +func normalizeWorkspaceID(config *model.Config) bool { |
| 317 | + value, found, _ := config.Content.GetNested("parameters.db.workspaceId") |
| 318 | + if !found || value == nil { |
| 319 | + return false |
| 320 | + } |
| 321 | + |
| 322 | + switch v := value.(type) { |
| 323 | + case json.Number: |
| 324 | + return false |
| 325 | + case string: |
| 326 | + trimmed := strings.TrimSpace(v) |
| 327 | + if trimmed == "" { |
| 328 | + return false |
| 329 | + } |
| 330 | + if _, err := strconv.ParseFloat(trimmed, 64); err != nil { |
| 331 | + return false |
| 332 | + } |
| 333 | + return setNestedIfDifferent(config, "parameters.db.workspaceId", json.Number(trimmed)) |
| 334 | + case float64: |
| 335 | + if math.IsNaN(v) { |
| 336 | + return false |
| 337 | + } |
| 338 | + return setNestedIfDifferent(config, "parameters.db.workspaceId", json.Number(strconv.FormatFloat(v, 'f', -1, 64))) |
| 339 | + case float32: |
| 340 | + if math.IsNaN(float64(v)) { |
| 341 | + return false |
| 342 | + } |
| 343 | + return setNestedIfDifferent(config, "parameters.db.workspaceId", json.Number(strconv.FormatFloat(float64(v), 'f', -1, 32))) |
| 344 | + case int, int32, int64: |
| 345 | + return setNestedIfDifferent(config, "parameters.db.workspaceId", json.Number(fmt.Sprintf("%d", v))) |
| 346 | + case uint, uint32, uint64: |
| 347 | + return setNestedIfDifferent(config, "parameters.db.workspaceId", json.Number(fmt.Sprintf("%d", v))) |
| 348 | + default: |
| 349 | + return false |
| 350 | + } |
| 351 | +} |
0 commit comments