diff --git a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java index 106d8b981701..c3429f3e6b2f 100644 --- a/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java +++ b/backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java @@ -39,6 +39,80 @@ public long rtHandle() { public static native long cloneHashTable(long hashTableData); + /** + * Serialize a hash table for broadcasting. + * + * @param hashTableHandle Handle to the hash table builder + * @return Handle to the serialized hash table data + */ + public static native long serializeHashTable(long hashTableHandle); + + /** + * Deserialize a hash table from broadcast data. Uses the default leaf memory pool for allocation. + * + * @param serializedData Byte array containing serialized hash table + * @return Handle to the deserialized hash table builder + */ + public static native long deserializeHashTable(byte[] serializedData); + + /** + * Deserialize a hash table from broadcast data with explicit ignoreNullKeys parameter. + * + * @param serializedData Byte array containing serialized hash table + * @param ignoreNullKeys Whether to ignore null keys (must match the serialized hash table) + * @param joinHasNullKeys Whether the build side has null keys (for null-aware anti join) + * @return Handle to the deserialized hash table builder + */ + public static native long deserializeHashTableWithIgnoreNullKeys( + byte[] serializedData, boolean ignoreNullKeys, boolean joinHasNullKeys); + + /** + * Get the size of serialized hash table data. + * + * @param serializedHandle Handle to serialized data + * @return Size in bytes + */ + public static native long getSerializedSize(long serializedHandle); + + /** + * Get ignoreNullKeys parameter from serialized hash table metadata. + * + * @param serializedHandle Handle to serialized data + * @return ignoreNullKeys flag used when building the hash table + */ + public static native boolean getSerializedIgnoreNullKeys(long serializedHandle); + + /** + * Get joinHasNullKeys parameter from serialized hash table metadata. + * + * @param serializedHandle Handle to serialized data + * @return joinHasNullKeys flag indicating if build side has null keys + */ + public static native boolean getSerializedJoinHasNullKeys(long serializedHandle); + + /** + * Get bloom filter blocks byte size from serialized hash table metadata. + * + * @param serializedHandle Handle to serialized data + * @return bloom filter blocks byte size + */ + public static native long getBloomFilterBlocksByteSize(long serializedHandle); + + /** + * Get serialized hash table data as byte array. + * + * @param serializedHandle Handle to serialized data + * @return Byte array containing serialized data + */ + public static native byte[] getSerializedData(long serializedHandle); + + /** + * Release serialized hash table data. 
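+   *
+   * <p>A typical round trip over the methods declared above (a hypothetical sketch;
+   * {@code hashTableHandle} would come from a prior {@code nativeBuild} call):
+   *
+   * <pre>{@code
+   * long serialized = HashJoinBuilder.serializeHashTable(hashTableHandle);
+   * try {
+   *   byte[] bytes = HashJoinBuilder.getSerializedData(serialized);
+   *   boolean ignoreNullKeys = HashJoinBuilder.getSerializedIgnoreNullKeys(serialized);
+   *   boolean joinHasNullKeys = HashJoinBuilder.getSerializedJoinHasNullKeys(serialized);
+   *   // broadcast 'bytes', then on each executor:
+   *   long builderHandle =
+   *       HashJoinBuilder.deserializeHashTableWithIgnoreNullKeys(bytes, ignoreNullKeys, joinHasNullKeys);
+   * } finally {
+   *   HashJoinBuilder.releaseSerializedData(serialized);
+   * }
+   * }</pre>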
+ * + * @param serializedHandle Handle to serialized data + */ + public static native void releaseSerializedData(long serializedHandle); + public native long nativeBuild( String buildHashTableId, long[] batchHandlers, diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 846d527ee44c..6f338adeb21d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -103,6 +103,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { getConf(VALUE_STREAM_DYNAMIC_FILTER_ENABLED) def enableTimestampNtzValidation: Boolean = getConf(ENABLE_TIMESTAMP_NTZ_VALIDATION) + + def enableDriverSideBroadcastHashTableBuild: Boolean = + getConf(VELOX_DRIVER_SIDE_BROADCAST_HASH_TABLE_BUILD) } object VeloxConfig extends ConfigRegistry { @@ -622,6 +625,18 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(true) + val VELOX_DRIVER_SIDE_BROADCAST_HASH_TABLE_BUILD = + buildConf("spark.gluten.sql.columnar.backend.velox.driverSideBroadcastHashTableBuild") + .doc( + "Enable driver-side broadcast hash table build. When enabled, the hash table is " + + "built and serialized on the driver, then broadcast to executors. When disabled, " + + "each executor builds its own hash table from the broadcast data. " + + "Note: This feature may have issues with complex queries involving Semi/Anti-Join, " + + "sorting, or complex filter conditions. Consider disabling if you encounter " + + "incorrect results in such queries.") + .booleanConf + .createWithDefault(true) + val QUERY_TRACE_ENABLED = buildConf("spark.gluten.sql.columnar.backend.velox.queryTraceEnabled") .doc("Enable query tracing flag.") .booleanConf diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala index 1554c4ddd3e5..b6b272facc0e 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala @@ -25,9 +25,10 @@ import org.apache.spark.rpc.GlutenDriverEndpoint import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.{BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.{ColumnarBuildSideRelation, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.joins.BuildSideRelation import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.execution.unsafe.UnsafeColumnarBuildSideRelation import org.apache.spark.sql.vectorized.ColumnarBatch import io.substrait.proto.JoinRel @@ -179,7 +180,48 @@ case class BroadcastHashJoinExecTransformer( bloomFilterPushdownSize, metrics.get("buildHashTableTime") ) - val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast, context) + + // Check if the build side can be offloaded to Velox + // If offload=false (e.g., due to unsupported operators), we must use executor-side build + val canOffload = broadcast.value match { + case columnar: ColumnarBuildSideRelation => columnar.offload + case unsafe: UnsafeColumnarBuildSideRelation => unsafe.offload + case _ => false + } + + // Choose between driver-side and executor-side hash table 
build + val broadcastRDD = if (VeloxBroadcastBuildSideCache.isDriverSideBuildEnabled && canOffload) { + // New approach: Build and serialize hash table on driver + // Only use this when the build side can be offloaded to Velox + logInfo(s"Using driver-side broadcast hash table build for $buildBroadcastTableId") + val serializedHashTable = VeloxBroadcastBuildSideCache.buildAndSerializeOnDriver( + broadcast, + context + ) + val broadcastSerialized = sparkContext.broadcast(serializedHashTable) + val rdd = VeloxSerializedBroadcastRDD(sparkContext, broadcastSerialized, context) + + // Update bloom filter metrics from driver-side build + val (bloomFilterSize, dynamicFiltersProduced) = rdd.getBloomFilterMetrics + metrics.get("bloomFilterBlocksByteSize").foreach(_.set(bloomFilterSize)) + metrics.get("hashProbeDynamicFiltersProduced").foreach(_.set(dynamicFiltersProduced)) + + rdd + } else { + // Legacy approach: Build hash table on each executor + // Use this when: + // 1. Driver-side build is disabled, OR + // 2. The build side cannot be offloaded (offload=false due to unsupported operators) + if (!canOffload) { + logWarning( + s"Build side cannot be offloaded for $buildBroadcastTableId, " + + "falling back to executor-side build") + } else { + logInfo(s"Using executor-side broadcast hash table build for $buildBroadcastTableId") + } + VeloxBroadcastBuildSideRDD(sparkContext, broadcast, context) + } + // FIXME: Do we have to make build side a RDD? streamedRDD :+ broadcastRDD } diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala new file mode 100644 index 000000000000..0d52dca3e87b --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/SerializedBroadcastHashTable.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.vectorized.HashJoinBuilder + +import org.apache.spark.sql.execution.joins.BuildSideRelation + +import java.io.{Externalizable, ObjectInput, ObjectOutput} + +/** + * Serialized broadcast hash table that can be efficiently broadcast to executors. This is built on + * the driver and contains the serialized hash table data. 
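+ *
+ * The Externalizable wire format follows `writeExternal` below: numRows, ignoreNullKeys,
+ * joinHasNullKeys, bloomFilterBlocksByteSize, hashProbeDynamicFiltersProduced, then the length
+ * of the serialized table bytes followed by the bytes themselves, and finally the
+ * Java-serialized `BuildSideRelation`.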
+ */ +case class SerializedBroadcastHashTable( + serializedData: Array[Byte], + numRows: Long, + ignoreNullKeys: Boolean, + joinHasNullKeys: Boolean, + bloomFilterBlocksByteSize: Long, + hashProbeDynamicFiltersProduced: Long, + buildSideRelation: BuildSideRelation) + extends Externalizable { + + def this() = this(null, 0, false, false, 0, 0, null) // Required for Externalizable + + override def writeExternal(out: ObjectOutput): Unit = { + out.writeLong(numRows) + out.writeBoolean(ignoreNullKeys) + out.writeBoolean(joinHasNullKeys) + out.writeLong(bloomFilterBlocksByteSize) + out.writeLong(hashProbeDynamicFiltersProduced) + out.writeInt(serializedData.length) + out.write(serializedData) + out.writeObject(buildSideRelation) + } + + override def readExternal(in: ObjectInput): Unit = { + val numRows = in.readLong() + val ignoreNullKeys = in.readBoolean() + val joinHasNullKeys = in.readBoolean() + val bloomFilterBlocksByteSize = in.readLong() + val hashProbeDynamicFiltersProduced = in.readLong() + val dataLength = in.readInt() + val data = new Array[Byte](dataLength) + in.readFully(data) + val relation = in.readObject().asInstanceOf[BuildSideRelation] + + // Use reflection to set final fields + val numRowsField = classOf[SerializedBroadcastHashTable].getDeclaredField("numRows") + numRowsField.setAccessible(true) + numRowsField.set(this, numRows) + + val dataField = classOf[SerializedBroadcastHashTable].getDeclaredField("serializedData") + dataField.setAccessible(true) + dataField.set(this, data) + + val relationField = classOf[SerializedBroadcastHashTable].getDeclaredField("buildSideRelation") + relationField.setAccessible(true) + relationField.set(this, relation) + + val ignoreNullKeysField = + classOf[SerializedBroadcastHashTable].getDeclaredField("ignoreNullKeys") + ignoreNullKeysField.setAccessible(true) + ignoreNullKeysField.set(this, ignoreNullKeys) + + val joinHasNullKeysField = + classOf[SerializedBroadcastHashTable].getDeclaredField("joinHasNullKeys") + joinHasNullKeysField.setAccessible(true) + joinHasNullKeysField.set(this, joinHasNullKeys) + + val bloomFilterBlocksByteSizeField = + classOf[SerializedBroadcastHashTable].getDeclaredField("bloomFilterBlocksByteSize") + bloomFilterBlocksByteSizeField.setAccessible(true) + bloomFilterBlocksByteSizeField.set(this, bloomFilterBlocksByteSize) + + val hashProbeDynamicFiltersProducedField = + classOf[SerializedBroadcastHashTable].getDeclaredField("hashProbeDynamicFiltersProduced") + hashProbeDynamicFiltersProducedField.setAccessible(true) + hashProbeDynamicFiltersProducedField.set(this, hashProbeDynamicFiltersProduced) + } + + /** + * Deserialize the hash table on executor side. The serialized Velox hash table is already in a + * prepared, probe-ready form, so executor side only needs deserialization without re-running + * prepareJoinTable. + * + * @return + * Hash table builder handle + */ + def deserialize(): Long = { + HashJoinBuilder.deserializeHashTableWithIgnoreNullKeys( + serializedData, + ignoreNullKeys, + joinHasNullKeys) + } + + /** Get the size of serialized data in bytes. */ + def sizeInBytes: Long = serializedData.length.toLong +} + +object SerializedBroadcastHashTable { + + /** + * Build and serialize a hash table on the driver. 
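+   *
+   * A sketch of the expected call site (hypothetical; `hashTableHandle` comes from a prior
+   * `nativeBuild` call and `relation` is the originating `BuildSideRelation`):
+   *
+   * {{{
+   * val serialized = SerializedBroadcastHashTable.fromHashTable(hashTableHandle, relation)
+   * val broadcasted = sparkContext.broadcast(serialized)
+   * }}}
+   *
+   * Note that `numRows` is populated from `getSerializedSize`, i.e. it currently carries the
+   * serialized byte size, as no native row-count accessor is exposed.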
+   *
+   * @param hashTableHandle
+   *   Handle to the built hash table
+   * @param buildSideRelation
+   *   The build side relation for metadata
+   * @return
+   *   Serialized broadcast hash table
+   */
+  def fromHashTable(
+      hashTableHandle: Long,
+      buildSideRelation: BuildSideRelation): SerializedBroadcastHashTable = {
+
+    // Serialize the hash table
+    val serializedHandle = HashJoinBuilder.serializeHashTable(hashTableHandle)
+
+    try {
+      // Get serialized data
+      val serializedData = HashJoinBuilder
+        .getSerializedData(serializedHandle)
+      val numRows = HashJoinBuilder
+        .getSerializedSize(serializedHandle)
+      val ignoreNullKeys = HashJoinBuilder
+        .getSerializedIgnoreNullKeys(serializedHandle)
+      val joinHasNullKeys = HashJoinBuilder
+        .getSerializedJoinHasNullKeys(serializedHandle)
+
+      // Get bloom filter metrics
+      val bloomFilterBlocksByteSize = HashJoinBuilder
+        .getBloomFilterBlocksByteSize(serializedHandle)
+      val hashProbeDynamicFiltersProduced = if (bloomFilterBlocksByteSize > 0) 1L else 0L
+
+      SerializedBroadcastHashTable(
+        serializedData,
+        numRows,
+        ignoreNullKeys,
+        joinHasNullKeys,
+        bloomFilterBlocksByteSize,
+        hashProbeDynamicFiltersProduced,
+        buildSideRelation)
+    } finally {
+      // Clean up serialized handle
+      HashJoinBuilder.releaseSerializedData(serializedHandle)
+      // Clean up original hash table
+      HashJoinBuilder.clearHashTable(hashTableHandle)
+    }
+  }
+}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
index 535fd8900e19..e50dcfd2252d 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala
@@ -16,7 +16,9 @@
  */
 package org.apache.gluten.execution

+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
+import org.apache.gluten.config.VeloxConfig
 import org.apache.gluten.vectorized.HashJoinBuilder

 import org.apache.spark.SparkEnv
@@ -37,6 +39,10 @@ case class BroadcastHashTable(pointer: Long, relation: BuildSideRelation)
  *
  * The complicated part is due to reuse exchange, where multiple BHJ IDs correspond to a
  * `BuildSideRelation`.
+ *
+ * This implementation supports two modes:
+ *   1. Driver-side build (new): the hash table is built and serialized on the driver, then broadcast.
+ *   2. Executor-side build (legacy): each executor builds its own hash table from the broadcast data.
  */
 object VeloxBroadcastBuildSideCache
   extends Logging
@@ -75,6 +81,80 @@ object VeloxBroadcastBuildSideCache
     )
   }

+  /**
+   * Build the hash table on the driver and serialize it for broadcasting. This is the new
+   * driver-side build approach, similar to Spark's native implementation.
+   *
+   * Note: This method runs on the driver, not in a Spark task, so we need to manually create and
+   * manage the runtime and memory manager.
+   */
+  def buildAndSerializeOnDriver(
+      broadcast: Broadcast[BuildSideRelation],
+      broadcastContext: BroadcastHashJoinContext): SerializedBroadcastHashTable = {
+
+    logInfo(s"Building hash table on driver for broadcast ID: ${broadcastContext.buildHashTableId}")
+
+    // For driver-side build, create a standalone runtime without task context dependencies.
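+    // The standalone runtime (see Runtime.createStandalone) wraps a standalone native memory
+    // manager backed by a no-op reservation listener, so this driver-side allocation is not
+    // tracked by Spark's task memory manager and must be freed via runtime.close().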
+ val backendName = BackendsApiManager.getBackendName + + val runtime = org.apache.gluten.runtime.Runtime.createStandalone( + backendName, + "DriverBroadcastHashTableBuild" + ) + + try { + // Build hash table on driver using the created runtime + val (hashTableHandle, relation) = broadcast.value match { + case columnar: ColumnarBuildSideRelation => + columnar.reset() + columnar.buildHashTableWithRuntime(broadcastContext, runtime) + case unsafe: UnsafeColumnarBuildSideRelation => + unsafe.reset() + unsafe.buildHashTableWithRuntime(broadcastContext, runtime) + } + + // Serialize the hash table + val serialized = SerializedBroadcastHashTable.fromHashTable(hashTableHandle, relation) + + if (relation != null) { + relation match { + case columnar: ColumnarBuildSideRelation => + columnar.reset() + case unsafe: UnsafeColumnarBuildSideRelation => + unsafe.reset() + } + } + + logInfo( + s"Serialized hash table size: ${serialized.sizeInBytes} bytes, " + + s"rows: ${serialized.numRows} for broadcast ID: ${broadcastContext.buildHashTableId}") + + serialized + } finally { + runtime.close() + } + } + + /** Deserialize hash table on executor from broadcast data. */ + def deserializeOnExecutor( + serialized: SerializedBroadcastHashTable, + broadcastHashTableId: String): BroadcastHashTable = { + + buildSideRelationCache.get( + broadcastHashTableId, + (_: String) => { + logInfo(s"Deserializing hash table on executor for broadcast ID: $broadcastHashTableId") + val hashTableHandle = serialized.deserialize() + BroadcastHashTable(hashTableHandle, serialized.buildSideRelation) + } + ) + } + + /** Check if driver-side build is enabled. */ + def isDriverSideBuildEnabled: Boolean = { + VeloxConfig.get.enableDriverSideBroadcastHashTableBuild + } + /** This is callback from c++ backend. */ def get(broadcastHashtableId: String): Long = { Option(buildSideRelationCache.getIfPresent(broadcastHashtableId)) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxSerializedBroadcastRDD.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxSerializedBroadcastRDD.scala new file mode 100644 index 000000000000..b2c03a9652de --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxSerializedBroadcastRDD.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.spark.{broadcast, SparkContext} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * RDD for handling serialized broadcast hash tables built on the driver. This RDD deserializes the + * hash table on each executor. 
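+ *
+ * A sketch of how this is wired up in BroadcastHashJoinExecTransformer (names as in this patch):
+ *
+ * {{{
+ * val serialized = VeloxBroadcastBuildSideCache.buildAndSerializeOnDriver(broadcast, context)
+ * val rdd = VeloxSerializedBroadcastRDD(sparkContext, sparkContext.broadcast(serialized), context)
+ * }}}
+ *
+ * genBroadcastBuildSideIterator returns an empty iterator because the deserialized table is
+ * registered in VeloxBroadcastBuildSideCache rather than streamed as batches.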
+ */ +case class VeloxSerializedBroadcastRDD( + @transient private val sc: SparkContext, + broadcasted: broadcast.Broadcast[SerializedBroadcastHashTable], + broadcastContext: BroadcastHashJoinContext) + extends BroadcastBuildSideRDD(sc, null) { + + override def genBroadcastBuildSideIterator(): Iterator[ColumnarBatch] = { + // Deserialize hash table on executor + val serialized = broadcasted.value + VeloxBroadcastBuildSideCache.deserializeOnExecutor( + serialized, + broadcastContext.buildHashTableId + ) + + // Return empty iterator as hash table is already built + Iterator.empty + } + + /** + * Get bloom filter metrics from the serialized hash table. This is called from the driver to get + * metrics that were computed during hash table build. + */ + def getBloomFilterMetrics: (Long, Long) = { + val serialized = broadcasted.value + (serialized.bloomFilterBlocksByteSize, serialized.hashProbeDynamicFiltersProduced) + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala index b056cd36a8ed..d32f29ad7913 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/JoinMetricsUpdater.scala @@ -16,6 +16,7 @@ */ package org.apache.gluten.metrics +import org.apache.gluten.config.VeloxConfig import org.apache.gluten.metrics.Metrics.SingleMetric import org.apache.gluten.substrait.JoinParams @@ -129,8 +130,16 @@ class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) hashProbeSpilledPartitions += hashProbeMetrics.spilledPartitions hashProbeSpilledFiles += hashProbeMetrics.spilledFiles hashProbeReplacedWithDynamicFilterRows += hashProbeMetrics.numReplacedWithDynamicFilterRows - hashProbeDynamicFiltersProduced += hashProbeMetrics.numDynamicFiltersProduced - bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize + + // Only accumulate dynamic filter metrics when driver-side build is disabled. + // When driver-side build is enabled, these metrics are set directly from the + // serialized hash table in HashJoinExecTransformer to avoid double counting. + val isDriverSideBuildEnabled = + VeloxConfig.get.enableDriverSideBroadcastHashTableBuild + if (!isDriverSideBuildEnabled) { + hashProbeDynamicFiltersProduced += hashProbeMetrics.numDynamicFiltersProduced + bloomFilterBlocksByteSize += hashProbeMetrics.bloomFilterBlocksByteSize + } idx += 1 // HashBuild diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala index fea9f149745a..cd83e222c32f 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala @@ -238,6 +238,91 @@ case class ColumnarBuildSideRelation( } } + /** + * Build hash table with provided runtime (for driver-side build). This version doesn't rely on + * TaskContext and can be called from the driver. 
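+   *
+   * A sketch of the expected call pattern (hypothetical; mirrors VeloxBroadcastBuildSideCache):
+   *
+   * {{{
+   * val runtime = Runtime.createStandalone(backendName, "DriverBroadcastHashTableBuild")
+   * try {
+   *   relation.reset()
+   *   val (handle, rel) = relation.buildHashTableWithRuntime(context, runtime)
+   *   // serialize handle, then reset rel to drop the stale native pointer
+   * } finally runtime.close()
+   * }}}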
+ */ + def buildHashTableWithRuntime( + broadcastContext: BroadcastHashJoinContext, + runtime: org.apache.gluten.runtime.Runtime): (Long, ColumnarBuildSideRelation) = + synchronized { + if (hashTableData == 0) { + val startTime = System.nanoTime() + val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime) + val serializeHandle: Long = { + val allocator = ArrowBufferAllocators.globalInstance() + val cSchema = ArrowSchema.allocateNew(allocator) + val arrowSchema = SparkArrowUtil.toArrowSchema( + SparkShimLoader.getSparkShims.structFromAttributes(output), + SQLConf.get.sessionLocalTimeZone) + ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema) + val handle = jniWrapper + .init(cSchema.memoryAddress()) + cSchema.close() + handle + } + + val batchArray = new ArrayBuffer[Long] + + var batchId = 0 + while (batchId < batches.size) { + batchArray.append(jniWrapper.deserialize(serializeHandle, batches(batchId))) + batchId += 1 + } + + logDebug( + s"BHJ value size: " + + s"${broadcastContext.buildHashTableId} = ${batches.length}") + + val (keys, newOutput) = if (newBuildKeys.isEmpty) { + ( + broadcastContext.buildSideJoinKeys.asJava, + broadcastContext.buildSideStructure.asJava + ) + } else { + ( + newBuildKeys.asJava, + output.asJava + ) + } + + val joinKeys = keys.asScala.map { + key => + val attr = ConverterUtils.getAttrFromExpr(key) + ConverterUtils.genColumnNameWithExprId(attr) + }.toArray + + val hashJoinBuilder = HashJoinBuilder.create(runtime) + + // Build the hash table + hashTableData = hashJoinBuilder + .nativeBuild( + broadcastContext.buildHashTableId, + batchArray.toArray, + joinKeys, + broadcastContext.filterBuildColumns, + broadcastContext.filterPropagatesNulls, + broadcastContext.substraitJoinType.ordinal(), + broadcastContext.hasMixedFiltCondition, + broadcastContext.isExistenceJoin, + SubstraitUtil.toNameStruct(newOutput).toByteArray, + broadcastContext.isNullAwareAntiJoin, + broadcastContext.bloomFilterPushdownSize, + buildThreads + ) + + jniWrapper.close(serializeHandle) + + // Update build hash table time metric + val elapsedTime = System.nanoTime() - startTime + broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime / 1000000) + + (hashTableData, this) + } else { + (HashJoinBuilder.cloneHashTable(hashTableData), null) + } + } + def reset(): Unit = synchronized { hashTableData = 0 } diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala index fbc329f36060..9d3a627bbb0a 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/unsafe/UnsafeColumnarBuildSideRelation.scala @@ -93,7 +93,7 @@ class UnsafeColumnarBuildSideRelation( private var batches: Seq[UnsafeByteArray], private var safeBroadcastMode: SafeBroadcastMode, private var newBuildKeys: Seq[Expression], - private var offload: Boolean, + var offload: Boolean, private var buildThreads: Int) extends BuildSideRelation with Externalizable @@ -208,6 +208,92 @@ class UnsafeColumnarBuildSideRelation( } } + /** + * Build hash table with provided runtime (for driver-side build). This version doesn't rely on + * TaskContext and can be called from the driver. 
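+   *
+   * Mirrors `ColumnarBuildSideRelation.buildHashTableWithRuntime`, except that batches are
+   * deserialized directly from off-heap `UnsafeByteArray` addresses via `deserializeDirect`
+   * instead of being copied through on-heap byte arrays.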
+ */ + def buildHashTableWithRuntime( + broadcastContext: BroadcastHashJoinContext, + runtime: org.apache.gluten.runtime.Runtime): (Long, BuildSideRelation) = + synchronized { + if (hashTableData == 0) { + val startTime = System.nanoTime() + val jniWrapper = ColumnarBatchSerializerJniWrapper.create(runtime) + val serializeHandle: Long = { + val allocator = ArrowBufferAllocators.globalInstance() + val cSchema = ArrowSchema.allocateNew(allocator) + val arrowSchema = SparkArrowUtil.toArrowSchema( + SparkShimLoader.getSparkShims.structFromAttributes(output), + SQLConf.get.sessionLocalTimeZone) + ArrowAbiUtil.exportSchema(allocator, arrowSchema, cSchema) + val handle = jniWrapper + .init(cSchema.memoryAddress()) + cSchema.close() + handle + } + + val batchArray = new ArrayBuffer[Long] + + var batchId = 0 + while (batchId < batches.size) { + val (offset, length) = (batches(batchId).address(), batches(batchId).size()) + batchArray.append(jniWrapper.deserializeDirect(serializeHandle, offset, length.toInt)) + batchId += 1 + } + + logDebug( + s"BHJ value size: " + + s"${broadcastContext.buildHashTableId} = ${batches.size}") + + val (keys, newOutput) = if (newBuildKeys.isEmpty) { + ( + broadcastContext.buildSideJoinKeys.asJava, + broadcastContext.buildSideStructure.asJava + ) + } else { + ( + newBuildKeys.asJava, + output.asJava + ) + } + + val joinKeys = keys.asScala.map { + key => + val attr = ConverterUtils.getAttrFromExpr(key) + ConverterUtils.genColumnNameWithExprId(attr) + }.toArray + + val hashJoinBuilder = HashJoinBuilder.create(runtime) + + // Build the hash table + hashTableData = hashJoinBuilder + .nativeBuild( + broadcastContext.buildHashTableId, + batchArray.toArray, + joinKeys, + broadcastContext.filterBuildColumns, + broadcastContext.filterPropagatesNulls, + broadcastContext.substraitJoinType.ordinal(), + broadcastContext.hasMixedFiltCondition, + broadcastContext.isExistenceJoin, + SubstraitUtil.toNameStruct(newOutput).toByteArray, + broadcastContext.isNullAwareAntiJoin, + broadcastContext.bloomFilterPushdownSize, + buildThreads + ) + + jniWrapper.close(serializeHandle) + + // Update build hash table time metric + val elapsedTime = System.nanoTime() - startTime + broadcastContext.buildHashTableTimeMetric.foreach(_ += elapsedTime / 1000000) + + (hashTableData, this) + } else { + (HashJoinBuilder.cloneHashTable(hashTableData), null) + } + } + def reset(): Unit = synchronized { hashTableData = 0 } diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala index ddd76f917db9..523aae65c511 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/execution/DynamicOffHeapSizingSuite.scala @@ -35,6 +35,7 @@ class DynamicOffHeapSizingSuite extends VeloxWholeStageTransformerSuite { .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") .set("spark.executor.memory", "2GB") .set("spark.memory.offHeap.enabled", "false") + .set("spark.gluten.sql.columnar.backend.velox.driverSideBroadcastHashTableBuild", "false") .set( "spark.gluten.velox.buildHashTableOncePerExecutor.enabled", "false" diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index b9d3e65afe0b..bc34688ae602 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -170,6 +170,7 @@ set(VELOX_SRCS operators/functions/SparkExprToSubfieldFilterParser.cc 
 operators/plannodes/RowVectorStream.cc
 operators/hashjoin/HashTableBuilder.cc
+ operators/hashjoin/HashTableSerializer.cc
 operators/reader/FileReaderIterator.cc
 operators/reader/ParquetReaderIterator.cc
 operators/serializer/VeloxColumnarBatchSerializer.cc
diff --git a/cpp/velox/jni/JniHashTable.cc b/cpp/velox/jni/JniHashTable.cc
index 11873471575b..5424e20cb482 100644
--- a/cpp/velox/jni/JniHashTable.cc
+++ b/cpp/velox/jni/JniHashTable.cc
@@ -163,4 +163,162 @@ long getJoin(const std::string& hashTableId) {
   return JniHashTableContext::getInstance().callJavaGet(hashTableId);
 }

+std::shared_ptr<HashTableSerializer::SerializedHashTable> serializeHashTable(
+    std::shared_ptr<HashTableBuilder> builder) {
+  VELOX_CHECK_NOT_NULL(builder, "Hash table builder cannot be null");
+
+  auto hashTable = builder->hashTable();
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  // Serialize the hash table.
+  // We need to handle both the ignoreNullKeys=true and ignoreNullKeys=false cases.
+  // Try to cast to HashTable<false> first (the most common case).
+  auto serialized = std::make_shared<HashTableSerializer::SerializedHashTable>();
+
+  facebook::velox::exec::BaseHashTable* baseTable = nullptr;
+  auto* hashTableFalse = dynamic_cast<facebook::velox::exec::HashTable<false>*>(hashTable.get());
+  if (hashTableFalse != nullptr) {
+    *serialized = HashTableSerializer::serialize(hashTableFalse);
+    serialized->ignoreNullKeys = false;
+    baseTable = hashTableFalse;
+  } else {
+    // Try HashTable<true>
+    auto* hashTableTrue = dynamic_cast<facebook::velox::exec::HashTable<true>*>(hashTable.get());
+    VELOX_CHECK_NOT_NULL(hashTableTrue, "Hash table must be either HashTable<false> or HashTable<true>");
+    *serialized = HashTableSerializer::serialize(hashTableTrue);
+    serialized->ignoreNullKeys = true;
+    baseTable = hashTableTrue;
+  }
+
+  // Save the joinHasNullKeys flag from the builder
+  serialized->joinHasNullKeys = builder->joinHasNullKeys();
+
+  // Calculate bloom filter blocks byte size
+  serialized->bloomFilterBlocksByteSize = 0;
+  if (baseTable != nullptr) {
+    for (const auto& hasher : baseTable->hashers()) {
+      const auto& bloomFilter = hasher->getBloomFilter();
+      if (bloomFilter != nullptr) {
+        // The concrete blocked bloom filter type comes from the IBM Velox fork; its template
+        // arguments were lost in this rendering of the patch and are reconstructed here.
+        auto* bfFilter = dynamic_cast<facebook::velox::BlockedBloomFilter*>(bloomFilter.get());
+        if (bfFilter != nullptr) {
+          serialized->bloomFilterBlocksByteSize += bfFilter->blocksByteSize();
+        }
+      }
+    }
+  }
+
+  return serialized;
+}
+
+std::shared_ptr<HashTableBuilder>
+deserializeHashTable(const uint8_t* data, size_t size, facebook::velox::memory::MemoryPool* memoryPool) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid data size");
+
+  // Use the default leaf memory pool if none is provided.
+  // This ensures proper shared_ptr management.
+  auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : defaultLeafVeloxMemoryPool();
+
+  // For deserialization, we need to know the ignoreNullKeys parameter.
+  // This should be stored in the serialized data format.
+  // For now, we'll try to deserialize with ignoreNullKeys=false (most common).
+  // TODO: Add metadata to the serialized format to store this information.
+  std::unique_ptr<facebook::velox::exec::BaseHashTable> hashTable;
+
+  try {
+    hashTable = HashTableSerializer::deserialize<false>(data, size, pool.get());
+  } catch (...) {
+    // If that fails, try with ignoreNullKeys=true
+    hashTable = HashTableSerializer::deserialize<true>(data, size, pool.get());
+  }
+
+  // Create a new builder and set the deserialized hash table.
+  // Note: This is a simplified version.
+  // The actual implementation needs to reconstruct the builder with proper parameters.
+
+  // For now, we'll create a minimal builder.
+  // TODO: Store and restore all builder parameters during serialization.
+  // Element types of the two empty lists below are reconstructed (the originals were lost in
+  // this rendering); the builder is created with empty key and channel lists either way.
+  std::vector<std::shared_ptr<const facebook::velox::core::FieldAccessTypedExpr>> emptyKeys;
+  std::vector<facebook::velox::column_index_t> emptyChannels;
+
+  // Create a RowType from the hash table's key types.
+  auto keyTypes = hashTable->rows()->keyTypes();
+  std::vector<std::string> names;
+  for (size_t i = 0; i < keyTypes.size(); ++i) {
+    names.push_back("key" + std::to_string(i));
+  }
+  auto rowType = facebook::velox::ROW(std::move(names), std::move(keyTypes));
+
+  auto builder = std::make_shared<HashTableBuilder>(
+      facebook::velox::core::JoinType::kInner,
+      false, // nullAware
+      false, // withFilter
+      -1, // bloomFilterPushdownSize
+      emptyKeys,
+      emptyChannels,
+      false, // filterPropagatesNulls
+      rowType,
+      pool.get(),
+      1000, // minTableRowsForParallelJoinBuild
+      1000000, // joinBuildVectorHasherMaxNumDistinct
+      100000, // abandonHashBuildDedupMinRows
+      0 // abandonHashBuildDedupMinPct
+  );
+
+  builder->setHashTable(std::move(hashTable));
+
+  return builder;
+}
+
+std::shared_ptr<HashTableBuilder> deserializeHashTable(
+    const uint8_t* data,
+    size_t size,
+    facebook::velox::memory::MemoryPool* memoryPool,
+    bool ignoreNullKeys,
+    bool joinHasNullKeys) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid data size");
+
+  auto pool = memoryPool ? memoryPool->addLeafChild("deserializeHashTable") : defaultLeafVeloxMemoryPool();
+
+  std::unique_ptr<facebook::velox::exec::BaseHashTable> hashTable;
+  if (ignoreNullKeys) {
+    auto derived = HashTableSerializer::deserialize<true>(data, size, pool.get());
+    hashTable = std::move(derived);
+  } else {
+    auto derived = HashTableSerializer::deserialize<false>(data, size, pool.get());
+    hashTable = std::move(derived);
+  }
+
+  std::vector<std::shared_ptr<const facebook::velox::core::FieldAccessTypedExpr>> emptyKeys;
+  std::vector<facebook::velox::column_index_t> emptyChannels;
+
+  auto keyTypes = hashTable->rows()->keyTypes();
+  std::vector<std::string> names;
+  for (size_t i = 0; i < keyTypes.size(); ++i) {
+    names.push_back("key" + std::to_string(i));
+  }
+  auto rowType = facebook::velox::ROW(std::move(names), std::move(keyTypes));
+
+  auto builder = std::make_shared<HashTableBuilder>(
+      facebook::velox::core::JoinType::kInner,
+      false,
+      false,
+      -1,
+      emptyKeys,
+      emptyChannels,
+      false,
+      rowType,
+      pool.get(),
+      1000,
+      1000000,
+      100000,
+      0);
+
+  builder->setHashTable(std::move(hashTable));
+  // Restore the joinHasNullKeys flag
+  builder->setJoinHasNullKeys(joinHasNullKeys);
+  return builder;
+}
+
 } // namespace gluten
diff --git a/cpp/velox/jni/JniHashTable.h b/cpp/velox/jni/JniHashTable.h
index 47f89d179968..694b179712e7 100644
--- a/cpp/velox/jni/JniHashTable.h
+++ b/cpp/velox/jni/JniHashTable.h
@@ -21,6 +21,7 @@
 #include "memory/ColumnarBatch.h"
 #include "memory/VeloxMemoryManager.h"
 #include "operators/hashjoin/HashTableBuilder.h"
+#include "operators/hashjoin/HashTableSerializer.h"
 #include "utils/ObjectStore.h"
 #include "velox/exec/HashTable.h"
@@ -91,6 +92,21 @@ std::shared_ptr<HashTableBuilder> nativeHashTableBuild(
 long getJoin(const std::string& hashTableId);

+// Serialize hash table for broadcasting
+std::shared_ptr<HashTableSerializer::SerializedHashTable> serializeHashTable(std::shared_ptr<HashTableBuilder> builder);
+
+// Deserialize hash table from broadcast data
+std::shared_ptr<HashTableBuilder>
+deserializeHashTable(const uint8_t* data, size_t size, facebook::velox::memory::MemoryPool* memoryPool);
+
+// Deserialize hash table from broadcast data with explicit ignoreNullKeys parameter
+std::shared_ptr<HashTableBuilder> deserializeHashTable(
+    const uint8_t* data,
+    size_t size,
+    facebook::velox::memory::MemoryPool* memoryPool,
+    bool ignoreNullKeys,
+    bool joinHasNullKeys = false);
+
 // Initialize the JNI hash table context
 inline void initVeloxJniHashTable(JNIEnv* env, JavaVM* javaVm) {
   JniHashTableContext::getInstance().initialize(env, javaVm);
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index e30413d6d357..8f7b2fcc9e8a 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -1027,6 +1027,11 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_nativeBuild( // NOLINT
   }

   if (numThreads == 1) {
+    // Use the default global pool for driver-side build.
+    // The hash table will be serialized and broadcast, so it doesn't need the runtime's pool;
+    // using the runtime's pool causes lifecycle management issues.
+    auto memoryPool = defaultLeafVeloxMemoryPool();
+
     auto builder = nativeHashTableBuild(
         hashJoinKeys,
         filterColumns,
@@ -1043,7 +1048,7 @@
         abandonHashBuildDedupMinRows,
         abandonHashBuildDedupMinPct,
         cb,
-        defaultLeafVeloxMemoryPool());
+        memoryPool);

     auto mainTable = builder->uniqueTable();
     mainTable->prepareJoinTable(
@@ -1077,6 +1082,10 @@
       threadBatches.push_back(cb[i]);
     }

+    // Use the default global pool for driver-side build.
+    // The hash table will be serialized and broadcast, so it doesn't need the runtime's pool.
+    auto threadMemoryPool = defaultLeafVeloxMemoryPool();
+
     auto builder = nativeHashTableBuild(
         hashJoinKeys,
         filterColumns,
@@ -1093,7 +1102,7 @@
         abandonHashBuildDedupMinRows,
         abandonHashBuildDedupMinPct,
         threadBatches,
-        defaultLeafVeloxMemoryPool());
+        threadMemoryPool);

     hashTableBuilders[t] = std::move(builder);
     otherTables[t] = std::move(hashTableBuilders[t]->uniqueTable());
@@ -1155,6 +1164,142 @@ JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_clearHashTable( // NOLINT
   ObjectStore::release(tableHandler);
   JNI_METHOD_END()
 }
+
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_serializeHashTable( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong hashTableHandle) {
+  JNI_METHOD_START
+  auto builder = ObjectStore::retrieve<HashTableBuilder>(hashTableHandle);
+  auto serialized = gluten::serializeHashTable(builder);
+  return gluten::getHashTableObjStore()->save(serialized);
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_deserializeHashTable( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jbyteArray serializedData) {
+  JNI_METHOD_START
+
+  // Get byte array data
+  jsize dataSize = env->GetArrayLength(serializedData);
+  jbyte* dataPtr = env->GetByteArrayElements(serializedData, nullptr);
+
+  if (dataPtr == nullptr) {
+    throw gluten::GlutenException("Failed to get serialized data");
+  }
+
+  // Deserialize using the default leaf memory pool
+  auto builder = gluten::deserializeHashTable(
+      reinterpret_cast<const uint8_t*>(dataPtr),
+      static_cast<size_t>(dataSize),
+      nullptr); // nullptr will trigger use of defaultLeafVeloxMemoryPool()
+
+  // Release byte array
+  env->ReleaseByteArrayElements(serializedData, dataPtr, JNI_ABORT);
+
+  return gluten::getHashTableObjStore()->save(builder);
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
+JNIEXPORT jlong JNICALL
+Java_org_apache_gluten_vectorized_HashJoinBuilder_deserializeHashTableWithIgnoreNullKeys( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jbyteArray serializedData,
+    jboolean ignoreNullKeys,
+    jboolean joinHasNullKeys) {
+  JNI_METHOD_START
+
+  jsize dataSize = env->GetArrayLength(serializedData);
+  jbyte* dataPtr = env->GetByteArrayElements(serializedData, nullptr);
+
+  if (dataPtr == nullptr) {
+    throw gluten::GlutenException("Failed to get serialized data");
+  }
+
+  auto builder = gluten::deserializeHashTable(
+      reinterpret_cast<const uint8_t*>(dataPtr),
+      static_cast<size_t>(dataSize),
+      nullptr,
+      static_cast<bool>(ignoreNullKeys),
+      static_cast<bool>(joinHasNullKeys));
+
+  env->ReleaseByteArrayElements(serializedData, dataPtr, JNI_ABORT);
+
+  return gluten::getHashTableObjStore()->save(builder);
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_getSerializedSize( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong serializedHandle) {
+  JNI_METHOD_START
+  auto serialized = ObjectStore::retrieve<HashTableSerializer::SerializedHashTable>(serializedHandle);
+  return static_cast<jlong>(serialized->size);
+  JNI_METHOD_END(0)
+}
+
+JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_getSerializedIgnoreNullKeys( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong serializedHandle) {
+  JNI_METHOD_START
+  auto serialized = ObjectStore::retrieve<HashTableSerializer::SerializedHashTable>(serializedHandle);
+  return static_cast<jboolean>(serialized->ignoreNullKeys);
+  JNI_METHOD_END(false)
+}
+
+JNIEXPORT jboolean JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_getSerializedJoinHasNullKeys( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong serializedHandle) {
+  JNI_METHOD_START
+  auto serialized = ObjectStore::retrieve<HashTableSerializer::SerializedHashTable>(serializedHandle);
+  return static_cast<jboolean>(serialized->joinHasNullKeys);
+  JNI_METHOD_END(false)
+}
+
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_getBloomFilterBlocksByteSize( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong serializedHandle) {
+  JNI_METHOD_START
+  auto serialized = ObjectStore::retrieve<HashTableSerializer::SerializedHashTable>(serializedHandle);
+  return static_cast<jlong>(serialized->bloomFilterBlocksByteSize);
+  JNI_METHOD_END(0L)
+}
+
+JNIEXPORT jbyteArray JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_getSerializedData( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong serializedHandle) {
+  JNI_METHOD_START
+  auto serialized = ObjectStore::retrieve<HashTableSerializer::SerializedHashTable>(serializedHandle);
+
+  jbyteArray result = env->NewByteArray(static_cast<jsize>(serialized->size));
+  if (result == nullptr) {
+    throw gluten::GlutenException("Failed to allocate byte array");
+  }
+
+  env->SetByteArrayRegion(
+      result, 0, static_cast<jsize>(serialized->size), reinterpret_cast<const jbyte*>(serialized->data.get()));
+
+  return result;
+  JNI_METHOD_END(nullptr)
+}
+
+JNIEXPORT void JNICALL Java_org_apache_gluten_vectorized_HashJoinBuilder_releaseSerializedData( // NOLINT
+    JNIEnv* env,
+    jclass,
+    jlong serializedHandle) {
+  JNI_METHOD_START
+  ObjectStore::release(serializedHandle);
+  JNI_METHOD_END()
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/cpp/velox/operators/hashjoin/HashTableBuilder.cc b/cpp/velox/operators/hashjoin/HashTableBuilder.cc
index 363edea4eccc..50db8f2c7fde 100644
--- a/cpp/velox/operators/hashjoin/HashTableBuilder.cc
+++ b/cpp/velox/operators/hashjoin/HashTableBuilder.cc
@@ -181,7 +181,7 @@ bool HashTableBuilder::abandonHashBuildDedupEarly(int64_t numDistinct) const {

 void HashTableBuilder::abandonHashBuildDedup() {
   abandonHashBuildDedup_ = true;
-  uniqueTable_->setAllowDuplicates(true);
+  // uniqueTable_->setAllowDuplicates(true);
   lookup_.reset();
 }
diff --git a/cpp/velox/operators/hashjoin/HashTableSerializer.cc b/cpp/velox/operators/hashjoin/HashTableSerializer.cc
new file mode 100644
index 000000000000..2543a3e4d039
--- /dev/null
+++ b/cpp/velox/operators/hashjoin/HashTableSerializer.cc
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "operators/hashjoin/HashTableSerializer.h"
+#include <cstring>
+#include <sstream>
+#include "velox/common/base/Exceptions.h"
+
+namespace gluten {
+
+template <bool ignoreNullKeys>
+HashTableSerializer::SerializedHashTable HashTableSerializer::serialize(
+    const facebook::velox::exec::HashTable<ignoreNullKeys>* hashTable) {
+  VELOX_CHECK_NOT_NULL(hashTable, "Hash table cannot be null");
+
+  std::ostringstream oss(std::ios::binary);
+
+  hashTable->serialize(oss);
+
+  SerializedHashTable result;
+  std::string str = oss.str();
+  result.size = str.size();
+  result.data = std::make_unique<uint8_t[]>(result.size);
+  std::memcpy(result.data.get(), str.data(), result.size);
+
+  return result;
+}
+
+template <bool ignoreNullKeys>
+std::unique_ptr<facebook::velox::exec::HashTable<ignoreNullKeys>>
+HashTableSerializer::deserialize(const uint8_t* data, size_t size, facebook::velox::memory::MemoryPool* pool) {
+  VELOX_CHECK_NOT_NULL(data, "Serialized data cannot be null");
+  VELOX_CHECK_GT(size, 0, "Invalid serialized data size");
+  VELOX_CHECK_NOT_NULL(pool, "Memory pool cannot be null");
+
+  std::string str(reinterpret_cast<const char*>(data), size);
+  std::istringstream iss(str, std::ios::binary);
+
+  return facebook::velox::exec::HashTable<ignoreNullKeys>::deserialize(iss, pool);
+}
+
+template HashTableSerializer::SerializedHashTable HashTableSerializer::serialize<false>(
+    const facebook::velox::exec::HashTable<false>*);
+
+template HashTableSerializer::SerializedHashTable HashTableSerializer::serialize<true>(
+    const facebook::velox::exec::HashTable<true>*);
+
+template std::unique_ptr<facebook::velox::exec::HashTable<false>>
+HashTableSerializer::deserialize<false>(const uint8_t*, size_t, facebook::velox::memory::MemoryPool*);
+
+template std::unique_ptr<facebook::velox::exec::HashTable<true>>
+HashTableSerializer::deserialize<true>(const uint8_t*, size_t, facebook::velox::memory::MemoryPool*);
+
+} // namespace gluten
diff --git a/cpp/velox/operators/hashjoin/HashTableSerializer.h b/cpp/velox/operators/hashjoin/HashTableSerializer.h
new file mode 100644
index 000000000000..60325e2ddb16
--- /dev/null
+++ b/cpp/velox/operators/hashjoin/HashTableSerializer.h
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include "velox/exec/HashTable.h"
+
+namespace gluten {
+
+/**
+ * HashTableSerializer provides serialization and deserialization for Velox hash tables.
+ * It is a thin wrapper around the HashTable's native serialize/deserialize methods
+ * provided by the IBM Velox fork.
+ */
+class HashTableSerializer {
+ public:
+  /**
+   * Serialized hash table data structure.
+   * Contains the serialized bytes that can be transmitted or stored.
+   */
+  struct SerializedHashTable {
+    std::unique_ptr<uint8_t[]> data; // Serialized data buffer
+    size_t size; // Total size in bytes
+    bool ignoreNullKeys; // ignoreNullKeys used when building the hash table
+    bool joinHasNullKeys; // Whether the build side has null keys (for null-aware anti join)
+    int64_t bloomFilterBlocksByteSize; // Total size of bloom filter blocks in bytes
+
+    SerializedHashTable() : size(0), ignoreNullKeys(false), joinHasNullKeys(false), bloomFilterBlocksByteSize(0) {}
+  };
+
+  /**
+   * Serialize a hash table to a contiguous memory buffer.
+   * Directly uses HashTable's serialize() method from the IBM Velox fork.
+   *
+   * @param hashTable The hash table to serialize (must be a join build table)
+   * @return Serialized hash table data
+   */
+  template <bool ignoreNullKeys>
+  static SerializedHashTable serialize(const facebook::velox::exec::HashTable<ignoreNullKeys>* hashTable);
+
+  /**
+   * Deserialize a hash table from a memory buffer.
+   * Directly uses HashTable's deserialize() method from the IBM Velox fork.
+   *
+   * @param data Pointer to serialized data
+   * @param size Size of serialized data
+   * @param pool Memory pool for allocations
+   * @return Deserialized hash table
+   */
+  template <bool ignoreNullKeys>
+  static std::unique_ptr<facebook::velox::exec::HashTable<ignoreNullKeys>>
+  deserialize(const uint8_t* data, size_t size, facebook::velox::memory::MemoryPool* pool);
+};
+
+} // namespace gluten
diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md
index fd714033c07f..88f0a7bea98d 100644
--- a/docs/velox-configuration.md
+++ b/docs/velox-configuration.md
@@ -26,6 +26,7 @@ nav_order: 16
 | spark.gluten.sql.columnar.backend.velox.cudf.memoryResource | async | GPU RMM memory resource. |
 | spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes | 1028MB | Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting for GPU available. |
 | spark.gluten.sql.columnar.backend.velox.directorySizeGuess | 32KB | Deprecated, rename to spark.gluten.sql.columnar.backend.velox.footerEstimatedSize |
+| spark.gluten.sql.columnar.backend.velox.driverSideBroadcastHashTableBuild | true | Enable driver-side broadcast hash table build. When enabled, the hash table is built and serialized on the driver, then broadcast to executors. When disabled, each executor builds its own hash table from the broadcast data. Note: This feature may have issues with complex queries involving Semi/Anti-Join, sorting, or complex filter conditions. Consider disabling if you encounter incorrect results in such queries. |
 | spark.gluten.sql.columnar.backend.velox.enableTimestampNtzValidation | true | Enable validation fallback for TimestampNTZ type. When true (default), any plan containing TimestampNTZ will fall back to Spark execution. Set to false during development/testing of TimestampNTZ support to allow native execution. |
 | spark.gluten.sql.columnar.backend.velox.fileHandleCacheEnabled | false | Disables caching if false. File handle cache should be disabled if files are mutable, i.e. file content may change while file path stays the same.
| | spark.gluten.sql.columnar.backend.velox.filePreloadThreshold | 1MB | Set the file preload threshold for velox file scan, refer to Velox's file-preload-threshold | diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index de1b8c426555..4df117f17940 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,9 +17,9 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_04_30 -VELOX_ENHANCED_BRANCH=ibm-2026_04_30 +VELOX_REPO=https://github.com/JkSelf/velox.git +VELOX_BRANCH=dft-2026_04_23-serialize-hashtable +VELOX_ENHANCED_BRANCH=ibm-2026_04_23 VELOX_HOME="" RUN_SETUP_SCRIPT=ON ENABLE_ENHANCED_FEATURES=OFF diff --git a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java index 070533c5e4b7..6640eb539718 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/memory/listener/ReservationListeners.java @@ -64,7 +64,7 @@ private static ReservationListener create0( return new ManagedReservationListener(target, TaskResources.getSharedUsage(), tmm); } - private static ManagedReservationListener noop() { + public static ManagedReservationListener noop() { return new ManagedReservationListener( new NoopMemoryTarget(), new SimpleMemoryUsageRecorder(), new Object()); } diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala index 159e1bba5ecf..794ba75b76cb 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/memory/NativeMemoryManager.scala @@ -121,4 +121,67 @@ object NativeMemoryManager { def apply(backendName: String, name: String): NativeMemoryManager = { TaskResources.addAnonymousResource(new Impl(backendName, name)) } + + final private class StandaloneImpl(backendName: String, name: String) + extends NativeMemoryManager + with AutoCloseable { + private val LOGGER = LoggerFactory.getLogger(classOf[NativeMemoryManager]) + private val rl = ReservationListeners.noop() + private val handle = NativeMemoryManagerJniWrapper.create( + backendName, + rl, + ConfigUtil.serialize( + GlutenConfig + .getNativeSessionConf(backendName, GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) + .asJava) + ) + + private def collectUsage() = { + MemoryUsageStats.parseFrom(NativeMemoryManagerJniWrapper.collectUsage(handle)) + } + + private val released: AtomicBoolean = new AtomicBoolean(false) + + override def addSpiller(spiller: Spiller): Unit = {} + override def hold(): Unit = NativeMemoryManagerJniWrapper.hold(handle) + override def getHandle(): Long = handle + + override def close(): Unit = { + if (!released.compareAndSet(false, true)) { + throw new GlutenException(s"Memory manager instance already released: $handle, $name") + } + + def dump(): String = { + SparkMemoryUtil.prettyPrintStats( + s"[$name]", + new KnownNameAndStats() { + override def name: String = StandaloneImpl.this.name + override def stats: MemoryUsageStats = collectUsage() + }) + } + + if (LOGGER.isDebugEnabled) { + LOGGER.debug("About to release memory manager, " + dump()) + } + + NativeMemoryManagerJniWrapper.release(handle) + + if (rl.getUsedBytes != 0) { + LOGGER.warn( + String.format( + "%s 
Reservation listener %s still reserved non-zero bytes, which may cause memory" + + " leak, size: %s.", + name, + rl.toString, + SparkMemoryUtil.bytesToString(rl.getUsedBytes) + )) + } + } + } + + def createStandalone( + backendName: String, + name: String): NativeMemoryManager with AutoCloseable = { + new StandaloneImpl(backendName, name) + } } diff --git a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala index e57bec619d0e..985ff92c5fc7 100644 --- a/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala +++ b/gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtime.scala @@ -80,4 +80,49 @@ object Runtime { override def resourceName(): String = s"runtime" } + + final private class StandaloneRuntimeImpl( + backendName: String, + name: String, + extraConf: util.Map[String, String]) + extends Runtime + with AutoCloseable { + + private val nmm: NativeMemoryManager with AutoCloseable = + NativeMemoryManager.createStandalone(backendName, name) + private val handle = RuntimeJniWrapper.createRuntime( + backendName, + nmm.getHandle(), + ConfigUtil.serialize( + (GlutenConfig + .getNativeSessionConf( + backendName, + GlutenConfigUtil.parseConfig(SQLConf.get.getAllConfs)) ++ extraConf.asScala).asJava) + ) + + private val released: AtomicBoolean = new AtomicBoolean(false) + + override def getHandle(): Long = handle + + override def memoryManager(): NativeMemoryManager = nmm + + override def close(): Unit = { + if (!released.compareAndSet(false, true)) { + throw new GlutenException(s"Runtime instance already released: $handle, $name") + } + RuntimeJniWrapper.releaseRuntime(handle) + nmm.close() + } + } + + def createStandalone(backendName: String, name: String): Runtime with AutoCloseable = { + new StandaloneRuntimeImpl(backendName, name, new util.HashMap[String, String]()) + } + + def createStandalone( + backendName: String, + name: String, + extraConf: util.Map[String, String]): Runtime with AutoCloseable = { + new StandaloneRuntimeImpl(backendName, name, extraConf) + } }
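
A minimal driver-side usage sketch of the new standalone runtime (assuming the backend name is
obtained via BackendsApiManager.getBackendName; error handling elided):

    val runtime = org.apache.gluten.runtime.Runtime.createStandalone(
      BackendsApiManager.getBackendName,
      "DriverBroadcastHashTableBuild")
    try {
      // Native work that must not depend on a TaskContext, e.g. building and
      // serializing a broadcast hash table on the driver.
    } finally {
      // Releases the native runtime first, then the standalone memory manager.
      runtime.close()
    }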