Skip to content

Commit a88c6b7

Browse files
Provide Struct data type support for Oracle plugin
1 parent 085e485 commit a88c6b7

2 files changed

Lines changed: 187 additions & 1 deletion

File tree

oracle-plugin/src/main/java/io/cdap/plugin/oracle/OracleSourceSchemaReader.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,14 @@
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

26+
import java.sql.Connection;
27+
import java.sql.PreparedStatement;
28+
import java.sql.ResultSet;
2629
import java.sql.ResultSetMetaData;
2730
import java.sql.SQLException;
2831
import java.sql.Types;
32+
import java.util.ArrayList;
33+
import java.util.List;
2934
import java.util.Set;
3035
import javax.annotation.Nullable;
3136

@@ -70,6 +75,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader {
7075
private final Boolean isTimestampOldBehavior;
7176
private final Boolean isPrecisionlessNumAsDecimal;
7277
private final Boolean isTimestampLtzFieldTimestamp;
78+
private Connection connection;
7379

7480
public OracleSourceSchemaReader() {
7581
this(null, false, false, false);
@@ -136,11 +142,130 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti
136142
}
137143
return Schema.decimalOf(precision, scale);
138144
}
145+
case Types.STRUCT:
146+
if (connection == null) {
147+
throw new SQLException("Cannot resolve STRUCT schema without a database connection. "
148+
+ "Use getSchemaFields(ResultSet) to enable STRUCT type resolution.");
149+
}
150+
String typeName = metadata.getColumnTypeName(index);
151+
String oracleSchemaName = metadata.getSchemaName(index);
152+
return getStructSchema(connection, oracleSchemaName, typeName);
139153
default:
140154
return super.getSchema(metadata, index);
141155
}
142156
}
143157

