Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.Ordering;

@InterfaceAudience.Private
Expand All @@ -68,7 +67,14 @@ public class HashTable extends Configured implements Tool {

private static final int DEFAULT_BATCH_SIZE = 8000;

/**
* Default hash algorithm. Kept as MD5 so that manifests produced by older versions, which did not
* record an algorithm, remain readable.
*/
static final String DEFAULT_HASH_ALGORITHM = "MD5";

private final static String HASH_BATCH_SIZE_CONF_KEY = "hash.batch.size";
final static String HASH_ALGORITHM_CONF_KEY = "hash.algorithm";
final static String PARTITIONS_FILE_NAME = "partitions";
final static String MANIFEST_FILE_NAME = "manifest";
final static String HASH_DATA_DIR = "hashes";
Expand Down Expand Up @@ -99,6 +105,7 @@ public static class TableHash {
long endTime = 0;
boolean ignoreTimestamps;
boolean rawScan;
String hashAlgorithm = DEFAULT_HASH_ALGORITHM;

List<ImmutableBytesWritable> partitions;

Expand Down Expand Up @@ -138,6 +145,7 @@ void writePropertiesFile(FileSystem fs, Path path) throws IOException {
p.setProperty("endTimestamp", Long.toString(endTime));
}
p.setProperty("rawScan", Boolean.toString(rawScan));
p.setProperty("hashAlgorithm", hashAlgorithm);

try (OutputStreamWriter osw = new OutputStreamWriter(fs.create(path), Charsets.UTF_8)) {
p.store(osw, null);
Expand Down Expand Up @@ -189,6 +197,8 @@ void readPropertiesFile(FileSystem fs, Path path) throws IOException {
if (endTimeString != null) {
endTime = Long.parseLong(endTimeString);
}

hashAlgorithm = p.getProperty("hashAlgorithm", DEFAULT_HASH_ALGORITHM);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For old HashTable files, we fall back to the original MD5 algorithm.

}

Scan initScan() throws IOException {
Expand Down Expand Up @@ -316,6 +326,7 @@ public String toString() {
sb.append(", versions=").append(versions);
}
sb.append(", rawScan=").append(rawScan);
sb.append(", hashAlgorithm=").append(hashAlgorithm);
if (startTime != 0) {
sb.append("startTime=").append(startTime);
}
Expand Down Expand Up @@ -448,6 +459,7 @@ public Job createSubmittableJob(String[] args) throws IOException {
Configuration jobConf = job.getConfiguration();
jobConf.setLong(HASH_BATCH_SIZE_CONF_KEY, tableHash.batchSize);
jobConf.setBoolean(IGNORE_TIMESTAMPS, tableHash.ignoreTimestamps);
jobConf.set(HASH_ALGORITHM_CONF_KEY, tableHash.hashAlgorithm);
job.setJarByClass(HashTable.class);

TableMapReduceUtil.initTableMapperJob(tableHash.tableName, tableHash.initScan(),
Expand Down Expand Up @@ -487,11 +499,11 @@ static class ResultHasher {
private long batchSize = 0;
boolean ignoreTimestamps;

public ResultHasher() {
public ResultHasher(String algorithm) {
try {
digest = MessageDigest.getInstance("MD5");
digest = MessageDigest.getInstance(algorithm);
} catch (NoSuchAlgorithmException e) {
Throwables.propagate(e);
throw new IllegalArgumentException("Unsupported hash algorithm: " + algorithm, e);
}
}

Expand Down Expand Up @@ -566,10 +578,10 @@ public static class HashMapper

@Override
protected void setup(Context context) throws IOException, InterruptedException {
targetBatchSize =
context.getConfiguration().getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
hasher = new ResultHasher();
hasher.ignoreTimestamps = context.getConfiguration().getBoolean(IGNORE_TIMESTAMPS, false);
Configuration conf = context.getConfiguration();
targetBatchSize = conf.getLong(HASH_BATCH_SIZE_CONF_KEY, DEFAULT_BATCH_SIZE);
hasher = new ResultHasher(conf.get(HASH_ALGORITHM_CONF_KEY, DEFAULT_HASH_ALGORITHM));
hasher.ignoreTimestamps = conf.getBoolean(IGNORE_TIMESTAMPS, false);
TableSplit split = (TableSplit) context.getInputSplit();
hasher.startBatch(new ImmutableBytesWritable(split.getStartRow()));
}
Expand Down Expand Up @@ -640,6 +652,9 @@ private static void printUsage(final String errorMsg) {
System.err.println(" families comma-separated list of families to include");
System.err.println(" ignoreTimestamps if true, ignores cell timestamps");
System.err.println(" when calculating hashes");
System.err.println(" hashAlgorithm MessageDigest algorithm to use for batch hashes");
System.err.println(" examples: MD5, SHA-256, SHA-384, SHA-512");
System.err.println(" (defaults to " + DEFAULT_HASH_ALGORITHM + ")");
Comment thread
junegunn marked this conversation as resolved.
System.err.println();
System.err.println("Args:");
System.err.println(" tablename Name of the table to hash");
Expand All @@ -665,7 +680,7 @@ private boolean doCommandLine(final String[] args) {

for (int i = 0; i < args.length - NUM_ARGS; i++) {
String cmd = args[i];
if (cmd.equals("-h") || cmd.startsWith("--h")) {
if (cmd.equals("-h") || cmd.equals("--help")) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this change, we cannot use options that start with --h including --hashAlgorithm

printUsage(null);
return false;
}
Expand Down Expand Up @@ -737,6 +752,12 @@ private boolean doCommandLine(final String[] args) {
continue;
}

final String hashAlgorithmKey = "--hashAlgorithm=";
if (cmd.startsWith(hashAlgorithmKey)) {
tableHash.hashAlgorithm = cmd.substring(hashAlgorithmKey.length());
continue;
}

printUsage("Invalid argument '" + cmd + "'");
return false;
}
Expand All @@ -749,6 +770,13 @@ private boolean doCommandLine(final String[] args) {
return false;
}

try {
MessageDigest.getInstance(tableHash.hashAlgorithm);
} catch (NoSuchAlgorithmException e) {
printUsage("Unsupported hash algorithm: " + tableHash.hashAlgorithm);
return false;
}

} catch (Exception e) {
LOG.error("Failed to parse commandLine arguments", e);
printUsage("Can't start because " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ protected void setup(Context context) throws IOException {
// create a hasher, but don't start it right away
// instead, find the first hash batch at or after the start row
// and skip any rows that come before. they will be caught by the previous task
targetHasher = new HashTable.ResultHasher();
targetHasher = new HashTable.ResultHasher(sourceTableHash.hashAlgorithm);
targetHasher.ignoreTimestamps = ignoreTimestamp;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -181,4 +183,173 @@ ImmutableMap.<Integer, ImmutableBytesWritable> builder()
TEST_UTIL.deleteTable(tableName);
TEST_UTIL.cleanupDataTestDirOnTestFS();
}

@Test
public void testHashTableWithSha256(TestInfo testInfo) throws Exception {
final TableName tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
final byte[] family = Bytes.toBytes("family");
final byte[] column1 = Bytes.toBytes("c1");
final byte[] column2 = Bytes.toBytes("c2");
final byte[] column3 = Bytes.toBytes("c3");

int numRows = 100;
int numRegions = 10;
int numHashFiles = 3;

byte[][] splitRows = new byte[numRegions - 1][];
for (int i = 1; i < numRegions; i++) {
splitRows[i - 1] = Bytes.toBytes(numRows * i / numRegions);
}

long timestamp = 1430764183454L;
Table t1 = TEST_UTIL.createTable(tableName, family, splitRows);
for (int i = 0; i < numRows; i++) {
Put p = new Put(Bytes.toBytes(i), timestamp);
p.addColumn(family, column1, column1);
p.addColumn(family, column2, column2);
p.addColumn(family, column3, column3);
t1.put(p);
}
t1.close();

HashTable hashTable = new HashTable(TEST_UTIL.getConfiguration());
Path testDir = TEST_UTIL.getDataTestDirOnTestFS(tableName.getNameAsString());

long batchSize = 300;
int code = hashTable.run(
new String[] { "--batchsize=" + batchSize, "--numhashfiles=" + numHashFiles, "--scanbatch=2",
"--hashAlgorithm=SHA-256", tableName.getNameAsString(), testDir.toString() });
assertEquals(0, code, "test job failed");

FileSystem fs = TEST_UTIL.getTestFileSystem();
HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
assertEquals("SHA-256", tableHash.hashAlgorithm,
"manifest must record the algorithm used to produce the digests");

ImmutableMap<Integer, ImmutableBytesWritable> expectedHashes =
ImmutableMap.<Integer, ImmutableBytesWritable> builder()
Comment on lines +229 to +230
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.put(-1,
new ImmutableBytesWritable(
Bytes.fromHex("e1452041ec73beb9b5677c0b74ed73a9118ca502d9b2b9abe62ba18d92fc51be")))
.put(5,
new ImmutableBytesWritable(
Bytes.fromHex("6c55999354be6571a8912b7781d37359e88153173cb5fcf143211682d7ac06c5")))
.put(10,
new ImmutableBytesWritable(
Bytes.fromHex("44e9c6aedd838e9a9c78d1983ce5139fda3ef3b0b8a3812893512e7c6f05c51f")))
.put(15,
new ImmutableBytesWritable(
Bytes.fromHex("57f18d831a22057155fb9cfb9bf4706a1c2cf134fc1e92262c8748ca545b58a1")))
.put(20,
new ImmutableBytesWritable(
Bytes.fromHex("83119dfb3deec8901f69cccac31c1039c624870de0cff4b85946147359966d42")))
.put(25,
new ImmutableBytesWritable(
Bytes.fromHex("933eabcc837b7e7b24be2b553b1ec50fb00e95cbf3d8f898f59f5f8bbace0a60")))
.put(30,
new ImmutableBytesWritable(
Bytes.fromHex("b6a3752581d74f362f64b59d56a96ad52763b3245dd5bfc85f6fe9261f2d03f1")))
.put(35,
new ImmutableBytesWritable(
Bytes.fromHex("d3784bac940584dbc0754eff73bc39cce4f9c4aec87939747fff4b0ecc6a0617")))
.put(40,
new ImmutableBytesWritable(
Bytes.fromHex("87f4b810b751abd64e9c22cb7b40b5ce600965e4b8eda2c0eae075d5623088c2")))
.put(45,
new ImmutableBytesWritable(
Bytes.fromHex("ce1f422fcdbe0f926e10b68cb3ead497066560235a1341d29151a9e1847deaab")))
.put(50,
new ImmutableBytesWritable(
Bytes.fromHex("118c771b1eeabe8523f1ad96fb5bf16537d76e0b3855d84c3dbac864de726229")))
.put(55,
new ImmutableBytesWritable(
Bytes.fromHex("00dfe840a275aca3de9268ea61699881a441d47fea93071bca69c39bf7845dac")))
.put(60,
new ImmutableBytesWritable(
Bytes.fromHex("062239ede0306fd9046eb5a3a2f66d997b37c8c1a4defc35789644e66930fff1")))
.put(65,
new ImmutableBytesWritable(
Bytes.fromHex("09a63a94681e75edf975f9b46fe94f1e592840a627cac728a77728b7f9f695aa")))
.put(70,
new ImmutableBytesWritable(
Bytes.fromHex("e634097804d269cbaeef49ce7a009a1388e6f636700badcab05fe20759f6043f")))
.put(75,
new ImmutableBytesWritable(
Bytes.fromHex("69f614ccc16a9c651538681525be1b2e40859c9833a55d9009d77ef39abaffcd")))
.put(80,
new ImmutableBytesWritable(
Bytes.fromHex("6530b957c8064fc043620bee89647960de0d27a0f986b40f183f5347093a12d2")))
.put(85,
new ImmutableBytesWritable(
Bytes.fromHex("403ed0417cd8ab955cbd4c8fe84218cd152b95da9237300050e9b7c90c809faf")))
.put(90,
new ImmutableBytesWritable(
Bytes.fromHex("e27fb9193ae3363fec70a148e62df7c57d514dd7de74a6a332fbda002af67efb")))
.put(95,
new ImmutableBytesWritable(
Bytes.fromHex("a31cb9d55e37f17c773a6eee757f15d6d7fe52d77cd1037fcb7ee00ed2bef6c9")))
.build();

Map<Integer, ImmutableBytesWritable> actualHashes = new HashMap<>();
Path dataDir = new Path(testDir, HashTable.HASH_DATA_DIR);
for (int i = 0; i < numHashFiles; i++) {
Path hashPath = new Path(dataDir, HashTable.TableHash.getDataFileName(i));
try (MapFile.Reader reader = new MapFile.Reader(hashPath, fs.getConf())) {
ImmutableBytesWritable key = new ImmutableBytesWritable();
ImmutableBytesWritable hash = new ImmutableBytesWritable();
while (reader.next(key, hash)) {
int intKey = -1;
if (key.getLength() > 0) {
intKey = Bytes.toInt(key.get(), key.getOffset(), key.getLength());
}
if (actualHashes.containsKey(intKey)) {
fail("duplicate key in data files: " + intKey);
}
actualHashes.put(intKey, new ImmutableBytesWritable(hash.copyBytes()));
}
}
}

if (!expectedHashes.equals(actualHashes)) {
LOG.error("Diff: " + Maps.difference(expectedHashes, actualHashes));
}
assertEquals(expectedHashes, actualHashes);

TEST_UTIL.deleteTable(tableName);
TEST_UTIL.cleanupDataTestDirOnTestFS();
}

/**
* A manifest written by an older HashTable does not carry the hashAlgorithm property. Reading
* such a manifest must default to MD5 so existing on-disk hash data stays usable.
*/
@Test
public void testManifestWithoutAlgorithmDefaultsToMd5(TestInfo testInfo) throws Exception {
Path testDir =
TEST_UTIL.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName() + "_legacy");
FileSystem fs = TEST_UTIL.getTestFileSystem();
fs.mkdirs(testDir);

// hand-craft a legacy manifest with no hashAlgorithm property
Properties p = new Properties();
p.setProperty("table", "legacy");
p.setProperty("targetBatchSize", "8000");
p.setProperty("numHashFiles", "1");
p.setProperty("rawScan", "false");
Path manifest = new Path(testDir, HashTable.MANIFEST_FILE_NAME);
try (OutputStream out = fs.create(manifest)) {
p.store(out, null);
}

// write an empty partitions file so TableHash.read() succeeds
HashTable.TableHash empty = new HashTable.TableHash();
empty.partitions = new java.util.ArrayList<>();
empty.writePartitionFile(fs.getConf(), new Path(testDir, HashTable.PARTITIONS_FILE_NAME));

HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
assertEquals(HashTable.DEFAULT_HASH_ALGORITHM, tableHash.hashAlgorithm,
"Manifests without an algorithm property must default to MD5 for back-compat");

TEST_UTIL.cleanupDataTestDirOnTestFS();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,37 @@ public void testSyncTableDoPutsFalse(TestInfo testInfo) throws Exception {
UTIL2.deleteTable(targetTableName);
}

@Test
public void testSyncTableWithSha256(TestInfo testInfo) throws Exception {
final TableName sourceTableName =
TableName.valueOf(testInfo.getTestMethod().get().getName() + "_source");
final TableName targetTableName =
TableName.valueOf(testInfo.getTestMethod().get().getName() + "_target");
Path testDir = UTIL1.getDataTestDirOnTestFS(testInfo.getTestMethod().get().getName());

writeTestData(UTIL1, sourceTableName, UTIL1, targetTableName);
hashSourceTable(UTIL1, sourceTableName, testDir, "--hashAlgorithm=SHA-256");

HashTable.TableHash tableHash =
HashTable.TableHash.read(UTIL1.getTestFileSystem().getConf(), testDir);
assertEquals("SHA-256", tableHash.hashAlgorithm,
"manifest must carry the algorithm so SyncTable can match the source-side digest");

Counters syncCounters =
syncTables(UTIL1.getConfiguration(), sourceTableName, targetTableName, testDir);
assertEqualTables(90, UTIL1, sourceTableName, UTIL1, targetTableName, false);

assertEquals(60, syncCounters.findCounter(Counter.ROWSWITHDIFFS).getValue());
assertEquals(10, syncCounters.findCounter(Counter.SOURCEMISSINGROWS).getValue());
assertEquals(10, syncCounters.findCounter(Counter.TARGETMISSINGROWS).getValue());
assertEquals(50, syncCounters.findCounter(Counter.SOURCEMISSINGCELLS).getValue());
assertEquals(50, syncCounters.findCounter(Counter.TARGETMISSINGCELLS).getValue());
assertEquals(20, syncCounters.findCounter(Counter.DIFFERENTCELLVALUES).getValue());

UTIL1.deleteTable(sourceTableName);
UTIL1.deleteTable(targetTableName);
}

@Test
public void testSyncTableIgnoreTimestampsTrue(TestInfo testInfo) throws Exception {
final TableName sourceTableName =
Expand Down