hbase会不会自动清空老版本

2025-05-22 13:15:09
推荐回答(1个)
回答1:

1.HTablePool的基本使用方式:

由于HTable对象不是线程安全的,因此HBase提供HTablePool来支持多线程写入hbase,多线程同时从HTablePool中取出HTable并写入是安全的。HTablePool的使用方法类似数据库连接,使用时从HTablePool中取出一个HTable,使用完后再close放回HTablePool中。
Put put = new Put(rowkey);
put.add(LOG_COLUMN_FAMILY,HOST_NAME_QUALIFIER,values[0]);
HTableInterface table = HBaseClientFactory.getHTableByName(RAW_LOG_TABLE);
try {
table.put(put);
} catch (IOException e) {
throw new RuntimeException("Put Log meet exception",e);
}finally {
HBaseClientUtil.closeHTable(table);
}

2.HTablePool的maxsize。

HTablePool有一个maxsize,HTablePool针对每个表都有一个Pool,maxsize表示这个Pool的最大大小,在使用HTablePool的过程中我们发现这个值还是有需要注意的地方。

在多线程使用HTablePool拿到同一个表的HTable时,如果线程个数大于maxsize会导致写入始终是autoflush!
public HTableInterface getTable(String tableName) {
// call the old getTable implementation renamed to findOrCreateTable
HTableInterface table = findOrCreateTable(tableName);
// return a proxy table so when user closes the proxy, the actual table
// will be returned to the pool
return new PooledHTable(table);
}

当拿到HTable时会创建一个HTable对象并包装成一个PooledHTable对象。Pooled做了什么纳,其他方法都没变,只是在close时有所不同:
public void close() throws IOException {
returnTable(table);
}
private void returnTable(HTableInterface table) throws IOException {
// this is the old putTable method renamed and made private
String tableName = Bytes.toString(table.getTableName());
if (tables.size(tableName) >= maxSize) {
// release table instance since we're not reusing it
this.tables.remove(tableName, table);
this.tableFactory.releaseHTableInterface(table);
return;
}
tables.put(tableName, table);
}

可以看到如果tables.size大于maxsize,此时会去掉一个保存的HTable对象,而releaseHTableInterface实际调用的就是HTable的close方法,close方法又会强制flushHTable的buffer,因此,如果我们想不使用autoflush提升写入速度失效。

3.HTablePool type。

HTablePool提供了几种方式:ReusablePool,RoundRobinPool,ThreadLocalPool。默认的是reusable,由于2的原因,我们也可以考虑使用ThreadLocal的Pool,这样多线程写入时分别取自己线程的Pool,这样互不影响,写入的效率也会比较高。
static class ThreadLocalPool extends ThreadLocal implements Pool {
private static final Map, AtomicInteger> poolSizes = new HashMap, AtomicInteger>();

public ThreadLocalPool() {
}

@Override
public R put(R resource) {
R previousResource = get();
if (previousResource == null) {
AtomicInteger poolSize = poolSizes.get(this);
if (poolSize == null) {
poolSizes.put(this, poolSize = new AtomicInteger(0));
}
poolSize.incrementAndGet();
}
this.set(resource);
return previousResource;
}

4.HTable的WriteBufferSize和autoflush

如果想追求写入的速度我们可以设置setWriteBufferSize为一个比较大的大小比如1M并autoflush为false,这样写入的速度会有几十倍的提升,但如果BufferSize比较大也会带来写入不够实时的问题,尤其有些表的数据很小会很久都不flush。因此,我们可以添加按时间间隔的flush方式。
@Override
public void put(final List puts) throws IOException {
super.put(puts);
needFlush();
}

private void needFlush() throws IOException {
long currentTime = System.currentTimeMillis();
if ((currentTime - lastFlushTime.longValue()) > flushInterval) {
super.flushCommits();
lastFlushTime.set(currentTime);
}
}