158+
@Override
159+
public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException {
160+
this.connection = resultSet.getStatement().getConnection();
161+
return super.getSchemaFields(resultSet);
162+
}
163+
164+
/**
165+
* Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the database metadata
166+
* for the type's attributes.
167+
*
168+
* @param connection the database connection
169+
* @param schemaName the Oracle schema owning the type
170+
* @param typeName the Oracle type name (e.g., "ADDRESS_TYPE")
171+
* @return a CDAP RECORD schema with fields corresponding to the STRUCT's attributes
172+
*/
173+
private Schema getStructSchema(Connection connection, String schemaName,
174+
String typeName) throws SQLException {
175+
List<Schema.Field> fields = new ArrayList<>();
176+
177+
String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? ORDER BY ATTR_NO";
178+
179+
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
180+
stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1));
181+
182+
try (ResultSet attrRs = stmt.executeQuery()) {
183+
while (attrRs.next()) {
184+
String attrName = attrRs.getString("ATTR_NAME");
185+
String attrTypeName = attrRs.getString("ATTR_TYPE_NAME");
186+
int attrSize = attrRs.getInt("PRECISION");
187+
int attrScale = attrRs.getInt("SCALE");
188+
189+
Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale);
190+
if (attrSchema != null) {
191+
fields.add(Schema.Field.of(attrName, attrSchema));
192+
} else {
193+
Schema nestedSchema = getStructSchema(connection, schemaName, attrTypeName);
194+
fields.add(Schema.Field.of(attrName, nestedSchema));
195+
}
196+
}
197+
}
198+
}
199+
if (fields.isEmpty()) {
200+
throw new SQLException(String.format(
201+
"No attributes found for Oracle STRUCT type '%s.%s'. "
202+
+ "Ensure the type exists and is accessible.",
203+
schemaName, typeName));
204+
}
205+
206+
return Schema.recordOf(typeName, fields);
207+
}
208+
209+
private Schema mapPrimitiveOracleType(String typeName, int precision, int scale) {
210+
switch (typeName) {
211+
case "TIMESTAMP WITH TZ":
212+
return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS);
213+
case "TIMESTAMP WITH LTZ":
214+
return getTimestampLtzSchema();
215+
case "TIMESTAMP":
216+
return Schema.of(Schema.LogicalType.DATETIME);
217+
case "DATE" :
218+
return Schema.of(Schema.LogicalType.DATE);
219+
case "BINARY FLOAT":
220+
case "FLOAT":
221+
return Schema.of(Schema.Type.FLOAT);
222+
case "BINARY DOUBLE":
223+
case "DOUBLE":
224+
return Schema.of(Schema.Type.DOUBLE);
225+
case "BFILE":
226+
case "RAW":
227+
case "LONG RAW":
228+
return Schema.of(Schema.Type.BYTES);
229+
case "INTERVAL DAY TO SECOND":
230+
case "INTERVAL YEAR TO MONTH":
231+
case "VARCHAR2":
232+
case "VARCHAR":
233+
case "CHAR":
234+
case "CLOB":
235+
case "BLOB":
236+
case "LONG":
237+
return Schema.of(Schema.Type.STRING);
238+
case "INTEGER":
239+
return Schema.of(Schema.Type.INT);
240+
case "NUMBER":
241+
case "DECIMAL":
242+
// FLOAT and REAL are returned as java.sql.Types.NUMERIC but with value that is a java.lang.Double
243+
if (Double.class.getTypeName().equals(typeName)) {
244+
return Schema.of(Schema.Type.DOUBLE);
245+
} else {
246+
if (precision == 0) {
247+
if (isPrecisionlessNumAsDecimal) {
248+
precision = 38;
249+
scale = 0;
250+
LOG.warn(String.format("%s type with undefined precision and scale is detected, "
251+
+ "there may be a precision loss while running the pipeline. "
252+
+ "Please define an output precision and scale for field to avoid "
253+
+ "precision loss.", typeName));
254+
return Schema.decimalOf(precision, scale);
255+
} else {
256+
LOG.warn(String.format("%s type without precision and scale, "
257+
+ "converting into STRING type to avoid any precision loss.",
258+
typeName));
259+
return Schema.of(Schema.Type.STRING);
260+
}
261+
}
262+
return Schema.decimalOf(precision, scale);
263+
}
264+
default:
265+
return null;
266+
}
267+
}
268+
144269
private @NotNull Schema getTimestampLtzSchema() {
145270
return isTimestampOldBehavior || isTimestampLtzFieldTimestamp
146271
? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS)

oracle-plugin/src/test/java/io/cdap/plugin/oracle/OracleSchemaReaderTest.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,13 @@
2424
import org.mockito.Mockito;
2525
import org.mockito.junit.MockitoJUnitRunner;
2626

27+
import java.sql.Connection;
28+
import java.sql.PreparedStatement;
2729
import java.sql.ResultSet;
2830
import java.sql.ResultSetMetaData;
2931
import java.sql.SQLException;
32+
import java.sql.Statement;
33+
import java.sql.Types;
3034
import java.util.List;
3135

