phoenix建表时选择salt模式,在region发生合并后,数据查询可能出现异常。
可以通过如下方式进行修复:
重新灌入数据
恢复表创建时候的分区
拷贝如下代码到目标phoenix客户端所在机器:
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
import org.apache.phoenix.expression.function.SqlTypeNameFunction;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by youhl002@lianlianpay.com
* date: 2021/12/1
* comment:
*/
public class PhoenixRestoreMerge implements Closeable {
private static Logger logger = LoggerFactory.getLogger(PhoenixRestoreMerge.class);
private final Options options = new Options();
private String quorum;
private String url;
private java.sql.Connection phoenixConn;
private org.apache.hadoop.hbase.client.Connection hbaseConn;
private FileSystem fs;
private List<String> scanTables = new ArrayList<>();
private List<String> actions = new ArrayList<>();
private static final String ACTION_SCAN = "scan";
private static final String ACTION_RESTORE = "restore";
private static final List<String> ACTIONS = Arrays.asList(ACTION_SCAN, ACTION_RESTORE);
private Map<String, PhoenixTable> enableTables = new LinkedHashMap<>();
private int needRestore = 0;
private long storageSize = 1024L * 1024 * 1024 * 1;
private boolean checkSplitLength = false;
private StringBuilder records = new StringBuilder();
private boolean checkHBaseScan = false;
private static class PhoenixTable {
final String fullName;
final String schema;
final String name;
final TableName hbaseTableName;
String targetHex;
String currentHex;
String tag;
Byte startRowKey;
Byte endRowKey;
int rowKeyDataCount;
Integer saltBuckets;
LinkedHashSet<PColumn> pkColumns = new LinkedHashSet<>();
ContentSummary contentSummary;
public PhoenixTable(String fullName) {
this.fullName = fullName;
this.hbaseTableName = TableName.valueOf(fullName);
String[] ss = fullName.split("\\.");
if (ss.length == 1) {
schema = "";
name = fullName;
} else {
schema = ss[0];
name = IntStream.range(1, ss.length).mapToObj(i -> ss[i]).collect(Collectors.joining("."));
}
}
// byte saltByte = SaltingUtil.getSaltingByte(indexRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nIndexSaltBuckets);
private byte[][] generateSaltSplitKeys() throws SQLException, IllegalArgumentIOException {
// -- 50 个分区 \x01\x00\x00\x00\x00,\x02\x00\x00\x00\x00, ... \x49\x00\x00\x00\x00
// byte[][] splitPoints = SaltingUtil.getSalteByteSplitPoints(nSaltBuckets);
byte[][] splits = SaltingUtil.getSalteByteSplitPoints(saltBuckets);
if (pkColumns.isEmpty()) {
throw new IllegalArgumentIOException(fullName + "找不到对应的主键");
}
logger.info(String.format("【%s】主键 => %s", fullName, pkColumns.stream().map(s -> s.getName())
.collect(Collectors.toList())));
splits = SchemaUtil.processSplits(splits, pkColumns, saltBuckets, false);
return splits;
}
public void updateContentSummary(FileSystem fs) throws IOException {
Path tableDir = new Path("/apps/hbase/data/data/default/" + fullName);
this.contentSummary = fs.getContentSummary(tableDir);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("表名: ").append(fullName);
sb.append(" ,主键: ").append(pkColumns.stream().map(s -> s.getName())
.collect(Collectors.toList()));
sb.append(" ,盐分区: ").append(saltBuckets);
if (startRowKey != null) {
int rowKeyRange = getDataRange();
sb.append(" ,数据分区: ").append(rowKeyRange);
sb.append(" ,数据分布: ").append(bytes2Hex(new byte[]{startRowKey})).append("~")
.append(bytes2Hex(new byte[]{endRowKey}));
sb.append(" ,数据覆盖率: ").append(String.format("%d/%d=%.1f%%",
rowKeyDataCount, rowKeyRange, rowKeyDataCount * 100d / rowKeyRange
));
} else {
sb.append(" ,数据分区: NULL");
}
sb.append(" ,存储: ").append(humanSize(contentSummary.getLength()));
if (currentHex != null) {
sb.append("\n当前分裂点: ").append(currentHex);
}
if (targetHex != null) {
sb.append("\n目标分裂点: ").append(targetHex);
}
return sb.toString();
}
public int getDataRange() {
return ((int) endRowKey - (int) startRowKey) + 1;
}
}
public PhoenixRestoreMerge() {
options.addOption(new Option("h", "help", false, "帮助"));
options.addOption(new Option("zq", "zookeeper-quorum", true, "phoenix,hbase连接地址"));
options.addOption(new Option("pt", "phoenix-table", true, "phoenix表名"));
options.addOption(new Option("ss", "storage-size", true, "表对应的hdfs文件大小" + storageSize));
options.addOption(new Option("csl", "check-split-length", false, "检测分隔点的长度"));
options.addOption(new Option("chs", "check-hbase-scan", false, "检测HBase数据分布"));
options.addOption(new Option("a", "action", true,
"动作(" + ACTIONS.stream().collect(Collectors.joining(",")) + ")"));
}
private static String humanSize(long size) {
if (size < 1024) {
return size + " B";
} else if (size < 1024l * 1024) {
return String.format("%.2f KB", size / 1024d);
} else if (size < 1024l * 1024 * 1024) {
return String.format("%.2f MB", size / (1024d * 1024));
} else if (size < 1024l * 1024 * 1024 * 1024) {
return String.format("%.2f GB", size / (1024d * 1024 * 1024));
}
return String.format("%.2f TB", size / (1024d * 1024 * 1024 * 1024));
}
public void run(String[] args)
throws SQLException, ClassNotFoundException, IOException, ParseException {
CommandLineParser parser = new PosixParser();
CommandLine cli = parser.parse(options, args);
if (cli.hasOption("help")) {
showHelp();
return;
}
this.quorum = cli.getOptionValue("zq");
if (StringUtils.isBlank(quorum)) {
throw new IllegalArgumentIOException("连接地址为空");
}
this.actions = Arrays.asList(ACTION_SCAN);
if (cli.hasOption("a")) {
this.actions = parseOptionValues(cli.getOptionValues("a"));
}
this.checkSplitLength = cli.hasOption("csl");
this.checkHBaseScan = cli.hasOption("chs");
logger.info("action => " + actions);
if (cli.hasOption("pt")) {
this.scanTables = parseOptionValues(cli.getOptionValues("pt")).stream()
.map(String::toUpperCase)
.collect(Collectors.toList());
}
this.url = String.format("jdbc:phoenix:%s", this.quorum);
init();
if (actions.contains(ACTION_SCAN)) {
scanTables();
}
if (this.scanTables.isEmpty()) {
throw new IllegalArgumentIOException("表名为空");
}
if (cli.hasOption("ss")) {
storageSize = Long.valueOf(cli.getOptionValue("ss"));
if (storageSize < 1024) {
throw new IllegalArgumentIOException("数据阈值过小 => " + storageSize);
}
}
logger.info("本次扫描" + scanTables.size() + "张表 => " + scanTables);
scanEnableTables();
if (this.enableTables.isEmpty()) {
throw new IllegalArgumentIOException("无有效表名");
}
// 设置 saltBucket 和 pkColumns
scanSaltBucket();
scanPKColumns();
int index = 0;
for (PhoenixTable pt : enableTables.values()) {
index++;
pt.tag = String.format("【%s:%3.2f%%】", pt.fullName, index * 100d / enableTables.size());
run(pt);
}
logger.info(
String.format("restore/enable/tables: %d/%d/%d, 恢复进度: %.2f%%"
, needRestore, enableTables.size(), scanTables.size(),
(enableTables.size() - needRestore) * 100d / enableTables.size()));
System.out.println(records);
}
private static PColumn createPColumn(ResultSet rs) throws SQLException {
PName columnName = PNameFactory.newName(rs.getString(COLUMN_NAME));
PName familyName = PNameFactory.newName(rs.getString(COLUMN_FAMILY));
String dataTypeStr = rs.getString(DATA_TYPE);
PDataType dataType = PDataType.fromTypeId(Integer.valueOf(dataTypeStr));
Integer maxLength = rs.getInt(COLUMN_SIZE);
Integer scale = null;
boolean nullable = true;
int position = rs.getInt(ORDINAL_POSITION);
SortOrder sortOrder = SortOrder.fromSystemValue(rs.getInt(SORT_ORDER));
Integer arrSize = null;
byte[] viewConstant = null;
boolean isViewReferenced = true;
String expressionStr = "";
boolean isRowTimestamp = true;
boolean isDynamic = true;
return new PColumnImpl(
columnName
, familyName
, dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant,
isViewReferenced
, expressionStr
, isRowTimestamp, isDynamic
);
}
private void scanTables() throws SQLException {
ResultSet rs = phoenixConn.getMetaData().getTables(null, null, null, null);
List<String> res = new ArrayList<>();
while (rs.next()) {
String schema = rs.getString(TABLE_SCHEM);
String name = rs.getString(TABLE_NAME);
String fullName = buildFullName(schema, name);
if ("SYSTEM".equalsIgnoreCase(schema)) {
System.out.println("跳过系统表 => " + fullName);
continue;
}
if (scanTables.isEmpty()) {
res.add(fullName);
} else {
for (String table : scanTables) {
if (table.equalsIgnoreCase(fullName)) {
res.add(fullName);
break;
} else if (fullName.matches(table)) {
res.add(fullName);
}
}
}
}
this.scanTables = res;
}
private void scanEnableTables() throws IOException {
enableTables.clear();
for (String fullTableName : scanTables) {
TableName tableName = TableName.valueOf(fullTableName);
if (!hbaseConn.getAdmin().isTableEnabled(tableName)) {
logger.info("跳过修复,表" + fullTableName + "已被禁用");
return;
}
enableTables.put(fullTableName, new PhoenixTable(fullTableName));
}
}
private void scanSaltBucket() throws SQLException {
StringBuilder sb = new StringBuilder();
sb.append("select TABLE_SCHEM,TABLE_NAME,SALT_BUCKETS");
sb.append(" from system.catalog");
sb.append(" where SALT_BUCKETS is not null");
// 优化查询
if (enableTables.size() < 10) {
sb.append(" and (");
sb.append(enableTables.values().stream()
.map(v -> String.format("(TABLE_SCHEM = '%s' and TABLE_NAME='%s')", v.schema, v.name))
.collect(Collectors.joining(" or ")));
sb.append(")");
}
sb.append(" group by TABLE_SCHEM,TABLE_NAME,SALT_BUCKETS");
String query = sb.toString();
logger.info("查询盐分区数 => " + query);
Statement statement = phoenixConn.createStatement();
ResultSet rs = statement.executeQuery(query);
while (rs.next()) {
String schema = rs.getString(TABLE_SCHEM);
String name = rs.getString(TABLE_NAME);
PhoenixTable pt = enableTables.get(buildFullName(schema, name));
if (pt != null) {
pt.saltBuckets = rs.getInt(SALT_BUCKETS);
}
}
statement.close();
}
/**
* row_begin = 0
* row_end = 255
* (row_begin..row_end).map {|i| scan 'DBAML.VDL_SUS_BILL_BASE_INNERPAY',{STARTROW=> i.chr,STOPROW => (i+1).chr, COLUMNS => '0:_0' , LIMIT=> 1} }
*/
private void scanDataBucket(PhoenixTable pt) throws IOException {
Table table = hbaseConn.getTable(pt.hbaseTableName);
for (int i = 0; i <= 255; i++) {
Scan scan = new Scan();
byte currByte = (byte) i;
scan.setStartRow(new byte[]{currByte});
scan.setMaxResultSize(10);
scan.setBatch(10);
scan.setCaching(10);
ResultScanner scanner = table.getScanner(scan);
Result result = scanner.next();
if (result != null && !result.isEmpty()) {
if (currByte == result.getRow()[0]) {
if (pt.startRowKey == null) {
pt.startRowKey = currByte;
}
pt.endRowKey = currByte;
pt.rowKeyDataCount++;
}
}
scanner.close();
}
table.close();
}
private void scanPKColumns() throws SQLException {
StringBuilder sb = new StringBuilder("select");
sb.append(" TABLE_SCHEM");
sb.append(",TABLE_NAME");
sb.append(",COLUMN_NAME");
sb.append(",COLUMN_FAMILY");
sb.append("," + ExternalSqlTypeIdFunction.NAME + "(" + DATA_TYPE + ") AS " + DATA_TYPE);
sb.append("," + SqlTypeNameFunction.NAME + "(" + DATA_TYPE + ") AS " + TYPE_NAME);
sb.append(",COLUMN_SIZE");
sb.append(",ORDINAL_POSITION");
sb.append(",SORT_ORDER");
sb.append(" from SYSTEM.CATALOG");
sb.append(" where COLUMN_FAMILY is null and TABLE_TYPE is null");
// 优化查询
if (enableTables.size() < 10) {
sb.append(" and (");
sb.append(enableTables.values().stream()
.map(v -> String.format("(TABLE_SCHEM = '%s' and TABLE_NAME='%s')", v.schema, v.name))
.collect(Collectors.joining(" or ")));
sb.append(")");
}
sb.append(" order by ORDINAL_POSITION");
String query = sb.toString();
logger.info("查询主键 => " + query);
Statement statement = phoenixConn.createStatement();
ResultSet rs = statement.executeQuery(query);
while (rs.next()) {
String schema = rs.getString(TABLE_SCHEM);
String name = rs.getString(TABLE_NAME);
PhoenixTable pt = enableTables.get(buildFullName(schema, name));
if (pt != null) {
pt.pkColumns.add(createPColumn(rs));
}
}
statement.close();
}
private static List<String> parseOptionValues(String[] strs) {
return Arrays.stream(strs).flatMap(s -> Arrays.stream(s.split(",")))
.map(s -> s.trim())
.filter(s -> s.length() > 0).collect(Collectors.toList());
}
private void showHelp() {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.setLeftPadding(5);
helpFormatter.setWidth(80);
helpFormatter.printHelp("java -cp `hbase classpath` " + getClass().getName()
, options, true);
}
private void run(PhoenixTable pt) throws IOException, SQLException {
System.out.println(StringUtils.center("检测" + pt.tag, 100, "-"));
if ((pt.saltBuckets == null || pt.saltBuckets <= 0)) {
logger.info(pt.tag + "跳过修复(没有设置盐分区)");
return;
}
pt.updateContentSummary(fs);
logger.info(pt.tag + "表信息 => " + pt);
RegionLocator rl = hbaseConn.getRegionLocator(pt.hbaseTableName);
byte[][] saltSplitKeys = pt.generateSaltSplitKeys();
pt.targetHex = toHexString(Arrays.asList(saltSplitKeys));
pt.currentHex = toHexString(
Arrays.stream(rl.getStartKeys()).filter(s -> s.length > 0).collect(Collectors.toList()));
long tableStorageSize = pt.contentSummary.getLength();
logger.info("当前分裂点 => " + pt.targetHex);
logger.info("目标分裂点 => " + pt.currentHex);
logger.info("分区数: " + rl.getAllRegionLocations().size() + " => " + pt.saltBuckets);
logger.info(String.format("当前表大小 => %s", humanSize(tableStorageSize)));
if (checkSplitLength) {
if (saltSplitKeys.length > 0 && rl.getStartKeys().length > 1) {
double saltAvgSize = avgSize(saltSplitKeys, 0);
double currAvgSize = avgSize(rl.getStartKeys(), 1);
if (saltAvgSize != currAvgSize) {
logger.info(
pt.tag + "跳过修复(分裂点的长度检测失败) current:" + currAvgSize + "=> salt:" + saltAvgSize);
records.append(pt.tag + "跳过修复(分裂点的长度检测失败) => " + pt).append("\n");
return;
}
}
}
if (checkHBaseScan) {
logger.info(pt.tag + "检测HBase底层数据分布");
scanDataBucket(pt);
if (pt.rowKeyDataCount == 0) {
logger.info(pt.tag + "跳过修复(表数据为空)");
records.append(pt.tag + "跳过修复(表数据为空) => " + pt).append("\n");
return;
}
if (pt.startRowKey != 0x00) {
logger.info(pt.tag + "跳过修复(数据范围异常)");
records.append(pt.tag + "跳过修复(数据范围异常) => " + pt).append("\n");
return;
}
if (pt.getDataRange() != pt.saltBuckets) {
logger.info(pt.tag + "跳过修复(数据和盐分区不一致)");
records.append(pt.tag + "跳过修复(数据和盐分区不一致) => " + pt).append("\n");
return;
}
logger.info(pt.tag + "表信息 => " + pt);
}
if (pt.targetHex.equals(pt.currentHex)) {
logger.info(pt.tag + "跳过修复(表的分裂点已正常)");
return;
}
if (tableStorageSize > storageSize) {
logger.info(pt.tag + "跳过修复(表的hdfs存储数据过大)");
records.append(pt.tag + "跳过修复(表的hdfs存储数据过大) => " + pt).append("\n");
return;
}
needRestore++;
System.out.println(StringUtils.center("修复" + pt.tag, 100, "="));
if (actions.contains(ACTION_RESTORE)) {
logger.info(String.format("尝试修复 => %s, 表大小 => %.2f MB", pt.fullName,
tableStorageSize / 1024d / 1024d));
restore(pt, rl, saltSplitKeys);
} else {
records.append(pt.tag + "待修复 => " + pt).append("\n");
}
}
private double avgSize(byte[][] bytes, int skip) {
long sum = 0;
long count = 0;
for (int i = 0; i < bytes.length; i++) {
if (i >= skip) {
sum += bytes[i].length;
count++;
}
}
return Double.parseDouble(String.format("%.2f", sum * 1d / count));
}
private void restore(PhoenixTable pt, RegionLocator rl, byte[][] saltSplitKeys)
throws IOException {
int splitCount = 0;
int mergeCount = 0;
int regionCount = rl.getAllRegionLocations().size();
// 尝试分裂
for (HRegionLocation hrl : rl.getAllRegionLocations()) {
String regionName = hrl.getRegionInfo().getEncodedName();
byte[] startKey = hrl.getRegionInfo().getStartKey();
byte[] endKey = hrl.getRegionInfo().getEndKey();
byte[] midKey = findMidKey(saltSplitKeys, startKey, endKey);
if (midKey != null) {
System.out.println(String.format("split '%s',%s", regionName, toRuby(midKey)));
byte[] regionNameBytes = hrl.getRegionInfo().getEncodedNameAsBytes();
hbaseConn.getAdmin().splitRegion(regionNameBytes, midKey);
splitCount++;
}
}
// 尝试合并
for (int i = 0; i <= saltSplitKeys.length; i++) {
byte[] startKey = i == 0 ? new byte[0] : saltSplitKeys[i - 1];
byte[] endKey = i == saltSplitKeys.length ? new byte[0] : saltSplitKeys[i];
List<HRegionLocation> regions = findRegions(rl, startKey, endKey);
for (int j = 0; j < regions.size() / 2; j++) {
HRegionLocation left = regions.get(j * 2);
HRegionLocation right = regions.get(j * 2 + 1);
System.out.println(
String.format("merge_region '%s','%s'", left.getRegionInfo().getEncodedName()
, right.getRegionInfo().getEncodedName()));
hbaseConn.getAdmin().mergeRegions(
left.getRegionInfo().getEncodedNameAsBytes(),
right.getRegionInfo().getEncodedNameAsBytes(),
false
);
mergeCount++;
}
}
int finalRegionCount = (regionCount + splitCount - mergeCount);
if (splitCount > 0 || mergeCount > 0) {
System.out.println(String.format("major_compact '%s'", pt.fullName));
hbaseConn.getAdmin().compact(pt.hbaseTableName);
}
String tag = "[" + (saltSplitKeys.length == finalRegionCount ? "success" : "continue") + "]";
logger.info(tag + pt.fullName + " salt:" + saltSplitKeys.length
+ ",regions:" + regionCount + " => " + finalRegionCount + ",分裂了"
+ splitCount + "次, 合并了" + mergeCount + "次");
}
private String toRuby(byte[] midKey) {
StringBuilder sb = new StringBuilder();
sb.append("\"");
// "\x0b\x00\x00\x00"
for (int i = 0; i < midKey.length; i++) {
sb.append("\\x").append(bytes2Hex(new byte[]{midKey[i]}));
}
sb.append("\"");
return sb.toString();
}
private List<HRegionLocation> findRegions(RegionLocator rl, byte[] startKey, byte[] endKey)
throws IOException {
List<HRegionLocation> res = new ArrayList<>();
for (HRegionLocation hrl : rl.getAllRegionLocations()) {
byte[] regionStartKey = hrl.getRegionInfo().getStartKey();
byte[] regionEndKey = hrl.getRegionInfo().getEndKey();
if (greaterEquals(regionStartKey, startKey) && lessEquals(regionEndKey, endKey)) {
res.add(hrl);
}
}
return res;
}
private boolean greaterEquals(byte[] left, byte[] right) {
if (left.length == 0 && right.length == 0) {
return true;
} else if (left.length == 0) {
return false;
} else if (right.length == 0) {
return true;
}
return Bytes.compareTo(left, right) >= 0;
}
private boolean lessEquals(byte[] left, byte[] right) {
if (left.length == 0 && right.length == 0) {
return true;
} else if (left.length == 0) {
return false;
} else if (right.length == 0) {
return true;
}
return Bytes.compareTo(left, right) <= 0;
}
private byte[] findMidKey(byte[][] splitKeys, byte[] startKey, byte[] endKey) {
List<byte[]> maybe = new ArrayList<>();
for (int i = 0; i < splitKeys.length; i++) {
boolean startLessThanThis =
startKey.length == 0 || Bytes.compareTo(startKey, splitKeys[i]) < 0;
boolean endGreaterThanThis =
endKey.length == 0 || Bytes.compareTo(endKey, splitKeys[i]) > 0;
if (startLessThanThis && endGreaterThanThis) {
maybe.add(splitKeys[i]);
}
}
if (maybe.size() > 0) {
return maybe.get(maybe.size() / 2);
}
return null;
}
private static String bytes2Hex(byte[] bytes) {
return Hex.encodeHexString(bytes);
}
private static final LinkedHashSet<PColumn> EMPTY_SET = new LinkedHashSet<>();
private String toHexString(List<byte[]> splitKeys) {
return splitKeys.stream().map(PhoenixRestoreMerge::bytes2Hex).collect(Collectors.joining(","));
}
private static String buildFullName(String schema, String name) {
return StringUtils.isBlank(schema) ? name : (schema + "." + name);
}
private void showResultSet(ResultSet rs) throws SQLException {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < rs.getMetaData().getColumnCount(); i++) {
sb.append(i == 0 ? "" : ",").append(rs.getMetaData().getColumnName(i + 1)).append("=")
.append(rs.getObject(i + 1));
}
System.out.println(sb);
}
private void init() throws SQLException, ClassNotFoundException, IOException {
Class.forName(PhoenixDriver.class.getName());
logger.info("url => " + url);
this.phoenixConn = DriverManager.getConnection(url);
Configuration configuration = HBaseConfiguration.create();
String zookeeperQuorum = Arrays.stream(quorum.split(",")).map(s -> s.split(":")[0])
.collect(Collectors.joining(","));
configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
String clientPort = "2181";
if (quorum.contains(":")) {
clientPort = Arrays.stream(quorum.split(",")).map(s -> s.split(":"))
.filter(ss -> ss.length >= 2).map(ss -> ss[1].trim()).filter(s -> s.length() > 0)
.distinct()
.collect(Collectors.joining(","));
configuration.set("hbase.zookeeper.property.clientPort", clientPort);
}
logger.info("zookeeperQuorum => " + zookeeperQuorum + ", clientPort => " + clientPort);
User user = User.create(UserGroupInformation.createRemoteUser("hbase"));
this.hbaseConn = ConnectionFactory.createConnection(configuration, user);
Configuration conf = new Configuration();
addResource(conf, "/usr/hdp/current/hadoop-client/etc/hadoop");
this.fs = FileSystem.get(conf);
}
private void addResource(Configuration conf, String confDir) {
File dir = new File(confDir);
if (dir.isDirectory()) {
for (File f : dir.listFiles()) {
if (f.isFile() && f.getName().endsWith(".xml")) {
conf.addResource(f.getAbsolutePath());
logger.info("load " + f.getAbsolutePath());
}
}
}
}
@Override
public void close() throws IOException {
if (phoenixConn != null) {
try {
phoenixConn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
if (hbaseConn != null) {
hbaseConn.close();
}
}
public static void main(String[] args) {
try (PhoenixRestoreMerge psm = new PhoenixRestoreMerge()) {
psm.run(args);
} catch (Exception e) {
logger.error("执行异常", e);
}
}
}
恢复分区
# 编译代码
javac -cp `hbase classpath` PhoenixRestoreMerge.java
# 执行恢复命令
java -cp `hbase classpath` PhoenixRestoreMerge -zq localhost -a scan -pt xx -a restore