-
Notifications
You must be signed in to change notification settings - Fork 4.8k
HIVE-29578: Iceberg: add support for native views #6449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -46,6 +46,11 @@ | |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.util.PropertyUtil; | ||
| import org.apache.iceberg.view.BaseView; | ||
| import org.apache.iceberg.view.SQLViewRepresentation; | ||
| import org.apache.iceberg.view.View; | ||
| import org.apache.iceberg.view.ViewMetadata; | ||
| import org.apache.thrift.TException; | ||
|
|
||
| public class MetastoreUtil { | ||
|
|
@@ -148,6 +153,74 @@ public static Table toHiveTable(org.apache.iceberg.Table table, Configuration co | |
| return result; | ||
| } | ||
|
|
||
| /** | ||
| * Builds a Hive metastore {@link Table} representation for an Iceberg {@link View}, for clients | ||
| * (e.g. {@code HiveRESTCatalogClient}) that bridge Iceberg catalog metadata into the HMS API. | ||
| */ | ||
| public static Table toHiveView(View view, Configuration conf) { | ||
| Table result = new Table(); | ||
| TableName tableName = | ||
| TableName.fromString( | ||
| view.name(), MetaStoreUtils.getDefaultCatalog(conf), Warehouse.DEFAULT_DATABASE_NAME); | ||
| result.setCatName(tableName.getCat()); | ||
| result.setDbName(tableName.getDb()); | ||
|
Comment on lines
+163
to
+166
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens when a customer db is specified? |
||
| result.setTableName(tableName.getTable()); | ||
| result.setTableType(TableType.VIRTUAL_VIEW.toString()); | ||
|
|
||
| ViewMetadata metadata = ((BaseView) view).operations().current(); | ||
| String sqlText = viewSqlText(view, metadata); | ||
| result.setViewOriginalText(sqlText); | ||
| result.setViewExpandedText(sqlText); | ||
|
|
||
| long nowMillis = System.currentTimeMillis(); | ||
| int nowSec = (int) (nowMillis / 1000); | ||
| String owner = | ||
| PropertyUtil.propertyAsString( | ||
| metadata.properties(), HiveCatalog.HMS_TABLE_OWNER, System.getProperty("user.name")); | ||
| result.setOwner(owner); | ||
| result.setCreateTime(nowSec); | ||
| result.setLastAccessTime(nowSec); | ||
| result.setRetention(Integer.MAX_VALUE); | ||
|
|
||
| boolean hiveEngineEnabled = false; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Why it’s false in This path materializes an HMS So we keep a minimal SD consistent with normal virtual views and avoid implying this HMS object is an Iceberg-backed table for the Hive engine. For tables, |
||
| result.setSd(HiveOperationsBase.storageDescriptor(metadata.schema(), metadata.location(), hiveEngineEnabled)); | ||
| StorageDescriptor sd = result.getSd(); | ||
|
|
||
| if (sd.getBucketCols() == null) { | ||
| sd.setBucketCols(Lists.newArrayList()); | ||
| } | ||
|
|
||
| if (sd.getSortCols() == null) { | ||
| sd.setSortCols(Lists.newArrayList()); | ||
| } | ||
|
|
||
| long maxHiveTablePropertySize = | ||
| conf.getLong( | ||
| HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE, | ||
| HiveOperationsBase.HIVE_TABLE_PROPERTY_MAX_SIZE_DEFAULT); | ||
| HMSTablePropertyHelper.updateHmsTableForIcebergView( | ||
| metadata.metadataFileLocation(), | ||
| result, | ||
| metadata, | ||
| Collections.emptySet(), | ||
| maxHiveTablePropertySize, | ||
| null); | ||
|
|
||
| String catalogType = IcebergCatalogProperties.getCatalogType(conf); | ||
| if (!StringUtils.isEmpty(catalogType) && !IcebergCatalogProperties.NO_CATALOG_TYPE.equals(catalogType)) { | ||
| result.getParameters().put(CatalogUtil.ICEBERG_CATALOG_TYPE, IcebergCatalogProperties.getCatalogType(conf)); | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| private static String viewSqlText(View view, ViewMetadata metadata) { | ||
| SQLViewRepresentation hiveRepr = view.sqlFor("hive"); | ||
| if (hiveRepr != null) { | ||
| return hiveRepr.sql(); | ||
| } | ||
| return HiveViewOperations.sqlFor(metadata); | ||
| } | ||
|
|
||
| private static StorageDescriptor getHiveStorageDescriptor(org.apache.iceberg.Table table) { | ||
| var result = new StorageDescriptor(); | ||
| result.setCols(HiveSchemaUtil.convert(table.schema())); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.hive; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.hive.conf.Constants; | ||
| import org.apache.hadoop.hive.metastore.api.FieldSchema; | ||
| import org.apache.iceberg.CatalogUtil; | ||
| import org.apache.iceberg.catalog.Catalog; | ||
| import org.apache.iceberg.catalog.Namespace; | ||
| import org.apache.iceberg.catalog.TableIdentifier; | ||
| import org.apache.iceberg.catalog.ViewCatalog; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.view.ViewBuilder; | ||
|
|
||
| /** | ||
| * Commits a native Iceberg view through the configured default Iceberg catalog (HiveCatalog or REST | ||
| * catalog, etc.) when {@code Catalog} also implements {@link ViewCatalog}. | ||
| */ | ||
| public final class NativeIcebergViewSupport { | ||
|
|
||
| /** Value stored with {@link Constants#NATIVE_VIEW_STORAGE_HANDLER_CLASS_PARAM} for Iceberg native views. */ | ||
| public static final String NATIVE_ICEBERG_VIEW_HANDLER_FQCN = HMSTablePropertyHelper.HIVE_ICEBERG_STORAGE_HANDLER; | ||
|
|
||
| /** | ||
| * HMS / Iceberg view marker entries for a native Iceberg catalog view (same map as | ||
| * {@code HiveIcebergStorageHandler#getNativeViewHmsTableProperties()}). | ||
| */ | ||
| public static Map<String, String> defaultNativeViewMarkerTableProperties() { | ||
| return Map.of(Constants.NATIVE_VIEW_STORAGE_HANDLER_CLASS_PARAM, NATIVE_ICEBERG_VIEW_HANDLER_FQCN); | ||
| } | ||
|
|
||
| private NativeIcebergViewSupport() { | ||
| } | ||
|
|
||
| /** | ||
| * Creates or replaces a view in the Iceberg catalog. | ||
| * | ||
| * @return {@code false} if skipped because {@code ifNotExists} is true and the view already exists | ||
| */ | ||
| public static boolean createOrReplaceNativeView( | ||
| Configuration conf, | ||
| String databaseName, | ||
| String viewName, | ||
| List<FieldSchema> fieldSchemas, | ||
| String viewSql, | ||
| Map<String, String> tblProperties, | ||
| String comment, | ||
| boolean replace, | ||
| boolean ifNotExists) { | ||
| TableIdentifier identifier = TableIdentifier.of(databaseName, viewName); | ||
| String catalogName = IcebergCatalogProperties.getCatalogName(conf); | ||
| Map<String, String> catalogProps = IcebergCatalogProperties.getCatalogProperties(conf, catalogName); | ||
| Catalog catalog = CatalogUtil.buildIcebergCatalog(catalogName, catalogProps, conf); | ||
| try { | ||
| ViewCatalog viewCatalog = asViewCatalog(catalog, catalogName); | ||
| if (!replace && ifNotExists && viewCatalog.viewExists(identifier)) { | ||
| return false; | ||
| } | ||
|
|
||
| Map<String, String> mergedProps = mergeDefaultNativeViewTableProperties(tblProperties); | ||
| ViewBuilder builder = | ||
| viewCatalog | ||
| .buildView(identifier) | ||
| .withSchema(HiveSchemaUtil.convert(fieldSchemas, Collections.emptyMap(), true)) | ||
| .withDefaultNamespace(Namespace.of(identifier.namespace().level(0))) | ||
| .withQuery("hive", viewSql); | ||
| if (StringUtils.isNotBlank(comment)) { | ||
| builder = builder.withProperty("comment", comment); | ||
| } | ||
| for (Map.Entry<String, String> e : mergedProps.entrySet()) { | ||
| if (e.getKey() != null && e.getValue() != null) { | ||
| builder = builder.withProperty(e.getKey(), e.getValue()); | ||
| } | ||
| } | ||
| if (replace) { | ||
| builder.createOrReplace(); | ||
| } else { | ||
| builder.create(); | ||
| } | ||
| return true; | ||
| } finally { | ||
| if (catalog instanceof Closeable closeable) { | ||
| try { | ||
| closeable.close(); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException("Failed to close Iceberg catalog", e); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static ViewCatalog asViewCatalog(Catalog catalog, String catalogName) { | ||
| if (catalog instanceof ViewCatalog viewCatalog) { | ||
| return viewCatalog; | ||
| } | ||
| throw new UnsupportedOperationException( | ||
| String.format( | ||
| "Iceberg catalog '%s' does not implement ViewCatalog.", | ||
| catalogName) + | ||
| " Native views require a catalog that implements ViewCatalog (e.g. HiveCatalog or REST)."); | ||
| } | ||
|
|
||
| /** | ||
| * Fills Iceberg native-view HMS / view marker properties when absent (e.g. direct catalog callers). | ||
| * Handlers that delegate here after {@code HiveStorageHandler#getNativeViewHmsTableProperties()} already | ||
| * supplied markers get the same result. | ||
| */ | ||
| public static Map<String, String> mergeDefaultNativeViewTableProperties(Map<String, String> tblProperties) { | ||
| Map<String, String> merged = Maps.newHashMap(); | ||
| if (tblProperties != null) { | ||
| merged.putAll(tblProperties); | ||
| } | ||
| for (Map.Entry<String, String> e : defaultNativeViewMarkerTableProperties().entrySet()) { | ||
| merged.putIfAbsent(e.getKey(), e.getValue()); | ||
| } | ||
| return merged; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rephrase this error message. Remove
STORED BYand letIcebergbe a parameter.