3236
public class OracleSchemaReaderTest {
@@ -37,6 +41,12 @@ public void getSchema_timestampLTZFieldTrue_returnTimestamp() throws SQLExceptio
3741

3842
ResultSet resultSet = Mockito.mock(ResultSet.class);
3943
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
44+
Statement statement = Mockito.mock(Statement.class);
45+
Connection connection = Mockito.mock(Connection.class);
46+
47+
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
48+
Mockito.when(resultSet.getStatement()).thenReturn(statement);
49+
Mockito.when(statement.getConnection()).thenReturn(connection);
4050

4151
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
4252

@@ -68,9 +78,12 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio
6878

6979
ResultSet resultSet = Mockito.mock(ResultSet.class);
7080
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
81+
Statement statement = Mockito.mock(Statement.class);
82+
Connection connection = Mockito.mock(Connection.class);
7183

7284
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
73-
85+
Mockito.when(resultSet.getStatement()).thenReturn(statement);
86+
Mockito.when(statement.getConnection()).thenReturn(connection);
7487
Mockito.when(metadata.getColumnCount()).thenReturn(2);
7588
// -101 is for TIMESTAMP_TZ
7689
Mockito.when(metadata.getColumnType(1)).thenReturn(-101);
@@ -91,4 +104,52 @@ public void getSchema_timestampLTZFieldFalse_returnDatetime() throws SQLExceptio
91104
Assert.assertEquals(expectedSchemaFields.get(1).getName(), actualSchemaFields.get(1).getName());
92105
Assert.assertEquals(expectedSchemaFields.get(1).getSchema(), actualSchemaFields.get(1).getSchema());
93106
}
107+
108+
@Test
109+
public void getSchemaFields_structType_returnRecord() throws SQLException {
110+
OracleSourceSchemaReader schemaReader = new OracleSourceSchemaReader();
111+
112+
ResultSet resultSet = Mockito.mock(ResultSet.class);
113+
ResultSetMetaData metadata = Mockito.mock(ResultSetMetaData.class);
114+
Statement statement = Mockito.mock(Statement.class);
115+
Connection connection = Mockito.mock(Connection.class);
116+
PreparedStatement stmt = Mockito.mock(PreparedStatement.class);
117+
ResultSet attrRs = Mockito.mock(ResultSet.class);
118+
119+
Mockito.when(resultSet.getMetaData()).thenReturn(metadata);
120+
Mockito.when(resultSet.getStatement()).thenReturn(statement);
121+
Mockito.when(statement.getConnection()).thenReturn(connection);
122+
Mockito.when(connection.prepareStatement(Mockito.anyString())).thenReturn(stmt);
123+
Mockito.when(stmt.executeQuery()).thenReturn(attrRs);
124+
125+
// One STRUCT column
126+
Mockito.when(metadata.getColumnCount()).thenReturn(1);
127+
Mockito.when(metadata.getColumnType(1)).thenReturn(Types.STRUCT);
128+
Mockito.when(metadata.getColumnName(1)).thenReturn("address");
129+
Mockito.when(metadata.getColumnTypeName(1)).thenReturn("ADDRESS_TYPE");
130+
Mockito.when(metadata.getSchemaName(1)).thenReturn("TEST_SCHEMA");
131+
132+
// Mock ALL_TYPE_ATTRS for ADDRESS_TYPE with two VARCHAR2 attributes
133+
Mockito.when(attrRs.next()).thenReturn(true, true, false);
134+
Mockito.when(attrRs.getString("ATTR_NAME")).thenReturn("STREET", "CITY");
135+
Mockito.when(attrRs.getString("ATTR_TYPE_NAME")).thenReturn("VARCHAR2", "VARCHAR2");
136+
Mockito.when(attrRs.getInt("PRECISION")).thenReturn(0, 0);
137+
Mockito.when(attrRs.getInt("SCALE")).thenReturn(0, 0);
138+
139+
List<Schema.Field> actualFields = schemaReader.getSchemaFields(resultSet);
140+
141+
Assert.assertEquals(1, actualFields.size());
142+
Schema.Field addressField = actualFields.get(0);
143+
Assert.assertEquals("address", addressField.getName());
144+
145+
Schema addressSchema = addressField.getSchema().isNullable()
146+
? addressField.getSchema().getNonNullable() : addressField.getSchema();
147+
Assert.assertEquals(Schema.Type.RECORD, addressSchema.getType());
148+
Assert.assertEquals("ADDRESS_TYPE", addressSchema.getRecordName());
149+
150+
List<Schema.Field> structFields = addressSchema.getFields();
151+
Assert.assertEquals(2, structFields.size());
152+
Assert.assertEquals("STREET", structFields.get(0).getName());
153+
Assert.assertEquals("CITY", structFields.get(1).getName());
154+
}
94155
}

0 commit comments

Comments
 (0)