If a Phoenix table was created with salting enabled (the SALT_BUCKETS property), queries against it may return abnormal results after its HBase regions are merged, because the region boundaries no longer match the salt split points the table was created with.
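
For context, a salted table is one created with SALT_BUCKETS, which makes Phoenix prepend a one-byte salt to every row key and pre-split the table into that many regions. A minimal sketch (the connection URL and table/column names are illustrative only):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateSaltedTableExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
        Statement stmt = conn.createStatement()) {
      // SALT_BUCKETS = 50 salts row keys with bytes 0x00..0x31 and
      // pre-splits the table into 50 regions at \x01 .. \x31.
      stmt.execute("CREATE TABLE IF NOT EXISTS DEMO.SALTED_TABLE ("
          + "ID VARCHAR NOT NULL PRIMARY KEY, V VARCHAR"
          + ") SALT_BUCKETS = 50");
    }
  }
}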
There are two ways to fix this:

Re-ingest the data

Restore the region split points the table had when it was created (see the sketch below)
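
The split points of a salted table depend only on its bucket count and primary-key schema, so they can be regenerated at any time; this is what the script below does. A minimal sketch of the core call (the bucket count is illustrative):

import org.apache.commons.codec.binary.Hex;
import org.apache.phoenix.schema.SaltingUtil;

public class PrintSaltSplitPoints {
  public static void main(String[] args) {
    int saltBuckets = 50; // illustrative; use the table's actual SALT_BUCKETS value
    // For N buckets Phoenix defines N-1 one-byte split points: \x01 .. \x(N-1).
    byte[][] splits = SaltingUtil.getSalteByteSplitPoints(saltBuckets);
    for (byte[] split : splits) {
      System.out.println(Hex.encodeHexString(split));
    }
  }
}

The full script additionally pads these one-byte points to the row-key width via SchemaUtil.processSplits, mirroring what Phoenix does at CREATE TABLE time.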

Copy the following code (saved as PhoenixRestoreMerge.java) to the machine where the target Phoenix client is installed:

import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_FAMILY;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.COLUMN_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DATA_TYPE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SCHEM;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.phoenix.expression.function.ExternalSqlTypeIdFunction;
import org.apache.phoenix.expression.function.SqlTypeNameFunction;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.SaltingUtil;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by youhl002@lianlianpay.com
 * date: 2021/12/1
 * comment:
 */
public class PhoenixRestoreMerge implements Closeable {

  private static final Logger logger = LoggerFactory.getLogger(PhoenixRestoreMerge.class);
  private final Options options = new Options();

  private String quorum;
  private String url;
  private java.sql.Connection phoenixConn;
  private org.apache.hadoop.hbase.client.Connection hbaseConn;
  private FileSystem fs;

  private List<String> scanTables = new ArrayList<>();
  private List<String> actions = new ArrayList<>();
  private static final String ACTION_SCAN = "scan";
  private static final String ACTION_RESTORE = "restore";
  private static final List<String> ACTIONS = Arrays.asList(ACTION_SCAN, ACTION_RESTORE);
  private Map<String, PhoenixTable> enableTables = new LinkedHashMap<>();
  private int needRestore = 0;
  private long storageSize = 1024L * 1024 * 1024 * 1;
  private boolean checkSplitLength = false;
  private StringBuilder records = new StringBuilder();
  private boolean checkHBaseScan = false;

  private static class PhoenixTable {
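    // Per-table state gathered during scanning: salt bucket count, PK columns,
    // the observed salt-byte distribution, and the table's HDFS footprint.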

    final String fullName;
    final String schema;
    final String name;
    final TableName hbaseTableName;
    String targetHex;
    String currentHex;
    String tag;
    Byte startRowKey;
    Byte endRowKey;
    int rowKeyDataCount;
    Integer saltBuckets;
    LinkedHashSet<PColumn> pkColumns = new LinkedHashSet<>();
    ContentSummary contentSummary;

    public PhoenixTable(String fullName) {
      this.fullName = fullName;
      this.hbaseTableName = TableName.valueOf(fullName);
      String[] ss = fullName.split("\\.");
      if (ss.length == 1) {
        schema = "";
        name = fullName;
      } else {
        schema = ss[0];
        name = IntStream.range(1, ss.length).mapToObj(i -> ss[i]).collect(Collectors.joining("."));
      }
    }

    // byte saltByte = SaltingUtil.getSaltingByte(indexRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nIndexSaltBuckets);
    private byte[][] generateSaltSplitKeys() throws SQLException, IllegalArgumentIOException {
      // with 50 buckets the split points are \x01\x00\x00\x00\x00, \x02\x00\x00\x00\x00, ... \x31\x00\x00\x00\x00 (49 points, 0x31 == 49)
      //  byte[][] splitPoints = SaltingUtil.getSalteByteSplitPoints(nSaltBuckets);
      byte[][] splits = SaltingUtil.getSalteByteSplitPoints(saltBuckets);
      if (pkColumns.isEmpty()) {
        throw new IllegalArgumentIOException(fullName + ": primary key columns not found");
      }
      logger.info(String.format("【%s】主键 => %s", fullName, pkColumns.stream().map(s -> s.getName())
          .collect(Collectors.toList())));
      splits = SchemaUtil.processSplits(splits, pkColumns, saltBuckets, false);
      return splits;
    }


    public void updateContentSummary(FileSystem fs) throws IOException {
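      // Assumes the HDP default hbase.rootdir layout; adjust the path for other distributions.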
      Path tableDir = new Path("/apps/hbase/data/data/default/" + fullName);
      this.contentSummary = fs.getContentSummary(tableDir);
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder();
      sb.append("表名: ").append(fullName);
      sb.append(" ,主键: ").append(pkColumns.stream().map(s -> s.getName())
          .collect(Collectors.toList()));
      sb.append(" ,盐分区: ").append(saltBuckets);
      if (startRowKey != null) {
        int rowKeyRange = getDataRange();
        sb.append(" ,数据分区: ").append(rowKeyRange);
        sb.append(" ,数据分布: ").append(bytes2Hex(new byte[]{startRowKey})).append("~")
            .append(bytes2Hex(new byte[]{endRowKey}));
        sb.append(" ,数据覆盖率: ").append(String.format("%d/%d=%.1f%%",
            rowKeyDataCount, rowKeyRange, rowKeyDataCount * 100d / rowKeyRange
        ));
      } else {
        sb.append(" ,数据分区: NULL");
      }
      sb.append(" ,存储: ").append(humanSize(contentSummary.getLength()));
      if (currentHex != null) {
        sb.append("\n当前分裂点: ").append(currentHex);
      }
      if (targetHex != null) {
        sb.append("\n目标分裂点: ").append(targetHex);
      }
      return sb.toString();
    }


    public int getDataRange() {
      // salt bytes are unsigned; avoid sign extension for values >= 0x80
      return (Byte.toUnsignedInt(endRowKey) - Byte.toUnsignedInt(startRowKey)) + 1;
    }
  }

  public PhoenixRestoreMerge() {
    options.addOption(new Option("h", "help", false, "帮助"));
    options.addOption(new Option("zq", "zookeeper-quorum", true, "phoenix,hbase连接地址"));
    options.addOption(new Option("pt", "phoenix-table", true, "phoenix表名"));
    options.addOption(new Option("ss", "storage-size", true, "表对应的hdfs文件大小" + storageSize));
    options.addOption(new Option("csl", "check-split-length", false, "检测分隔点的长度"));
    options.addOption(new Option("chs", "check-hbase-scan", false, "检测HBase数据分布"));
    options.addOption(new Option("a", "action", true,
        "动作(" + ACTIONS.stream().collect(Collectors.joining(",")) + ")"));
  }


  private static String humanSize(long size) {
    if (size < 1024) {
      return size + " B";
    } else if (size < 1024L * 1024) {
      return String.format("%.2f KB", size / 1024d);
    } else if (size < 1024L * 1024 * 1024) {
      return String.format("%.2f MB", size / (1024d * 1024));
    } else if (size < 1024L * 1024 * 1024 * 1024) {
      return String.format("%.2f GB", size / (1024d * 1024 * 1024));
    }
    return String.format("%.2f TB", size / (1024d * 1024 * 1024 * 1024));
  }

  public void run(String[] args)
      throws SQLException, ClassNotFoundException, IOException, ParseException {
    CommandLineParser parser = new PosixParser();
    CommandLine cli = parser.parse(options, args);
    if (cli.hasOption("help")) {
      showHelp();
      return;
    }
    this.quorum = cli.getOptionValue("zq");
    if (StringUtils.isBlank(quorum)) {
      throw new IllegalArgumentIOException("连接地址为空");
    }
    this.actions = Arrays.asList(ACTION_SCAN);
    if (cli.hasOption("a")) {
      this.actions = parseOptionValues(cli.getOptionValues("a"));
    }
    this.checkSplitLength = cli.hasOption("csl");
    this.checkHBaseScan = cli.hasOption("chs");
    logger.info("action => " + actions);
    if (cli.hasOption("pt")) {
      this.scanTables = parseOptionValues(cli.getOptionValues("pt")).stream()
          .map(String::toUpperCase)
          .collect(Collectors.toList());
    }
    this.url = String.format("jdbc:phoenix:%s", this.quorum);
    init();
    if (actions.contains(ACTION_SCAN)) {
      scanTables();
    }
    if (this.scanTables.isEmpty()) {
      throw new IllegalArgumentIOException("表名为空");
    }
    if (cli.hasOption("ss")) {
      storageSize = Long.valueOf(cli.getOptionValue("ss"));
      if (storageSize < 1024) {
        throw new IllegalArgumentIOException("数据阈值过小 => " + storageSize);
      }
    }
    logger.info("本次扫描" + scanTables.size() + "张表 => " + scanTables);
    scanEnableTables();
    if (this.enableTables.isEmpty()) {
      throw new IllegalArgumentIOException("无有效表名");
    }
    // populate saltBuckets and pkColumns for each table
    scanSaltBucket();
    scanPKColumns();
    int index = 0;
    for (PhoenixTable pt : enableTables.values()) {
      index++;
      pt.tag = String.format("[%s:%3.2f%%]", pt.fullName, index * 100d / enableTables.size());
      run(pt);
    }
    logger.info(
        String.format("restore/enable/tables: %d/%d/%d, healthy: %.2f%%"
            , needRestore, enableTables.size(), scanTables.size(),
            (enableTables.size() - needRestore) * 100d / enableTables.size()));
    System.out.println(records);
  }


  private static PColumn createPColumn(ResultSet rs) throws SQLException {
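    // Rebuild a PColumn from a SYSTEM.CATALOG row. Only the attributes that
    // SchemaUtil.processSplits relies on (name, type, size, position, sort order)
    // are read; the remaining constructor arguments are placeholders.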
    PName columnName = PNameFactory.newName(rs.getString(COLUMN_NAME));
    PName familyName = PNameFactory.newName(rs.getString(COLUMN_FAMILY));
    String dataTypeStr = rs.getString(DATA_TYPE);
    PDataType dataType = PDataType.fromTypeId(Integer.valueOf(dataTypeStr));
    Integer maxLength = rs.getInt(COLUMN_SIZE);
    Integer scale = null;
    boolean nullable = true;
    int position = rs.getInt(ORDINAL_POSITION);
    SortOrder sortOrder = SortOrder.fromSystemValue(rs.getInt(SORT_ORDER));
    Integer arrSize = null;
    byte[] viewConstant = null;
    boolean isViewReferenced = true;
    String expressionStr = "";
    boolean isRowTimestamp = true;
    boolean isDynamic = true;
    return new PColumnImpl(
        columnName
        , familyName
        , dataType, maxLength, scale, nullable, position, sortOrder, arrSize, viewConstant,
        isViewReferenced
        , expressionStr
        , isRowTimestamp, isDynamic
    );
  }

  private void scanTables() throws SQLException {
    ResultSet rs = phoenixConn.getMetaData().getTables(null, null, null, null);
    List<String> res = new ArrayList<>();
    while (rs.next()) {
      String schema = rs.getString(TABLE_SCHEM);
      String name = rs.getString(TABLE_NAME);
      String fullName = buildFullName(schema, name);
      if ("SYSTEM".equalsIgnoreCase(schema)) {
        System.out.println("跳过系统表 => " + fullName);
        continue;
      }
      if (scanTables.isEmpty()) {
        res.add(fullName);
      } else {
        for (String table : scanTables) {
          if (table.equalsIgnoreCase(fullName)) {
            res.add(fullName);
            break;
          } else if (fullName.matches(table)) {
            res.add(fullName);
            break;
          }
        }
      }
    }
    this.scanTables = res;
  }

  private void scanEnableTables() throws IOException {
    enableTables.clear();
    for (String fullTableName : scanTables) {
      TableName tableName = TableName.valueOf(fullTableName);
      if (!hbaseConn.getAdmin().isTableEnabled(tableName)) {
        logger.info("跳过修复,表" + fullTableName + "已被禁用");
        return;
      }
      enableTables.put(fullTableName, new PhoenixTable(fullTableName));
    }
  }

  private void scanSaltBucket() throws SQLException {
    StringBuilder sb = new StringBuilder();
    sb.append("select TABLE_SCHEM,TABLE_NAME,SALT_BUCKETS");
    sb.append(" from system.catalog");
    sb.append(" where SALT_BUCKETS is not null");
    // narrow the query when only a few tables are targeted
    if (enableTables.size() < 10) {
      sb.append(" and (");
      sb.append(enableTables.values().stream()
          .map(v -> String.format("(TABLE_SCHEM = '%s' and TABLE_NAME='%s')", v.schema, v.name))
          .collect(Collectors.joining(" or ")));
      sb.append(")");
    }
    sb.append(" group by TABLE_SCHEM,TABLE_NAME,SALT_BUCKETS");
    String query = sb.toString();
    logger.info("查询盐分区数 => " + query);
    Statement statement = phoenixConn.createStatement();
    ResultSet rs = statement.executeQuery(query);
    while (rs.next()) {
      String schema = rs.getString(TABLE_SCHEM);
      String name = rs.getString(TABLE_NAME);
      PhoenixTable pt = enableTables.get(buildFullName(schema, name));
      if (pt != null) {
        pt.saltBuckets = rs.getInt(SALT_BUCKETS);
      }
    }
    statement.close();
  }

  /**
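   * Probes every possible leading salt byte (0x00..0xFF) with a one-row scan to
   * find how the data is actually distributed. The hbase-shell equivalent: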
   * row_begin = 0
   * row_end = 255
   * (row_begin..row_end).map {|i| scan 'DBAML.VDL_SUS_BILL_BASE_INNERPAY',{STARTROW=> i.chr,STOPROW => (i+1).chr, COLUMNS => '0:_0' , LIMIT=> 1} }
   */
  private void scanDataBucket(PhoenixTable pt) throws IOException {
    Table table = hbaseConn.getTable(pt.hbaseTableName);
    for (int i = 0; i <= 255; i++) {
      Scan scan = new Scan();
      byte currByte = (byte) i;
      scan.setStartRow(new byte[]{currByte});
      scan.setMaxResultSize(10);
      scan.setBatch(10);
      scan.setCaching(10);
      ResultScanner scanner = table.getScanner(scan);
      Result result = scanner.next();
      if (result != null && !result.isEmpty()) {
        if (currByte == result.getRow()[0]) {
          if (pt.startRowKey == null) {
            pt.startRowKey = currByte;
          }
          pt.endRowKey = currByte;
          pt.rowKeyDataCount++;
        }
      }
      scanner.close();
    }
    table.close();
  }

  private void scanPKColumns() throws SQLException {
    StringBuilder sb = new StringBuilder("select");
    sb.append(" TABLE_SCHEM");
    sb.append(",TABLE_NAME");
    sb.append(",COLUMN_NAME");
    sb.append(",COLUMN_FAMILY");
    sb.append("," + ExternalSqlTypeIdFunction.NAME + "(" + DATA_TYPE + ") AS " + DATA_TYPE);
    sb.append("," + SqlTypeNameFunction.NAME + "(" + DATA_TYPE + ") AS " + TYPE_NAME);
    sb.append(",COLUMN_SIZE");
    sb.append(",ORDINAL_POSITION");
    sb.append(",SORT_ORDER");
    sb.append(" from SYSTEM.CATALOG");
    sb.append(" where COLUMN_FAMILY is null and TABLE_TYPE is null");
    // narrow the query when only a few tables are targeted
    if (enableTables.size() < 10) {
      sb.append(" and (");
      sb.append(enableTables.values().stream()
          .map(v -> String.format("(TABLE_SCHEM = '%s' and TABLE_NAME='%s')", v.schema, v.name))
          .collect(Collectors.joining(" or ")));
      sb.append(")");
    }
    sb.append(" order by ORDINAL_POSITION");
    String query = sb.toString();
    logger.info("查询主键 => " + query);
    Statement statement = phoenixConn.createStatement();
    ResultSet rs = statement.executeQuery(query);
    while (rs.next()) {
      String schema = rs.getString(TABLE_SCHEM);
      String name = rs.getString(TABLE_NAME);
      PhoenixTable pt = enableTables.get(buildFullName(schema, name));
      if (pt != null) {
        pt.pkColumns.add(createPColumn(rs));
      }
    }
    statement.close();
  }

  private static List<String> parseOptionValues(String[] strs) {
    return Arrays.stream(strs).flatMap(s -> Arrays.stream(s.split(",")))
        .map(s -> s.trim())
        .filter(s -> s.length() > 0).collect(Collectors.toList());
  }

  private void showHelp() {
    HelpFormatter helpFormatter = new HelpFormatter();
    helpFormatter.setLeftPadding(5);
    helpFormatter.setWidth(80);
    helpFormatter.printHelp("java -cp `hbase classpath` " + getClass().getName()
        , options, true);
  }

  private void run(PhoenixTable pt) throws IOException, SQLException {
    System.out.println(StringUtils.center("检测" + pt.tag, 100, "-"));
    if ((pt.saltBuckets == null || pt.saltBuckets <= 0)) {
      logger.info(pt.tag + "跳过修复(没有设置盐分区)");
      return;
    }
    pt.updateContentSummary(fs);
    logger.info(pt.tag + "表信息 => " + pt);
    RegionLocator rl = hbaseConn.getRegionLocator(pt.hbaseTableName);
    byte[][] saltSplitKeys = pt.generateSaltSplitKeys();
    pt.targetHex = toHexString(Arrays.asList(saltSplitKeys));
    pt.currentHex = toHexString(
        Arrays.stream(rl.getStartKeys()).filter(s -> s.length > 0).collect(Collectors.toList()));
    long tableStorageSize = pt.contentSummary.getLength();
    logger.info("当前分裂点 => " + pt.targetHex);
    logger.info("目标分裂点 => " + pt.currentHex);
    logger.info("分区数: " + rl.getAllRegionLocations().size() + " => " + pt.saltBuckets);
    logger.info(String.format("当前表大小 => %s", humanSize(tableStorageSize)));
    if (checkSplitLength) {
      if (saltSplitKeys.length > 0 && rl.getStartKeys().length > 1) {
        double saltAvgSize = avgSize(saltSplitKeys, 0);
        double currAvgSize = avgSize(rl.getStartKeys(), 1);
        if (saltAvgSize != currAvgSize) {
          logger.info(
              pt.tag + "skipping repair (split point length check failed) current:" + currAvgSize + " => salt:" + saltAvgSize);
          records.append(pt.tag + "skipping repair (split point length check failed) => " + pt).append("\n");
          return;
        }
      }
    }
    if (checkHBaseScan) {
      logger.info(pt.tag + "检测HBase底层数据分布");
      scanDataBucket(pt);
      if (pt.rowKeyDataCount == 0) {
        logger.info(pt.tag + "跳过修复(表数据为空)");
        records.append(pt.tag + "跳过修复(表数据为空) => " + pt).append("\n");
        return;
      }
      if (pt.startRowKey != 0x00) {
        logger.info(pt.tag + "跳过修复(数据范围异常)");
        records.append(pt.tag + "跳过修复(数据范围异常) => " + pt).append("\n");
        return;
      }
      if (pt.getDataRange() != pt.saltBuckets) {
        logger.info(pt.tag + "跳过修复(数据和盐分区不一致)");
        records.append(pt.tag + "跳过修复(数据和盐分区不一致) => " + pt).append("\n");
        return;
      }
      logger.info(pt.tag + "表信息 => " + pt);
    }
    if (pt.targetHex.equals(pt.currentHex)) {
      logger.info(pt.tag + "跳过修复(表的分裂点已正常)");
      return;
    }
    if (tableStorageSize > storageSize) {
      logger.info(pt.tag + "跳过修复(表的hdfs存储数据过大)");
      records.append(pt.tag + "跳过修复(表的hdfs存储数据过大) => " + pt).append("\n");
      return;
    }
    needRestore++;
    System.out.println(StringUtils.center("修复" + pt.tag, 100, "="));
    if (actions.contains(ACTION_RESTORE)) {
      logger.info(String.format("尝试修复 => %s, 表大小 => %.2f MB", pt.fullName,
          tableStorageSize / 1024d / 1024d));
      restore(pt, rl, saltSplitKeys);
    } else {
      records.append(pt.tag + "待修复 => " + pt).append("\n");
    }
  }

  private double avgSize(byte[][] bytes, int skip) {
    long sum = 0;
    long count = 0;
    for (int i = skip; i < bytes.length; i++) {
      sum += bytes[i].length;
      count++;
    }
    return Double.parseDouble(String.format("%.2f", sum * 1d / count));
  }


  private void restore(PhoenixTable pt, RegionLocator rl, byte[][] saltSplitKeys)
      throws IOException {
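    // Two-phase repair: first split every region that spans a missing salt
    // boundary, then merge neighbouring regions that fall inside the same
    // bucket. Splits and merges are asynchronous, so one pass may not be
    // enough; the "continue" tag below signals that the tool should be re-run.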
    int splitCount = 0;
    int mergeCount = 0;
    int regionCount = rl.getAllRegionLocations().size();
    // phase 1: try splitting
    for (HRegionLocation hrl : rl.getAllRegionLocations()) {
      String regionName = hrl.getRegionInfo().getEncodedName();
      byte[] startKey = hrl.getRegionInfo().getStartKey();
      byte[] endKey = hrl.getRegionInfo().getEndKey();
      byte[] midKey = findMidKey(saltSplitKeys, startKey, endKey);
      if (midKey != null) {
        System.out.println(String.format("split '%s',%s", regionName, toRuby(midKey)));
        byte[] regionNameBytes = hrl.getRegionInfo().getEncodedNameAsBytes();
        hbaseConn.getAdmin().splitRegion(regionNameBytes, midKey);
        splitCount++;
      }
    }
    // phase 2: try merging
    for (int i = 0; i <= saltSplitKeys.length; i++) {
      byte[] startKey = i == 0 ? new byte[0] : saltSplitKeys[i - 1];
      byte[] endKey = i == saltSplitKeys.length ? new byte[0] : saltSplitKeys[i];
      List<HRegionLocation> regions = findRegions(rl, startKey, endKey);
      for (int j = 0; j < regions.size() / 2; j++) {
        HRegionLocation left = regions.get(j * 2);
        HRegionLocation right = regions.get(j * 2 + 1);
        System.out.println(
            String.format("merge_region '%s','%s'", left.getRegionInfo().getEncodedName()
                , right.getRegionInfo().getEncodedName()));
        hbaseConn.getAdmin().mergeRegions(
            left.getRegionInfo().getEncodedNameAsBytes(),
            right.getRegionInfo().getEncodedNameAsBytes(),
            false
        );
        mergeCount++;
      }
    }
    int finalRegionCount = (regionCount + splitCount - mergeCount);
    if (splitCount > 0 || mergeCount > 0) {
      System.out.println(String.format("major_compact '%s'", pt.fullName));
      hbaseConn.getAdmin().majorCompact(pt.hbaseTableName);
    }
    String tag = "[" + (saltSplitKeys.length == finalRegionCount ? "success" : "continue") + "]";
    logger.info(tag + pt.fullName + " salt:" + saltSplitKeys.length
        + ",regions:" + regionCount + " => " + finalRegionCount + ",分裂了"
        + splitCount + "次, 合并了" + mergeCount + "次");
  }

  private String toRuby(byte[] midKey) {
    StringBuilder sb = new StringBuilder();
    sb.append("\"");
    // "\x0b\x00\x00\x00"
    for (int i = 0; i < midKey.length; i++) {
      sb.append("\\x").append(bytes2Hex(new byte[]{midKey[i]}));
    }
    sb.append("\"");
    return sb.toString();
  }

  private List<HRegionLocation> findRegions(RegionLocator rl, byte[] startKey, byte[] endKey)
      throws IOException {
    List<HRegionLocation> res = new ArrayList<>();
    for (HRegionLocation hrl : rl.getAllRegionLocations()) {
      byte[] regionStartKey = hrl.getRegionInfo().getStartKey();
      byte[] regionEndKey = hrl.getRegionInfo().getEndKey();
      if (greaterEquals(regionStartKey, startKey) && lessEquals(regionEndKey, endKey)) {
        res.add(hrl);
      }
    }
    return res;
  }

  private boolean greaterEquals(byte[] left, byte[] right) {
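    // an empty byte[] is HBase's open start boundary; it sorts below every real key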
    if (left.length == 0 && right.length == 0) {
      return true;
    } else if (left.length == 0) {
      return false;
    } else if (right.length == 0) {
      return true;
    }
    return Bytes.compareTo(left, right) >= 0;
  }

  private boolean lessEquals(byte[] left, byte[] right) {
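    // an empty byte[] is HBase's open end boundary; it sorts above every real key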
    if (left.length == 0 && right.length == 0) {
      return true;
    } else if (left.length == 0) {
      return false;
    } else if (right.length == 0) {
      return true;
    }
    return Bytes.compareTo(left, right) <= 0;
  }


  private byte[] findMidKey(byte[][] splitKeys, byte[] startKey, byte[] endKey) {
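    // Collect the target split points that fall strictly inside this region and
    // return the middle one, or null if the region needs no further splitting.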
    List<byte[]> maybe = new ArrayList<>();
    for (int i = 0; i < splitKeys.length; i++) {
      boolean startLessThanThis =
          startKey.length == 0 || Bytes.compareTo(startKey, splitKeys[i]) < 0;
      boolean endGreaterThanThis =
          endKey.length == 0 || Bytes.compareTo(endKey, splitKeys[i]) > 0;
      if (startLessThanThis && endGreaterThanThis) {
        maybe.add(splitKeys[i]);
      }
    }
    if (maybe.size() > 0) {
      return maybe.get(maybe.size() / 2);
    }
    return null;
  }

  private static String bytes2Hex(byte[] bytes) {
    return Hex.encodeHexString(bytes);
  }


  private static final LinkedHashSet<PColumn> EMPTY_SET = new LinkedHashSet<>();

  private String toHexString(List<byte[]> splitKeys) {
    return splitKeys.stream().map(PhoenixRestoreMerge::bytes2Hex).collect(Collectors.joining(","));
  }


  private static String buildFullName(String schema, String name) {
    return StringUtils.isBlank(schema) ? name : (schema + "." + name);
  }

  private void showResultSet(ResultSet rs) throws SQLException {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < rs.getMetaData().getColumnCount(); i++) {
      sb.append(i == 0 ? "" : ",").append(rs.getMetaData().getColumnName(i + 1)).append("=")
          .append(rs.getObject(i + 1));
    }
    System.out.println(sb);
  }


  private void init() throws SQLException, ClassNotFoundException, IOException {
    Class.forName(PhoenixDriver.class.getName());
    logger.info("url => " + url);
    this.phoenixConn = DriverManager.getConnection(url);
    Configuration configuration = HBaseConfiguration.create();
    String zookeeperQuorum = Arrays.stream(quorum.split(",")).map(s -> s.split(":")[0])
        .collect(Collectors.joining(","));

    configuration.set("hbase.zookeeper.quorum", zookeeperQuorum);
    String clientPort = "2181";
    if (quorum.contains(":")) {
      clientPort = Arrays.stream(quorum.split(",")).map(s -> s.split(":"))
          .filter(ss -> ss.length >= 2).map(ss -> ss[1].trim()).filter(s -> s.length() > 0)
          .distinct()
          .collect(Collectors.joining(","));
      configuration.set("hbase.zookeeper.property.clientPort", clientPort);
    }
    logger.info("zookeeperQuorum => " + zookeeperQuorum + ", clientPort => " + clientPort);
    User user = User.create(UserGroupInformation.createRemoteUser("hbase"));
    this.hbaseConn = ConnectionFactory.createConnection(configuration, user);

    Configuration conf = new Configuration();
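    // HDP client configuration directory; point this at the cluster's Hadoop conf dir if it differs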
    addResource(conf, "/usr/hdp/current/hadoop-client/etc/hadoop");
    this.fs = FileSystem.get(conf);
  }

  private void addResource(Configuration conf, String confDir) {
    File dir = new File(confDir);
    if (dir.isDirectory()) {
      for (File f : dir.listFiles()) {
        if (f.isFile() && f.getName().endsWith(".xml")) {
          conf.addResource(f.getAbsolutePath());
          logger.info("load " + f.getAbsolutePath());
        }
      }
    }
  }

  @Override
  public void close() throws IOException {
    if (phoenixConn != null) {
      try {
        phoenixConn.close();
      } catch (SQLException e) {
        e.printStackTrace();
      }
    }
    if (hbaseConn != null) {
      hbaseConn.close();
    }
  }

  public static void main(String[] args) {
    try (PhoenixRestoreMerge psm = new PhoenixRestoreMerge()) {
      psm.run(args);
    } catch (Exception e) {
      logger.error("执行异常", e);
    }
  }
}

Restoring the partitions

# compile the code
javac -cp `hbase classpath` PhoenixRestoreMerge.java
# run the repair (xx is the target table name)
java -cp `hbase classpath` PhoenixRestoreMerge -zq localhost -a scan -pt xx -a restore
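
The default scan action is a dry run that only reports what would change; restore performs the actual splits and merges. The -pt option also accepts regular expressions, and -ss raises the table-size guard. An example invocation (quorum hosts and the table pattern are illustrative):

# dry run with split-length and data-distribution checks
java -cp `hbase classpath` PhoenixRestoreMerge -zq zk1:2181,zk2:2181 -a scan -pt 'DBAML\..*' -csl -chs
# repair, allowing tables up to 2 GiB on HDFS
java -cp `hbase classpath` PhoenixRestoreMerge -zq zk1:2181,zk2:2181 -a scan,restore -pt 'DBAML\..*' -ss 2147483648

Because HBase performs splits and merges asynchronously, a restore run may finish tagged [continue]; re-run it until it reports [success].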