From 50f4fce01387f9a074f70fc0f33388af0b4bb44d Mon Sep 17 00:00:00 2001 From: jinhyukify Date: Sat, 6 Jun 2026 18:45:20 +0900 Subject: [PATCH 1/2] HBASE-30159 Make hash algorithm configurable for HashTable and SyncTable --- .../hadoop/hbase/mapreduce/HashTable.java | 45 ++++- .../hadoop/hbase/mapreduce/SyncTable.java | 2 +- .../hadoop/hbase/mapreduce/TestHashTable.java | 171 ++++++++++++++++++ .../hadoop/hbase/mapreduce/TestSyncTable.java | 31 ++++ 4 files changed, 239 insertions(+), 10 deletions(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java index ccaf55e5025b..e49d649c4eea 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java @@ -58,7 +58,6 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Charsets; -import org.apache.hbase.thirdparty.com.google.common.base.Throwables; import org.apache.hbase.thirdparty.com.google.common.collect.Ordering; @InterfaceAudience.Private @@ -68,7 +67,14 @@ public class HashTable extends Configured implements Tool { private static final int DEFAULT_BATCH_SIZE = 8000; + /** + * Default hash algorithm. Kept as MD5 so that manifests produced by older versions, which did not + * record an algorithm, remain readable. + */ + static final String DEFAULT_HASH_ALGORITHM = "MD5"; + private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size"; + final static String HASH_ALGORITHM_CONF_KEY = "hash.algorithm"; final static String PARTITIONS_FILE_NAME = "partitions"; final static String MANIFEST_FILE_NAME = "manifest"; final static String HASH_DATA_DIR = "hashes"; @@ -99,6 +105,7 @@ public static class TableHash { long endTime = 0; boolean ignoreTimestamps; boolean rawScan; + String hashAlgorithm = DEFAULT_HASH_ALGORITHM; List partitions; @@ -138,6 +145,7 @@ void writePropertiesFile(FileSystem fs, Path path) throws IOException { p.setProperty("endTimestamp", Long.toString(endTime)); } p.setProperty("rawScan", Boolean.toString(rawScan)); + p.setProperty("hashAlgorithm", hashAlgorithm); try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) { p.store(osw, null); @@ -189,6 +197,8 @@ void readPropertiesFile(FileSystem fs, Path path) throws IOException { if (endTimeString != null) { endTime = Long.parseLong(endTimeString); } + + hashAlgorithm = p.getProperty("hashAlgorithm", DEFAULT_HASH_ALGORITHM); } Scan initScan() throws IOException { @@ -316,6 +326,7 @@ public String toString() { sb.append(", versions=").append(versions); } sb.append(", rawScan=").append(rawScan); + sb.append(", hashAlgorithm=").append(hashAlgorithm); if (startTime != 0) { sb.append("startTime=").append(startTime); } @@ -448,6 +459,7 @@ public Job createSubmittableJob(String[] args) throws IOException { Configuration jobConf = job.getConfiguration(); jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize); jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps); + jobConf.set(HASH_ALGORITHM_CONF_KEY, tableHash.hashAlgorithm); job.setJarByClass(HashTable.class); TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(), @@ -487,11 +499,11 @@ static class ResultHasher { private long batchSize = 0; boolean ignoreTimestamps; - public ResultHasher() { + public ResultHasher(String algorithm) { try { - digest = MessageDigest.getInstance("MD5"); + digest = MessageDigest.getInstance(algorithm); } catch (NoSuchAlgorithmException e) { - Throwables.propagate(e); + throw new IllegalArgumentException("Unsupported hash algorithm: " + algorithm, e); } } @@ -566,10 +578,10 @@ public static class HashMapper @Override protected void setup(Context context) throws IOException, InterruptedException { - targetBatchSize = - context.getConfiguration().getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); - hasher = new ResultHasher(); - hasher.ignoreTimestamps = context.getConfiguration().getBoolean(IGNORE_TIMESTAMPS, false); + Configuration conf = context.getConfiguration(); + targetBatchSize = conf.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE); + hasher = new ResultHasher(conf.get(HASH_ALGORITHM_CONF_KEY, DEFAULT_HASH_ALGORITHM)); + hasher.ignoreTimestamps = conf.getBoolean(IGNORE_TIMESTAMPS, false); TableSplit split = (TableSplit) context.getInputSplit(); hasher.startBatch(new ImmutableBytesWritable(split.getStartRow())); } @@ -640,6 +652,8 @@ private static void printUsage(final String errorMsg) { System.err.println(" families comma-separated list of families to include"); System.err.println(" ignoreTimestamps if true, ignores cell timestamps"); System.err.println(" when calculating hashes"); + System.err.println(" hashAlgorithm MessageDigest algorithm to use for batch hashes"); + System.err.println(" (defaults to " + DEFAULT_HASH_ALGORITHM + ")"); System.err.println(); System.err.println("Args:"); System.err.println(" tablename Name of the table to hash"); @@ -665,7 +679,7 @@ private boolean doCommandLine(final String[] args) { for (int i = 0; i < args.length - NUM_ARGS; i++) { String cmd = args[i]; - if (cmd.equals("-h") || cmd.startsWith("--h")) { + if (cmd.equals("-h") || cmd.equals("--help")) { printUsage(null); return false; } @@ -737,6 +751,12 @@ private boolean doCommandLine(final String[] args) { continue; } + final String hashAlgorithmKey = "--hashAlgorithm="; + if (cmd.startsWith(hashAlgorithmKey)) { + tableHash.hashAlgorithm = cmd.substring(hashAlgorithmKey.length()); + continue; + } + printUsage("Invalid argument '" + cmd + "'"); return false; } @@ -749,6 +769,13 @@ private boolean doCommandLine(final String[] args) { return false; } + try { + MessageDigest.getInstance(tableHash.hashAlgorithm); + } catch (NoSuchAlgorithmException e) { + printUsage("Unsupported hash algorithm: " + tableHash.hashAlgorithm); + return false; + } + } catch (Exception e) { LOG.error("Failed to parse commandLine arguments", e); printUsage("Can't start because " + e.getMessage()); diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java index 3b083b33dbdf..5598f422d87e 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java @@ -270,7 +270,7 @@ protected void setup(Context context) throws IOException { // create a hasher, but don't start it right away // instead, find the first hash batch at or after the start row // and skip any rows that come before. they will be caught by the previous task - targetHasher = new HashTable.ResultHasher(); + targetHasher = new HashTable.ResultHasher(sourceTableHash.hashAlgorithm); targetHasher.ignoreTimestamps = ignoreTimestamp; } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java index ec4d3ce3f02a..c89ad61cb2ac 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java @@ -20,8 +20,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; +import java.io.OutputStream; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -181,4 +183,173 @@ ImmutableMap. builder() TEST_UTIL.deleteTable(tableName); TEST_UTIL.cleanupDataTestDirOnTestFS(); } + + @Test + public void testHashTableWithSha256(TestInfo testInfo) throws Exception { + final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); + final byte[] family = Bytes.toBytes("family"); + final byte[] column1 = Bytes.toBytes("c1"); + final byte[] column2 = Bytes.toBytes("c2"); + final byte[] column3 = Bytes.toBytes("c3"); + + int numRows = 100; + int numRegions = 10; + int numHashFiles = 3; + + byte[][] splitRows = new byte[numRegions - 1][]; + for (int i = 1; i < numRegions; i++) { + splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions); + } + + long timestamp = 1430764183454L; + Table t1 = TEST_UTIL.createTable(tableName, family, splitRows); + for (int i = 0; i < numRows; i++) { + Put p = new Put(Bytes.toBytes(i), timestamp); + p.addColumn(family, column1, column1); + p.addColumn(family, column2, column2); + p.addColumn(family, column3, column3); + t1.put(p); + } + t1.close(); + + HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration()); + Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString()); + + long batchSize = 300; + int code = hashTable.run( + new String[] { "--batchsize=" + batchSize, "--numhashfiles=" + numHashFiles, "--scanbatch=2", + "--hashAlgorithm=SHA-256", tableName.getNameAsString(), testDir.toString() }); + assertEquals(0, code, "test job failed"); + + FileSystem fs = TEST_UTIL.getTestFileSystem(); + HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); + assertEquals("SHA-256", tableHash.hashAlgorithm, + "manifest must record the algorithm used to produce the digests"); + + ImmutableMap expectedHashes = + ImmutableMap. builder() + .put(-1, + new ImmutableBytesWritable( + Bytes.fromHex("e1452041ec73beb9b5677c0b74ed73a9118ca502d9b2b9abe62ba18d92fc51be"))) + .put(5, + new ImmutableBytesWritable( + Bytes.fromHex("6c55999354be6571a8912b7781d37359e88153173cb5fcf143211682d7ac06c5"))) + .put(10, + new ImmutableBytesWritable( + Bytes.fromHex("44e9c6aedd838e9a9c78d1983ce5139fda3ef3b0b8a3812893512e7c6f05c51f"))) + .put(15, + new ImmutableBytesWritable( + Bytes.fromHex("57f18d831a22057155fb9cfb9bf4706a1c2cf134fc1e92262c8748ca545b58a1"))) + .put(20, + new ImmutableBytesWritable( + Bytes.fromHex("83119dfb3deec8901f69cccac31c1039c624870de0cff4b85946147359966d42"))) + .put(25, + new ImmutableBytesWritable( + Bytes.fromHex("933eabcc837b7e7b24be2b553b1ec50fb00e95cbf3d8f898f59f5f8bbace0a60"))) + .put(30, + new ImmutableBytesWritable( + Bytes.fromHex("b6a3752581d74f362f64b59d56a96ad52763b3245dd5bfc85f6fe9261f2d03f1"))) + .put(35, + new ImmutableBytesWritable( + Bytes.fromHex("d3784bac940584dbc0754eff73bc39cce4f9c4aec87939747fff4b0ecc6a0617"))) + .put(40, + new ImmutableBytesWritable( + Bytes.fromHex("87f4b810b751abd64e9c22cb7b40b5ce600965e4b8eda2c0eae075d5623088c2"))) + .put(45, + new ImmutableBytesWritable( + Bytes.fromHex("ce1f422fcdbe0f926e10b68cb3ead497066560235a1341d29151a9e1847deaab"))) + .put(50, + new ImmutableBytesWritable( + Bytes.fromHex("118c771b1eeabe8523f1ad96fb5bf16537d76e0b3855d84c3dbac864de726229"))) + .put(55, + new ImmutableBytesWritable( + Bytes.fromHex("00dfe840a275aca3de9268ea61699881a441d47fea93071bca69c39bf7845dac"))) + .put(60, + new ImmutableBytesWritable( + Bytes.fromHex("062239ede0306fd9046eb5a3a2f66d997b37c8c1a4defc35789644e66930fff1"))) + .put(65, + new ImmutableBytesWritable( + Bytes.fromHex("09a63a94681e75edf975f9b46fe94f1e592840a627cac728a77728b7f9f695aa"))) + .put(70, + new ImmutableBytesWritable( + Bytes.fromHex("e634097804d269cbaeef49ce7a009a1388e6f636700badcab05fe20759f6043f"))) + .put(75, + new ImmutableBytesWritable( + Bytes.fromHex("69f614ccc16a9c651538681525be1b2e40859c9833a55d9009d77ef39abaffcd"))) + .put(80, + new ImmutableBytesWritable( + Bytes.fromHex("6530b957c8064fc043620bee89647960de0d27a0f986b40f183f5347093a12d2"))) + .put(85, + new ImmutableBytesWritable( + Bytes.fromHex("403ed0417cd8ab955cbd4c8fe84218cd152b95da9237300050e9b7c90c809faf"))) + .put(90, + new ImmutableBytesWritable( + Bytes.fromHex("e27fb9193ae3363fec70a148e62df7c57d514dd7de74a6a332fbda002af67efb"))) + .put(95, + new ImmutableBytesWritable( + Bytes.fromHex("a31cb9d55e37f17c773a6eee757f15d6d7fe52d77cd1037fcb7ee00ed2bef6c9"))) + .build(); + + Map actualHashes = new HashMap<>(); + Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR); + for (int i = 0; i < numHashFiles; i++) { + Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i)); + try (MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf())) { + ImmutableBytesWritable key = new ImmutableBytesWritable(); + ImmutableBytesWritable hash = new ImmutableBytesWritable(); + while (reader.next(key, hash)) { + int intKey = -1; + if (key.getLength() > 0) { + intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength()); + } + if (actualHashes.containsKey(intKey)) { + fail("duplicate key in data files: " + intKey); + } + actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes())); + } + } + } + + if (!expectedHashes.equals(actualHashes)) { + LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes)); + } + assertEquals(expectedHashes, actualHashes); + + TEST_UTIL.deleteTable(tableName); + TEST_UTIL.cleanupDataTestDirOnTestFS(); + } + + /** + * A manifest written by an older HashTable does not carry the hashAlgorithm property. Reading + * such a manifest must default to MD5 so existing on-disk hash data stays usable. + */ + @Test + public void testManifestWithoutAlgorithmDefaultsToMd5(TestInfo testInfo) throws Exception { + Path testDir = + TEST_UTIL.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName() + "_legacy"); + FileSystem fs = TEST_UTIL.getTestFileSystem(); + fs.mkdirs(testDir); + + // hand-craft a legacy manifest with no hashAlgorithm property + Properties p = new Properties(); + p.setProperty("table", "legacy"); + p.setProperty("targetBatchSize", "8000"); + p.setProperty("numHashFiles", "1"); + p.setProperty("rawScan", "false"); + Path manifest = new Path(testDir, HashTable.MANIFEST_FILE_NAME); + try (OutputStream out = fs.create(manifest)) { + p.store(out, null); + } + + // write an empty partitions file so TableHash.read() succeeds + HashTable.TableHash empty = new HashTable.TableHash(); + empty.partitions = new java.util.ArrayList<>(); + empty.writePartitionFile(fs.getConf(), new Path(testDir, HashTable.PARTITIONS_FILE_NAME)); + + HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); + assertEquals("MD5", tableHash.hashAlgorithm, + "Manifests without an algorithm property must default to MD5 for back-compat"); + + TEST_UTIL.cleanupDataTestDirOnTestFS(); + } } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java index 7bd65da54c15..a85da23e842e 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java @@ -181,6 +181,37 @@ public void testSyncTableDoPutsFalse(TestInfo testInfo) throws Exception { UTIL2.deleteTable(targetTableName); } + @Test + public void testSyncTableWithSha256(TestInfo testInfo) throws Exception { + final TableName sourceTableName = + TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source"); + final TableName targetTableName = + TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target"); + Path testDir = UTIL1.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName()); + + writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName); + hashSourceTable(UTIL1, sourceTableName, testDir, "--hashAlgorithm=SHA-256"); + + HashTable.TableHash tableHash = + HashTable.TableHash.read(UTIL1.getTestFileSystem().getConf(), testDir); + assertEquals("SHA-256", tableHash.hashAlgorithm, + "manifest must carry the algorithm so SyncTable can match the source-side digest"); + + Counters syncCounters = + syncTables(UTIL1.getConfiguration(), sourceTableName, targetTableName, testDir); + assertEqualTables(90, UTIL1, sourceTableName, UTIL1, targetTableName, false); + + assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue()); + assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue()); + assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue()); + assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue()); + + UTIL1.deleteTable(sourceTableName); + UTIL1.deleteTable(targetTableName); + } + @Test public void testSyncTableIgnoreTimestampsTrue(TestInfo testInfo) throws Exception { final TableName sourceTableName = From e273feebebcbb8e4e48ca8eaf35e27d2ecedf6cb Mon Sep 17 00:00:00 2001 From: jinhyukify Date: Sun, 7 Jun 2026 13:44:44 +0900 Subject: [PATCH 2/2] HBASE-30159 List example algorithm names in HashTable usage --- .../main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java | 1 + .../java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java index e49d649c4eea..34696cd5066b 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HashTable.java @@ -653,6 +653,7 @@ private static void printUsage(final String errorMsg) { System.err.println(" ignoreTimestamps if true, ignores cell timestamps"); System.err.println(" when calculating hashes"); System.err.println(" hashAlgorithm MessageDigest algorithm to use for batch hashes"); + System.err.println(" examples: MD5, SHA-256, SHA-384, SHA-512"); System.err.println(" (defaults to " + DEFAULT_HASH_ALGORITHM + ")"); System.err.println(); System.err.println("Args:"); diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java index c89ad61cb2ac..f793a8f72a00 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHashTable.java @@ -347,7 +347,7 @@ public void testManifestWithoutAlgorithmDefaultsToMd5(TestInfo testInfo) throws empty.writePartitionFile(fs.getConf(), new Path(testDir, HashTable.PARTITIONS_FILE_NAME)); HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir); - assertEquals("MD5", tableHash.hashAlgorithm, + assertEquals(HashTable.DEFAULT_HASH_ALGORITHM, tableHash.hashAlgorithm, "Manifests without an algorithm property must default to MD5 for back-compat"); TEST_UTIL.cleanupDataTestDirOnTestFS();