@@ -42,7 +42,21 @@ import org.duckdb.DuckDBColumnType.UUID
4242import org.duckdb.DuckDBColumnType.VARCHAR
4343import org.duckdb.DuckDBResultSetMetaData
4444import org.duckdb.JsonNode
45+ import org.jetbrains.kotlinx.dataframe.AnyFrame
46+ import org.jetbrains.kotlinx.dataframe.AnyRow
47+ import org.jetbrains.kotlinx.dataframe.DataColumn
4548import org.jetbrains.kotlinx.dataframe.DataFrame
49+ import org.jetbrains.kotlinx.dataframe.DataRow
50+ import org.jetbrains.kotlinx.dataframe.api.Infer
51+ import org.jetbrains.kotlinx.dataframe.api.asColumnGroup
52+ import org.jetbrains.kotlinx.dataframe.api.asDataColumn
53+ import org.jetbrains.kotlinx.dataframe.api.cast
54+ import org.jetbrains.kotlinx.dataframe.api.castToNotNullable
55+ import org.jetbrains.kotlinx.dataframe.api.first
56+ import org.jetbrains.kotlinx.dataframe.api.toDataFrame
57+ import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup
58+ import org.jetbrains.kotlinx.dataframe.impl.DataCollector
59+ import org.jetbrains.kotlinx.dataframe.impl.schema.DataFrameSchemaImpl
4660import org.jetbrains.kotlinx.dataframe.io.DbConnectionConfig
4761import org.jetbrains.kotlinx.dataframe.io.readAllSqlTables
4862import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
@@ -56,6 +70,7 @@ import java.sql.ResultSet
5670import java.sql.Struct
5771import java.util.Properties
5872import kotlin.collections.toList
73+ import kotlin.reflect.KClass
5974import kotlin.reflect.KTypeProjection
6075import kotlin.reflect.full.createType
6176import kotlin.reflect.full.withNullability
@@ -100,7 +115,7 @@ public object DuckDb : DbType("duckdb") {
100115 */
101116 internal fun parseDuckDbType (sqlTypeName : String , isNullable : Boolean ): AnyTypeInformation =
102117 duckDbTypeCache.getOrPut(Pair (sqlTypeName, isNullable)) {
103- when (DuckDBResultSetMetaData .TypeNameToType (sqlTypeName)) {
118+ return @getOrPut when (DuckDBResultSetMetaData .TypeNameToType (sqlTypeName)) {
104119 BOOLEAN -> typeInformationForValueColumnOf<Boolean >(isNullable)
105120
106121 TINYINT -> typeInformationForValueColumnOf<Byte >(isNullable)
@@ -208,9 +223,45 @@ public object DuckDb : DbType("duckdb") {
208223
209224 // TODO requires #1266 for specific types
210225 STRUCT -> {
211- val structTypes = parseStructType(sqlTypeName)
226+ val structEntries = parseStructType(sqlTypeName)
227+ val parsedStructEntries = structEntries.mapValues { (_, type) ->
228+ parseDuckDbType(sqlTypeName = type, isNullable = true )
229+ }
212230
213- typeInformationForValueColumnOf<Struct >(isNullable)
231+ val targetSchema = ColumnSchema .Group (
232+ schema = DataFrameSchemaImpl (parsedStructEntries.mapValues { it.value.targetSchema }),
233+ contentType = typeOf<Any ?>(),
234+ )
235+
236+ typeInformationWithProcessingFor<Struct , Map <String , Any ?>, DataRow <* >>(
237+ jdbcSourceType = typeOf<Struct >().withNullability(isNullable),
238+ targetSchema = targetSchema,
239+ valuePreprocessor = { struct, _ ->
240+ // NOTE DataRows cannot be `null` in DataFrame, instead, all its fields become `null`
241+ if (struct == null ) {
242+ parsedStructEntries.mapValues { null }
243+ } else {
244+ // read data from the struct
245+ val attrs = struct.getAttributes(
246+ parsedStructEntries.mapValues {
247+ (it.value.jdbcSourceType.classifier!! as KClass <* >).java
248+ },
249+ )
250+
251+ // and potentially, preprocess each value individually
252+ parsedStructEntries.entries.withIndex().associate { (i, entry) ->
253+ entry.key to entry.value.castToAny().preprocess(attrs[i])
254+ }
255+ }
256+ },
257+ columnPostprocessor = { col, _ ->
258+ col.castToNotNullable()
259+ .values()
260+ .toDataFrame()
261+ .asColumnGroup(col.name())
262+ .asDataColumn()
263+ },
264+ )
214265 }
215266
216267 // Cannot handle this in Kotlin
@@ -222,6 +273,25 @@ public object DuckDb : DbType("duckdb") {
222273 }
223274 }
224275
/**
 * Builds the [DataColumn] called [name] holding [values].
 *
 * This override special-cases type information whose target schema is a [ColumnSchema.Group]:
 * those values are stored in a plain value column typed with the group schema's type,
 * so the actual column group can be created later, in post-processing, for efficiency.
 * Every other target schema is delegated to the superclass implementation unchanged.
 *
 * @param name the name of the resulting column.
 * @param values the (possibly `null`) values to put in the column.
 * @param typeInformation the type information whose target schema selects the strategy.
 * @param inferNullability when `true`, the group branch uses [Infer.Nulls];
 *   otherwise it uses [Infer.None].
 * @return the column built from [values].
 */
override fun <D : Any> buildDataColumn(
    name: String,
    values: List<D?>,
    typeInformation: TypeInformation<*, D, *>,
    inferNullability: Boolean,
): DataColumn<D?> {
    // Anything that is not a group schema keeps the default behavior.
    val groupSchema = typeInformation.targetSchema as? ColumnSchema.Group
        ?: return super.buildDataColumn(name, values, typeInformation, inferNullability)

    val nullInference = if (inferNullability) Infer.Nulls else Infer.None

    // Deliberately a value column (not a column group): the conversion happens in post-processing.
    return DataColumn.createValueColumn(
        name = name,
        values = values,
        infer = nullInference,
        type = groupSchema.type,
    )
}

294+
225295 private fun SqlArray.toList (): List <Any ?> =
226296 when (val array = this .array) {
227297 is IntArray -> array.toList()
0 commit comments