// Nested-type converters added to OmniRowToColumnConverter by this patch,
// reconstructed into well-formed Scala (the original chunk is a line-collapsed
// unified diff). The enclosing object, the getConverterForType match, and
// BinaryConverter are only partially visible in this chunk and are left as-is.
//
// NOTE(review): the matching `case map: MapType` hunk builds the key converter
// with the *outer* `nullable` flag. Map keys can never be null in Spark, and
// upstream RowToColumnConverter passes `nullable = false` for keys — worth
// aligning in getConverterForType (its full definition is not visible here).

/**
 * Appends one MAP-typed entry read from `row` at ordinal `column` into the
 * writable vector `cv`.
 *
 * Layout follows Spark's WritableColumnVector convention for maps:
 * child vector 0 receives the keys, child vector 1 the values, and
 * `appendStruct(isNull)` records the per-entry offset/null bookkeeping.
 */
private case class MapConverter(
    keyConverter: TypeConverter,
    valueConverter: TypeConverter) extends TypeConverter {
  override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
    if (row.isNullAt(column)) {
      // Null map: mark the entry null, write nothing into the children.
      cv.appendStruct(true)
    } else {
      cv.appendStruct(false)
      val mapData = row.getMap(column)
      val keyVector = cv.getChild(0)
      val valueVector = cv.getChild(1)
      val numEntries = mapData.numElements
      // Imperative loop: this is the per-row hot path of row->column
      // conversion; avoid Range/closure overhead of a for-comprehension.
      var i = 0
      while (i < numEntries) {
        keyConverter.append(mapData.keyArray(), i, keyVector)
        // Values may be null; the value converter is responsible for that.
        valueConverter.append(mapData.valueArray(), i, valueVector)
        i += 1
      }
    }
  }
}

/**
 * Appends one STRUCT-typed entry read from `row` at ordinal `column` into the
 * writable vector `cv`, delegating each struct field to the child converter
 * and child vector with the same index.
 */
private case class StructConverter(
    childConverters: Array[TypeConverter]) extends TypeConverter {
  override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
    // FIX: check nullability with SpecializedGetters.isNullAt BEFORE calling
    // getStruct. The original called getStruct first and compared the result
    // to null; the getters contract does not guarantee getStruct returns null
    // for a null entry (implementations may throw or yield an undefined row).
    // This also makes the null handling consistent with MapConverter above
    // and with Spark's own RowToColumnConverter.
    if (row.isNullAt(column)) {
      cv.appendStruct(true)
    } else {
      cv.appendStruct(false)
      val structRow = row.getStruct(column, childConverters.length)
      var fieldIndex = 0
      while (fieldIndex < childConverters.length) {
        childConverters(fieldIndex).append(structRow, fieldIndex, cv.getChild(fieldIndex))
        fieldIndex += 1
      }
    }
  }
}